Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Initial: Offchain Workers #1942

Merged
merged 46 commits into from Mar 25, 2019
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
28faa01
Refactor state-machine stuff.
tomusdrw Feb 20, 2019
d0597bf
Merge branch 'master' into td-state-machine
tomusdrw Feb 21, 2019
eb63210
Fix tests.
tomusdrw Feb 21, 2019
98cbd8d
WiP
tomusdrw Feb 20, 2019
4ba2fb3
WiP2
tomusdrw Feb 27, 2019
af67cf0
Merge branch 'master' into td-offlineworker
tomusdrw Mar 1, 2019
acd003e
Service support for offchain workers.
tomusdrw Mar 1, 2019
1d55220
Service support for offchain workers.
tomusdrw Mar 1, 2019
c12dbb9
Testing offchain worker.
tomusdrw Mar 1, 2019
95c22f3
Merge remote-tracking branch 'origin/td-offlineworker' into td-offlin…
tomusdrw Mar 1, 2019
c76c996
Merge branch 'master' into td-offlineworker
tomusdrw Mar 1, 2019
66d484d
Initial version working.
tomusdrw Mar 4, 2019
bf49911
Pass side effects in call.
tomusdrw Mar 4, 2019
45b6d68
Pass OffchainExt in context.
tomusdrw Mar 4, 2019
049a4e5
Submit extrinsics to the pool.
tomusdrw Mar 5, 2019
07082be
Merge branch 'master' into td-offlineworker
tomusdrw Mar 6, 2019
37afd7d
Support inherents.
tomusdrw Mar 6, 2019
c820a89
Insert to inherents pool.
tomusdrw Mar 6, 2019
646c20e
Inserting to the pool asynchronously.
tomusdrw Mar 6, 2019
f953ffe
Add test to offchain worker.
tomusdrw Mar 6, 2019
b10b7d2
Implement convenience syntax for modules.
tomusdrw Mar 6, 2019
2d8b6f9
Dispatching offchain worker through executive.
tomusdrw Mar 6, 2019
3f2a8ec
Fix offchain test.
tomusdrw Mar 7, 2019
301fd98
Remove offchain worker from timestamp.
tomusdrw Mar 7, 2019
e026333
Merge branch 'master' into td-offlineworker
tomusdrw Mar 7, 2019
f577822
Update Cargo.lock.
tomusdrw Mar 7, 2019
fa56420
Address review comments.
tomusdrw Mar 11, 2019
d1d6287
Merge branch 'master' into td-offlineworker
tomusdrw Mar 12, 2019
cba1e39
Use latest patch version for futures.
tomusdrw Mar 12, 2019
5f927a4
Add CLI parameter for offchain worker.
tomusdrw Mar 12, 2019
63c970a
Fix compilation.
tomusdrw Mar 12, 2019
34b3e3b
Fix test.
tomusdrw Mar 12, 2019
c669a1b
Fix extrinsics format for tests.
tomusdrw Mar 12, 2019
5778dcc
Fix RPC test.
tomusdrw Mar 13, 2019
e0219e3
Merge branch 'master' into td-offlineworker
tomusdrw Mar 14, 2019
79ae654
Merge branch 'master' into td-offlineworker
tomusdrw Mar 20, 2019
5713923
Bump spec version.
tomusdrw Mar 20, 2019
ed764bc
Fix executive.
tomusdrw Mar 20, 2019
f9c1621
Fix support macro.
tomusdrw Mar 20, 2019
a8d2f33
Merge branch 'master' into td-offlineworker
tomusdrw Mar 20, 2019
cbdb76f
Merge branch 'master' into td-offlineworker
tomusdrw Mar 21, 2019
5036cd2
Merge branch 'master' into td-offlineworker
tomusdrw Mar 22, 2019
d20980d
Address grumbles.
tomusdrw Mar 22, 2019
dd8a756
Merge branch 'master' into td-offlineworker
tomusdrw Mar 25, 2019
994918a
Merge remote-tracking branch 'origin/master' into td-offlineworker
gavofyork Mar 25, 2019
aa37a4c
Bump runtime
gavofyork Mar 25, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
32 changes: 32 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

