Navigation Menu

Skip to content
This repository has been archived by the owner on Jun 11, 2022. It is now read-only.

Commit

Permalink
Merge pull request #682 from input-output-hk/upload-blocks
Browse files Browse the repository at this point in the history
network: Redesign block subscription for upload
  • Loading branch information
NicolasDP committed May 13, 2019
2 parents 0ac5ac6 + 7c27bc4 commit 731e66b
Show file tree
Hide file tree
Showing 10 changed files with 204 additions and 10 deletions.
14 changes: 12 additions & 2 deletions network-core/src/client/block.rs
@@ -1,5 +1,5 @@
use super::P2pService;
use crate::error::Error;
use crate::{error::Error, subscription::BlockEvent};

use chain_core::property::{Block, HasHeader};

Expand Down Expand Up @@ -66,9 +66,19 @@ pub trait BlockService: P2pService {
Error = Error,
>;

/// The type of asynchronous futures returned by method `upload_blocks`.
type UploadBlocksFuture: Future<Item = (), Error = Error>;

/// Uploads blocks to the service in response to `BlockEvent::Solicit`.
///
/// The blocks to send are retrieved asynchronously from the passed stream.
fn upload_blocks<S>(&mut self, blocks: S) -> Self::UploadBlocksFuture
where
S: Stream<Item = Self::Block> + Send + 'static;

/// The type of an asynchronous stream that provides notifications
/// of blocks created or accepted by the remote node.
type BlockSubscription: Stream<Item = <Self::Block as HasHeader>::Header, Error = Error>;
type BlockSubscription: Stream<Item = BlockEvent<Self::Block>, Error = Error>;

/// Establishes a bidirectional stream of notifications for blocks
/// created or accepted by either of the peers.
Expand Down
1 change: 1 addition & 0 deletions network-core/src/lib.rs
Expand Up @@ -8,3 +8,4 @@ pub mod client;
pub mod server;

pub mod gossip;
pub mod subscription;
13 changes: 10 additions & 3 deletions network-core/src/server/block.rs
@@ -1,7 +1,7 @@
//! Block service abstraction.

use super::P2pService;
use crate::error::Error;
use crate::{error::Error, subscription::BlockEvent};

use chain_core::property::{Block, BlockDate, BlockId, HasHeader, Header};

Expand Down Expand Up @@ -62,15 +62,18 @@ pub trait BlockService: P2pService {
/// response to `get_headers` methods.
type GetHeadersStream: Stream<Item = Self::Header, Error = Error>;

/// The type of asynchronous futures returned by `get_headeres` methods.
/// The type of asynchronous futures returned by `get_headers` methods.
///
/// The future resolves to a stream that will be used by the protocol
/// implementation to produce a server-streamed response.
type GetHeadersFuture: Future<Item = Self::GetHeadersStream, Error = Error>;

/// The type of asynchronous futures returned by method `upload_blocks`.
type UploadBlocksFuture: Future<Item = (), Error = Error>;

/// The type of an asynchronous stream that retrieves headers of new
/// blocks as they are created.
type BlockSubscription: Stream<Item = Self::Header, Error = Error>;
type BlockSubscription: Stream<Item = BlockEvent<Self::Block>, Error = Error>;

/// The type of asynchronous futures returned by method `block_subscription`.
///
Expand Down Expand Up @@ -113,6 +116,10 @@ pub trait BlockService: P2pService {
/// to the server's tip.
fn pull_headers_to_tip(&mut self, from: &[Self::BlockId]) -> Self::PullHeadersFuture;

fn upload_blocks<S>(&mut self, stream: S) -> Self::UploadBlocksFuture
where
S: Stream<Item = Self::Block, Error = Error> + Send + 'static;

/// Establishes a bidirectional subscription for announcing blocks.
///
/// The network protocol implementation passes the node identifier of
Expand Down
25 changes: 25 additions & 0 deletions network-core/src/subscription.rs
@@ -0,0 +1,25 @@
use chain_core::property::{Block, HasHeader};

use std::fmt::{self, Debug};

pub enum BlockEvent<B>
where
B: Block + HasHeader,
{
Announce(<B as HasHeader>::Header),
Solicit(Vec<<B as Block>::Id>),
}

impl<B> Debug for BlockEvent<B>
where
B: Block + HasHeader,
<B as HasHeader>::Header: Debug,
<B as Block>::Id: Debug,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
BlockEvent::Announce(header) => f.debug_tuple("Announce").field(header).finish(),
BlockEvent::Solicit(ids) => f.debug_tuple("Solicit").field(ids).finish(),
}
}
}
20 changes: 19 additions & 1 deletion network-grpc/proto/node.proto
Expand Up @@ -31,6 +31,9 @@ message PullBlocksToTipRequest {
repeated bytes from = 1;
}

// Response message for method UploadBlocks.
message UploadBlocksResponse {}

