Skip to content

Commit

Permalink
Merge pull request #3366 from Zeegomo/async-traits-update
Browse files Browse the repository at this point in the history
Async traits update
  • Loading branch information
Mikhail Zabaluev committed Jun 8, 2021
2 parents 2d1732a + b4ae2ab commit a09e3af
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 18 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion jormungandr/Cargo.toml
Expand Up @@ -24,7 +24,7 @@ cardano-legacy-address = { git = "https://github.com/input-output-hk/chain-libs.
imhamt = { git = "https://github.com/input-output-hk/chain-libs.git", branch = "master" }

arc-swap = "^1.1.0"
async-trait = "=0.1.42" # https://github.com/dtolnay/async-trait/issues/144
async-trait = "0.1.50"
async-graphql = "2.5.1"
async-graphql-warp = "2.6.0"
base64 = "0.13.0"
Expand Down
39 changes: 24 additions & 15 deletions jormungandr/src/network/service.rs
Expand Up @@ -5,7 +5,7 @@ use super::{
subscription, Channels, GlobalStateR,
};
use crate::blockcfg as app_data;
use crate::intercom::{self, BlockMsg, ClientMsg, TopologyMsg};
use crate::intercom::{self, BlockMsg, ClientMsg, RequestSink, TopologyMsg};
use crate::topology::{self, Gossips};
use crate::utils::async_msg::MessageBox;
use chain_network::core::server::{BlockService, FragmentService, GossipService, Node, PushStream};
Expand Down Expand Up @@ -116,6 +116,27 @@ fn serve_subscription<S: Stream>(sub: S) -> SubscriptionStream<S> {
sub.map(Ok)
}

// extracted as an external function as a workaround for
// https://github.com/dtolnay/async-trait/issues/144
async fn join_streams<T, V, E, R>(
stream: PushStream<T>,
sink: RequestSink<<T as Decode>::Object>,
reply: V,
) -> Result<(), Error>
where
T: Decode,
E: Into<Error>,
V: Future<Output = Result<R, E>>,
{
try_join!(
stream
.and_then(|header| async { header.decode() })
.forward(sink.sink_err_into()),
reply.err_into::<Error>(),
)?;
Ok(())
}

#[async_trait]
impl BlockService for NodeService {
type PullBlocksStream = ResponseStream<app_data::Block>;
Expand Down Expand Up @@ -221,13 +242,7 @@ impl BlockService for NodeService {
send_message(block_box, BlockMsg::ChainHeaders(handle))
.instrument(span)
.await?;
try_join!(
stream
.and_then(|header| async { header.decode() })
.forward(sink.sink_err_into()),
reply.err_into(),
)?;
Ok(())
join_streams(stream, sink, reply).await
}

async fn upload_blocks(&self, stream: PushStream<Block>) -> Result<(), Error> {
Expand All @@ -238,13 +253,7 @@ impl BlockService for NodeService {
send_message(block_box, BlockMsg::NetworkBlocks(handle))
.instrument(span)
.await?;
try_join!(
stream
.and_then(|block| async { block.decode() })
.forward(sink.sink_err_into()),
reply.err_into(),
)?;
Ok(())
join_streams(stream, sink, reply).await
}

async fn block_subscription(
Expand Down

0 comments on commit a09e3af

Please sign in to comment.