Skip to content

Commit

Permalink
Bugfix: ExecutableTaskTracker LowWatermark function (#4343)
Browse files Browse the repository at this point in the history
* LowWatermark should keep track of the next element during element deletion
* Use [inclusive, exclusive) range representation
  • Loading branch information
wxing1292 committed May 15, 2023
1 parent 9f9b828 commit d201491
Show file tree
Hide file tree
Showing 11 changed files with 423 additions and 279 deletions.
324 changes: 163 additions & 161 deletions api/replication/v1/message.pb.go

Large diffs are not rendered by default.

Expand Up @@ -70,8 +70,8 @@ message SyncShardStatus {
}

message SyncReplicationState {
int64 last_processed_message_id = 1;
google.protobuf.Timestamp last_processed_message_time = 2 [(gogoproto.stdtime) = true];
int64 inclusive_low_watermark = 1;
google.protobuf.Timestamp inclusive_low_watermark_time = 2 [(gogoproto.stdtime) = true];
}

message ReplicationMessages {
Expand All @@ -86,8 +86,8 @@ message ReplicationMessages {
message WorkflowReplicationMessages {
repeated ReplicationTask replication_tasks = 1;
// This can be different than the last taskId in the above list, because sender can decide to skip tasks (e.g. for completed workflows).
int64 last_task_id = 2;
google.protobuf.Timestamp last_task_time = 3 [(gogoproto.stdtime) = true];
int64 exclusive_high_watermark = 2;
google.protobuf.Timestamp exclusive_high_watermark_time = 3 [(gogoproto.stdtime) = true];
}

message ReplicationTaskInfo {
Expand Down
46 changes: 29 additions & 17 deletions service/history/api/replication/stream.go
Expand Up @@ -44,7 +44,6 @@ import (
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/service/history/replication"
"go.temporal.io/server/service/history/shard"
Expand Down Expand Up @@ -158,11 +157,8 @@ func recvSyncReplicationState(
attr *replicationspb.SyncReplicationState,
clientClusterShardID historyclient.ClusterShardID,
) error {
lastProcessedMessageID := attr.GetLastProcessedMessageId()
lastProcessedMessageIDTime := attr.GetLastProcessedMessageTime()
if lastProcessedMessageID == persistence.EmptyQueueMessageID {
return nil
}
inclusiveLowWatermark := attr.GetInclusiveLowWatermark()
inclusiveLowWatermarkTime := attr.GetInclusiveLowWatermarkTime()

readerID := shard.ReplicationReaderIDFromClusterShardID(
int64(clientClusterShardID.ClusterID),
Expand All @@ -172,7 +168,7 @@ func recvSyncReplicationState(
Scopes: []*persistencespb.QueueSliceScope{{
Range: &persistencespb.QueueSliceRange{
InclusiveMin: shard.ConvertToPersistenceTaskKey(
tasks.NewImmediateKey(lastProcessedMessageID + 1),
tasks.NewImmediateKey(inclusiveLowWatermark),
),
ExclusiveMax: shard.ConvertToPersistenceTaskKey(
tasks.NewImmediateKey(math.MaxInt64),
Expand All @@ -192,8 +188,8 @@ func recvSyncReplicationState(
}
shardContext.UpdateRemoteClusterInfo(
string(clientClusterShardID.ClusterID),
lastProcessedMessageID,
*lastProcessedMessageIDTime,
inclusiveLowWatermark-1,
*inclusiveLowWatermarkTime,
)
return nil
}
Expand Down Expand Up @@ -335,8 +331,24 @@ func sendTasks(
beginInclusiveWatermark int64,
endExclusiveWatermark int64,
) error {
if beginInclusiveWatermark >= endExclusiveWatermark {
return nil
if beginInclusiveWatermark > endExclusiveWatermark {
err := serviceerror.NewInternal(fmt.Sprintf("StreamWorkflowReplication encountered invalid task range [%v, %v)",
beginInclusiveWatermark,
endExclusiveWatermark,
))
shardContext.GetLogger().Error("StreamWorkflowReplication unable to", tag.Error(err))
return err
}
if beginInclusiveWatermark == endExclusiveWatermark {
return server.Send(&historyservice.StreamWorkflowReplicationMessagesResponse{
Attributes: &historyservice.StreamWorkflowReplicationMessagesResponse_Messages{
Messages: &replicationspb.WorkflowReplicationMessages{
ReplicationTasks: nil,
ExclusiveHighWatermark: endExclusiveWatermark,
ExclusiveHighWatermarkTime: timestamp.TimeNowPtrUtc(),
},
},
})
}

engine, err := shardContext.GetEngine(ctx)
Expand Down Expand Up @@ -372,9 +384,9 @@ Loop:
if err := server.Send(&historyservice.StreamWorkflowReplicationMessagesResponse{
Attributes: &historyservice.StreamWorkflowReplicationMessagesResponse_Messages{
Messages: &replicationspb.WorkflowReplicationMessages{
ReplicationTasks: []*replicationspb.ReplicationTask{task},
LastTaskId: task.SourceTaskId,
LastTaskTime: task.VisibilityTime,
ReplicationTasks: []*replicationspb.ReplicationTask{task},
ExclusiveHighWatermark: task.SourceTaskId + 1,
ExclusiveHighWatermarkTime: task.VisibilityTime,
},
},
}); err != nil {
Expand All @@ -390,9 +402,9 @@ Loop:
return server.Send(&historyservice.StreamWorkflowReplicationMessagesResponse{
Attributes: &historyservice.StreamWorkflowReplicationMessagesResponse_Messages{
Messages: &replicationspb.WorkflowReplicationMessages{
ReplicationTasks: nil,
LastTaskId: endExclusiveWatermark - 1,
LastTaskTime: timestamp.TimeNowPtrUtc(),
ReplicationTasks: nil,
ExclusiveHighWatermark: endExclusiveWatermark,
ExclusiveHighWatermarkTime: timestamp.TimeNowPtrUtc(),
},
},
})
Expand Down
54 changes: 30 additions & 24 deletions service/history/api/replication/stream_test.go
Expand Up @@ -115,8 +115,8 @@ func (s *streamSuite) TestRecvSyncReplicationState_Success() {
s.clientClusterShardID.ShardID,
)
replicationState := &replicationspb.SyncReplicationState{
LastProcessedMessageId: rand.Int63(),
LastProcessedMessageTime: timestamp.TimePtr(time.Unix(0, rand.Int63())),
InclusiveLowWatermark: rand.Int63(),
InclusiveLowWatermarkTime: timestamp.TimePtr(time.Unix(0, rand.Int63())),
}

s.shardContext.EXPECT().UpdateReplicationQueueReaderState(
Expand All @@ -125,7 +125,7 @@ func (s *streamSuite) TestRecvSyncReplicationState_Success() {
Scopes: []*persistencespb.QueueSliceScope{{
Range: &persistencespb.QueueSliceRange{
InclusiveMin: shard.ConvertToPersistenceTaskKey(
tasks.NewImmediateKey(replicationState.LastProcessedMessageId + 1),
tasks.NewImmediateKey(replicationState.InclusiveLowWatermark),
),
ExclusiveMax: shard.ConvertToPersistenceTaskKey(
tasks.NewImmediateKey(math.MaxInt64),
Expand All @@ -140,8 +140,8 @@ func (s *streamSuite) TestRecvSyncReplicationState_Success() {
).Return(nil)
s.shardContext.EXPECT().UpdateRemoteClusterInfo(
string(s.clientClusterShardID.ClusterID),
replicationState.LastProcessedMessageId,
*replicationState.LastProcessedMessageTime,
replicationState.InclusiveLowWatermark-1,
*replicationState.InclusiveLowWatermarkTime,
)

err := recvSyncReplicationState(s.shardContext, replicationState, s.clientClusterShardID)
Expand All @@ -154,8 +154,8 @@ func (s *streamSuite) TestRecvSyncReplicationState_Error() {
s.clientClusterShardID.ShardID,
)
replicationState := &replicationspb.SyncReplicationState{
LastProcessedMessageId: rand.Int63(),
LastProcessedMessageTime: timestamp.TimePtr(time.Unix(0, rand.Int63())),
InclusiveLowWatermark: rand.Int63(),
InclusiveLowWatermarkTime: timestamp.TimePtr(time.Unix(0, rand.Int63())),
}

var ownershipLost error
Expand All @@ -171,7 +171,7 @@ func (s *streamSuite) TestRecvSyncReplicationState_Error() {
Scopes: []*persistencespb.QueueSliceScope{{
Range: &persistencespb.QueueSliceRange{
InclusiveMin: shard.ConvertToPersistenceTaskKey(
tasks.NewImmediateKey(replicationState.LastProcessedMessageId + 1),
tasks.NewImmediateKey(replicationState.InclusiveLowWatermark),
),
ExclusiveMax: shard.ConvertToPersistenceTaskKey(
tasks.NewImmediateKey(math.MaxInt64),
Expand Down Expand Up @@ -236,8 +236,8 @@ func (s *streamSuite) TestSendCatchUp() {
endExclusiveWatermark,
).Return(iter, nil)
s.server.EXPECT().Send(gomock.Any()).DoAndReturn(func(resp *historyservice.StreamWorkflowReplicationMessagesResponse) error {
s.Equal(endExclusiveWatermark-1, resp.GetMessages().LastTaskId)
s.NotNil(resp.GetMessages().LastTaskTime)
s.Equal(endExclusiveWatermark, resp.GetMessages().ExclusiveHighWatermark)
s.NotNil(resp.GetMessages().ExclusiveHighWatermarkTime)
return nil
})

Expand Down Expand Up @@ -288,13 +288,13 @@ func (s *streamSuite) TestSendLive() {
)
gomock.InOrder(
s.server.EXPECT().Send(gomock.Any()).DoAndReturn(func(resp *historyservice.StreamWorkflowReplicationMessagesResponse) error {
s.Equal(watermark1-1, resp.GetMessages().LastTaskId)
s.NotNil(resp.GetMessages().LastTaskTime)
s.Equal(watermark1, resp.GetMessages().ExclusiveHighWatermark)
s.NotNil(resp.GetMessages().ExclusiveHighWatermarkTime)
return nil
}),
s.server.EXPECT().Send(gomock.Any()).DoAndReturn(func(resp *historyservice.StreamWorkflowReplicationMessagesResponse) error {
s.Equal(watermark2-1, resp.GetMessages().LastTaskId)
s.NotNil(resp.GetMessages().LastTaskTime)
s.Equal(watermark2, resp.GetMessages().ExclusiveHighWatermark)
s.NotNil(resp.GetMessages().ExclusiveHighWatermarkTime)
return nil
}),
)
Expand All @@ -320,6 +320,12 @@ func (s *streamSuite) TestSendTasks_Noop() {
beginInclusiveWatermark := rand.Int63()
endExclusiveWatermark := beginInclusiveWatermark

s.server.EXPECT().Send(gomock.Any()).DoAndReturn(func(resp *historyservice.StreamWorkflowReplicationMessagesResponse) error {
s.Equal(endExclusiveWatermark, resp.GetMessages().ExclusiveHighWatermark)
s.NotNil(resp.GetMessages().ExclusiveHighWatermarkTime)
return nil
})

err := sendTasks(
s.ctx,
s.server,
Expand Down Expand Up @@ -349,8 +355,8 @@ func (s *streamSuite) TestSendTasks_WithoutTasks() {
endExclusiveWatermark,
).Return(iter, nil)
s.server.EXPECT().Send(gomock.Any()).DoAndReturn(func(resp *historyservice.StreamWorkflowReplicationMessagesResponse) error {
s.Equal(endExclusiveWatermark-1, resp.GetMessages().LastTaskId)
s.NotNil(resp.GetMessages().LastTaskTime)
s.Equal(endExclusiveWatermark, resp.GetMessages().ExclusiveHighWatermark)
s.NotNil(resp.GetMessages().ExclusiveHighWatermarkTime)
return nil
})

Expand Down Expand Up @@ -400,24 +406,24 @@ func (s *streamSuite) TestSendTasks_WithTasks() {
s.server.EXPECT().Send(&historyservice.StreamWorkflowReplicationMessagesResponse{
Attributes: &historyservice.StreamWorkflowReplicationMessagesResponse_Messages{
Messages: &replicationspb.WorkflowReplicationMessages{
ReplicationTasks: []*replicationspb.ReplicationTask{task0},
LastTaskId: task0.SourceTaskId,
LastTaskTime: task0.VisibilityTime,
ReplicationTasks: []*replicationspb.ReplicationTask{task0},
ExclusiveHighWatermark: task0.SourceTaskId + 1,
ExclusiveHighWatermarkTime: task0.VisibilityTime,
},
},
}).Return(nil),
s.server.EXPECT().Send(&historyservice.StreamWorkflowReplicationMessagesResponse{
Attributes: &historyservice.StreamWorkflowReplicationMessagesResponse_Messages{
Messages: &replicationspb.WorkflowReplicationMessages{
ReplicationTasks: []*replicationspb.ReplicationTask{task2},
LastTaskId: task2.SourceTaskId,
LastTaskTime: task2.VisibilityTime,
ReplicationTasks: []*replicationspb.ReplicationTask{task2},
ExclusiveHighWatermark: task2.SourceTaskId + 1,
ExclusiveHighWatermarkTime: task2.VisibilityTime,
},
},
}).Return(nil),
s.server.EXPECT().Send(gomock.Any()).DoAndReturn(func(resp *historyservice.StreamWorkflowReplicationMessagesResponse) error {
s.Equal(endExclusiveWatermark-1, resp.GetMessages().LastTaskId)
s.NotNil(resp.GetMessages().LastTaskTime)
s.Equal(endExclusiveWatermark, resp.GetMessages().ExclusiveHighWatermark)
s.NotNil(resp.GetMessages().ExclusiveHighWatermarkTime)
return nil
}),
)
Expand Down
53 changes: 30 additions & 23 deletions service/history/replication/executable_task_tracker.go
Expand Up @@ -49,7 +49,7 @@ type (
Timestamp time.Time
}
ExecutableTaskTracker interface {
TrackTasks(highWatermarkInfo WatermarkInfo, tasks ...TrackableExecutableTask) []TrackableExecutableTask
TrackTasks(exclusiveHighWatermarkInfo WatermarkInfo, tasks ...TrackableExecutableTask) []TrackableExecutableTask
LowWatermark() *WatermarkInfo
Size() int
Cancel()
Expand All @@ -58,10 +58,10 @@ type (
logger log.Logger

sync.Mutex
cancelled bool
highWatermarkInfo *WatermarkInfo // this is exclusive, i.e. source need to resend with this watermark / task ID
taskQueue *list.List // sorted by task ID
taskIDs map[int64]struct{}
cancelled bool
exclusiveHighWatermarkInfo *WatermarkInfo // this is exclusive, i.e. source need to resend with this watermark / task ID
taskQueue *list.List // sorted by task ID
taskIDs map[int64]struct{}
}
)

Expand All @@ -73,16 +73,17 @@ func NewExecutableTaskTracker(
return &ExecutableTaskTrackerImpl{
logger: logger,

highWatermarkInfo: nil,
taskQueue: list.New(),
taskIDs: make(map[int64]struct{}),
exclusiveHighWatermarkInfo: nil,
taskQueue: list.New(),
taskIDs: make(map[int64]struct{}),
}
}

// TrackTasks adds tasks for tracking and returns the valid (deduplicated) tasks.
// If the task tracker is cancelled, then newly added tasks will also be cancelled.
// Tasks should be sorted by task ID, and all task IDs must be < exclusiveHighWatermarkInfo.Watermark.
func (t *ExecutableTaskTrackerImpl) TrackTasks(
highWatermarkInfo WatermarkInfo,
exclusiveHighWatermarkInfo WatermarkInfo,
tasks ...TrackableExecutableTask,
) []TrackableExecutableTask {
filteredTasks := make([]TrackableExecutableTask, 0, len(tasks))
Expand All @@ -91,7 +92,7 @@ func (t *ExecutableTaskTrackerImpl) TrackTasks(
defer t.Unlock()

// this assumes the source side sends replication tasks in order
if t.highWatermarkInfo != nil && highWatermarkInfo.Watermark <= t.highWatermarkInfo.Watermark {
if t.exclusiveHighWatermarkInfo != nil && exclusiveHighWatermarkInfo.Watermark <= t.exclusiveHighWatermarkInfo.Watermark {
return filteredTasks
}

Expand All @@ -111,14 +112,14 @@ Loop:
lastTaskID = task.TaskID()
}

if highWatermarkInfo.Watermark < lastTaskID {
if exclusiveHighWatermarkInfo.Watermark <= lastTaskID {
panic(fmt.Sprintf(
"ExecutableTaskTracker encountered lower high watermark: %v < %v",
highWatermarkInfo.Watermark,
exclusiveHighWatermarkInfo.Watermark,
lastTaskID,
))
}
t.highWatermarkInfo = &highWatermarkInfo
t.exclusiveHighWatermarkInfo = &exclusiveHighWatermarkInfo

if t.cancelled {
t.cancelLocked()
Expand All @@ -130,30 +131,36 @@ func (t *ExecutableTaskTrackerImpl) LowWatermark() *WatermarkInfo {
t.Lock()
defer t.Unlock()

element := t.taskQueue.Front()
Loop:
for element := t.taskQueue.Front(); element != nil; element = element.Next() {
for element != nil {
task := element.Value.(TrackableExecutableTask)
taskState := task.State()
switch taskState {
case ctasks.TaskStateAcked:
nextElement := element.Next()
delete(t.taskIDs, task.TaskID())
t.taskQueue.Remove(element)
element = nextElement
case ctasks.TaskStateNacked:
if err := task.MarkPoisonPill(); err != nil {
// unable to save poison pill, retry later
break Loop
element = element.Next()
continue Loop
}
nextElement := element.Next()
delete(t.taskIDs, task.TaskID())
t.taskQueue.Remove(element)
element = nextElement
case ctasks.TaskStateAborted:
// noop, do not remove from queue, let it block low watermark
break Loop
element = element.Next()
case ctasks.TaskStateCancelled:
// noop, do not remove from queue, let it block low watermark
break Loop
element = element.Next()
case ctasks.TaskStatePending:
// noop, do not remove from queue, let it block low watermark
break Loop
element = element.Next()
default:
panic(fmt.Sprintf(
"ExecutableTaskTracker encountered unknown task state: %v",
Expand All @@ -163,14 +170,14 @@ Loop:
}

if element := t.taskQueue.Front(); element != nil {
lowWatermarkInfo := WatermarkInfo{
inclusiveLowWatermarkInfo := WatermarkInfo{
Watermark: element.Value.(TrackableExecutableTask).TaskID(),
Timestamp: element.Value.(TrackableExecutableTask).TaskCreationTime(),
}
return &lowWatermarkInfo
} else if t.highWatermarkInfo != nil {
lowWatermarkInfo := *t.highWatermarkInfo
return &lowWatermarkInfo
return &inclusiveLowWatermarkInfo
} else if t.exclusiveHighWatermarkInfo != nil {
inclusiveLowWatermarkInfo := *t.exclusiveHighWatermarkInfo
return &inclusiveLowWatermarkInfo
} else {
return nil
}
Expand Down

0 comments on commit d201491

Please sign in to comment.