// Representation of a block.
message Block {
// The serialized content of the block.
Expand All @@ -56,6 +59,17 @@ message Gossip {
repeated bytes nodes = 2;
}

// Element of the subscription stream returned by BlockSubscription.
message BlockEvent {
oneof item {
// Announcement of a new block, carrying the block's header.
Header announce = 1;
// Solicitation to upload identified blocks with a method call
// UploadBlocks.
BlockIds solicit = 2;
}
}

service Node {
rpc Tip(TipRequest) returns (TipResponse);
rpc GetBlocks(BlockIds) returns (stream Block) {
Expand All @@ -70,9 +84,13 @@ service Node {

rpc PullBlocksToTip(PullBlocksToTipRequest) returns (stream Block);

// Uploads blocks to the service in response to a solicitation item
// received from the BlockSubscription response stream.
rpc UploadBlocks(stream Block) returns (UploadBlocksResponse);

// Establishes a bidirectional stream to exchange information on new
// blocks created or accepted by the peers.
rpc BlockSubscription(stream Header) returns (stream Header);
rpc BlockSubscription(stream Header) returns (stream BlockEvent);

// Establishes a bidirectional stream to exchange information on new
// messages created or accepted by the peers.
Expand Down
47 changes: 45 additions & 2 deletions network-grpc/src/client.rs
Expand Up @@ -13,6 +13,7 @@ use network_core::{
client::{block::BlockService, gossip::GossipService, P2pService},
error as core_error,
gossip::{self, Gossip, NodeId},
subscription::BlockEvent,
};

use futures::future::Executor;
Expand Down Expand Up @@ -105,6 +106,12 @@ type GrpcUnaryFuture<R> = tower_grpc::client::unary::ResponseFuture<
tower_h2::RecvBody,
>;

type GrpcClientStreamingFuture<R> = tower_grpc::client::client_streaming::ResponseFuture<
R,
tower_h2::client::ResponseFuture,
tower_h2::RecvBody,
>;

type GrpcServerStreamingFuture<R> =
tower_grpc::client::server_streaming::ResponseFuture<R, tower_h2::client::ResponseFuture>;

Expand All @@ -125,6 +132,16 @@ impl<T, R> ResponseFuture<T, R> {
}
}

pub struct ClientStreamingCompletionFuture<R> {
inner: GrpcClientStreamingFuture<R>,
}

impl<R> ClientStreamingCompletionFuture<R> {
fn new(inner: GrpcClientStreamingFuture<R>) -> Self {
ClientStreamingCompletionFuture { inner }
}
}

pub struct ResponseStreamFuture<T, R> {
inner: GrpcServerStreamingFuture<R>,
_phantom: PhantomData<T>,
Expand Down Expand Up @@ -173,6 +190,19 @@ where
}
}

impl<R> Future for ClientStreamingCompletionFuture<R>
where
R: prost::Message + Default,
{
type Item = ();
type Error = core_error::Error;

fn poll(&mut self) -> Poll<(), core_error::Error> {
try_ready!(self.inner.poll().map_err(error_from_grpc));
Ok(Async::Ready(()))
}
}

impl<T, R> Future for ResponseStreamFuture<T, R>
where
R: prost::Message + Default,
Expand Down Expand Up @@ -303,8 +333,11 @@ where
type GetBlocksStream = ResponseStream<P::Block, gen::node::Block>;
type GetBlocksFuture = ResponseStreamFuture<P::Block, gen::node::Block>;

type BlockSubscription = ResponseStream<P::Header, gen::node::Header>;
type BlockSubscriptionFuture = SubscriptionFuture<P::Header, Self::NodeId, gen::node::Header>;
type BlockSubscription = ResponseStream<BlockEvent<P::Block>, gen::node::BlockEvent>;
type BlockSubscriptionFuture =
SubscriptionFuture<BlockEvent<P::Block>, Self::NodeId, gen::node::BlockEvent>;

type UploadBlocksFuture = ClientStreamingCompletionFuture<gen::node::UploadBlocksResponse>;

fn tip(&mut self) -> Self::TipFuture {
let req = gen::node::TipRequest {};
Expand All @@ -326,6 +359,16 @@ where
ResponseStreamFuture::new(future)
}

fn upload_blocks<S>(&mut self, blocks: S) -> Self::UploadBlocksFuture
where
S: Stream<Item = P::Block> + Send + 'static,
{
let rs = RequestStream::new(blocks);
let req = Request::new(rs);
let future = self.service.upload_blocks(req);
ClientStreamingCompletionFuture::new(future)
}

fn block_subscription<Out>(&mut self, outbound: Out) -> Self::BlockSubscriptionFuture
where
Out: Stream<Item = P::Header> + Send + 'static,
Expand Down
58 changes: 58 additions & 0 deletions network-grpc/src/convert.rs
Expand Up @@ -7,6 +7,7 @@ use chain_core::{
use network_core::{
error as core_error,
gossip::{Gossip, Node, NodeId},
subscription::BlockEvent,
};

use tower_grpc::{
Expand Down Expand Up @@ -120,6 +121,35 @@ where
}
}

impl<T> FromProtobuf<gen::node::BlockEvent> for BlockEvent<T>
where
T: property::Block + property::HasHeader,
T::Header: mempack::Readable,
T::Id: mempack::Readable,
{
fn from_message(msg: gen::node::BlockEvent) -> Result<Self, core_error::Error> {
use gen::node::block_event::*;

let event = match msg.item {
Some(Item::Announce(header)) => {
let header = parse_bytes(&header.content)?;
BlockEvent::Announce(header)
}
Some(Item::Solicit(ids)) => {
let ids = parse_repeated_bytes(&ids.ids)?;
BlockEvent::Solicit(ids)
}
None => {
return Err(core_error::Error::new(
core_error::Code::InvalidArgument,
"invalid BlockEvent payload, one of the fields is required",
))
}
};
Ok(event)
}
}

impl<T> FromProtobuf<gen::node::Message> for T
where
T: property::Message + mempack::Readable,
Expand Down Expand Up @@ -198,6 +228,34 @@ where
}
}

