The subscription of websocket does not have an example of broadcasting to all subscribers, hoping to give guidance #697

Closed
keienWang opened this issue Feb 14, 2022 · 8 comments

Comments

@keienWang

For example, I want to publish a message to all subscribers. How can I collect all of the subscribers at that point? In the example I only see a fixed i := 4 being published, but I want to change the value of i and then broadcast it to each subscriber. How do I do that?

@keienWang
Author

The link here on the homepage is wrong: it jumps to http-server.

@niklasad1
Member

Hey @keienWang

The result should be published to each subscriber. However, your example is confusing: you should really remove the first thread::spawn and move i into the register_subscription closure. Otherwise, wrap i in an Arc<Mutex> or something similar if you want a synchronized value.

Can you share the entire example? Otherwise I can't reproduce this. It could be that you only poll the subscription once or something?!

Also, we should update the pubsub examples to use the new subscription API. We happily accept any PRs regarding the examples :)
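
For reference, the Arc<Mutex> suggestion above could look roughly like the sketch below. It uses only the standard library and is not jsonrpsee code; the function name bump_shared and the starting value are made up for illustration. The point is only that a second handle to the same Arc sees the value the producer changed.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical helper: starts at `start`, lets a producer thread add 1
/// `steps` times, and returns the value observed through another handle.
fn bump_shared(start: i32, steps: u32) -> i32 {
    let value = Arc::new(Mutex::new(start));

    // The producer gets its own handle to the same allocation.
    let producer_value = Arc::clone(&value);
    let producer = thread::spawn(move || {
        for _ in 0..steps {
            *producer_value.lock().unwrap() += 1;
        }
    });
    producer.join().unwrap();

    // Any other holder of the `Arc` (for example a subscription closure)
    // reads the updated value through the same mutex.
    let current = *value.lock().unwrap();
    current
}

fn main() {
    println!("{}", bump_shared(4, 3));
}
```

In a real server the closure would clone the Arc once per subscription and lock it whenever it needs the current value.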

@keienWang
Author

Source example. I just want to implement an observer pattern and send out a value that changes externally. For example, there are two subscribers, and I want to send new blocks to both of them at the same time.
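
To make the goal concrete, here is a minimal std-only sketch of the observer pattern described above: every subscriber gets its own channel, and publishing sends a copy of the value to each of them. The Broadcaster type and its methods are hypothetical names used for illustration, not part of jsonrpsee or tokio.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical fan-out hub: one `Sender` per subscriber.
struct Broadcaster<T> {
    subscribers: Vec<Sender<T>>,
}

impl<T: Clone> Broadcaster<T> {
    fn new() -> Self {
        Broadcaster { subscribers: Vec::new() }
    }

    /// Registers a new subscriber and returns its private receiving end.
    fn subscribe(&mut self) -> Receiver<T> {
        let (tx, rx) = channel();
        self.subscribers.push(tx);
        rx
    }

    /// Sends a copy of `value` to every subscriber and forgets those whose
    /// receiver has been dropped. Returns the number of live subscribers.
    fn publish(&mut self, value: T) -> usize {
        self.subscribers.retain(|tx| tx.send(value.clone()).is_ok());
        self.subscribers.len()
    }
}

fn main() {
    let mut hub = Broadcaster::new();
    let alice = hub.subscribe();
    let bob = hub.subscribe();

    hub.publish(5u64);
    hub.publish(6u64);

    // Both subscribers receive every published value.
    let a: Vec<u64> = alice.try_iter().collect();
    let b: Vec<u64> = bob.try_iter().collect();
    println!("{:?} {:?}", a, b);
}
```

Dropping a receiver is how a subscriber leaves in this sketch: the next publish notices the failed send and removes it.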

@niklasad1
Member

@keienWang

OK, I see. The easiest would be to use message passing, i.e., once a new block/item is produced or changed, you send a message to a receiver that implements Stream.

The idea is that something like this should work:

async fn run_server() -> anyhow::Result<SocketAddr> {
	let server = WsServerBuilder::default().build("127.0.0.1:0").await?;

	let (tx, rx) = async_channel::unbounded();
	let mut module = RpcModule::new(());

	tokio::spawn(async move {
		let mut i = 4;
		loop {
			i += 1;
			tx.send(i).await.unwrap();
			tokio::time::sleep(std::time::Duration::from_secs(5)).await;
		}
	});

	module.register_subscription("subscribe_hello", "s_hello", "unsubscribe_hello", move |_, mut sink, _| {
		// required because the closure above is `Fn` and may be called many times
		let rx = rx.clone();
		tokio::spawn(async move {
			sink.pipe_from_stream(rx).await;
		});
		Ok(())
	})?;
	let addr = server.local_addr()?;
	server.start(module)?;
	Ok(addr)
}

@keienWang
Author

I have two clients, but once a subscription value is sent to one client, the other client does not receive it. I think the cloned rx does not deliver the same value to every receiver; instead, whichever client grabs the value first gets it.
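
The behaviour described above can be modelled with the standard library alone. This is a sketch of the "one queue, several consumers" semantics, not of async-channel itself; the function name competing_consumers is made up, and the two consumers take turns in a single thread so that the result is deterministic.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::sync::{Arc, Mutex};

/// Hypothetical model: sends `values` into one queue and lets two consumers
/// take turns pulling from it. Returns what each consumer saw.
fn competing_consumers(values: &[i32]) -> (Vec<i32>, Vec<i32>) {
    let (tx, rx) = channel();
    for v in values {
        tx.send(*v).unwrap();
    }
    drop(tx);

    // Both consumers share the same receiving end, which stands in for
    // cloning a multi-consumer receiver.
    let first: Arc<Mutex<Receiver<i32>>> = Arc::new(Mutex::new(rx));
    let second = Arc::clone(&first);

    let (mut seen_first, mut seen_second) = (Vec::new(), Vec::new());
    loop {
        // Each successful receive removes the value from the queue,
        // so the other consumer never sees it.
        let next = first.lock().unwrap().try_recv();
        match next {
            Ok(v) => seen_first.push(v),
            Err(_) => break,
        }
        let next = second.lock().unwrap().try_recv();
        match next {
            Ok(v) => seen_second.push(v),
            Err(_) => break,
        }
    }
    (seen_first, seen_second)
}

fn main() {
    let (a, b) = competing_consumers(&[5, 6, 7, 8]);
    println!("{:?} {:?}", a, b);
}
```

Each value ends up with exactly one consumer, which matches what the two clients observed: the values are split between them rather than duplicated.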

@niklasad1
Member

Right, my bad: async-channel only sends each value to one of the receivers. You need to use tokio::sync::broadcast::channel or something similar for that to work.

async fn run_server() -> anyhow::Result<SocketAddr> {
	let server = WsServerBuilder::default().build("127.0.0.1:0").await?;

	let (tx, _) = tokio::sync::broadcast::channel::<i32>(16);
	let mut module = RpcModule::new(());

	let tx1 = tx.clone();
	tokio::spawn(async move {
		let mut i = 4;
		loop {
			i += 1;
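			// `send` returns an error while there are no active receivers
			// (e.g. before the first client subscribes), so don't unwrap it.
			let _ = tx1.send(i);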
			tokio::time::sleep(std::time::Duration::from_secs(5)).await;
		}
	});

	module.register_subscription("subscribe_hello", "s_hello", "unsubscribe_hello", move |_, mut sink, _| {
		// each subscription gets its own receiver, so every subscriber sees every value
		let mut rx = tx.subscribe();
		tokio::spawn(async move {
			while let Ok(item) = rx.recv().await {
				if let Err(Error::SubscriptionClosed(_)) = sink.send(&item) {
					return;
				}
			}
		});
		Ok(())
	})?;
	let addr = server.local_addr()?;
	server.start(module)?;
	Ok(addr)
}

However, I realized that tokio::sync::broadcast::Receiver doesn't implement Stream. I guess we could implement a wrapper type similar to tokio_stream::sync::BroadcastStream, because pipe_from_stream requires T: Serialize.

In practice we more or less expect users to create a separate stream for each subscription, but if you plan to use something like the code above, we should make it a bit more ergonomic to use.

@keienWang
Author

Okay, I know what to do for my project now, thank you. I hope a subscription example like this gets added to the examples.

@niklasad1
Member

Closed by #705
