Skip to content
Permalink
Browse files

Merge pull request #339 from input-output-hk/solicit-blocks

Block solicitation from network peers
  • Loading branch information...
NicolasDP committed May 14, 2019
2 parents ff1abb8 + 624a139 commit 65c5ea5658b66118c88602eb1318f9852e404e41

Some generated files are not rendered by default. Learn more.

Oops, something went wrong.
@@ -335,7 +335,7 @@ pub fn handle_block(
block: Block,
is_tip_candidate: bool,
) -> Result<HandledBlock, HandleBlockError> {
match header_triage(blockchain, block.header(), is_tip_candidate)? {
match header_triage(blockchain, &block.header(), is_tip_candidate)? {
BlockHeaderTriage::NotOfInterest { reason } => Ok(HandledBlock::Rejected { reason }),
BlockHeaderTriage::MissingParentOrBranch { to } => {
// the block is not directly connected to any block
@@ -413,7 +413,7 @@ fn process_block(

pub fn header_triage(
blockchain: &Blockchain,
header: Header,
header: &Header,
is_tip_candidate: bool,
) -> Result<BlockHeaderTriage, HandleBlockError> {
let block_id = header.id();
@@ -429,7 +429,7 @@ pub fn header_triage(
let (block_tip, _) = blockchain.get_block_tip()?;

if let Some(leadership) = blockchain.get_leadership_or_build(block_date.epoch, &parent_id) {
match leadership.verify(&header) {
match leadership.verify(header) {
Verification::Success => {}
Verification::Failure(err) => {
return Ok(BlockHeaderTriage::NotOfInterest {
@@ -1,5 +1,5 @@
use crate::blockchain::chain::{self, BlockHeaderTriage, BlockchainR, HandledBlock};
use crate::intercom::{BlockMsg, NetworkPropagateMsg};
use crate::intercom::{BlockMsg, NetworkMsg, PropagateMsg};
use crate::rest::v0::node::stats::StatsCounter;
use crate::utils::{
async_msg::MessageBox,
@@ -12,7 +12,7 @@ pub fn handle_input(
info: &TokioServiceInfo,
blockchain: &BlockchainR,
_stats_counter: &StatsCounter,
network_propagate: &MessageBox<NetworkPropagateMsg>,
network_msg_box: &mut MessageBox<NetworkMsg>,
input: Input<BlockMsg>,
) {
let bquery = match input {
@@ -59,15 +59,47 @@ pub fn handle_input(
"date" => header.date().to_string()
);
slog_debug!(logger, "Header: {:?}", header);
network_propagate
.clone()
.send(NetworkPropagateMsg::Block(header));
network_msg_box.send(NetworkMsg::Propagate(PropagateMsg::Block(header)));
}
}
}
BlockMsg::AnnouncedBlock(header) => {
BlockMsg::NetworkBlock(block) => {
let mut blockchain = blockchain.lock_write();
match chain::handle_block(&mut blockchain, block, true).unwrap() {
HandledBlock::Rejected { reason } => {
// TODO: drop the network peer that has sent
// an invalid block.
slog_warn!(logger, "rejecting block from the network: {:?}", reason);
}
HandledBlock::MissingBranchToBlock { to } => {
// This is abnormal because we have received a block
// that is not connected to preceding blocks, which
// should not happen as we solicit blocks in descending
// order.
//
// TODO: drop the network peer that has sent
// the wrong block.
slog_warn!(
logger,
"disconnected block received, missing intermediate blocks to {}",
to
);
}
HandledBlock::Acquired { header } => {
slog_info!(logger,
"block added successfully to Node's blockchain";
"id" => header.id().to_string(),
"date" => format!("{}.{}", header.date().epoch, header.date().slot_id)
);
slog_debug!(logger, "Header: {:?}", header);
// Propagate the block to other nodes
network_msg_box.send(NetworkMsg::Propagate(PropagateMsg::Block(header)));
}
}
}
BlockMsg::AnnouncedBlock(header, node_id) => {
let blockchain = blockchain.lock_read();
match chain::header_triage(&blockchain, header, false).unwrap() {
match chain::header_triage(&blockchain, &header, false).unwrap() {
BlockHeaderTriage::NotOfInterest { reason } => {
slog_info!(logger, "rejecting block announcement: {:?}", reason);
}
@@ -84,9 +116,7 @@ pub fn handle_input(
}
BlockHeaderTriage::ProcessBlockToState => {
slog_info!(logger, "Block announcement is interesting, fetch block");
// TODO: signal back to the network that the block is interesting
// (get block/request block)
unimplemented!()
network_msg_box.send(NetworkMsg::GetBlocks(node_id, vec![header.id()]));
}
}
}
@@ -1,4 +1,5 @@
use crate::blockcfg::{Block, Header, HeaderHash, Message, MessageId};
use crate::network::p2p_topology::NodeId;

use network_core::error as core_error;

@@ -267,32 +268,31 @@ impl Debug for ClientMsg {
}

/// General Block Message for the block task
#[derive(Debug)]
pub enum BlockMsg {
/// A trusted Block has been received from the leadership task
LeadershipBlock(Block),
/// Leadership process expect a new end of epoch
LeadershipExpectEndOfEpoch,
/// An untrusted Block has been received from the network task
NetworkBlock(Block),
/// A untrusted block Header has been received from the network task
AnnouncedBlock(Header),
}

impl Debug for BlockMsg {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
use BlockMsg::*;
match self {
LeadershipBlock(block) => f.debug_tuple("LeadershipBlock").field(block).finish(),
LeadershipExpectEndOfEpoch => f.debug_tuple("LeadershipExpectEndOfEpoch").finish(),
AnnouncedBlock(header) => f.debug_tuple("AnnouncedBlock").field(header).finish(),
}
}
AnnouncedBlock(Header, NodeId),
}

/// Message to propagate to the connected peers.
/// Propagation requests for the network task.
#[derive(Clone, Debug)]
pub enum NetworkPropagateMsg {
pub enum PropagateMsg {
Block(Header),
Message(Message),
}

/// Messages to the network task.
#[derive(Clone, Debug)]
pub enum NetworkMsg {
Propagate(PropagateMsg),
GetBlocks(NodeId, Vec<HeaderHash>),
}

#[cfg(test)]
mod tests {}
@@ -61,7 +61,6 @@ use chain_impl_mockchain::message::{Message, MessageId};
use crate::{
blockcfg::Leader,
blockchain::BlockchainR,
intercom::BlockMsg,
rest::v0::node::stats::StatsCounter,
settings::start::Settings,
transaction::TPool,
@@ -108,7 +107,7 @@ fn start_services(bootstrapped_node: BootstrappedNode) -> Result<(), start_up::E
let tpool = Arc::new(RwLock::new(tpool_data));

// initialize the network propagation channel
let (network_msgbox, network_queue) = async_msg::channel(NETWORK_TASK_QUEUE_LEN);
let (mut network_msgbox, network_queue) = async_msg::channel(NETWORK_TASK_QUEUE_LEN);
let new_epoch_notifier = bootstrapped_node.new_epoch_notifier;

let stats_counter = StatsCounter::default();
@@ -126,7 +125,13 @@ fn start_services(bootstrapped_node: BootstrappedNode) -> Result<(), start_up::E
let blockchain = bootstrapped_node.blockchain.clone();
let stats_counter = stats_counter.clone();
services.spawn_future_with_inputs("block", move |info, input| {
blockchain::handle_input(info, &blockchain, &stats_counter, &network_msgbox, input);
blockchain::handle_input(
info,
&blockchain,
&stats_counter,
&mut network_msgbox,
input,
);
futures::future::ok(())
})
};
@@ -11,6 +11,7 @@ use crate::{
use network_core::{
client::{block::BlockService, gossip::GossipService},
gossip::Node,
subscription::BlockEvent,
};
use network_grpc::{
client::{Connect, Connection},
@@ -22,15 +23,14 @@ use http::uri;
use tokio::{executor::DefaultExecutor, net::TcpStream, runtime};
use tower_service::Service as _;

use std::{net::SocketAddr, slice};
use std::slice;

pub fn connect(
addr: SocketAddr,
state: ConnectionState,
channels: Channels,
) -> impl Future<Item = (p2p::NodeId, propagate::PeerHandles), Error = ()> {
info!("connecting to subscription peer {}", state.connection);
info!("address: {}", addr);
let addr = state.connection;
let peer = grpc_peer::TcpPeer::new(addr);
let origin = origin_authority(addr);

@@ -51,10 +51,16 @@ fn subscribe(
) -> impl Future<Item = (p2p::NodeId, propagate::PeerHandles), Error = ()> {
let block_box = channels.block_box;
let mut prop_handles = propagate::PeerHandles::new();
let block_sub = client.block_subscription(prop_handles.blocks.subscribe());
let gossip_sub = client.gossip_subscription(prop_handles.gossip.subscribe());
block_sub
.join(gossip_sub)
let block_sub_outbound = prop_handles.blocks.subscribe().map(|event| match event {
BlockEvent::Announce(header) => header,
BlockEvent::Solicit(_) => panic!("client connection used to solicit blocks"),
});
let block_req = client.block_subscription(block_sub_outbound);
let gossip_req = client.gossip_subscription(prop_handles.gossip.subscribe());
// TODO: decide if this is the way to make block requests
prop_handles.client = Some(client);
block_req
.join(gossip_req)
.map_err(move |err| {
error!("Subscription request failed: {:?}", err);
})
@@ -66,7 +72,15 @@ fn subscribe(
);
return Err(());
}
subscription::process_blocks(block_sub, block_box);
let block_sub = block_sub.map(|event| match event {
BlockEvent::Announce(header) => header,
BlockEvent::Solicit(_) => {
// TODO: fetch blocks from client request task
// and upload them
unimplemented!()
}
});
subscription::process_block_announcements(node_id, block_sub, block_box);
subscription::process_gossip(gossip_sub, global_state);
Ok((node_id, prop_handles))
})
Oops, something went wrong.

0 comments on commit 65c5ea5

Please sign in to comment.
You can’t perform that action at this time.