Misc changes to migration workflow #2838
Conversation
- Separates workflows and activities out into separate files for cleanliness.
- Fixes WorkflowTaskTimeout if the ConcurrentActivityTask count was too high.
- Replaces the PerActivityRPS field of ForceReplicationInput with an OverallRPS.
- Adds basic unit tests for the Namespace Handover Workflow.
- Misc code cleanup.
- Adds a QueryHandler to the force-replication workflow so a caller can better gauge the status of the force-replication part (sketched below).
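As a rough illustration of that last point, registering a query handler in a Temporal Go workflow looks roughly like this. A minimal sketch, assuming the Temporal Go SDK; the query name, status struct, and package are assumptions, not the PR's actual code:

package migration

import (
	"go.temporal.io/sdk/workflow"
)

// ReplicationStatus is a hypothetical snapshot a caller could poll.
type ReplicationStatus struct {
	WorkflowsEnqueued int
	Done              bool
}

func ForceReplicationWorkflow(ctx workflow.Context) error {
	var status ReplicationStatus
	// Register the query up front so callers can gauge progress at any time.
	if err := workflow.SetQueryHandler(ctx, "force-replication-status", func() (ReplicationStatus, error) {
		return status, nil
	}); err != nil {
		return err
	}
	// ... main loop would update status as executions are listed and enqueued ...
	status.Done = true
	return nil
}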
@@ -470,6 +134,9 @@ func (a *activities) checkReplicationOnce(ctx context.Context, waitRequest waitR
 		tag.NewDurationTag("AllowedLagging", waitRequest.AllowedLagging),
 		tag.NewDurationTag("ActualLagging", shard.ShardLocalTime.Sub(*clusterInfo.AckedTaskVisibilityTime)),
 		tag.NewStringTag("RemoteCluster", waitRequest.RemoteCluster),
+		tag.NewInt64("MaxReplicationTaskId", shard.MaxReplicationTaskId),
+		tag.NewTimeTag("ShardLocalTime", *shard.ShardLocalTime),
Can you trace AllowedLaggingTasks? (We're already tracing AllowedLagging anyway.) It might also be easier to debug if we trace the subtraction of MaxTaskID and AckedTaskID (slightly less math someone has to perform :))
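Concretely, that could be two more entries in the tag list from the hunk above, reusing the same tag helpers; both tag names are suggestions, not existing tags in the PR:

	// Hypothetical additions to the log tags above (names are illustrative):
	tag.NewInt64("AllowedLaggingTasks", waitRequest.AllowedLaggingTasks),
	tag.NewInt64("ActualLaggingTasks", shard.MaxReplicationTaskId-clusterInfo.AckedTaskId),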
 	for _, shard := range resp.Shards {
 		clusterInfo, hasClusterInfo := shard.RemoteClusters[waitRequest.RemoteCluster]
 		if hasClusterInfo {
-			if clusterInfo.AckedTaskId == shard.MaxReplicationTaskId ||
+			if shard.MaxReplicationTaskId-clusterInfo.AckedTaskId <= waitRequest.AllowedLaggingTasks ||
 				(clusterInfo.AckedTaskId >= waitRequest.WaitForTaskIds[shard.ShardId] &&
 					shard.ShardLocalTime.Sub(*clusterInfo.AckedTaskVisibilityTime) <= waitRequest.AllowedLagging) {
With the altered first condition in this if statement, how often do you expect AllowedLaggingTasks to be outside its threshold while the times are within theirs?
This could happen when load is high? The threshold for AllowedLaggingTasks should be very low (I'm thinking 3 or 5 at most).
We need to make sure this is always true: clusterInfo.AckedTaskId >= waitRequest.WaitForTaskIds[shard.ShardId]
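One way to guarantee that invariant, sketched from the condition quoted above rather than taken from the PR, is to hoist the WaitForTaskIds check so it gates both lagging branches:

	// Hypothetical restructuring: the acked-task floor must hold no matter
	// which lagging check (task-count or time-based) lets the shard pass.
	reachedFloor := clusterInfo.AckedTaskId >= waitRequest.WaitForTaskIds[shard.ShardId]
	withinTaskLag := shard.MaxReplicationTaskId-clusterInfo.AckedTaskId <= waitRequest.AllowedLaggingTasks
	withinTimeLag := shard.ShardLocalTime.Sub(*clusterInfo.AckedTaskVisibilityTime) <= waitRequest.AllowedLagging
	if reachedFloor && (withinTaskLag || withinTimeLag) {
		// shard considered caught up
	}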
 	workflow.Go(ctx, func(ctx workflow.Context) {
 		listWorkflowsErr = listWorkflowsForReplication(ctx, workflowExecutionsCh, &params)
 	})

 	// enqueueReplicationTasks only returns when listWorkflowCh is closed (or if it encounters an error).
The comment references listWorkflowCh, but we should change it to workflowExecutionsCh.
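For context, this is the shape of that producer/consumer pairing as I read it. Everything beyond the identifiers quoted in the diff (the buffer size, error handling, and the enqueueReplicationTasks signature) is assumed:

	// Sketch only: the producer closes workflowExecutionsCh when listing finishes,
	// which is what eventually lets enqueueReplicationTasks return.
	workflowExecutionsCh := workflow.NewBufferedChannel(ctx, 100) // buffer size assumed
	var listWorkflowsErr error
	workflow.Go(ctx, func(ctx workflow.Context) {
		defer workflowExecutionsCh.Close() // signals the consumer that listing is done
		listWorkflowsErr = listWorkflowsForReplication(ctx, workflowExecutionsCh, &params)
	})

	// enqueueReplicationTasks drains the channel until it is closed (or errors);
	// its real signature in the PR may differ.
	err := enqueueReplicationTasks(ctx, workflowExecutionsCh)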
Co-authored-by: Manu Srivastava <manu@temporal.io>