Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Initial: Offchain Workers #1942

Merged
merged 46 commits into from Mar 25, 2019
Merged
Show file tree
Hide file tree
Changes from 34 commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
28faa01
Refactor state-machine stuff.
tomusdrw Feb 20, 2019
d0597bf
Merge branch 'master' into td-state-machine
tomusdrw Feb 21, 2019
eb63210
Fix tests.
tomusdrw Feb 21, 2019
98cbd8d
WiP
tomusdrw Feb 20, 2019
4ba2fb3
WiP2
tomusdrw Feb 27, 2019
af67cf0
Merge branch 'master' into td-offlineworker
tomusdrw Mar 1, 2019
acd003e
Service support for offchain workers.
tomusdrw Mar 1, 2019
1d55220
Service support for offchain workers.
tomusdrw Mar 1, 2019
c12dbb9
Testing offchain worker.
tomusdrw Mar 1, 2019
95c22f3
Merge remote-tracking branch 'origin/td-offlineworker' into td-offlin…
tomusdrw Mar 1, 2019
c76c996
Merge branch 'master' into td-offlineworker
tomusdrw Mar 1, 2019
66d484d
Initial version working.
tomusdrw Mar 4, 2019
bf49911
Pass side effects in call.
tomusdrw Mar 4, 2019
45b6d68
Pass OffchainExt in context.
tomusdrw Mar 4, 2019
049a4e5
Submit extrinsics to the pool.
tomusdrw Mar 5, 2019
07082be
Merge branch 'master' into td-offlineworker
tomusdrw Mar 6, 2019
37afd7d
Support inherents.
tomusdrw Mar 6, 2019
c820a89
Insert to inherents pool.
tomusdrw Mar 6, 2019
646c20e
Inserting to the pool asynchronously.
tomusdrw Mar 6, 2019
f953ffe
Add test to offchain worker.
tomusdrw Mar 6, 2019
b10b7d2
Implement convenience syntax for modules.
tomusdrw Mar 6, 2019
2d8b6f9
Dispatching offchain worker through executive.
tomusdrw Mar 6, 2019
3f2a8ec
Fix offchain test.
tomusdrw Mar 7, 2019
301fd98
Remove offchain worker from timestamp.
tomusdrw Mar 7, 2019
e026333
Merge branch 'master' into td-offlineworker
tomusdrw Mar 7, 2019
f577822
Update Cargo.lock.
tomusdrw Mar 7, 2019
fa56420
Address review comments.
tomusdrw Mar 11, 2019
d1d6287
Merge branch 'master' into td-offlineworker
tomusdrw Mar 12, 2019
cba1e39
Use latest patch version for futures.
tomusdrw Mar 12, 2019
5f927a4
Add CLI parameter for offchain worker.
tomusdrw Mar 12, 2019
63c970a
Fix compilation.
tomusdrw Mar 12, 2019
34b3e3b
Fix test.
tomusdrw Mar 12, 2019
c669a1b
Fix extrinsics format for tests.
tomusdrw Mar 12, 2019
5778dcc
Fix RPC test.
tomusdrw Mar 13, 2019
e0219e3
Merge branch 'master' into td-offlineworker
tomusdrw Mar 14, 2019
79ae654
Merge branch 'master' into td-offlineworker
tomusdrw Mar 20, 2019
5713923
Bump spec version.
tomusdrw Mar 20, 2019
ed764bc
Fix executive.
tomusdrw Mar 20, 2019
f9c1621
Fix support macro.
tomusdrw Mar 20, 2019
a8d2f33
Merge branch 'master' into td-offlineworker
tomusdrw Mar 20, 2019
cbdb76f
Merge branch 'master' into td-offlineworker
tomusdrw Mar 21, 2019
5036cd2
Merge branch 'master' into td-offlineworker
tomusdrw Mar 22, 2019
d20980d
Address grumbles.
tomusdrw Mar 22, 2019
dd8a756
Merge branch 'master' into td-offlineworker
tomusdrw Mar 25, 2019
994918a
Merge remote-tracking branch 'origin/master' into td-offlineworker
gavofyork Mar 25, 2019
aa37a4c
Bump runtime
gavofyork Mar 25, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
32 changes: 32 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

