Skip to content

Commit

Permalink
Merge pull request #15 from paritytech/bkchr-select-chain
Browse files Browse the repository at this point in the history
Provide `SelectChain` implementation for parachains
  • Loading branch information
rphmeier committed Oct 8, 2019
2 parents c10f05e + 92aefe9 commit f55c9f9
Showing 1 changed file with 111 additions and 23 deletions.
134 changes: 111 additions & 23 deletions consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,26 @@
// You should have received a copy of the GNU General Public License
// along with Cumulus. If not, see <http://www.gnu.org/licenses/>.

use substrate_client::{backend::{Backend, Finalizer}, CallExecutor, Client, BlockchainEvents};
use substrate_client::error::{Error as ClientError, Result as ClientResult};
use substrate_client::{
backend::{Backend, Finalizer}, CallExecutor, Client, BlockchainEvents,
error::{Error as ClientError, Result as ClientResult},
};
use substrate_primitives::{Blake2Hasher, H256};
use sr_primitives::generic::BlockId;
use sr_primitives::traits::{Block as BlockT, Header as HeaderT, ProvideRuntimeApi};
use polkadot_primitives::{Hash as PHash, Block as PBlock};
use polkadot_primitives::parachain::{Id as ParaId, ParachainHost};
use sr_primitives::{
generic::BlockId,
traits::{Block as BlockT, Header as HeaderT, ProvideRuntimeApi},
};
use substrate_consensus_common::{Error as ConsensusError, SelectChain as SelectChainT};

use polkadot_primitives::{
Hash as PHash, Block as PBlock, parachain::{Id as ParaId, ParachainHost},
};

use futures::{Stream, StreamExt, TryStreamExt, future, Future, TryFutureExt, FutureExt};
use codec::{Encode, Decode};
use codec::Decode;
use log::warn;

use std::sync::Arc;
use std::{sync::Arc, marker::PhantomData};

/// Helper for the local client.
pub trait LocalClient {
Expand Down Expand Up @@ -66,6 +73,13 @@ pub trait PolkadotClient: Clone {

/// Get a stream of finalized heads.
fn finalized_heads(&self, para_id: ParaId) -> ClientResult<Self::Finalized>;

/// Returns the parachain head for the given `para_id` at the given block id.
fn parachain_head_at(
&self,
at: &BlockId<PBlock>,
para_id: ParaId,
) -> ClientResult<Option<Vec<u8>>>;
}

/// Spawns a future that follows the Polkadot relay chain for the given parachain.
Expand Down Expand Up @@ -116,38 +130,112 @@ impl<B, E, Block, RA> LocalClient for Client<B, E, Block, RA> where
}
}

fn parachain_key(para_id: ParaId) -> substrate_primitives::storage::StorageKey {
const PREFIX: &[u8] = &*b"Parachains Heads";
para_id.using_encoded(|s| {
let mut v = PREFIX.to_vec();
v.extend(s);
substrate_primitives::storage::StorageKey(v)
})
}

impl<B, E, RA> PolkadotClient for Arc<Client<B, E, PBlock, RA>> where
B: Backend<PBlock, Blake2Hasher> + Send + Sync + 'static,
E: CallExecutor<PBlock, Blake2Hasher> + Send + Sync + 'static,
RA: ProvideRuntimeApi + Send + Sync + 'static,
RA::Api: ParachainHost<PBlock>,
Client<B, E, PBlock, RA>: ProvideRuntimeApi + Send + Sync + 'static,
<Client<B, E, PBlock, RA> as ProvideRuntimeApi>::Api: ParachainHost<PBlock>,
{
type Error = ClientError;

type Finalized = Box<dyn Stream<Item=Vec<u8>> + Send + Unpin>;

fn finalized_heads(&self, para_id: ParaId) -> ClientResult<Self::Finalized> {
let polkadot = self.clone();
let parachain_key = parachain_key(para_id);

let s = self.finality_notification_stream()
.filter_map(move |n|
future::ready(
polkadot.storage(&BlockId::hash(n.hash), &parachain_key)
.ok()
.and_then(|d| d.map(|d| d.0)),
polkadot.parachain_head_at(&BlockId::hash(n.hash), para_id).ok().and_then(|h| h),
),
);

Ok(Box::new(s))
}

fn parachain_head_at(
&self,
at: &BlockId<PBlock>,
para_id: ParaId,
) -> ClientResult<Option<Vec<u8>>> {
self.runtime_api().parachain_status(at, para_id).map(|s| s.map(|s| s.head_data.0))
}
}

/// Select chain implementation for parachains.
///
/// The actual behavior of the implementation depends on the select chain implementation used by
/// Polkadot.
pub struct SelectChain<Block, PC, SC> {
polkadot_client: PC,
polkadot_select_chain: SC,
para_id: ParaId,
_marker: PhantomData<Block>,
}

impl<Block, PC, SC> SelectChain<Block, PC, SC> {
/// Create new instance of `Self`.
///
/// - `para_id`: The id of the parachain.
/// - `polkadot_client`: The client of the Polkadot node.
/// - `polkadot_select_chain`: The Polkadot select chain implementation.
pub fn new(para_id: ParaId, polkadot_client: PC, polkadot_select_chain: SC) -> Self {
Self {
polkadot_client,
polkadot_select_chain,
para_id,
_marker: PhantomData,
}
}
}

impl<Block, PC: Clone, SC: Clone> Clone for SelectChain<Block, PC, SC> {
fn clone(&self) -> Self {
Self {
polkadot_client: self.polkadot_client.clone(),
polkadot_select_chain: self.polkadot_select_chain.clone(),
para_id: self.para_id,
_marker: PhantomData,
}
}
}

impl<Block, PC, SC> SelectChainT<Block> for SelectChain<Block, PC, SC> where
Block: BlockT<Hash=H256>,
PC: PolkadotClient + Clone + Send + Sync,
PC::Error: ToString,
SC: SelectChainT<PBlock>,
{
fn leaves(&self) -> Result<Vec<<Block as BlockT>::Hash>, ConsensusError> {
let leaves = self.polkadot_select_chain.leaves()?;
leaves.into_iter()
.filter_map(|l|
self.polkadot_client
.parachain_head_at(&BlockId::Hash(l), self.para_id)
.map(|h| h.and_then(|d| <<Block as BlockT>::Hash>::decode(&mut &d[..]).ok()))
.transpose()
)
.collect::<Result<Vec<_>, _>>()
.map_err(|e| ConsensusError::ChainLookup(e.to_string()))
}

fn best_chain(&self) -> Result<<Block as BlockT>::Header, ConsensusError> {
let best_chain = self.polkadot_select_chain.best_chain()?;
let para_best_chain = self.polkadot_client
.parachain_head_at(&BlockId::Hash(best_chain.hash()), self.para_id)
.map_err(|e| ConsensusError::ChainLookup(e.to_string()))?;

match para_best_chain {
Some(best) => Decode::decode(&mut &best[..])
.map_err(|e|
ConsensusError::ChainLookup(
format!("Error decoding parachain head: {}", e.what()),
),
),
None => Err(
ConsensusError::ChainLookup(
"Could not find parachain head for best relay chain!".into()),
)
}
}
}

0 comments on commit f55c9f9

Please sign in to comment.