diff --git a/kv.go b/kv.go index 0b75054d9..435314c17 100644 --- a/kv.go +++ b/kv.go @@ -17,6 +17,7 @@ import ( "context" "errors" "fmt" + "reflect" "regexp" "strconv" "strings" @@ -346,7 +347,21 @@ func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) { } if _, err := js.AddStream(scfg); err != nil { - return nil, err + // If we have a failure to add, it could be because we have + // a config change if the KV was created against a pre 2.7.2 + // and we are now moving to a v2.7.2+. If that is the case + // and the only difference is the discard policy, then update + // the stream. + if strings.Contains(err.Error(), "already in use") { + if si, _ := js.StreamInfo(scfg.Name); si != nil { + if streamCfgSameSansDiscard(&si.Config, scfg) { + _, err = js.UpdateStream(scfg) + } + } + } + if err != nil { + return nil, err + } } kv := &kvs{ @@ -360,6 +375,25 @@ func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) { return kv, nil } +func streamCfgSameSansDiscard(srvCfg, ourCfg *StreamConfig) bool { + // The server sets some values (like -1) when sending 0. + // So we need to align to what the server may return to + // have a meaningfull comparison. + ours := *ourCfg + if ours.MaxMsgSize == 0 { + ours.MaxMsgSize = -1 + } + if ours.MaxBytes == 0 { + ours.MaxBytes = -1 + } + ours.Duplicates = 2 * time.Minute + ours.MaxMsgs = -1 + ours.MaxConsumers = -1 + // Set our discard to what server has for the comparison + ours.Discard = srvCfg.Discard + return reflect.DeepEqual(srvCfg, &ours) +} + // DeleteKeyValue will delete this KeyValue store (JetStream stream). func (js *js) DeleteKeyValue(bucket string) error { if !validBucketRe.MatchString(bucket) { diff --git a/kv_test.go b/kv_test.go new file mode 100644 index 000000000..a7cd03e20 --- /dev/null +++ b/kv_test.go @@ -0,0 +1,69 @@ +// Copyright 2022 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package nats + +import ( + "testing" +) + +func TestKeyValueDiscardOldToDiscardNew(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + + checkDiscard := func(expected DiscardPolicy) KeyValue { + t.Helper() + kv, err := js.CreateKeyValue(&KeyValueConfig{Bucket: "TEST", History: 1}) + if err != nil { + t.Fatalf("Error creating store: %v", err) + } + si, err := js.StreamInfo("KV_TEST") + if err != nil { + t.Fatalf("Error getting stream info: %v", err) + } + if si.Config.Discard != expected { + t.Fatalf("Expected discard policy %v, got %+v", expected, si) + } + return kv + } + + // We are going to go from 2.7.1->2.7.2->2.7.1 and 2.7.2 again. + for i := 0; i < 2; i++ { + // Change the server version in the connection to + // create as-if we were connecting to a v2.7.1 server. + nc.mu.Lock() + nc.info.Version = "2.7.1" + nc.mu.Unlock() + + kv := checkDiscard(DiscardOld) + if i == 0 { + if _, err := kv.PutString("foo", "value"); err != nil { + t.Fatalf("Error adding key: %v", err) + } + } + + // Now change version to 2.7.2 + nc.mu.Lock() + nc.info.Version = "2.7.2" + nc.mu.Unlock() + + kv = checkDiscard(DiscardNew) + // Make sure the key still exists + if e, err := kv.Get("foo"); err != nil || string(e.Value()) != "value" { + t.Fatalf("Error getting key: err=%v e=%+v", err, e) + } + } +} diff --git a/test/kv_test.go b/test/kv_test.go index 831ad1315..7568f43bb 100644 --- a/test/kv_test.go +++ b/test/kv_test.go @@ -18,7 +18,6 @@ import ( "fmt" "os" "reflect" - "regexp" "strconv" "strings" "testing" @@ -558,48 +557,6 @@ func TestKeyValueKeys(t *testing.T) { } } -func TestKeyValueDiscardNew(t *testing.T) { - s := RunBasicJetStreamServer() - defer shutdownJSServerAndRemoveStorage(t, s) - - nc, js := jsClient(t, s) - defer nc.Close() - - kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "TEST", History: 1, MaxBytes: 256}) - expectOk(t, err) - - vc := func() (major, minor, patch int) { - semVerRe := regexp.MustCompile(`\Av?([0-9]+)\.?([0-9]+)?\.?([0-9]+)?`) - m := semVerRe.FindStringSubmatch(nc.ConnectedServerVersion()) - expectOk(t, err) - major, err = strconv.Atoi(m[1]) - expectOk(t, err) - minor, err = strconv.Atoi(m[2]) - expectOk(t, err) - patch, err = strconv.Atoi(m[3]) - expectOk(t, err) - return major, minor, patch - } - - major, minor, patch := vc() - status, err := kv.Status() - expectOk(t, err) - kvs := status.(*nats.KeyValueBucketStatus) - si := kvs.StreamInfo() - - // If we are 2.7.1 or below DiscardOld should be used. - // If 2.7.2 or above should be DiscardNew - if major <= 2 && minor <= 7 && patch <= 1 { - if si.Config.Discard != nats.DiscardOld { - t.Fatalf("Expected Discard Old for server version %d.%d.%d", major, minor, patch) - } - } else { - if si.Config.Discard != nats.DiscardNew { - t.Fatalf("Expected Discard New for server version %d.%d.%d", major, minor, patch) - } - } -} - func TestKeyValueCrossAccounts(t *testing.T) { conf := createConfFile(t, []byte(` jetstream: enabled