Skip to content

Commit

Permalink
fix: triggering the queue removes stale waiting_event
Browse files Browse the repository at this point in the history
  • Loading branch information
saibatizoku committed Mar 28, 2024
1 parent 696e12a commit b5e034f
Showing 1 changed file with 14 additions and 8 deletions.
22 changes: 14 additions & 8 deletions hermes/bin/src/runtime_extensions/hermes/cron/queue.rs
Expand Up @@ -142,8 +142,14 @@ impl CronEventQueue {
.ok_or(Error::InvalidTimestamp)?
.try_into()?;
// drop the old waiting task if it has passed, retain if it hasn't.
self.waiting_event
.retain(|_, (waiting_for, _)| *waiting_for > trigger_time);
if let Some((_key, (_, handle))) = self
.waiting_event
.remove_if(&Self::WAITING_EVENT_TASK_ID, |_, (waiting_for, _)| {
*waiting_for <= trigger_time
})
{
handle.join().map_err(|_| Error::CronQueueTaskFailed)?;
}
// Get the next timestamp in the queue, and the list of apps that should be triggered.
while let Some((ts, app_names)) = self.next_in_queue() {
if trigger_time >= ts {
Expand All @@ -153,7 +159,8 @@ impl CronEventQueue {
} else {
// If the timestamp is in the future:
// * update the waiting task
self.update_waiting_task(trigger_time, ts);
let sleep_duration = ts - trigger_time;
self.update_waiting_task(ts, sleep_duration);
// Since `ts` is in the future, we can break
break;
}
Expand All @@ -162,20 +169,19 @@ impl CronEventQueue {
}

/// Update the waiting task.
fn update_waiting_task(&self, trigger_time: CronTimestamp, timestamp: CronTimestamp) {
fn update_waiting_task(&self, timestamp: CronTimestamp, sleep_duration: CronTimestamp) {
// Create a new waiting task.
let duration = timestamp - trigger_time;
self.waiting_event
.entry(Self::WAITING_EVENT_TASK_ID)
.and_modify(|(waiting_for, handle)| {
// `timestamp` is before the task that is waiting,
// so we need to update the waiting task, and cancel
// the old one, if it exists.
if *waiting_for > timestamp {
(*waiting_for, *handle) = new_waiting_task(timestamp, duration);
(*waiting_for, *handle) = new_waiting_task(timestamp, sleep_duration);
}
})
.or_insert_with(|| new_waiting_task(timestamp, duration));
.or_insert_with(|| new_waiting_task(timestamp, sleep_duration));
}

/// Pop the first item from all the `BTreeMap`s belonging
Expand Down Expand Up @@ -309,7 +315,7 @@ mod tests {
}
}

/// Initialize the CronEventQueue and the HermesEventQueue with
/// Initialize the `CronEventQueue` and the `HermesEventQueue` with
/// the `HermesApp` named `HermesAppName(APP_NAME.to_string())`.
#[allow(clippy::unwrap_used)]
fn initialize_queue() -> (CronEventQueue, HermesEventLoopHandler) {
Expand Down

0 comments on commit b5e034f

Please sign in to comment.