Skip to content

Commit

Permalink
Merge pull request #906 from nats-io/kv_purge_with_time
Browse files Browse the repository at this point in the history
[CHANGED] PurgeDeletes() will now keep markers that are less than 30min old
  • Loading branch information
kozlovic committed Feb 15, 2022
2 parents 3ead809 + 4afdb30 commit 519bc35
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 6 deletions.
69 changes: 64 additions & 5 deletions kv.go
Expand Up @@ -68,7 +68,7 @@ type KeyValue interface {
// Bucket returns the current bucket name.
Bucket() string
// PurgeDeletes will remove all current delete markers.
PurgeDeletes(opts ...WatchOpt) error
PurgeDeletes(opts ...PurgeOpt) error
// Status retrieves the status and configuration of a bucket
Status() (KeyValueStatus, error)
}
Expand Down Expand Up @@ -149,6 +149,34 @@ func MetaOnly() WatchOpt {
})
}

type PurgeOpt interface {
configurePurge(opts *purgeOpts) error
}

type purgeOpts struct {
dmthr time.Duration // Delete markers threshold
ctx context.Context
}

// DeleteMarkersOlderThan indicates that delete or purge markers older than that
// will be deleted as part of PurgeDeletes() operation, otherwise, only the data
// will be removed but markers that are recent will be kept.
// Note that if no option is specified, the default is 30 minutes. You can set
// this option to a negative value to instruct to always remove the markers,
// regardless of their age.
type DeleteMarkersOlderThan time.Duration

func (ttl DeleteMarkersOlderThan) configurePurge(opts *purgeOpts) error {
opts.dmthr = time.Duration(ttl)
return nil
}

// For nats.Context() support.
func (ctx ContextOpt) configurePurge(opts *purgeOpts) error {
opts.ctx = ctx
return nil
}

// KeyValueConfig is for configuring a KeyValue store.
type KeyValueConfig struct {
Bucket string
Expand Down Expand Up @@ -536,15 +564,43 @@ func (kv *kvs) delete(key string, purge bool) error {
return err
}

const kvDefaultPurgeDeletesMarkerThreshold = 30 * time.Minute

// PurgeDeletes will remove all current delete markers.
// This is a maintenance option if there is a larger buildup of delete markers.
func (kv *kvs) PurgeDeletes(opts ...WatchOpt) error {
watcher, err := kv.WatchAll(opts...)
// See DeleteMarkersOlderThan() option for more information.
func (kv *kvs) PurgeDeletes(opts ...PurgeOpt) error {
var o purgeOpts
for _, opt := range opts {
if opt != nil {
if err := opt.configurePurge(&o); err != nil {
return err
}
}
}
// Transfer possible context purge option to the watcher. This is the
// only option that matters for the PurgeDeletes() feature.
var wopts []WatchOpt
if o.ctx != nil {
wopts = append(wopts, Context(o.ctx))
}
watcher, err := kv.WatchAll(wopts...)
if err != nil {
return err
}
defer watcher.Stop()

var limit time.Time
olderThan := o.dmthr
// Negative value is used to instruct to always remove markers, regardless
// of age. If set to 0 (or not set), use our default value.
if olderThan == 0 {
olderThan = kvDefaultPurgeDeletesMarkerThreshold
}
if olderThan > 0 {
limit = time.Now().Add(-olderThan)
}

var deleteMarkers []KeyValueEntry
for entry := range watcher.Updates() {
if entry == nil {
Expand All @@ -564,8 +620,11 @@ func (kv *kvs) PurgeDeletes(opts ...WatchOpt) error {
b.WriteString(kv.pre)
b.WriteString(entry.Key())
pr.Subject = b.String()
err := kv.js.purgeStream(kv.stream, &pr)
if err != nil {
pr.Keep = 0
if olderThan > 0 && entry.Created().After(limit) {
pr.Keep = 1
}
if err := kv.js.purgeStream(kv.stream, &pr); err != nil {
return err
}
b.Reset()
Expand Down
54 changes: 53 additions & 1 deletion test/kv_test.go
Expand Up @@ -420,14 +420,66 @@ func TestKeyValueDeleteTombstones(t *testing.T) {
}

// Now cleanup.
err = kv.PurgeDeletes()
err = kv.PurgeDeletes(nats.DeleteMarkersOlderThan(-1))
expectOk(t, err)

si, err := js.StreamInfo("KV_KVS")
expectOk(t, err)
if si.State.Msgs != 0 {
t.Fatalf("Expected no stream msgs to be left, got %d", si.State.Msgs)
}

// Try with context
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
err = kv.PurgeDeletes(nats.Context(ctx))
expectOk(t, err)
}

func TestKeyValuePurgeDeletesMarkerThreshold(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()

kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "KVS", History: 10})
expectOk(t, err)

put := func(key, value string) {
t.Helper()
_, err := kv.Put(key, []byte(value))
expectOk(t, err)
}

put("foo", "foo1")
put("bar", "bar1")
put("foo", "foo2")
err = kv.Delete("foo")
expectOk(t, err)

time.Sleep(200 * time.Millisecond)

err = kv.Delete("bar")
expectOk(t, err)

err = kv.PurgeDeletes(nats.DeleteMarkersOlderThan(100 * time.Millisecond))
expectOk(t, err)

// The key foo should have been completely cleared of the data
// and the delete marker.
fooEntries, err := kv.History("foo")
if err != nats.ErrKeyNotFound {
t.Fatalf("Expected all entries for key foo to be gone, got err=%v entries=%v", err, fooEntries)
}
barEntries, err := kv.History("bar")
expectOk(t, err)
if len(barEntries) != 1 {
t.Fatalf("Expected 1 entry, got %v", barEntries)
}
if e := barEntries[0]; e.Operation() != nats.KeyValueDelete {
t.Fatalf("Unexpected entry: %+v", e)
}
}

func TestKeyValueKeys(t *testing.T) {
Expand Down

0 comments on commit 519bc35

Please sign in to comment.