Skip to content

Commit

Permalink
Demonstrate websockets bug
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Apr 25, 2023
1 parent d1765d9 commit f1cbd6b
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 184 deletions.
102 changes: 25 additions & 77 deletions examples/websockets/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,118 +10,66 @@
//! websocket server and how the client-side and server-side code can be quite similar.
//!

use futures_util::stream::FuturesUnordered;
use futures_util::{SinkExt, StreamExt};
use std::borrow::Cow;
use std::ops::ControlFlow;
use std::time::Instant;

// we will use tungstenite for websocket client impl (same library as what axum is using)
use tokio_tungstenite::{
connect_async,
tungstenite::protocol::{frame::coding::CloseCode, CloseFrame, Message},
};
use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};

const N_CLIENTS: usize = 2; //set to desired number
const SERVER: &str = "ws://127.0.0.1:3000/ws";

#[tokio::main]
async fn main() {
let start_time = Instant::now();
//spawn several clients that will concurrently talk to the server
let mut clients = (0..N_CLIENTS)
.map(|cli| tokio::spawn(spawn_client(cli)))
.collect::<FuturesUnordered<_>>();

//wait for all our clients to exit
while clients.next().await.is_some() {}

let end_time = Instant::now();

//total time should be the same no matter how many clients we spawn
println!(
"Total time taken {:#?} with {N_CLIENTS} concurrent clients, should be about 6.45 seconds.",
end_time - start_time
);
}

