
Task executable implementation #2738

Merged
yycptt merged 4 commits into temporalio:master from executable-impl on Apr 20, 2022
Conversation

yycptt (Member) commented Apr 19, 2022

What changed?

  • Implement Task executable interface

Why?

  • Host level worker pool implementation

How did you test it?

  • Added unit test

Potential risks

  • N/A, not used anywhere

Is hotfix candidate?
No.

@yycptt yycptt requested review from wxing1292 and yiminc April 19, 2022 21:17
@yycptt yycptt requested a review from a team as a code owner April 19, 2022 21:17
return taskLogger
}

func GetTransferTaskEventID(
Contributor

can we wrap the task and make the task return the value? e.g.
https://github.com/uber/cadence/blob/0.11.x/service/worker/replicator/replicationTask.go

Member Author (yycptt)

Synced offline, will:

  • Have one task executor per task type, with each executor containing both active and standby logic.
  • The eventID should be retrieved from the tasks.Task interface (see the sketch below).
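
A minimal sketch of that direction, using hypothetical names (not the actual tasks.Task or executor interfaces in the repo):

// Hypothetical sketch only; names are illustrative.
package sketch

// Task exposes the event ID directly, so callers no longer need a
// standalone GetTransferTaskEventID helper.
type Task interface {
	GetEventID() int64
}

// Executor bundles active and standby processing logic for one task type.
type Executor interface {
	ExecuteActive(task Task) error
	ExecuteStandby(task Task) error
}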

Comment on lines 203 to 206
if _, ok := err.(*persistence.CurrentWorkflowConditionFailedError); ok {
e.logger.Error("More than 2 workflow are running.", tag.Error(err), tag.LifeCycleProcessingFailed)
return nil
}
Contributor

you can remove this check

Member Author (yycptt)

Some additional context here: the error will only happen if there's a bug in the system, and in that case we should not ignore the issue.

@@ -80,6 +80,10 @@ const (
retryTaskProcessingMaxInterval = 100 * time.Millisecond
retryTaskProcessingMaxAttempts = 3

rescheduleTaskInitialInterval = 3 * time.Second
rescheduleTaskBackoffCoefficient = 1.05
Member

Why is this 1.05? Should we use a shorter initial interval with a more aggressive backoff?

Member Author (yycptt)

A shorter initial interval won't help, as tasks in the rescheduler are only rescheduled periodically (in batches), instead of maintaining a timer per task and rescheduling each one as soon as its backoff duration is reached. That's why, for certain error types, there's an optimization for immediate reschedule when the number of attempts is low.

Back to the coefficient, I don't have a very strong opinion on the exact number.
For 20 attempts, using 1.05 will give us ~8s backoff, ~20s when using 1.1, ~49s when using 1.15 and ~115s when using 1.2.
For 30 attempts, ~13s for 1.05, ~52s for 1.1, ~198s for 1.15

I think the backoff calculated with a coefficient > 1.1 is a bit too aggressive; 1.05 or 1.1 looks more reasonable to me. Thoughts? We can tune this number as time goes on.
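
For reference, a standalone snippet (not code from this PR) that reproduces the arithmetic behind these numbers, assuming backoff = rescheduleTaskInitialInterval * coefficient^attempts:

package main

import (
	"fmt"
	"time"
)

// backoffAfter computes initial * coefficient^attempts.
func backoffAfter(initial time.Duration, coefficient float64, attempts int) time.Duration {
	d := float64(initial)
	for i := 0; i < attempts; i++ {
		d *= coefficient
	}
	return time.Duration(d)
}

func main() {
	initial := 3 * time.Second // rescheduleTaskInitialInterval
	for _, coeff := range []float64{1.05, 1.1, 1.15, 1.2} {
		fmt.Printf("coefficient %.2f: ~%v after 20 attempts, ~%v after 30 attempts\n",
			coeff,
			backoffAfter(initial, coeff, 20).Round(time.Second),
			backoffAfter(initial, coeff, 30).Round(time.Second))
	}
}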

Additional review threads on service/history/queues/executable.go were resolved (some marked outdated).
if duration, ok := metrics.ContextCounterGet(ctx, metrics.HistoryWorkflowExecutionCacheLatency); ok {
userLatency = time.Duration(duration)
}
e.userLatency += userLatency
Member

so this e.userLatency will accumulate during retry?

Member Author (yycptt)

Yes, e.userLatency is used to calculate taskNoUserLatency and taskNoUserQueueLatency when acking the task. Both latencies span multiple attempts/retries. This is the same as the existing implementation in taskProcessor.go.

However, I believe the existing logic in taskProcessor.go is not calculating taskProcessingLatency correctly, as it subtracts e.userLatency, which accumulates across attempts, from the execution latency of a single attempt.
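
A rough sketch of the bookkeeping being described, using hypothetical field and method names (not the actual executable implementation or metric emission code):

package sketch

import "time"

// executable accumulates user latency across every attempt of a task.
type executable struct {
	loadTime    time.Time     // when the task was loaded from the queue
	userLatency time.Duration // sum over all attempts/retries
}

// onAttempt records the user latency measured during one execution attempt.
func (e *executable) onAttempt(attemptUserLatency time.Duration) {
	e.userLatency += attemptUserLatency
}

// onAck computes the queue latency with user latency excluded. Both sides of
// the subtraction cover every attempt, which keeps the metric consistent.
func (e *executable) onAck(now time.Time) time.Duration {
	return now.Sub(e.loadTime) - e.userLatency
}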

A review thread on service/history/tasks/utils.go was resolved (outdated).
@yycptt yycptt merged commit d84a19c into temporalio:master Apr 20, 2022
@yycptt yycptt deleted the executable-impl branch April 20, 2022 22:13
3 participants