52 changes: 48 additions & 4 deletions core/basic-authorship/src/basic_authorship.rs
Expand Up @@ -20,23 +20,22 @@
//
use std::{self, time, sync::Arc};

use log::{info, debug};
use log::{info, debug, warn};

use client::{
self, error, Client as SubstrateClient, CallExecutor,
block_builder::api::BlockBuilder as BlockBuilderApi, runtime_api::Core,
};
use codec::Decode;
use consensus_common::{self, evaluation};
use primitives::{H256, Blake2Hasher};
use primitives::{H256, Blake2Hasher, ExecutionContext};
use runtime_primitives::traits::{
Block as BlockT, Hash as HashT, Header as HeaderT, ProvideRuntimeApi, AuthorityIdFor
};
use runtime_primitives::ExecutionContext;
use runtime_primitives::generic::BlockId;
use runtime_primitives::ApplyError;
use transaction_pool::txpool::{self, Pool as TransactionPool};
use inherents::InherentData;
use inherents::{InherentData, pool::InherentsPool};

/// Build new blocks.
pub trait BlockBuilder<Block: BlockT> {
Expand Down Expand Up @@ -114,6 +113,8 @@ pub struct ProposerFactory<C, A> where A: txpool::ChainApi {
pub client: Arc<C>,
/// The transaction pool.
pub transaction_pool: Arc<TransactionPool<A>>,
/// The inherents pool
pub inherents_pool: Arc<InherentsPool<<A::Block as BlockT>::Extrinsic>>,
}

impl<C, A> consensus_common::Environment<<C as AuthoringApi>::Block> for ProposerFactory<C, A> where
Expand Down Expand Up @@ -143,6 +144,7 @@ impl<C, A> consensus_common::Environment<<C as AuthoringApi>::Block> for Propose
parent_id: id,
parent_number: *parent_header.number(),
transaction_pool: self.transaction_pool.clone(),
inherents_pool: self.inherents_pool.clone(),
now: Box::new(time::Instant::now),
};

Expand All @@ -157,6 +159,7 @@ pub struct Proposer<Block: BlockT, C, A: txpool::ChainApi> {
parent_id: BlockId<Block>,
parent_number: <<Block as BlockT>::Header as HeaderT>::Number,
transaction_pool: Arc<TransactionPool<A>>,
inherents_pool: Arc<InherentsPool<<Block as BlockT>::Extrinsic>>,
now: Box<Fn() -> time::Instant>,
}

