Skip to content

Commit

Permalink
Merge 2aa1139 into 3ead809
Browse files Browse the repository at this point in the history
  • Loading branch information
kozlovic committed Feb 15, 2022
2 parents 3ead809 + 2aa1139 commit 16ac008
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 3 deletions.
32 changes: 30 additions & 2 deletions kv.go
Expand Up @@ -159,6 +159,16 @@ type KeyValueConfig struct {
MaxBytes int64
Storage StorageType
Replicas int

// This is for the maintenance PurgeDeletes() function. Normally, when
// this function is invoked, keys that have a delete or purge marker
// as the last entry see all their entries removed, including the
// marker. This option allows to delete old data but keep the marker
// if its timestamp is not older than this value. If this option
// is not specified, the API will pick a default of 30 minutes.
// Explicitly set it to a negative value to restore previous behavior
// to delete markers, regardless their age.
PurgeDeletesMarkerThreshold time.Duration
}

// Used to watch all keys.
Expand Down Expand Up @@ -324,6 +334,7 @@ func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) {
stream: scfg.Name,
pre: fmt.Sprintf(kvSubjectsPreTmpl, cfg.Bucket),
js: js,
pdthr: cfg.PurgeDeletesMarkerThreshold,
}
return kv, nil
}
Expand All @@ -342,6 +353,7 @@ type kvs struct {
stream string
pre string
js *js
pdthr time.Duration
}

// Underlying entry.
Expand Down Expand Up @@ -536,6 +548,8 @@ func (kv *kvs) delete(key string, purge bool) error {
return err
}

const kvDefaultPurgeDeletesMarkerThreshold = 30 * time.Minute

// PurgeDeletes will remove all current delete markers.
// This is a maintenance option if there is a larger buildup of delete markers.
func (kv *kvs) PurgeDeletes(opts ...WatchOpt) error {
Expand All @@ -545,6 +559,17 @@ func (kv *kvs) PurgeDeletes(opts ...WatchOpt) error {
}
defer watcher.Stop()

var limit time.Time
olderThan := kv.pdthr
// Negative value is used to instruct to always remove markers, regardless
// of age. If set to 0 (or not set), use our default value.
if olderThan == 0 {
olderThan = kvDefaultPurgeDeletesMarkerThreshold
}
if olderThan > 0 {
limit = time.Now().Add(-olderThan)
}

var deleteMarkers []KeyValueEntry
for entry := range watcher.Updates() {
if entry == nil {
Expand All @@ -564,8 +589,11 @@ func (kv *kvs) PurgeDeletes(opts ...WatchOpt) error {
b.WriteString(kv.pre)
b.WriteString(entry.Key())
pr.Subject = b.String()
err := kv.js.purgeStream(kv.stream, &pr)
if err != nil {
pr.Keep = 0
if olderThan > 0 && entry.Created().After(limit) {
pr.Keep = 1
}
if err := kv.js.purgeStream(kv.stream, &pr); err != nil {
return err
}
b.Reset()
Expand Down
56 changes: 55 additions & 1 deletion test/kv_test.go
Expand Up @@ -400,7 +400,11 @@ func TestKeyValueDeleteTombstones(t *testing.T) {
nc, js := jsClient(t, s)
defer nc.Close()

kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "KVS", History: 10})
kv, err := js.CreateKeyValue(&nats.KeyValueConfig{
Bucket: "KVS",
History: 10,
PurgeDeletesMarkerThreshold: -1, // Set negative to remove all markers, regardless of age
})
expectOk(t, err)

put := func(key, value string) {
Expand Down Expand Up @@ -430,6 +434,56 @@ func TestKeyValueDeleteTombstones(t *testing.T) {
}
}

func TestKeyValuePurgeDeletesMarkerThreshold(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()

kv, err := js.CreateKeyValue(&nats.KeyValueConfig{
Bucket: "KVS",
History: 10,
PurgeDeletesMarkerThreshold: 100 * time.Millisecond,
})
expectOk(t, err)

put := func(key, value string) {
t.Helper()
_, err := kv.Put(key, []byte(value))
expectOk(t, err)
}

put("foo", "foo1")
put("bar", "bar1")
put("foo", "foo2")
err = kv.Delete("foo")
expectOk(t, err)

time.Sleep(200 * time.Millisecond)

err = kv.Delete("bar")
expectOk(t, err)

err = kv.PurgeDeletes()
expectOk(t, err)

// The key foo should have been completely cleared of the data
// and the delete marker.
fooEntries, err := kv.History("foo")
if err != nats.ErrKeyNotFound {
t.Fatalf("Expected all entries for key foo to be gone, got err=%v entries=%v", err, fooEntries)
}
barEntries, err := kv.History("bar")
expectOk(t, err)
if len(barEntries) != 1 {
t.Fatalf("Expected 1 entry, got %v", barEntries)
}
if e := barEntries[0]; e.Operation() != nats.KeyValueDelete {
t.Fatalf("Unexpected entry: %+v", e)
}
}

func TestKeyValueKeys(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)
Expand Down

0 comments on commit 16ac008

Please sign in to comment.