From 30adac2934d319a02171f514fbba51de17eac925 Mon Sep 17 00:00:00 2001 From: Tomas Tauber <2410580+tomtau@users.noreply.github.com> Date: Fri, 10 Mar 2023 09:38:35 +0800 Subject: [PATCH 1/2] switch to rustls --- relay_client/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/relay_client/Cargo.toml b/relay_client/Cargo.toml index c1bad1d..259f3b2 100644 --- a/relay_client/Cargo.toml +++ b/relay_client/Cargo.toml @@ -8,7 +8,7 @@ relay_rpc = { path = "../relay_rpc" } futures-util = { version = "0.3", default-features = false, features = ["sink", "std"] } thiserror = "1.0" tokio = { version = "1.22", features = ["rt", "time", "sync", "macros", "rt-multi-thread"] } -tokio-tungstenite = { version = "0.17", features = ["connect", "native-tls"] } +tokio-tungstenite = { version = "0.18", features = ["rustls-tls-native-roots"] } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" serde_qs = "0.10" From e104f0e0e95d5f2d14de63d8a627d1ccde100eb8 Mon Sep 17 00:00:00 2001 From: Tomas Tauber <2410580+tomtau@users.noreply.github.com> Date: Fri, 10 Mar 2023 09:50:26 +0800 Subject: [PATCH 2/2] make the ConnectionHandler async --- Cargo.toml | 1 + examples/basic_client.rs | 12 +++++++----- relay_client/Cargo.toml | 1 + relay_client/src/client.rs | 12 +++++++----- relay_client/src/client/connection.rs | 12 ++++++------ 5 files changed, 22 insertions(+), 16 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index f577079..60346d4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,7 @@ relay_rpc = { path = "./relay_rpc", optional = true } [dev-dependencies] anyhow = "1" +async-trait = "0.1" structopt = { version = "0.3", default-features = false } tokio = { version = "1.22", features = ["full"] } diff --git a/examples/basic_client.rs b/examples/basic_client.rs index e31283a..125806c 100644 --- a/examples/basic_client.rs +++ b/examples/basic_client.rs @@ -1,4 +1,5 @@ use { + async_trait::async_trait, relay_client::{ Client, CloseFrame, @@ -36,27 +37,28 @@ impl Handler { } } +#[async_trait] impl ConnectionHandler for Handler { - fn connected(&mut self) { + async fn connected(&mut self) { println!("[{}] connection open", self.name); } - fn disconnected(&mut self, frame: Option>) { + async fn disconnected(&mut self, frame: Option>) { println!("[{}] connection closed: frame={frame:?}", self.name); } - fn message_received(&mut self, message: PublishedMessage) { + async fn message_received(&mut self, message: PublishedMessage) { println!( "[{}] inbound message: topic={} message={}", self.name, message.topic, message.message ); } - fn inbound_error(&mut self, error: Error) { + async fn inbound_error(&mut self, error: Error) { println!("[{}] inbound error: {error}", self.name); } - fn outbound_error(&mut self, error: Error) { + async fn outbound_error(&mut self, error: Error) { println!("[{}] outbound error: {error}", self.name); } } diff --git a/relay_client/Cargo.toml b/relay_client/Cargo.toml index 259f3b2..f8f9876 100644 --- a/relay_client/Cargo.toml +++ b/relay_client/Cargo.toml @@ -4,6 +4,7 @@ version = "0.1.0" edition = "2021" [dependencies] +async-trait = "0.1" relay_rpc = { path = "../relay_rpc" } futures-util = { version = "0.3", default-features = false, features = ["sink", "std"] } thiserror = "1.0" diff --git a/relay_client/src/client.rs b/relay_client/src/client.rs index cc8105e..a1d32e1 100644 --- a/relay_client/src/client.rs +++ b/relay_client/src/client.rs @@ -1,6 +1,7 @@ use { self::connection::{connection_event_loop, ConnectionControl}, crate::{ConnectionOptions, Error}, + async_trait::async_trait, relay_rpc::{ domain::{SubscriptionId, Topic}, rpc::{BatchSubscribe, BatchUnsubscribe, Publish, Subscribe, Subscription, Unsubscribe}, @@ -44,23 +45,24 @@ impl PublishedMessage { } /// Handlers for the RPC stream events. +#[async_trait] pub trait ConnectionHandler: Send + 'static { /// Called when a connection to the Relay is established. - fn connected(&mut self) {} + async fn connected(&mut self) {} /// Called when the Relay connection is closed. - fn disconnected(&mut self, _frame: Option>) {} + async fn disconnected(&mut self, _frame: Option>) {} /// Called when a message is received from the Relay. - fn message_received(&mut self, message: PublishedMessage); + async fn message_received(&mut self, message: PublishedMessage); /// Called when an inbound error occurs, such as data deserialization /// failure, or an unknown response message ID. - fn inbound_error(&mut self, _error: Error) {} + async fn inbound_error(&mut self, _error: Error) {} /// Called when an outbound error occurs, i.e. failed to write to the /// websocket stream. - fn outbound_error(&mut self, _error: Error) {} + async fn outbound_error(&mut self, _error: Error) {} } /// The Relay RPC client. diff --git a/relay_client/src/client/connection.rs b/relay_client/src/client/connection.rs index 4186a19..f8b874f 100644 --- a/relay_client/src/client/connection.rs +++ b/relay_client/src/client/connection.rs @@ -49,7 +49,7 @@ pub(super) async fn connection_event_loop( let result = conn.connect(*opts).await; if result.is_ok() { - handler.connected(); + handler.connected().await; } tx.send(result).ok(); @@ -67,7 +67,7 @@ pub(super) async fn connection_event_loop( // Control TX has been dropped, shutting down. None => { conn.disconnect().await.ok(); - handler.disconnected(None); + handler.disconnected(None).await; break; } } @@ -76,20 +76,20 @@ pub(super) async fn connection_event_loop( event = conn.select_next_some() => { match event { StreamEvent::InboundSubscriptionRequest(request) => { - handler.message_received(PublishedMessage::from_request(&request)); + handler.message_received(PublishedMessage::from_request(&request)).await; request.respond(Ok(true)).ok(); } StreamEvent::InboundError(error) => { - handler.inbound_error(error); + handler.inbound_error(error).await; } StreamEvent::OutboundError(error) => { - handler.outbound_error(error); + handler.outbound_error(error).await; } StreamEvent::ConnectionClosed(frame) => { - handler.disconnected(frame); + handler.disconnected(frame).await; conn.reset(); } }