52 changes: 48 additions & 4 deletions core/basic-authorship/src/basic_authorship.rs
Expand Up @@ -20,23 +20,22 @@
//
use std::{self, time, sync::Arc};

use log::{info, debug};
use log::{info, debug, warn};

use client::{
self, error, Client as SubstrateClient, CallExecutor,
block_builder::api::BlockBuilder as BlockBuilderApi, runtime_api::Core,
};
use codec::Decode;
use consensus_common::{self, evaluation};
use primitives::{H256, Blake2Hasher};
use primitives::{H256, Blake2Hasher, ExecutionContext};
use runtime_primitives::traits::{
Block as BlockT, Hash as HashT, Header as HeaderT, ProvideRuntimeApi, AuthorityIdFor
};
use runtime_primitives::ExecutionContext;
use runtime_primitives::generic::BlockId;
use runtime_primitives::ApplyError;
use transaction_pool::txpool::{self, Pool as TransactionPool};
use inherents::InherentData;
use inherents::{InherentData, pool::InherentsPool};

/// Build new blocks.
pub trait BlockBuilder<Block: BlockT> {
Expand Down Expand Up @@ -114,6 +113,8 @@ pub struct ProposerFactory<C, A> where A: txpool::ChainApi {
pub client: Arc<C>,
/// The transaction pool.
pub transaction_pool: Arc<TransactionPool<A>>,
/// The inherents pool
pub inherents_pool: Arc<InherentsPool<<A::Block as BlockT>::Extrinsic>>,
}

impl<C, A> consensus_common::Environment<<C as AuthoringApi>::Block> for ProposerFactory<C, A> where
Expand Down Expand Up @@ -143,6 +144,7 @@ impl<C, A> consensus_common::Environment<<C as AuthoringApi>::Block> for Propose
parent_id: id,
parent_number: *parent_header.number(),
transaction_pool: self.transaction_pool.clone(),
inherents_pool: self.inherents_pool.clone(),
now: Box::new(time::Instant::now),
};

Expand All @@ -157,6 +159,7 @@ pub struct Proposer<Block: BlockT, C, A: txpool::ChainApi> {
parent_id: BlockId<Block>,
parent_number: <<Block as BlockT>::Header as HeaderT>::Number,
transaction_pool: Arc<TransactionPool<A>>,
inherents_pool: Arc<InherentsPool<<Block as BlockT>::Extrinsic>>,
now: Box<Fn() -> time::Instant>,
}

