-
Notifications
You must be signed in to change notification settings - Fork 0
/
routes.rs
64 lines (53 loc) · 1.82 KB
/
routes.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
use askama_axum::IntoResponse;
use axum::extract::{
ws::{self, Message},
State, WebSocketUpgrade,
};
use futures::{sink::SinkExt, stream::StreamExt};
use std::sync::Arc;
use crate::AppState;
use rustingwithngrok::{ChatMsg, IndexTemplate};
/// Renders and returns the `index.html` template
pub async fn index(State(state): State<Arc<AppState>>) -> impl IntoResponse {
let chat_messages = state.get_history();
IndexTemplate { chat_messages }
}
/// Handles websocket connections. Usernames are derived from the `ngrok-auth-user-name`
/// header.
pub async fn websocket(
ws: WebSocketUpgrade,
State(state): State<Arc<AppState>>,
) -> impl IntoResponse {
let username = "None".to_string();
ws.on_upgrade(|socket| chat_service(socket, state, username))
}
/// Handles both sending and receiving websocket messages
async fn chat_service(socket: ws::WebSocket, state: Arc<AppState>, username: String) {
let (mut sender, mut receiver) = socket.split();
let mut rx = state.tx.subscribe();
log::info!("{} connected", username);
let mut send_task = tokio::spawn(async move {
while let Ok(msg) = rx.recv().await {
if sender.send(Message::Text(msg)).await.is_err() {
break;
}
}
});
let tx = state.tx.clone();
let u = username.clone();
let mut receive_task = tokio::spawn(async move {
while let Some(Ok(Message::Text(text))) = receiver.next().await {
let msg = ChatMsg {
username: u.clone(),
text,
};
_ = tx.send(msg.to_string());
state.push_history(msg);
}
});
tokio::select! {
_ = (&mut send_task) => receive_task.abort(),
_ = (&mut receive_task) => send_task.abort(),
};
log::info!("{} disconnected", username);
}