[FEATURE_REQ] Add recv_many to StreamMap #6367

Closed
esemeniuc opened this issue Feb 26, 2024 · 0 comments · Fixed by #6409
Labels
A-tokio-stream Area: The tokio-stream crate C-feature-request Category: A feature request. M-stream Module: tokio/stream

esemeniuc commented Feb 26, 2024

Is your feature request related to a problem? Please describe.
Most channels in tokio offer recv_many and poll_recv_many, merged with #6236. StreamMap doesn't offer anything like this.

Describe the solution you'd like
An API like:
https://docs.rs/tokio/latest/tokio/sync/mpsc/struct.Receiver.html#method.recv_many
https://docs.rs/tokio/latest/tokio/sync/mpsc/struct.Receiver.html#method.poll_recv_many
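
To make the requested contract concrete, here is a minimal, std-only sketch of "append up to `limit` queued items to a caller-supplied buffer and return the count", applied to a keyed set of sources. It is a synchronous analogy, not tokio code: `drain_ready_keyed` and `demo` are hypothetical names, plain `std::sync::mpsc` receivers stand in for the streams of a StreamMap, and unlike the async `recv_many` it never waits when nothing is queued. It also visits sources in slice order, which an actual StreamMap method would not necessarily do.

```rust
use std::sync::mpsc::{channel, Receiver, TryRecvError};

/// Hypothetical helper (not a tokio API): appends up to `limit` already-queued
/// messages from the keyed receivers to `buffer` and returns how many were added.
/// It never blocks; it returns 0 when nothing is queued.
pub fn drain_ready_keyed<K: Clone, T>(
    sources: &[(K, Receiver<T>)],
    buffer: &mut Vec<(K, T)>,
    limit: usize,
) -> usize {
    let mut added = 0;
    for (key, rx) in sources {
        while added < limit {
            match rx.try_recv() {
                Ok(item) => {
                    buffer.push((key.clone(), item));
                    added += 1;
                }
                // Empty: nothing queued right now. Disconnected: all senders dropped.
                Err(TryRecvError::Empty) | Err(TryRecvError::Disconnected) => break,
            }
        }
        if added >= limit {
            break;
        }
    }
    added
}

/// Demo: three keyed channels with two queued messages each, drained with limit 4.
pub fn demo() -> (usize, Vec<(usize, u32)>) {
    let mut senders = Vec::new();
    let mut sources = Vec::new();
    for key in 0..3usize {
        let (tx, rx) = channel::<u32>();
        tx.send(key as u32 * 10).unwrap();
        tx.send(key as u32 * 10 + 1).unwrap();
        senders.push(tx); // keep senders alive so the channels stay open
        sources.push((key, rx));
    }
    let mut buffer = Vec::new();
    let n = drain_ready_keyed(&sources, &mut buffer, 4);
    (n, buffer)
}
```

With a limit of 4, `demo()` returns `(4, [(0, 0), (0, 1), (1, 10), (1, 11)])`; the two messages on key 2 stay queued for the next call. Passing the buffer in, rather than returning a fresh `Vec`, lets the caller reuse one allocation across calls.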

Describe alternatives you've considered
My current hacked-up solution:

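// Imports needed by the function below. `warn!` is assumed to come from
// `tracing`; the `log` crate's macro would work the same way.
use std::{pin::pin, task::Poll, time::Duration};
use tokio::time::timeout;
use tokio_stream::{Stream, StreamExt, StreamMap};
use tracing::warn;

/// Returns a `Vec` of all entries currently queued in a `StreamMap`.
/// Returns an empty `Vec` if no packets arrive within the 50 ms timeout.
/// Returns [`Option::None`] if the `StreamMap` contains no streams.
/// See `poll_next_entry()` in https://docs.rs/tokio-stream/latest/src/tokio_stream/stream_map.rs.html#520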
pub async fn streammap_collect_queued<K: Clone + Unpin, V: Stream + Unpin>(
    stream_map: &mut StreamMap<K, V>,
) -> Option<Vec<(K, V::Item)>> {
    match futures_util::poll!(pin!(stream_map.next())) {
        // check if packets are queued
        Poll::Ready(Some(sender_with_packets)) => {
            let mut collected = vec![sender_with_packets];
            // collect all queued packets
            while let Poll::Ready(Some(sender_with_packets)) =
                futures_util::poll!(pin!(stream_map.next()))
            {
                collected.push(sender_with_packets);
            }
            Some(collected)
        }
        // no streams in streammap
        Poll::Ready(None) => {
            warn!("StreamMap has no streams.");
            None
        }
        // streams don't have anything queued yet
        Poll::Pending => {
            // yield to other tasks; a packet may still arrive within the timeout
            match timeout(Duration::from_millis(50), stream_map.next()).await {
                Ok(Some(sender_with_packets)) => {
                    let mut collected = vec![sender_with_packets];
                    while let Poll::Ready(Some(sender_with_packets)) =
                        futures_util::poll!(pin!(stream_map.next()))
                    {
                        collected.push(sender_with_packets);
                    }
                    Some(collected)
                }
                Ok(None) => {
                    warn!("StreamMap has no streams.");
                    None
                }
                // timeout, nothing was ready
                Err(_) => Some(vec![]),
            }
        }
    }
}

Tests:

    #[tokio::test]
    async fn test_streammap_multi_stream() {
        let mut map: StreamMap<usize, UnboundedReceiverStream<usize>> = StreamMap::new();
        let senders = (0..5)
            .map(|i| {
                let (sender, receiver) = unbounded_channel::<usize>();
                let _ = map.insert(i, UnboundedReceiverStream::new(receiver));
                sender
            })
            .collect::<Vec<_>>();

        // this would hang indefinitely: no channel has been closed yet, and nothing has been sent
        // dbg!(map.next().await);

        senders
            .iter()
            .enumerate()
            .for_each(|(i, sender)| sender.send(i).unwrap());
        assert_eq!(
            streammap_collect_queued(&mut map)
                .await
                .unwrap()
                .into_iter()
                .sorted()
                .collect_vec(),
            vec![(0, 0), (1, 1), (2, 2), (3, 3), (4, 4)]
        );

        // add empty channel
        let (_sender, receiver) = unbounded_channel();
        map.insert(5, UnboundedReceiverStream::new(receiver));
        assert_eq!(streammap_collect_queued(&mut map).await, Some(vec![]));

        // add some more values, offset by 10
        senders
            .iter()
            .enumerate()
            .for_each(|(i, sender)| sender.send(i + 10).unwrap());
        assert_eq!(
            streammap_collect_queued(&mut map)
                .await
                .unwrap()
                .into_iter()
                .sorted()
                .collect_vec(),
            vec![(0, 10), (1, 11), (2, 12), (3, 13), (4, 14)]
        );
    }


@esemeniuc esemeniuc added A-tokio Area: The main tokio crate C-feature-request Category: A feature request. labels Feb 26, 2024
@esemeniuc esemeniuc changed the title Add recv_many to StreamMap [FEAT_REQ] Add recv_many to StreamMap Feb 26, 2024
@esemeniuc esemeniuc changed the title [FEAT_REQ] Add recv_many to StreamMap [FEATURE_REQ] Add recv_many to StreamMap Feb 26, 2024
@Darksonn Darksonn added A-tokio-util Area: The tokio-util crate M-stream Module: tokio/stream and removed A-tokio Area: The main tokio crate labels Feb 26, 2024
@Darksonn Darksonn added A-tokio-stream Area: The tokio-stream crate and removed A-tokio-util Area: The tokio-util crate labels Mar 6, 2024