Skip to content
Permalink
Browse files

Rework block solicitation

Better encapsulation, simpler code.
  • Loading branch information...
mzabaluev committed May 16, 2019
1 parent 37d11fa commit c60b2485c0781dced7f7afa7b67e86e6ffdf1d12
Showing with 64 additions and 74 deletions.
  1. +30 −8 src/network/grpc/client.rs
  2. +4 −17 src/network/mod.rs
  3. +24 −46 src/network/propagate.rs
  4. +6 −3 src/network/service.rs
@@ -1,11 +1,13 @@
use super::origin_authority;
use crate::{
blockcfg::{Block, HeaderHash},
intercom::BlockMsg,
network::{
p2p_topology as p2p, propagate, subscription, BlockConfig, Channels, ConnectionState,
FetchBlockError, GlobalStateR,
},
settings::start::network::Peer,
utils::async_msg::MessageBox,
};

use network_core::{
@@ -51,14 +53,8 @@ fn subscribe(
) -> impl Future<Item = (p2p::NodeId, propagate::PeerHandles), Error = ()> {
let block_box = channels.block_box;
let mut prop_handles = propagate::PeerHandles::new();
let block_sub_outbound = prop_handles.blocks.subscribe().map(|event| match event {
BlockEvent::Announce(header) => header,
BlockEvent::Solicit(_) => panic!("client connection used to solicit blocks"),
});
let block_req = client.block_subscription(block_sub_outbound);
let block_req = client.block_subscription(prop_handles.blocks.subscribe());
let gossip_req = client.gossip_subscription(prop_handles.gossip.subscribe());
// TODO: decide if this is the way to make block requests
prop_handles.client = Some(client);
block_req
.join(gossip_req)
.map_err(move |err| {
@@ -80,12 +76,38 @@ fn subscribe(
unimplemented!()
}
});
subscription::process_block_announcements(node_id, block_sub, block_box);
subscription::process_block_announcements(node_id, block_sub, block_box.clone());
subscription::process_gossip(gossip_sub, global_state);
process_block_solicitations(client, &mut prop_handles, block_box);
Ok((node_id, prop_handles))
})
}

fn process_block_solicitations(
mut client: Connection<BlockConfig, TcpStream, DefaultExecutor>,
prop_handles: &mut propagate::PeerHandles,
block_box: MessageBox<BlockMsg>,
) {
tokio::spawn(
prop_handles
.solicit_blocks
.subscribe()
.for_each(move |block_ids| {
let block_box = block_box.clone();
client.get_blocks(&block_ids).and_then(move |blocks| {
let mut block_box = block_box.clone();
blocks.for_each(move |block| {
block_box.send(BlockMsg::NetworkBlock(block));
Ok(())
})
})
})
.map_err(|e| {
info!("block solicitation failed: {:?}", e);
}),
);
}

