Skip to content

Commit

Permalink
Run multiple futures at once.
Browse files Browse the repository at this point in the history
  • Loading branch information
vldr committed Oct 4, 2023
1 parent b23bea6 commit a7ab3ac
Showing 1 changed file with 20 additions and 16 deletions.
36 changes: 20 additions & 16 deletions src/relay.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use futures_util::{stream::SplitSink, SinkExt, StreamExt};
use futures_util::{future::join_all, stream::SplitSink, SinkExt, StreamExt};
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, sync::Arc};
use std::{collections::HashMap, sync::Arc, vec};
use tokio::{net::TcpStream, sync::Mutex, sync::RwLock};
use tokio_tungstenite::{tungstenite::protocol::Message, WebSocketStream};
use tungstenite::{
Expand Down Expand Up @@ -138,7 +138,7 @@ impl Server {
Err(error) => {
println!("Failed to read message: {}", error);
break;
},
}
}
}

Expand Down Expand Up @@ -255,17 +255,18 @@ impl Client {

drop(server);

self.room_id = Some(room_id);

let mut futures = vec![];
for sender in &senders {
if Arc::ptr_eq(sender, &self.sender) {
self.send_packet(&sender, ResponsePacket::Join { size })
.await;
futures.push(self.send_packet(sender, ResponsePacket::Join { size }));
} else {
self.send_packet(&sender, ResponsePacket::Join { size: None })
.await;
futures.push(self.send_packet(sender, ResponsePacket::Join { size: None }));
}
}

self.room_id = Some(room_id);
join_all(futures).await;
}

async fn handle_leave_room(&mut self, server: &RwLock<Server>) {
Expand Down Expand Up @@ -295,10 +296,12 @@ impl Client {

drop(server);

let mut futures = vec![];
for sender in &senders {
self.send_packet(&sender, ResponsePacket::Leave { index })
.await;
futures.push(self.send_packet(sender, ResponsePacket::Leave { index }));
}

join_all(futures).await;
}

async fn handle_message(&mut self, server: &RwLock<Server>, message: Message) {
Expand All @@ -311,11 +314,11 @@ impl Client {
return
};

return match packet {
match packet {
RequestPacket::Create { size } => self.handle_create_room(server, size).await,
RequestPacket::Join { id } => self.handle_join_room(server, id).await,
RequestPacket::Leave => self.handle_leave_room(server).await,
};
}
} else if message.is_binary() {
let server = server.read().await;

Expand Down Expand Up @@ -346,21 +349,22 @@ impl Client {

drop(server);

return self
.send(&sender, Message::Binary(data))
.await;
return self.send(&sender, Message::Binary(data)).await;
} else if destination == usize::from(u8::MAX) {
let senders = room.senders.clone();

drop(server);

let mut futures = vec![];
for sender in &senders {
if Arc::ptr_eq(sender, &self.sender) {
continue;
}

self.send(&sender, Message::Binary(data.clone())).await;
futures.push(self.send(sender, Message::Binary(data.clone())));
}

join_all(futures).await;
}
}
}
Expand Down

0 comments on commit a7ab3ac

Please sign in to comment.