diff --git a/Cargo.lock b/Cargo.lock index 2431434437..ceb6393714 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -217,9 +217,9 @@ dependencies = [ [[package]] name = "async-trait" -version = "0.1.42" +version = "0.1.50" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d3a45e77e34375a7923b1e8febb049bb011f064714a8e17a1a616fef01da13d" +checksum = "0b98e84bbb4cbcdd97da190ba0c58a1bb0de2c1fdf67d159e192ed766aeca722" dependencies = [ "proc-macro2 1.0.27", "quote 1.0.9", diff --git a/jormungandr/Cargo.toml b/jormungandr/Cargo.toml index 0cad86f5e4..a87dccc1db 100644 --- a/jormungandr/Cargo.toml +++ b/jormungandr/Cargo.toml @@ -24,7 +24,7 @@ cardano-legacy-address = { git = "https://github.com/input-output-hk/chain-libs. imhamt = { git = "https://github.com/input-output-hk/chain-libs.git", branch = "master" } arc-swap = "^1.1.0" -async-trait = "=0.1.42" # https://github.com/dtolnay/async-trait/issues/144 +async-trait = "0.1.50" async-graphql = "2.5.1" async-graphql-warp = "2.6.0" base64 = "0.13.0" diff --git a/jormungandr/src/network/service.rs b/jormungandr/src/network/service.rs index 04a71522b0..67cbb3c6f0 100644 --- a/jormungandr/src/network/service.rs +++ b/jormungandr/src/network/service.rs @@ -5,7 +5,7 @@ use super::{ subscription, Channels, GlobalStateR, }; use crate::blockcfg as app_data; -use crate::intercom::{self, BlockMsg, ClientMsg, TopologyMsg}; +use crate::intercom::{self, BlockMsg, ClientMsg, RequestSink, TopologyMsg}; use crate::topology::{self, Gossips}; use crate::utils::async_msg::MessageBox; use chain_network::core::server::{BlockService, FragmentService, GossipService, Node, PushStream}; @@ -116,6 +116,27 @@ fn serve_subscription(sub: S) -> SubscriptionStream { sub.map(Ok) } +// extracted as an external function as a workaround for +// https://github.com/dtolnay/async-trait/issues/144 +async fn join_streams( + stream: PushStream, + sink: RequestSink<::Object>, + reply: V, +) -> Result<(), Error> +where + T: Decode, + E: Into, + V: Future>, +{ + try_join!( + stream + .and_then(|header| async { header.decode() }) + .forward(sink.sink_err_into()), + reply.err_into::(), + )?; + Ok(()) +} + #[async_trait] impl BlockService for NodeService { type PullBlocksStream = ResponseStream; @@ -221,13 +242,7 @@ impl BlockService for NodeService { send_message(block_box, BlockMsg::ChainHeaders(handle)) .instrument(span) .await?; - try_join!( - stream - .and_then(|header| async { header.decode() }) - .forward(sink.sink_err_into()), - reply.err_into(), - )?; - Ok(()) + join_streams(stream, sink, reply).await } async fn upload_blocks(&self, stream: PushStream) -> Result<(), Error> { @@ -238,13 +253,7 @@ impl BlockService for NodeService { send_message(block_box, BlockMsg::NetworkBlocks(handle)) .instrument(span) .await?; - try_join!( - stream - .and_then(|block| async { block.decode() }) - .forward(sink.sink_err_into()), - reply.err_into(), - )?; - Ok(()) + join_streams(stream, sink, reply).await } async fn block_subscription(