Expand Down Expand Up @@ -200,11 +203,23 @@ impl<Block, C, A> Proposer<Block, C, A> where
&self.parent_id,
inherent_data,
|block_builder| {
// Add inherents from the internal pool

let inherents = self.inherents_pool.drain();
debug!("Pushing {} queued inherents.", inherents.len());
for i in inherents {
if let Err(e) = block_builder.push_extrinsic(i) {
warn!("Error while pushing inherent extrinsic from the pool: {:?}", e);
}
}

// proceed with transactions
let mut is_first = true;
let mut skipped = 0;
let mut unqueue_invalid = Vec::new();
let pending_iterator = self.transaction_pool.ready();

debug!("Attempting to push transactions from the pool.");
for pending in pending_iterator {
if (self.now)() > deadline {
debug!("Consensus deadline reached when pushing block transactions, proceeding with proposing.");
Expand Down Expand Up @@ -299,6 +314,7 @@ mod tests {
let proposer_factory = ProposerFactory {
client: client.clone(),
transaction_pool: txpool.clone(),
inherents_pool: Default::default(),
};

let mut proposer = proposer_factory.init(
Expand All @@ -321,4 +337,32 @@ mod tests {
assert_eq!(txpool.ready().count(), 2);
}

#[test]
fn should_include_inherents_from_the_pool() {
// given
let client = Arc::new(test_client::new());
let chain_api = transaction_pool::ChainApi::new(client.clone());
let txpool = Arc::new(TransactionPool::new(Default::default(), chain_api));
let inpool = Arc::new(InherentsPool::default());

let proposer_factory = ProposerFactory {
client: client.clone(),
transaction_pool: txpool.clone(),
inherents_pool: inpool.clone(),
};

inpool.add(extrinsic(0));

let proposer = proposer_factory.init(
&client.header(&BlockId::number(0)).unwrap().unwrap(),
&[]
).unwrap();

// when
let deadline = time::Duration::from_secs(3);
let block = proposer.propose(Default::default(), deadline).unwrap();

// then
assert_eq!(block.extrinsics().len(), 1);
}
}
16 changes: 12 additions & 4 deletions core/cli/src/lib.rs
Expand Up @@ -360,11 +360,19 @@ where
service::Roles::FULL
};

let exec = cli.execution_strategies;
config.execution_strategies = ExecutionStrategies {
syncing: cli.syncing_execution.into(),
importing: cli.importing_execution.into(),
block_construction: cli.block_construction_execution.into(),
other: cli.other_execution.into(),
syncing: exec.syncing_execution.into(),
importing: exec.importing_execution.into(),
block_construction: exec.block_construction_execution.into(),
offchain_worker: exec.offchain_worker_execution.into(),
other: exec.other_execution.into(),
};

config.offchain_worker = match (cli.offchain_worker, role) {
(params::OffchainWorkerEnabled::WhenValidating, service::Roles::AUTHORITY) => true,
(params::OffchainWorkerEnabled::Always, _) => true,
_ => false,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's use exhaustive matching so we don't trip up accidently if we add a new variant here.

Suggested change
_ => false,
(params::OffchainWorkerEnabled::Never, _) => false,

};

config.roles = role;
Expand Down
127 changes: 84 additions & 43 deletions core/cli/src/params.rs
Expand Up @@ -18,7 +18,6 @@ use crate::traits::{AugmentClap, GetLogFilter};

use std::path::PathBuf;
use structopt::{StructOpt, clap::{arg_enum, _clap_count_exprs, App, AppSettings, SubCommand}};
use client;

/// Auxialary macro to implement `GetLogFilter` for all types that have the `shared_params` field.
macro_rules! impl_get_log_filter {
Expand Down Expand Up @@ -53,6 +52,16 @@ impl Into<client::ExecutionStrategy> for ExecutionStrategy {
}
}

arg_enum! {
/// How to execute blocks
#[derive(Debug, Clone)]
pub enum OffchainWorkerEnabled {
Always,
Never,
WhenValidating,
}
}

/// Shared parameters used by all `CoreParams`.
#[derive(Debug, StructOpt, Clone)]
pub struct SharedParams {
Expand Down Expand Up @@ -122,6 +131,70 @@ pub struct TransactionPoolParams {
pub pool_kbytes: usize,
}

/// Execution strategies parameters.
#[derive(Debug, StructOpt, Clone)]
pub struct ExecutionStrategies {
/// The means of execution used when calling into the runtime while syncing blocks.
#[structopt(
long = "syncing-execution",
value_name = "STRATEGY",
raw(
possible_values = "&ExecutionStrategy::variants()",
case_insensitive = "true",
default_value = r#""NativeElseWasm""#
)
)]
pub syncing_execution: ExecutionStrategy,

/// The means of execution used when calling into the runtime while importing blocks.
#[structopt(
long = "importing-execution",
value_name = "STRATEGY",
raw(
possible_values = "&ExecutionStrategy::variants()",
case_insensitive = "true",
default_value = r#""NativeElseWasm""#
)
)]
pub importing_execution: ExecutionStrategy,

/// The means of execution used when calling into the runtime while constructing blocks.
#[structopt(
long = "block-construction-execution",
value_name = "STRATEGY",
raw(
possible_values = "&ExecutionStrategy::variants()",
case_insensitive = "true",
default_value = r#""Wasm""#
)
)]
pub block_construction_execution: ExecutionStrategy,

