Skip to content

Commit

Permalink
fix: replace removeWithPrefix with remove to avoid delete redundantly (
Browse files Browse the repository at this point in the history
…#33328)

#33288

---------

Signed-off-by: lixinguo <xinguo.li@zilliz.com>
Co-authored-by: lixinguo <xinguo.li@zilliz.com>
  • Loading branch information
smellthemoon and lixinguo committed May 31, 2024
1 parent c6a1c49 commit 2c7bb0b
Show file tree
Hide file tree
Showing 8 changed files with 234 additions and 37 deletions.
1 change: 1 addition & 0 deletions internal/kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,5 +91,6 @@ type SnapShotKV interface {
Load(key string, ts typeutil.Timestamp) (string, error)
MultiSave(kvs map[string]string, ts typeutil.Timestamp) error
LoadWithPrefix(key string, ts typeutil.Timestamp) ([]string, []string, error)
MultiSaveAndRemove(saves map[string]string, removals []string, ts typeutil.Timestamp) error
MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, ts typeutil.Timestamp) error
}
8 changes: 8 additions & 0 deletions internal/kv/mock_snapshot_kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ type mockSnapshotKV struct {
MultiSaveFunc func(kvs map[string]string, ts typeutil.Timestamp) error
LoadWithPrefixFunc func(key string, ts typeutil.Timestamp) ([]string, []string, error)
MultiSaveAndRemoveWithPrefixFunc func(saves map[string]string, removals []string, ts typeutil.Timestamp) error
MultiSaveAndRemoveFunc func(saves map[string]string, removals []string, ts typeutil.Timestamp) error
}

func NewMockSnapshotKV() *mockSnapshotKV {
Expand Down Expand Up @@ -51,3 +52,10 @@ func (m mockSnapshotKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, re
}
return nil
}

func (m mockSnapshotKV) MultiSaveAndRemove(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
if m.MultiSaveAndRemoveFunc != nil {
return m.MultiSaveAndRemoveFunc(saves, removals, ts)
}
return nil
}
16 changes: 16 additions & 0 deletions internal/kv/mock_snapshot_kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,19 @@ func Test_mockSnapshotKV_MultiSaveAndRemoveWithPrefix(t *testing.T) {
assert.NoError(t, err)
})
}

func Test_mockSnapshotKV_MultiSaveAndRemove(t *testing.T) {
t.Run("func not set", func(t *testing.T) {
snapshot := NewMockSnapshotKV()
err := snapshot.MultiSaveAndRemove(nil, nil, 0)
assert.NoError(t, err)
})
t.Run("func set", func(t *testing.T) {
snapshot := NewMockSnapshotKV()
snapshot.MultiSaveAndRemoveWithPrefixFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
return nil
}
err := snapshot.MultiSaveAndRemove(nil, nil, 0)
assert.NoError(t, err)
})
}
44 changes: 44 additions & 0 deletions internal/kv/mocks/snapshot_kv.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 9 additions & 16 deletions internal/metastore/kv/rootcoord/kv_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"sort"

