Skip to content

Commit

Permalink
timer: Fix DelayQueue delay reset logic (#851)
Browse files Browse the repository at this point in the history
  • Loading branch information
hawkw authored and tobz committed Jan 20, 2019
1 parent 4c8f274 commit 983e9d1
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 3 deletions.
11 changes: 8 additions & 3 deletions tokio-timer/src/delay_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ pub struct DelayQueue<T> {
/// An entry in `DelayQueue` that has expired and removed.
///
/// Values are returned by [`DelayQueue::poll`].
///
///
/// [`DelayQueue::poll`]: struct.DelayQueue.html#method.poll
#[derive(Debug)]
pub struct Expired<T> {
Expand Down Expand Up @@ -496,15 +496,20 @@ impl<T> DelayQueue<T> {

// Normalize the deadline. Values cannot be set to expire in the past.
let when = self.normalize_deadline(when);
let old = self.start + Duration::from_millis(self.slab[key.index].when);

// This is needed only for the debug assertion inside the if-let.
let old = self.start + Duration::from_millis(self.slab[key.index].when);

self.slab[key.index].when = when;

if let Some(ref mut delay) = self.delay {
debug_assert!(old >= delay.deadline());

if old == delay.deadline() {
let start = self.start;
let next_poll = self.wheel.poll_at()
.map(move |t| start + Duration::from_millis(t));

if next_poll != Some(delay.deadline()) {
delay.reset(self.start + Duration::from_millis(when));
}
}
Expand Down
58 changes: 58 additions & 0 deletions tokio-timer/tests/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,64 @@ fn reset_entry() {
});
}

#[test]
fn reset_much_later() {
// Reproduces tokio-rs/tokio#849.
mocked(|timer, time| {
let mut queue = DelayQueue::new();
let mut task = MockTask::new();

let epoch = time.now();

turn(timer, ms(1));

let key = queue.insert_at("foo", epoch + ms(200));

task.enter(|| {
assert_not_ready!(queue);
});

turn(timer, ms(3));

queue.reset_at(&key, epoch + ms(5));

turn(timer, ms(20));

assert!(task.is_notified());
});
}

#[test]
fn reset_twice() {
// Reproduces tokio-rs/tokio#849.
mocked(|timer, time| {
let mut queue = DelayQueue::new();
let mut task = MockTask::new();

let epoch = time.now();

turn(timer, ms(1));

let key = queue.insert_at("foo", epoch + ms(200));

task.enter(|| {
assert_not_ready!(queue);
});

turn(timer, ms(3));

queue.reset_at(&key, epoch + ms(50));

turn(timer, ms(20));

queue.reset_at(&key, epoch + ms(40));

turn(timer, ms(20));

assert!(task.is_notified());
});
}

#[test]
fn remove_expired_item() {
mocked(|timer, time| {
Expand Down

0 comments on commit 983e9d1

Please sign in to comment.