Skip to content

Commit

Permalink
[ADDED] KV: ability to delete purges older than a certain time
Browse files Browse the repository at this point in the history
A new option when creating the KeyValueStore allow specifying
the threshold that dictates if the purge/delete marker would be
removed or not during a PurgeDeletes() call. By default, this
threshold is set to 30 minutes, that is, markers that are older
than that would be removed, the ones that are younger are kept.
The data is always removed.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
kozlovic committed Feb 15, 2022
1 parent 3ead809 commit 2aa1139
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 3 deletions.
32 changes: 30 additions & 2 deletions kv.go
Expand Up @@ -159,6 +159,16 @@ type KeyValueConfig struct {
MaxBytes int64
Storage StorageType
Replicas int

// This is for the maintenance PurgeDeletes() function. Normally, when
// this function is invoked, keys that have a delete or purge marker
// as the last entry see all their entries removed, including the
// marker. This option allows to delete old data but keep the marker
// if its timestamp is not older than this value. If this option
// is not specified, the API will pick a default of 30 minutes.
// Explicitly set it to a negative value to restore previous behavior
// to delete markers, regardless their age.
PurgeDeletesMarkerThreshold time.Duration
}

// Used to watch all keys.
Expand Down Expand Up @@ -324,6 +334,7 @@ func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) {
stream: scfg.Name,
pre: fmt.Sprintf(kvSubjectsPreTmpl, cfg.Bucket),
js: js,
pdthr: cfg.PurgeDeletesMarkerThreshold,
}
return kv, nil
}
Expand All @@ -342,6 +353,7 @@ type kvs struct {
stream string
pre string
js *js
pdthr time.Duration
}

// Underlying entry.
Expand Down Expand Up @@ -536,6 +548,8 @@ func (kv *kvs) delete(key string, purge bool) error {
return err
}

const kvDefaultPurgeDeletesMarkerThreshold = 30 * time.Minute

// PurgeDeletes will remove all current delete markers.
// This is a maintenance option if there is a larger buildup of delete markers.
func (kv *kvs) PurgeDeletes(opts ...WatchOpt) error {
Expand All @@ -545,6 +559,17 @@ func (kv *kvs) PurgeDeletes(opts ...WatchOpt) error {
}
defer watcher.Stop()

var limit time.Time
olderThan := kv.pdthr
// Negative value is used to instruct to always remove markers, regardless
// of age. If set to 0 (or not set), use our default value.
if olderThan == 0 {
olderThan = kvDefaultPurgeDeletesMarkerThreshold
}
if olderThan > 0 {
limit = time.Now().Add(-olderThan)
}

var deleteMarkers []KeyValueEntry
for entry := range watcher.Updates() {
if entry == nil {
Expand All @@ -564,8 +589,11 @@ func (kv *kvs) PurgeDeletes(opts ...WatchOpt) error {
b.WriteString(kv.pre)
b.WriteString(entry.Key())
pr.Subject = b.String()
err := kv.js.purgeStream(kv.stream, &pr)
if err != nil {
pr.Keep = 0
if olderThan > 0 && entry.Created().After(limit) {
pr.Keep = 1
}
if err := kv.js.purgeStream(kv.stream, &pr); err != nil {
return err
}
b.Reset()
Expand Down
56 changes: 55 additions & 1 deletion test/kv_test.go
Expand Up @@ -400,7 +400,11 @@ func TestKeyValueDeleteTombstones(t *testing.T) {
nc, js := jsClient(t, s)
defer nc.Close()

kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "KVS", History: 10})
kv, err := js.CreateKeyValue(&nats.KeyValueConfig{
Bucket: "KVS",
History: 10,
PurgeDeletesMarkerThreshold: -1, // Set negative to remove all markers, regardless of age
})
expectOk(t, err)

put := func(key, value string) {
Expand Down Expand Up @@ -430,6 +434,56 @@ func TestKeyValueDeleteTombstones(t *testing.T) {
}
}

func TestKeyValuePurgeDeletesMarkerThreshold(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()

kv, err := js.CreateKeyValue(&nats.KeyValueConfig{
Bucket: "KVS",
History: 10,
PurgeDeletesMarkerThreshold: 100 * time.Millisecond,
})
expectOk(t, err)

put := func(key, value string) {
t.Helper()
_, err := kv.Put(key, []byte(value))
expectOk(t, err)
}

put("foo", "foo1")
put("bar", "bar1")
put("foo", "foo2")
err = kv.Delete("foo")
expectOk(t, err)

time.Sleep(200 * time.Millisecond)

err = kv.Delete("bar")
expectOk(t, err)

err = kv.PurgeDeletes()
expectOk(t, err)

// The key foo should have been completely cleared of the data
// and the delete marker.
fooEntries, err := kv.History("foo")
if err != nats.ErrKeyNotFound {
t.Fatalf("Expected all entries for key foo to be gone, got err=%v entries=%v", err, fooEntries)
}
barEntries, err := kv.History("bar")
expectOk(t, err)
if len(barEntries) != 1 {
t.Fatalf("Expected 1 entry, got %v", barEntries)
}
if e := barEntries[0]; e.Operation() != nats.KeyValueDelete {
t.Fatalf("Unexpected entry: %+v", e)
}
}

func TestKeyValueKeys(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)
Expand Down

0 comments on commit 2aa1139

Please sign in to comment.