Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Companion PR to substrate#4289 #660

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,642 changes: 828 additions & 814 deletions Cargo.lock

Large diffs are not rendered by default.

20 changes: 10 additions & 10 deletions availability-store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,17 @@ log = "0.4.8"
futures01 = "0.1.17"
futures = { package = "futures", version = "0.3.1", features = ["compat"] }
tokio = "0.1.7"
exit-future = "0.1"
exit-future = "0.2.0"
codec = { package = "parity-scale-codec", version = "1.1.0", default-features = false, features = ["derive"] }
sc-network = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
consensus_common = { package = "sp-consensus", git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
client = { package = "sc-client-api", git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
sc-client = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
sp-runtime = { git = "https://github.com/paritytech/substrate", default-features = false, branch = "polkadot-master" }
sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
sp-api = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
keystore = { package = "sc-keystore", git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
sc-network = { git = "https://github.com/paritytech/substrate", branch = "ashley-update-exit-future" }
consensus_common = { package = "sp-consensus", git = "https://github.com/paritytech/substrate", branch = "ashley-update-exit-future" }
client = { package = "sc-client-api", git = "https://github.com/paritytech/substrate", branch = "ashley-update-exit-future" }
sc-client = { git = "https://github.com/paritytech/substrate", branch = "ashley-update-exit-future" }
sp-runtime = { git = "https://github.com/paritytech/substrate", default-features = false, branch = "ashley-update-exit-future" }
sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "ashley-update-exit-future" }
sp-api = { git = "https://github.com/paritytech/substrate", branch = "ashley-update-exit-future" }
keystore = { package = "sc-keystore", git = "https://github.com/paritytech/substrate", branch = "ashley-update-exit-future" }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "ashley-update-exit-future" }
kvdb = "0.1.1"
kvdb-memorydb = "0.1.2"

Expand Down
39 changes: 15 additions & 24 deletions availability-store/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,8 @@ use polkadot_primitives::parachain::{
CandidateReceipt, ParachainHost, ValidatorId,
ValidatorPair, AvailableMessages, BlockData, ErasureChunk,
};
use futures01::Future;
use futures::channel::{mpsc, oneshot};
use futures::{FutureExt, Sink, SinkExt, TryFutureExt, StreamExt};
use futures::{FutureExt, Sink, SinkExt, TryFutureExt, StreamExt, future::select};
use keystore::KeyStorePtr;

use tokio::runtime::current_thread::{Handle, Runtime as LocalRuntime};
Expand Down Expand Up @@ -166,7 +165,7 @@ impl WorkerHandle {
impl Drop for WorkerHandle {
fn drop(&mut self) {
if let Some(signal) = self.exit_signal.take() {
signal.fire();
let _ = signal.fire();
}

if let Some(thread) = self.thread.take() {
Expand Down Expand Up @@ -296,7 +295,7 @@ where
impl<PGM> Drop for Worker<PGM> {
fn drop(&mut self) {
for (_, signal) in self.registered_gossip_streams.drain() {
signal.fire();
let _ = signal.fire();
}
}
}
Expand Down Expand Up @@ -356,13 +355,10 @@ where
self.registered_gossip_streams.insert(topic, signal);

let _ = runtime_handle.spawn(
fut
.unit_error()
.boxed()
select(fut.boxed(), exit)
.map(|_| Ok(()))
.compat()
.select(exit)
.then(|_| Ok(()))
);
);

Ok(())
}
Expand Down Expand Up @@ -423,7 +419,7 @@ where
let topic = erasure_coding_topic(relay_parent, receipt.erasure_root, chunk.index);
// need to remove gossip listener and stop it.
if let Some(signal) = self.registered_gossip_streams.remove(&topic) {
signal.fire();
let _ = signal.fire();
}
}

Expand Down Expand Up @@ -594,15 +590,12 @@ where
};

runtime.spawn(
process_notification
.unit_error()
.boxed()
futures::future::select(process_notification.boxed(), exit.clone())
.map(|_| Ok(()))
.compat()
.select(exit.clone())
.then(|_| Ok(()))
);

if let Err(e) = runtime.block_on(exit) {
if let Err(e) = runtime.block_on(exit.unit_error().compat()) {
warn!(target: LOG_TARGET, "Availability worker error {:?}", e);
}

Expand Down Expand Up @@ -636,7 +629,7 @@ pub struct AvailabilityBlockImport<I, P> {
impl<I, P> Drop for AvailabilityBlockImport<I, P> {
fn drop(&mut self) {
if let Some(signal) = self.exit_signal.take() {
signal.fire();
let _ = signal.fire();
}
}
}
Expand Down Expand Up @@ -775,12 +768,10 @@ impl<I, P> AvailabilityBlockImport<I, P> {
// dependent on the types of client and executor, which would prove
// not not so handy in the testing code.
let mut exit_signal = Some(signal);
let prune_available = prune_unneeded_availability(client.clone(), to_worker.clone())
.unit_error()
.boxed()
.compat()
.select(exit.clone())
.then(|_| Ok(()));
let prune_available = select(
prune_unneeded_availability(client.clone(), to_worker.clone()).boxed(),
exit.clone()
).map(|_| Ok(())).compat();

if let Err(_) = thread_pool.execute(Box::new(prune_available)) {
error!(target: LOG_TARGET, "Failed to spawn availability pruning task");
Expand Down
2 changes: 1 addition & 1 deletion cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ tokio = "0.1.22"
futures = { version = "0.3.1", features = ["compat"] }
futures01 = { package = "futures", version = "0.1.29" }
structopt = "0.3.4"
cli = { package = "sc-cli", git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
cli = { package = "sc-cli", git = "https://github.com/paritytech/substrate", branch = "ashley-update-exit-future" }
service = { package = "polkadot-service", path = "../service" }

[features]
Expand Down
12 changes: 6 additions & 6 deletions collator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ edition = "2018"
[dependencies]
futures01 = { package = "futures", version = "0.1.17" }
futures = { version = "0.3.1", features = ["compat"] }
client = { package = "sc-client", git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
client-api = { package = "sc-client-api", git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
primitives = { package = "sp-core", git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
sc-network = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
consensus_common = { package = "sp-consensus", git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
client = { package = "sc-client", git = "https://github.com/paritytech/substrate", branch = "ashley-update-exit-future" }
client-api = { package = "sc-client-api", git = "https://github.com/paritytech/substrate", branch = "ashley-update-exit-future" }
primitives = { package = "sp-core", git = "https://github.com/paritytech/substrate", branch = "ashley-update-exit-future" }
sc-network = { git = "https://github.com/paritytech/substrate", branch = "ashley-update-exit-future" }
consensus_common = { package = "sp-consensus", git = "https://github.com/paritytech/substrate", branch = "ashley-update-exit-future" }
polkadot-runtime = { path = "../runtime" }
polkadot-primitives = { path = "../primitives" }
polkadot-cli = { path = "../cli" }
Expand All @@ -24,4 +24,4 @@ tokio = "0.1.22"
futures-timer = "1.0"

[dev-dependencies]
keyring = { package = "sp-keyring", git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
keyring = { package = "sp-keyring", git = "https://github.com/paritytech/substrate", branch = "ashley-update-exit-future" }
25 changes: 12 additions & 13 deletions collator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use std::time::Duration;

use futures::{
future, Future, Stream, FutureExt, TryFutureExt, StreamExt,
compat::{Compat01As03, Future01CompatExt, Stream01CompatExt}
compat::{Future01CompatExt, Stream01CompatExt}
};
use futures01::{Future as _};
use log::{warn, error};
Expand Down Expand Up @@ -248,7 +248,7 @@ struct ApiContext<P, E> {
impl<P: 'static, E: 'static> RelayChainContext for ApiContext<P, E> where
P: ProvideRuntimeApi + Send + Sync,
P::Api: ParachainHost<Block>,
E: futures01::Future<Item=(),Error=()> + Clone + Send + Sync + 'static,
E: futures::Future<Output=()> + Clone + Send + Sync + 'static,
{
type Error = String;
type FutureEgress = Box<dyn Future<Output=Result<ConsolidatedIngress, String>> + Unpin + Send>;
Expand Down Expand Up @@ -277,19 +277,19 @@ struct CollationNode<P, E> {
}

impl<P, E> IntoExit for CollationNode<P, E> where
E: futures01::Future<Item=(),Error=()> + Unpin + Send + 'static
E: futures::Future<Output=()> + Unpin + Send + 'static
{
type Exit = future::Map<Compat01As03<E>, fn (Result<(), ()>) -> ()>;
type Exit = E;
fn into_exit(self) -> Self::Exit {
self.exit.compat().map(drop)
self.exit
}
}

impl<P, E> Worker for CollationNode<P, E> where
P: BuildParachainContext + Send + 'static,
P::ParachainContext: Send + 'static,
<P::ParachainContext as ParachainContext>::ProduceCandidate: Send + 'static,
E: futures01::Future<Item=(),Error=()> + Clone + Unpin + Send + Sync + 'static,
E: futures::Future<Output=()> + Clone + Unpin + Send + Sync + 'static,
{
type Work = Box<dyn Future<Output=()> + Unpin + Send>;

Expand Down Expand Up @@ -433,7 +433,8 @@ impl<P, E> Worker for CollationNode<P, E> where
outgoing,
);

tokio::spawn(res.select(inner_exit_2.clone()).then(|_| Ok(())));
let exit = inner_exit_2.clone().unit_error().compat();
tokio::spawn(res.select(exit).then(|_| Ok(())));
})
});

Expand All @@ -454,17 +455,15 @@ impl<P, E> Worker for CollationNode<P, E> where

let future = future::select(
silenced,
inner_exit.clone().map(|_| Ok::<_, ()>(())).compat()
inner_exit.clone()
).map(|_| Ok::<_, ()>(())).compat();

tokio::spawn(future);
future::ready(())
});

let work_and_exit = future::select(
work,
exit.map(|_| Ok::<_, ()>(())).compat()
).map(|_| ());
let work_and_exit = future::select(work, exit)
.map(|_| ());

Box::new(work_and_exit)
}
Expand Down Expand Up @@ -495,7 +494,7 @@ pub fn run_collator<P, E>(
P: BuildParachainContext + Send + 'static,
P::ParachainContext: Send + 'static,
<P::ParachainContext as ParachainContext>::ProduceCandidate: Send + 'static,
E: futures01::Future<Item = (),Error=()> + Unpin + Send + Clone + Sync + 'static,
E: futures::Future<Output = ()> + Unpin + Send + Clone + Sync + 'static,
{
let node_logic = CollationNode { build_parachain_context, exit, para_id, key };
polkadot_cli::run(node_logic, version)
Expand Down
4 changes: 2 additions & 2 deletions erasure-coding/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ edition = "2018"
primitives = { package = "polkadot-primitives", path = "../primitives" }
reed_solomon = { package = "reed-solomon-erasure", git = "https://github.com/paritytech/reed-solomon-erasure" }
codec = { package = "parity-scale-codec", version = "1.1.0", default-features = false, features = ["derive"] }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
trie = { package = "sp-trie", git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "ashley-update-exit-future" }
trie = { package = "sp-trie", git = "https://github.com/paritytech/substrate", branch = "ashley-update-exit-future" }
derive_more = "0.15.0"
2 changes: 1 addition & 1 deletion executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@ description = "Polkadot node implementation in Rust."
edition = "2018"

[dependencies]
sc-executor = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
sc-executor = { git = "https://github.com/paritytech/substrate", branch = "ashley-update-exit-future" }
polkadot-runtime = { path = "../runtime" }
16 changes: 8 additions & 8 deletions network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,16 @@ polkadot-validation = { path = "../validation" }
polkadot-primitives = { path = "../primitives" }
polkadot-erasure-coding = { path = "../erasure-coding" }
codec = { package = "parity-scale-codec", version = "1.1.0", default-features = false, features = ["derive"] }
sc-network = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
sc-network = { git = "https://github.com/paritytech/substrate", branch = "ashley-update-exit-future" }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "ashley-update-exit-future" }
sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "ashley-update-exit-future" }
futures = "0.1"
futures03 = { package = "futures", version = "0.3.1", features = ["compat"] }
log = "0.4.8"
exit-future = "0.1.4"
sc-client = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
exit-future = "0.2.0"
sc-client = { git = "https://github.com/paritytech/substrate", branch = "ashley-update-exit-future" }
sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "ashley-update-exit-future" }

[dev-dependencies]
sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
sp-api = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "ashley-update-exit-future" }
sp-api = { git = "https://github.com/paritytech/substrate", branch = "ashley-update-exit-future" }
7 changes: 4 additions & 3 deletions network/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static, E, N, T> Router<P, E, N, T> w
P::Api: ParachainHost<Block, Error = sp_blockchain::Error>,
N: NetworkService,
T: Clone + Executor + Send + 'static,
E: Future<Item=(),Error=()> + Clone + Send + 'static,
E: futures03::Future<Output=()> + Clone + Send + Unpin + 'static,
{
/// Import a statement whose signature has been checked already.
pub(crate) fn import_statement(&self, statement: SignedStatement) {
Expand Down Expand Up @@ -174,7 +174,8 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static, E, N, T> Router<P, E, N, T> w

if let Some(work) = producer.map(|p| self.create_work(c_hash, p)) {
trace!(target: "validation", "driving statement work to completion");
let work = work.select2(self.fetcher.exit().clone()).then(|_| Ok(()));
let exit = self.fetcher.exit().clone().unit_error().compat();
let work = work.select2(exit).then(|_| Ok(()));
self.fetcher.executor().spawn(work);
}
}
Expand Down Expand Up @@ -224,7 +225,7 @@ impl<P: ProvideRuntimeApi + Send, E, N, T> TableRouter for Router<P, E, N, T> wh
P::Api: ParachainHost<Block>,
N: NetworkService,
T: Clone + Executor + Send + 'static,
E: Future<Item=(),Error=()> + Clone + Send + 'static,
E: futures03::Future<Output=()> + Clone + Send + Unpin + 'static,
{
type Error = io::Error;
type FetchValidationProof = validation::PoVReceiver;
Expand Down
15 changes: 10 additions & 5 deletions network/src/validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use polkadot_primitives::parachain::{
use futures::prelude::*;
use futures::future::{self, Executor as FutureExecutor};
use futures::sync::oneshot::{self, Receiver};
use futures03::{FutureExt as _, TryFutureExt as _};

use std::collections::hash_map::{HashMap, Entry};
use std::io;
Expand Down Expand Up @@ -123,7 +124,7 @@ impl<P, E: Clone, N, T: Clone> Clone for ValidationNetwork<P, E, N, T> {
impl<P, E, N, T> ValidationNetwork<P, E, N, T> where
P: ProvideRuntimeApi + Send + Sync + 'static,
P::Api: ParachainHost<Block>,
E: Clone + Future<Item=(),Error=()> + Send + Sync + 'static,
E: Clone + futures03::Future<Output=()> + Send + Sync + 'static,
N: NetworkService,
T: Clone + Executor + Send + Sync + 'static,
{
Expand Down Expand Up @@ -206,7 +207,7 @@ impl<P, E, N, T> ValidationNetwork<P, E, N, T> where N: NetworkService {
impl<P, E, N, T> ParachainNetwork for ValidationNetwork<P, E, N, T> where
P: ProvideRuntimeApi + Send + Sync + 'static,
P::Api: ParachainHost<Block, Error = sp_blockchain::Error>,
E: Clone + Future<Item=(),Error=()> + Send + Sync + 'static,
E: Clone + futures03::Future<Output=()> + Send + Sync + Unpin + 'static,
N: NetworkService,
T: Clone + Executor + Send + Sync + 'static,
{
Expand Down Expand Up @@ -242,8 +243,12 @@ impl<P, E, N, T> ParachainNetwork for ValidationNetwork<P, E, N, T> where

let table_router_clone = table_router.clone();
let work = table_router.checked_statements()
.for_each(move |msg| { table_router_clone.import_statement(msg); Ok(()) });
executor.spawn(work.select(exit.clone()).map(|_| ()).map_err(|_| ()));
.for_each(move |msg| { table_router_clone.import_statement(msg); Ok(()) })
.select(exit.clone().unit_error().compat())
.map(|_| ())
.map_err(|_| ());

executor.spawn(work);

table_router
});
Expand Down Expand Up @@ -670,7 +675,7 @@ impl<P: ProvideRuntimeApi + Send, E, N, T> LeafWorkDataFetcher<P, E, N, T> where
P::Api: ParachainHost<Block>,
N: NetworkService,
T: Clone + Executor + Send + 'static,
E: Future<Item=(),Error=()> + Clone + Send + 'static,
E: futures03::Future<Output=()> + Clone + Send + 'static,
{
/// Fetch PoV block for the given candidate receipt.
pub fn fetch_pov_block(&self, candidate: &CandidateReceipt) -> PoVReceiver {
Expand Down
4 changes: 2 additions & 2 deletions parachain/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ codec = { package = "parity-scale-codec", version = "1.1.0", default-features =
wasmi = { version = "0.4.5", optional = true }
derive_more = { version = "0.14.1", optional = true }
serde = { version = "1.0.102", default-features = false, features = [ "derive" ] }
rstd = { package = "sp-std", git = "https://github.com/paritytech/substrate", branch = "polkadot-master", default-features = false }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master", default-features = false }
rstd = { package = "sp-std", git = "https://github.com/paritytech/substrate", branch = "ashley-update-exit-future", default-features = false }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "ashley-update-exit-future", default-features = false }
lazy_static = { version = "1.4.0", optional = true }
parking_lot = { version = "0.7.1", optional = true }
log = { version = "0.4.8", optional = true }
Expand Down
Loading