Expand Down Expand Up @@ -200,11 +203,23 @@ impl<Block, C, A> Proposer<Block, C, A> where
&self.parent_id,
inherent_data,
|block_builder| {
// Add inherents from the internal pool

let inherents = self.inherents_pool.drain();
debug!("Pushing {} queued inherents.", inherents.len());
for i in inherents {
if let Err(e) = block_builder.push_extrinsic(i) {
warn!("Error while pushing inherent extrinsic from the pool: {:?}", e);
}
}

// proceed with transactions
let mut is_first = true;
let mut skipped = 0;
let mut unqueue_invalid = Vec::new();
let pending_iterator = self.transaction_pool.ready();

debug!("Attempting to push transactions from the pool.");
for pending in pending_iterator {
if (self.now)() > deadline {
debug!("Consensus deadline reached when pushing block transactions, proceeding with proposing.");
Expand Down Expand Up @@ -299,6 +314,7 @@ mod tests {
let proposer_factory = ProposerFactory {
client: client.clone(),
transaction_pool: txpool.clone(),
inherents_pool: Default::default(),
};

let mut proposer = proposer_factory.init(
Expand All @@ -321,4 +337,32 @@ mod tests {
assert_eq!(txpool.ready().count(), 2);
}

#[test]
fn should_include_inherents_from_the_pool() {
// given
let client = Arc::new(test_client::new());
let chain_api = transaction_pool::ChainApi::new(client.clone());
let txpool = Arc::new(TransactionPool::new(Default::default(), chain_api));
let inpool = Arc::new(InherentsPool::default());

let proposer_factory = ProposerFactory {
client: client.clone(),
transaction_pool: txpool.clone(),
inherents_pool: inpool.clone(),
};

inpool.add(extrinsic(0));

let proposer = proposer_factory.init(
&client.header(&BlockId::number(0)).unwrap().unwrap(),
&[]
).unwrap();

// when
let deadline = time::Duration::from_secs(3);
let block = proposer.propose(Default::default(), deadline).unwrap();

// then
assert_eq!(block.extrinsics().len(), 1);
}
}
3 changes: 2 additions & 1 deletion core/cli/src/lib.rs
Expand Up @@ -25,7 +25,7 @@ mod params;
pub mod error;
pub mod informant;

use client::ExecutionStrategies;
use client::{ExecutionStrategies, ExecutionStrategy};
use runtime_primitives::traits::As;
use service::{
ServiceFactory, FactoryFullConfiguration, RuntimeGenesis,
Expand Down Expand Up @@ -364,6 +364,7 @@ where
syncing: cli.syncing_execution.into(),
importing: cli.importing_execution.into(),
block_construction: cli.block_construction_execution.into(),
offchain_worker: ExecutionStrategy::NativeWhenPossible,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should it always be NativeWhenPossible or configurable by the user at some point?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could be configurable primarily to keep it consistent with all the other wasm execution. But I don't see any instance where you would not want to use native if it's there.

Maybe in debugging...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I was not sure as well, but if it does not need to be configurable, we don't need to touch this struct at all.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll add the option for consistency.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

given that native workers are not consensus critical, NativeWhenPossible seems like the most reasonable strategy

other: cli.other_execution.into(),
};

Expand Down
8 changes: 4 additions & 4 deletions core/client/src/block_builder/block_builder.rs
Expand Up @@ -17,15 +17,15 @@
use super::api::BlockBuilder as BlockBuilderApi;
use std::vec::Vec;
use parity_codec::Encode;
use crate::blockchain::HeaderBackend;
use runtime_primitives::ApplyOutcome;
use runtime_primitives::generic::BlockId;
use runtime_primitives::traits::{
Header as HeaderT, Hash, Block as BlockT, One, HashFor, ProvideRuntimeApi, ApiRef
};
use primitives::H256;
use runtime_primitives::generic::BlockId;
use primitives::{H256, ExecutionContext};
use crate::blockchain::HeaderBackend;
use crate::runtime_api::Core;
use crate::error;
use runtime_primitives::{ApplyOutcome, ExecutionContext};


/// Utility for building new (valid) blocks from a stream of extrinsics.
Expand Down
33 changes: 26 additions & 7 deletions core/client/src/call_executor.rs
Expand Up @@ -19,12 +19,12 @@ use parity_codec::{Encode, Decode};
use runtime_primitives::generic::BlockId;
use runtime_primitives::traits::Block as BlockT;
use state_machine::{
self, OverlayedChanges, Ext, CodeExecutor, ExecutionManager, ExecutionStrategy
self, OverlayedChanges, Ext, CodeExecutor, ExecutionManager, ExecutionStrategy, NeverOffchainExt,
};
use executor::{RuntimeVersion, RuntimeInfo, NativeVersion};
use hash_db::Hasher;
use trie::MemoryDB;
use primitives::{H256, Blake2Hasher, NativeOrEncoded, NeverNativeValue};
use primitives::{H256, Blake2Hasher, NativeOrEncoded, NeverNativeValue, OffchainExt};

use crate::backend;
use crate::error;
Expand All @@ -42,12 +42,15 @@ where
/// Execute a call to a contract on top of state in a block of given hash.
///
/// No changes are made.
fn call(
fn call<
O: OffchainExt,
>(
&self,
id: &BlockId<B>,
method: &str,
call_data: &[u8],
strategy: ExecutionStrategy,
side_effects_handler: Option<&mut O>,
) -> Result<Vec<u8>, error::Error>;

/// Execute a contextual call on top of state in a block of a given hash.
Expand All @@ -56,6 +59,7 @@ where
/// Before executing the method, passed header is installed as the current header
/// of the execution context.
fn contextual_call<
O: OffchainExt,
PB: Fn() -> error::Result<B::Header>,
EM: Fn(
Result<NativeOrEncoded<R>, Self::Error>,
Expand All @@ -73,6 +77,7 @@ where
prepare_environment_block: PB,
execution_manager: ExecutionManager<EM>,
native_call: Option<NC>,
side_effects_handler: Option<&mut O>,
) -> error::Result<NativeOrEncoded<R>> where ExecutionManager<EM>: Clone;

/// Extract RuntimeVersion of given block
Expand All @@ -84,6 +89,7 @@ where
///
/// No changes are made.
fn call_at_state<
O: OffchainExt,
S: state_machine::Backend<H>,
F: FnOnce(
Result<NativeOrEncoded<R>, Self::Error>,
Expand All @@ -98,6 +104,7 @@ where
call_data: &[u8],
manager: ExecutionManager<F>,
native_call: Option<NC>,
side_effects_handler: Option<&mut O>,
) -> Result<(NativeOrEncoded<R>, S::Transaction, Option<MemoryDB<H>>), error::Error>;

/// Execute a call to a contract on top of given state, gathering execution proof.
Expand Down Expand Up @@ -140,7 +147,10 @@ pub struct LocalCallExecutor<B, E> {
impl<B, E> LocalCallExecutor<B, E> {
/// Creates new instance of local call executor.
pub fn new(backend: Arc<B>, executor: E) -> Self {
LocalCallExecutor { backend, executor }
LocalCallExecutor {
backend,
executor,
}
}
}

Expand All @@ -161,17 +171,19 @@ where
{
type Error = E::Error;

fn call(&self,
fn call<O: OffchainExt>(&self,
id: &BlockId<Block>,
method: &str,
call_data: &[u8],
strategy: ExecutionStrategy
strategy: ExecutionStrategy,
side_effects_handler: Option<&mut O>,
) -> error::Result<Vec<u8>> {
let mut changes = OverlayedChanges::default();
let state = self.backend.state_at(*id)?;
let return_data = state_machine::new(
&state,
self.backend.changes_trie_storage(),
side_effects_handler,
&mut changes,
&self.executor,
method,
Expand All @@ -187,6 +199,7 @@ where
}

fn contextual_call<
O: OffchainExt,
PB: Fn() -> error::Result<Block::Header>,
EM: Fn(
Result<NativeOrEncoded<R>, Self::Error>,
Expand All @@ -204,13 +217,15 @@ where
prepare_environment_block: PB,
execution_manager: ExecutionManager<EM>,
native_call: Option<NC>,
mut side_effects_handler: Option<&mut O>,
) -> Result<NativeOrEncoded<R>, error::Error> where ExecutionManager<EM>: Clone {
let state = self.backend.state_at(*at)?;
if method != "Core_initialise_block" && initialised_block.map(|id| id != *at).unwrap_or(true) {
let header = prepare_environment_block()?;
state_machine::new(
&state,
self.backend.changes_trie_storage(),
side_effects_handler.as_mut().map(|x| &mut **x),
changes,
&self.executor,
"Core_initialise_block",
Expand All @@ -226,6 +241,7 @@ where
let result = state_machine::new(
&state,
self.backend.changes_trie_storage(),
side_effects_handler,
changes,
&self.executor,
method,
Expand All @@ -248,12 +264,13 @@ where
fn runtime_version(&self, id: &BlockId<Block>) -> error::Result<RuntimeVersion> {
let mut overlay = OverlayedChanges::default();
let state = self.backend.state_at(*id)?;
let mut ext = Ext::new(&mut overlay, &state, self.backend.changes_trie_storage());
let mut ext = Ext::new(&mut overlay, &state, self.backend.changes_trie_storage(), NeverOffchainExt::new());
self.executor.runtime_version(&mut ext)
.ok_or(error::ErrorKind::VersionInvalid.into())
}

fn call_at_state<
O: OffchainExt,
S: state_machine::Backend<Blake2Hasher>,
F: FnOnce(
Result<NativeOrEncoded<R>, Self::Error>,
Expand All @@ -268,10 +285,12 @@ where
call_data: &[u8],
manager: ExecutionManager<F>,
native_call: Option<NC>,
side_effects_handler: Option<&mut O>,
) -> error::Result<(NativeOrEncoded<R>, S::Transaction, Option<MemoryDB<Blake2Hasher>>)> {
state_machine::new(
state,
self.backend.changes_trie_storage(),
side_effects_handler,
changes,
&self.executor,
method,
Expand Down