-
Notifications
You must be signed in to change notification settings - Fork 754
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Callbacks as a plugin part 1 #5446
Conversation
destination string | ||
|
||
url string | ||
completion nexus.OperationCompletion |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm a little confused. Is this callback implementation intended to be generic? It seems to be coupled to some Nexus concepts. If I were to add a new callback variant, would I need to define a new executable for that or would I modify this one?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For now this is what we support, we can extend this same code to support more types of callbacks.
) | ||
|
||
// InvocationTaskTimeout is the timeout for executing a single callback invocation task. | ||
var InvocationTaskTimeout = "plugin.callback.invocation.taskTimeout" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm a little nervous about setting a precedent of putting dynamic config keys somewhere other than dynamicconfig/constants
. The current organization definitely is not perfect, but I worry that without a defined best practice it will get confusing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I understand the concern, but I think this is a step in the right direction for our codebase.
I even want the proto definitions for the callback state machine to be included in the plugin directory.
message CallbackInfo { | ||
// The namespace failover version at the time this callback info was updated. | ||
// State-machine information. | ||
message StateMachine { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe StateMachineInfo
to keep the naming scheme consistent?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I considered this and that's what I had before changing this name.
I like just calling this state machine, I don't think info is adding much. It's also not used in the exact same way as the other Info
messages.
string id = 2; | ||
// Namespace failover version on the corresponding state machine object, used for staleness detection when global | ||
// namespaces are enabled. | ||
int64 version = 3; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe namespace_failover_version
to keep the variable name consistent with other places we reference this type of version?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That would be fine with me but in most other places in the codebase it's called version
. Maybe we need to change all of the new names to version
for consistency?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah I didn't realize those other versions were also for namespace failover. I like the more descriptive name, but not too picky.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm taking your suggestion.
return nil, nil | ||
} | ||
|
||
func RegisterTaskSerializer(reg *statemachines.Registry) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm a little curious why registering a state machine, executor, and task serializer are all separate steps. Is it possible to have one without the others?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I considered adding a RegisterPlugin
method and having a more "well defined" concept but for now I'd rather have the flexibility before solidifying that interface.
name: "non-retryable-error", | ||
caller: func(r *http.Request) (*http.Response, error) { | ||
return &http.Response{StatusCode: 500}, nil | ||
}, | ||
assertOutcome: func(t *testing.T, cb callbacks.Callback) { | ||
require.Equal(t, enumspb.CALLBACK_STATE_BACKING_OFF, cb.PublicInfo.State) | ||
}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the returned error should be a non-retryable 4xx and the expected state should be something terminally failed
var Module = fx.Module( | ||
"plugin.callbacks", | ||
fx.Provide(callbacks.ConfigProvider), | ||
fx.Invoke(callbacks.RegisterTaskSerializer), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FYI, Invoke is variadic in case you want to use that style instead
} | ||
|
||
// TaskKind represents the possible set of kinds for a task. | ||
// Each kind is mapped to a concrete [tasks.Task] instance and is backed by specific protobuf message; for example, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's mapped to a concrete [tasks.Task] implementation, not instance, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, thanks.
case *commonpb.Callback_Nexus_: | ||
u, err := url.Parse(c.PublicInfo.Callback.GetNexus().Url) | ||
if err != nil { | ||
return nil, fmt.Errorf("failed to parse URL: %v", &c) // nolint:goerr113 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any reason not to also wrap the err
here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oversight, thanks
} | ||
|
||
func (stateMachineDefinition) Serialize(state any) ([]byte, error) { | ||
return proto.Marshal(state.(Callback).CallbackInfo) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not return an error instead of panicking if state
has the wrong type?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I overlooked this, thanks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Technically this should never happen but better safe than sorry.
cb.recordAttempt(event.Time) | ||
// Use 0 for elapsed time as we don't limit the retry by time (for now). | ||
// TODO: Make the retry policy initial interval configurable. | ||
nextDelay := backoff.NewExponentialRetryPolicy(time.Second).ComputeNextDelay(0, int(cb.PublicInfo.Attempt)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Have you considered using WithExpirationInterval(backoff.NoExpiration)
instead? It may be more readable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't know about that, let me try.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It didn't make things more readable IMHO.
// Use 0 for elapsed time as we don't limit the retry by time (for now). | ||
// TODO: Make the retry policy initial interval configurable. | ||
nextDelay := backoff.NewExponentialRetryPolicy(time.Second).ComputeNextDelay(0, int(cb.PublicInfo.Attempt)) | ||
nextAttemptScheduleTime := event.Time.Add(nextDelay) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking at the code, with the default value for maximumAttempts
, and with an elapsedTime
of 0, nextDelay
should never be done
. However, I think we should still check that the delay returned here is non-negative before doing this in case someone changes the above code later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that should be the responsibility of ComputeNextDelay
.
// EventRescheduled is triggered when the callback is meant to be rescheduled after backing off from a previous attempt. | ||
type EventRescheduled struct{} | ||
|
||
var TransitionRescheduled = statemachines.NewTransition( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've identified a small but meaningful distinction between EventRescheduled
and EventAttemptFailed
within our system, pointing to two distinct programming models: proactive versus reactive. EventRescheduled
acts more like a command—a proactive instruction issued by the server to the callback state machine, telling it to reschedule. Conversely, EventAttemptFailed
functions as a reactive event, informing the state machine of something that already happened, the attempt failing. This nuanced difference underlines that not all state changes are events; some are direct commands.
Our previous use of next_event_id
for optimistic concurrency control in workflows had an oversight which I believe was also due to not making this distinction. We assumed it would always increment with any change because all changes were reactive. However, this wasn't the case because we started making changes to workflow state proactively, not in direct response to any events, leading us to switch to db_record_version
.
To address this, I propose a simple yet intuitive adjustment: shifting our terminology from "events" to "inputs." This aligns with the conventional language used in theoretical Finite State Machines (FSMs) and serves as an appropriate hypernym for both events and commands. Furthermore, we need not force all transition functions and their arguments into the TransitionX(EventX)
format. A more organized approach could look like this:
var TransitionFunctions = struct {
Reschedule Transition[Callback, time.Time]
OnAttemptFailed Transition[Callback, FailedAttemptInfo]
} {
...
}
This structure not only maintains ease of access to all transition functions within a package but also improves readability and clarity. By adopting this approach, we effectively acknowledge the diversity of inputs driving our state machine, enhancing both our conceptual model and codebase organization.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Both of these suggestions somewhat SGTM.
Events here are actually inputs, so I don't mind changing the terminology.
I know that a lot of FSM libraries use the term event but I agree changing the terminology to avoid confusion with history events which may also be inputs.
I'll try the struct approach, I didn't go for that because it adds what I think is unnecessary boilerplate.
I do like that all the transitions are grouped somehow, especially if we want to generate diagrams from the definitions but that doesn't require a struct, it could be based on a convention.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I'll stick with events. The fact that you can use any event to trigger a transition, not just history events makes the issue you raised non existent.
Here's how the term is used in other libraries:
https://github.com/temporalio/sdk-core/blob/master/fsm/rustfsm_trait/src/lib.rs
https://github.com/looplab/fsm
https://stately.ai/docs/transitions
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not loving the boilerplate for for putting the transitions in a single struct:
I don't think it's adding much, here's how I added 3 out of the 5 transitions:
var Transitions = struct{
Succeeded statemachines.Transition[enumspb.CallbackState, Callback, EventSucceeded]
Failed statemachines.Transition[enumspb.CallbackState, Callback, EventFailed]
AttemptFailed statemachines.Transition[enumspb.CallbackState, Callback, EventAttemptFailed]
}{
Succeeded: TransitionSucceeded,
Failed: TransitionFailed,
AttemptFailed: TransitionAttemptFailed,
}
// state machine's functionality. For example, a callback executor might need to extract the nexus completion from a | ||
// MutableState object but not all mutable states are required to support that. | ||
// In the future this may be merged into the [StateMachine] interface to allow state machines to have a hierarchy. | ||
type MutableState interface { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I really like that you've broken the dependency cycle here in a way where we don't rely on the workflow mutable state, but it seems weird that we still need to be aware of the "MutableState" concept in this package, especially just to retrieve a Store
. However, I also understand that we can't just pass in a *Store
here because we need other mutable state methods in the callback task executor. However, we don't want to make this one big object. Could we instead make this a generic parameter on the transition function to support different environments?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This would require that Executor is generic which is hard to work with in Go.
I plan on iterating on this when we add the next state machine.
I definitely don't consider this final.
service/history/statemachines/sm.go
Outdated
return m.Version | ||
// A StateMachine is anything that can get and set a comparable state S and generate tasks. | ||
// It is meant to be used with [Transition] objects to safely transition their state on a given event. | ||
// Task generation is done seperately from state transitions to support state based replication and task refresh. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// Task generation is done seperately from state transitions to support state based replication and task refresh. | |
// Task generation is done separately from state transitions to support state-based replication and task refresh. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks!
service/history/statemachines/sm.go
Outdated
if !t.Possible(data) { | ||
// Apply applies a transition event to the given state machine changing the statemachine's state to the transition's | ||
// Destination on success. | ||
func (t Transition[S, SM, E]) Apply(sm SM, event E) (retErr error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Is there any value in naming the returned error value?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm... this is a leftover from refactoring work. Thanks for pointing it out.
service/history/statemachines/sm.go
Outdated
func (m *MockEnvironment) Schedule(task Task) { | ||
m.ScheduledTasks = append(m.ScheduledTasks, task) | ||
// Transition represents a state machine transition for a machine of type SM with state S and event E. | ||
type Transition[S comparable, SM StateMachine[S], E any] struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This may be nonsense but at first glance the event type parameter E
seems to have a rather weak connection to the type: it's only used by an optional mutator function. That's making me want to double-check that it is definitely desired/required for the event to be part of the Transition
type; e.g. is there an alternative design where the mutator function is generic over event instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I played with this type definition a bit before settling on this.
This is the only way I could make it type safe and ergonomic.
If you have a concrete suggestion, I'd definitely welcome it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, I played with it a bit and also couldn't find a better arrangement.
// state machine's functionality. For example, a callback executor might need to extract the nexus completion from a | ||
// MutableState object but not all mutable states are required to support that. | ||
// In the future this may be merged into the [StateMachine] interface to allow state machines to have a hierarchy. | ||
type MutableState interface { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now that we are introducing new workflow execution state machine abstractions (e.g. here, and the ASM work), I'd like to suggest that we leave the name "MutableState" behind in the original implementation and not carry it forward into the new ones.
My criticisms of the name:
- In Go one doesn't need to call out that a data structure is mutable.
- It's extremely vague: we should choose a name that says what entity this is the state of (e.g. in the classic Temporal workflow implementation it seems it could be called
WorkflowExecutionState
or similar).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I reused the name because of the familiarity.
I don't want to call this WorkflowExecutionState
because some day these records may not be referencing workflows.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I reused the name because of the familiarity.
Yes, I'm suggesting not to do that.
I don't want to call this
WorkflowExecutionState
because some day these records may not be referencing workflows.
I mentioned WorkflowExecutionState
as a name that could have been given, in an alternate universe, to MutableState
in the classic workflow implementation. I agree that wouldn't be the right name here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So my question is: what would be a good name for this if we were not going to re-use the almost meaningless "MutableState" name?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My thinking is that we merge this concept with StateMachine
which basically makes every state machine hierarchical but it might require another interface because StateMachine
is generic and I like that it is that way for now.
I'm not going there quite yet though. My thought was that when we implement the operation state machine it would have a child cancelation machine and I'd refactor then (or @MichaelSnowden whoever picks up that work).
// Note that each category supports a specific task kind, make sure those match. | ||
Kind() TaskKind | ||
// Determines which queue this task is scheduled on. | ||
Category() tasks.Category |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- Is
Category()
used anywhere? Looks like right now each kind is hardcoded to use a specific Category. - I think now I get the idea of Kind. But I still think the def of Kind is not orthogonal to Category. e.g. you can't have a task with KindTimer and an immediate task category. Maybe Category is a bit too low level and not the right concept to expose here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not used here, it's used by the task generator when the tasks.Task
instances are generated.
I had a version of this where Kind
and Category
were merged but that seemed less flexible since we may want multiple timer or immediate task queues.
Lets leave this for now and consider merging the concepts if we see they're redundant after a few more use cases.
// TaskSerializer provides type information and a serializer for a state machine. | ||
type TaskSerializer interface { | ||
Serialize(Task) ([]byte, error) | ||
Deserialize(data []byte, category tasks.Category, kind TaskKind) (Task, error) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- I was confused in the beginning why taskType is not a parameter here, then I realized taskSerializer is scoped to a certain task type. That maybe worth call out here.
- The
category
parameter feels unnecessary to me for decoding the task blob.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was confused in the beginning why taskType is not a parameter here, then I realized taskSerializer is scoped to a certain task type. That maybe worth call out here.
Sure
The category parameter feels unnecessary to me for decoding the task blob.
Yeah, it's just so you can reconstruct the exact thing you passed into the serializer.
I'll consider removing this.
// Determines which queue this task is scheduled on. | ||
Category() tasks.Category | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not related to this PR. But I noticed an issue here and want to comment before I forget.
The FireTime field of a taskKey must be zero for an immediate task category (some persistence implementations have additional sanity checks on its value). So you probably don't want to define the GetKey method in the base struct.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm... I think it should be fine since FireTime
will be zero for immediate tasks.
if err != nil { | ||
return err | ||
} | ||
serialized, err := e.collection.definition.Serialize(data) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you remind me one more time why serialization needs to happen here instead of later when, say, closing mutable state transaction.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just so you have a checkpoint to rollback to on failed transition.
service/history/statemachines/sm.go
Outdated
return m.CurrentTime | ||
// A TaskGenerator is anything that can generate a [Task] slice. | ||
type TaskGenerator interface { | ||
Tasks() ([]Task, error) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
re. state-based and event (input) -based task generation.
I think state-based approach can cover probably majority of the case, but since it's less general than the input-based approach, I guess I need to think more about it.
e.g. Workflow is a state machine and we only want to generate the timeout timer when the workflow is created, instead of every time some attributes of this workflow is updated. How do we achieve that in state based approach?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You only generate tasks if the state machine is updated.
string namespace_id = 1; | ||
string workflow_id = 2; | ||
string run_id = 3; | ||
|
||
temporal.server.api.enums.v1.TaskType task_type = 4; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this always TASK_TYPE_STATE_MACHINE_OUTBOUND
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes for now.
message CallbackInfo { | ||
temporal.api.workflow.v1.CallbackInfo public_info = 1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this still needed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wanted an internal structure so we can add more fields that are not public later.
type activeBackoffExecutor struct { | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this can be removed?
|
||
func RegisterExecutor( | ||
registry *statemachines.Registry, | ||
options ActiveExecutorOptions, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the fx provider of this ActiveExecutorOptions should be defined in the callbacks package as well.
destination: smTask.Destination, | ||
}) | ||
case BackoffTask: | ||
return processor.ProcessOneStepExecutable(ctx, task, backoffExecutable{task: task}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there a timeout enforced for this task? I don't see one enforced by executeStateMachineTask
in timerQueueActiveTaskExecutor.
This task shares the same goroutine pool with other timer tasks on the host and need to have a timeout like this.
I'm putting this on hold and will push some changes after getting the first round of feedback. |
I restructured things a bit. |
f04d1a0
to
f2e758a
Compare
Heads up that this PR is up for review again after internal discussions where we approved this v0.1 of the HSM API. |
7994ec8
to
5d712b3
Compare
This PR is intended to merge 4 months of work in the `nexus` feature branch into `main`. The functionality it brings is: - Dispatching Nexus Tasks by namespace and task queue - Internal implementation of the Incoming Service Registry - not yet exposed - Attaching workflow close callbacks on `StartWorkflowExecutionRequest` and processing of those callbacks There's more to come and some of the callback code will be refactored into a new plugin architecture (#5446) soon. --------- Co-authored-by: PJ Doerner <pj.doerner@temporal.io>
## Notes for reviewers I don't consider this a final approach but I do think it's a step in the right direction, we need to model more state machines on top of this to form a more solid API. Note that mutable state itself is registered as a state machine in the HSM framework and used as the root node in the HSM tree. It's a bit annoying that all of the unit tests need to register the workflow state machine but I think this is ended up being the cleanest approach from the HSM framework perspective. There's still some follow-up work, like collapsing timer tasks and support for interacting with history event, that I'll take on later. Replication support for the framework will also come at a later time. - See discussions in related PRs (#5446, #5495) ## What changed? - Modified the statemachines abstraction to be a bit more generic - Rewrote callbacks as a plugin using this framework ## Why? This centralizes most the callback code in the plugin directory instead of having it spread out the entire project moving common concerns such as staleness checks, task generation, and (in the future) replication into a framework and should generally help speed up feature delivery and maintainability. I plan to leverage this framework when implementing Nexus operations. ## How did you test it? Existing tests from the feature branch and added unit tests.
This PR is intended to merge 4 months of work in the `nexus` feature branch into `main`. The functionality it brings is: - Dispatching Nexus Tasks by namespace and task queue - Internal implementation of the Incoming Service Registry - not yet exposed - Attaching workflow close callbacks on `StartWorkflowExecutionRequest` and processing of those callbacks There's more to come and some of the callback code will be refactored into a new plugin architecture (temporalio#5446) soon. --------- Co-authored-by: PJ Doerner <pj.doerner@temporal.io>
## Notes for reviewers I don't consider this a final approach but I do think it's a step in the right direction, we need to model more state machines on top of this to form a more solid API. Note that mutable state itself is registered as a state machine in the HSM framework and used as the root node in the HSM tree. It's a bit annoying that all of the unit tests need to register the workflow state machine but I think this is ended up being the cleanest approach from the HSM framework perspective. There's still some follow-up work, like collapsing timer tasks and support for interacting with history event, that I'll take on later. Replication support for the framework will also come at a later time. - See discussions in related PRs (temporalio#5446, temporalio#5495) ## What changed? - Modified the statemachines abstraction to be a bit more generic - Rewrote callbacks as a plugin using this framework ## Why? This centralizes most the callback code in the plugin directory instead of having it spread out the entire project moving common concerns such as staleness checks, task generation, and (in the future) replication into a framework and should generally help speed up feature delivery and maintainability. I plan to leverage this framework when implementing Nexus operations. ## How did you test it? Existing tests from the feature branch and added unit tests.
Notes for reviewers
nexus
branchhsm
andplugins
directories from the WIP sub-state-machines branch.What changed?
Why?
This centralizes most the callback code in the plugin directory instead of having it spread out the entire project moving common concerns such as staleness checks, task generation, and (in the future) replication into a framework and should generally help speed up feature delivery and maintainability.
I plan to leverage this framework when implementing Nexus operations.
How did you test it?
Existing tests from the feature branch and added unit tests.