Skip to content

Commit

Permalink
Merge fd44bee into 3ead809
Browse files Browse the repository at this point in the history
  • Loading branch information
kozlovic committed Feb 15, 2022
2 parents 3ead809 + fd44bee commit a91df6d
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 3 deletions.
22 changes: 20 additions & 2 deletions kv.go
Expand Up @@ -342,6 +342,7 @@ type kvs struct {
stream string
pre string
js *js
pdthr time.Duration
}

// Underlying entry.
Expand Down Expand Up @@ -536,6 +537,8 @@ func (kv *kvs) delete(key string, purge bool) error {
return err
}

const kvDefaultPurgeDeletesMarkerThreshold = 30 * time.Minute

// PurgeDeletes will remove all current delete markers.
// This is a maintenance option if there is a larger buildup of delete markers.
func (kv *kvs) PurgeDeletes(opts ...WatchOpt) error {
Expand All @@ -545,6 +548,18 @@ func (kv *kvs) PurgeDeletes(opts ...WatchOpt) error {
}
defer watcher.Stop()

var limit time.Time
// We are not exposing the option for now, but allow tests to set it...
olderThan := kv.pdthr
// Negative value is used to instruct to always remove markers, regardless
// of age. If set to 0 (or not set), use our default value.
if olderThan == 0 {
olderThan = kvDefaultPurgeDeletesMarkerThreshold
}
if olderThan > 0 {
limit = time.Now().Add(-olderThan)
}

var deleteMarkers []KeyValueEntry
for entry := range watcher.Updates() {
if entry == nil {
Expand All @@ -564,8 +579,11 @@ func (kv *kvs) PurgeDeletes(opts ...WatchOpt) error {
b.WriteString(kv.pre)
b.WriteString(entry.Key())
pr.Subject = b.String()
err := kv.js.purgeStream(kv.stream, &pr)
if err != nil {
pr.Keep = 0
if olderThan > 0 && entry.Created().After(limit) {
pr.Keep = 1
}
if err := kv.js.purgeStream(kv.stream, &pr); err != nil {
return err
}
b.Reset()
Expand Down
75 changes: 75 additions & 0 deletions kv_test.go
@@ -0,0 +1,75 @@
// Copyright 2022 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package nats

import (
"testing"
"time"
)

func expectOk(t *testing.T, err error) {
t.Helper()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
}

func TestKeyValuePurgeDeletesMarkerThreshold(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()

kv, err := js.CreateKeyValue(&KeyValueConfig{Bucket: "KVS", History: 10})
expectOk(t, err)

// Override the marker threshold
kv.(*kvs).pdthr = 100 * time.Millisecond

put := func(key, value string) {
t.Helper()
_, err := kv.Put(key, []byte(value))
expectOk(t, err)
}

put("foo", "foo1")
put("bar", "bar1")
put("foo", "foo2")
err = kv.Delete("foo")
expectOk(t, err)

time.Sleep(200 * time.Millisecond)

err = kv.Delete("bar")
expectOk(t, err)

err = kv.PurgeDeletes()
expectOk(t, err)

// The key foo should have been completely cleared of the data
// and the delete marker.
fooEntries, err := kv.History("foo")
if err != ErrKeyNotFound {
t.Fatalf("Expected all entries for key foo to be gone, got err=%v entries=%v", err, fooEntries)
}
barEntries, err := kv.History("bar")
expectOk(t, err)
if len(barEntries) != 1 {
t.Fatalf("Expected 1 entry, got %v", barEntries)
}
if e := barEntries[0]; e.Operation() != KeyValueDelete {
t.Fatalf("Unexpected entry: %+v", e)
}
}
19 changes: 18 additions & 1 deletion test/kv_test.go
Expand Up @@ -425,9 +425,26 @@ func TestKeyValueDeleteTombstones(t *testing.T) {

si, err := js.StreamInfo("KV_KVS")
expectOk(t, err)
if si.State.Msgs != 0 {
// Since tombstones are less than 30 minutes old, there should be 100 messages,
// corresponding to 1 tombstone per key.
if si.State.Msgs != 100 {
t.Fatalf("Expected no stream msgs to be left, got %d", si.State.Msgs)
}
w, err := kv.WatchAll()
expectOk(t, err)
count := 0
for e := range w.Updates() {
if e == nil {
break
}
if e.Operation() != nats.KeyValueDelete {
t.Fatalf("Invalid entry: %+v", e)
}
count++
}
if count != 100 {
t.Fatalf("Expected 100 tombstones, got %v", count)
}
}

func TestKeyValueKeys(t *testing.T) {
Expand Down

0 comments on commit a91df6d

Please sign in to comment.