impl IntoProtobuf<gen::node::UploadBlocksResponse> for () {
fn into_message(self) -> Result<gen::node::UploadBlocksResponse, tower_grpc::Status> {
Ok(gen::node::UploadBlocksResponse {})
}
}

impl<T> IntoProtobuf<gen::node::BlockEvent> for BlockEvent<T>
where
T: property::Block + property::HasHeader,
T::Header: property::Serialize,
{
fn into_message(self) -> Result<gen::node::BlockEvent, tower_grpc::Status> {
use gen::node::block_event::*;

let item = match self {
BlockEvent::Announce(header) => {
let content = serialize_to_bytes(&header)?;
Item::Announce(gen::node::Header { content })
}
BlockEvent::Solicit(ids) => {
let ids = serialize_to_repeated_bytes(&ids)?;
Item::Solicit(gen::node::BlockIds { ids })
}
};
Ok(gen::node::BlockEvent { item: Some(item) })
}
}

impl<T> IntoProtobuf<gen::node::Message> for T
where
T: property::Message + property::Serialize,
Expand Down
5 changes: 5 additions & 0 deletions network-grpc/src/server.rs
Expand Up @@ -28,6 +28,7 @@ use std::path::Path;
pub struct Server<T, E>
where
T: Node + Clone,
<T::BlockService as BlockService>::Block: protocol_bounds::Block,
<T::BlockService as BlockService>::Header: protocol_bounds::Header,
<T::ContentService as ContentService>::Message: protocol_bounds::Message,
<T::GossipService as GossipService>::Node: protocol_bounds::Node,
Expand All @@ -44,6 +45,7 @@ pub struct Connection<S, T, E>
where
S: AsyncRead + AsyncWrite,
T: Node + Clone,
<T::BlockService as BlockService>::Block: protocol_bounds::Block,
<T::BlockService as BlockService>::Header: protocol_bounds::Header,
<T::ContentService as ContentService>::Message: protocol_bounds::Message,
<T::GossipService as GossipService>::Node: protocol_bounds::Node,
Expand All @@ -61,6 +63,7 @@ impl<S, T, E> Future for Connection<S, T, E>
where
S: AsyncRead + AsyncWrite,
T: Node + Clone + 'static,
<T::BlockService as BlockService>::Block: protocol_bounds::Block,
<T::BlockService as BlockService>::Header: protocol_bounds::Header,
<T::ContentService as ContentService>::Message: protocol_bounds::Message,
<T::GossipService as GossipService>::Node: protocol_bounds::Node,
Expand All @@ -81,6 +84,7 @@ where
impl<T, E> Server<T, E>
where
T: Node + Clone + 'static,
<T::BlockService as BlockService>::Block: protocol_bounds::Block,
<T::BlockService as BlockService>::Header: protocol_bounds::Header,
<T::ContentService as ContentService>::Message: protocol_bounds::Message,
<T::GossipService as GossipService>::Node: protocol_bounds::Node,
Expand Down Expand Up @@ -155,6 +159,7 @@ type H2Error<T> = tower_h2::server::Error<gen_server::NodeServer<NodeService<T>>
impl<T> From<H2Error<T>> for Error
where
T: Node + Clone,
<T::BlockService as BlockService>::Block: protocol_bounds::Block,
<T::BlockService as BlockService>::Header: protocol_bounds::Header,
<T::ContentService as ContentService>::Message: protocol_bounds::Message,
<T::GossipService as GossipService>::Node: protocol_bounds::Node,
Expand Down

0 comments on commit 731e66b

Please sign in to comment.