From 3201f7648203e0b183a978549316b81781cfce0b Mon Sep 17 00:00:00 2001 From: Congqi Xia Date: Mon, 3 Jul 2023 15:55:54 +0800 Subject: [PATCH] Fix rootcoord restoration missing gcConfirmStep Signed-off-by: Congqi Xia --- internal/rootcoord/garbage_collector.go | 7 ++ internal/rootcoord/garbage_collector_test.go | 72 +++++++++++++++++++- 2 files changed, 77 insertions(+), 2 deletions(-) diff --git a/internal/rootcoord/garbage_collector.go b/internal/rootcoord/garbage_collector.go index ee13785c9351..59c57475a851 100644 --- a/internal/rootcoord/garbage_collector.go +++ b/internal/rootcoord/garbage_collector.go @@ -65,6 +65,7 @@ func (c *bgGarbageCollector) ReDropCollection(collMeta *model.Collection, ts Tim baseStep: baseStep{core: c.s}, pChannels: collMeta.PhysicalChannelNames, }) + redo.AddAsyncStep(newConfirmGCStep(c.s, collMeta.CollectionID, allPartition)) redo.AddAsyncStep(&deleteCollectionMetaStep{ baseStep: baseStep{core: c.s}, collectionID: collMeta.CollectionID, @@ -125,6 +126,12 @@ func (c *bgGarbageCollector) ReDropPartition(dbID int64, pChannels []string, par collID: partition.CollectionID, partIDs: []UniqueID{partition.PartitionID}, }) + redo.AddAsyncStep(&deletePartitionDataStep{ + baseStep: baseStep{core: c.s}, + pchans: pChannels, + partition: partition, + }) + redo.AddAsyncStep(newConfirmGCStep(c.s, partition.CollectionID, partition.PartitionID)) redo.AddAsyncStep(&removePartitionMetaStep{ baseStep: baseStep{core: c.s}, dbID: dbID, diff --git a/internal/rootcoord/garbage_collector_test.go b/internal/rootcoord/garbage_collector_test.go index 8644a73bed0b..94dab3504b0f 100644 --- a/internal/rootcoord/garbage_collector_test.go +++ b/internal/rootcoord/garbage_collector_test.go @@ -32,6 +32,11 @@ import ( ) func TestGarbageCollectorCtx_ReDropCollection(t *testing.T) { + oldValue := confirmGCInterval + defer func() { + confirmGCInterval = oldValue + }() + confirmGCInterval = 0 t.Run("failed to release collection", func(t *testing.T) { broker := newMockBroker() broker.ReleaseCollectionFunc = func(ctx context.Context, collectionID UniqueID) error { @@ -107,6 +112,13 @@ func TestGarbageCollectorCtx_ReDropCollection(t *testing.T) { releaseCollectionChan <- struct{}{} return nil } + gcConfirmCalled := false + gcConfirmChan := make(chan struct{}) + broker.GCConfirmFunc = func(ctx context.Context, collectionID, partitionID UniqueID) bool { + gcConfirmCalled = true + close(gcConfirmChan) + return true + } dropCollectionIndexCalled := false dropCollectionIndexChan := make(chan struct{}, 1) broker.DropCollectionIndexFunc = func(ctx context.Context, collID UniqueID, partIDs []UniqueID) error { @@ -143,6 +155,8 @@ func TestGarbageCollectorCtx_ReDropCollection(t *testing.T) { assert.True(t, releaseCollectionCalled) <-dropCollectionIndexChan assert.True(t, dropCollectionIndexCalled) + <-gcConfirmChan + assert.True(t, gcConfirmCalled) <-dropMetaChan }) @@ -162,6 +176,13 @@ func TestGarbageCollectorCtx_ReDropCollection(t *testing.T) { dropCollectionIndexChan <- struct{}{} return nil } + gcConfirmCalled := false + gcConfirmChan := make(chan struct{}) + broker.GCConfirmFunc = func(ctx context.Context, collectionID, partitionID UniqueID) bool { + gcConfirmCalled = true + close(gcConfirmChan) + return true + } meta := mockrootcoord.NewIMetaTable(t) removeCollectionCalled := false removeCollectionChan := make(chan struct{}, 1) @@ -194,6 +215,8 @@ func TestGarbageCollectorCtx_ReDropCollection(t *testing.T) { assert.True(t, dropCollectionIndexCalled) <-removeCollectionChan assert.True(t, removeCollectionCalled) + <-gcConfirmChan + assert.True(t, gcConfirmCalled) }) } @@ -309,6 +332,11 @@ func TestGarbageCollectorCtx_RemoveCreatingCollection(t *testing.T) { } func TestGarbageCollectorCtx_ReDropPartition(t *testing.T) { + oldValue := confirmGCInterval + defer func() { + confirmGCInterval = oldValue + }() + confirmGCInterval = 0 t.Run("failed to GcPartitionData", func(t *testing.T) { ticker := newTickerWithMockFailStream() // failed to broadcast drop msg. shardsNum := int(common.DefaultShardsNum) @@ -344,15 +372,35 @@ func TestGarbageCollectorCtx_ReDropPartition(t *testing.T) { return errors.New("error mock RemovePartition") }) + broker := newMockBroker() + dropCollectionIndexCalled := false + dropCollectionIndexChan := make(chan struct{}, 1) + broker.DropCollectionIndexFunc = func(ctx context.Context, collID UniqueID, partIDs []UniqueID) error { + dropCollectionIndexCalled = true + dropCollectionIndexChan <- struct{}{} + return nil + } + gcConfirmCalled := false + gcConfirmChan := make(chan struct{}) + broker.GCConfirmFunc = func(ctx context.Context, collectionID, partitionID UniqueID) bool { + gcConfirmCalled = true + close(gcConfirmChan) + return true + } + tsoAllocator := newMockTsoAllocator() tsoAllocator.GenerateTSOF = func(count uint32) (uint64, error) { return 100, nil } - core := newTestCore(withMeta(meta), withTtSynchronizer(ticker), withTsoAllocator(tsoAllocator), withDropIndex()) + core := newTestCore(withMeta(meta), withTtSynchronizer(ticker), withTsoAllocator(tsoAllocator), withDropIndex(), withBroker(broker)) core.ddlTsLockManager = newDdlTsLockManager(core.tsoAllocator) gc := newBgGarbageCollector(core) core.garbageCollector = gc gc.ReDropPartition(0, pchans, &model.Partition{}, 100000) + <-dropCollectionIndexChan + assert.True(t, dropCollectionIndexCalled) + <-gcConfirmChan + assert.True(t, gcConfirmCalled) <-removePartitionChan assert.True(t, removePartitionCalled) }) @@ -377,15 +425,35 @@ func TestGarbageCollectorCtx_ReDropPartition(t *testing.T) { return nil }) + broker := newMockBroker() + dropCollectionIndexCalled := false + dropCollectionIndexChan := make(chan struct{}, 1) + broker.DropCollectionIndexFunc = func(ctx context.Context, collID UniqueID, partIDs []UniqueID) error { + dropCollectionIndexCalled = true + dropCollectionIndexChan <- struct{}{} + return nil + } + gcConfirmCalled := false + gcConfirmChan := make(chan struct{}) + broker.GCConfirmFunc = func(ctx context.Context, collectionID, partitionID UniqueID) bool { + gcConfirmCalled = true + close(gcConfirmChan) + return true + } + tsoAllocator := newMockTsoAllocator() tsoAllocator.GenerateTSOF = func(count uint32) (uint64, error) { return 100, nil } - core := newTestCore(withMeta(meta), withTtSynchronizer(ticker), withTsoAllocator(tsoAllocator), withDropIndex()) + core := newTestCore(withMeta(meta), withTtSynchronizer(ticker), withTsoAllocator(tsoAllocator), withDropIndex(), withBroker(broker)) core.ddlTsLockManager = newDdlTsLockManager(core.tsoAllocator) gc := newBgGarbageCollector(core) core.garbageCollector = gc gc.ReDropPartition(0, pchans, &model.Partition{}, 100000) + <-dropCollectionIndexChan + assert.True(t, dropCollectionIndexCalled) + <-gcConfirmChan + assert.True(t, gcConfirmCalled) <-removePartitionChan assert.True(t, removePartitionCalled) })