-
Notifications
You must be signed in to change notification settings - Fork 779
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement Nexus async completion endpoint #5735
Conversation
4a1ba13
to
4bb9cb4
Compare
4bb9cb4
to
7c8d04c
Compare
// The task is stale and is safe to be dropped. | ||
// Even though ErrStaleReference is castable to serviceerror.NotFound, we give this error special treatment | ||
// because we're interested in the metric. | ||
metrics.TaskSkipped.With(e.taggedMetricsHandler).Record(1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should change the other place where TaskSkipped is emitted to return the StaleStateError
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure what the semantics are there.
When would the task event ID be >=
than the mutable state next event ID?
It does seem like we want to put the task in the DLQ in that case instead of dropping it. Do you want to change that behavior in this PR that goes into a feature branch?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My understanding it that it should never happen and basically the same as the StaleStateError.
goes into a feature branch
I am fine either way. That dropping logic is there for a long time so not an urgent thing to fix.
I commented here because it might impact the interpretation of the TaskSkipped metric.
proto/internal/temporal/server/api/historyservice/v1/request_response.proto
Outdated
Show resolved
Hide resolved
Config *Config | ||
CallbackTokenGenerator *commonnexus.CallbackTokenGenerator | ||
HistoryClient resource.HistoryClient | ||
NamespaceValidationInterceptor *interceptor.NamespaceValidatorInterceptor |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't read the code very carefully. I thought those interceptors are taken care of by the existing NexusHTTPHandler in frontend? I guess I am confused by all those handlers now 🤦
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I was debating whether to put this functionality in the frontend package.
I'm trying to see how far I can take these components.
There's definitely some duplication but I'll take care of that later. I have it tracked.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I'm also a little confused why there is a separate Nexus frontend handler just for completions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we could move the nexus http handler into this package as part of later refactoring work.
f2367fd
to
4d20fd8
Compare
## What changed? Added pending nexus operations to describe response. I was debating if I want to allow components to register a describe hook but decided to just inline this for now.
Note to reviewers, I squashed the stacked PR #5749 on top of this. You can ignore that commit. |
cmd/tools/rpcwrappers/main.go
Outdated
panic(fmt.Sprintf("expected at least one namespace ID field in request with nesting of 2 in %s", t)) | ||
} else { | ||
// There's more than one, assume there's a top level one (e.g. | ||
// historyservice.GetWorkflowExecutionHistoryRequest) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I only see one NamespaceId field in historyservice.GetWorkflowExecutionHistoryRequest.
I think ideally we should panic here as well since it's ambiguous in this case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh sorry it's GetWorkflowExecutionRawHistoryRequest
.
// The task is stale and is safe to be dropped. | ||
// Even though ErrStaleReference is castable to serviceerror.NotFound, we give this error special treatment | ||
// because we're interested in the metric. | ||
metrics.TaskSkipped.With(e.taggedMetricsHandler).Record(1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My understanding it that it should never happen and basically the same as the StaleStateError.
goes into a feature branch
I am fine either way. That dropping logic is there for a long time so not an urgent thing to fix.
I commented here because it might impact the interpretation of the TaskSkipped metric.
"nexus_completion_requests", | ||
WithDescription("The number of Nexus completion (callback) requests received by the service."), | ||
) | ||
NexusCompletionLatencyHistogram = NewCounterDef( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't you want a NewTimerDef
instead? Ditto L686 above.
NexusCompletionLatencyHistogram = NewCounterDef( | |
NexusCompletionLatency = NewTimerDef( |
} | ||
|
||
completion := &tokenspb.NexusOperationCompletion{} | ||
return completion, proto.Unmarshal(plaintext, completion) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just wanna double check, proto.Unmarshal
works with completion
instead of &completion
?
Can you add unit tests for this function and DecodeCallbackToken
below?
@@ -69,3 +71,26 @@ func (g *CallbackTokenGenerator) Tokenize(completion *tokenspb.NexusOperationCom | |||
} | |||
return string(b), nil | |||
} | |||
|
|||
// DecodeCompletion decodes a callback token unwrapping the contained NexusOperationCompletion proto struct. | |||
func (g *CallbackTokenGenerator) DecodeCompletion(token *CallbackToken) (*tokenspb.NexusOperationCompletion, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This function doesn't seem to need be a method of CallbackTokenGenerator
if !exposeDetails { | ||
return nexus.HandlerErrorf(nexus.HandlerErrorTypeInternal, "internal error") | ||
} | ||
return err |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you move this to the default
case inside the switch
?
Also, don't you need to wrap the last return in nexus.HandlerError
?
func handleSuccessfulOperationResult(node *hsm.Node, operation Operation, | ||
result *commonpb.Payload, | ||
attemptTime *time.Time, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit
func handleSuccessfulOperationResult(node *hsm.Node, operation Operation, | |
result *commonpb.Payload, | |
attemptTime *time.Time, | |
func handleSuccessfulOperationResult( | |
node *hsm.Node, | |
operation Operation, | |
result *commonpb.Payload, | |
attemptTime *time.Time, |
tag.WorkflowRunID(completion.GetRunId()), | ||
) | ||
if completion.GetNamespaceId() != ns.ID().String() { | ||
logger.Error("namespace ID in token doesn't match the token", tag.WorkflowNamespaceID(ns.ID().String()), tag.Error(err), tag.NewStringTag("completion-namespace-id", completion.GetNamespaceId())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: avoid very long lines
logger.Error("namespace ID in token doesn't match the token", tag.WorkflowNamespaceID(ns.ID().String()), tag.Error(err), tag.NewStringTag("completion-namespace-id", completion.GetNamespaceId())) | |
logger.Error( | |
"namespace ID in token doesn't match the token", | |
tag.WorkflowNamespaceID(ns.ID().String()), | |
tag.Error(err), | |
tag.NewStringTag("completion-namespace-id", completion.GetNamespaceId()), | |
) |
if !h.Config.Enabled() { | ||
h.MetricsHandler.Counter(metrics.NexusCompletionRequestPreProcessErrors.Name()).Record(1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if !h.Config.Enabled() { | |
h.MetricsHandler.Counter(metrics.NexusCompletionRequestPreProcessErrors.Name()).Record(1) | |
reqPreProcessErrorsCounter := h.MetricsHandler.Counter(metrics.NexusCompletionRequestPreProcessErrors.Name()) | |
if !h.Config.Enabled() { | |
reqPreProcessErrorsCounter.Record(1) |
metrics.OperationTag(apiName), | ||
metrics.NamespaceTag(nsName), | ||
), | ||
requestStartTime: time.Now(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wouldn't it be more precise if you recorded the start time at the top of this function?
} else { | ||
if c.outcomeTag != nil { | ||
c.metricsHandler = c.metricsHandler.WithTags(c.outcomeTag) | ||
} else { | ||
var he *nexus.HandlerError | ||
if errors.As(*errPtr, &he) { | ||
c.metricsHandler = c.metricsHandler.WithTags(metrics.NexusOutcomeTag("error_" + strings.ToLower(string(he.Type)))) | ||
} else { | ||
c.metricsHandler = c.metricsHandler.WithTags(metrics.NexusOutcomeTag("error_internal")) | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
} else { | |
if c.outcomeTag != nil { | |
c.metricsHandler = c.metricsHandler.WithTags(c.outcomeTag) | |
} else { | |
var he *nexus.HandlerError | |
if errors.As(*errPtr, &he) { | |
c.metricsHandler = c.metricsHandler.WithTags(metrics.NexusOutcomeTag("error_" + strings.ToLower(string(he.Type)))) | |
} else { | |
c.metricsHandler = c.metricsHandler.WithTags(metrics.NexusOutcomeTag("error_internal")) | |
} | |
} | |
} | |
} else if c.outcomeTag != nil { | |
c.metricsHandler = c.metricsHandler.WithTags(c.outcomeTag) | |
} else { | |
var he *nexus.HandlerError | |
if errors.As(*errPtr, &he) { | |
c.metricsHandler = c.metricsHandler.WithTags(metrics.NexusOutcomeTag("error_" + strings.ToLower(string(he.Type)))) | |
} else { | |
c.metricsHandler = c.metricsHandler.WithTags(metrics.NexusOutcomeTag("error_internal")) | |
} | |
} |
// not referencing a stale state or that the task itself is not stale. For example, if the state has a history of | ||
// `[{v: 1, t: 3}, {v: 2, t: 5}]`, task A `{v: 2, t: 4}` **is not** referencing stale state because for version `2` | ||
// When a task or API request is being processed, the history is compared with the imprinted state reference to verify | ||
// that the state is not stale or that the task or request itself is not stale. For example, if the state has a history |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit
// that the state is not stale or that the task or request itself is not stale. For example, if the state has a history | |
// that the state is not stale or that the task/request itself is not stale. For example, if the state has a history |
## What changed? [Operation state machine](#5545) [Nexus command processing](#5546) [Implement Nexus start operation task executors](#5686) [Implement Nexus async completion endpoint](#5735) Upgrade API to support full E2E flow temporalio/api#363
What changed?
mux.Router
instead of collecting handlers so I can inject routes in the component (plugin) codetask_executor.go
tostatemachine_environment.go
and adapted it to support handling API requestsStateMachineEnvironment()
to history Engine. In the future we'll want to have the environment on the shard controller instead of the engine but because I'm waiting until we drop support for shard level workflow cache first to make that changenexus_handler.go
intocommon/nexus
for reusability