diff --git a/states/etcd/commands.go b/states/etcd/commands.go index 7451fcb..0aaf4c8 100644 --- a/states/etcd/commands.go +++ b/states/etcd/commands.go @@ -90,6 +90,8 @@ func RemoveCommand(cli kv.MetaKV, instanceName, basePath string) *cobra.Command remove.SegmentCollectionDroppedCommand(cli, basePath), // remove etcd-config remove.EtcdConfigCommand(cli, instanceName), + // remove collection has been dropped + remove.CollectionCleanCommand(cli, basePath), ) return removeCmd diff --git a/states/etcd/common/collection.go b/states/etcd/common/collection.go index d70a0d7..f63e3a5 100644 --- a/states/etcd/common/collection.go +++ b/states/etcd/common/collection.go @@ -21,10 +21,13 @@ import ( ) const ( + SnapshotPrefix = "snapshots" // CollectionMetaPrefix is prefix for rootcoord collection meta. CollectionMetaPrefix = `root-coord/collection` // DBCollectionMetaPrefix is prefix for rootcoord database collection meta DBCollectionMetaPrefix = `root-coord/database/collection-info` + // FieldMetaPrefix is prefix for rootcoord collection fields meta + FieldMetaPrefix = `root-coord/fields` // CollectionLoadPrefix is prefix for querycoord collection loaded in milvus v2.1.x CollectionLoadPrefix = "queryCoord-collectionMeta" // CollectionLoadPrefixV2 is prefix for querycoord collection loaded in milvus v2.2.x diff --git a/states/etcd/common/segment.go b/states/etcd/common/segment.go index 02786dc..c89960b 100644 --- a/states/etcd/common/segment.go +++ b/states/etcd/common/segment.go @@ -17,12 +17,13 @@ import ( ) const ( - segmentMetaPrefix = "datacoord-meta/s" + SegmentMetaPrefix = "datacoord-meta/s" + SegmentStatsMetaPrefix = "datacoord-meta/statslog" ) // ListSegmentsVersion list segment info as specified version. func ListSegmentsVersion(ctx context.Context, cli kv.MetaKV, basePath string, version string, filters ...func(*models.Segment) bool) ([]*models.Segment, error) { - prefix := path.Join(basePath, segmentMetaPrefix) + "/" + prefix := path.Join(basePath, SegmentMetaPrefix) + "/" switch version { case models.LTEVersion2_1: segments, keys, err := ListProtoObjects[datapb.SegmentInfo](ctx, cli, prefix) @@ -107,7 +108,7 @@ func getSegmentLazyFunc(cli kv.MetaKV, basePath string, segment datapbv2.Segment func ListSegments(cli kv.MetaKV, basePath string, filter func(*datapb.SegmentInfo) bool) ([]*datapb.SegmentInfo, error) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) defer cancel() - _, vals, err := cli.LoadWithPrefix(ctx, path.Join(basePath, segmentMetaPrefix)+"/") + _, vals, err := cli.LoadWithPrefix(ctx, path.Join(basePath, SegmentMetaPrefix)+"/") if err != nil { return nil, err } @@ -308,7 +309,7 @@ func RemoveSegmentByID(ctx context.Context, cli kv.MetaKV, basePath string, coll func UpdateSegments(ctx context.Context, cli kv.MetaKV, basePath string, collectionID int64, fn func(segment *datapbv2.SegmentInfo)) error { - prefix := path.Join(basePath, fmt.Sprintf("%s/%d", segmentMetaPrefix, collectionID)) + "/" + prefix := path.Join(basePath, fmt.Sprintf("%s/%d", SegmentMetaPrefix, collectionID)) + "/" segments, keys, err := ListProtoObjects[datapbv2.SegmentInfo](ctx, cli, prefix) if err != nil { return err diff --git a/states/etcd/remove/collection_clean.go b/states/etcd/remove/collection_clean.go new file mode 100644 index 0000000..6f11269 --- /dev/null +++ b/states/etcd/remove/collection_clean.go @@ -0,0 +1,127 @@ +package remove + +import ( + "context" + "fmt" + "github.com/milvus-io/birdwatcher/states/kv" + "github.com/spf13/cobra" + "path" + "strconv" + "strings" + + "github.com/samber/lo" + + "github.com/milvus-io/birdwatcher/models" + "github.com/milvus-io/birdwatcher/states/etcd/common" + etcdversion "github.com/milvus-io/birdwatcher/states/etcd/version" +) + +var paginationSize = 2000 + +type ExcludePrefixOptions func(string) bool + +// CollectionCleanCommand returns command to remove +func CollectionCleanCommand(cli kv.MetaKV, basePath string) *cobra.Command { + cmd := &cobra.Command{ + Use: "collection-meta-leaked", + Short: "Remove leaked collection meta for collection has been dropped", + Run: func(cmd *cobra.Command, args []string) { + run, err := cmd.Flags().GetBool("run") + if err != nil { + fmt.Println(err.Error()) + return + } + + collections, err := common.ListCollectionsVersion(context.TODO(), cli, basePath, etcdversion.GetVersion()) + if err != nil { + fmt.Println(err.Error()) + return + } + + id2Collection := lo.SliceToMap(collections, func(col *models.Collection) (string, *models.Collection) { + fmt.Printf("existing collectionID %v\n", col.ID) + return strconv.FormatInt(col.ID, 10), col + }) + + cleanMetaFn := func(ctx context.Context, prefix string, opts ...ExcludePrefixOptions) error { + return cli.WalkWithPrefix(ctx, prefix, paginationSize, func(k []byte, v []byte) error { + sKey := string(k) + for _, opt := range opts { + if opt(sKey) { + return nil + } + } + + key := sKey[len(prefix):] + ss := strings.Split(key, "/") + collectionExist := false + for _, s := range ss { + if _, ok := id2Collection[s]; ok { + collectionExist = true + } + } + + if !collectionExist { + fmt.Println("clean meta key ", sKey) + if run { + return cli.Remove(ctx, sKey) + } + } + + return nil + }) + } + + // remove collection meta + // meta before database + collectionMetaPrefix := path.Join(basePath, common.CollectionMetaPrefix) + // with database + dbCollectionMetaPrefix := path.Join(basePath, common.DBCollectionMetaPrefix) + // remove collection field meta + fieldsPrefix := path.Join(basePath, common.FieldMetaPrefix) + fieldsSnapShotPrefix := path.Join(basePath, common.SnapshotPrefix, common.FieldMetaPrefix) + // remove collection partition meta + partitionsPrefix := path.Join(basePath, common.PartitionPrefix) + partitionsSnapShotPrefix := path.Join(basePath, common.SnapshotPrefix, common.PartitionPrefix) + prefixes := []string{ + collectionMetaPrefix, + dbCollectionMetaPrefix, + fieldsPrefix, + fieldsSnapShotPrefix, + partitionsPrefix, + partitionsSnapShotPrefix} + + for _, prefix := range prefixes { + fmt.Printf("start cleaning leaked collection meta, prefix: %s\n", prefix) + err = cleanMetaFn(context.TODO(), prefix) + if err != nil { + fmt.Println(err.Error()) + return + } + fmt.Printf("clean leaked collection meta done, prefix: %s\n", prefix) + } + + // remove segment meta + segmentPrefix := path.Join(basePath, common.SegmentMetaPrefix) + segmentStatsPrefix := path.Join(basePath, common.SegmentStatsMetaPrefix) + fmt.Printf("start cleaning leaked segment meta, prefix: %s, exclude prefix%s\n", segmentPrefix, segmentStatsPrefix) + err = cleanMetaFn(context.TODO(), segmentPrefix, func(key string) bool { + if strings.HasPrefix(key, segmentStatsPrefix) { + return true + } + + return false + }) + if err != nil { + fmt.Println(err.Error()) + return + } + + fmt.Printf("clean leaked segment meta done, prefix: %s\n", segmentPrefix) + return + }, + } + + cmd.Flags().Bool("run", false, "flags indicating whether to execute removed command") + return cmd +} diff --git a/states/kv/kv.go b/states/kv/kv.go index 657a769..d6e2cf7 100644 --- a/states/kv/kv.go +++ b/states/kv/kv.go @@ -8,6 +8,7 @@ import ( "encoding/json" "fmt" "math" + "path" "strings" "time" @@ -50,6 +51,7 @@ type MetaKV interface { removeWithPrefixAndPrevKV(ctx context.Context, prefix string) ([]*mvccpb.KeyValue, error) GetAllRootPath(ctx context.Context) ([]string, error) BackupKV(base, prefix string, w *bufio.Writer, ignoreRevision bool, batchSize int64) error + WalkWithPrefix(ctx context.Context, prefix string, paginationSize int, fn func([]byte, []byte) error) error Close() } @@ -206,6 +208,8 @@ func (kv *etcdKV) BackupKV(base, prefix string, w *bufio.Writer, ignoreRevision meta["instance"] = instance meta["metaPath"] = metaPath + fmt.Println("meta path ", metaPath) + bs, _ := json.Marshal(meta) ph := models.PartHeader{ PartType: int32(models.EtcdBackup), @@ -271,6 +275,39 @@ func (kv *etcdKV) BackupKV(base, prefix string, w *bufio.Writer, ignoreRevision return nil } +func (kv *etcdKV) WalkWithPrefix(ctx context.Context, prefix string, paginationSize int, fn func([]byte, []byte) error) error { + prefix = path.Join(kv.rootPath, prefix) + + batch := int64(paginationSize) + opts := []clientv3.OpOption{ + clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend), + clientv3.WithLimit(batch), + clientv3.WithRange(clientv3.GetPrefixRangeEnd(prefix)), + } + + key := prefix + for { + resp, err := kv.client.Get(ctx, key, opts...) + if err != nil { + return err + } + + for _, kv := range resp.Kvs { + if err = fn(kv.Key, kv.Value); err != nil { + return err + } + } + + if !resp.More { + break + } + // move to next key + key = string(append(resp.Kvs[len(resp.Kvs)-1].Key, 0)) + } + + return nil +} + // Close closes the connection to etcd. func (kv *etcdKV) Close() { kv.client.Close() @@ -588,6 +625,45 @@ func (kv *txnTiKV) BackupKV(base, prefix string, w *bufio.Writer, ignoreRevision return txn.Commit(ctx) } +func (kv *txnTiKV) WalkWithPrefix(ctx context.Context, prefix string, paginationSize int, fn func([]byte, []byte) error) error { + prefix = path.Join(kv.rootPath, prefix) + + // Since only reading, use Snapshot for less overhead + ss := kv.client.GetSnapshot(MaxSnapshotTS) + ss.SetScanBatchSize(paginationSize) + + // Retrieve key-value pairs with the specified prefix + startKey := []byte(prefix) + endKey := tikv.PrefixNextKey([]byte(prefix)) + iter, err := ss.Iter(startKey, endKey) + if err != nil { + err = errors.Wrap(err, fmt.Sprintf("Failed to create iterater for %s during WalkWithPrefix", prefix)) + return err + } + defer iter.Close() + + // Iterate over the key-value pairs + for iter.Valid() { + // Grab value for empty check + byteVal := iter.Value() + // Check if empty val and replace with placeholder + if isEmptyByte(byteVal) { + byteVal = []byte{} + } + err = fn(iter.Key(), byteVal) + if err != nil { + err = errors.Wrap(err, fmt.Sprintf("Failed to apply fn to (%s;%s)", string(iter.Key()), string(byteVal))) + return err + } + err = iter.Next() + if err != nil { + err = errors.Wrap(err, fmt.Sprintf("Failed to move Iterator after key %s for WalkWithPrefix", string(iter.Key()))) + return err + } + } + return nil +} + // Close closes the connection to TiKV. func (kv *txnTiKV) Close() { kv.client.Close() diff --git a/states/kv/kv_audit.go b/states/kv/kv_audit.go index 9a8c681..2cb6cb4 100644 --- a/states/kv/kv_audit.go +++ b/states/kv/kv_audit.go @@ -129,3 +129,7 @@ func (c *FileAuditKV) writeData(data []byte) { } } } + +func (c *FileAuditKV) WalkWithPrefix(ctx context.Context, prefix string, paginationSize int, fn func([]byte, []byte) error) error { + return c.cli.WalkWithPrefix(ctx, prefix, paginationSize, fn) +}