Skip to content
Permalink
Browse files

Adapt to changes to block subscription API

Left unimplemented:
- Processing incoming BlockEvent::Solicit on the client side
- BlockService::upload_blocks
  • Loading branch information...
mzabaluev committed May 14, 2019
1 parent ea03292 commit 5a472aa8deb4ae1997ce5dbf48171b922470e0bb
Showing with 46 additions and 11 deletions.
  1. +18 −5 src/network/grpc/client.rs
  2. +12 −3 src/network/propagate.rs
  3. +15 −2 src/network/service.rs
  4. +1 −1 src/network/subscription.rs
@@ -11,6 +11,7 @@ use crate::{
use network_core::{
client::{block::BlockService, gossip::GossipService},
gossip::Node,
subscription::BlockEvent,
};
use network_grpc::{
client::{Connect, Connection},
@@ -50,12 +51,16 @@ fn subscribe(
) -> impl Future<Item = (p2p::NodeId, propagate::PeerHandles), Error = ()> {
let block_box = channels.block_box;
let mut prop_handles = propagate::PeerHandles::new();
let block_sub = client.block_subscription(prop_handles.blocks.subscribe());
let gossip_sub = client.gossip_subscription(prop_handles.gossip.subscribe());
let block_sub_outbound = prop_handles.blocks.subscribe().map(|event| match event {
BlockEvent::Announce(header) => header,
BlockEvent::Solicit(_) => panic!("client connection used to solicit blocks"),
});
let block_req = client.block_subscription(block_sub_outbound);
let gossip_req = client.gossip_subscription(prop_handles.gossip.subscribe());
// TODO: decide if this is the way to make block requests
prop_handles.client = Some(client);
block_sub
.join(gossip_sub)
block_req
.join(gossip_req)
.map_err(move |err| {
error!("Subscription request failed: {:?}", err);
})
@@ -67,7 +72,15 @@ fn subscribe(
);
return Err(());
}
subscription::process_blocks(node_id, block_sub, block_box);
let block_sub = block_sub.map(|event| match event {
BlockEvent::Announce(header) => header,
BlockEvent::Solicit(_) => {
// TODO: fetch blocks from client request task
// and upload them
unimplemented!()
}
});
subscription::process_block_announcements(node_id, block_sub, block_box);
subscription::process_gossip(gossip_sub, global_state);
Ok((node_id, prop_handles))
})
@@ -5,6 +5,7 @@ use network_core::{
client::block::BlockService,
error as core_error,
gossip::{Gossip, Node},
subscription::BlockEvent,
};
use network_grpc::client::Connection;

@@ -128,7 +129,7 @@ enum SubscriptionState<T> {
/// be subscribed to.
#[derive(Default)]
pub struct PeerHandles {
pub blocks: PropagationHandle<Header>,
pub blocks: PropagationHandle<BlockEvent<Block>>,
pub messages: PropagationHandle<Message>,
pub gossip: PropagationHandle<Gossip<p2p::Node>>,

@@ -145,7 +146,15 @@ impl PeerHandles {
}

pub fn try_send_block(&mut self, header: Header) -> Result<(), PropagateError<Header>> {
self.blocks.try_send(header)
self.blocks
.try_send(BlockEvent::Announce(header))
.map_err(|e| {
let item = match e.item {
BlockEvent::Announce(header) => header,
_ => unreachable!(),
};
PropagateError { kind: e.kind, item }
})
}

pub fn try_send_message(&mut self, message: Message) -> Result<(), PropagateError<Message>> {
@@ -187,7 +196,7 @@ impl PropagationMap {
map.insert(id, handles);
}

pub fn subscribe_to_blocks(&self, id: p2p::NodeId) -> Subscription<Header> {
pub fn subscribe_to_blocks(&self, id: p2p::NodeId) -> Subscription<BlockEvent<Block>> {
let mut map = self.mutex.lock().unwrap();
let handles = ensure_propagation_peer(&mut map, id);
handles.blocks.subscribe()
@@ -12,6 +12,7 @@ use network_core::{
gossip::GossipService,
Node, P2pService,
},
subscription::BlockEvent,
};

use futures::future::{self, FutureResult};
@@ -79,7 +80,8 @@ impl BlockService for NodeService {
type PullHeadersFuture = FutureResult<Self::PullHeadersStream, core_error::Error>;
type GetHeadersStream = ReplyStream<Header, core_error::Error>;
type GetHeadersFuture = FutureResult<Self::GetHeadersStream, core_error::Error>;
type BlockSubscription = Subscription<Header>;
type UploadBlocksFuture = ReplyFuture<(), core_error::Error>;
type BlockSubscription = Subscription<BlockEvent<Block>>;
type BlockSubscriptionFuture = FutureResult<Self::BlockSubscription, core_error::Error>;

fn tip(&mut self) -> Self::TipFuture {
@@ -134,6 +136,13 @@ impl BlockService for NodeService {
unimplemented!()
}

fn upload_blocks<S>(&mut self, stream: S) -> Self::UploadBlocksFuture
where
S: Stream<Item = Block, Error = core_error::Error> + Send + 'static,
{
unimplemented!()
}

fn block_subscription<In>(
&mut self,
subscriber: Self::NodeId,
@@ -142,7 +151,11 @@ impl BlockService for NodeService {
where
In: Stream<Item = Self::Header, Error = core_error::Error> + Send + 'static,
{
subscription::process_blocks(subscriber, inbound, self.channels.block_box.clone());
subscription::process_block_announcements(
subscriber,
inbound,
self.channels.block_box.clone(),
);

let subscription = self
.global_state
@@ -8,7 +8,7 @@ use network_core::{error as core_error, gossip::Gossip};

use futures::prelude::*;

pub fn process_blocks<S>(
pub fn process_block_announcements<S>(
node_id: NodeId,
inbound: S,
mut block_box: MessageBox<BlockMsg>,

0 comments on commit 5a472aa

Please sign in to comment.
You can’t perform that action at this time.