| // Used to track which propagations are still pending across continue-as-new. | ||
| map<string, PropagatingRevisions> propagating_revisions = 8; | ||
| // Request ID used to create this worker deployment. | ||
| string create_request_id = 9; |
There was a problem hiding this comment.
I presume you will be using the lack of a create_request_id to differentiate between WorkerDeployments that were not created with the CreateWorkerDeployment API call -- and thus were lazily created?
There was a problem hiding this comment.
There would still be a request ID, but with a special prefix.
service/frontend/configs/quotas.go
Outdated
| "/temporal.api.workflowservice.v1.WorkflowService/UpdateWorkerBuildIdCompatibility": 2, | ||
| "/temporal.api.workflowservice.v1.WorkflowService/UpdateWorkerVersioningRules": 2, | ||
|
|
||
| // Anything that changes task queue user data also creates replication tasks. |
There was a problem hiding this comment.
Replication tasks are the tasks our replication stack generates to copy data from one cluster to another for MRN namespaces.
There was a problem hiding this comment.
removed from this PR
| if allowNoPollers { | ||
| // we want to start the Worker Deployment workflow if it hasn't been started by a poller | ||
| if b := versionObj.GetBuildId(); b != "" { | ||
| // Empty build id is accepted for unset. |
There was a problem hiding this comment.
What do you mean by "accepted for unset"?
There was a problem hiding this comment.
An empty build ID is accepted as a request to unset the current or ramping version.
|
|
||
| updateRequest := &updatepb.Request{ | ||
| Input: &updatepb.Input{Name: CreateWorkerDeployment, Args: updateArgs}, | ||
| Meta: &updatepb.Meta{UpdateId: "_create_" + requestID, Identity: identity}, |
There was a problem hiding this comment.
might be good to create a const for "create" like you did for the AutoCreate one...
There was a problem hiding this comment.
This is used only once, and only as the WorkflowUpdate's request ID; it is not used as the create_request_id inside the workflow state. I add the prefix to distinguish the request ID for create from the request IDs of other APIs such as setcurrent, so that if a user sends the same request ID to both, we still treat them as different requests. Adding this as a comment in the code.
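To illustrate the point about prefixing, here is a minimal sketch of scoping an update ID by API so that a reused request ID does not collide across operations. Only the "_create_" literal comes from the diff; the helper name scopedUpdateID and the "set_current" scope are hypothetical and not taken from the PR.

```go
package main

// scopedUpdateID is a hypothetical helper: it prefixes the caller's request ID
// with the API name so that the same request ID sent to two different APIs
// yields two different WorkflowUpdate IDs.
func scopedUpdateID(api, requestID string) string {
	return "_" + api + "_" + requestID
}

// createUpdateID reproduces the literal used in the diff ("_create_" + requestID).
func createUpdateID(requestID string) string {
	return scopedUpdateID("create", requestID)
}
```

With this shape, createUpdateID("abc") and scopedUpdateID("set_current", "abc") differ even though the user supplied the same request ID.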
| Meta: &updatepb.Meta{UpdateId: "_create_" + requestID, Identity: identity}, | ||
| } | ||
|
|
||
| outcome, err := updateWorkflowWithStart( |
There was a problem hiding this comment.
I mentioned this on one of Stefan's PRs as well, but I personally find the name updateWorkflowWithStart() to be incredibly confusing. Apparently this doesn't update a workflow at all but uses the rather weirdly-named WorkflowUpdate operation... updateWorkflowWithStart should really have been called WaitForWorkflowStatus...
There was a problem hiding this comment.
From what I read, UpdateWorkflowWithStart means: start workflow if it doesn't exist, and then send an update to it.
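For readers unfamiliar with the operation, the semantics described above (start the workflow if it doesn't exist, then send an update to it) can be sketched with an in-memory stand-in. This is not the server's implementation; the registry and workflow types are hypothetical and exist only to show the ordering.

```go
package main

import "sync"

// workflow is a hypothetical stand-in that only records the updates it received.
type workflow struct {
	updates []string
}

// registry is a hypothetical in-memory set of running workflows keyed by ID.
type registry struct {
	mu        sync.Mutex
	workflows map[string]*workflow
}

func newRegistry() *registry {
	return &registry{workflows: make(map[string]*workflow)}
}

// updateWithStart starts the workflow if it is not running yet and then
// delivers the update. Both steps happen under one lock, so two concurrent
// callers cannot both start the same workflow. It reports whether this call
// was the one that started it.
func (r *registry) updateWithStart(workflowID, update string) (started bool) {
	r.mu.Lock()
	defer r.mu.Unlock()

	wf, ok := r.workflows[workflowID]
	if !ok {
		wf = &workflow{}
		r.workflows[workflowID] = wf
		started = true
	}
	wf.updates = append(wf.updates, update)
	return started
}
```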
| t.Run("v1", func(t *testing.T) { | ||
| suite.Run(t, &VersionWorkflowSuite{workflowVersion: AsyncSetCurrentAndRamping}) | ||
| }) |
There was a problem hiding this comment.
this mode is not supported in prod anymore so no need for testing it.
| } | ||
|
|
||
| func (d *WorkflowRunner) validateCreateWorkerDeployment(args *deploymentspb.CreateWorkerDeploymentArgs) error { | ||
| // Only valid if deployment is deleted or the request ID matches the current one. |
There was a problem hiding this comment.
sorry, explain to me why the CreateWorkerDeploymentArgs is valid if the WorkflowRunner's deleteDeployment flag is true?
There was a problem hiding this comment.
This was to cover an edge case where we admit a delete and a create simultaneously but process the create after the delete. Thinking about it more, in that case it's OK for create to return an "already existed" error. Simplifying this part.
| if err := d.validateDeleteDeployment(); err != nil { | ||
| return err | ||
| } |
There was a problem hiding this comment.
Is this unrelated to the addition of CreateWorkerDeployment?
There was a problem hiding this comment.
Yes, I noticed the check did not exist before. We should have it.
There was a problem hiding this comment.
removing it from this PR
| // used as Worker Deployment workflow update input: | ||
| message CreateWorkerDeploymentArgs { | ||
| string identity = 1; | ||
| string request_id = 2; |
There was a problem hiding this comment.
Please document what the semantics of this request id are. It seems like:
- Retrying with same request id is a no-op.
- With different request id is an error.
- Unless the deployment is deleted, in this case it is revived.
There was a problem hiding this comment.
Added. Here are the semantics:
// Retrying with the same request id is a successful no-op.
// Retrying with a different request id is an error.
// Once the deployment is deleted, the same or a different request id will re-create it.
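The three rules above can be sketched as a small validation function. This is an illustration of the stated semantics only, not the validator in the PR; deploymentState, validateCreate, and errAlreadyExists are hypothetical names, and the state is assumed to describe a deployment that has been created at least once.

```go
package main

import "errors"

// errAlreadyExists is a hypothetical error for a create with a mismatched request ID.
var errAlreadyExists = errors.New("worker deployment already exists")

// deploymentState is a hypothetical, trimmed-down view of the workflow state.
type deploymentState struct {
	createRequestID string // request ID that created the deployment
	deleted         bool   // true once the deployment has been deleted
}

// validateCreate reports whether a create request should be applied.
//   - deleted deployment: any request ID re-creates it (apply = true)
//   - same request ID:     successful no-op (apply = false, err = nil)
//   - different request ID: error
func validateCreate(s deploymentState, requestID string) (apply bool, err error) {
	switch {
	case s.deleted:
		return true, nil
	case s.createRequestID == requestID:
		return false, nil
	default:
		return false, errAlreadyExists
	}
}
```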
| bytes conflict_token = 2; | ||
| } | ||
|
|
||
| // used as Worker Deployment workflow update input: |
There was a problem hiding this comment.
Reword: Used as the input to the worker deployment workflow.
| // Used to track which propagations are still pending across continue-as-new. | ||
| map<string, PropagatingRevisions> propagating_revisions = 8; | ||
| // Request ID used to create this worker deployment. | ||
| string create_request_id = 9; |
There was a problem hiding this comment.
I think I slightly prefer the prefix because this is a long-lived entity receiving many types of requests after creation.
| return dInfo, queryResponse.GetState().GetConflictToken(), nil | ||
| } | ||
|
|
||
| func (d *ClientImpl) queryCreateRequestID( |
There was a problem hiding this comment.
nit: Just queryRequestId. Also update the error text.
| requestID string, | ||
| ) ([]byte, error) { | ||
| // Check if deployment already exists and whether it was created by this request. | ||
| res, err := d.queryCreateRequestID(ctx, namespaceEntry, deploymentName) |
There was a problem hiding this comment.
Is this an optimization to fail fast? Asking since even if it succeeds here, it is possible that it will fail on the server due to a race. Right?
There was a problem hiding this comment.
Yes, this is an optimization; the race condition is handled by update-with-start. The reason for the optimization is that we don't want to send updates to the workflow needlessly, because each one generates history events (ideally, we should write nothing in case of a duplicate request ID).
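A rough sketch of the fail-fast idea, assuming a query that returns the stored create request ID: the query short-circuits duplicate and conflicting requests so that no update (and therefore no history event) is generated, while the update path still performs its own check to cover the race. The backend interface, fakeBackend, and createDeployment are hypothetical and only model the flow discussed here.

```go
package main

import "errors"

// errAlreadyExists is a hypothetical error for a conflicting create.
var errAlreadyExists = errors.New("worker deployment already exists")

// backend is a hypothetical abstraction over the query and update calls.
type backend interface {
	queryCreateRequestID(name string) (id string, found bool)
	sendCreateUpdate(name, requestID string) error
}

// fakeBackend is an in-memory stand-in that counts how many updates were sent.
type fakeBackend struct {
	requestIDs  map[string]string
	updatesSent int
}

func (f *fakeBackend) queryCreateRequestID(name string) (string, bool) {
	id, ok := f.requestIDs[name]
	return id, ok
}

// sendCreateUpdate repeats the request ID check, which is what covers a race
// between the query and the update.
func (f *fakeBackend) sendCreateUpdate(name, requestID string) error {
	f.updatesSent++
	if existing, ok := f.requestIDs[name]; ok && existing != requestID {
		return errAlreadyExists
	}
	f.requestIDs[name] = requestID
	return nil
}

// createDeployment queries first and only sends an update when the deployment
// is not known yet, so duplicate or conflicting requests write nothing.
func createDeployment(b backend, name, requestID string) error {
	if existing, found := b.queryCreateRequestID(name); found {
		if existing == requestID {
			return nil // duplicate request: successful no-op, no update sent
		}
		return errAlreadyExists
	}
	return b.sendCreateUpdate(name, requestID)
}
```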
|
|
||
| conflictToken, err := d.ensureWorkerDeploymentDoesNotExist(ctx, namespaceEntry, deploymentName, requestID) | ||
| if err != nil { | ||
| // WD already exists, or other errors |
There was a problem hiding this comment.
with different request id?
| func (d *WorkflowRunner) revive(ctx workflow.Context, args *deploymentspb.CreateWorkerDeploymentArgs) error { | ||
| // Reset routing config to clear all previous timestamps still present from previous life. | ||
| routingConfig := &deploymentpb.RoutingConfig{ | ||
| // We preserve the revision number in case of a previous delete while there were propagating revisions. |
There was a problem hiding this comment.
I didn't understand this comment. Can you describe in terms of what happens if we didn't do this and instead just did d.State.Reset?
There was a problem hiding this comment.
I got rid of the revive function, it was overly complicated.
There was a problem hiding this comment.
General comments.
- This PR packs a lot of changes. Suggest adding some details to the PR description --- call out key changes like files and what was changed. This helps reviewers who are not very familiar with the codebase.
- In the future, please see if it makes sense to split into multiple PRs. While we need to review all of them to get the full picture, I have found it easier to review and identify problems/regressions if the PRs are small and focused.
Example:
- Proto/API
- Server side: Handler code
- Frontend: client code
What changed?
Added a new API for explicitly creating a worker deployment. It also supports passing a Compute Config.
Why?
Needed for serverless workers.
How did you test it?
Potential risks
None