[DRAFT] Process wide worker heartbeat #962
yuandrew wants to merge 13 commits into temporalio:master from
Conversation
```rust
if runtime.heartbeat_worker.get().is_none() {
    let process_key = Uuid::new_v4();
    // TODO: set max_concurrent_nexus_polls to 1?
    // let nexus_config = WorkerConfig {
```
Seems like we want a separate config for this worker, or do we want to group them with the existing WorkerConfig? But what if different workers on the same namespace have two different configs?
Also, I'm not sure which config options need to be unique here; I'm thinking potentially max_concurrent_nexus_polls, and heartbeat_interval at least?
---
So, yeah, the config for these workers should just be entirely in our control, and not derived from some of the other workers' configs.
For example, if you look at how we initialize replay workers, we set a lot of values to 1 or disable things, same story here really.
Off the top of my head we probably want:
- Disable workflow and activity polling. This will happen by just not calling the poll APIs, but we can also explicitly set `no_remote_activities: true`.
- Probably use poller autoscaling for the nexus poller, with minimum/initial set to 1 and max set fairly low, like 10.
- Fixed-size nexus slots. Honestly not sure what the value should be here, but we can start with something low to begin with.

Pretty much everything else can just be default or off or something, whatever makes sense.
The only option that needs to be derived from an existing worker here is the newly added heartbeat_interval. The problem is that the values could conflict, so we can just always pick the smallest value among workers sharing the same heartbeat. That should be added to the docstring.
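Picking the smallest interval among the registered workers could be sketched like this (a hypothetical helper, not code from this PR):

```rust
use std::time::Duration;

// Hypothetical: given the heartbeat intervals of all workers sharing the
// same shared namespace worker, pick the smallest one, falling back to a
// default when no worker specifies an interval.
fn effective_heartbeat_interval(
    intervals: &[Option<Duration>],
    default: Duration,
) -> Duration {
    intervals.iter().flatten().copied().min().unwrap_or(default)
}
```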
```rust
        ticker.reset();
    }
    // TODO: handle nexus tasks
    res = manager.next_nexus_task() => {
```
Currently working with Yuri on this
```rust
runtime_handle: tokio::runtime::Handle,
heartbeat_worker: OnceLock<Worker>,
heartbeat_fn_map: Arc<Mutex<HeartbeatMap>>,
process_key: Uuid,
```
Probably this name isn't so great any more now that the grouping isn't necessarily per-process.
Honestly we could just call it task_queue_key since ultimately that's what it is. We'll want to change it in the API too.
```rust
// Process-wide nexus worker
let worker_heartbeat = if let Some(ref details) = heartbeat_details {
```
The worker itself should now no longer need a heartbeat manager at all, like we talked about on the call, the only variant of WorkerHeartbeatDetails should be the callback (and then it'll just go away).
In fact, I don't think you need to pass in anything to the Worker at all. Workers can just expose a pub(crate) fn capture_heartbeat_details and when you are registering a new worker with the shared heartbeat manager, you just pass in a callback that calls that.
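The callback-registration idea could look roughly like this; all type names here are simplified stand-ins for the actual SDK types, not the real definitions:

```rust
use std::sync::Arc;

// Stand-in for the heartbeat payload proto.
#[derive(Debug, Clone, PartialEq)]
struct WorkerHeartbeat {
    task_slots_available: usize,
}

type HeartbeatCallback = Arc<dyn Fn() -> WorkerHeartbeat + Send + Sync>;

struct Worker {
    task_slots_available: usize,
}

impl Worker {
    // The worker only exposes a way to snapshot its own state...
    fn capture_heartbeat_details(&self) -> WorkerHeartbeat {
        WorkerHeartbeat {
            task_slots_available: self.task_slots_available,
        }
    }
}

// ...and registering with the shared heartbeat manager just wraps that
// snapshot method in a callback; nothing is passed into the Worker.
fn make_callback(worker: Arc<Worker>) -> HeartbeatCallback {
    Arc::new(move || worker.capture_heartbeat_details())
}
```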
```rust
    }
}

pub(crate) struct WorkerHeartbeatManager {
```
I think this goes away after the other changes? All we really need is the map on the runtime, or the shared worker on a client.
…s, as well as used a hack for fixing API defs, think Yuri's PR needs a change to work with SDK
```rust
#[builder(default = "Arc::new(AtomicUsize::new(0))")]
pub max_cached_workflows: Arc<AtomicUsize>,
```
The Arc shouldn't be in the config itself; users (lang) shouldn't need to create an Arc or know about it. Rather, just use it everywhere the config value was being referenced.

---

AtomicUsize doesn't implement Clone, which WorkerConfig derives.

---

Sorry, I mean the config itself shouldn't change at all. It should stay a normal usize; then you create the arc'd atomic based on it in init.

---

Ahhh, yeah, that makes sense. ty
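A minimal sketch of that shape, with made-up field names: the public config keeps a plain `usize`, and the worker wraps it in an `Arc<AtomicUsize>` during init.

```rust
use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};

// Public config stays a plain value; lang never sees the Arc.
#[derive(Clone)]
struct WorkerConfig {
    max_cached_workflows: usize,
}

// Internal worker state created during init.
struct WorkerState {
    max_cached_workflows: Arc<AtomicUsize>,
}

impl WorkerState {
    fn new(config: &WorkerConfig) -> Self {
        Self {
            max_cached_workflows: Arc::new(AtomicUsize::new(config.max_cached_workflows)),
        }
    }
}
```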
```rust
    )
}

pub(crate) fn mock_worker_with_heartbeat(mock: MockWorkerClient, config: WorkerConfig) -> Worker {
```
not used rn, will use for the test i need to fix up, feel free to ignore for now
New changes since the last review:
```rust
/// Mirrors `WorkerConfig`, but with atomic structs to allow Worker Commands to make config changes
/// from the server
#[derive(Clone)]
pub(crate) struct WorkerConfigInner {
```
My first reaction was that I didn't like this at all, but I get why you did it. That said I think we can accomplish the same goal with less repetition.
I think: keep the "new/mutable" fields at the top like you've done here, and then have an original_config field which is an Arc<WorkerConfig> (Arc purely to make copying cheaper). Docstring can make it clear that the contents in there can't change (one rub might be the Tuners, if/when we make that remotely changeable).
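That suggestion could be sketched like this; field names are placeholders, not the PR's actual fields:

```rust
use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};

#[derive(Clone)]
struct WorkerConfig {
    namespace: String,
    max_cached_workflows: usize,
}

// Mutable, server-adjustable fields live at the top; everything else is
// reachable through an immutable snapshot of the original config. The Arc
// only makes cloning cheap; the contents must never change.
#[derive(Clone)]
struct WorkerConfigInner {
    max_cached_workflows: Arc<AtomicUsize>,
    original_config: Arc<WorkerConfig>,
}

impl WorkerConfigInner {
    fn new(config: WorkerConfig) -> Self {
        Self {
            max_cached_workflows: Arc::new(AtomicUsize::new(config.max_cached_workflows)),
            original_config: Arc::new(config),
        }
    }
}
```

Clones share both the atomic and the snapshot, so a server-driven update through one handle is visible everywhere while the original values stay readable.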
```toml
features = ["history_builders", "serde_serialize"]
#features = ["history_builders"]
```
This shouldn't need to have changed I think
```rust
pub(crate) type HeartbeatCallback = Arc<dyn Fn() -> WorkerHeartbeat + Send + Sync>;
pub(crate) type WorkerDataMap = HashMap<String, Arc<Mutex<WorkerHeartbeatData>>>;
```
Short docstrings would be good on these
```rust
/// SharedNamespaceWorker is responsible for polling worker commands and sending worker heartbeat
/// to the server. This communicates with all workers in the same process that share the same
/// namespace.
```

Suggested change:

```rust
/// SharedNamespaceWorker is responsible for polling nexus-delivered worker commands and sending
/// worker heartbeats to the server. This communicates with all workers in the same process that
/// share the same namespace.
```
```rust
// Worker commands
workflow_cache_size: Arc<AtomicUsize>,
workflow_poller_behavior: PollerBehavior,
```
Why are these in the heartbeat data?
```rust
    }
}

fn fetch_config(&self) -> fetch_worker_config_response::WorkerConfigEntry {
```
I don't think this belongs on data. Semantically this is more like another callback.
```rust
all_permits_tracker: tokio::sync::Mutex<AllPermitsTracker>,
/// Used to shutdown the worker heartbeat task
worker_heartbeat: Option<WorkerHeartbeatManager>,
worker_heartbeat_data: Option<Arc<Mutex<WorkerHeartbeatData>>>,
```
Similar issue here - WorkerHeartbeatData is mixing a lot of concerns. Leave data as just data, and separate out the behavior. My comments on capture_heartbeat / fetch_config get at that.
Like I had in my comment on the last review round - we can just have get_worker_heartbeat_data(), which you do have, but we don't need to actually store data - you can just construct a new one every time it's called.
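A sketch of that shape, with hypothetical names: the data type is a plain snapshot with no behavior, built on demand instead of stored behind an `Option<Arc<Mutex<..>>>`.

```rust
// Plain data, no behavior: just a snapshot of the worker at one instant.
#[derive(Debug, Clone, PartialEq)]
struct WorkerHeartbeatData {
    task_queue: String,
    current_sticky_cache_size: usize,
}

struct Worker {
    task_queue: String,
    cached_workflows: usize,
}

impl Worker {
    // Nothing is stored on the worker; a fresh snapshot is built per call.
    fn get_worker_heartbeat_data(&self) -> WorkerHeartbeatData {
        WorkerHeartbeatData {
            task_queue: self.task_queue.clone(),
            current_sticky_cache_size: self.cached_workflows,
        }
    }
}
```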
```rust
}

// TODO: rename
// TODO: impl trait so entire struct doesn't need to be passed to Worker and SharedNamespaceWorker
```
Shouldn't be necessary anyway, per my other comments.
```rust
};

let np_metrics = metrics.with_new_attrs([nexus_poller()]);
// This starts the poller thread.
```
Not really a thread, these are tasks. But it's kind of an uninteresting comment anyway.
```rust
/// there will need to be some associated refactoring. // TODO: sounds like we'll need to do this
max_permits: Option<Arc<AtomicUsize>>,
```
Potentially, yes. Let's leave proper implementations of the commands for later though, since this PR is already quite large, and they won't be working server side for a while anyway.
Closing this PR in favor of separating this out into a process-wide heartbeat PR and a worker-commands PR that will come in the future.