Skip to content

Worker heartbeat #953

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
Open

Conversation

yuandrew
Copy link
Contributor

@yuandrew yuandrew commented Jul 2, 2025

What was changed

New worker heartbeat functionality. There are a few calls that also send this heartbeat data, and there is a timer that runs in the background to ensure the worker heartbeats within heartbeat_interval if there are no calls that send this heartbeat data.

misc changes:

  • Renamed mock_workflow_client to mock_worker_client
  • fixed spelling for la_permit_dealer

Why?

new feature

Checklist

  1. Closes

  2. How was this tested:

new unit test

  1. Any docs updates needed?

yuandrew added 6 commits June 26, 2025 16:08
…heartbeat to clientBag, but that doesn't work bc clientBag not in mutex. Might need to revert back to passing client to heartbeat_info, but that causes the original problem of the timer thread not having the actual client
…ss. Now i need to write a test to verify the timer mechanism with an existing gRPC request. Or maybe heartbeating is enough, that it runs twice?
@yuandrew yuandrew requested a review from a team as a code owner July 2, 2025 00:16
Copy link
Member

@Sushisource Sushisource left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall making sense to me! A few things to change.

core/src/lib.rs Outdated
Comment on lines 113 to 114
// TODO: Adding this afterwards feels a little clunky
heartbeat_info.lock().add_client(client_bag.clone());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One way to fix this (which I think is worth doing) is to have WorkerHeartbeatInfo become WorkerHeartbeatData or something, with WorkerHeartbeatInfo as a field inside it. Then, that can be created first, passed into the client bag, and passed into WorkerHeartbeatData when it is constructed with the client.

Oh, and I see later on you're pretty much already doing that. Client bag doesn't really need the whole heartbeat info, just the data and a way to send the notification to reset timers. So, data could have something like a channel in it that is used to reset the timer loop in the spawned task. Potentially also this way you don't abort the existing task and create a new one each time, but rather just have one task that lives as long as the worker and just internally resets the timer via that channel.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also seems like this should all happen inside Worker::new rather than here, since everything needed is passed down into there anyway.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Turns out I can remove WorkerHeartbeatInfo altogether, since the heartbeating task got moved to a long-running task

Comment on lines 112 to 113
// TODO: Is this right for worker_identity?
worker_identity: worker_config
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be, but I'm worried we need to get the client identity if this one wasn't set. I think I'm also doing it wrong here:

worker_identity: config.client_identity_override.clone().unwrap_or_default(),

I think what needs to happen is WorkerClient trait probably needs a get_identity function. Then, in the worker constructor, the line I linked can be fixed. As for here... not sure. Could potentially just mutate the override in WorkerConfig in the constructor before it's passed down into here, or just pass the client identity separately down to here. Passing separately is probably a bit cleaner.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added a bail! statement in init_worker to ensure either lang or client sets this value

Comment on lines 230 to 231
// heartbeat timer fires once
tokio::time::sleep(Duration::from_millis(150)).await;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically these sleeps could be avoided by having returning in the mock fire down a channel. Not a super huge deal but also should be easy to do.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually want to test specific sleep times to make sure I try heartbeating in the right spot, new iteration of the test should be more obvious for this


/// Transform heartbeat data into `WorkerHeartbeat` we can send in gRPC request. Some
/// metrics are also cached for future calls of this function.
pub(crate) fn capture_heartbeat(&mut self) -> WorkerHeartbeat {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, this is called by every poll function, but we don't want to send it on every poll - only if it's been nearly a full interval since the last time anyone sent anything. So, probably this needs to get renamed to something like capture_heartbeat_if_needed, and should return Optional<WorkerHeartbeat>.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

put "nearly" here as 90%, lmk if you think that number should be different

Copy link
Member

@Sushisource Sushisource left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Type system puzzle totally nerd sniped me

Comment on lines 913 to 917
}
}

fn create_worker_heartbeat_process(
data: Arc<Mutex<WorkerHeartbeatData>>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should live in heartbeat.rs

Comment on lines 70 to 72
self.heartbeat_time = Some(now);
if let Some(reset_tx) = &self.reset_tx {
let _ = reset_tx.send(());
Copy link
Member

@Sushisource Sushisource Jul 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This state should be unrepresentable. A fun type puzzle!

I think the way to fix this is to separate out data from some kind of handle for calling this function. The handle is created early on (including the reset channel inside it) and the data is created later, down inside Worker::new. Initially the handle has no capability but once it's passed into the constructor for data it becomes useful.

This sort of "uninitialized" state still sort of exists between those two calls, but, at least it exists in a way that works fairly naturally with the return type of Option<WorkerHeartbeat>, or in a way that can block until everything is initialized.

One option is something like having the handle contain a req_resp: Sender<Receiver<Option<WorkerHeartbeat>>> which can be used inside the timing task to decide if the caller should get a heartbeat or not (also obviates then need for the dedicated reset channel). Dunno if that's overkill or not lol.

The more obvious option is to just have a OnceLock<Box<dyn Fn() -> Option<WorkerHeartbeat>>> or similar, which can be waited on to do semantically the same thing as the channel-of-channel. It's probably a bit cleaner than nested channels.

I dunno, fun one, I thought about this for a while and had to stop myself from just trying to do it. Feel free to play around. I maybe spend too much time trying to eliminate states like these. If you think it's not worth it, which is legit, change this to a dbg_panic!

Comment on lines 83 to 85
struct WorkerHeartbeatData {
worker_instance_key: String,
pub(crate) worker_identity: String,
worker_identity: String,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh, de-pub :feelsgoodman:

client: Arc<dyn WorkerClient>,
) -> Self {
let sdk_name_and_ver = client.sdk_name_and_version();
let (reset_tx, reset_rx) = watch::channel(());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm realizing this should probably just be a https://docs.rs/tokio/latest/tokio/sync/struct.Notify.html

Copy link
Member

@Sushisource Sushisource left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking good to me. Just a few things to clean up.

continue
};
if let Err(e) = client.clone().record_worker_heartbeat(heartbeat).await {
warn!(error=?e, "Network error while sending worker heartbeat");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think probably move this warning after the return-if-unimplemented

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ahh I was originally thinking we'd want a warning either way, but I guess it makes sense not to bother informing users that don't even have this feature that it's being skipped

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i guess an argument for logging it would be to try to entice folks to upgrade their server versions for this new feature?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Eh, if they read about it and decide they want it, that's how I'd expect they'd learn anyway.

Comment on lines 215 to 216
// heartbeat timer fires once
tokio::time::sleep(Duration::from_millis(300)).await;
Copy link
Member

@Sushisource Sushisource Jul 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like we can avoid the sleeps by using advance and family which are behind test-utils flags for tokio: https://docs.rs/tokio/latest/tokio/time/index.html

Which is very nice. Probably I should be using that in more tests.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, I can make a follow up PR moving over more tests to use this too

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, is this needed? From https://docs.rs/tokio/latest/tokio/time/fn.advance.html

If you want to do that, you should instead call sleep and rely on the runtime’s auto-advance feature.

Sounds like there should be an auto-advance feature with sleep, but not seeing any information on this in the sleep page

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nvm, I took a quick initial stab at it, seeing some test failures, maybe not as straightforward as I thought. For now I'll create an issue to track this

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants