From 7fa88af0271db659de9274c94cb8e7eead0e4289 Mon Sep 17 00:00:00 2001 From: Ashley Date: Mon, 9 Dec 2019 23:17:02 +0100 Subject: [PATCH] Revert "Asyncify network functions" This reverts commit f20ae6548dc482cb1e75bc80641cfe55c6131a53. --- network/src/lib.rs | 32 ++++++++++++------ network/src/router.rs | 58 ++++++++++++++++----------------- network/src/tests/validation.rs | 6 ++-- network/src/validation.rs | 7 ++-- 4 files changed, 58 insertions(+), 45 deletions(-) diff --git a/network/src/lib.rs b/network/src/lib.rs index fd6217364c41..312e0ed69d2a 100644 --- a/network/src/lib.rs +++ b/network/src/lib.rs @@ -28,6 +28,7 @@ pub mod gossip; use codec::{Decode, Encode}; use futures::channel::{oneshot, mpsc}; use futures::prelude::*; +use futures::future::Either; use polkadot_primitives::{Block, Hash, Header}; use polkadot_primitives::parachain::{ Id as ParaId, CollatorId, CandidateReceipt, Collation, PoVBlock, @@ -825,25 +826,34 @@ impl PolkadotProtocol { /// This should be called by a collator intending to get the locally-collated /// block into the hands of validators. /// It also places the outgoing message and block data in the local availability store. - pub async fn add_local_collation( + pub fn add_local_collation( &mut self, ctx: &mut dyn Context, relay_parent: Hash, targets: HashSet, collation: Collation, outgoing_targeted: OutgoingMessages, - ) { + ) -> impl Future { debug!(target: "p_net", "Importing local collation on relay parent {:?} and parachain {:?}", relay_parent, collation.info.parachain_index); - if let Some(ref availability_store) = self.availability_store { - let collation_cloned = collation.clone(); - let _ = availability_store.make_available(av_store::Data { - relay_parent, - parachain_id: collation_cloned.info.parachain_index, - block_data: collation_cloned.pov.block_data.clone(), - outgoing_queues: Some(outgoing_targeted.clone().into()), - }).await; + let res = match self.availability_store { + Some(ref availability_store) => { + let availability_store_cloned = availability_store.clone(); + let collation_cloned = collation.clone(); + Either::Left((async move { + let _ = availability_store_cloned.make_available(av_store::Data { + relay_parent, + parachain_id: collation_cloned.info.parachain_index, + block_data: collation_cloned.pov.block_data.clone(), + outgoing_queues: Some(outgoing_targeted.clone().into()), + }).await; + } + ) + .boxed() + ) + } + None => Either::Right(futures::future::ready(())), }; for (primary, cloned_collation) in self.local_collations.add_collation(relay_parent, targets, collation.clone()) { @@ -860,6 +870,8 @@ impl PolkadotProtocol { warn!(target: "polkadot_network", "Encountered tracked but disconnected validator {:?}", primary), } } + + res } /// Give the network protocol a handle to an availability store, used for diff --git a/network/src/router.rs b/network/src/router.rs index a5b78eb2741a..29fe3141aa87 100644 --- a/network/src/router.rs +++ b/network/src/router.rs @@ -175,7 +175,7 @@ impl Router w if let Some(work) = producer.map(|p| self.create_work(c_hash, p)) { trace!(target: "validation", "driving statement work to completion"); - let work = select(work.boxed(), self.fetcher.exit().clone()) + let work = select(work, self.fetcher.exit().clone()) .map(drop); let _ = self.fetcher.executor().spawn(work); } @@ -193,35 +193,35 @@ impl Router w let knowledge = self.fetcher.knowledge().clone(); let attestation_topic = self.attestation_topic; let parent_hash = self.parent_hash(); - let api = self.fetcher.api().clone(); - - async move { - match producer.prime(api).validate().await { - Ok(validated) => { - // store the data before broadcasting statements, so other peers can fetch. - knowledge.lock().note_candidate( - candidate_hash, - Some(validated.0.pov_block().clone()), - validated.0.outgoing_messages().cloned(), - ); - - // propagate the statement. - // consider something more targeted than gossip in the future. - let statement = GossipStatement::new( - parent_hash, - match table.import_validated(validated.0) { - None => return, - Some(s) => s, - } - ); - - network.gossip_message(attestation_topic, statement.into()); - } - Err(err) => { - debug!(target: "p_net", "Failed to produce statements: {:?}", err); + + producer.prime(self.fetcher.api().clone()) + .validate() + .boxed() + .map_ok(move |validated| { + // store the data before broadcasting statements, so other peers can fetch. + knowledge.lock().note_candidate( + candidate_hash, + Some(validated.0.pov_block().clone()), + validated.0.outgoing_messages().cloned(), + ); + + // propagate the statement. + // consider something more targeted than gossip in the future. + let statement = GossipStatement::new( + parent_hash, + match table.import_validated(validated.0) { + None => return, + Some(s) => s, + } + ); + + network.gossip_message(attestation_topic, statement.into()); + }) + .map(|res| { + if let Err(e) = res { + debug!(target: "p_net", "Failed to produce statements: {:?}", e); } - } - } + }) } } diff --git a/network/src/tests/validation.rs b/network/src/tests/validation.rs index fc976f9bdea7..94b627367325 100644 --- a/network/src/tests/validation.rs +++ b/network/src/tests/validation.rs @@ -150,7 +150,7 @@ impl NetworkService for TestNetwork { fn gossip_messages_for(&self, topic: Hash) -> GossipMessageStream { let (tx, rx) = mpsc::unbounded(); let _ = self.gossip.send_listener.unbounded_send((topic, tx)); - GossipMessageStream::new(rx.boxed()) + GossipMessageStream::new(Box::new(rx)) } fn gossip_message(&self, topic: Hash, message: GossipMessage) { @@ -419,8 +419,8 @@ impl av_store::ProvideGossipMessages for DummyGossipMessages { fn gossip_messages_for( &self, _topic: Hash - ) -> Pin + Send>> { - stream::empty().boxed() + ) -> Box + Send + Unpin> { + Box::new(stream::empty()) } fn gossip_erasure_chunk( diff --git a/network/src/validation.rs b/network/src/validation.rs index 2edeafbfb903..6635cc863a9a 100644 --- a/network/src/validation.rs +++ b/network/src/validation.rs @@ -158,13 +158,14 @@ impl ValidationNetwork where impl ValidationNetwork where N: NetworkService { /// Convert the given `CollatorId` to a `PeerId`. - pub async fn collator_id_to_peer_id(&self, collator_id: CollatorId) -> Option { + pub fn collator_id_to_peer_id(&self, collator_id: CollatorId) -> + impl Future> + Send + { let (send, recv) = oneshot::channel(); self.network.with_spec(move |spec, _| { let _ = send.send(spec.collator_id_to_peer_id(&collator_id).cloned()); }); - - recv.map(|res| res.unwrap_or(None)).await + recv.map(|res| res.unwrap_or(None)) } /// Create a `Stream` of checked statements for the given `relay_parent`.