Skip to content

Commit

Permalink
Prevent exclusive consumer exception in pulsar (#25376)
Browse files Browse the repository at this point in the history
Signed-off-by: SimFG <bang.fu@zilliz.com>
  • Loading branch information
SimFG committed Jul 12, 2023
1 parent d4fe3ce commit 9499fb7
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 14 deletions.
9 changes: 5 additions & 4 deletions internal/datanode/data_node.go
Expand Up @@ -405,12 +405,12 @@ func (node *DataNode) handlePutEvent(watchInfo *datapb.ChannelWatchInfo, version
switch watchInfo.State {
case datapb.ChannelWatchState_Uncomplete, datapb.ChannelWatchState_ToWatch:
if err := node.flowgraphManager.addAndStart(node, watchInfo.GetVchan(), watchInfo.GetSchema(), tickler); err != nil {
log.Warn("handle put event: new data sync service failed", zap.String("vChanName", vChanName), zap.Error(err))
watchInfo.State = datapb.ChannelWatchState_WatchFailure
return fmt.Errorf("fail to add and start flowgraph for vChanName: %s, err: %v", vChanName, err)
} else {
log.Info("handle put event: new data sync service success", zap.String("vChanName", vChanName))
watchInfo.State = datapb.ChannelWatchState_WatchSuccess
}
log.Info("handle put event: new data sync service success", zap.String("vChanName", vChanName))
watchInfo.State = datapb.ChannelWatchState_WatchSuccess

case datapb.ChannelWatchState_ToRelease:
// there is no reason for the release to fail
node.tryToReleaseFlowgraph(vChanName)
Expand Down Expand Up @@ -453,6 +453,7 @@ func (node *DataNode) handleDeleteEvent(vChanName string) {
// tryToReleaseFlowgraph asks the flowgraph manager to release the flowgraph
// registered under vChanName, logging once before and once after the call.
// The release call yields no result here, so the second log line is emitted
// unconditionally once it returns.
func (node *DataNode) tryToReleaseFlowgraph(vChanName string) {
	// Both log lines carry the same channel field, so build it once.
	channelField := zap.String("vChanName", vChanName)

	log.Info("try to release flowgraph", channelField)
	node.flowgraphManager.release(vChanName)
	log.Info("release flowgraph success", channelField)
}

// BackGroundGC runs in background to release datanode resources
Expand Down
13 changes: 4 additions & 9 deletions internal/datanode/data_node_test.go
Expand Up @@ -646,18 +646,13 @@ func TestDataNode(t *testing.T) {
{"fake-by-dev-rootcoord-dml-backgroundgc-1"},
{"fake-by-dev-rootcoord-dml-backgroundgc-2"},
{"fake-by-dev-rootcoord-dml-backgroundgc-3"},
{""},
{""},
}

for i, test := range testDataSyncs {
if i <= 2 {
err = node.flowgraphManager.addAndStart(node, &datapb.VchannelInfo{CollectionID: 1, ChannelName: test.dmChannelName}, nil, genTestTickler())
assert.Nil(t, err)
vchanNameCh <- test.dmChannelName
}
for _, test := range testDataSyncs {
err = node.flowgraphManager.addAndStart(node, &datapb.VchannelInfo{CollectionID: 1, ChannelName: test.dmChannelName}, nil, genTestTickler())
assert.Nil(t, err)
vchanNameCh <- test.dmChannelName
}
cancel()
})

t.Run("Test SyncSegments", func(t *testing.T) {
Expand Down
6 changes: 5 additions & 1 deletion internal/datanode/event_manager.go
Expand Up @@ -69,7 +69,11 @@ func (e *channelEventManager) Run() {
case event := <-e.eventChan:
switch event.eventType {
case putEventType:
e.handlePutEvent(event.info, event.version)
err := e.handlePutEvent(event.info, event.version)
if err != nil {
// logging the error is convenient for follow-up investigation of problems
log.Warn("handle put event failed", zap.String("vChanName", event.vChanName), zap.Error(err))
}
case deleteEventType:
e.handleDeleteEvent(event.vChanName)
}
Expand Down
2 changes: 2 additions & 0 deletions internal/datanode/flow_graph_dmstream_input_node.go
Expand Up @@ -62,6 +62,8 @@ func newDmInputNode(ctx context.Context, seekPos *internalpb.MsgPosition, dmNode
zap.Int64("collection ID", dmNodeConfig.collectionID))
err = insertStream.Seek([]*internalpb.MsgPosition{seekPos})
if err != nil {
insertStream.Close()
log.Error("seek failed", zap.Error(err))

Check warning on line 66 in internal/datanode/flow_graph_dmstream_input_node.go

View check run for this annotation

Codecov / codecov/patch

internal/datanode/flow_graph_dmstream_input_node.go#L65-L66

Added lines #L65 - L66 were not covered by tests
return nil, err
}
log.Info("datanode seek successfully",
Expand Down
1 change: 1 addition & 0 deletions internal/querynode/flow_graph_query_node.go
Expand Up @@ -262,6 +262,7 @@ func (q *queryNodeFlowGraph) close() {
q.flowGraph.Close()
if q.dmlStream != nil && q.consumerCnt > 0 {
metrics.QueryNodeNumConsumers.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Sub(float64(q.consumerCnt))
q.consumerCnt = 0
}
log.Info("stop query node flow graph",
zap.Int64("collectionID", q.collectionID),
Expand Down
18 changes: 18 additions & 0 deletions internal/querynode/task_test.go
Expand Up @@ -79,6 +79,7 @@ func TestTask_watchDmChannelsTask(t *testing.T) {
t.Run("test execute loadCollection", func(t *testing.T) {
node, err := genSimpleQueryNode(ctx)
assert.NoError(t, err)
defer node.metaReplica.freeAll()

task := watchDmChannelsTask{
req: genWatchDMChannelsRequest(),
Expand All @@ -98,6 +99,7 @@ func TestTask_watchDmChannelsTask(t *testing.T) {
t.Run("test empty metric type", func(t *testing.T) {
node, err := genSimpleQueryNode(ctx)
assert.NoError(t, err)
defer node.metaReplica.freeAll()

task := watchDmChannelsTask{
req: genWatchDMChannelsRequest(),
Expand All @@ -118,6 +120,7 @@ func TestTask_watchDmChannelsTask(t *testing.T) {
t.Run("test execute repeated watchDmChannelTask", func(t *testing.T) {
node, err := genSimpleQueryNode(ctx)
assert.NoError(t, err)
defer node.metaReplica.freeAll()

task := watchDmChannelsTask{
req: genWatchDMChannelsRequest(),
Expand All @@ -142,6 +145,7 @@ func TestTask_watchDmChannelsTask(t *testing.T) {
t.Run("test execute loadPartition", func(t *testing.T) {
node, err := genSimpleQueryNode(ctx)
assert.NoError(t, err)
defer node.metaReplica.freeAll()

task := watchDmChannelsTask{
req: genWatchDMChannelsRequest(),
Expand All @@ -166,6 +170,7 @@ func TestTask_watchDmChannelsTask(t *testing.T) {
t.Run("test execute loadPartition without init collection and partition", func(t *testing.T) {
node, err := genSimpleQueryNode(ctx)
assert.NoError(t, err)
defer node.metaReplica.freeAll()

task := watchDmChannelsTask{
req: genWatchDMChannelsRequest(),
Expand All @@ -186,6 +191,7 @@ func TestTask_watchDmChannelsTask(t *testing.T) {
t.Run("test execute seek error", func(t *testing.T) {
node, err := genSimpleQueryNode(ctx)
assert.NoError(t, err)
defer node.metaReplica.freeAll()

task := watchDmChannelsTask{
req: genWatchDMChannelsRequest(),
Expand Down Expand Up @@ -242,6 +248,7 @@ func TestTask_watchDmChannelsTask(t *testing.T) {

node, err := genSimpleQueryNode(ctx)
assert.NoError(t, err)
defer node.metaReplica.freeAll()

task := watchDmChannelsTask{
req: genWatchDMChannelsRequest(),
Expand All @@ -267,6 +274,7 @@ func TestTask_watchDmChannelsTask(t *testing.T) {
t.Run("test add excluded segment for dropped segment", func(t *testing.T) {
node, err := genSimpleQueryNode(ctx)
assert.NoError(t, err)
defer node.metaReplica.freeAll()

task := watchDmChannelsTask{
req: genWatchDMChannelsRequest(),
Expand All @@ -292,6 +300,7 @@ func TestTask_watchDmChannelsTask(t *testing.T) {
t.Run("test load growing segment", func(t *testing.T) {
node, err := genSimpleQueryNode(ctx)
assert.NoError(t, err)
defer node.metaReplica.freeAll()

task := watchDmChannelsTask{
req: genWatchDMChannelsRequest(),
Expand Down Expand Up @@ -350,6 +359,7 @@ func TestTask_releaseCollectionTask(t *testing.T) {
t.Run("test execute", func(t *testing.T) {
node, err := genSimpleQueryNode(ctx)
assert.NoError(t, err)
defer node.metaReplica.freeAll()

/*
err = node.queryService.addQueryCollection(defaultCollectionID)
Expand All @@ -366,6 +376,7 @@ func TestTask_releaseCollectionTask(t *testing.T) {
t.Run("test execute no collection", func(t *testing.T) {
node, err := genSimpleQueryNode(ctx)
assert.NoError(t, err)
defer node.metaReplica.freeAll()

err = node.metaReplica.removeCollection(defaultCollectionID)
assert.NoError(t, err)
Expand All @@ -381,6 +392,7 @@ func TestTask_releaseCollectionTask(t *testing.T) {
t.Run("test execute remove deltaVChannel tSafe", func(t *testing.T) {
node, err := genSimpleQueryNode(ctx)
assert.NoError(t, err)
defer node.metaReplica.freeAll()

/*
err = node.queryService.addQueryCollection(defaultCollectionID)
Expand Down Expand Up @@ -437,6 +449,7 @@ func TestTask_releasePartitionTask(t *testing.T) {

t.Run("test isAllPartitionsReleased", func(t *testing.T) {
node, err := genSimpleQueryNode(ctx)
defer node.metaReplica.freeAll()
assert.NoError(t, err)

task := releasePartitionsTask{
Expand All @@ -460,6 +473,7 @@ func TestTask_releasePartitionTask(t *testing.T) {
t.Run("test execute", func(t *testing.T) {
node, err := genSimpleQueryNode(ctx)
assert.NoError(t, err)
defer node.metaReplica.freeAll()

/*
err = node.queryService.addQueryCollection(defaultCollectionID)
Expand All @@ -478,6 +492,7 @@ func TestTask_releasePartitionTask(t *testing.T) {
t.Run("test execute no collection", func(t *testing.T) {
node, err := genSimpleQueryNode(ctx)
assert.NoError(t, err)
defer node.metaReplica.freeAll()

task := releasePartitionsTask{
req: genReleasePartitionsRequest(),
Expand All @@ -493,6 +508,7 @@ func TestTask_releasePartitionTask(t *testing.T) {
t.Run("test execute no partition", func(t *testing.T) {
node, err := genSimpleQueryNode(ctx)
assert.NoError(t, err)
defer node.metaReplica.freeAll()

task := releasePartitionsTask{
req: genReleasePartitionsRequest(),
Expand All @@ -508,6 +524,7 @@ func TestTask_releasePartitionTask(t *testing.T) {
t.Run("test execute non-exist partition", func(t *testing.T) {
node, err := genSimpleQueryNode(ctx)
assert.NoError(t, err)
defer node.metaReplica.freeAll()

req := genReleasePartitionsRequest()
req.PartitionIDs = []int64{-1}
Expand All @@ -523,6 +540,7 @@ func TestTask_releasePartitionTask(t *testing.T) {
t.Run("test execute remove deltaVChannel", func(t *testing.T) {
node, err := genSimpleQueryNode(ctx)
assert.NoError(t, err)
defer node.metaReplica.freeAll()

col, err := node.metaReplica.getCollectionByID(defaultCollectionID)
assert.NoError(t, err)
Expand Down

0 comments on commit 9499fb7

Please sign in to comment.