
tokio_stream::StreamMap allows duplicate keys #4774

Open

nashley opened this issue Jun 17, 2022 · 21 comments

Labels
A-tokio-stream Area: The tokio-stream crate · C-bug Category: This is a bug.

Comments

nashley commented Jun 17, 2022

Version
List the versions of all tokio crates you are using.
This is version independent, but here you go:

$ cargo tree | grep tokio
tokio_stream--StreamMap v0.1.0 (/home/user/git/tmp/tokio_stream--StreamMap)
└── tokio-stream v0.1.9
    └── tokio v1.19.2

Platform
The output of uname -a (UNIX), or version and 32 or 64-bit (Windows)
This is platform independent, but here you go:

$ uname -a
Linux archlinux 5.18.3-arch1-1 #1 SMP PREEMPT_DYNAMIC Thu, 09 Jun 2022 16:14:10 +0000 x86_64 GNU/Linux

Description
tokio_stream::StreamMap::insert followed by tokio_stream::StreamMap::extend allows for duplicate keys.

Code sample that causes the bug:

use tokio_stream::{pending, StreamMap};

fn main() {
    let mut streams = StreamMap::new();
    streams.insert("key", pending::<u8>());
    streams.extend(vec![("key", pending::<u8>())]);

    for (key, value) in streams.iter() {
        println!("{}: {:?}", key, value);
    }
}

I expected duplicate keys to overwrite existing ones, as they do with from_iter (which calls insert, which calls remove).
However, I think it'd be best if both methods either raised errors (see #4775) or at least returned a list of the replaced keys to match insert's behavior. That way, the calling code would at least know which keys were replaced.

Instead, you can see that the StreamMap now contains duplicate keys:

key: Pending(PhantomData)
key: Pending(PhantomData)
nashley added the A-tokio (Area: The main tokio crate) and C-bug (Category: This is a bug.) labels on Jun 17, 2022
nashley (Author) commented Jun 17, 2022

PR that introduced this: #4272

Darksonn added the A-tokio-stream (Area: The tokio-stream crate) label and removed the A-tokio (Area: The main tokio crate) label on Jun 17, 2022
nashley (Author) commented Jun 17, 2022

For reference, here's an example showing that from_iter removes duplicate keys:

use tokio_stream::{pending, StreamMap};

fn main() {
    let streams_from_iter = StreamMap::from_iter(vec![
        ("key", pending::<u8>()),
        ("key", pending::<u8>()),
    ]);

    for (key, value) in streams_from_iter.iter() {
        println!("{}: {:?}", key, value);
    }
}

Results in:

key: Pending(PhantomData)

nashley (Author) commented Jun 17, 2022

If this is indeed undesired behavior, then I think the simplest solution is to just iterate over the Vec<(K, V)> and call insert on each entry. I can make a PR for that if desired.

Darksonn (Contributor) commented:

I agree that this is a bug. It should not allow duplicate keys.

nashley (Author) commented Jul 31, 2022

How concerned are we about the runtime performance impact?

Given n is the length of the existing StreamMap and m is the length of the Vec being appended:
With the naive solution...

// O(n*m), or worse if the underlying `Vec` needs to be resized,
// potentially multiple times:
for (key, value) in iter {
    self.insert(key, value);
}

... the runtime goes from O(n+m) (just an [allocation + copy] if necessary) to O(n*m) [1], since each key being inserted is checked against every key that already exists.

[1] Or worse if the underlying Vec is unable to hold all of iter's items. We can't use reserve, since iter is not ExactSizeIterator.

Alternatively, we could still use extend but then sort and deduplicate the result...

self.entries.extend(iter.into_iter()); // O(n+m)
// O((n+m) * log(n+m)); this needs K: Ord so that equal keys end up adjacent
// (a comparator that never returns Greater is not a valid total order):
self.entries.sort_by(|a, b| a.0.cmp(&b.0));
// O(n+m); removes consecutive duplicates, keeping the first occurrence:
self.entries.dedup_by(|a, b| a.0.eq(&b.0));

This removes duplicates (preserving the original keys, but not their order) with a runtime of O((n+m) * (2 + log(n+m))), but I think it's more fragile. It also doesn't allow us to return the items that were ignored as duplicates unless we implement our own dedup_by.
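
A hand-rolled pass might look like this (a sketch under the same K: Ord sorting assumption; `kept` and `dropped` are illustrative names, not existing code):

// After the sort, keep the first occurrence of each key and collect
// the later duplicates instead of silently discarding them.
let mut kept: Vec<(K, V)> = Vec::new();
let mut dropped: Vec<(K, V)> = Vec::new();
for entry in self.entries.drain(..) {
    let is_duplicate = kept.last().map_or(false, |last| last.0 == entry.0);
    if is_duplicate {
        dropped.push(entry);
    } else {
        kept.push(entry);
    }
}
self.entries = kept;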

I'd prefer the former solution unless we have reason to believe this will lead to a noticeable performance impact.

Noah-Kennedy (Contributor) commented:

I think that going to polynomial time is fundamentally unworkable here.

nashley (Author) commented Jul 31, 2022

Should from_iter be changed to not run in O((n^2-n)/2) then? It uses the naive solution that I listed above.

I'm not sure what expected values of n and m are, but here's a graph of the three runtimes: https://www.desmos.com/calculator/iwn1q7w7t6

Darksonn (Contributor) commented Aug 1, 2022

Well, on one hand, the StreamMap type already has some pretty bad running time complexities. On the other hand, I would prefer to not make them worse.

ghost commented Aug 5, 2022

Also, the incoming vector in streams.extend could itself contain duplicates.

ghost commented Aug 5, 2022

One more thing: the duplicates are removed using the swap_remove function, which replaces the removed element with the last element. This will reorder the vector, which goes against the specification of StreamMap, which promises to retain the incoming order.

return Some(self.entries.swap_remove(i).1);
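
For illustration, here is Vec::swap_remove's reordering in isolation (a standalone example, not StreamMap code):

// swap_remove is O(1) because it moves the last element into the
// removed slot, but the relative order of the remaining elements changes.
fn main() {
    let mut v = vec!["a", "b", "c", "d"];
    let removed = v.swap_remove(1); // removes "b", moves "d" into its place
    assert_eq!(removed, "b");
    assert_eq!(v, ["a", "d", "c"]);
}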

nashley (Author) commented Aug 5, 2022

This will reorder the vector which goes against the specification of StreamMap which promises to retain the incoming order

Where is that specified?
I only see this:

StreamMap is similar to StreamExt::merge in that it combines source streams into a single merged stream that yields values in the order that they arrive from the source streams

Which, as I understand it, applies to the order of values received within a stream rather than the order of the streams themselves.

All the -> Iterator<> methods are documented with:

An iterator visiting ... in arbitrary order.

Am I misunderstanding something?

ghost commented Aug 5, 2022

@nashley I interpreted the quote you highlighted to mean that the StreamMap should return items in the order they were added, similar to a FIFO queue.

ghost commented Aug 5, 2022

I have a proposal; if it's acceptable, I'll do the code change. The proposal is to maintain a HashMap (or BTreeMap?) with (key, index in vector) pairs. Before inserting, we can check the HashMap for the key; if it's present, we can get the position in the vector and perform the deletion there. This approach would replace the O(n) scan on every insert with the cost of a HashMap lookup and insert. I am not sure of the complexity of Rust's HashMap. We can use some other map implementation if anyone has suggestions.
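
A minimal sketch of that layout (illustrative names, not the actual StreamMap internals; std's HashMap gives average O(1) lookups and inserts):

use std::collections::HashMap;
use std::hash::Hash;

// Hypothetical internals: `entries` keeps insertion order, while
// `index` maps each key to its position in `entries`.
struct IndexedStreamMap<K, V> {
    entries: Vec<(K, V)>,
    index: HashMap<K, usize>,
}

impl<K: Hash + Eq + Clone, V> IndexedStreamMap<K, V> {
    fn insert(&mut self, key: K, value: V) -> Option<V> {
        // Average O(1) lookup instead of an O(n) scan over `entries`.
        if let Some(&i) = self.index.get(&key) {
            // Replace the value in place so the other indices stay valid.
            return Some(std::mem::replace(&mut self.entries[i].1, value));
        }
        self.index.insert(key.clone(), self.entries.len());
        self.entries.push((key, value));
        None
    }
}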

Darksonn (Contributor) commented Aug 9, 2022

I would be ok with using a HashMap. I think you can just store the Stream directly in the map without the index trick. There's no need to retain the ordering of the streams - when streams become ready at the same time, we may return them in any order.
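
In other words, something like this (a hypothetical sketch of the suggestion, not the final design):

use std::collections::HashMap;
use std::hash::Hash;

// Streams live directly in the map: duplicate keys become impossible
// by construction, and no ordering of the streams is tracked at all.
struct StreamMap<K, V> {
    entries: HashMap<K, V>,
}

impl<K: Hash + Eq, V> StreamMap<K, V> {
    fn insert(&mut self, key: K, stream: V) -> Option<V> {
        // HashMap::insert already returns the previously stored stream, if any.
        self.entries.insert(key, stream)
    }
}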

nashley (Author) commented Aug 9, 2022

@DevSabb let me know if you want assistance implementing, documenting, or testing this.

ghost commented Aug 9, 2022

@nashley The change now is more complex than the one I proposed. I'll take a look at the code before confirming whether I want to pick this up. Is it okay if I confirm in a couple of days?

nashley (Author) commented Aug 9, 2022

Of course! I can also work on implementing this if you change your mind.

ghost commented Aug 10, 2022

@nashley I started working on this ticket. Will let you know if I have questions.

ghost commented Aug 11, 2022

I have a question. I had to add the Hash + Eq bounds to the extend function (see below), and I get an error that the impl has stricter requirements than the trait. Any idea how I can fix this?

impl<K, V> Extend<(K, V)> for StreamMap<K, V> {
    fn extend<T>(&mut self, iter: T)
    where
        T: IntoIterator<Item = (K, V)>,
        K: Hash + Eq, // error[E0276]: impl has stricter requirements than trait
    {
        for (key, value) in iter {
            self.entries.insert(key, value);
        }
    }
}

Darksonn (Contributor) commented:

Ah that sucks. It would require a breaking release to make this change, then.

(the particular error in question is fixed by putting the K: Hash bound on the impl block instead of the fn, but it's still a breaking change, so it won't work for other reasons)
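
For reference, a sketch of that shape (assuming the WIP's HashMap-backed entries field; it satisfies the compiler, but tightening the public Extend impl is exactly the breaking change in question):

use std::hash::Hash;

// With the bounds on the impl block, the method signature matches the
// Extend trait exactly; the impl as a whole is just narrower than before.
impl<K: Hash + Eq, V> Extend<(K, V)> for StreamMap<K, V> {
    fn extend<T>(&mut self, iter: T)
    where
        T: IntoIterator<Item = (K, V)>,
    {
        for (key, value) in iter {
            self.entries.insert(key, value);
        }
    }
}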

ghost commented Aug 11, 2022

@Darksonn Thank you! Yes, I had to add the Eq + Hash bounds in multiple places.

I am also wondering if I can get some suggestions on how to make the HashMap's iterator fit the existing list-style signature:

    pub fn iter(&self) -> impl Iterator<Item = &(K, V)> {
        self.entries.iter()
    }

Error:

type mismatch resolving `<std::collections::hash_map::Iter<'_, K, V> as std::iter::Iterator>::Item == &(K, V)`

expected tuple, found `&(K, V)`

note:  expected tuple `(&K, &V)`
      found reference `&(K, V)`
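
One way out (a sketch, and itself another signature change): a HashMap stores keys and values separately, so it can never lend out a &(K, V) tuple; the iterator's item type has to become (&K, &V) instead:

// hash_map::Iter yields (&K, &V) pairs; a HashMap holds no contiguous
// (K, V) tuples that could be borrowed as &(K, V).
pub fn iter(&self) -> impl Iterator<Item = (&K, &V)> {
    self.entries.iter()
}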

baum added a commit to baum/tokio that referenced this issue Mar 29, 2023
…kable.

However, this implementation does not allow duplicate keys and would overwrite
existing ones similar to from_iter() behaviour.

Initially introduced: PR tokio-rs#4272

Note: can not implement Extend trait, since this method has stricter
requirements than trait, as Extend does not require Key to be 'Eq + Hash'

Issue tokio-rs#4774