/// The means of execution used when calling into the runtime while constructing blocks.
#[structopt(
long = "offchain-worker-execution",
value_name = "STRATEGY",
raw(
possible_values = "&ExecutionStrategy::variants()",
case_insensitive = "true",
default_value = r#""NativeWhenPossible""#
)
)]
pub offchain_worker_execution: ExecutionStrategy,

/// The means of execution used when calling into the runtime while not syncing, importing or constructing blocks.
#[structopt(
long = "other-execution",
value_name = "STRATEGY",
raw(
possible_values = "&ExecutionStrategy::variants()",
case_insensitive = "true",
default_value = r#""Wasm""#
)
)]
pub other_execution: ExecutionStrategy,
}

/// The `run` command used to run a node.
#[derive(Debug, StructOpt, Clone)]
pub struct RunCmd {
Expand Down Expand Up @@ -179,55 +252,23 @@ pub struct RunCmd {
#[structopt(long = "telemetry-url", value_name = "URL VERBOSITY", parse(try_from_str = "parse_telemetry_endpoints"))]
pub telemetry_endpoints: Vec<(String, u8)>,

/// The means of execution used when calling into the runtime while syncing blocks.
#[structopt(
long = "syncing-execution",
value_name = "STRATEGY",
raw(
possible_values = "&ExecutionStrategy::variants()",
case_insensitive = "true",
default_value = r#""NativeElseWasm""#
)
)]
pub syncing_execution: ExecutionStrategy,

/// The means of execution used when calling into the runtime while importing blocks.
#[structopt(
long = "importing-execution",
value_name = "STRATEGY",
raw(
possible_values = "&ExecutionStrategy::variants()",
case_insensitive = "true",
default_value = r#""NativeElseWasm""#
)
)]
pub importing_execution: ExecutionStrategy,

/// The means of execution used when calling into the runtime while constructing blocks.
/// Should execute offchain workers on every block. By default it's only enabled for nodes that are authoring new
/// blocks.
#[structopt(
long = "block-construction-execution",
value_name = "STRATEGY",
long = "offchain-worker",
value_name = "ENABLED",
raw(
possible_values = "&ExecutionStrategy::variants()",
possible_values = "&OffchainWorkerEnabled::variants()",
case_insensitive = "true",
default_value = r#""Wasm""#
default_value = r#""WhenValidating""#
)
)]
pub block_construction_execution: ExecutionStrategy,
pub offchain_worker: OffchainWorkerEnabled,

/// The means of execution used when calling into the runtime while not syncing, importing or constructing blocks.
#[structopt(
long = "other-execution",
value_name = "STRATEGY",
raw(
possible_values = "&ExecutionStrategy::variants()",
case_insensitive = "true",
default_value = r#""Wasm""#
)
)]
pub other_execution: ExecutionStrategy,
#[allow(missing_docs)]
#[structopt(flatten)]
pub execution_strategies: ExecutionStrategies,


#[allow(missing_docs)]
#[structopt(flatten)]
pub shared_params: SharedParams,
Expand Down
8 changes: 4 additions & 4 deletions core/client/src/block_builder/block_builder.rs
Expand Up @@ -17,15 +17,15 @@
use super::api::BlockBuilder as BlockBuilderApi;
use std::vec::Vec;
use parity_codec::Encode;
use crate::blockchain::HeaderBackend;
use runtime_primitives::ApplyOutcome;
use runtime_primitives::generic::BlockId;
use runtime_primitives::traits::{
Header as HeaderT, Hash, Block as BlockT, One, HashFor, ProvideRuntimeApi, ApiRef
};
use primitives::H256;
use runtime_primitives::generic::BlockId;
use primitives::{H256, ExecutionContext};
use crate::blockchain::HeaderBackend;
use crate::runtime_api::Core;
use crate::error;
use runtime_primitives::{ApplyOutcome, ExecutionContext};


/// Utility for building new (valid) blocks from a stream of extrinsics.
Expand Down