Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add async support #13

Draft
wants to merge 9 commits into
base: master
Choose a base branch
from
52 changes: 49 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,64 @@ categories = ["embedded", "no-std", "network-programming"]
readme = "README.md"

[dependencies]
sha1 = { version = "0.10.1", default-features = false }
heapless = "0.7.14"
async-trait = { version = "0.1.56", optional = true }
base64 = { version = "0.13.0", default-features = false }
base64-simd = { version = "0.5.0", default-features = false, optional = true }
byteorder = { version = "1.4.3", default-features = false }
cfg-if = "1.0.0"
heapless = "0.7.14"
httparse = { version = "1.7.1", default-features = false }
rand_core = "0.6.3"
base64 = { version = "0.13.0", default-features = false }
sha1 = { version = "0.10.1", default-features = false }

[dev-dependencies]
async-std = { version = "1.12.0", features = ["attributes"] }
async-trait = "0.1.56"
cfg-if = "1.0.0"
once_cell = "1.12.0"
rand = "0.8.5"
route-recognizer = "0.3.1"
smol = "1.2.5"
smol-potat = { version = "1.1.2", features = ["auto"] }
tokio = { version = "1.19.2", features = ["macros", "net", "rt-multi-thread", "io-util"] }
tokio-stream = { version = "0.1.9", features = ["net"] }

# see readme for no_std support
[features]
default = ["std"]
# default = []
std = []
async = ["std", "async-trait"]
example-tokio = ["async"]
example-smol = ["async"]
example-async-std = ["async"]

[[example]]
name = "server_tokio"
path = "examples/server_async/main.rs"
required-features = ["example-tokio"]

[[example]]
name = "server_smol"
path = "examples/server_async/main.rs"
required-features = ["example-smol"]

[[example]]
name = "server_async_std"
path = "examples/server_async/main.rs"
required-features = ["example-async-std"]

[[example]]
name = "client_tokio"
path = "examples/client_async/main.rs"
required-features = ["example-tokio"]

[[example]]
name = "client_smol"
path = "examples/client_async/main.rs"
required-features = ["example-smol"]

[[example]]
name = "client_async_std"
path = "examples/client_async/main.rs"
required-features = ["example-async-std"]
File renamed without changes.
110 changes: 110 additions & 0 deletions examples/client_async/compat.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
#![allow(dead_code)]

// This is an example implementation of compatibility extension, gladly stolen from futures_lite
// As this is far from being any useful, please do extend this on your own
pub trait CompatExt {
fn compat(self) -> Compat<Self>
where
Self: Sized;
fn compat_ref(&self) -> Compat<&Self>;
fn compat_mut(&mut self) -> Compat<&mut Self>;
}

impl<T> CompatExt for T {
fn compat(self) -> Compat<Self>
where
Self: Sized,
{
Compat(self)
}

fn compat_ref(&self) -> Compat<&Self> {
Compat(self)
}

fn compat_mut(&mut self) -> Compat<&mut Self> {
Compat(self)
}
}

pub struct Compat<T>(T);

impl<T> Compat<T> {
pub fn get_ref(&self) -> &T {
&self.0
}

pub fn get_mut(&mut self) -> &mut T {
&mut self.0
}

pub fn into_inner(self) -> T {
self.0
}
}

#[cfg(feature = "example-tokio")]
pub mod tokio_compat {
use super::Compat;
use async_trait::async_trait;
use embedded_websocket::compat::{AsyncRead, AsyncWrite};
use tokio::io::{AsyncReadExt, AsyncWriteExt};

#[async_trait]
impl<T: tokio::io::AsyncRead + Unpin + Send + Sync> AsyncRead<std::io::Error> for Compat<T> {
async fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
AsyncReadExt::read(self.get_mut(), buf).await
}
}

#[async_trait]
impl<T: tokio::io::AsyncWrite + Unpin + Send + Sync> AsyncWrite<std::io::Error> for Compat<T> {
async fn write_all(&mut self, buf: &[u8]) -> Result<(), std::io::Error> {
AsyncWriteExt::write_all(self.get_mut(), buf).await
}
}
}

#[cfg(feature = "example-smol")]
pub mod smol_compat {
use super::Compat;
use async_trait::async_trait;
use embedded_websocket::compat::{AsyncRead, AsyncWrite};
use smol::io::{AsyncReadExt, AsyncWriteExt};

#[async_trait]
impl<T: smol::io::AsyncRead + Unpin + Send + Sync> AsyncRead<std::io::Error> for Compat<T> {
async fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
AsyncReadExt::read(self.get_mut(), buf).await
}
}

#[async_trait]
impl<T: smol::io::AsyncWrite + Unpin + Send + Sync> AsyncWrite<std::io::Error> for Compat<T> {
async fn write_all(&mut self, buf: &[u8]) -> Result<(), std::io::Error> {
AsyncWriteExt::write_all(self.get_mut(), buf).await
}
}
}

#[cfg(feature = "example-async-std")]
pub mod async_std_compat {
use super::Compat;
use async_std::io::{ReadExt, WriteExt};
use async_trait::async_trait;
use embedded_websocket::compat::{AsyncRead, AsyncWrite};

#[async_trait]
impl<T: async_std::io::Read + Unpin + Send + Sync> AsyncRead<std::io::Error> for Compat<T> {
async fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
ReadExt::read(self.get_mut(), buf).await
}
}

#[async_trait]
impl<T: async_std::io::Write + Unpin + Send + Sync> AsyncWrite<std::io::Error> for Compat<T> {
async fn write_all(&mut self, buf: &[u8]) -> Result<(), std::io::Error> {
WriteExt::write_all(self.get_mut(), buf).await
}
}
}
90 changes: 90 additions & 0 deletions examples/client_async/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// The MIT License (MIT)
// Copyright (c) 2019 David Haig

// Demo websocket client connecting to localhost port 1337.
// This will initiate a websocket connection to path /chat. The demo sends a simple "Hello, World!"
// message and expects an echo of the same message as a reply.
// It will then initiate a close handshake, wait for a close response from the server,
// and terminate the connection.
// Note that we are using the standard library in the demo and making use of the framer helper module
// but the websocket library remains no_std (see client_full for an example without the framer helper module)

mod compat;

use crate::compat::CompatExt;
use embedded_websocket::{
framer::{Framer, FramerError, ReadResult},
WebSocketClient, WebSocketCloseStatusCode, WebSocketOptions, WebSocketSendMessageType,
};
use std::error::Error;

cfg_if::cfg_if! {
if #[cfg(feature = "example-tokio")] {
use tokio::net::TcpStream;
} else if #[cfg(feature = "example-smol")] {
use smol::net::TcpStream;
} else if #[cfg(feature = "example-async-std")] {
use async_std::net::TcpStream;
}
}

#[cfg_attr(feature = "example-async-std", async_std::main)]
#[cfg_attr(feature = "example-tokio", tokio::main)]
#[cfg_attr(feature = "example-smol", smol_potat::main)]
async fn main() -> Result<(), FramerError<impl Error>> {
// open a TCP stream to localhost port 1337
let address = "127.0.0.1:1337";
println!("Connecting to: {}", address);
let mut stream = TcpStream::connect(address).await.map_err(FramerError::Io)?;
println!("Connected.");

let mut read_buf = [0; 4000];
let mut read_cursor = 0;
let mut write_buf = [0; 4000];
let mut frame_buf = [0; 4000];
let mut websocket = WebSocketClient::new_client(rand::thread_rng());

// initiate a websocket opening handshake
let websocket_options = WebSocketOptions {
path: "/chat",
host: "localhost",
origin: "http://localhost:1337",
sub_protocols: None,
additional_headers: None,
};

let mut framer = Framer::new(
&mut read_buf,
&mut read_cursor,
&mut write_buf,
&mut websocket,
);
let mut stream = stream.compat_mut();

framer
.connect_async(&mut stream, &websocket_options)
.await?;

let message = "Hello, World!";
framer
.write_async(
&mut stream,
WebSocketSendMessageType::Text,
true,
message.as_bytes(),
)
.await?;

while let ReadResult::Text(s) = framer.read_async(&mut stream, &mut frame_buf).await? {
println!("Received: {}", s);

// close the websocket after receiving the first reply
framer
.close_async(&mut stream, WebSocketCloseStatusCode::NormalClosure, None)
.await?;
println!("Sent close handshake");
}

println!("Connection closed");
Ok(())
}