Skip to content
Permalink
Browse files

Merge pull request #684 from input-output-hk/new-blockchain-process-b…

…locks

Added block processing for new blockchain
  • Loading branch information...
NicolasDP committed Aug 13, 2019
2 parents f81a8e8 + c9a83b3 commit 59dcb29844a3485658c6595d476310d6017be1c1
@@ -388,14 +388,10 @@ impl Blockchain {
}
}

/// TODO: document this function
///
/// apply the block on the blockchain from a post checked header
///
pub fn apply_block(
fn apply_block(
&mut self,
post_checked_header: PostCheckedHeader,
block: Block,
block: &Block,
) -> impl Future<Item = Ref, Error = Error> {
let header = post_checked_header.header;
let block_id = header.hash();
@@ -429,6 +425,23 @@ impl Blockchain {
})
}

/// Apply the block on the blockchain from a post checked header
/// and add it to the storage.
pub fn apply_and_store_block(
&mut self,
post_checked_header: PostCheckedHeader,
block: Block,
) -> impl Future<Item = Ref, Error = Error> {
let mut storage = self.storage.clone();
self.apply_block(post_checked_header, &block)
.and_then(move |block_ref| {
storage
.put_block(block)
.map_err(|e| e.into())
.and_then(move |()| Ok(block_ref))
})
}

/// Apply the given block0 in the blockchain (updating the RefCache and the other objects)
///
/// This function returns the created block0 branch. Having it will
@@ -650,7 +663,7 @@ impl Blockchain {
}
})
.and_then(move |post_checked_header: PostCheckedHeader| {
self6.apply_block(post_checked_header, block)
self6.apply_block(post_checked_header, &block)
})
.and_then(move |new_ref| {
branch
@@ -664,4 +677,15 @@ impl Blockchain {
})
})
}

pub fn get_checkpoints(
&self,
branch: Branch,
) -> impl Future<Item = Vec<HeaderHash>, Error = Error> {
let storage = self.storage.clone();
branch
.get_ref()
.map_err(|_| unreachable!())
.and_then(move |tip| storage.get_checkpoints(*tip.hash()).map_err(|e| e.into()))
}
}
@@ -1,16 +1,23 @@
use super::{Blockchain, Branch, Error, ErrorKind, PreCheckedHeader, Ref};
use crate::{
blockchain::{Blockchain, Branch, PreCheckedHeader},
blockcfg::{Block, Header, HeaderHash},
intercom::{BlockMsg, NetworkMsg},
leadership::NewEpochToSchedule,
network::p2p::topology::NodeId,
stats_counter::StatsCounter,
utils::{
async_msg::MessageBox,
task::{Input, TokioServiceInfo},
},
};
use chain_core::property::HasHeader as _;
use chain_core::property::{Block as _, HasHeader as _};

use futures::future::Either;
use slog::Logger;
use tokio::{prelude::*, sync::mpsc::Sender};

use std::convert::identity;

