Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

What is the idiomatic way to create multiple streams? #183

Closed
caelansar opened this issue Jan 8, 2024 · 2 comments
Closed

What is the idiomatic way to create multiple streams? #183

caelansar opened this issue Jan 8, 2024 · 2 comments

Comments

@caelansar
Copy link

caelansar commented Jan 8, 2024

Hi! I recently tried to upgrade yamux to the latest version and I want to provide a method to create a stream from a connection in my project

In yamux 0.10.2, I can use

pub struct YamuxCtrl<S> {
    ctrl: Control,
    _conn: PhantomData<S>,
}

impl<S> YamuxCtrl<S>
where
    S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
    pub fn new_client(stream: S) -> Self {
        let conn = Connection::new(stream.compat(), Config::default(), Mode::Client);

        let ctrl = conn.control();

        tokio::spawn(yamux::into_stream(conn).try_for_each_concurrent(None, |_stream| future::ready(Ok(()))));

        Self {
            ctrl,
            _conn: marker::PhantomData,
        }
    }
    async fn open_stream(&mut self) -> Result<ClientStream<Compat<yamux::Stream>>, Error> {
        let stream = self.ctrl.open_stream().await?;
        Ok(ClientStream::new(stream.compat()))
    }
}

I can use open_stream to obtain a stream, but how can I do this in yamux 0.13.1 since Control has been removed and we need call Connection::poll_next_inbound repeatedly in order to make progress

I found some examples in tests, it looks like:

        let mut stream = future::poll_fn(|cx| client.poll_new_outbound(cx))
            .await
            .unwrap();
        task::spawn(noop_server(stream::poll_fn(move |cx| {
            client.poll_next_inbound(cx)
        })));

        stream.write_all(&msg.0).await?;
        stream.close().await?;    }

but since the ownership of client has already been moved into task::spawn, how do I then obtain a stream from this connection afterward?

@thomaseizinger
Copy link
Contributor

You have two options:

  • Create an event loop and communicate with it via channels (send incoming streams out of the loop and request new streams by sending messages into it)
  • Compose your application logic directly with a loop that continuously polls the Connection

@caelansar
Copy link
Author

You have two options:

  • Create an event loop and communicate with it via channels (send incoming streams out of the loop and request new streams by sending messages into it)
  • Compose your application logic directly with a loop that continuously polls the Connection

@thomaseizinger Thanks! I create a loop in new_client method and send message in open_stream via a channel to get a stream. Now it works as I expected

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants