Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DO NOT MERGE - Async networking #67

Closed
wants to merge 35 commits into from
Closed
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
7acf7dc
get changes from networking branch and use dev branch as base so all …
Apr 13, 2023
bd7d44b
make GitHub workflows work with dev branch instead of develop
May 28, 2023
5d777c7
refactor websocket client
Jun 30, 2023
6406bbc
refactor integration websocket client test
Jun 30, 2023
b60613e
fix tests
Jun 30, 2023
464e776
fix tests
Jun 30, 2023
316a122
fix cargo clippy
Jul 1, 2023
3ae950e
add code examples
Aug 3, 2023
0d7f9e9
fix model constructor to public; wallet scope public
Aug 3, 2023
e68ae4e
add changes to changelog
Aug 3, 2023
aacb322
add changes to changelog
Aug 3, 2023
f651a0f
implement new async_websocket_client
Aug 9, 2023
dc45474
tests for async_websocket_client
Aug 9, 2023
1e134c9
Add Examples
LimpidCrypto Aug 9, 2023
e556441
run tests on pull requests to the dev branch
Aug 9, 2023
b2355c5
get changes from networking branch and use dev branch as base so all …
Apr 13, 2023
0c6c450
refactor websocket client
Jun 30, 2023
412d989
refactor integration websocket client test
Jun 30, 2023
4aa4aa5
fix tests
Jun 30, 2023
e2ec599
fix tests
Jun 30, 2023
8c566e2
fix cargo clippy
Jul 1, 2023
54947d4
implement new async_websocket_client
Aug 9, 2023
6520cc1
tests for async_websocket_client
Aug 9, 2023
54aa509
Merge remote-tracking branch 'origin/async-net' into async-net
Aug 10, 2023
b7f8311
use nightly to allow unstable features
Aug 10, 2023
57ef449
remove unneeded features
Aug 10, 2023
15f3bf4
rename AsyncWebsocketClientEmbeddedWebsocketTokio -> AsyncWebsocketCl…
Aug 10, 2023
2452c68
remove client base files
Aug 10, 2023
b8d8b83
rename AsyncWebsocketClientEmbeddedWebsocketTokio -> AsyncWebsocketCl…
Aug 10, 2023
557ee65
update dependencies
Aug 10, 2023
30776d3
add dev dependencies
Aug 10, 2023
e898989
add dev dependencies
Aug 10, 2023
8cf4122
change clients export
Aug 14, 2023
1496592
change clients export
Aug 14, 2023
3ba3acb
add .DS_STORE to gitignore
Aug 14, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/unit_test.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
on:
push:
branches:
- develop
- dev
pull_request:
branches:
- main
Expand All @@ -24,12 +24,12 @@ jobs:
- uses: actions-rs/cargo@v1
with:
command: build
args: --release --no-default-features --features core,models
args: --release --no-default-features --features core,models,net
- uses: actions-rs/cargo@v1
with:
command: test
args: --all-features
- uses: actions-rs/cargo@v1
with:
command: test
args: --no-default-features --features core,models
args: --no-default-features --features core,models,net
43 changes: 37 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,38 +46,69 @@ chrono = { version = "0.4.19", default-features = false, features = [
"clock",
] }
hex = { version = "0.4.3", default-features = false, features = ["alloc"] }
rand = { version = "0.8.4", default-features = false, features = ["getrandom"] }
rand = { version = "0.8.5", default-features = false, features = ["getrandom"] }
serde = { version = "1.0.130", default-features = false, features = ["derive"] }
serde_json = { version = "1.0.68", default-features = false, features = [
"alloc",
] }
serde_with = "2.3.1"
serde_with = "3.2.0"
serde_repr = "0.1"
zeroize = "1.5.7"
hashbrown = { version = "0.13.2", default-features = false, features = ["serde"] }
hashbrown = { version = "0.14.0", default-features = false, features = ["serde"] }
fnv = { version = "1.0.7", default-features = false }
derive-new = { version = "0.5.9", default-features = false }
thiserror-no-std = "2.0.2"
anyhow = { version ="1.0.69", default-features = false }
tokio = { version = "1.28.0", default-features = false, optional = true }
url = { version = "2.2.2", default-features = false, optional = true }
futures = { version = "0.3.28", default-features = false, optional = true }
rand_core = { version = "0.6.3", default-features = false }
tokio-tungstenite = { version = "0.20.0", optional = true }

[dependencies.embedded-websocket]
# git version needed to use `framer_async`
git = "https://github.com/ninjasource/embedded-websocket"
version = "0.9.2"
rev = "8d87d46f46fa0c75e099ca8aad37e8d00c8854f8"
default-features = false

[dev-dependencies]
criterion = "0.4.0"
criterion = "0.5.1"
cargo-husky = { version = "1.5.0", default-features = false, features = [
"user-hooks",
] }
tokio = { version = "1.28.2", features = ["full"] }
tokio-util = { version = "0.7.7", features = ["codec"] }
bytes = { version = "1.4.0", default-features = false }

[[bench]]
name = "benchmarks"
harness = false

[features]
default = ["std", "core", "models", "utils"]
default = ["std", "core", "models", "utils", "net"]
models = ["core", "transactions", "requests", "ledger"]
transactions = ["core", "amounts", "currencies"]
requests = ["core", "amounts", "currencies"]
ledger = ["core", "amounts", "currencies"]
amounts = ["core"]
currencies = ["core"]
net = ["url", "futures"]
core = ["utils"]
utils = []
std = ["rand/std", "regex/std", "chrono/std", "rand/std_rng", "hex/std", "rust_decimal/std", "bs58/std", "serde/std", "indexmap/std", "secp256k1/std"]
std = [
"embedded-websocket/std",
"tokio-tungstenite/native-tls",
"tokio/full",
"futures/std",
"rand/std",
"regex/std",
"chrono/std",
"rand/std_rng",
"hex/std",
"rust_decimal/std",
"bs58/std",
"serde/std",
"indexmap/std",
"secp256k1/std",
]
2 changes: 2 additions & 0 deletions src/_anyhow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
#[macro_export]
macro_rules! Err {
($err:expr $(,)?) => {{
use alloc::string::ToString;

let error = $err.to_string().replace("\"", "");
let boxed_error = ::alloc::boxed::Box::new(error);
let leaked_error: &'static str = ::alloc::boxed::Box::leak(boxed_error);
Expand Down
2 changes: 2 additions & 0 deletions src/asynch/clients/async_json_rpc_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
// /// An async client for interacting with the rippled JSON RPC.
// pub struct AsyncJsonRpcClient {}
283 changes: 283 additions & 0 deletions src/asynch/clients/async_websocket_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,283 @@
use super::exceptions::XRPLWebsocketException;
use crate::Err;
use anyhow::Result;
use core::{
fmt::{Debug, Display},
marker::PhantomData,
ops::Deref,
pin::Pin,
task::Poll,
};
use embedded_websocket::{
framer_async::Framer as EmbeddedWebsocketFramer, Client as EmbeddedWebsocketClient,
WebSocket as EmbeddedWebsocket,
};
use futures::{Sink, Stream};
use rand_core::RngCore;
use url::Url;

#[cfg(feature = "std")]
use tokio::net::TcpStream;
#[cfg(feature = "std")]
use tokio_tungstenite::{
connect_async as tungstenite_connect_async, MaybeTlsStream as TungsteniteMaybeTlsStream,
WebSocketStream as TungsteniteWebsocketStream,
};

// Exports
pub use embedded_websocket::{
framer_async::{
FramerError as EmbeddedWebsocketFramerError, ReadResult as EmbeddedWebsocketReadMessageType,
},
Error as EmbeddedWebsocketError, WebSocketCloseStatusCode as EmbeddedWebsocketCloseStatusCode,
WebSocketOptions as EmbeddedWebsocketOptions,
WebSocketSendMessageType as EmbeddedWebsocketSendMessageType,
WebSocketState as EmbeddedWebsocketState,
};

#[cfg(feature = "std")]
pub type AsyncWebsocketClientTungstenite<Status> =
AsyncWebsocketClient<TungsteniteWebsocketStream<TungsteniteMaybeTlsStream<TcpStream>>, Status>;
pub type AsyncWebsocketClientEmbeddedWebsocketTokio<Rng, Status> =
AsyncWebsocketClient<EmbeddedWebsocketFramer<Rng, EmbeddedWebsocketClient>, Status>;
#[cfg(feature = "std")]
pub use tokio_tungstenite::tungstenite::Message as TungsteniteMessage;

pub struct WebsocketOpen;
pub struct WebsocketClosed;

pub struct AsyncWebsocketClient<T, Status = WebsocketClosed> {
inner: T,
status: PhantomData<Status>,
}

impl<T, Status> AsyncWebsocketClient<T, Status> {
pub fn is_open(&self) -> bool {
core::any::type_name::<Status>() == core::any::type_name::<WebsocketOpen>()
}
}

impl<T, I> Sink<I> for AsyncWebsocketClient<T, WebsocketOpen>
where
T: Sink<TungsteniteMessage> + Unpin,
<T as Sink<TungsteniteMessage>>::Error: Display,
I: serde::Serialize,
{
type Error = anyhow::Error;

fn poll_ready(
mut self: core::pin::Pin<&mut Self>,
cx: &mut core::task::Context<'_>,
) -> core::task::Poll<core::result::Result<(), Self::Error>> {
match Pin::new(&mut self.inner).poll_ready(cx) {
Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
Poll::Ready(Err(error)) => Poll::Ready(Err!(error)),
Poll::Pending => Poll::Pending,
}
}

fn start_send(
mut self: core::pin::Pin<&mut Self>,
item: I,
) -> core::result::Result<(), Self::Error> {
match Pin::new(&mut self.inner).start_send(TungsteniteMessage::Text(
serde_json::to_string(&item).unwrap(),
)) {
// TODO: unwrap
Ok(()) => Ok(()),
Err(error) => Err!(error),
}
}

fn poll_flush(
mut self: core::pin::Pin<&mut Self>,
cx: &mut core::task::Context<'_>,
) -> core::task::Poll<core::result::Result<(), Self::Error>> {
match Pin::new(&mut self.inner).poll_flush(cx) {
Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
Poll::Ready(Err(error)) => Poll::Ready(Err!(error)),
Poll::Pending => Poll::Pending,
}
}

fn poll_close(
mut self: core::pin::Pin<&mut Self>,
cx: &mut core::task::Context<'_>,
) -> core::task::Poll<core::result::Result<(), Self::Error>> {
match Pin::new(&mut self.inner).poll_close(cx) {
Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
Poll::Ready(Err(error)) => Poll::Ready(Err!(error)),
Poll::Pending => Poll::Pending,
}
}
}

impl<T> Stream for AsyncWebsocketClient<T, WebsocketOpen>
where
T: Stream + Unpin,
{
type Item = <T as Stream>::Item;

fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut core::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
match Pin::new(&mut self.inner).poll_next(cx) {
Poll::Ready(Some(item)) => Poll::Ready(Some(item)),
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}
}

#[cfg(feature = "std")]
impl
AsyncWebsocketClient<
TungsteniteWebsocketStream<TungsteniteMaybeTlsStream<TcpStream>>,
WebsocketClosed,
>
{
pub async fn open(
uri: Url,
) -> Result<
AsyncWebsocketClient<
TungsteniteWebsocketStream<TungsteniteMaybeTlsStream<TcpStream>>,
WebsocketOpen,
>,
> {
let (websocket_stream, _) = tungstenite_connect_async(uri).await.unwrap(); // TODO: unwrap

Ok(AsyncWebsocketClient {
inner: websocket_stream,
status: PhantomData::<WebsocketOpen>,
})
}
}

impl<Rng>
AsyncWebsocketClient<EmbeddedWebsocketFramer<Rng, EmbeddedWebsocketClient>, WebsocketClosed>
where
Rng: RngCore,
{
pub async fn open<'a, B, E>(
stream: &mut (impl Stream<Item = Result<B, E>> + Sink<&'a [u8], Error = E> + Unpin),
buffer: &'a mut [u8],
rng: Rng,
websocket_options: &EmbeddedWebsocketOptions<'_>,
) -> Result<
AsyncWebsocketClient<EmbeddedWebsocketFramer<Rng, EmbeddedWebsocketClient>, WebsocketOpen>,
>
where
B: AsRef<[u8]>,
E: Debug,
{
let websocket = EmbeddedWebsocket::<Rng, EmbeddedWebsocketClient>::new_client(rng);
let mut framer = EmbeddedWebsocketFramer::new(websocket);
framer
.connect(stream, buffer, websocket_options)
.await
.unwrap(); // TODO: unwrap

Ok(AsyncWebsocketClient {
inner: framer,
status: PhantomData::<WebsocketOpen>,
})
}
}

impl<Rng> AsyncWebsocketClient<EmbeddedWebsocketFramer<Rng, EmbeddedWebsocketClient>, WebsocketOpen>
where
Rng: RngCore,
{
pub fn encode<E>(
&mut self,
message_type: EmbeddedWebsocketSendMessageType,
end_of_message: bool,
from: &[u8],
to: &mut [u8],
) -> Result<usize>
where
E: Debug,
{
let len = self
.inner
.encode::<E>(message_type, end_of_message, from, to)
.unwrap(); // TODO: unwrap

Ok(len)
}

pub async fn send<'b, E, R: serde::Serialize>(
&mut self,
stream: &mut (impl Sink<&'b [u8], Error = E> + Unpin),
stream_buf: &'b mut [u8],
end_of_message: bool,
frame_buf: R,
) -> Result<()>
where
E: Debug,
{
self.inner
.write(
stream,
stream_buf,
EmbeddedWebsocketSendMessageType::Binary,
end_of_message,
serde_json::to_vec(&frame_buf).unwrap().as_slice(),
) // TODO: unwrap
.await
.unwrap(); // TODO: unwrap

Ok(())
}

pub async fn close<'b, E>(
&mut self,
stream: &mut (impl Sink<&'b [u8], Error = E> + Unpin),
stream_buf: &'b mut [u8],
close_status: EmbeddedWebsocketCloseStatusCode,
status_description: Option<&str>,
) -> Result<()>
where
E: Debug,
{
self.inner
.close(stream, stream_buf, close_status, status_description)
.await
.unwrap(); // TODO: unwrap

Ok(())
}

pub async fn next<'a, B: Deref<Target = [u8]>, E>(
&'a mut self,
stream: &mut (impl Stream<Item = Result<B, E>> + Sink<&'a [u8], Error = E> + Unpin),
buffer: &'a mut [u8],
) -> Option<Result<EmbeddedWebsocketReadMessageType<'_>>>
// TODO: Change to Response as soon as implemented
where
E: Debug,
{
match self.inner.read(stream, buffer).await {
Some(Ok(read_result)) => Some(Ok(read_result)),
Some(Err(error)) => Some(Err!(XRPLWebsocketException::from(error))),
None => None,
}
}

pub async fn try_next<'a, B: Deref<Target = [u8]>, E>(
&'a mut self,
stream: &mut (impl Stream<Item = Result<B, E>> + Sink<&'a [u8], Error = E> + Unpin),
buffer: &'a mut [u8],
) -> Result<Option<EmbeddedWebsocketReadMessageType<'_>>>
// TODO: Change to Response as soon as implemented
where
E: Debug,
{
match self.inner.read(stream, buffer).await {
Some(Ok(read_result)) => Ok(Some(read_result)),
Some(Err(error)) => Err!(XRPLWebsocketException::from(error)),
None => Ok(None),
}
}
}