Skip to content

Commit

Permalink
Move propose_with into a dedicated blocking threadpool (paritytech#614)
Browse files Browse the repository at this point in the history
  • Loading branch information
sorpaas committed Nov 26, 2019
1 parent a99977f commit 00e167f
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 24 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions validation/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ tokio = "0.1.22"
derive_more = "0.14.1"
log = "0.4.8"
exit-future = "0.1.4"
tokio-executor = { version = "0.2.0-alpha.6", features = ["blocking"] }
codec = { package = "parity-scale-codec", version = "1.1.0", default-features = false, features = ["derive"] }
availability_store = { package = "polkadot-availability-store", path = "../availability-store" }
parachain = { package = "polkadot-parachain", path = "../parachain" }
Expand Down
95 changes: 71 additions & 24 deletions validation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ use std::{
pin::Pin,
sync::Arc,
time::{self, Duration, Instant},
task::{Poll, Context}
task::{Poll, Context},
mem,
};

use babe_primitives::BabeApi;
Expand Down Expand Up @@ -498,7 +499,7 @@ impl<C, N, P, SC, TxApi> ProposerFactory<C, N, P, SC, TxApi> where
impl<C, N, P, SC, TxApi> consensus::Environment<Block> for ProposerFactory<C, N, P, SC, TxApi> where
C: Collators + Send + 'static,
N: Network,
TxApi: PoolChainApi<Block=Block>,
TxApi: PoolChainApi<Block=Block> + 'static,
P: ProvideRuntimeApi + HeaderBackend<Block> + BlockBody<Block> + Send + Sync + 'static,
P::Api: ParachainHost<Block> +
BlockBuilderApi<Block> +
Expand Down Expand Up @@ -557,8 +558,8 @@ pub struct Proposer<C: Send + Sync, TxApi: PoolChainApi> where
}

impl<C, TxApi> consensus::Proposer<Block> for Proposer<C, TxApi> where
TxApi: PoolChainApi<Block=Block>,
C: ProvideRuntimeApi + HeaderBackend<Block> + Send + Sync,
TxApi: PoolChainApi<Block=Block> + 'static,
C: ProvideRuntimeApi + HeaderBackend<Block> + Send + Sync + 'static,
C::Api: ParachainHost<Block> + BlockBuilderApi<Block> + ApiExt<Block, Error = client_error::Error>,
{
type Error = Error;
Expand Down Expand Up @@ -616,18 +617,20 @@ impl<C, TxApi> consensus::Proposer<Block> for Proposer<C, TxApi> where
};

Either::Left(CreateProposal {
parent_hash: self.parent_hash.clone(),
parent_number: self.parent_number.clone(),
parent_id: self.parent_id.clone(),
client: self.client.clone(),
transaction_pool: self.transaction_pool.clone(),
table: self.tracker.table.clone(),
believed_minimum_timestamp: believed_timestamp,
timing,
inherent_data: Some(inherent_data),
inherent_digests,
// leave some time for the proposal finalisation
deadline,
state: CreateProposalState::Pending(CreateProposalData {
parent_hash: self.parent_hash.clone(),
parent_number: self.parent_number.clone(),
parent_id: self.parent_id.clone(),
client: self.client.clone(),
transaction_pool: self.transaction_pool.clone(),
table: self.tracker.table.clone(),
believed_minimum_timestamp: believed_timestamp,
timing,
inherent_data: Some(inherent_data),
inherent_digests,
// leave some time for the proposal finalisation
deadline,
})
})
}
}
Expand Down Expand Up @@ -686,6 +689,21 @@ impl ProposalTiming {

/// Future which resolves upon the creation of a proposal.
pub struct CreateProposal<C: Send + Sync, TxApi: PoolChainApi> {
state: CreateProposalState<C, TxApi>,
}

/// Current status of the proposal future.
enum CreateProposalState<C: Send + Sync, TxApi: PoolChainApi> {
/// Pending inclusion, with given proposal data.
Pending(CreateProposalData<C, TxApi>),
/// Represents the state when we switch from pending to fired.
Switching,
/// Block proposing has fired.
Fired(tokio_executor::blocking::Blocking<Result<Block, Error>>),
}

/// Inner data of the create proposal.
struct CreateProposalData<C: Send + Sync, TxApi: PoolChainApi> {
parent_hash: Hash,
parent_number: BlockNumber,
parent_id: BlockId,
Expand All @@ -699,12 +717,12 @@ pub struct CreateProposal<C: Send + Sync, TxApi: PoolChainApi> {
deadline: Instant,
}

impl<C, TxApi> CreateProposal<C, TxApi> where
impl<C, TxApi> CreateProposalData<C, TxApi> where
TxApi: PoolChainApi<Block=Block>,
C: ProvideRuntimeApi + HeaderBackend<Block> + Send + Sync,
C::Api: ParachainHost<Block> + BlockBuilderApi<Block> + ApiExt<Block, Error = client_error::Error>,
{
fn propose_with(&mut self, candidates: Vec<AttestedCandidate>) -> Result<Block, Error> {
fn propose_with(mut self, candidates: Vec<AttestedCandidate>) -> Result<Block, Error> {
use block_builder::BlockBuilder;
use runtime_primitives::traits::{Hash as HashT, BlakeTwo256};

Expand Down Expand Up @@ -792,22 +810,51 @@ impl<C, TxApi> CreateProposal<C, TxApi> where
}

impl<C, TxApi> futures03::Future for CreateProposal<C, TxApi> where
TxApi: PoolChainApi<Block=Block>,
C: ProvideRuntimeApi + HeaderBackend<Block> + Send + Sync,
TxApi: PoolChainApi<Block=Block> + 'static,
C: ProvideRuntimeApi + HeaderBackend<Block> + Send + Sync + 'static,
C::Api: ParachainHost<Block> + BlockBuilderApi<Block> + ApiExt<Block, Error = client_error::Error>,
{
type Output = Result<Block, Error>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let mut state = CreateProposalState::Switching;
mem::swap(&mut state, &mut self.state);

// 1. try to propose if we have enough includable candidates and other
// delays have concluded.
let included = self.table.includable_count();
futures03::ready!(self.timing.poll(cx, included));
let data = match state {
CreateProposalState::Pending(mut data) => {
let included = data.table.includable_count();
match data.timing.poll(cx, included) {
Poll::Pending => {
self.state = CreateProposalState::Pending(data);
return Poll::Pending
},
Poll::Ready(()) => (),
}

data
},
CreateProposalState::Switching => return Poll::Pending,
CreateProposalState::Fired(mut future) => {
let ret = Pin::new(&mut future).poll(cx);
self.state = CreateProposalState::Fired(future);
return ret
},
};

// 2. propose
let proposed_candidates = self.table.proposed_set();
let future = tokio_executor::blocking::run(move || {
let proposed_candidates = data.table.proposed_set();
data.propose_with(proposed_candidates)
});
self.state = CreateProposalState::Fired(future);

Poll::Ready(self.propose_with(proposed_candidates))
match &mut self.state {
CreateProposalState::Fired(future) => Pin::new(future).poll(cx),
CreateProposalState::Switching | CreateProposalState::Pending(_) =>
Poll::Pending,
}
}
}

Expand Down

0 comments on commit 00e167f

Please sign in to comment.