Add shrink_to_fit and compact methods to DelayQueue #4170

Merged
merged 11 commits into from
Jan 9, 2022

b-naber commented Oct 14, 2021

Fixes #3590

This PR introduces shrink_to_fit and compact methods that internally use the newly added functionality in the slab crate. compact requires us to re-map Keys that were moved to different indices in the slab. We use a HashMap to keep track of the Keys we handed out and the actual keys that the slab uses. We also have to re-map the Keys that the Wheel uses in its slots; this is performed on each compact call.
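
As a rough illustration of the remapping described above, here is a minimal sketch that uses only the standard library. `ToySlab` and `apply_moves` are hypothetical names invented for this example; they are not the slab crate's API and not the code in this PR. The idea is that compaction reports every move as a (from, to) pair, and the map from handed-out keys to internal indices is updated from those pairs.

```rust
use std::collections::HashMap;

/// Toy stand-in for a slab: a slot is `None` once its value was removed.
/// (Hypothetical type for illustration only.)
struct ToySlab<T> {
    slots: Vec<Option<T>>,
}

impl<T> ToySlab<T> {
    /// Moves occupied slots to the front and returns each move as (from, to).
    fn compact(&mut self) -> Vec<(usize, usize)> {
        let mut moves = Vec::new();
        let mut write = 0;
        for read in 0..self.slots.len() {
            if self.slots[read].is_some() {
                if read != write {
                    // `write` always points at a vacant slot here.
                    self.slots.swap(read, write);
                    moves.push((read, write));
                }
                write += 1;
            }
        }
        // Everything past `write` is vacant and can be dropped.
        self.slots.truncate(write);
        self.slots.shrink_to_fit();
        moves
    }
}

/// Updates the map from handed-out keys to internal indices after a compaction.
fn apply_moves(key_map: &mut HashMap<usize, usize>, moves: &[(usize, usize)]) {
    let remapping: HashMap<usize, usize> = moves.iter().copied().collect();
    for internal in key_map.values_mut() {
        if let Some(new_index) = remapping.get(internal) {
            *internal = *new_index;
        }
    }
}
```

With slots `[None, Some("a"), None, Some("b")]` and a map `{1: 1, 3: 3}`, compaction moves index 1 to 0 and index 3 to 1, so the handed-out keys 1 and 3 keep working while the storage shrinks to two slots.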

@@ -1,3 +1,75 @@
warning: this attribute can only be applied to a `use` item
Contributor Author

I'm not entirely sure about this; I'd assume we have to remove the attribute. But I decided to keep this in, since I don't really know why we need this attribute in the first place, or why it fails.

Contributor

This file looks entirely unrelated to this change. Why is it here?

Contributor Author

I have no idea. Could it be that this is just a newly introduced lint in the compiler?

Contributor

No, I mean, why did you change the macros_invalid_input.stderr file?

Contributor Author

I didn't change this manually. This must have happened while running the tests.

Contributor

That's weird. Can you undo the change?

Contributor Author

I did remove that, but isn't it expected that the test suite modifies stderr output? Why is that lint triggered in the first place? Is that what you find weird here?

Contributor Author

OK, never mind, I understand. The test suite doesn't output what was contained in the stderr file; that's indeed pretty weird.

Darksonn added the A-tokio-util (Area: The tokio-util crate) and M-time (Module: tokio/time) labels Oct 14, 2021

b-naber commented Oct 20, 2021

Why does the security check fail now? I didn't change anything cargo related in the latest push.

Darksonn commented Oct 21, 2021

Someone published a security issue for the chrono crate. It will be fixed in #4186.


/// List of keys that we can use to create new keys. See the comment for
/// `create_available_keys` for why this is necessary.
available_keys: LinkedList<usize>,
Contributor

Why is this a LinkedList? That seems rather wasteful when it could just be a vector.

Comment on lines 633 to 635
#[tokio::test]
async fn compact_expire_empty() {
time::pause();
Contributor

Suggested change
- #[tokio::test]
- async fn compact_expire_empty() {
-     time::pause();
+ #[tokio::test(start_paused = true)]
+ async fn compact_expire_empty() {

b-naber force-pushed the delayqueue_compact_shrink_to_fit branch from f8ed08e to 1e31107 on October 23, 2021 21:17

b-naber commented Oct 23, 2021

@Darksonn Thanks for the review, addressed your comments.

b-naber force-pushed the delayqueue_compact_shrink_to_fit branch from 1e31107 to 50a4395 on October 23, 2021 21:21

b-naber commented Oct 25, 2021

@Darksonn Addressed your review.

Comment on lines 201 to 404
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct KeySlab {
index: usize,
}
Contributor

Should probably have a short documentation snippet for future readers of the code.

Comment on lines 344 to 548
let mut key = self.slab.insert(Data {
inner: value,
when,
expired: false,
next: None,
prev: None,
});
Contributor

We should really make sure to wrap these integers in our key types as soon as we possibly can.

Suggested change
- let mut key = self.slab.insert(Data {
-     inner: value,
-     when,
-     expired: false,
-     next: None,
-     prev: None,
- });
+ let mut key = KeySlab::new(self.slab.insert(Data {
+     inner: value,
+     when,
+     expired: false,
+     next: None,
+     prev: None,
+ }));

Comment on lines 328 to 367
- self.insert_idx(when, key);
+ // `old_key` is the actual index the slab uses internally
+ self.insert_idx(when, old_key);
Contributor

Do these still use raw integers? It makes me feel a lot safer if we use the key-types as widely as we can, because it's just a lot more robust than raw integers.
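
To make the point about key types concrete, the sketch below shows the newtype pattern in isolation. It is an illustration under assumptions, not the PR's code: `Storage` is a hypothetical container, and `Key` and `KeyInternal` are simplified versions of the types discussed in this thread. Because the storage only accepts `KeyInternal`, a handed-out `Key` or a raw `usize` cannot be used as an index without an explicit conversion.

```rust
/// Key handed out to users of the queue (simplified for illustration).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct Key {
    index: usize,
}

/// Index that the backing storage uses internally (simplified for illustration).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct KeyInternal {
    index: usize,
}

impl Key {
    fn new(index: usize) -> Self {
        Key { index }
    }
}

impl KeyInternal {
    fn new(index: usize) -> Self {
        KeyInternal { index }
    }
}

/// Explicit conversion for the case where both keys still coincide.
impl From<Key> for KeyInternal {
    fn from(key: Key) -> Self {
        KeyInternal::new(key.index)
    }
}

/// Hypothetical storage that wraps the raw index as soon as it is produced.
struct Storage<T> {
    slots: Vec<Option<T>>,
}

impl<T> Storage<T> {
    fn insert(&mut self, value: T) -> KeyInternal {
        self.slots.push(Some(value));
        KeyInternal::new(self.slots.len() - 1)
    }

    /// Only accepts `KeyInternal`; passing a `Key` or a `usize` does not compile.
    fn get(&self, key: KeyInternal) -> Option<&T> {
        self.slots.get(key.index)?.as_ref()
    }
}
```

The benefit is that mixing up the two index spaces becomes a type error at compile time instead of a silent lookup of the wrong element.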

Comment on lines 625 to 629
let key_map = &self.key_map;
let remapped_key = match key_map.get(&*key) {
Some(k) => *k,
None => (*key).into(),
};
Contributor

Maybe refactor this into a method?

Comment on lines 160 to 165
/// A `compact` call requires a re-mapping of the `Key`s that were changed
/// during the `compact` call of the `slab`. Since the keys that were given out
/// cannot be changed retroactively we need to keep track of these re-mappings.
/// The keys of `key_map` correspond to the old keys that were given out and
/// the values to the `Key`s that were re-mapped by the `compact` call.
key_map: HashMap<Key, KeySlab>,
Contributor

You don't remove items from this map when they are removed from the DelayQueue. The following tests therefore fail:

#[tokio::test(start_paused = true)]
async fn remove_after_compact() {
    let now = Instant::now();
    let mut queue = DelayQueue::new();

    let foo_key = queue.insert_at("foo", now + ms(10));
    queue.insert_at("bar", now + ms(20));
    queue.remove(&foo_key);
    queue.compact();

    let panic = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
        queue.remove(&foo_key);
    }));
    assert!(panic.is_err());
}

#[tokio::test(start_paused = true)]
async fn remove_after_compact_poll() {
    let now = Instant::now();
    let mut queue = task::spawn(DelayQueue::new());

    let foo_key = queue.insert_at("foo", now + ms(10));
    queue.insert_at("bar", now + ms(20));

    sleep(ms(10)).await;
    assert_eq!(assert_ready_ok!(poll!(queue)).key(), foo_key);

    queue.compact();

    let panic = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
        queue.remove(&foo_key);
    }));
    assert!(panic.is_err());
}

b-naber commented Oct 25, 2021

We do remove Keys from key_map here. I haven't tried this out, but it seems to me as if calling remove with a Key that was already removed should also panic in the current master. I think we should return an Option in remove.

Contributor

To be clear, both of the tests I wrote here should panic in their last remove, i.e. the one I wrapped in catch_unwind. And it seems like the problem they are revealing is actually something different than not removing stuff from the map, namely that using remove with a key that doesn't exist, but which something else is mapped to, behaves incorrectly.

(though poll_idx still doesn't remove from the map as it should - another test would be needed for that case)

b-naber commented Oct 27, 2021

@Darksonn Isolated the key routing logic to the slab and fixed the bug revolving around remove.

b-naber force-pushed the delayqueue_compact_shrink_to_fit branch from 9646ee3 to 58d9205 on October 27, 2021 15:00

Darksonn left a comment

The code for the delay queue has definitely become a lot simpler.

@@ -45,7 +45,8 @@ futures-io = { version = "0.3.0", optional = true }
futures-util = { version = "0.3.0", optional = true }
log = "0.4"
pin-project-lite = "0.2.0"
- slab = { version = "0.4.1", optional = true } # Backs `DelayQueue`
+ slab = { version = "0.4.4", optional = true } # Backs `DelayQueue`
tracing = "0.1.29"
Contributor

What's up with this?

Comment on lines 253 to 255
pub(crate) fn compact(&mut self) {
self.compact_called = true;

Contributor

I think the implementation can be simplified a bit.

pub(crate) fn compact(&mut self) {
    if !self.compact_called {
        // First compaction: every key still maps to itself.
        for (key, _) in self.inner.iter() {
            self.key_map.insert(key, key);
        }
    }

    // Record where the slab moves each element.
    let mut remapping = HashMap::new();
    self.inner.compact(|_, from, to| {
        remapping.insert(from, to);
        true
    });

    // At this point `key_map` contains a mapping for every element.
    for internal_key in self.key_map.values_mut() {
        if let Some(new_internal_key) = remapping.get(&*internal_key) {
            *internal_key = *new_internal_key;
        }
    }
}

Comment on lines 325 to 281
// We maintain a set of available keys for efficiency reasons, so as not to calculate
// the smallest available key each time `self.inner.insert` outputs a duplicate key.
// The creation of new keys is necessary if the `self.inner.insert` call
// in `self.insert` gives back a key that was previously given out.
// This scenario of a duplicate key can only happen after `compact` was called.
fn create_available_keys(&mut self) {
    assert!(self.available_keys.is_empty());
    self.available_keys.reserve(AVAILABLE_KEYS_SET_SIZE);

    let mut i = 0;
    let mut num_created_keys = 0;
    while num_created_keys < AVAILABLE_KEYS_SET_SIZE {
        if !self.key_map.contains_key(&Key::new(i)) {
            self.available_keys.insert(KeyInternal::new(i));
            num_created_keys += 1;
        }
        i += 1;
    }
}
Contributor

I'm not a big fan of this. It makes insert run in linear time in some cases, which is not great.

Contributor Author

@Darksonn I don't see a better solution to this. We can add removed Keys back into available_keys, this slightly limits the number of create_new_key calls, but when we add many new keys after a compact call then I think we just have to live with a linear runtime every AVAILABLE_KEYS_SET_SIZE insert calls.

I'm not sure if/how bug prone simply incrementing an index to create new Keys would be in practice... I guess not really. But if the key index does overflow this way then the bug is really subtle. We can't really handle an overflow since we then have to check each new Key index again for duplication.

Do you have a solution for this?

Contributor

Having a counter for the next id seems fine to me if you do it like this:

while key_is_in_use(self.next_key) {
    self.next_key = self.next_key.wrapping_add(1);
}
return self.next_key;

Darksonn commented Nov 9, 2021

Do you need further review on this from me at this time, or are you able to continue with the PR?

b-naber commented Nov 12, 2021

@Darksonn Updated the PR. Thanks for simplifying compact, it looks a lot nicer now.

b-naber commented Dec 6, 2021

@Darksonn Do you want anything else to be changed here?

The lint error here is misplaced imo; I think it is perfectly fine to have the if statement in this case. Using get_or_else or entry just leads to borrow checker problems and makes it less straightforward to read. Can we ignore this lint?

Darksonn commented Dec 6, 2021

You could write it like this, which doesn't seem too bad?

if let Entry::Occupied(mut entry) = self.key_map.entry(key.into()) {
    entry.insert(key);
}

I'm not going to review the PR in detail today — this is exam week. Can you remind me again later?

Comment on lines 273 to 276
let remapped_key = match key_map.get(&*key) {
Some(k) => *k,
None => (*key).into(),
};
Contributor

This appears incorrect. You should only accept the direct mapping if compact has not been called.
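
A minimal sketch of the rule stated in this comment, using plain `usize` keys for brevity. `KeyRouter` is a hypothetical name and this is not the PR's implementation: before any compaction the handed-out key is also the internal index, but once `compact` has run, only keys present in the map may be resolved.

```rust
use std::collections::HashMap;

/// Hypothetical holder of the routing state discussed in this thread.
struct KeyRouter {
    /// Handed-out key -> internal index; only meaningful after a compaction.
    key_map: HashMap<usize, usize>,
    compact_called: bool,
}

impl KeyRouter {
    /// Resolves a handed-out key to an internal index.
    fn remap_key(&self, key: usize) -> Option<usize> {
        if self.compact_called {
            // After a compaction the map is the only source of truth:
            // a missing entry means the key no longer refers to anything.
            self.key_map.get(&key).copied()
        } else {
            // No compaction yet, so keys and internal indices coincide.
            // The caller still has to check that the slot is occupied.
            Some(key)
        }
    }
}
```

Falling back to the direct mapping after a compaction is what lets a stale key resolve to a slot that now holds a different element, which is the failure the `remove_after_compact` tests above are probing.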

Comment on lines 296 to 297
pub(crate) fn clear(&mut self) {
self.inner.clear()
Contributor

This needs to clear the hash map too. (It could even reset compact_called to false.)

Comment on lines 308 to 326
pub(crate) fn contains(&self, key: &Key) -> bool {
let remapped_key = self.remap_key(&key);
self.inner.contains(remapped_key.index)
}
Contributor

Due to the thing I mentioned on remap_key, this is incorrect and may return true for items that don't exist.

Comment on lines 333 to 335
fn index(&self, key: Key) -> &Self::Output {
let remapped_key = self.remap_key(&key);
&self.inner[remapped_key.index]
Contributor

Same here.

Comment on lines 777 to 803
/// the slab use [`compact`]
/// This function can take O(n) time even when the capacity cannot be reduced or the allocation is
/// shrunk in place. Repeated calls run in O(1) though.
Contributor

If you want a line break, you have to include an empty line in the markdown. (Also, missing period.)

Suggested change
- /// the slab use [`compact`]
- /// This function can take O(n) time even when the capacity cannot be reduced or the allocation is
- /// shrunk in place. Repeated calls run in O(1) though.
+ /// the slab use [`compact`].
+ ///
+ /// This function can take O(n) time even when the capacity cannot be reduced or the allocation is
+ /// shrunk in place. Repeated calls run in O(1) though.

Comment on lines 775 to 777
/// This function is not guaranteed to, and in most cases, won't decrease the capacity of the slab
/// to the number of elements still contained in it. To decrease the capacity to the size of
/// the slab use [`compact`]
Contributor

This should be a bit more explicit about why this is.
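
One way to picture the reason, as a sketch rather than the slab crate's actual implementation: elements must keep their index, so shrinking can only drop vacant slots at the end, while a hole in the middle stays allocated. `shrink_to_fit_like` is a hypothetical helper over a `Vec<Option<T>>` standing in for the slab.

```rust
/// Drops trailing vacant slots only; occupied slots keep their index.
/// (Hypothetical helper modelling index-preserving shrinking.)
fn shrink_to_fit_like<T>(slots: &mut Vec<Option<T>>) {
    // Stop at the first occupied slot counted from the back.
    while matches!(slots.last(), Some(None)) {
        slots.pop();
    }
    // Ask the allocator to release the unused tail.
    slots.shrink_to_fit();
}
```

For `[Some("a"), None, Some("b"), None, None]` this leaves three slots for two elements, because the vacancy at index 1 cannot be reclaimed without moving "b". Reclaiming it requires moving elements, which is what compaction does and why it needs key remapping.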

/// let key2 = delay_queue.insert(10, Duration::from_secs(10));
/// let key3 = delay_queue.insert(15, Duration::from_secs(15));
///
/// delay_queue.remove(&key3);
Contributor

Not removing the last insert seems like a better test.

Suggested change
- /// delay_queue.remove(&key3);
+ /// delay_queue.remove(&key2);

Comment on lines 143 to 147
/// Advances the timer up to the instant represented by `now`.
#[instrument(skip(self, store), level = "debug")]
pub(crate) fn poll(&mut self, now: u64, store: &mut T::Store) -> Option<T::Owned> {
loop {
debug!("inside loop of wheel::poll");
Contributor

This appears to be testing prints. Please take them out.

b-naber commented Dec 12, 2021

You could write it like this, which doesn't seem too bad?

if let Entry::Occupied(mut entry) = self.key_map.entry(key.into()) {
    entry.insert(key);
}

This isn't what we want semantically though. We actually want to insert a key/value pair with a new key (key_to_give_out), so entry isn't really applicable here.
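
The semantic difference can be shown with the standard library alone. This is a sketch with hypothetical helper names and plain `usize` keys, not the PR's code: an occupied entry can only overwrite the value stored under a key that already exists, whereas routing a freshly created key needs a plain `insert` under that new key.

```rust
use std::collections::hash_map::Entry;
use std::collections::HashMap;

/// Overwrites the value only if `key` is already present.
/// Returns whether an update happened.
fn update_if_present(map: &mut HashMap<usize, usize>, key: usize, value: usize) -> bool {
    if let Entry::Occupied(mut entry) = map.entry(key) {
        entry.insert(value);
        true
    } else {
        false
    }
}

/// Adds a mapping under a key that was not in the map before
/// (hypothetical helper; `key_to_give_out` follows the naming in the comment above).
fn route_new_key(map: &mut HashMap<usize, usize>, key_to_give_out: usize, internal: usize) {
    map.insert(key_to_give_out, internal);
}
```

`update_if_present` never grows the map, so it cannot express "add a mapping for a new key", which is the operation described in the comment above.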

b-naber force-pushed the delayqueue_compact_shrink_to_fit branch from 03a981e to c00d58c on December 12, 2021 20:32

b-naber commented Dec 13, 2021

@Darksonn Any other changes needed?

Darksonn commented

It's on my todo-list and I will have another look soon.

b-naber commented Dec 13, 2021

Thanks, sorry if the question might have come across as somewhat pushy, it wasn't meant that way.

Darksonn commented

No worries. PRs do sometimes get lost, and it's totally fair to ping me. :)

Comment on lines 272 to 273
// corresponding internal key. Returns None if there was no compact
// call.
Contributor

This seems wrong. It always returns Some if compact has not been called.

Comment on lines 283 to 291
fn create_new_key(&self) -> KeyInternal {
    let mut next_key_index = self.next_key_index;

    while self.key_map.contains_key(&Key::new(next_key_index)) {
        next_key_index = next_key_index.wrapping_add(1);
    }

    KeyInternal::new(next_key_index)
}
Contributor

You need to remember the value of next_key_index here for the next call.
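
A minimal sketch of what remembering the counter could look like, with plain `usize` keys and a hypothetical `KeyAllocator` type (the set `in_use` stands in for the key map; none of this is the PR's code). The method takes `&mut self` so the search position survives between calls, and `wrapping_add` makes the search continue from zero after an overflow.

```rust
use std::collections::HashSet;

/// Hypothetical allocator for handed-out keys.
struct KeyAllocator {
    /// Keys that are currently handed out.
    in_use: HashSet<usize>,
    /// Where the next search for a free key starts.
    next_key_index: usize,
}

impl KeyAllocator {
    fn create_new_key(&mut self) -> usize {
        // Skip over keys that are still handed out.
        // (Terminates as long as at least one key is free.)
        while self.in_use.contains(&self.next_key_index) {
            self.next_key_index = self.next_key_index.wrapping_add(1);
        }
        let key = self.next_key_index;
        self.in_use.insert(key);
        // Persist the position so the next call does not rescan from the start.
        self.next_key_index = key.wrapping_add(1);
        key
    }
}
```

Starting from `in_use = {0, 1, 3}` and `next_key_index = 0`, two calls return 2 and then 4, and the second call starts scanning at 3 instead of 0.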

- if let Some(next) = store[*item].next {
-     store[next].prev = store[*item].prev;
+ if let Some(next) = store[key].next {
+     store[Key::new(next)].prev = store[key].prev;
Contributor

Seems like the next/prev fields and variables should just be of type Key themselves instead of wrapping them here. We want as little wrapping and unwrapping as possible.

b-naber commented Dec 28, 2021

@Darksonn Can you take another look at this, please?

Comment on lines 1086 to 1089
impl<T> wheel::Stack for Stack<T> {
type Owned = usize;
type Borrowed = usize;
- type Store = Slab<Data<T>>;
+ type Store = SlabStorage<T>;
Contributor

I'm not too familiar with the wheel::Stack trait, so it may not be possible, but shouldn't these be Key as well rather than usize?

b-naber commented Dec 28, 2021

@Darksonn fixed that.

b-naber commented Jan 2, 2022

@Darksonn Can you take one more look maybe? I'd like to finish this PR soon.

Darksonn left a comment

I don't have any other comments.

b-naber commented Jan 9, 2022

@Darksonn Just pinging in case you forgot about this. When do you want to merge this?

@Darksonn Darksonn merged commit c800dea into tokio-rs:master Jan 9, 2022
Successfully merging this pull request may close these issues.

DelayQueue::shrink_to_fit