Skip to content

Commit

Permalink
Update replication use branch token (#3447)
Browse files Browse the repository at this point in the history
* Update replication use branch token
  • Loading branch information
yux0 committed Oct 6, 2022
1 parent 0ae4cc5 commit 0df719d
Show file tree
Hide file tree
Showing 9 changed files with 362 additions and 253 deletions.
465 changes: 262 additions & 203 deletions api/persistence/v1/executions.pb.go

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions common/persistence/serialization/task_serializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1012,6 +1012,7 @@ func (s *TaskSerializer) replicationHistoryTaskToProto(
NextEventId: historyTask.NextEventID,
BranchToken: historyTask.BranchToken,
NewRunBranchToken: historyTask.NewRunBranchToken,
NewRunId: historyTask.NewRunID,
VisibilityTime: &historyTask.VisibilityTimestamp,
}
}
Expand All @@ -1036,6 +1037,7 @@ func (s *TaskSerializer) replicationHistoryTaskFromProto(
Version: historyTask.Version,
BranchToken: historyTask.BranchToken,
NewRunBranchToken: historyTask.NewRunBranchToken,
NewRunID: historyTask.NewRunId,
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ message ReplicationTaskInfo {
reserved 14;
int64 task_id = 15;
google.protobuf.Timestamp visibility_time = 16 [(gogoproto.stdtime) = true];
string new_run_id = 17;
}

// visibility_task_data column
Expand Down
112 changes: 85 additions & 27 deletions service/history/replication/ack_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ type (
currentClusterName string
shard shard.Context
config *configs.Config
historyCache workflow.Cache
workflowCache workflow.Cache
executionMgr persistence.ExecutionManager
metricsClient metrics.Client
logger log.Logger
Expand All @@ -88,7 +88,7 @@ var (

func NewAckManager(
shard shard.Context,
historyCache workflow.Cache,
workflowCache workflow.Cache,
executionMgr persistence.ExecutionManager,
logger log.Logger,
) AckManager {
Expand All @@ -104,7 +104,7 @@ func NewAckManager(
currentClusterName: currentClusterName,
shard: shard,
config: shard.GetConfig(),
historyCache: historyCache,
workflowCache: workflowCache,
executionMgr: executionMgr,
metricsClient: shard.GetMetricsClient(),
logger: log.With(logger, tag.ComponentReplicatorQueue),
Expand Down Expand Up @@ -423,7 +423,7 @@ func (p *ackMgrImpl) generateHistoryReplicationTask(
workflowID := taskInfo.WorkflowID
runID := taskInfo.RunID
taskID := taskInfo.TaskID
return p.processReplication(
replicationTask, err := p.processReplication(
ctx,
true, // still necessary to send out history replication message if workflow closed
namespaceID,
Expand All @@ -439,35 +439,16 @@ func (p *ackMgrImpl) generateHistoryReplicationTask(
return nil, err
}

// BranchToken will not set in get dlq replication message request
if len(taskInfo.BranchToken) == 0 {
taskInfo.BranchToken = branchToken
}

eventsBlob, err := p.getEventsBlob(
ctx,
taskInfo.BranchToken,
branchToken,
taskInfo.FirstEventID,
taskInfo.NextEventID,
)
if err != nil {
return nil, err
}

var newRunEventsBlob *commonpb.DataBlob
if len(taskInfo.NewRunBranchToken) != 0 {
// only get the first batch
newRunEventsBlob, err = p.getEventsBlob(
ctx,
taskInfo.NewRunBranchToken,
common.FirstEventID,
common.FirstEventID+1,
)
if err != nil {
return nil, err
}
}

replicationTask := &replicationspb.ReplicationTask{
TaskType: enumsspb.REPLICATION_TASK_TYPE_HISTORY_V2_TASK,
SourceTaskId: taskID,
Expand All @@ -478,14 +459,26 @@ func (p *ackMgrImpl) generateHistoryReplicationTask(
RunId: runID,
VersionHistoryItems: versionHistoryItems,
Events: eventsBlob,
NewRunEvents: newRunEventsBlob,
// NewRunEvents will be set in processNewRunReplication
},
},
VisibilityTime: &taskInfo.VisibilityTimestamp,
}
return replicationTask, nil
},
)
if err != nil {
return replicationTask, err
}
return p.processNewRunReplication(
ctx,
namespaceID,
workflowID,
taskInfo.NewRunID,
taskInfo.NewRunBranchToken,
taskInfo.Version,
replicationTask,
)
}

func (p *ackMgrImpl) generateSyncWorkflowStateTask(
Expand Down Expand Up @@ -570,11 +563,11 @@ func (p *ackMgrImpl) processReplication(
RunId: runID,
}

context, release, err := p.historyCache.GetOrCreateWorkflowExecution(
context, release, err := p.workflowCache.GetOrCreateWorkflowExecution(
ctx,
namespaceID,
execution,
workflow.CallerTypeAPI,
workflow.CallerTypeTask,
)
if err != nil {
return nil, err
Expand All @@ -596,6 +589,71 @@ func (p *ackMgrImpl) processReplication(
}
}

func (p *ackMgrImpl) processNewRunReplication(
ctx context.Context,
namespaceID namespace.ID,
workflowID string,
newRunID string,
branchToken []byte,
taskVersion int64,
task *replicationspb.ReplicationTask,
) (retReplicationTask *replicationspb.ReplicationTask, retError error) {

attr, ok := task.Attributes.(*replicationspb.ReplicationTask_HistoryTaskAttributes)
if !ok {
return nil, serviceerror.NewInternal("Wrong replication task to process new run replication.")
}

var newRunBranchToken []byte
if len(newRunID) > 0 {
newRunContext, releaseFn, err := p.workflowCache.GetOrCreateWorkflowExecution(
ctx,
namespaceID,
commonpb.WorkflowExecution{
WorkflowId: workflowID,
RunId: newRunID,
},
workflow.CallerTypeTask,
)
if err != nil {
return nil, err
}
defer func() { releaseFn(retError) }()

newRunMutableState, err := newRunContext.LoadMutableState(ctx)
if err != nil {
return nil, err
}
_, newRunBranchToken, err = getVersionHistoryItems(
newRunMutableState,
common.FirstEventID,
taskVersion,
)
if err != nil {
return nil, err
}
} else if len(branchToken) != 0 {
newRunBranchToken = branchToken
}

var newRunEventsBlob *commonpb.DataBlob
if len(newRunBranchToken) > 0 {
// only get the first batch
var err error
newRunEventsBlob, err = p.getEventsBlob(
ctx,
newRunBranchToken,
common.FirstEventID,
common.FirstEventID+1,
)
if err != nil {
return nil, err
}
}
attr.HistoryTaskAttributes.NewRunEvents = newRunEventsBlob
return task, nil
}

func getVersionHistoryItems(
mutableState workflow.MutableState,
eventID int64,
Expand Down
8 changes: 4 additions & 4 deletions service/history/replication/ack_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func (s *ackManagerSuite) TestSyncActivity_WorkflowCompleted() {
ScheduledEventID: scheduledEventID,
}

context, release, _ := s.replicationAckManager.historyCache.GetOrCreateWorkflowExecution(
context, release, _ := s.replicationAckManager.workflowCache.GetOrCreateWorkflowExecution(
ctx,
namespaceID,
commonpb.WorkflowExecution{
Expand Down Expand Up @@ -323,7 +323,7 @@ func (s *ackManagerSuite) TestSyncActivity_ActivityCompleted() {
ScheduledEventID: scheduledEventID,
}

context, release, _ := s.replicationAckManager.historyCache.GetOrCreateWorkflowExecution(
context, release, _ := s.replicationAckManager.workflowCache.GetOrCreateWorkflowExecution(
ctx,
namespaceID,
commonpb.WorkflowExecution{
Expand Down Expand Up @@ -378,7 +378,7 @@ func (s *ackManagerSuite) TestSyncActivity_ActivityRetry() {
ScheduledEventID: scheduledEventID,
}

context, release, _ := s.replicationAckManager.historyCache.GetOrCreateWorkflowExecution(
context, release, _ := s.replicationAckManager.workflowCache.GetOrCreateWorkflowExecution(
ctx,
namespaceID,
commonpb.WorkflowExecution{
Expand Down Expand Up @@ -491,7 +491,7 @@ func (s *ackManagerSuite) TestSyncActivity_ActivityRunning() {
ScheduledEventID: scheduledEventID,
}

context, release, _ := s.replicationAckManager.historyCache.GetOrCreateWorkflowExecution(
context, release, _ := s.replicationAckManager.workflowCache.GetOrCreateWorkflowExecution(
ctx,
namespaceID,
commonpb.WorkflowExecution{
Expand Down
7 changes: 5 additions & 2 deletions service/history/tasks/history_replication_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,11 @@ type (
FirstEventID int64
NextEventID int64
Version int64
BranchToken []byte
NewRunBranchToken []byte
// deprecated
BranchToken []byte
// deprecated
NewRunBranchToken []byte
NewRunID string
}
)

Expand Down
14 changes: 1 addition & 13 deletions service/history/timerQueueTaskExecutorBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
package history

import (
"bytes"
"context"

commonpb "go.temporal.io/api/common/v1"
Expand All @@ -35,7 +34,6 @@ import (
"go.temporal.io/server/api/matchingservice/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence"
Expand Down Expand Up @@ -133,17 +131,7 @@ func (t *timerQueueTaskExecutorBase) executeDeleteHistoryEventTask(
return err
}
if ok := VerifyTaskVersion(t.shard, t.logger, mutableState.GetNamespaceEntry(), lastWriteVersion, task.Version, task); !ok {
currentBranchToken, err := mutableState.GetCurrentBranchToken()
if err != nil {
return err
}
// the mutable state has a newer version and the branch token is updated
// use task branch token to delete the original branch
if !bytes.Equal(task.BranchToken, currentBranchToken) {
return t.deleteHistoryBranch(ctx, task.BranchToken)
}
t.logger.Error("Different mutable state versions have the same branch token", tag.TaskVersion(task.Version), tag.LastEventVersion(lastWriteVersion))
return serviceerror.NewInternal("Mutable state has different version but same branch token")
return nil
}

return t.deleteManager.DeleteWorkflowExecutionByRetention(
Expand Down
2 changes: 2 additions & 0 deletions service/history/workflow/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -725,11 +725,13 @@ func (c *ContextImpl) mergeContinueAsNewReplicationTasks(
delete(newWorkflowSnapshot.Tasks, tasks.CategoryReplication)

newRunBranchToken := newRunTask.BranchToken
newRunID := newRunTask.RunID
taskUpdated := false
for _, replicationTask := range currentWorkflowMutation.Tasks[tasks.CategoryReplication] {
if task, ok := replicationTask.(*tasks.HistoryReplicationTask); ok {
taskUpdated = true
task.NewRunBranchToken = newRunBranchToken
task.NewRunID = newRunID
}
}
if !taskUpdated {
Expand Down
4 changes: 0 additions & 4 deletions service/history/workflow/task_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,8 +556,6 @@ func (r *TaskGeneratorImpl) GenerateHistoryReplicationTasks(
FirstEventID: firstEvent.GetEventId(),
NextEventID: lastEvent.GetEventId() + 1,
Version: version,
BranchToken: branchToken,
NewRunBranchToken: nil,
})
return nil
}
Expand Down Expand Up @@ -589,8 +587,6 @@ func (r *TaskGeneratorImpl) GenerateMigrationTasks(
FirstEventID: executionInfo.LastFirstEventId,
NextEventID: lastItem.GetEventId() + 1,
Version: lastItem.GetVersion(),
BranchToken: versionHistory.BranchToken,
NewRunBranchToken: nil,
}, nil
}
}
Expand Down

0 comments on commit 0df719d

Please sign in to comment.