// Fetches a block from a network peer in a one-off, blocking call.
// This function is used during node bootstrap to fetch the genesis block.
pub fn fetch_block(peer: Peer, hash: &HeaderHash) -> Result<Block, FetchBlockError> {
@@ -31,7 +31,7 @@ use network_core::{
};

use futures::prelude::*;
use futures::{future, stream};
use futures::stream;
use tokio::timer::Interval;

use std::{error::Error, iter, net::SocketAddr, sync::Arc, time::Duration};
@@ -196,24 +196,11 @@ fn handle_network_input(
input.for_each(move |msg| match msg {
NetworkMsg::Propagate(msg) => {
handle_propagation_msg(msg, state.clone(), channels.clone());
future::Either::A(future::ok(()))
Ok(())
}
NetworkMsg::GetBlocks(node_id, block_ids) => {
let mut channels = channels.clone();
future::Either::B(
state
.propagation_peers
.solicit_blocks(node_id, &block_ids)
.map(move |blocks| {
for block in blocks {
channels.block_box.send(BlockMsg::NetworkBlock(block));
}
})
.or_else(move |e| {
warn!("failed to fetch blocks from peer {}: {:?}", node_id, e);
future::ok(())
}),
)
state.propagation_peers.solicit_blocks(node_id, block_ids);
Ok(())
}
})
}
@@ -1,17 +1,14 @@
use super::{p2p_topology as p2p, BlockConfig};
use super::p2p_topology as p2p;
use crate::blockcfg::{Block, Header, HeaderHash, Message};

use network_core::{
client::block::BlockService,
error as core_error,
gossip::{Gossip, Node},
subscription::BlockEvent,
};
use network_grpc::client::Connection;

use futures::prelude::*;
use futures::{future, sync::mpsc};
use tokio::{executor::DefaultExecutor, net::TcpStream};
use futures::{stream, sync::mpsc};

use std::{
collections::{hash_map, HashMap},
@@ -62,6 +59,13 @@ impl<T> Stream for Subscription<T> {
}
}

type BlockEventAnnounceStream = stream::Map<Subscription<Header>, fn(Header) -> BlockEvent<Block>>;

type BlockEventSolicitStream =
stream::Map<Subscription<Vec<HeaderHash>>, fn(Vec<HeaderHash>) -> BlockEvent<Block>>;

pub type BlockEventSubscription = stream::Select<BlockEventAnnounceStream, BlockEventSolicitStream>;

/// Handle used by the per-peer connection tasks to produce an outbound
/// subscription stream towards the peer.
pub struct PropagationHandle<T> {
@@ -129,13 +133,10 @@ enum SubscriptionState<T> {
/// be subscribed to.
#[derive(Default)]
pub struct PeerHandles {
pub blocks: PropagationHandle<BlockEvent<Block>>,
pub blocks: PropagationHandle<Header>,
pub solicit_blocks: PropagationHandle<Vec<HeaderHash>>,
pub messages: PropagationHandle<Message>,
pub gossip: PropagationHandle<Gossip<p2p::Node>>,

// TODO: decide if we want this or send requests via
// the bidirectional stream
pub(super) client: Option<Connection<BlockConfig, TcpStream, DefaultExecutor>>,
}

impl PeerHandles {
@@ -146,15 +147,7 @@ impl PeerHandles {
}

pub fn try_send_block(&mut self, header: Header) -> Result<(), PropagateError<Header>> {
self.blocks
.try_send(BlockEvent::Announce(header))
.map_err(|e| {
let item = match e.item {
BlockEvent::Announce(header) => header,
_ => unreachable!(),
};
PropagateError { kind: e.kind, item }
})
self.blocks.try_send(header)
}

pub fn try_send_message(&mut self, message: Message) -> Result<(), PropagateError<Message>> {
@@ -196,10 +189,14 @@ impl PropagationMap {
map.insert(id, handles);
}

pub fn subscribe_to_blocks(&self, id: p2p::NodeId) -> Subscription<BlockEvent<Block>> {
pub fn subscribe_to_blocks(&self, id: p2p::NodeId) -> BlockEventSubscription {
let mut map = self.mutex.lock().unwrap();
let handles = ensure_propagation_peer(&mut map, id);
handles.blocks.subscribe()
let announce_events: BlockEventAnnounceStream =
handles.blocks.subscribe().map(BlockEvent::Announce);
let solicit_events: BlockEventSolicitStream =
handles.solicit_blocks.subscribe().map(BlockEvent::Solicit);
announce_events.select(solicit_events)
}

pub fn subscribe_to_messages(&self, id: p2p::NodeId) -> Subscription<Message> {
@@ -287,34 +284,15 @@ impl PropagationMap {
}
}

pub fn solicit_blocks(
&self,
node_id: p2p::NodeId,
hashes: &[HeaderHash],
) -> impl Future<Item = Vec<Block>, Error = core_error::Error> {
pub fn solicit_blocks(&self, node_id: p2p::NodeId, hashes: Vec<HeaderHash>) {
let mut map = self.mutex.lock().unwrap();
let handles = match map.get_mut(&node_id) {
Some(handles) => handles,
None => {
return future::Either::B(future::err(
// FIXME: better error code, if that's the way we want to do this
core_error::Error::new(core_error::Code::NotFound, "peer not available"),
));
}
};
match &mut handles.client {
Some(client) => future::Either::A(
client
.get_blocks(hashes)
.and_then(|stream| stream.collect()),
),
match map.get_mut(&node_id) {
Some(handles) => handles.solicit_blocks.try_send(hashes).unwrap_or_else(|e| {
warn!("block solicitation from {} failed: {:?}", node_id, e);
}),
None => {
// TODO: connect and request on demand?
// FIXME: better error code, if that's the way we want to do this
future::Either::B(future::err(core_error::Error::new(
core_error::Code::NotFound,
"peer client connection not available",
)))
warn!("peer {} not available to solicit blocks from", node_id);
}
}
}
@@ -1,4 +1,8 @@
use super::{p2p_topology as p2p, propagate::Subscription, subscription, Channels, GlobalStateR};
use super::{
p2p_topology as p2p,
propagate::{BlockEventSubscription, Subscription},
subscription, Channels, GlobalStateR,
};

use crate::blockcfg::{Block, BlockDate, Header, HeaderHash, Message, MessageId};
use crate::intercom::{self, stream_reply, unary_reply, ClientMsg, ReplyFuture, ReplyStream};
@@ -12,7 +16,6 @@ use network_core::{
gossip::GossipService,
Node, P2pService,
},
subscription::BlockEvent,
};

use futures::future::{self, FutureResult};
@@ -81,7 +84,7 @@ impl BlockService for NodeService {
type GetHeadersStream = ReplyStream<Header, core_error::Error>;
type GetHeadersFuture = FutureResult<Self::GetHeadersStream, core_error::Error>;
type UploadBlocksFuture = ReplyFuture<(), core_error::Error>;
type BlockSubscription = Subscription<BlockEvent<Block>>;
type BlockSubscription = BlockEventSubscription;
type BlockSubscriptionFuture = FutureResult<Self::BlockSubscription, core_error::Error>;

fn tip(&mut self) -> Self::TipFuture {

0 comments on commit c60b248

Please sign in to comment.
You can’t perform that action at this time.