Skip to content

Commit

Permalink
Fix rootcoord restoration missing gcConfirmStep
Browse files Browse the repository at this point in the history
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
  • Loading branch information
congqixia committed Jul 3, 2023
1 parent 62e9e4a commit 3201f76
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 2 deletions.
7 changes: 7 additions & 0 deletions internal/rootcoord/garbage_collector.go
Expand Up @@ -65,6 +65,7 @@ func (c *bgGarbageCollector) ReDropCollection(collMeta *model.Collection, ts Tim
baseStep: baseStep{core: c.s},
pChannels: collMeta.PhysicalChannelNames,
})
redo.AddAsyncStep(newConfirmGCStep(c.s, collMeta.CollectionID, allPartition))
redo.AddAsyncStep(&deleteCollectionMetaStep{
baseStep: baseStep{core: c.s},
collectionID: collMeta.CollectionID,
Expand Down Expand Up @@ -125,6 +126,12 @@ func (c *bgGarbageCollector) ReDropPartition(dbID int64, pChannels []string, par
collID: partition.CollectionID,
partIDs: []UniqueID{partition.PartitionID},
})
redo.AddAsyncStep(&deletePartitionDataStep{
baseStep: baseStep{core: c.s},
pchans: pChannels,
partition: partition,
})
redo.AddAsyncStep(newConfirmGCStep(c.s, partition.CollectionID, partition.PartitionID))
redo.AddAsyncStep(&removePartitionMetaStep{
baseStep: baseStep{core: c.s},
dbID: dbID,
Expand Down
72 changes: 70 additions & 2 deletions internal/rootcoord/garbage_collector_test.go
Expand Up @@ -32,6 +32,11 @@ import (
)

func TestGarbageCollectorCtx_ReDropCollection(t *testing.T) {
oldValue := confirmGCInterval
defer func() {
confirmGCInterval = oldValue
}()
confirmGCInterval = 0
t.Run("failed to release collection", func(t *testing.T) {
broker := newMockBroker()
broker.ReleaseCollectionFunc = func(ctx context.Context, collectionID UniqueID) error {
Expand Down Expand Up @@ -107,6 +112,13 @@ func TestGarbageCollectorCtx_ReDropCollection(t *testing.T) {
releaseCollectionChan <- struct{}{}
return nil
}
gcConfirmCalled := false
gcConfirmChan := make(chan struct{})
broker.GCConfirmFunc = func(ctx context.Context, collectionID, partitionID UniqueID) bool {
gcConfirmCalled = true
close(gcConfirmChan)
return true
}
dropCollectionIndexCalled := false
dropCollectionIndexChan := make(chan struct{}, 1)
broker.DropCollectionIndexFunc = func(ctx context.Context, collID UniqueID, partIDs []UniqueID) error {
Expand Down Expand Up @@ -143,6 +155,8 @@ func TestGarbageCollectorCtx_ReDropCollection(t *testing.T) {
assert.True(t, releaseCollectionCalled)
<-dropCollectionIndexChan
assert.True(t, dropCollectionIndexCalled)
<-gcConfirmChan
assert.True(t, gcConfirmCalled)
<-dropMetaChan
})

Expand All @@ -162,6 +176,13 @@ func TestGarbageCollectorCtx_ReDropCollection(t *testing.T) {
dropCollectionIndexChan <- struct{}{}
return nil
}
gcConfirmCalled := false
gcConfirmChan := make(chan struct{})
broker.GCConfirmFunc = func(ctx context.Context, collectionID, partitionID UniqueID) bool {
gcConfirmCalled = true
close(gcConfirmChan)
return true
}
meta := mockrootcoord.NewIMetaTable(t)
removeCollectionCalled := false
removeCollectionChan := make(chan struct{}, 1)
Expand Down Expand Up @@ -194,6 +215,8 @@ func TestGarbageCollectorCtx_ReDropCollection(t *testing.T) {
assert.True(t, dropCollectionIndexCalled)
<-removeCollectionChan
assert.True(t, removeCollectionCalled)
<-gcConfirmChan
assert.True(t, gcConfirmCalled)
})
}

Expand Down Expand Up @@ -309,6 +332,11 @@ func TestGarbageCollectorCtx_RemoveCreatingCollection(t *testing.T) {
}

func TestGarbageCollectorCtx_ReDropPartition(t *testing.T) {
oldValue := confirmGCInterval
defer func() {
confirmGCInterval = oldValue
}()
confirmGCInterval = 0
t.Run("failed to GcPartitionData", func(t *testing.T) {
ticker := newTickerWithMockFailStream() // failed to broadcast drop msg.
shardsNum := int(common.DefaultShardsNum)
Expand Down Expand Up @@ -344,15 +372,35 @@ func TestGarbageCollectorCtx_ReDropPartition(t *testing.T) {
return errors.New("error mock RemovePartition")
})

broker := newMockBroker()
dropCollectionIndexCalled := false
dropCollectionIndexChan := make(chan struct{}, 1)
broker.DropCollectionIndexFunc = func(ctx context.Context, collID UniqueID, partIDs []UniqueID) error {
dropCollectionIndexCalled = true
dropCollectionIndexChan <- struct{}{}
return nil
}
gcConfirmCalled := false
gcConfirmChan := make(chan struct{})
broker.GCConfirmFunc = func(ctx context.Context, collectionID, partitionID UniqueID) bool {
gcConfirmCalled = true
close(gcConfirmChan)
return true
}

tsoAllocator := newMockTsoAllocator()
tsoAllocator.GenerateTSOF = func(count uint32) (uint64, error) {
return 100, nil
}
core := newTestCore(withMeta(meta), withTtSynchronizer(ticker), withTsoAllocator(tsoAllocator), withDropIndex())
core := newTestCore(withMeta(meta), withTtSynchronizer(ticker), withTsoAllocator(tsoAllocator), withDropIndex(), withBroker(broker))
core.ddlTsLockManager = newDdlTsLockManager(core.tsoAllocator)
gc := newBgGarbageCollector(core)
core.garbageCollector = gc
gc.ReDropPartition(0, pchans, &model.Partition{}, 100000)
<-dropCollectionIndexChan
assert.True(t, dropCollectionIndexCalled)
<-gcConfirmChan
assert.True(t, gcConfirmCalled)
<-removePartitionChan
assert.True(t, removePartitionCalled)
})
Expand All @@ -377,15 +425,35 @@ func TestGarbageCollectorCtx_ReDropPartition(t *testing.T) {
return nil
})

broker := newMockBroker()
dropCollectionIndexCalled := false
dropCollectionIndexChan := make(chan struct{}, 1)
broker.DropCollectionIndexFunc = func(ctx context.Context, collID UniqueID, partIDs []UniqueID) error {
dropCollectionIndexCalled = true
dropCollectionIndexChan <- struct{}{}
return nil
}
gcConfirmCalled := false
gcConfirmChan := make(chan struct{})
broker.GCConfirmFunc = func(ctx context.Context, collectionID, partitionID UniqueID) bool {
gcConfirmCalled = true
close(gcConfirmChan)
return true
}

tsoAllocator := newMockTsoAllocator()
tsoAllocator.GenerateTSOF = func(count uint32) (uint64, error) {
return 100, nil
}
core := newTestCore(withMeta(meta), withTtSynchronizer(ticker), withTsoAllocator(tsoAllocator), withDropIndex())
core := newTestCore(withMeta(meta), withTtSynchronizer(ticker), withTsoAllocator(tsoAllocator), withDropIndex(), withBroker(broker))
core.ddlTsLockManager = newDdlTsLockManager(core.tsoAllocator)
gc := newBgGarbageCollector(core)
core.garbageCollector = gc
gc.ReDropPartition(0, pchans, &model.Partition{}, 100000)
<-dropCollectionIndexChan
assert.True(t, dropCollectionIndexCalled)
<-gcConfirmChan
assert.True(t, gcConfirmCalled)
<-removePartitionChan
assert.True(t, removePartitionCalled)
})
Expand Down

0 comments on commit 3201f76

Please sign in to comment.