-
Notifications
You must be signed in to change notification settings - Fork 90
Worker heartbeat #953
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Worker heartbeat #953
Conversation
…heartbeat to clientBag, but that doesn't work bc clientBag not in mutex. Might need to revert back to passing client to heartbeat_info, but that causes the original problem of the timer thread not having the actual client
…ss. Now i need to write a test to verify the timer mechanism with an existing gRPC request. Or maybe heartbeating is enough, that it runs twice?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall making sense to me! A few things to change.
core/src/lib.rs
Outdated
// TODO: Adding this afterwards feels a little clunky | ||
heartbeat_info.lock().add_client(client_bag.clone()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One way to fix this (which I think is worth doing) is to have WorkerHeartbeatInfo
become WorkerHeartbeatData
or something, with WorkerHeartbeatInfo
as a field inside it. Then, that can be created first, passed into the client bag, and passed into WorkerHeartbeatData
when it is constructed with the client.
Oh, and I see later on you're pretty much already doing that. Client bag doesn't really need the whole heartbeat info, just the data and a way to send the notification to reset timers. So, data could have something like a channel in it that is used to reset the timer loop in the spawned task. Potentially also this way you don't abort the existing task and create a new one each time, but rather just have one task that lives as long as the worker and just internally resets the timer via that channel.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also seems like this should all happen inside Worker::new
rather than here, since everything needed is passed down into there anyway.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Turns out I can remove WorkerHeartbeatInfo altogether, since the heartbeating task got moved to a long-running task
core/src/worker/heartbeat.rs
Outdated
// TODO: Is this right for worker_identity? | ||
worker_identity: worker_config |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should be, but I'm worried we need to get the client identity if this one wasn't set. I think I'm also doing it wrong here:
sdk-core/core/src/worker/mod.rs
Line 328 in 50fc882
worker_identity: config.client_identity_override.clone().unwrap_or_default(), |
I think what needs to happen is WorkerClient
trait probably needs a get_identity
function. Then, in the worker constructor, the line I linked can be fixed. As for here... not sure. Could potentially just mutate the override in WorkerConfig
in the constructor before it's passed down into here, or just pass the client identity separately down to here. Passing separately is probably a bit cleaner.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added a bail!
statement in init_worker
to ensure either lang or client sets this value
core/src/worker/heartbeat.rs
Outdated
// heartbeat timer fires once | ||
tokio::time::sleep(Duration::from_millis(150)).await; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Technically these sleeps could be avoided by having returning
in the mock fire down a channel. Not a super huge deal but also should be easy to do.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I actually want to test specific sleep times to make sure I try heartbeating in the right spot, new iteration of the test should be more obvious for this
core/src/worker/heartbeat.rs
Outdated
|
||
/// Transform heartbeat data into `WorkerHeartbeat` we can send in gRPC request. Some | ||
/// metrics are also cached for future calls of this function. | ||
pub(crate) fn capture_heartbeat(&mut self) -> WorkerHeartbeat { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So, this is called by every poll function, but we don't want to send it on every poll - only if it's been nearly a full interval since the last time anyone sent anything. So, probably this needs to get renamed to something like capture_heartbeat_if_needed
, and should return Optional<WorkerHeartbeat>
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
put "nearly" here as 90%, lmk if you think that number should be different
…-running, use channel to reset timer,
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Type system puzzle totally nerd sniped me
core/src/worker/mod.rs
Outdated
} | ||
} | ||
|
||
fn create_worker_heartbeat_process( | ||
data: Arc<Mutex<WorkerHeartbeatData>>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should live in heartbeat.rs
core/src/worker/heartbeat.rs
Outdated
self.heartbeat_time = Some(now); | ||
if let Some(reset_tx) = &self.reset_tx { | ||
let _ = reset_tx.send(()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This state should be unrepresentable. A fun type puzzle!
I think the way to fix this is to separate out data from some kind of handle for calling this function. The handle is created early on (including the reset channel inside it) and the data is created later, down inside Worker::new
. Initially the handle has no capability but once it's passed into the constructor for data it becomes useful.
This sort of "uninitialized" state still sort of exists between those two calls, but, at least it exists in a way that works fairly naturally with the return type of Option<WorkerHeartbeat>
, or in a way that can block until everything is initialized.
One option is something like having the handle contain a req_resp: Sender<Receiver<Option<WorkerHeartbeat>>>
which can be used inside the timing task to decide if the caller should get a heartbeat or not (also obviates then need for the dedicated reset channel). Dunno if that's overkill or not lol.
The more obvious option is to just have a OnceLock<Box<dyn Fn() -> Option<WorkerHeartbeat>>>
or similar, which can be wait
ed on to do semantically the same thing as the channel-of-channel. It's probably a bit cleaner than nested channels.
I dunno, fun one, I thought about this for a while and had to stop myself from just trying to do it. Feel free to play around. I maybe spend too much time trying to eliminate states like these. If you think it's not worth it, which is legit, change this to a dbg_panic!
core/src/worker/heartbeat.rs
Outdated
struct WorkerHeartbeatData { | ||
worker_instance_key: String, | ||
pub(crate) worker_identity: String, | ||
worker_identity: String, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ahh, de-pub :feelsgoodman:
core/src/worker/heartbeat.rs
Outdated
client: Arc<dyn WorkerClient>, | ||
) -> Self { | ||
let sdk_name_and_ver = client.sdk_name_and_version(); | ||
let (reset_tx, reset_rx) = watch::channel(()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm realizing this should probably just be a https://docs.rs/tokio/latest/tokio/sync/struct.Notify.html
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking good to me. Just a few things to clean up.
core/src/worker/heartbeat.rs
Outdated
continue | ||
}; | ||
if let Err(e) = client.clone().record_worker_heartbeat(heartbeat).await { | ||
warn!(error=?e, "Network error while sending worker heartbeat"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think probably move this warning after the return-if-unimplemented
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ahh I was originally thinking we'd want a warning either way, but I guess it makes sense not to bother informing users that don't even have this feature that it's being skipped
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i guess an argument for logging it would be to try to entice folks to upgrade their server versions for this new feature?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Eh, if they read about it and decide they want it, that's how I'd expect they'd learn anyway.
core/src/worker/heartbeat.rs
Outdated
// heartbeat timer fires once | ||
tokio::time::sleep(Duration::from_millis(300)).await; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like we can avoid the sleeps by using advance
and family which are behind test-utils
flags for tokio: https://docs.rs/tokio/latest/tokio/time/index.html
Which is very nice. Probably I should be using that in more tests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice, I can make a follow up PR moving over more tests to use this too
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, is this needed? From https://docs.rs/tokio/latest/tokio/time/fn.advance.html
If you want to do that, you should instead call sleep and rely on the runtime’s auto-advance feature.
Sounds like there should be an auto-advance feature with sleep, but not seeing any information on this in the sleep page
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nvm, I took a quick initial stab at it, seeing some test failures, maybe not as straightforward as I thought. For now I'll create an issue to track this
What was changed
New worker heartbeat functionality. There are a few calls that also send this heartbeat data, and there is a timer that runs in the background to ensure the worker heartbeats within
heartbeat_interval
if there are no calls that send this heartbeat data.misc changes:
mock_workflow_client
tomock_worker_client
la_permit_dealer
Why?
new feature
Checklist
Closes
How was this tested:
new unit test