//creates a client. quietly exits on failure.
async fn spawn_client(who: usize) {
let ws_stream = match connect_async(SERVER).await {
Ok((stream, response)) => {
println!("Handshake for client {} has been completed", who);
println!("Handshake for client has been completed");
// This will be the HTTP response, same as with server this is the last moment we
// can still access HTTP stuff.
println!("Server response was {:?}", response);
stream
}
Err(e) => {
println!("WebSocket handshake for client {who} failed with {e}!");
println!("WebSocket handshake for client failed with {e}!");
return;
}
};

let (mut sender, mut receiver) = ws_stream.split();

//we can ping the server for start
sender
.send(Message::Ping("Hello, Server!".into()))
.await
.expect("Can not send!");

//spawn an async sender to push some more messages into the server
let mut send_task = tokio::spawn(async move {
for i in 1..30 {
// In any websocket error, break loop.
if sender
.send(Message::Text(format!("Message number {}...", i)))
.await
.is_err()
{
//just as with server, if send fails there is nothing we can do but exit.
return;
}

tokio::time::sleep(std::time::Duration::from_millis(300)).await;
}

// When we are done we may want our client to close connection cleanly.
println!("Sending close to {}...", who);
if let Err(e) = sender
.send(Message::Close(Some(CloseFrame {
code: CloseCode::Normal,
reason: Cow::from("Goodbye"),
})))
for i in 1..3 {
// In any websocket error, break loop.
if sender
.send(Message::Binary(
format!("Message number {}...", i).as_bytes().to_vec(),
))
.await
.is_err()
{
println!("Could not send Close due to {:?}, probably it is ok?", e);
};
});

//receiver just prints whatever it gets
let mut recv_task = tokio::spawn(async move {
while let Some(Ok(msg)) = receiver.next().await {
// print message and break if instructed to do so
if process_message(msg, who).is_break() {
break;
}
//just as with server, if send fails there is nothing we can do but exit.
return;
}
});

//wait for either task to finish and kill the other task
tokio::select! {
_ = (&mut send_task) => {
recv_task.abort();
},
_ = (&mut recv_task) => {
send_task.abort();
tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
}

//receiver just prints whatever it gets
let mut count = 0;
let start = Instant::now();
while let Some(Ok(msg)) = receiver.next().await {
count += 1;
let who = format!("{count} {:?}", start.elapsed());
// print message and break if instructed to do so
if process_message(msg, who).is_break() {
break;
}
}
}

/// Function to handle messages we get (with a slight twist that Frame variant is visible
/// since we are working with the underlying tungstenite library directly without axum here).
fn process_message(msg: Message, who: usize) -> ControlFlow<(), ()> {
fn process_message(msg: Message, who: String) -> ControlFlow<(), ()> {
match msg {
Message::Text(t) => {
println!(">>> {} got str: {:?}", who, t);
Expand Down
122 changes: 15 additions & 107 deletions examples/websockets/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use axum::{
};
use axum_extra::TypedHeader;

use std::borrow::Cow;
use std::ops::ControlFlow;
use std::{net::SocketAddr, path::PathBuf};
use tower_http::{
Expand All @@ -36,7 +35,6 @@ use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

//allows to extract the IP of connecting user
use axum::extract::connect_info::ConnectInfo;
use axum::extract::ws::CloseFrame;

//allows to split the websocket stream into separate TX and RX branches
use futures::{sink::SinkExt, stream::StreamExt};
Expand Down Expand Up @@ -98,129 +96,39 @@ async fn ws_handler(
}

/// Actual websocket statemachine (one will be spawned per connection)
async fn handle_socket(mut socket: WebSocket, who: SocketAddr) {
//send a ping (unsupported by some browsers) just to kick things off and get a response
if socket.send(Message::Ping(vec![1, 2, 3])).await.is_ok() {
println!("Pinged {}...", who);
} else {
println!("Could not send ping {}!", who);
// no Error here since the only thing we can do is to close the connection.
// If we can not send messages, there is no way to salvage the statemachine anyway.
return;
}

// receive single message from a client (we can either receive or send with socket).
// this will likely be the Pong for our Ping or a hello message from client.
// waiting for message from a client will block this task, but will not block other client's
// connections.
if let Some(msg) = socket.recv().await {
if let Ok(msg) = msg {
if process_message(msg, who).is_break() {
return;
}
} else {
println!("client {who} abruptly disconnected");
return;
}
}

// Since each client gets individual statemachine, we can pause handling
// when necessary to wait for some external event (in this case illustrated by sleeping).
// Waiting for this client to finish getting its greetings does not prevent other clients from
// connecting to server and receiving their greetings.
for i in 1..5 {
if socket
.send(Message::Text(format!("Hi {i} times!")))
.await
.is_err()
{
println!("client {who} abruptly disconnected");
return;
}
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}

async fn handle_socket(socket: WebSocket, who: SocketAddr) {
// By splitting socket we can send and receive at the same time. In this example we will send
// unsolicited messages to client based on some sort of server's internal event (i.e .timer).
let (mut sender, mut receiver) = socket.split();

// Spawn a task that will push several messages to the client (does not matter what client does)
let mut send_task = tokio::spawn(async move {
let n_msg = 20;
for i in 0..n_msg {
// In case of any websocket error, we exit.
if sender
.send(Message::Text(format!("Server message {i} ...")))
.await
.is_err()
{
return i;
}

tokio::time::sleep(std::time::Duration::from_millis(300)).await;
}

println!("Sending close to {who}...");
if let Err(e) = sender
.send(Message::Close(Some(CloseFrame {
code: axum::extract::ws::close_code::NORMAL,
reason: Cow::from("Goodbye"),
})))
.await
{
println!("Could not send Close due to {}, probably it is ok?", e);
while let Some(Ok(msg)) = receiver.next().await {
// print message and break if instructed to do so
if process_message(msg, who).is_break() {
break;
}
n_msg
});

// This second task will receive messages from client and print them on server console
let mut recv_task = tokio::spawn(async move {
let mut cnt = 0;
while let Some(Ok(msg)) = receiver.next().await {
cnt += 1;
// print message and break if instructed to do so
if process_message(msg, who).is_break() {
break;
}
}
cnt
});
for _ in 0..10 {
// In case of any websocket error, we exit.
println!("Sending binary");
sender.send(Message::Binary(vec![1, 2, 3])).await.unwrap();

// If any one of the tasks exit, abort the other.
tokio::select! {
rv_a = (&mut send_task) => {
match rv_a {
Ok(a) => println!("{} messages sent to {}", a, who),
Err(a) => println!("Error sending messages {:?}", a)
}
recv_task.abort();
},
rv_b = (&mut recv_task) => {
match rv_b {
Ok(b) => println!("Received {} messages", b),
Err(b) => println!("Error receiving messages {:?}", b)
}
send_task.abort();
tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
}
}

// returning from the handler closes the websocket connection
println!("Websocket context {} destroyed", who);
}

/// helper to print contents of messages to stdout. Has special treatment for Close.
fn process_message(msg: Message, who: SocketAddr) -> ControlFlow<(), ()> {
match msg {
Message::Text(t) => {
println!(">>> {} sent str: {:?}", who, t);
println!(">>> {} got str: {:?}", who, t);
}
Message::Binary(d) => {
println!(">>> {} sent {} bytes: {:?}", who, d.len(), d);
println!(">>> {} got {} bytes: {:?}", who, d.len(), d);
}
Message::Close(c) => {
if let Some(cf) = c {
println!(
">>> {} sent close with code {} and reason `{}`",
">>> {} got close with code {} and reason `{}`",
who, cf.code, cf.reason
);
} else {
Expand All @@ -230,13 +138,13 @@ fn process_message(msg: Message, who: SocketAddr) -> ControlFlow<(), ()> {
}

Message::Pong(v) => {
println!(">>> {} sent pong with {:?}", who, v);
println!(">>> {} got pong with {:?}", who, v);
}
// You should never need to manually handle Message::Ping, as axum's websocket library
// will do so for you automagically by replying with Pong and copying the v according to
// spec. But if you need the contents of the pings you can see them here.
Message::Ping(v) => {
println!(">>> {} sent ping with {:?}", who, v);
println!(">>> {} got ping with {:?}", who, v);
}
}
ControlFlow::Continue(())
Expand Down

0 comments on commit f1cbd6b

Please sign in to comment.