diff --git a/examples/randomness/src/lib.rs b/examples/randomness/src/lib.rs index 964bce9..30da1ad 100644 --- a/examples/randomness/src/lib.rs +++ b/examples/randomness/src/lib.rs @@ -31,6 +31,9 @@ impl SimpleStateroomService for RandomServer { } fn disconnect(&mut self, client_id: ClientId, ctx: &impl StateroomContext) { - ctx.send_message(MessageRecipient::Broadcast, &format!("User {:?} left.", client_id)); + ctx.send_message( + MessageRecipient::Broadcast, + &format!("User {:?} left.", client_id), + ); } } diff --git a/stateroom-server/src/server.rs b/stateroom-server/src/server.rs index 8192558..f1096ed 100644 --- a/stateroom-server/src/server.rs +++ b/stateroom-server/src/server.rs @@ -39,7 +39,7 @@ impl ServiceActorContext { if let Some(sender) = self.senders.get(&client_id) { sender.try_send(message).unwrap(); } else { - todo!() + println!("No sender for client {:?}", client_id); } } } @@ -69,12 +69,15 @@ impl StateroomContext for ServiceActorContext { pub struct ServerState { pub handle: JoinHandle<()>, pub inbound_sender: Sender, - pub receivers: Arc>>, + pub senders: Arc>>, pub next_client_id: AtomicU32, } +#[derive(Debug)] pub enum Event { Message { client: ClientId, message: Message }, + Join { client: ClientId }, + Leave { client: ClientId }, TimerEvent, } @@ -84,16 +87,16 @@ impl ServerState { ) -> Self { let (tx, mut rx) = tokio::sync::mpsc::channel::(100); - let receivers = Arc::new(DashMap::new()); + let senders = Arc::new(DashMap::new()); - let receivers_ = receivers.clone(); + let senders_ = senders.clone(); let tx_ = tx.clone(); let handle = tokio::spawn(async move { let mut service = service_factory .build( "", ServiceActorContext { - senders: receivers_.clone(), + senders: senders_.clone(), event_sender: tx_, }, ) @@ -101,12 +104,16 @@ impl ServerState { loop { let msg = rx.recv().await; + println!("{:?}", msg); match msg { Some(Event::Message { client, message }) => match message { Message::Text(msg) => service.message(client, &msg), Message::Binary(msg) => service.binary(client, &msg), - _ => todo!(), + Message::Close(_) => {} + msg => println!("Ignoring unhandled message: {:?}", msg), }, + Some(Event::Join { client }) => service.connect(client), + Some(Event::Leave { client }) => service.disconnect(client), Some(Event::TimerEvent) => { service.timer(); } @@ -118,22 +125,28 @@ impl ServerState { Self { handle, inbound_sender: tx, - receivers, + senders, next_client_id: AtomicU32::new(1), } } pub fn remove(&self, client: &ClientId) { - self.receivers.remove(client); + self.inbound_sender + .try_send(Event::Leave { + client: client.clone(), + }) + .unwrap(); + self.senders.remove(client); } pub fn connect(&self) -> (Sender, Receiver, ClientId) { - let client_id = self - .next_client_id - .fetch_add(1, std::sync::atomic::Ordering::Relaxed); - let client_id = ClientId(client_id); + let client_id = self.next_client_id(); let (tx, rx) = tokio::sync::mpsc::channel::(100); - self.receivers.insert(self.next_client_id(), tx); + + self.senders.insert(client_id, tx); + self.inbound_sender + .try_send(Event::Join { client: client_id }) + .unwrap(); (self.inbound_sender.clone(), rx, client_id) }