Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add remove leaked collection meta cmd #222

Merged
merged 1 commit into from
Dec 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions states/etcd/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ func RemoveCommand(cli kv.MetaKV, instanceName, basePath string) *cobra.Command
remove.SegmentCollectionDroppedCommand(cli, basePath),
// remove etcd-config
remove.EtcdConfigCommand(cli, instanceName),
// remove collection has been dropped
remove.CollectionCleanCommand(cli, basePath),
)

return removeCmd
Expand Down
3 changes: 3 additions & 0 deletions states/etcd/common/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@ import (
)

const (
SnapshotPrefix = "snapshots"
// CollectionMetaPrefix is prefix for rootcoord collection meta.
CollectionMetaPrefix = `root-coord/collection`
// DBCollectionMetaPrefix is prefix for rootcoord database collection meta
DBCollectionMetaPrefix = `root-coord/database/collection-info`
// FieldMetaPrefix is prefix for rootcoord collection fields meta
FieldMetaPrefix = `root-coord/fields`
// CollectionLoadPrefix is prefix for querycoord collection loaded in milvus v2.1.x
CollectionLoadPrefix = "queryCoord-collectionMeta"
// CollectionLoadPrefixV2 is prefix for querycoord collection loaded in milvus v2.2.x
Expand Down
9 changes: 5 additions & 4 deletions states/etcd/common/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@ import (
)

const (
segmentMetaPrefix = "datacoord-meta/s"
SegmentMetaPrefix = "datacoord-meta/s"
SegmentStatsMetaPrefix = "datacoord-meta/statslog"
)

// ListSegmentsVersion list segment info as specified version.
func ListSegmentsVersion(ctx context.Context, cli kv.MetaKV, basePath string, version string, filters ...func(*models.Segment) bool) ([]*models.Segment, error) {
prefix := path.Join(basePath, segmentMetaPrefix) + "/"
prefix := path.Join(basePath, SegmentMetaPrefix) + "/"
switch version {
case models.LTEVersion2_1:
segments, keys, err := ListProtoObjects[datapb.SegmentInfo](ctx, cli, prefix)
Expand Down Expand Up @@ -107,7 +108,7 @@ func getSegmentLazyFunc(cli kv.MetaKV, basePath string, segment datapbv2.Segment
func ListSegments(cli kv.MetaKV, basePath string, filter func(*datapb.SegmentInfo) bool) ([]*datapb.SegmentInfo, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()
_, vals, err := cli.LoadWithPrefix(ctx, path.Join(basePath, segmentMetaPrefix)+"/")
_, vals, err := cli.LoadWithPrefix(ctx, path.Join(basePath, SegmentMetaPrefix)+"/")
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -308,7 +309,7 @@ func RemoveSegmentByID(ctx context.Context, cli kv.MetaKV, basePath string, coll

func UpdateSegments(ctx context.Context, cli kv.MetaKV, basePath string, collectionID int64, fn func(segment *datapbv2.SegmentInfo)) error {

prefix := path.Join(basePath, fmt.Sprintf("%s/%d", segmentMetaPrefix, collectionID)) + "/"
prefix := path.Join(basePath, fmt.Sprintf("%s/%d", SegmentMetaPrefix, collectionID)) + "/"
segments, keys, err := ListProtoObjects[datapbv2.SegmentInfo](ctx, cli, prefix)
if err != nil {
return err
Expand Down
122 changes: 122 additions & 0 deletions states/etcd/remove/collection_clean.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package remove

import (
"context"
"fmt"
"path"
"strconv"
"strings"

"github.com/samber/lo"
"github.com/spf13/cobra"

"github.com/milvus-io/birdwatcher/models"
"github.com/milvus-io/birdwatcher/states/etcd/common"
etcdversion "github.com/milvus-io/birdwatcher/states/etcd/version"
"github.com/milvus-io/birdwatcher/states/kv"
)

var paginationSize = 2000

type ExcludePrefixOptions func(string) bool

// CollectionCleanCommand returns command to remove
func CollectionCleanCommand(cli kv.MetaKV, basePath string) *cobra.Command {
cmd := &cobra.Command{
Use: "collection-meta-leaked",
Short: "Remove leaked collection meta for collection has been dropped",
Run: func(cmd *cobra.Command, args []string) {
run, err := cmd.Flags().GetBool("run")
if err != nil {
fmt.Println(err.Error())
return
}

collections, err := common.ListCollectionsVersion(context.TODO(), cli, basePath, etcdversion.GetVersion())
if err != nil {
fmt.Println(err.Error())
return
}

id2Collection := lo.SliceToMap(collections, func(col *models.Collection) (string, *models.Collection) {
fmt.Printf("existing collectionID %v\n", col.ID)
return strconv.FormatInt(col.ID, 10), col
})

cleanMetaFn := func(ctx context.Context, prefix string, opts ...ExcludePrefixOptions) error {
return cli.WalkWithPrefix(ctx, prefix, paginationSize, func(k []byte, v []byte) error {
sKey := string(k)
for _, opt := range opts {
if opt(sKey) {
return nil
}
}

key := sKey[len(prefix):]
ss := strings.Split(key, "/")
collectionExist := false
for _, s := range ss {
if _, ok := id2Collection[s]; ok {
collectionExist = true
}
}

if !collectionExist {
fmt.Println("clean meta key ", sKey)
if run {
return cli.Remove(ctx, sKey)
}
}

return nil
})
}

// remove collection meta
// meta before database
collectionMetaPrefix := path.Join(basePath, common.CollectionMetaPrefix)
// with database
dbCollectionMetaPrefix := path.Join(basePath, common.DBCollectionMetaPrefix)
// remove collection field meta
fieldsPrefix := path.Join(basePath, common.FieldMetaPrefix)
fieldsSnapShotPrefix := path.Join(basePath, common.SnapshotPrefix, common.FieldMetaPrefix)
// remove collection partition meta
partitionsPrefix := path.Join(basePath, common.PartitionPrefix)
partitionsSnapShotPrefix := path.Join(basePath, common.SnapshotPrefix, common.PartitionPrefix)
prefixes := []string{
collectionMetaPrefix,
dbCollectionMetaPrefix,
fieldsPrefix,
fieldsSnapShotPrefix,
partitionsPrefix,
partitionsSnapShotPrefix}

for _, prefix := range prefixes {
fmt.Printf("start cleaning leaked collection meta, prefix: %s\n", prefix)
err = cleanMetaFn(context.TODO(), prefix)
if err != nil {
fmt.Println(err.Error())
return
}
fmt.Printf("clean leaked collection meta done, prefix: %s\n", prefix)
}

// remove segment meta
segmentPrefix := path.Join(basePath, common.SegmentMetaPrefix)
segmentStatsPrefix := path.Join(basePath, common.SegmentStatsMetaPrefix)
fmt.Printf("start cleaning leaked segment meta, prefix: %s, exclude prefix%s\n", segmentPrefix, segmentStatsPrefix)
err = cleanMetaFn(context.TODO(), segmentPrefix, func(key string) bool {
return strings.HasPrefix(key, segmentStatsPrefix)
})
if err != nil {
fmt.Println(err.Error())
return
}

fmt.Printf("clean leaked segment meta done, prefix: %s\n", segmentPrefix)
},
}

cmd.Flags().Bool("run", false, "flags indicating whether to execute removed command")
return cmd
}
76 changes: 76 additions & 0 deletions states/kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"encoding/json"
"fmt"
"math"
"path"
"strings"
"time"

Expand Down Expand Up @@ -50,6 +51,7 @@ type MetaKV interface {
removeWithPrefixAndPrevKV(ctx context.Context, prefix string) ([]*mvccpb.KeyValue, error)
GetAllRootPath(ctx context.Context) ([]string, error)
BackupKV(base, prefix string, w *bufio.Writer, ignoreRevision bool, batchSize int64) error
WalkWithPrefix(ctx context.Context, prefix string, paginationSize int, fn func([]byte, []byte) error) error
Close()
}

Expand Down Expand Up @@ -206,6 +208,8 @@ func (kv *etcdKV) BackupKV(base, prefix string, w *bufio.Writer, ignoreRevision
meta["instance"] = instance
meta["metaPath"] = metaPath

fmt.Println("meta path ", metaPath)

bs, _ := json.Marshal(meta)
ph := models.PartHeader{
PartType: int32(models.EtcdBackup),
Expand Down Expand Up @@ -271,6 +275,39 @@ func (kv *etcdKV) BackupKV(base, prefix string, w *bufio.Writer, ignoreRevision
return nil
}

func (kv *etcdKV) WalkWithPrefix(ctx context.Context, prefix string, paginationSize int, fn func([]byte, []byte) error) error {
prefix = path.Join(kv.rootPath, prefix)

batch := int64(paginationSize)
opts := []clientv3.OpOption{
clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend),
clientv3.WithLimit(batch),
clientv3.WithRange(clientv3.GetPrefixRangeEnd(prefix)),
}

key := prefix
for {
resp, err := kv.client.Get(ctx, key, opts...)
if err != nil {
return err
}

for _, kv := range resp.Kvs {
if err = fn(kv.Key, kv.Value); err != nil {
return err
}
}

if !resp.More {
break
}
// move to next key
key = string(append(resp.Kvs[len(resp.Kvs)-1].Key, 0))
}

return nil
}

// Close closes the connection to etcd.
func (kv *etcdKV) Close() {
kv.client.Close()
Expand Down Expand Up @@ -588,6 +625,45 @@ func (kv *txnTiKV) BackupKV(base, prefix string, w *bufio.Writer, ignoreRevision
return txn.Commit(ctx)
}

func (kv *txnTiKV) WalkWithPrefix(ctx context.Context, prefix string, paginationSize int, fn func([]byte, []byte) error) error {
prefix = path.Join(kv.rootPath, prefix)

// Since only reading, use Snapshot for less overhead
ss := kv.client.GetSnapshot(MaxSnapshotTS)
ss.SetScanBatchSize(paginationSize)

// Retrieve key-value pairs with the specified prefix
startKey := []byte(prefix)
endKey := tikv.PrefixNextKey([]byte(prefix))
iter, err := ss.Iter(startKey, endKey)
if err != nil {
err = errors.Wrap(err, fmt.Sprintf("Failed to create iterater for %s during WalkWithPrefix", prefix))
return err
}
defer iter.Close()

// Iterate over the key-value pairs
for iter.Valid() {
// Grab value for empty check
byteVal := iter.Value()
// Check if empty val and replace with placeholder
if isEmptyByte(byteVal) {
byteVal = []byte{}
}
err = fn(iter.Key(), byteVal)
if err != nil {
err = errors.Wrap(err, fmt.Sprintf("Failed to apply fn to (%s;%s)", string(iter.Key()), string(byteVal)))
return err
}
err = iter.Next()
if err != nil {
err = errors.Wrap(err, fmt.Sprintf("Failed to move Iterator after key %s for WalkWithPrefix", string(iter.Key())))
return err
}
}
return nil
}

// Close closes the connection to TiKV.
func (kv *txnTiKV) Close() {
kv.client.Close()
Expand Down
4 changes: 4 additions & 0 deletions states/kv/kv_audit.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,3 +129,7 @@ func (c *FileAuditKV) writeData(data []byte) {
}
}
}

func (c *FileAuditKV) WalkWithPrefix(ctx context.Context, prefix string, paginationSize int, fn func([]byte, []byte) error) error {
return c.cli.WalkWithPrefix(ctx, prefix, paginationSize, fn)
}