Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix potentially panicing unchecked duration adds in runtime #1489

Merged
merged 2 commits into from
May 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion kube-runtime/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,9 @@ where
obj_ref,
reason: reschedule_reason,
},
run_at: reconciler_finished_at + requeue_after,
run_at: reconciler_finished_at
.checked_add(requeue_after)
.unwrap_or_else(crate::scheduler::far_future),
}),
result: Some(result),
}
Expand Down
20 changes: 15 additions & 5 deletions kube-runtime/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,16 +74,19 @@
// Message is already pending, so we can't even expedite it
return;
}
let next_time = request
.run_at

Check warning on line 78 in kube-runtime/src/scheduler.rs

View check run for this annotation

Codecov / codecov/patch

kube-runtime/src/scheduler.rs#L78

Added line #L78 was not covered by tests
.checked_add(*self.debounce)
.unwrap_or_else(far_future);

Check warning on line 80 in kube-runtime/src/scheduler.rs

View check run for this annotation

Codecov / codecov/patch

kube-runtime/src/scheduler.rs#L80

Added line #L80 was not covered by tests
match self.scheduled.entry(request.message) {
// If new request is supposed to be earlier than the current entry's scheduled
// time (for eg: the new request is user triggered and the current entry is the
// reconciler's usual retry), then give priority to the new request.
Entry::Occupied(mut old_entry) if old_entry.get().run_at >= request.run_at => {
// Old entry will run after the new request, so replace it..
let entry = old_entry.get_mut();
self.queue
.reset_at(&entry.queue_key, request.run_at + *self.debounce);
entry.run_at = request.run_at + *self.debounce;
self.queue.reset_at(&entry.queue_key, next_time);
entry.run_at = next_time;
old_entry.replace_key();
}
Entry::Occupied(_old_entry) => {
Expand All @@ -93,8 +96,8 @@
// No old entry, we're free to go!
let message = entry.key().clone();
entry.insert(ScheduledEntry {
run_at: request.run_at + *self.debounce,
queue_key: self.queue.insert_at(message, request.run_at + *self.debounce),
run_at: next_time,

Check warning on line 99 in kube-runtime/src/scheduler.rs

View check run for this annotation

Codecov / codecov/patch

kube-runtime/src/scheduler.rs#L99

Added line #L99 was not covered by tests
queue_key: self.queue.insert_at(message, next_time),
});
}
}
Expand Down Expand Up @@ -280,6 +283,13 @@
Scheduler::new(requests, debounce)
}

// internal fallback for overflows in schedule times
pub(crate) fn far_future() -> Instant {

Check warning on line 287 in kube-runtime/src/scheduler.rs

View check run for this annotation

Codecov / codecov/patch

kube-runtime/src/scheduler.rs#L287

Added line #L287 was not covered by tests
// private method from tokio for convenience - remove if upstream becomes pub
// https://github.com/tokio-rs/tokio/blob/6fcd9c02176bf3cd570bc7de88edaa3b95ea480a/tokio/src/time/instant.rs#L57-L63
Instant::now() + Duration::from_secs(86400 * 365 * 30)

Check warning on line 290 in kube-runtime/src/scheduler.rs

View check run for this annotation

Codecov / codecov/patch

kube-runtime/src/scheduler.rs#L290

Added line #L290 was not covered by tests
}

#[cfg(test)]
mod tests {
use crate::utils::KubeRuntimeStreamExt;
Expand Down
Loading