"github.com/cockroachdb/errors"
"github.com/golang/protobuf/proto"
Expand Down Expand Up @@ -85,22 +84,16 @@ func BuildAliasPrefixWithDB(dbID int64) string {

// since SnapshotKV may save both snapshot key and the original key if the original key is newest
// MaxEtcdTxnNum need to divided by 2
func batchMultiSaveAndRemoveWithPrefix(snapshot kv.SnapShotKV, limit int, saves map[string]string, removals []string, ts typeutil.Timestamp) error {
func batchMultiSaveAndRemove(snapshot kv.SnapShotKV, limit int, saves map[string]string, removals []string, ts typeutil.Timestamp) error {
saveFn := func(partialKvs map[string]string) error {
return snapshot.MultiSave(partialKvs, ts)
}
if err := etcd.SaveByBatchWithLimit(saves, limit, saveFn); err != nil {
return err
}

// avoid a case that the former key is the prefix of the later key.
// for example, `root-coord/fields/collection_id/1` is the prefix of `root-coord/fields/collection_id/100`.
sort.Slice(removals, func(i, j int) bool {
return removals[i] > removals[j]
})

removeFn := func(partialKeys []string) error {
return snapshot.MultiSaveAndRemoveWithPrefix(nil, partialKeys, ts)
return snapshot.MultiSaveAndRemove(nil, partialKeys, ts)
}
return etcd.RemoveByBatchWithLimit(removals, limit, removeFn)
}
Expand All @@ -127,7 +120,7 @@ func (kc *Catalog) AlterDatabase(ctx context.Context, newColl *model.Database, t

func (kc *Catalog) DropDatabase(ctx context.Context, dbID int64, ts typeutil.Timestamp) error {
key := BuildDatabaseKey(dbID)
return kc.Snapshot.MultiSaveAndRemoveWithPrefix(nil, []string{key}, ts)
return kc.Snapshot.MultiSaveAndRemove(nil, []string{key}, ts)
}

func (kc *Catalog) ListDatabases(ctx context.Context, ts typeutil.Timestamp) ([]*model.Database, error) {
Expand Down Expand Up @@ -300,7 +293,7 @@ func (kc *Catalog) CreateAlias(ctx context.Context, alias *model.Alias, ts typeu
return err
}
kvs := map[string]string{k: string(v)}
return kc.Snapshot.MultiSaveAndRemoveWithPrefix(kvs, []string{oldKBefore210, oldKeyWithoutDb}, ts)
return kc.Snapshot.MultiSaveAndRemove(kvs, []string{oldKBefore210, oldKeyWithoutDb}, ts)
}

func (kc *Catalog) CreateCredential(ctx context.Context, credential *model.Credential) error {
Expand Down Expand Up @@ -455,12 +448,12 @@ func (kc *Catalog) DropCollection(ctx context.Context, collectionInfo *model.Col
// However, if we remove collection first, we cannot remove other metas.
// since SnapshotKV may save both snapshot key and the original key if the original key is newest
// MaxEtcdTxnNum need to divided by 2
if err := batchMultiSaveAndRemoveWithPrefix(kc.Snapshot, util.MaxEtcdTxnNum/2, nil, delMetakeysSnap, ts); err != nil {
if err := batchMultiSaveAndRemove(kc.Snapshot, util.MaxEtcdTxnNum/2, nil, delMetakeysSnap, ts); err != nil {
return err
}

// if we found collection dropping, we should try removing related resources.
return kc.Snapshot.MultiSaveAndRemoveWithPrefix(nil, collectionKeys, ts)
return kc.Snapshot.MultiSaveAndRemove(nil, collectionKeys, ts)
}

func (kc *Catalog) alterModifyCollection(oldColl *model.Collection, newColl *model.Collection, ts typeutil.Timestamp) error {
Expand Down Expand Up @@ -491,7 +484,7 @@ func (kc *Catalog) alterModifyCollection(oldColl *model.Collection, newColl *mod
if oldKey == newKey {
return kc.Snapshot.Save(newKey, string(value), ts)
}
return kc.Snapshot.MultiSaveAndRemoveWithPrefix(saves, []string{oldKey}, ts)
return kc.Snapshot.MultiSaveAndRemove(saves, []string{oldKey}, ts)
}

func (kc *Catalog) AlterCollection(ctx context.Context, oldColl *model.Collection, newColl *model.Collection, alterType metastore.AlterType, ts typeutil.Timestamp) error {
Expand Down Expand Up @@ -559,7 +552,7 @@ func (kc *Catalog) DropPartition(ctx context.Context, dbID int64, collectionID t

if partitionVersionAfter210(collMeta) {
k := BuildPartitionKey(collectionID, partitionID)
return kc.Snapshot.MultiSaveAndRemoveWithPrefix(nil, []string{k}, ts)
return kc.Snapshot.MultiSaveAndRemove(nil, []string{k}, ts)
}

k := BuildCollectionKey(util.NonDBID, collectionID)
Expand Down Expand Up @@ -601,7 +594,7 @@ func (kc *Catalog) DropAlias(ctx context.Context, dbID int64, alias string, ts t
oldKBefore210 := BuildAliasKey210(alias)
oldKeyWithoutDb := BuildAliasKey(alias)
k := BuildAliasKeyWithDB(dbID, alias)
return kc.Snapshot.MultiSaveAndRemoveWithPrefix(nil, []string{k, oldKeyWithoutDb, oldKBefore210}, ts)
return kc.Snapshot.MultiSaveAndRemove(nil, []string{k, oldKeyWithoutDb, oldKBefore210}, ts)
}

func (kc *Catalog) GetCollectionByName(ctx context.Context, dbID int64, collectionName string, ts typeutil.Timestamp) (*model.Collection, error) {
Expand Down
53 changes: 32 additions & 21 deletions internal/metastore/kv/rootcoord/kv_catalog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ func TestCatalog_CreateAliasV2(t *testing.T) {
ctx := context.Background()

snapshot := kv.NewMockSnapshotKV()
snapshot.MultiSaveAndRemoveWithPrefixFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
snapshot.MultiSaveAndRemoveFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
return errors.New("mock")
}

Expand All @@ -504,7 +504,7 @@ func TestCatalog_CreateAliasV2(t *testing.T) {
err := kc.CreateAlias(ctx, &model.Alias{}, 0)
assert.Error(t, err)

snapshot.MultiSaveAndRemoveWithPrefixFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
snapshot.MultiSaveAndRemoveFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
return nil
}
err = kc.CreateAlias(ctx, &model.Alias{}, 0)
Expand Down Expand Up @@ -623,7 +623,7 @@ func TestCatalog_AlterAliasV2(t *testing.T) {
ctx := context.Background()

snapshot := kv.NewMockSnapshotKV()
snapshot.MultiSaveAndRemoveWithPrefixFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
snapshot.MultiSaveAndRemoveFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
return errors.New("mock")
}

Expand All @@ -632,7 +632,7 @@ func TestCatalog_AlterAliasV2(t *testing.T) {
err := kc.AlterAlias(ctx, &model.Alias{}, 0)
assert.Error(t, err)

snapshot.MultiSaveAndRemoveWithPrefixFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
snapshot.MultiSaveAndRemoveFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
return nil
}
err = kc.AlterAlias(ctx, &model.Alias{}, 0)
Expand Down Expand Up @@ -706,7 +706,7 @@ func TestCatalog_DropPartitionV2(t *testing.T) {
snapshot.LoadFunc = func(key string, ts typeutil.Timestamp) (string, error) {
return string(value), nil
}
snapshot.MultiSaveAndRemoveWithPrefixFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
snapshot.MultiSaveAndRemoveFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
return errors.New("mock")
}

Expand All @@ -715,7 +715,7 @@ func TestCatalog_DropPartitionV2(t *testing.T) {
err = kc.DropPartition(ctx, 0, 100, 101, 0)
assert.Error(t, err)

snapshot.MultiSaveAndRemoveWithPrefixFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
snapshot.MultiSaveAndRemoveFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
return nil
}
err = kc.DropPartition(ctx, 0, 100, 101, 0)
Expand Down Expand Up @@ -758,7 +758,7 @@ func TestCatalog_DropAliasV2(t *testing.T) {
ctx := context.Background()

snapshot := kv.NewMockSnapshotKV()
snapshot.MultiSaveAndRemoveWithPrefixFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
snapshot.MultiSaveAndRemoveFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
return errors.New("mock")
}

Expand All @@ -767,7 +767,7 @@ func TestCatalog_DropAliasV2(t *testing.T) {
err := kc.DropAlias(ctx, testDb, "alias", 0)
assert.Error(t, err)

snapshot.MultiSaveAndRemoveWithPrefixFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
snapshot.MultiSaveAndRemoveFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
return nil
}
err = kc.DropAlias(ctx, testDb, "alias", 0)
Expand Down Expand Up @@ -942,27 +942,27 @@ func TestCatalog_ListAliasesV2(t *testing.T) {
})
}

func Test_batchMultiSaveAndRemoveWithPrefix(t *testing.T) {
func Test_batchMultiSaveAndRemove(t *testing.T) {
t.Run("failed to save", func(t *testing.T) {
snapshot := kv.NewMockSnapshotKV()
snapshot.MultiSaveFunc = func(kvs map[string]string, ts typeutil.Timestamp) error {
return errors.New("error mock MultiSave")
}
saves := map[string]string{"k": "v"}
err := batchMultiSaveAndRemoveWithPrefix(snapshot, util.MaxEtcdTxnNum/2, saves, []string{}, 0)
err := batchMultiSaveAndRemove(snapshot, util.MaxEtcdTxnNum/2, saves, []string{}, 0)
assert.Error(t, err)
})
t.Run("failed to remove", func(t *testing.T) {
snapshot := kv.NewMockSnapshotKV()
snapshot.MultiSaveFunc = func(kvs map[string]string, ts typeutil.Timestamp) error {
return nil
}
snapshot.MultiSaveAndRemoveWithPrefixFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
return errors.New("error mock MultiSaveAndRemoveWithPrefix")
snapshot.MultiSaveAndRemoveFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
return errors.New("error mock MultiSaveAndRemove")
}
saves := map[string]string{"k": "v"}
removals := []string{"prefix1", "prefix2"}
err := batchMultiSaveAndRemoveWithPrefix(snapshot, util.MaxEtcdTxnNum/2, saves, removals, 0)
err := batchMultiSaveAndRemove(snapshot, util.MaxEtcdTxnNum/2, saves, removals, 0)
assert.Error(t, err)
})
t.Run("normal case", func(t *testing.T) {
Expand All @@ -971,7 +971,7 @@ func Test_batchMultiSaveAndRemoveWithPrefix(t *testing.T) {
log.Info("multi save", zap.Any("len", len(kvs)), zap.Any("saves", kvs))
return nil
}
snapshot.MultiSaveAndRemoveWithPrefixFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
snapshot.MultiSaveAndRemoveFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
log.Info("multi save and remove with prefix", zap.Any("len of saves", len(saves)), zap.Any("len of removals", len(removals)),
zap.Any("saves", saves), zap.Any("removals", removals))
return nil
Expand All @@ -983,7 +983,7 @@ func Test_batchMultiSaveAndRemoveWithPrefix(t *testing.T) {
saves[fmt.Sprintf("k%d", i)] = fmt.Sprintf("v%d", i)
removals = append(removals, fmt.Sprintf("k%d", i))
}
err := batchMultiSaveAndRemoveWithPrefix(snapshot, util.MaxEtcdTxnNum/2, saves, removals, 0)
err := batchMultiSaveAndRemove(snapshot, util.MaxEtcdTxnNum/2, saves, removals, 0)
assert.NoError(t, err)
})
}
Expand Down Expand Up @@ -1040,7 +1040,7 @@ func TestCatalog_AlterCollection(t *testing.T) {
t.Run("modify db name", func(t *testing.T) {
var collectionID int64 = 1
snapshot := kv.NewMockSnapshotKV()
snapshot.MultiSaveAndRemoveWithPrefixFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
snapshot.MultiSaveAndRemoveFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
assert.ElementsMatch(t, []string{BuildCollectionKey(0, collectionID)}, removals)
assert.Equal(t, len(saves), 1)
assert.Contains(t, maps.Keys(saves), BuildCollectionKey(1, collectionID))
Expand Down Expand Up @@ -1149,6 +1149,17 @@ func withMockMultiSaveAndRemoveWithPrefix(err error) mockSnapshotOpt {
}
}

func withMockMultiSaveAndRemove(err error) mockSnapshotOpt {
return func(ss *mocks.SnapShotKV) {
ss.On(
"MultiSaveAndRemove",
mock.AnythingOfType("map[string]string"),
mock.AnythingOfType("[]string"),
mock.AnythingOfType("uint64")).
Return(err)
}
}

func TestCatalog_CreateCollection(t *testing.T) {
t.Run("collection not creating", func(t *testing.T) {
kc := &Catalog{}
Expand Down Expand Up @@ -1198,7 +1209,7 @@ func TestCatalog_CreateCollection(t *testing.T) {

func TestCatalog_DropCollection(t *testing.T) {
t.Run("failed to remove", func(t *testing.T) {
mockSnapshot := newMockSnapshot(t, withMockMultiSaveAndRemoveWithPrefix(errors.New("error mock MultiSaveAndRemoveWithPrefix")))
mockSnapshot := newMockSnapshot(t, withMockMultiSaveAndRemove(errors.New("error mock MultiSaveAndRemove")))
kc := &Catalog{Snapshot: mockSnapshot}
ctx := context.Background()
coll := &model.Collection{
Expand All @@ -1216,7 +1227,7 @@ func TestCatalog_DropCollection(t *testing.T) {
removeOtherCalled := false
removeCollectionCalled := false
mockSnapshot.On(
"MultiSaveAndRemoveWithPrefix",
"MultiSaveAndRemove",
mock.AnythingOfType("map[string]string"),
mock.AnythingOfType("[]string"),
mock.AnythingOfType("uint64")).
Expand All @@ -1225,13 +1236,13 @@ func TestCatalog_DropCollection(t *testing.T) {
return nil
}).Once()
mockSnapshot.On(
"MultiSaveAndRemoveWithPrefix",
"MultiSaveAndRemove",
mock.AnythingOfType("map[string]string"),
mock.AnythingOfType("[]string"),
mock.AnythingOfType("uint64")).
Return(func(map[string]string, []string, typeutil.Timestamp) error {
removeCollectionCalled = true
return errors.New("error mock MultiSaveAndRemoveWithPrefix")
return errors.New("error mock MultiSaveAndRemove")
}).Once()
kc := &Catalog{Snapshot: mockSnapshot}
ctx := context.Background()
Expand All @@ -1248,7 +1259,7 @@ func TestCatalog_DropCollection(t *testing.T) {
})

t.Run("normal case", func(t *testing.T) {
mockSnapshot := newMockSnapshot(t, withMockMultiSaveAndRemoveWithPrefix(nil))
mockSnapshot := newMockSnapshot(t, withMockMultiSaveAndRemove(nil))
kc := &Catalog{Snapshot: mockSnapshot}
ctx := context.Background()
coll := &model.Collection{
Expand Down

0 comments on commit 2c7bb0b

Please sign in to comment.