pub fn handle_input(
info: &TokioServiceInfo,
blockchain: &mut Blockchain,
@@ -32,20 +39,9 @@ pub fn handle_input(
match bquery {
BlockMsg::LeadershipExpectEndOfEpoch(epoch) => unimplemented!(),
BlockMsg::LeadershipBlock(block) => {
let header = block.header();

match blockchain.pre_check_header(header).wait().unwrap() {
PreCheckedHeader::HeaderWithCache { header, parent_ref } => {
let pch = blockchain
.post_check_header(header, parent_ref)
.wait()
.unwrap();
let new_block_ref = blockchain.apply_block(pch, block).wait().unwrap();

blockchain_tip.update_ref(new_block_ref).wait().unwrap();
}
_ => unimplemented!(),
}
let future = process_leadership_block(blockchain.clone(), block);
let new_block_ref = future.wait().unwrap();
blockchain_tip.update_ref(new_block_ref).wait().unwrap();
}
BlockMsg::AnnouncedBlock(header, node_id) => unimplemented!(),
BlockMsg::NetworkBlock(block, reply) => unimplemented!(),
@@ -54,3 +50,151 @@ pub fn handle_input(

Ok(())
}

pub fn process_leadership_block(
mut blockchain: Blockchain,
block: Block,
) -> impl Future<Item = Ref, Error = Error> {
let mut end_blockchain = blockchain.clone();
let header = block.header();
let parent_hash = block.parent_id();
// This is a trusted block from the leadership task,
// so we can skip pre-validation.
blockchain
.get_ref(parent_hash)
.and_then(move |parent| {
if let Some(parent_ref) = parent {
Either::A(blockchain.post_check_header(header, parent_ref))
} else {
Either::B(future::err(
ErrorKind::MissingParentBlockFromStorage(header).into(),
))
}
})
.and_then(move |post_checked| end_blockchain.apply_and_store_block(post_checked, block))
}

pub fn process_block_announcement(
mut blockchain: Blockchain,
branch: Branch,
header: Header,
node_id: NodeId,
mut network_msg_box: MessageBox<NetworkMsg>,
logger: Logger,
) -> impl Future<Item = (), Error = Error> {
blockchain
.pre_check_header(header)
.and_then(move |pre_checked| match pre_checked {
PreCheckedHeader::AlreadyPresent { .. } => {
debug!(logger, "block is already present");
Either::A(future::ok(()))
}
PreCheckedHeader::MissingParent { header, .. } => {
debug!(logger, "block is missing a locally stored parent");
let to = header.hash();
Either::B(blockchain.get_checkpoints(branch).map(move |from| {
network_msg_box
.try_send(NetworkMsg::PullHeaders { node_id, from, to })
.unwrap_or_else(move |err| {
error!(
logger,
"cannot send PullHeaders request to network: {}", err
)
});
}))
}
PreCheckedHeader::HeaderWithCache { header, parent_ref } => {
debug!(
logger,
"Announced block has a locally stored parent, fetch it"
);
network_msg_box
.try_send(NetworkMsg::GetNextBlock(node_id, header.hash()))
.unwrap_or_else(move |err| {
error!(
logger,
"cannot send GetNextBlock request to network: {}", err
)
});
Either::A(future::ok(()))
}
})
}

pub fn process_network_block(
mut blockchain: Blockchain,
block: Block,
mut network_msg_box: MessageBox<NetworkMsg>,
logger: Logger,
) -> impl Future<Item = (), Error = Error> {
let mut end_blockchain = blockchain.clone();
let header = block.header();
blockchain
.pre_check_header(header)
.and_then(move |pre_checked| match pre_checked {
PreCheckedHeader::AlreadyPresent { .. } => {
debug!(logger, "block is already present");
Either::A(future::ok(()))
}
PreCheckedHeader::MissingParent { header, .. } => {
debug!(logger, "block is missing a locally stored parent");
Either::A(future::err(
ErrorKind::MissingParentBlockFromStorage(header).into(),
))
}
PreCheckedHeader::HeaderWithCache { header, parent_ref } => {
let post_check_and_apply = blockchain
.post_check_header(header, parent_ref)
.and_then(move |post_checked| {
end_blockchain.apply_and_store_block(post_checked, block)
})
.map(move |_| {
// TODO: advance branch?
debug!(logger, "block successfully applied");
});
Either::B(post_check_and_apply)
}
})
}

pub fn process_chain_headers_into_block_request<S>(
mut blockchain: Blockchain,
headers: S,
logger: Logger,
) -> impl Future<Item = Vec<HeaderHash>, Error = Error>
where
S: Stream<Item = Header>,
{
headers
.map_err(|e| {
// TODO: map the incoming stream error to the result error
unimplemented!()
})
.and_then(move |header| {
blockchain
.pre_check_header(header)
.and_then(move |pre_checked| match pre_checked {
PreCheckedHeader::AlreadyPresent { .. } => {
// The block is already present. This may happen
// if the peer has started from an earlier checkpoint
// than our tip, so ignore this and proceed.
Ok(None)
}
PreCheckedHeader::MissingParent { header, .. } => {
// TODO: this fails on the first header after the
// immediate descendant of the local tip. Need branch storage
// that would store the whole header chain without blocks,
// so that the chain can be pre-validated first and blocks
// fetched afterwards in arbitrary order.
Err(ErrorKind::MissingParentBlockFromStorage(header).into())
}
PreCheckedHeader::HeaderWithCache { header, parent_ref } => {
// TODO: limit the headers to the single epoch
// before pausing to retrieve blocks.
Ok(Some(header.hash()))
}
})
})
.filter_map(identity)
.collect()
}
@@ -146,6 +146,22 @@ impl Storage {
}
})
}

pub fn get_checkpoints(
&self,
tip: HeaderHash,
) -> impl Future<Item = Vec<HeaderHash>, Error = StorageError> {
let mut inner = self.inner.clone();
future::poll_fn(move || Ok(inner.poll_lock())).and_then(move |store| {
let tip_info = store.get_block_info(&tip)?;
let mut checkpoints = Vec::new();
assert!(tip_info.depth > 0);
for_path_to_nth_ancestor(&*store, &tip, tip_info.depth - 1, |block_info| {
checkpoints.push(block_info.block_hash.clone());
})?;
Ok(checkpoints)
})
}
}

impl Stream for BlockStream {
@@ -162,7 +162,7 @@ fn handle_pull_blocks_to_tip(
None => {
return Err(Error::not_found(
"none of the starting points are found in the blockchain",
))
));
}
};

@@ -76,6 +76,7 @@ pub fn bootstrap_from_peer(
.join(branch.get_ref().map_err(|_| unreachable!()))
.and_then(|(mut client, tip)| {
let tip_hash = *tip.hash();
debug!(logger, "pulling blocks starting from {}", tip_hash);
client
.pull_blocks_to_tip(&[tip_hash])
.map_err(Error::PullRequestFailed)
@@ -134,7 +135,7 @@ fn handle_block(
})
.and_then(move |post_checked| {
end_blockchain
.apply_block(post_checked, block)
.apply_and_store_block(post_checked, block)
.map_err(Error::ApplyBlockFailed)
})
}

0 comments on commit 59dcb29

Please sign in to comment.
You can’t perform that action at this time.