Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: provide StreamMap utility #2185

Merged
merged 7 commits into from
Feb 1, 2020
Merged

stream: provide StreamMap utility #2185

merged 7 commits into from
Feb 1, 2020

Conversation

carllerche
Copy link
Member

StreamMap is similar to StreamExt::merge in that it combines source
streams into a single merged stream that yields values in the order that
they arrive from the source streams. However, StreamMap has a lot more
flexibility in usage patterns.

StreamMap can:

  • Merge an arbitrary number of streams.
  • Track which source stream the value was received from.
  • Handle inserting and removing streams from the set of managed streams
    at any point during iteration.

All source streams held by StreamMap are indexed using a key. This key
is included with the value when a source stream yields a value. The key
is also used to remove the stream from the StreamMap before the stream
has completed streaming.

Because the StreamMap API moves streams during runtime, both streams
and keys must be Unpin. In order to insert a !Unpin stream into a
StreamMap, use pin! to pin the stream to the stack or Box::pin to
pin the stream in the heap.

`StreamMap` is similar to `StreamExt::merge` in that it combines source
streams into a single merged stream that yields values in the order that
they arrive from the source streams. However, `StreamMap` has a lot more
flexibility in usage patterns.

`StreamMap` can:

- Merge an arbitrary number of streams.
- Track which source stream the value was received from.
- Handle inserting and removing streams from the set of managed streams
  at any point during iteration.

All source streams held by `StreamMap` are indexed using a key. This key
is included with the value when a source stream yields a value. The key
is also used to remove the stream from the `StreamMap` before the stream
has completed streaming.

Because the `StreamMap` API moves streams during runtime, both streams
and keys must be `Unpin`. In order to insert a `!Unpin` stream into a
`StreamMap`, use `pin!` to pin the stream to the stack or `Box::pin` to
pin the stream in the heap.
Copy link
Contributor

@jebrosen jebrosen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be good to have a realistic example of why it can be helpful to know which stream produced a value, and possibly a comparison to approaches where the stream identifies itself in the values it produces (e.g. with stream.map(|v| ("stream 1", v)).

tokio/src/util/mod.rs Outdated Show resolved Hide resolved
tokio/src/util/mod.rs Outdated Show resolved Hide resolved
tokio/src/stream/stream_map.rs Show resolved Hide resolved
@carllerche
Copy link
Member Author

@jebrosen I added a more involved example. Is that what you were thinking?

Copy link
Contributor

@jebrosen jebrosen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, but someone else should look at the poll_next_entry implementation more closely than I did.

Copy link
Contributor

@Darksonn Darksonn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm honestly rather concerned about the running time of insert and remove. I'm okay with remove being O(n), but I think it's a major pitfall if inserting n streams is a quadratic time operation.

where
K: Hash + Eq,
{
let ret = self.remove(&k);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes insertion of n streams O(n^2).

/// assert!(map.remove(&1).is_some());
/// assert!(map.remove(&1).is_none());
/// ```
pub fn remove<Q: ?Sized>(&mut self, k: &Q) -> Option<V>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the docs should mention that this is an expensive operation.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The struct docs include a note, is this sufficient?

@carllerche
Copy link
Member Author

@Darksonn re: complexity, the thing to keep in mind is, the current implementation needs to scan regardless of the backend structure due to how polling scans all entries.

Switching to stds HashMap would be tricky due to the requirement of starting randomly to ensure fairness. I opted to back by a Vec and scan on insert / remove given the requirement to scan on poll anyway. This may change over time. The struct docs include a note about keeping the number of streams in the map "smallish".

@Darksonn
Copy link
Contributor

Darksonn commented Jan 28, 2020

I didn't actually notice the note in the docs above. That said, one could have a Vec<V> together with a HashMap<K, usize> and update indexes in the map on swap_remove.

I'm okay with it as long as there's a note, although I think it is somewhat unfortunate.

Edit: It would still have to be a Vec<(K, V)>.

@carllerche
Copy link
Member Author

It's a bit more complicated than that. One has to be able to get the entry key while iterating as well. This means, to use a HashMap, the key would have to be cloned and stored twice, once in the HashMap and once in the vec.

@carllerche carllerche merged commit ab24a65 into master Feb 1, 2020
@carllerche carllerche deleted the stream-map branch February 1, 2020 17:59
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants