Skip to content

Commit

Permalink
Updated websocket example to improve clarity and to be more distinct …
Browse files Browse the repository at this point in the history
…from chat example (#1637)

Co-authored-by: David Pedersen <david.pdrsn@gmail.com>
  • Loading branch information
alexpyattaev and davidpdrsn committed Jan 8, 2023
1 parent d11af16 commit 8d92902
Show file tree
Hide file tree
Showing 5 changed files with 352 additions and 31 deletions.
12 changes: 12 additions & 0 deletions examples/websockets/Cargo.toml
Expand Up @@ -6,8 +6,20 @@ publish = false

[dependencies]
axum = { path = "../../axum", features = ["ws", "headers"] }
futures = "0.3"
futures-util = { version = "0.3", default-features = false, features = ["sink", "std"] }
headers = "0.3"
tokio = { version = "1.0", features = ["full"] }
tokio-tungstenite = "0.18.0"
tower = { version = "0.4", features = ["util"] }
tower-http = { version = "0.3.0", features = ["fs", "trace"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

[[bin]]
name = "example-websockets"
path = "src/main.rs"

[[bin]]
name = "example-client"
path = "src/client.rs"
1 change: 1 addition & 0 deletions examples/websockets/assets/index.html
@@ -1 +1,2 @@
<a>Open the console to see stuff, then refresh to initiate exchange.</a>
<script src='script.js'></script>
16 changes: 16 additions & 0 deletions examples/websockets/assets/script.js
Expand Up @@ -7,3 +7,19 @@ socket.addEventListener('open', function (event) {
socket.addEventListener('message', function (event) {
console.log('Message from server ', event.data);
});


setTimeout(() => {
const obj = { hello: "world" };
const blob = new Blob([JSON.stringify(obj, null, 2)], {
type: "application/json",
});
console.log("Sending blob over websocket");
socket.send(blob);
}, 1000);

setTimeout(() => {
socket.send('About done here...');
console.log("Sending close over websocket");
socket.close(3000, "Crash and Burn!");
}, 3000);
160 changes: 160 additions & 0 deletions examples/websockets/src/client.rs
@@ -0,0 +1,160 @@
//! Based on tokio-tungstenite example websocket client, but with multiple
//! concurrent websocket clients in one package
//!
//! This will connect to a server specified in the SERVER with N_CLIENTS
//! concurrent connections, and then flood some test messages over websocket.
//! This will also print whatever it gets into stdout.
//!
//! Note that this is not currently optimized for performance, especially around
//! stdout mutex management. Rather it's intended to show an example of working with axum's
//! websocket server and how the client-side and server-side code can be quite similar.
//!

use futures_util::stream::FuturesUnordered;
use futures_util::{SinkExt, StreamExt};
use std::borrow::Cow;
use std::ops::ControlFlow;
use std::time::Instant;

// we will use tungstenite for websocket client impl (same library as what axum is using)
use tokio_tungstenite::{
connect_async,
tungstenite::protocol::{frame::coding::CloseCode, CloseFrame, Message},
};

const N_CLIENTS: usize = 2; //set to desired number
const SERVER: &'static str = "ws://127.0.0.1:3000/ws";

#[tokio::main]
async fn main() {
let start_time = Instant::now();
//spawn several clients that will concurrently talk to the server
let mut clients = (0..N_CLIENTS)
.into_iter()
.map(|cli| tokio::spawn(spawn_client(cli)))
.collect::<FuturesUnordered<_>>();

//wait for all our clients to exit
while clients.next().await.is_some() {}

let end_time = Instant::now();

//total time should be the same no matter how many clients we spawn
println!(
"Total time taken {:#?} with {N_CLIENTS} concurrent clients, should be about 6.45 seconds.",
end_time - start_time
);
}

//creates a client. quietly exits on failure.
async fn spawn_client(who: usize) {
let ws_stream = match connect_async(SERVER).await {
Ok((stream, response)) => {
println!("Handshake for client {} has been completed", who);
// This will be the HTTP response, same as with server this is the last moment we
// can still access HTTP stuff.
println!("Server response was {:?}", response);
stream
}
Err(e) => {
println!("WebSocket handshake for client {who} failed with {e}!");
return;
}
};

let (mut sender, mut receiver) = ws_stream.split();

//we can ping the server for start
sender
.send(Message::Ping("Hello, Server!".into()))
.await
.expect("Can not send!");

//spawn an async sender to push some more messages into the server
let mut send_task = tokio::spawn(async move {
for i in 1..30 {
// In any websocket error, break loop.
if sender
.send(Message::Text(format!("Message number {}...", i)))
.await
.is_err()
{
//just as with server, if send fails there is nothing we can do but exit.
return;
}

tokio::time::sleep(std::time::Duration::from_millis(300)).await;
}

// When we are done we may want our client to close connection cleanly.
println!("Sending close to {}...", who);
if let Err(e) = sender
.send(Message::Close(Some(CloseFrame {
code: CloseCode::Normal,
reason: Cow::from("Goodbye"),
})))
.await
{
println!("Could not send Close due to {:?}, probably it is ok?", e);
};
});

//receiver just prints whatever it gets
let mut recv_task = tokio::spawn(async move {
while let Some(Ok(msg)) = receiver.next().await {
// print message and break if instructed to do so
if process_message(msg, who).is_break() {
break;
}
}
});

//wait for either task to finish and kill the other task
tokio::select! {
_ = (&mut send_task) => {
recv_task.abort();
},
_ = (&mut recv_task) => {
send_task.abort();
}
}
}

/// Function to handle messages we get (with a slight twist that Frame variant is visible
/// since we are working with the underlying tungstenite library directly without axum here).
fn process_message(msg: Message, who: usize) -> ControlFlow<(), ()> {
match msg {
Message::Text(t) => {
println!(">>> {} got str: {:?}", who, t);
}
Message::Binary(d) => {
println!(">>> {} got {} bytes: {:?}", who, d.len(), d);
}
Message::Close(c) => {
if let Some(cf) = c {
println!(
">>> {} got close with code {} and reason `{}`",
who, cf.code, cf.reason
);
} else {
println!(">>> {} somehow got close message without CloseFrame", who);
}
return ControlFlow::Break(());
}

Message::Pong(v) => {
println!(">>> {} got pong with {:?}", who, v);
}
// Just as with axum server, the underlying tungstenite websocket library
// will handle Ping for you automagically by replying with Pong and copying the
// v according to spec. But if you need the contents of the pings you can see them here.
Message::Ping(v) => {
println!(">>> {} got ping with {:?}", who, v);
}

Message::Frame(_) => {
unreachable!("This is never supposed to happen")
}
}
ControlFlow::Continue(())
}

0 comments on commit 8d92902

Please sign in to comment.