From 2aa113969e6aa66600aef946c1e912407f3d7ad0 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Thu, 10 Feb 2022 18:06:44 -0700 Subject: [PATCH] [ADDED] KV: ability to delete purges older than a certain time A new option when creating the KeyValueStore allow specifying the threshold that dictates if the purge/delete marker would be removed or not during a PurgeDeletes() call. By default, this threshold is set to 30 minutes, that is, markers that are older than that would be removed, the ones that are younger are kept. The data is always removed. Signed-off-by: Ivan Kozlovic --- kv.go | 32 ++++++++++++++++++++++++++-- test/kv_test.go | 56 ++++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 85 insertions(+), 3 deletions(-) diff --git a/kv.go b/kv.go index cdbb4e302..0c8abdd0a 100644 --- a/kv.go +++ b/kv.go @@ -159,6 +159,16 @@ type KeyValueConfig struct { MaxBytes int64 Storage StorageType Replicas int + + // This is for the maintenance PurgeDeletes() function. Normally, when + // this function is invoked, keys that have a delete or purge marker + // as the last entry see all their entries removed, including the + // marker. This option allows to delete old data but keep the marker + // if its timestamp is not older than this value. If this option + // is not specified, the API will pick a default of 30 minutes. + // Explicitly set it to a negative value to restore previous behavior + // to delete markers, regardless their age. + PurgeDeletesMarkerThreshold time.Duration } // Used to watch all keys. @@ -324,6 +334,7 @@ func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) { stream: scfg.Name, pre: fmt.Sprintf(kvSubjectsPreTmpl, cfg.Bucket), js: js, + pdthr: cfg.PurgeDeletesMarkerThreshold, } return kv, nil } @@ -342,6 +353,7 @@ type kvs struct { stream string pre string js *js + pdthr time.Duration } // Underlying entry. @@ -536,6 +548,8 @@ func (kv *kvs) delete(key string, purge bool) error { return err } +const kvDefaultPurgeDeletesMarkerThreshold = 30 * time.Minute + // PurgeDeletes will remove all current delete markers. // This is a maintenance option if there is a larger buildup of delete markers. func (kv *kvs) PurgeDeletes(opts ...WatchOpt) error { @@ -545,6 +559,17 @@ func (kv *kvs) PurgeDeletes(opts ...WatchOpt) error { } defer watcher.Stop() + var limit time.Time + olderThan := kv.pdthr + // Negative value is used to instruct to always remove markers, regardless + // of age. If set to 0 (or not set), use our default value. + if olderThan == 0 { + olderThan = kvDefaultPurgeDeletesMarkerThreshold + } + if olderThan > 0 { + limit = time.Now().Add(-olderThan) + } + var deleteMarkers []KeyValueEntry for entry := range watcher.Updates() { if entry == nil { @@ -564,8 +589,11 @@ func (kv *kvs) PurgeDeletes(opts ...WatchOpt) error { b.WriteString(kv.pre) b.WriteString(entry.Key()) pr.Subject = b.String() - err := kv.js.purgeStream(kv.stream, &pr) - if err != nil { + pr.Keep = 0 + if olderThan > 0 && entry.Created().After(limit) { + pr.Keep = 1 + } + if err := kv.js.purgeStream(kv.stream, &pr); err != nil { return err } b.Reset() diff --git a/test/kv_test.go b/test/kv_test.go index 69c7da10d..9d87b8f52 100644 --- a/test/kv_test.go +++ b/test/kv_test.go @@ -400,7 +400,11 @@ func TestKeyValueDeleteTombstones(t *testing.T) { nc, js := jsClient(t, s) defer nc.Close() - kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "KVS", History: 10}) + kv, err := js.CreateKeyValue(&nats.KeyValueConfig{ + Bucket: "KVS", + History: 10, + PurgeDeletesMarkerThreshold: -1, // Set negative to remove all markers, regardless of age + }) expectOk(t, err) put := func(key, value string) { @@ -430,6 +434,56 @@ func TestKeyValueDeleteTombstones(t *testing.T) { } } +func TestKeyValuePurgeDeletesMarkerThreshold(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + + kv, err := js.CreateKeyValue(&nats.KeyValueConfig{ + Bucket: "KVS", + History: 10, + PurgeDeletesMarkerThreshold: 100 * time.Millisecond, + }) + expectOk(t, err) + + put := func(key, value string) { + t.Helper() + _, err := kv.Put(key, []byte(value)) + expectOk(t, err) + } + + put("foo", "foo1") + put("bar", "bar1") + put("foo", "foo2") + err = kv.Delete("foo") + expectOk(t, err) + + time.Sleep(200 * time.Millisecond) + + err = kv.Delete("bar") + expectOk(t, err) + + err = kv.PurgeDeletes() + expectOk(t, err) + + // The key foo should have been completely cleared of the data + // and the delete marker. + fooEntries, err := kv.History("foo") + if err != nats.ErrKeyNotFound { + t.Fatalf("Expected all entries for key foo to be gone, got err=%v entries=%v", err, fooEntries) + } + barEntries, err := kv.History("bar") + expectOk(t, err) + if len(barEntries) != 1 { + t.Fatalf("Expected 1 entry, got %v", barEntries) + } + if e := barEntries[0]; e.Operation() != nats.KeyValueDelete { + t.Fatalf("Unexpected entry: %+v", e) + } +} + func TestKeyValueKeys(t *testing.T) { s := RunBasicJetStreamServer() defer shutdownJSServerAndRemoveStorage(t, s)