diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index eb00295aebff..6330cbdf7d3c 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -405,12 +405,12 @@ func (node *DataNode) handlePutEvent(watchInfo *datapb.ChannelWatchInfo, version switch watchInfo.State { case datapb.ChannelWatchState_Uncomplete, datapb.ChannelWatchState_ToWatch: if err := node.flowgraphManager.addAndStart(node, watchInfo.GetVchan(), watchInfo.GetSchema(), tickler); err != nil { + log.Warn("handle put event: new data sync service failed", zap.String("vChanName", vChanName), zap.Error(err)) watchInfo.State = datapb.ChannelWatchState_WatchFailure - return fmt.Errorf("fail to add and start flowgraph for vChanName: %s, err: %v", vChanName, err) + } else { + log.Info("handle put event: new data sync service success", zap.String("vChanName", vChanName)) + watchInfo.State = datapb.ChannelWatchState_WatchSuccess } - log.Info("handle put event: new data sync service success", zap.String("vChanName", vChanName)) - watchInfo.State = datapb.ChannelWatchState_WatchSuccess - case datapb.ChannelWatchState_ToRelease: // there is no reason why we release fail node.tryToReleaseFlowgraph(vChanName) @@ -453,6 +453,7 @@ func (node *DataNode) handleDeleteEvent(vChanName string) { func (node *DataNode) tryToReleaseFlowgraph(vChanName string) { log.Info("try to release flowgraph", zap.String("vChanName", vChanName)) node.flowgraphManager.release(vChanName) + log.Info("release flowgraph success", zap.String("vChanName", vChanName)) } // BackGroundGC runs in background to release datanode resources diff --git a/internal/datanode/data_node_test.go b/internal/datanode/data_node_test.go index 5891c574e7e0..e398ad9fd88f 100644 --- a/internal/datanode/data_node_test.go +++ b/internal/datanode/data_node_test.go @@ -646,18 +646,13 @@ func TestDataNode(t *testing.T) { {"fake-by-dev-rootcoord-dml-backgroundgc-1"}, {"fake-by-dev-rootcoord-dml-backgroundgc-2"}, {"fake-by-dev-rootcoord-dml-backgroundgc-3"}, - {""}, - {""}, } - for i, test := range testDataSyncs { - if i <= 2 { - err = node.flowgraphManager.addAndStart(node, &datapb.VchannelInfo{CollectionID: 1, ChannelName: test.dmChannelName}, nil, genTestTickler()) - assert.Nil(t, err) - vchanNameCh <- test.dmChannelName - } + for _, test := range testDataSyncs { + err = node.flowgraphManager.addAndStart(node, &datapb.VchannelInfo{CollectionID: 1, ChannelName: test.dmChannelName}, nil, genTestTickler()) + assert.Nil(t, err) + vchanNameCh <- test.dmChannelName } - cancel() }) t.Run("Test SyncSegments", func(t *testing.T) { diff --git a/internal/datanode/event_manager.go b/internal/datanode/event_manager.go index 946473bb9ab6..b174f6941314 100644 --- a/internal/datanode/event_manager.go +++ b/internal/datanode/event_manager.go @@ -69,7 +69,11 @@ func (e *channelEventManager) Run() { case event := <-e.eventChan: switch event.eventType { case putEventType: - e.handlePutEvent(event.info, event.version) + err := e.handlePutEvent(event.info, event.version) + if err != nil { + // logging the error is convenient for follow-up investigation of problems + log.Warn("handle put event failed", zap.String("vChanName", event.vChanName), zap.Error(err)) + } case deleteEventType: e.handleDeleteEvent(event.vChanName) } diff --git a/internal/datanode/flow_graph_dmstream_input_node.go b/internal/datanode/flow_graph_dmstream_input_node.go index ba49574d6cc5..3e50288a9bcd 100644 --- a/internal/datanode/flow_graph_dmstream_input_node.go +++ b/internal/datanode/flow_graph_dmstream_input_node.go @@ -62,6 +62,8 @@ func newDmInputNode(ctx context.Context, seekPos *internalpb.MsgPosition, dmNode zap.Int64("collection ID", dmNodeConfig.collectionID)) err = insertStream.Seek([]*internalpb.MsgPosition{seekPos}) if err != nil { + insertStream.Close() + log.Error("seek failed", zap.Error(err)) return nil, err } log.Info("datanode seek successfully", diff --git a/internal/querynode/flow_graph_query_node.go b/internal/querynode/flow_graph_query_node.go index b3429112664d..1d10de84b860 100644 --- a/internal/querynode/flow_graph_query_node.go +++ b/internal/querynode/flow_graph_query_node.go @@ -262,6 +262,7 @@ func (q *queryNodeFlowGraph) close() { q.flowGraph.Close() if q.dmlStream != nil && q.consumerCnt > 0 { metrics.QueryNodeNumConsumers.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Sub(float64(q.consumerCnt)) + q.consumerCnt = 0 } log.Info("stop query node flow graph", zap.Int64("collectionID", q.collectionID), diff --git a/internal/querynode/task_test.go b/internal/querynode/task_test.go index 1c39c1e82d8c..8ce797017683 100644 --- a/internal/querynode/task_test.go +++ b/internal/querynode/task_test.go @@ -79,6 +79,7 @@ func TestTask_watchDmChannelsTask(t *testing.T) { t.Run("test execute loadCollection", func(t *testing.T) { node, err := genSimpleQueryNode(ctx) assert.NoError(t, err) + defer node.metaReplica.freeAll() task := watchDmChannelsTask{ req: genWatchDMChannelsRequest(), @@ -98,6 +99,7 @@ func TestTask_watchDmChannelsTask(t *testing.T) { t.Run("test empty metric type", func(t *testing.T) { node, err := genSimpleQueryNode(ctx) assert.NoError(t, err) + defer node.metaReplica.freeAll() task := watchDmChannelsTask{ req: genWatchDMChannelsRequest(), @@ -118,6 +120,7 @@ func TestTask_watchDmChannelsTask(t *testing.T) { t.Run("test execute repeated watchDmChannelTask", func(t *testing.T) { node, err := genSimpleQueryNode(ctx) assert.NoError(t, err) + defer node.metaReplica.freeAll() task := watchDmChannelsTask{ req: genWatchDMChannelsRequest(), @@ -142,6 +145,7 @@ func TestTask_watchDmChannelsTask(t *testing.T) { t.Run("test execute loadPartition", func(t *testing.T) { node, err := genSimpleQueryNode(ctx) assert.NoError(t, err) + defer node.metaReplica.freeAll() task := watchDmChannelsTask{ req: genWatchDMChannelsRequest(), @@ -166,6 +170,7 @@ func TestTask_watchDmChannelsTask(t *testing.T) { t.Run("test execute loadPartition without init collection and partition", func(t *testing.T) { node, err := genSimpleQueryNode(ctx) assert.NoError(t, err) + defer node.metaReplica.freeAll() task := watchDmChannelsTask{ req: genWatchDMChannelsRequest(), @@ -186,6 +191,7 @@ func TestTask_watchDmChannelsTask(t *testing.T) { t.Run("test execute seek error", func(t *testing.T) { node, err := genSimpleQueryNode(ctx) assert.NoError(t, err) + defer node.metaReplica.freeAll() task := watchDmChannelsTask{ req: genWatchDMChannelsRequest(), @@ -242,6 +248,7 @@ func TestTask_watchDmChannelsTask(t *testing.T) { node, err := genSimpleQueryNode(ctx) assert.NoError(t, err) + defer node.metaReplica.freeAll() task := watchDmChannelsTask{ req: genWatchDMChannelsRequest(), @@ -267,6 +274,7 @@ func TestTask_watchDmChannelsTask(t *testing.T) { t.Run("test add excluded segment for dropped segment", func(t *testing.T) { node, err := genSimpleQueryNode(ctx) assert.NoError(t, err) + defer node.metaReplica.freeAll() task := watchDmChannelsTask{ req: genWatchDMChannelsRequest(), @@ -292,6 +300,7 @@ func TestTask_watchDmChannelsTask(t *testing.T) { t.Run("test load growing segment", func(t *testing.T) { node, err := genSimpleQueryNode(ctx) assert.NoError(t, err) + defer node.metaReplica.freeAll() task := watchDmChannelsTask{ req: genWatchDMChannelsRequest(), @@ -350,6 +359,7 @@ func TestTask_releaseCollectionTask(t *testing.T) { t.Run("test execute", func(t *testing.T) { node, err := genSimpleQueryNode(ctx) assert.NoError(t, err) + defer node.metaReplica.freeAll() /* err = node.queryService.addQueryCollection(defaultCollectionID) @@ -366,6 +376,7 @@ func TestTask_releaseCollectionTask(t *testing.T) { t.Run("test execute no collection", func(t *testing.T) { node, err := genSimpleQueryNode(ctx) assert.NoError(t, err) + defer node.metaReplica.freeAll() err = node.metaReplica.removeCollection(defaultCollectionID) assert.NoError(t, err) @@ -381,6 +392,7 @@ func TestTask_releaseCollectionTask(t *testing.T) { t.Run("test execute remove deltaVChannel tSafe", func(t *testing.T) { node, err := genSimpleQueryNode(ctx) assert.NoError(t, err) + defer node.metaReplica.freeAll() /* err = node.queryService.addQueryCollection(defaultCollectionID) @@ -437,6 +449,7 @@ func TestTask_releasePartitionTask(t *testing.T) { t.Run("test isAllPartitionsReleased", func(t *testing.T) { node, err := genSimpleQueryNode(ctx) + defer node.metaReplica.freeAll() assert.NoError(t, err) task := releasePartitionsTask{ @@ -460,6 +473,7 @@ func TestTask_releasePartitionTask(t *testing.T) { t.Run("test execute", func(t *testing.T) { node, err := genSimpleQueryNode(ctx) assert.NoError(t, err) + defer node.metaReplica.freeAll() /* err = node.queryService.addQueryCollection(defaultCollectionID) @@ -478,6 +492,7 @@ func TestTask_releasePartitionTask(t *testing.T) { t.Run("test execute no collection", func(t *testing.T) { node, err := genSimpleQueryNode(ctx) assert.NoError(t, err) + defer node.metaReplica.freeAll() task := releasePartitionsTask{ req: genReleasePartitionsRequest(), @@ -493,6 +508,7 @@ func TestTask_releasePartitionTask(t *testing.T) { t.Run("test execute no partition", func(t *testing.T) { node, err := genSimpleQueryNode(ctx) assert.NoError(t, err) + defer node.metaReplica.freeAll() task := releasePartitionsTask{ req: genReleasePartitionsRequest(), @@ -508,6 +524,7 @@ func TestTask_releasePartitionTask(t *testing.T) { t.Run("test execute non-exist partition", func(t *testing.T) { node, err := genSimpleQueryNode(ctx) assert.NoError(t, err) + defer node.metaReplica.freeAll() req := genReleasePartitionsRequest() req.PartitionIDs = []int64{-1} @@ -523,6 +540,7 @@ func TestTask_releasePartitionTask(t *testing.T) { t.Run("test execute remove deltaVChannel", func(t *testing.T) { node, err := genSimpleQueryNode(ctx) assert.NoError(t, err) + defer node.metaReplica.freeAll() col, err := node.metaReplica.getCollectionByID(defaultCollectionID) assert.NoError(t, err)