Skip to content

Commit

Permalink
enter and leave events (#15)
Browse files Browse the repository at this point in the history
  • Loading branch information
paulgb committed Apr 19, 2024
1 parent c84ff9b commit 967ac49
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 14 deletions.
5 changes: 4 additions & 1 deletion examples/randomness/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ impl SimpleStateroomService for RandomServer {
}

fn disconnect(&mut self, client_id: ClientId, ctx: &impl StateroomContext) {
ctx.send_message(MessageRecipient::Broadcast, &format!("User {:?} left.", client_id));
ctx.send_message(
MessageRecipient::Broadcast,
&format!("User {:?} left.", client_id),
);
}
}
39 changes: 26 additions & 13 deletions stateroom-server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ impl ServiceActorContext {
if let Some(sender) = self.senders.get(&client_id) {
sender.try_send(message).unwrap();
} else {
todo!()
println!("No sender for client {:?}", client_id);
}
}
}
Expand Down Expand Up @@ -69,12 +69,15 @@ impl StateroomContext for ServiceActorContext {
pub struct ServerState {
pub handle: JoinHandle<()>,
pub inbound_sender: Sender<Event>,
pub receivers: Arc<DashMap<ClientId, Sender<Message>>>,
pub senders: Arc<DashMap<ClientId, Sender<Message>>>,
pub next_client_id: AtomicU32,
}

#[derive(Debug)]
pub enum Event {
Message { client: ClientId, message: Message },
Join { client: ClientId },
Leave { client: ClientId },
TimerEvent,
}

Expand All @@ -84,29 +87,33 @@ impl ServerState {
) -> Self {
let (tx, mut rx) = tokio::sync::mpsc::channel::<Event>(100);

let receivers = Arc::new(DashMap::new());
let senders = Arc::new(DashMap::new());

let receivers_ = receivers.clone();
let senders_ = senders.clone();
let tx_ = tx.clone();
let handle = tokio::spawn(async move {
let mut service = service_factory
.build(
"",
ServiceActorContext {
senders: receivers_.clone(),
senders: senders_.clone(),
event_sender: tx_,
},
)
.unwrap();

loop {
let msg = rx.recv().await;
println!("{:?}", msg);
match msg {
Some(Event::Message { client, message }) => match message {
Message::Text(msg) => service.message(client, &msg),
Message::Binary(msg) => service.binary(client, &msg),
_ => todo!(),
Message::Close(_) => {}
msg => println!("Ignoring unhandled message: {:?}", msg),
},
Some(Event::Join { client }) => service.connect(client),
Some(Event::Leave { client }) => service.disconnect(client),
Some(Event::TimerEvent) => {
service.timer();
}
Expand All @@ -118,22 +125,28 @@ impl ServerState {
Self {
handle,
inbound_sender: tx,
receivers,
senders,
next_client_id: AtomicU32::new(1),
}
}

pub fn remove(&self, client: &ClientId) {
self.receivers.remove(client);
self.inbound_sender
.try_send(Event::Leave {
client: client.clone(),
})
.unwrap();
self.senders.remove(client);
}

pub fn connect(&self) -> (Sender<Event>, Receiver<Message>, ClientId) {
let client_id = self
.next_client_id
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let client_id = ClientId(client_id);
let client_id = self.next_client_id();
let (tx, rx) = tokio::sync::mpsc::channel::<Message>(100);
self.receivers.insert(self.next_client_id(), tx);

self.senders.insert(client_id, tx);
self.inbound_sender
.try_send(Event::Join { client: client_id })
.unwrap();
(self.inbound_sender.clone(), rx, client_id)
}

Expand Down

0 comments on commit 967ac49

Please sign in to comment.