Skip to content

Commit

Permalink
Merge 79d6644 into 0096b1b
Browse files Browse the repository at this point in the history
  • Loading branch information
kozlovic committed Feb 19, 2022
2 parents 0096b1b + 79d6644 commit 922c6b1
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 44 deletions.
36 changes: 35 additions & 1 deletion kv.go
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"errors"
"fmt"
"reflect"
"regexp"
"strconv"
"strings"
Expand Down Expand Up @@ -346,7 +347,21 @@ func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) {
}

if _, err := js.AddStream(scfg); err != nil {
return nil, err
// If we have a failure to add, it could be because we have
// a config change if the KV was created against a pre 2.7.2
// and we are now moving to a v2.7.2+. If that is the case
// and the only difference is the discard policy, then update
// the stream.
if strings.Contains(err.Error(), "already in use") {
if si, _ := js.StreamInfo(scfg.Name); si != nil {
if streamCfgSameSansDiscard(&si.Config, scfg) {
_, err = js.UpdateStream(scfg)
}
}
}
if err != nil {
return nil, err
}
}

kv := &kvs{
Expand All @@ -360,6 +375,25 @@ func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) {
return kv, nil
}

func streamCfgSameSansDiscard(srvCfg, ourCfg *StreamConfig) bool {
// The server sets some values (like -1) when sending 0.
// So we need to align to what the server may return to
// have a meaningfull comparison.
ours := *ourCfg
if ours.MaxMsgSize == 0 {
ours.MaxMsgSize = -1
}
if ours.MaxBytes == 0 {
ours.MaxBytes = -1
}
ours.Duplicates = 2 * time.Minute
ours.MaxMsgs = -1
ours.MaxConsumers = -1
// Set our discard to what server has for the comparison
ours.Discard = srvCfg.Discard
return reflect.DeepEqual(srvCfg, &ours)
}

// DeleteKeyValue will delete this KeyValue store (JetStream stream).
func (js *js) DeleteKeyValue(bucket string) error {
if !validBucketRe.MatchString(bucket) {
Expand Down
69 changes: 69 additions & 0 deletions kv_test.go
@@ -0,0 +1,69 @@
// Copyright 2022 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package nats

import (
"testing"
)

func TestKeyValueDiscardOldToDiscardNew(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()

checkDiscard := func(expected DiscardPolicy) KeyValue {
t.Helper()
kv, err := js.CreateKeyValue(&KeyValueConfig{Bucket: "TEST", History: 1})
if err != nil {
t.Fatalf("Error creating store: %v", err)
}
si, err := js.StreamInfo("KV_TEST")
if err != nil {
t.Fatalf("Error getting stream info: %v", err)
}
if si.Config.Discard != expected {
t.Fatalf("Expected discard policy %v, got %+v", expected, si)
}
return kv
}

// We are going to go from 2.7.1->2.7.2->2.7.1 and 2.7.2 again.
for i := 0; i < 2; i++ {
// Change the server version in the connection to
// create as-if we were connecting to a v2.7.1 server.
nc.mu.Lock()
nc.info.Version = "2.7.1"
nc.mu.Unlock()

kv := checkDiscard(DiscardOld)
if i == 0 {
if _, err := kv.PutString("foo", "value"); err != nil {
t.Fatalf("Error adding key: %v", err)
}
}

// Now change version to 2.7.2
nc.mu.Lock()
nc.info.Version = "2.7.2"
nc.mu.Unlock()

kv = checkDiscard(DiscardNew)
// Make sure the key still exists
if e, err := kv.Get("foo"); err != nil || string(e.Value()) != "value" {
t.Fatalf("Error getting key: err=%v e=%+v", err, e)
}
}
}
43 changes: 0 additions & 43 deletions test/kv_test.go
Expand Up @@ -18,7 +18,6 @@ import (
"fmt"
"os"
"reflect"
"regexp"
"strconv"
"strings"
"testing"
Expand Down Expand Up @@ -558,48 +557,6 @@ func TestKeyValueKeys(t *testing.T) {
}
}

func TestKeyValueDiscardNew(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()

kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "TEST", History: 1, MaxBytes: 256})
expectOk(t, err)

vc := func() (major, minor, patch int) {
semVerRe := regexp.MustCompile(`\Av?([0-9]+)\.?([0-9]+)?\.?([0-9]+)?`)
m := semVerRe.FindStringSubmatch(nc.ConnectedServerVersion())
expectOk(t, err)
major, err = strconv.Atoi(m[1])
expectOk(t, err)
minor, err = strconv.Atoi(m[2])
expectOk(t, err)
patch, err = strconv.Atoi(m[3])
expectOk(t, err)
return major, minor, patch
}

major, minor, patch := vc()
status, err := kv.Status()
expectOk(t, err)
kvs := status.(*nats.KeyValueBucketStatus)
si := kvs.StreamInfo()

// If we are 2.7.1 or below DiscardOld should be used.
// If 2.7.2 or above should be DiscardNew
if major <= 2 && minor <= 7 && patch <= 1 {
if si.Config.Discard != nats.DiscardOld {
t.Fatalf("Expected Discard Old for server version %d.%d.%d", major, minor, patch)
}
} else {
if si.Config.Discard != nats.DiscardNew {
t.Fatalf("Expected Discard New for server version %d.%d.%d", major, minor, patch)
}
}
}

func TestKeyValueCrossAccounts(t *testing.T) {
conf := createConfFile(t, []byte(`
jetstream: enabled
Expand Down

0 comments on commit 922c6b1

Please sign in to comment.