Skip to content

Commit

Permalink
Keep tailing slash for kv operations (#214)
Browse files Browse the repository at this point in the history
Since `path.Join` will remove tailing slash, the newly added kv
implementation will return some other kv values when prefix need this
tailing slash.

Add a util function to keep this tailing slash and replace all
`path.Join` in kv impl with it

Fix #213

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
  • Loading branch information
congqixia committed Nov 18, 2023
1 parent 700564d commit 78ebe67
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 18 deletions.
35 changes: 17 additions & 18 deletions states/kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"encoding/json"
"fmt"
"math"
"path"
"strings"
"time"

Expand Down Expand Up @@ -74,7 +73,7 @@ func NewEtcdKV(client *clientv3.Client) *etcdKV {

// Load returns value of the key.
func (kv *etcdKV) Load(ctx context.Context, key string) (string, error) {
key = path.Join(kv.rootPath, key)
key = joinPath(kv.rootPath, key)
resp, err := kv.client.Get(ctx, key)
if err != nil {
return "", err
Expand All @@ -87,7 +86,7 @@ func (kv *etcdKV) Load(ctx context.Context, key string) (string, error) {

// LoadWithPrefix returns all the keys and values with the given key prefix.
func (kv *etcdKV) LoadWithPrefix(ctx context.Context, key string) ([]string, []string, error) {
key = path.Join(kv.rootPath, key)
key = joinPath(kv.rootPath, key)
resp, err := kv.client.Get(ctx, key, clientv3.WithPrefix(),
clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend))
if err != nil {
Expand All @@ -104,27 +103,27 @@ func (kv *etcdKV) LoadWithPrefix(ctx context.Context, key string) ([]string, []s

// Save saves the key-value pair.
func (kv *etcdKV) Save(ctx context.Context, key, value string) error {
key = path.Join(kv.rootPath, key)
key = joinPath(kv.rootPath, key)
_, err := kv.client.Put(ctx, key, value)
return err
}

// Remove removes the key.
func (kv *etcdKV) Remove(ctx context.Context, key string) error {
key = path.Join(kv.rootPath, key)
key = joinPath(kv.rootPath, key)
_, err := kv.client.Delete(ctx, key)
return err
}

// RemoveWithPrefix removes the keys with given prefix.
func (kv *etcdKV) RemoveWithPrefix(ctx context.Context, prefix string) error {
key := path.Join(kv.rootPath, prefix)
key := joinPath(kv.rootPath, prefix)
_, err := kv.client.Delete(ctx, key, clientv3.WithPrefix())
return err
}

func (kv *etcdKV) removeWithPrevKV(ctx context.Context, key string) (*mvccpb.KeyValue, error) {
key = path.Join(kv.rootPath, key)
key = joinPath(kv.rootPath, key)
resp, err := kv.client.Delete(ctx, key, clientv3.WithPrevKV())
if err != nil {
return nil, err
Expand All @@ -136,7 +135,7 @@ func (kv *etcdKV) removeWithPrevKV(ctx context.Context, key string) (*mvccpb.Key
}

func (kv *etcdKV) removeWithPrefixAndPrevKV(ctx context.Context, prefix string) ([]*mvccpb.KeyValue, error) {
key := path.Join(kv.rootPath, prefix)
key := joinPath(kv.rootPath, prefix)
resp, err := kv.client.Delete(ctx, key, clientv3.WithPrefix(), clientv3.WithPrevKV())
return resp.PrevKvs, err
}
Expand Down Expand Up @@ -180,7 +179,7 @@ func writeBackupBytes(w *bufio.Writer, data []byte) {
func (kv *etcdKV) BackupKV(base, prefix string, w *bufio.Writer, ignoreRevision bool, batchSize int64) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()
resp, err := kv.client.Get(ctx, path.Join(base, prefix), clientv3.WithCountOnly(), clientv3.WithPrefix())
resp, err := kv.client.Get(ctx, joinPath(base, prefix), clientv3.WithCountOnly(), clientv3.WithPrefix())
if err != nil {
return err
}
Expand All @@ -200,7 +199,7 @@ func (kv *etcdKV) BackupKV(base, prefix string, w *bufio.Writer, ignoreRevision
parts := strings.Split(base, "/")
if len(parts) > 1 {
metaPath = parts[len(parts)-1]
instance = path.Join(parts[:len(parts)-1]...)
instance = joinPath(parts[:len(parts)-1]...)
} else {
instance = base
}
Expand Down Expand Up @@ -230,7 +229,7 @@ func (kv *etcdKV) BackupKV(base, prefix string, w *bufio.Writer, ignoreRevision
options = append(options, clientv3.WithRev(rev))
}

currentKey := path.Join(base, prefix)
currentKey := joinPath(base, prefix)
var i int
prefixBS := []byte(currentKey)
for int64(i) < cnt {
Expand Down Expand Up @@ -323,7 +322,7 @@ func NewTiKV(txn *txnkv.Client) *txnTiKV {

// Load returns value of the key.
func (kv *txnTiKV) Load(ctx context.Context, key string) (string, error) {
key = path.Join(kv.rootPath, key)
key = joinPath(kv.rootPath, key)

ss := kv.client.GetSnapshot(MaxSnapshotTS)
ss.SetScanBatchSize(SnapshotScanSize)
Expand All @@ -341,7 +340,7 @@ func (kv *txnTiKV) Load(ctx context.Context, key string) (string, error) {

// LoadWithPrefix returns all the keys and values for the given key prefix.
func (kv *txnTiKV) LoadWithPrefix(ctx context.Context, prefix string) ([]string, []string, error) {
prefix = path.Join(kv.rootPath, prefix)
prefix = joinPath(kv.rootPath, prefix)

ss := kv.client.GetSnapshot(MaxSnapshotTS)
ss.SetScanBatchSize(SnapshotScanSize)
Expand Down Expand Up @@ -377,7 +376,7 @@ func (kv *txnTiKV) LoadWithPrefix(ctx context.Context, prefix string) ([]string,

// Save saves the input key-value pair.
func (kv *txnTiKV) Save(ctx context.Context, key, value string) error {
key = path.Join(kv.rootPath, key)
key = joinPath(kv.rootPath, key)

txn, err := kv.client.Begin()
if err != nil {
Expand All @@ -398,7 +397,7 @@ func (kv *txnTiKV) Save(ctx context.Context, key, value string) error {

// Remove removes the input key.
func (kv *txnTiKV) Remove(ctx context.Context, key string) error {
key = path.Join(kv.rootPath, key)
key = joinPath(kv.rootPath, key)

txn, err := kv.client.Begin()
if err != nil {
Expand All @@ -414,7 +413,7 @@ func (kv *txnTiKV) Remove(ctx context.Context, key string) error {

// RemoveWithPrefix removes the keys for the given prefix.
func (kv *txnTiKV) RemoveWithPrefix(ctx context.Context, prefix string) error {
prefix = path.Join(kv.rootPath, prefix)
prefix = joinPath(kv.rootPath, prefix)
ctx, cancel := context.WithTimeout(ctx, RequestTimeout)
defer cancel()

Expand Down Expand Up @@ -502,7 +501,7 @@ func (kv *txnTiKV) BackupKV(base, prefix string, w *bufio.Writer, ignoreRevision
ss := txn.GetSnapshot()
ss.SetScanBatchSize(SnapshotScanSize)

keyprefix := path.Join(base, prefix)
keyprefix := joinPath(base, prefix)
startKey := []byte(keyprefix)
endKey := tikv.PrefixNextKey([]byte(keyprefix))
iter, err := ss.Iter(startKey, endKey)
Expand All @@ -525,7 +524,7 @@ func (kv *txnTiKV) BackupKV(base, prefix string, w *bufio.Writer, ignoreRevision
parts := strings.Split(base, "/")
if len(parts) > 1 {
metaPath = parts[len(parts)-1]
instance = path.Join(parts[:len(parts)-1]...)
instance = joinPath(parts[:len(parts)-1]...)
} else {
instance = base
}
Expand Down
14 changes: 14 additions & 0 deletions states/kv/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package kv

import (
"path"
"strings"
)

func joinPath(parts ...string) string {
r := path.Join(parts...)
if len(parts) > 0 && strings.HasSuffix(parts[len(parts)-1], "/") {
r += "/"
}
return r
}

0 comments on commit 78ebe67

Please sign in to comment.