Skip to content

Commit

Permalink
fix clean collection meta command (#229)
Browse files Browse the repository at this point in the history
Signed-off-by: longjiquan <jiquan.long@zilliz.com>
  • Loading branch information
longjiquan committed Dec 14, 2023
1 parent 87dc3cc commit 43cb144
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 13 deletions.
42 changes: 31 additions & 11 deletions states/etcd/remove/collection_dropping.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,17 +65,37 @@ func cleanCollectionDropMeta(cli kv.MetaKV, basePath string, collection *models.
fmt.Printf("Collection %s[%d] key is empty string, cannot perform cleanup", collection.Schema.Name, collection.ID)
return
}
// remove collection meta
cli.Remove(context.TODO(), collection.Key())
fmt.Printf("Collection Meta: %s\n", collection.Key())
// remove collection field meta
fieldsPrefix := path.Join(basePath, fmt.Sprintf("root-coord/fields/%d", collection.ID)) + "/"
fmt.Printf("Collection Field Meta(Prefix): %s\n", collection.Key())
cli.RemoveWithPrefix(context.TODO(), fieldsPrefix)
// remove collection partition meta
partitionsPrefix := path.Join(basePath, fmt.Sprintf("root-coord/partitions/%d", collection.ID)) + "/"
fmt.Printf("Collection Partition Meta(Prefix): %s\n", partitionsPrefix)
cli.RemoveWithPrefix(context.TODO(), partitionsPrefix)

// better to remove collection meta finally for better atomicity.
// TODO: alias meta can't be cleaned conveniently.
prefixes := []string{
// remove collection field meta
path.Join(basePath, fmt.Sprintf("root-coord/fields/%d", collection.ID)) + "/",
path.Join(basePath, common.SnapshotPrefix, fmt.Sprintf("root-coord/fields/%d", collection.ID)) + "/",

// remove collection partition meta
path.Join(basePath, fmt.Sprintf("root-coord/partitions/%d", collection.ID)) + "/",
path.Join(basePath, common.SnapshotPrefix, fmt.Sprintf("root-coord/partitions/%d", collection.ID)) + "/",
}

var collectionKey string
if collection.DBID != 0 {
collectionKey = fmt.Sprintf("root-coord/database/collection-info/%d/%d", collection.DBID, collection.ID)
} else {
collectionKey = fmt.Sprintf("root-coord/collection/%d", collection.ID)
}

// collection will have timestamp suffix, which should be also removed by prefix.
prefixes = append(prefixes, path.Join(basePath, collectionKey))
prefixes = append(prefixes, path.Join(basePath, common.SnapshotPrefix, collectionKey))

for _, prefix := range prefixes {
if err := cli.RemoveWithPrefix(context.TODO(), prefix); err != nil {
fmt.Printf("failed to clean prefix: %s, error: %s\n", prefix, err.Error())
} else {
fmt.Printf("clean prefix: %s\n", prefix)
}
}

channelWatchInfos, err := common.ListChannelWatch(context.Background(), cli, basePath, etcdversion.GetVersion(), func(cw *models.ChannelWatch) bool {
return cw.Vchan.CollectionID == collection.ID
Expand Down
7 changes: 5 additions & 2 deletions states/kv/kv_audit.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (c *FileAuditKV) Remove(ctx context.Context, key string) error {

func (c *FileAuditKV) RemoveWithPrefix(ctx context.Context, key string) error {
fmt.Println("audit delete with prefix", key)
val, err := c.cli.Load(ctx, key)
keys, values, err := c.cli.LoadWithPrefix(ctx, key)
if err != nil {
return err
}
Expand All @@ -75,7 +75,10 @@ func (c *FileAuditKV) RemoveWithPrefix(ctx context.Context, key string) error {
return err
}
c.writeHeader(models.OpDel, 1)
c.writeKeyValue(key, val)
for i, key := range keys {
val := values[i]
c.writeKeyValue(key, val)
}
return nil
}

Expand Down

0 comments on commit 43cb144

Please sign in to comment.