-
Notifications
You must be signed in to change notification settings - Fork 2
/
ws.rs
67 lines (55 loc) · 1.93 KB
/
ws.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
use crate::{Client, Clients};
use futures::{FutureExt, StreamExt};
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;
use uuid::Uuid;
use warp::ws::{Message, WebSocket};
pub async fn client_connection(ws: WebSocket, clients: Clients) {
println!("establishing client connection... {:?}", ws);
let (client_ws_sender, mut client_ws_rcv) = ws.split();
let (client_sender, client_rcv) = mpsc::unbounded_channel();
let client_rcv = UnboundedReceiverStream::new(client_rcv);
tokio::task::spawn(client_rcv.forward(client_ws_sender).map(|result| {
if let Err(e) = result {
println!("error sending websocket msg: {}", e);
}
}));
let uuid = Uuid::new_v4().simple().to_string();
let new_client = Client {
client_id: uuid.clone(),
sender: Some(client_sender),
};
clients.lock().await.insert(uuid.clone(), new_client);
while let Some(result) = client_ws_rcv.next().await {
let msg = match result {
Ok(msg) => msg,
Err(e) => {
println!("error receiving message for id {}): {}", uuid.clone(), e);
break;
}
};
client_msg(&uuid, msg, &clients).await;
}
clients.lock().await.remove(&uuid);
println!("{} disconnected", uuid);
}
async fn client_msg(client_id: &str, msg: Message, clients: &Clients) {
println!("received message from {}: {:?}", client_id, msg);
let message = match msg.to_str() {
Ok(v) => v,
Err(_) => return,
};
if message == "ping" || message == "ping\n" {
let locked = clients.lock().await;
match locked.get(client_id) {
Some(v) => {
if let Some(sender) = &v.sender {
println!("sending pong");
let _ = sender.send(Ok(Message::text("pong")));
}
}
None => return,
}
return;
};
}