diff --git a/internal/kv/kv.go b/internal/kv/kv.go index 14091cdc1e84..929febe2c808 100644 --- a/internal/kv/kv.go +++ b/internal/kv/kv.go @@ -91,5 +91,6 @@ type SnapShotKV interface { Load(key string, ts typeutil.Timestamp) (string, error) MultiSave(kvs map[string]string, ts typeutil.Timestamp) error LoadWithPrefix(key string, ts typeutil.Timestamp) ([]string, []string, error) + MultiSaveAndRemove(saves map[string]string, removals []string, ts typeutil.Timestamp) error MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, ts typeutil.Timestamp) error } diff --git a/internal/kv/mock_snapshot_kv.go b/internal/kv/mock_snapshot_kv.go index 35cc851853dc..9eed83499732 100644 --- a/internal/kv/mock_snapshot_kv.go +++ b/internal/kv/mock_snapshot_kv.go @@ -11,6 +11,7 @@ type mockSnapshotKV struct { MultiSaveFunc func(kvs map[string]string, ts typeutil.Timestamp) error LoadWithPrefixFunc func(key string, ts typeutil.Timestamp) ([]string, []string, error) MultiSaveAndRemoveWithPrefixFunc func(saves map[string]string, removals []string, ts typeutil.Timestamp) error + MultiSaveAndRemoveFunc func(saves map[string]string, removals []string, ts typeutil.Timestamp) error } func NewMockSnapshotKV() *mockSnapshotKV { @@ -51,3 +52,10 @@ func (m mockSnapshotKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, re } return nil } + +func (m mockSnapshotKV) MultiSaveAndRemove(saves map[string]string, removals []string, ts typeutil.Timestamp) error { + if m.MultiSaveAndRemoveFunc != nil { + return m.MultiSaveAndRemoveFunc(saves, removals, ts) + } + return nil +} diff --git a/internal/kv/mock_snapshot_kv_test.go b/internal/kv/mock_snapshot_kv_test.go index 94e6f2136afb..0b2df70f9173 100644 --- a/internal/kv/mock_snapshot_kv_test.go +++ b/internal/kv/mock_snapshot_kv_test.go @@ -87,3 +87,19 @@ func Test_mockSnapshotKV_MultiSaveAndRemoveWithPrefix(t *testing.T) { assert.NoError(t, err) }) } + +func Test_mockSnapshotKV_MultiSaveAndRemove(t *testing.T) { + t.Run("func not set", func(t *testing.T) { + snapshot := NewMockSnapshotKV() + err := snapshot.MultiSaveAndRemove(nil, nil, 0) + assert.NoError(t, err) + }) + t.Run("func set", func(t *testing.T) { + snapshot := NewMockSnapshotKV() + snapshot.MultiSaveAndRemoveWithPrefixFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error { + return nil + } + err := snapshot.MultiSaveAndRemove(nil, nil, 0) + assert.NoError(t, err) + }) +} diff --git a/internal/kv/mocks/snapshot_kv.go b/internal/kv/mocks/snapshot_kv.go index e1e4ef7c1c3f..dc2de1d78379 100644 --- a/internal/kv/mocks/snapshot_kv.go +++ b/internal/kv/mocks/snapshot_kv.go @@ -177,6 +177,50 @@ func (_c *SnapShotKV_MultiSave_Call) RunAndReturn(run func(map[string]string, ui return _c } +// MultiSaveAndRemove provides a mock function with given fields: saves, removals, ts +func (_m *SnapShotKV) MultiSaveAndRemove(saves map[string]string, removals []string, ts uint64) error { + ret := _m.Called(saves, removals, ts) + + var r0 error + if rf, ok := ret.Get(0).(func(map[string]string, []string, uint64) error); ok { + r0 = rf(saves, removals, ts) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// SnapShotKV_MultiSaveAndRemove_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'MultiSaveAndRemove' +type SnapShotKV_MultiSaveAndRemove_Call struct { + *mock.Call +} + +// MultiSaveAndRemove is a helper method to define mock.On call +// - saves map[string]string +// - removals []string +// - ts uint64 +func (_e *SnapShotKV_Expecter) MultiSaveAndRemove(saves interface{}, removals interface{}, ts interface{}) *SnapShotKV_MultiSaveAndRemove_Call { + return &SnapShotKV_MultiSaveAndRemove_Call{Call: _e.mock.On("MultiSaveAndRemove", saves, removals, ts)} +} + +func (_c *SnapShotKV_MultiSaveAndRemove_Call) Run(run func(saves map[string]string, removals []string, ts uint64)) *SnapShotKV_MultiSaveAndRemove_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(map[string]string), args[1].([]string), args[2].(uint64)) + }) + return _c +} + +func (_c *SnapShotKV_MultiSaveAndRemove_Call) Return(_a0 error) *SnapShotKV_MultiSaveAndRemove_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *SnapShotKV_MultiSaveAndRemove_Call) RunAndReturn(run func(map[string]string, []string, uint64) error) *SnapShotKV_MultiSaveAndRemove_Call { + _c.Call.Return(run) + return _c +} + // MultiSaveAndRemoveWithPrefix provides a mock function with given fields: saves, removals, ts func (_m *SnapShotKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, ts uint64) error { ret := _m.Called(saves, removals, ts) diff --git a/internal/metastore/kv/rootcoord/kv_catalog.go b/internal/metastore/kv/rootcoord/kv_catalog.go index 9edcfe13f6be..916195598efc 100644 --- a/internal/metastore/kv/rootcoord/kv_catalog.go +++ b/internal/metastore/kv/rootcoord/kv_catalog.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "fmt" - "sort" "github.com/cockroachdb/errors" "github.com/golang/protobuf/proto" @@ -85,7 +84,7 @@ func BuildAliasPrefixWithDB(dbID int64) string { // since SnapshotKV may save both snapshot key and the original key if the original key is newest // MaxEtcdTxnNum need to divided by 2 -func batchMultiSaveAndRemoveWithPrefix(snapshot kv.SnapShotKV, limit int, saves map[string]string, removals []string, ts typeutil.Timestamp) error { +func batchMultiSaveAndRemove(snapshot kv.SnapShotKV, limit int, saves map[string]string, removals []string, ts typeutil.Timestamp) error { saveFn := func(partialKvs map[string]string) error { return snapshot.MultiSave(partialKvs, ts) } @@ -93,14 +92,8 @@ func batchMultiSaveAndRemoveWithPrefix(snapshot kv.SnapShotKV, limit int, saves return err } - // avoid a case that the former key is the prefix of the later key. - // for example, `root-coord/fields/collection_id/1` is the prefix of `root-coord/fields/collection_id/100`. - sort.Slice(removals, func(i, j int) bool { - return removals[i] > removals[j] - }) - removeFn := func(partialKeys []string) error { - return snapshot.MultiSaveAndRemoveWithPrefix(nil, partialKeys, ts) + return snapshot.MultiSaveAndRemove(nil, partialKeys, ts) } return etcd.RemoveByBatchWithLimit(removals, limit, removeFn) } @@ -127,7 +120,7 @@ func (kc *Catalog) AlterDatabase(ctx context.Context, newColl *model.Database, t func (kc *Catalog) DropDatabase(ctx context.Context, dbID int64, ts typeutil.Timestamp) error { key := BuildDatabaseKey(dbID) - return kc.Snapshot.MultiSaveAndRemoveWithPrefix(nil, []string{key}, ts) + return kc.Snapshot.MultiSaveAndRemove(nil, []string{key}, ts) } func (kc *Catalog) ListDatabases(ctx context.Context, ts typeutil.Timestamp) ([]*model.Database, error) { @@ -300,7 +293,7 @@ func (kc *Catalog) CreateAlias(ctx context.Context, alias *model.Alias, ts typeu return err } kvs := map[string]string{k: string(v)} - return kc.Snapshot.MultiSaveAndRemoveWithPrefix(kvs, []string{oldKBefore210, oldKeyWithoutDb}, ts) + return kc.Snapshot.MultiSaveAndRemove(kvs, []string{oldKBefore210, oldKeyWithoutDb}, ts) } func (kc *Catalog) CreateCredential(ctx context.Context, credential *model.Credential) error { @@ -455,12 +448,12 @@ func (kc *Catalog) DropCollection(ctx context.Context, collectionInfo *model.Col // However, if we remove collection first, we cannot remove other metas. // since SnapshotKV may save both snapshot key and the original key if the original key is newest // MaxEtcdTxnNum need to divided by 2 - if err := batchMultiSaveAndRemoveWithPrefix(kc.Snapshot, util.MaxEtcdTxnNum/2, nil, delMetakeysSnap, ts); err != nil { + if err := batchMultiSaveAndRemove(kc.Snapshot, util.MaxEtcdTxnNum/2, nil, delMetakeysSnap, ts); err != nil { return err } // if we found collection dropping, we should try removing related resources. - return kc.Snapshot.MultiSaveAndRemoveWithPrefix(nil, collectionKeys, ts) + return kc.Snapshot.MultiSaveAndRemove(nil, collectionKeys, ts) } func (kc *Catalog) alterModifyCollection(oldColl *model.Collection, newColl *model.Collection, ts typeutil.Timestamp) error { @@ -491,7 +484,7 @@ func (kc *Catalog) alterModifyCollection(oldColl *model.Collection, newColl *mod if oldKey == newKey { return kc.Snapshot.Save(newKey, string(value), ts) } - return kc.Snapshot.MultiSaveAndRemoveWithPrefix(saves, []string{oldKey}, ts) + return kc.Snapshot.MultiSaveAndRemove(saves, []string{oldKey}, ts) } func (kc *Catalog) AlterCollection(ctx context.Context, oldColl *model.Collection, newColl *model.Collection, alterType metastore.AlterType, ts typeutil.Timestamp) error { @@ -559,7 +552,7 @@ func (kc *Catalog) DropPartition(ctx context.Context, dbID int64, collectionID t if partitionVersionAfter210(collMeta) { k := BuildPartitionKey(collectionID, partitionID) - return kc.Snapshot.MultiSaveAndRemoveWithPrefix(nil, []string{k}, ts) + return kc.Snapshot.MultiSaveAndRemove(nil, []string{k}, ts) } k := BuildCollectionKey(util.NonDBID, collectionID) @@ -601,7 +594,7 @@ func (kc *Catalog) DropAlias(ctx context.Context, dbID int64, alias string, ts t oldKBefore210 := BuildAliasKey210(alias) oldKeyWithoutDb := BuildAliasKey(alias) k := BuildAliasKeyWithDB(dbID, alias) - return kc.Snapshot.MultiSaveAndRemoveWithPrefix(nil, []string{k, oldKeyWithoutDb, oldKBefore210}, ts) + return kc.Snapshot.MultiSaveAndRemove(nil, []string{k, oldKeyWithoutDb, oldKBefore210}, ts) } func (kc *Catalog) GetCollectionByName(ctx context.Context, dbID int64, collectionName string, ts typeutil.Timestamp) (*model.Collection, error) { diff --git a/internal/metastore/kv/rootcoord/kv_catalog_test.go b/internal/metastore/kv/rootcoord/kv_catalog_test.go index 7523c821677d..5cb3c0f293d1 100644 --- a/internal/metastore/kv/rootcoord/kv_catalog_test.go +++ b/internal/metastore/kv/rootcoord/kv_catalog_test.go @@ -495,7 +495,7 @@ func TestCatalog_CreateAliasV2(t *testing.T) { ctx := context.Background() snapshot := kv.NewMockSnapshotKV() - snapshot.MultiSaveAndRemoveWithPrefixFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error { + snapshot.MultiSaveAndRemoveFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error { return errors.New("mock") } @@ -504,7 +504,7 @@ func TestCatalog_CreateAliasV2(t *testing.T) { err := kc.CreateAlias(ctx, &model.Alias{}, 0) assert.Error(t, err) - snapshot.MultiSaveAndRemoveWithPrefixFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error { + snapshot.MultiSaveAndRemoveFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error { return nil } err = kc.CreateAlias(ctx, &model.Alias{}, 0) @@ -623,7 +623,7 @@ func TestCatalog_AlterAliasV2(t *testing.T) { ctx := context.Background() snapshot := kv.NewMockSnapshotKV() - snapshot.MultiSaveAndRemoveWithPrefixFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error { + snapshot.MultiSaveAndRemoveFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error { return errors.New("mock") } @@ -632,7 +632,7 @@ func TestCatalog_AlterAliasV2(t *testing.T) { err := kc.AlterAlias(ctx, &model.Alias{}, 0) assert.Error(t, err) - snapshot.MultiSaveAndRemoveWithPrefixFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error { + snapshot.MultiSaveAndRemoveFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error { return nil } err = kc.AlterAlias(ctx, &model.Alias{}, 0) @@ -706,7 +706,7 @@ func TestCatalog_DropPartitionV2(t *testing.T) { snapshot.LoadFunc = func(key string, ts typeutil.Timestamp) (string, error) { return string(value), nil } - snapshot.MultiSaveAndRemoveWithPrefixFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error { + snapshot.MultiSaveAndRemoveFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error { return errors.New("mock") } @@ -715,7 +715,7 @@ func TestCatalog_DropPartitionV2(t *testing.T) { err = kc.DropPartition(ctx, 0, 100, 101, 0) assert.Error(t, err) - snapshot.MultiSaveAndRemoveWithPrefixFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error { + snapshot.MultiSaveAndRemoveFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error { return nil } err = kc.DropPartition(ctx, 0, 100, 101, 0) @@ -758,7 +758,7 @@ func TestCatalog_DropAliasV2(t *testing.T) { ctx := context.Background() snapshot := kv.NewMockSnapshotKV() - snapshot.MultiSaveAndRemoveWithPrefixFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error { + snapshot.MultiSaveAndRemoveFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error { return errors.New("mock") } @@ -767,7 +767,7 @@ func TestCatalog_DropAliasV2(t *testing.T) { err := kc.DropAlias(ctx, testDb, "alias", 0) assert.Error(t, err) - snapshot.MultiSaveAndRemoveWithPrefixFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error { + snapshot.MultiSaveAndRemoveFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error { return nil } err = kc.DropAlias(ctx, testDb, "alias", 0) @@ -942,14 +942,14 @@ func TestCatalog_ListAliasesV2(t *testing.T) { }) } -func Test_batchMultiSaveAndRemoveWithPrefix(t *testing.T) { +func Test_batchMultiSaveAndRemove(t *testing.T) { t.Run("failed to save", func(t *testing.T) { snapshot := kv.NewMockSnapshotKV() snapshot.MultiSaveFunc = func(kvs map[string]string, ts typeutil.Timestamp) error { return errors.New("error mock MultiSave") } saves := map[string]string{"k": "v"} - err := batchMultiSaveAndRemoveWithPrefix(snapshot, util.MaxEtcdTxnNum/2, saves, []string{}, 0) + err := batchMultiSaveAndRemove(snapshot, util.MaxEtcdTxnNum/2, saves, []string{}, 0) assert.Error(t, err) }) t.Run("failed to remove", func(t *testing.T) { @@ -957,12 +957,12 @@ func Test_batchMultiSaveAndRemoveWithPrefix(t *testing.T) { snapshot.MultiSaveFunc = func(kvs map[string]string, ts typeutil.Timestamp) error { return nil } - snapshot.MultiSaveAndRemoveWithPrefixFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error { - return errors.New("error mock MultiSaveAndRemoveWithPrefix") + snapshot.MultiSaveAndRemoveFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error { + return errors.New("error mock MultiSaveAndRemove") } saves := map[string]string{"k": "v"} removals := []string{"prefix1", "prefix2"} - err := batchMultiSaveAndRemoveWithPrefix(snapshot, util.MaxEtcdTxnNum/2, saves, removals, 0) + err := batchMultiSaveAndRemove(snapshot, util.MaxEtcdTxnNum/2, saves, removals, 0) assert.Error(t, err) }) t.Run("normal case", func(t *testing.T) { @@ -971,7 +971,7 @@ func Test_batchMultiSaveAndRemoveWithPrefix(t *testing.T) { log.Info("multi save", zap.Any("len", len(kvs)), zap.Any("saves", kvs)) return nil } - snapshot.MultiSaveAndRemoveWithPrefixFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error { + snapshot.MultiSaveAndRemoveFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error { log.Info("multi save and remove with prefix", zap.Any("len of saves", len(saves)), zap.Any("len of removals", len(removals)), zap.Any("saves", saves), zap.Any("removals", removals)) return nil @@ -983,7 +983,7 @@ func Test_batchMultiSaveAndRemoveWithPrefix(t *testing.T) { saves[fmt.Sprintf("k%d", i)] = fmt.Sprintf("v%d", i) removals = append(removals, fmt.Sprintf("k%d", i)) } - err := batchMultiSaveAndRemoveWithPrefix(snapshot, util.MaxEtcdTxnNum/2, saves, removals, 0) + err := batchMultiSaveAndRemove(snapshot, util.MaxEtcdTxnNum/2, saves, removals, 0) assert.NoError(t, err) }) } @@ -1040,7 +1040,7 @@ func TestCatalog_AlterCollection(t *testing.T) { t.Run("modify db name", func(t *testing.T) { var collectionID int64 = 1 snapshot := kv.NewMockSnapshotKV() - snapshot.MultiSaveAndRemoveWithPrefixFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error { + snapshot.MultiSaveAndRemoveFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error { assert.ElementsMatch(t, []string{BuildCollectionKey(0, collectionID)}, removals) assert.Equal(t, len(saves), 1) assert.Contains(t, maps.Keys(saves), BuildCollectionKey(1, collectionID)) @@ -1149,6 +1149,17 @@ func withMockMultiSaveAndRemoveWithPrefix(err error) mockSnapshotOpt { } } +func withMockMultiSaveAndRemove(err error) mockSnapshotOpt { + return func(ss *mocks.SnapShotKV) { + ss.On( + "MultiSaveAndRemove", + mock.AnythingOfType("map[string]string"), + mock.AnythingOfType("[]string"), + mock.AnythingOfType("uint64")). + Return(err) + } +} + func TestCatalog_CreateCollection(t *testing.T) { t.Run("collection not creating", func(t *testing.T) { kc := &Catalog{} @@ -1198,7 +1209,7 @@ func TestCatalog_CreateCollection(t *testing.T) { func TestCatalog_DropCollection(t *testing.T) { t.Run("failed to remove", func(t *testing.T) { - mockSnapshot := newMockSnapshot(t, withMockMultiSaveAndRemoveWithPrefix(errors.New("error mock MultiSaveAndRemoveWithPrefix"))) + mockSnapshot := newMockSnapshot(t, withMockMultiSaveAndRemove(errors.New("error mock MultiSaveAndRemove"))) kc := &Catalog{Snapshot: mockSnapshot} ctx := context.Background() coll := &model.Collection{ @@ -1216,7 +1227,7 @@ func TestCatalog_DropCollection(t *testing.T) { removeOtherCalled := false removeCollectionCalled := false mockSnapshot.On( - "MultiSaveAndRemoveWithPrefix", + "MultiSaveAndRemove", mock.AnythingOfType("map[string]string"), mock.AnythingOfType("[]string"), mock.AnythingOfType("uint64")). @@ -1225,13 +1236,13 @@ func TestCatalog_DropCollection(t *testing.T) { return nil }).Once() mockSnapshot.On( - "MultiSaveAndRemoveWithPrefix", + "MultiSaveAndRemove", mock.AnythingOfType("map[string]string"), mock.AnythingOfType("[]string"), mock.AnythingOfType("uint64")). Return(func(map[string]string, []string, typeutil.Timestamp) error { removeCollectionCalled = true - return errors.New("error mock MultiSaveAndRemoveWithPrefix") + return errors.New("error mock MultiSaveAndRemove") }).Once() kc := &Catalog{Snapshot: mockSnapshot} ctx := context.Background() @@ -1248,7 +1259,7 @@ func TestCatalog_DropCollection(t *testing.T) { }) t.Run("normal case", func(t *testing.T) { - mockSnapshot := newMockSnapshot(t, withMockMultiSaveAndRemoveWithPrefix(nil)) + mockSnapshot := newMockSnapshot(t, withMockMultiSaveAndRemove(nil)) kc := &Catalog{Snapshot: mockSnapshot} ctx := context.Background() coll := &model.Collection{ diff --git a/internal/metastore/kv/rootcoord/suffix_snapshot.go b/internal/metastore/kv/rootcoord/suffix_snapshot.go index f945dc958d3b..af443ffc7c6f 100644 --- a/internal/metastore/kv/rootcoord/suffix_snapshot.go +++ b/internal/metastore/kv/rootcoord/suffix_snapshot.go @@ -35,6 +35,7 @@ import ( "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util" "github.com/milvus-io/milvus/pkg/util/etcd" + "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/retry" "github.com/milvus-io/milvus/pkg/util/tsoutil" "github.com/milvus-io/milvus/pkg/util/typeutil" @@ -502,6 +503,53 @@ func (ss *SuffixSnapshot) LoadWithPrefix(key string, ts typeutil.Timestamp) ([]s return resultKeys, resultValues, nil } +// MultiSaveAndRemove save muiltple kvs and remove as well +// if ts == 0, act like MetaKv +// each key-value will be treated in same logic like Save +func (ss *SuffixSnapshot) MultiSaveAndRemove(saves map[string]string, removals []string, ts typeutil.Timestamp) error { + // if ts == 0, act like MetaKv + if ts == 0 { + return ss.MetaKv.MultiSaveAndRemove(saves, removals) + } + ss.Lock() + defer ss.Unlock() + var err error + + // process each key, checks whether is the latest + execute, updateList, err := ss.generateSaveExecute(saves, ts) + if err != nil { + return err + } + + // load each removal, change execution to adding tombstones + for _, removal := range removals { + value, err := ss.MetaKv.Load(removal) + if err != nil { + log.Warn("SuffixSnapshot MetaKv Load failed", zap.String("key", removal), zap.Error(err)) + if errors.Is(err, merr.ErrIoKeyNotFound) { + continue + } + return err + } + // add tombstone to original key and add ts entry + if IsTombstone(value) { + continue + } + execute[removal] = string(SuffixSnapshotTombstone) + execute[ss.composeTSKey(removal, ts)] = string(SuffixSnapshotTombstone) + updateList = append(updateList, removal) + } + + // multi save execute map; if succeeds, update ts in the update list + err = ss.MetaKv.MultiSave(execute) + if err == nil { + for _, key := range updateList { + ss.lastestTS[key] = ts + } + } + return err +} + // MultiSaveAndRemoveWithPrefix save muiltple kvs and remove as well // if ts == 0, act like MetaKv // each key-value will be treated in same logic like Save diff --git a/internal/metastore/kv/rootcoord/suffix_snapshot_test.go b/internal/metastore/kv/rootcoord/suffix_snapshot_test.go index 5efc00680def..6d76e544700a 100644 --- a/internal/metastore/kv/rootcoord/suffix_snapshot_test.go +++ b/internal/metastore/kv/rootcoord/suffix_snapshot_test.go @@ -673,6 +673,82 @@ func Test_SuffixSnapshotMultiSaveAndRemoveWithPrefix(t *testing.T) { ss.MultiSaveAndRemoveWithPrefix(map[string]string{}, []string{""}, 0) } +func Test_SuffixSnapshotMultiSaveAndRemove(t *testing.T) { + rand.Seed(time.Now().UnixNano()) + randVal := rand.Int() + + rootPath := fmt.Sprintf("/test/meta/%d", randVal) + sep := "_ts" + + etcdCli, err := etcd.GetEtcdClient( + Params.EtcdCfg.UseEmbedEtcd.GetAsBool(), + Params.EtcdCfg.EtcdUseSSL.GetAsBool(), + Params.EtcdCfg.Endpoints.GetAsStrings(), + Params.EtcdCfg.EtcdTLSCert.GetValue(), + Params.EtcdCfg.EtcdTLSKey.GetValue(), + Params.EtcdCfg.EtcdTLSCACert.GetValue(), + Params.EtcdCfg.EtcdTLSMinVersion.GetValue()) + require.Nil(t, err) + defer etcdCli.Close() + etcdkv := etcdkv.NewEtcdKV(etcdCli, rootPath) + require.Nil(t, err) + defer etcdkv.Close() + + var vtso typeutil.Timestamp + ftso := func() typeutil.Timestamp { + return vtso + } + + ss, err := NewSuffixSnapshot(etcdkv, sep, rootPath, snapshotPrefix) + assert.NoError(t, err) + assert.NotNil(t, ss) + defer ss.Close() + + for i := 0; i < 20; i++ { + vtso = typeutil.Timestamp(100 + i*5) + ts := ftso() + err = ss.Save(fmt.Sprintf("kd-%04d", i), fmt.Sprintf("value-%d", i), ts) + assert.NoError(t, err) + assert.Equal(t, vtso, ts) + } + for i := 20; i < 40; i++ { + sm := map[string]string{"ks": fmt.Sprintf("value-%d", i)} + dm := []string{fmt.Sprintf("kd-%04d", i-20)} + vtso = typeutil.Timestamp(100 + i*5) + ts := ftso() + err = ss.MultiSaveAndRemove(sm, dm, ts) + assert.NoError(t, err) + assert.Equal(t, vtso, ts) + } + for i := 0; i < 20; i++ { + val, err := ss.Load(fmt.Sprintf("kd-%04d", i), typeutil.Timestamp(100+i*5+2)) + assert.NoError(t, err) + assert.Equal(t, fmt.Sprintf("value-%d", i), val) + _, vals, err := ss.LoadWithPrefix("kd-", typeutil.Timestamp(100+i*5+2)) + assert.NoError(t, err) + assert.Equal(t, i+1, len(vals)) + } + for i := 20; i < 40; i++ { + val, err := ss.Load("ks", typeutil.Timestamp(100+i*5+2)) + assert.NoError(t, err) + assert.Equal(t, fmt.Sprintf("value-%d", i), val) + _, vals, err := ss.LoadWithPrefix("kd-", typeutil.Timestamp(100+i*5+2)) + assert.NoError(t, err) + assert.Equal(t, 39-i, len(vals)) + } + + // try to load + _, err = ss.Load("kd-0000", 500) + assert.Error(t, err) + _, err = ss.Load("kd-0000", 0) + assert.Error(t, err) + _, err = ss.Load("kd-0000", 1) + assert.Error(t, err) + + // cleanup + ss.MultiSaveAndRemoveWithPrefix(map[string]string{}, []string{""}, 0) +} + func TestSuffixSnapshot_LoadWithPrefix(t *testing.T) { rand.Seed(time.Now().UnixNano()) randVal := rand.Int()