Skip to content
Permalink
Browse files

Merge branch 'master' of github.com:input-output-hk/rust-cardano

  • Loading branch information...
vincenthz committed May 14, 2019
2 parents 5f22b92 + 731e66b commit 2af82b49170076f4fdd7d0a4068d20b78f13b5c2
@@ -25,7 +25,7 @@ impl BlockDate {

/// Get the slot following this one.
pub fn next(&self, era: &TimeEra) -> BlockDate {
let epoch_duration = era.slots_per_epoch;
let epoch_duration = era.slots_per_epoch();
assert!(self.slot_id < epoch_duration);
if self.slot_id + 1 == epoch_duration {
BlockDate {
@@ -145,6 +145,18 @@ impl Leadership {
}
}

/// get the epoch associated to the `Leadership`
#[inline]
pub fn epoch(&self) -> Epoch {
self.epoch
}

/// get the TimeEra associated to the `Leadership`
#[inline]
pub fn era(&self) -> &TimeEra {
&self.era
}

/// Verify whether this header has been produced by a leader that fits with the leadership
///
pub fn verify(&self, block_header: &Header) -> Verification {
@@ -68,6 +68,7 @@ pub enum Block0Error {
InitialMessageDuplicatePraosActiveSlotsCoeff,
InitialMessageNoDate,
InitialMessageNoSlotDuration,
InitialMessageNoSlotsPerEpoch,
InitialMessageNoDiscrimination,
InitialMessageNoConsensusVersion,
InitialMessageNoConsensusLeaderId,
@@ -190,6 +191,7 @@ impl Ledger {
let mut block0_start_time = None;
let mut slot_duration = None;
let mut discrimination = None;
let mut slots_per_epoch = None;

for param in init_ents.iter() {
match param {
@@ -202,6 +204,9 @@ impl Ledger {
ConfigParam::SlotDuration(d) => {
slot_duration = Some(*d);
}
ConfigParam::SlotsPerEpoch(n) => {
slots_per_epoch = Some(*n);
}
_ => regular_ents.push(param.clone()),
}
}
@@ -213,6 +218,8 @@ impl Ledger {
discrimination.ok_or(Error::Block0(Block0Error::InitialMessageNoDiscrimination))?;
let slot_duration =
slot_duration.ok_or(Error::Block0(Block0Error::InitialMessageNoSlotDuration))?;
let slots_per_epoch =
slots_per_epoch.ok_or(Error::Block0(Block0Error::InitialMessageNoSlotsPerEpoch))?;

let static_params = LedgerStaticParameters {
block0_initial_hash,
@@ -225,8 +232,7 @@ impl Ledger {
let tf = TimeFrame::new(timeline, SlotDuration::from_secs(slot_duration as u32));
let slot0 = tf.slot0();

// TODO -- configurable slots per epoch
let era = TimeEra::new_era(slot0, Epoch(0), 21600);
let era = TimeEra::new(slot0, Epoch(0), slots_per_epoch);

let settings = setting::Settings::new(era).apply(&regular_ents)?;

@@ -828,6 +834,7 @@ pub mod test {
ie.push(ConfigParam::ConsensusGenesisPraosActiveSlotsCoeff(
Milli::HALF,
));
ie.push(ConfigParam::SlotsPerEpoch(21600));

let mut rng = rand::thread_rng();
let (sk1, _pk1, user1_address) = make_key(&mut rng, &discrimination);
@@ -291,14 +291,15 @@ mod test {

#[test]
pub fn multiverse() {
const NUM_BLOCK_PER_EPOCH: u32 = 1000;
let mut multiverse = Multiverse::new();

let system_time = SystemTime::UNIX_EPOCH;
let timeline = Timeline::new(system_time);
let tf = TimeFrame::new(timeline, SlotDuration::from_secs(10));

let slot0 = tf.slot0();
let era = TimeEra::new_era(slot0, Epoch(0), 1000);
let era = TimeEra::new(slot0, Epoch(0), NUM_BLOCK_PER_EPOCH);

let mut g = StdGen::new(rand::thread_rng(), 10);
let leader_key = Arbitrary::arbitrary(&mut g);
@@ -316,6 +317,7 @@ mod test {
ents.push(ConfigParam::ConsensusGenesisPraosActiveSlotsCoeff(
Milli::HALF,
));
ents.push(ConfigParam::SlotsPerEpoch(NUM_BLOCK_PER_EPOCH));
genesis_block.message(Message::Initial(ents));
let genesis_block = genesis_block.make_genesis_block();
let mut date = genesis_block.date();
@@ -23,19 +23,24 @@ pub struct EpochPosition {
pub struct TimeEra {
epoch_start: Epoch,
slot_start: Slot,
pub slots_per_epoch: u32,
slots_per_epoch: u32,
}

impl TimeEra {
/// Set a new era to start on slot_start at epoch_start for a given slots per epoch.
pub fn new_era(slot_start: Slot, epoch_start: Epoch, slots_per_epoch: u32) -> Self {
pub fn new(slot_start: Slot, epoch_start: Epoch, slots_per_epoch: u32) -> Self {
TimeEra {
epoch_start,
slot_start,
slots_per_epoch,
}
}

/// retrieve the number of slots in an epoch during a given Epoch
pub fn slots_per_epoch(&self) -> u32 {
self.slots_per_epoch
}

/// Try to return the epoch/inner-epoch-slot associated.
///
/// If the slot in parameter is before the beginning of this era, then
@@ -92,7 +97,7 @@ mod test {
assert_eq!(slot2, Slot(4));
assert_eq!(slot3, Slot(20));

let era = TimeEra::new_era(slot1, Epoch(2), 4);
let era = TimeEra::new(slot1, Epoch(2), 4);

let p1 = era.from_slot_to_era(slot1).unwrap();
let p2 = era.from_slot_to_era(slot2).unwrap();
@@ -1,5 +1,5 @@
use super::P2pService;
use crate::error::Error;
use crate::{error::Error, subscription::BlockEvent};

use chain_core::property::{Block, HasHeader};

@@ -66,9 +66,19 @@ pub trait BlockService: P2pService {
Error = Error,
>;

/// The type of asynchronous futures returned by method `upload_blocks`.
type UploadBlocksFuture: Future<Item = (), Error = Error>;

/// Uploads blocks to the service in response to `BlockEvent::Solicit`.
///
/// The blocks to send are retrieved asynchronously from the passed stream.
fn upload_blocks<S>(&mut self, blocks: S) -> Self::UploadBlocksFuture
where
S: Stream<Item = Self::Block> + Send + 'static;

/// The type of an asynchronous stream that provides notifications
/// of blocks created or accepted by the remote node.
type BlockSubscription: Stream<Item = <Self::Block as HasHeader>::Header, Error = Error>;
type BlockSubscription: Stream<Item = BlockEvent<Self::Block>, Error = Error>;

/// Establishes a bidirectional stream of notifications for blocks
/// created or accepted by either of the peers.
@@ -8,3 +8,4 @@ pub mod client;
pub mod server;

pub mod gossip;
pub mod subscription;
@@ -1,7 +1,7 @@
//! Block service abstraction.

use super::P2pService;
use crate::error::Error;
use crate::{error::Error, subscription::BlockEvent};

use chain_core::property::{Block, BlockDate, BlockId, HasHeader, Header};

@@ -62,15 +62,18 @@ pub trait BlockService: P2pService {
/// response to `get_headers` methods.
type GetHeadersStream: Stream<Item = Self::Header, Error = Error>;

/// The type of asynchronous futures returned by `get_headeres` methods.
/// The type of asynchronous futures returned by `get_headers` methods.
///
/// The future resolves to a stream that will be used by the protocol
/// implementation to produce a server-streamed response.
type GetHeadersFuture: Future<Item = Self::GetHeadersStream, Error = Error>;

/// The type of asynchronous futures returned by method `upload_blocks`.
type UploadBlocksFuture: Future<Item = (), Error = Error>;

/// The type of an asynchronous stream that retrieves headers of new
/// blocks as they are created.
type BlockSubscription: Stream<Item = Self::Header, Error = Error>;
type BlockSubscription: Stream<Item = BlockEvent<Self::Block>, Error = Error>;

/// The type of asynchronous futures returned by method `block_subscription`.
///
@@ -113,6 +116,10 @@ pub trait BlockService: P2pService {
/// to the server's tip.
fn pull_headers_to_tip(&mut self, from: &[Self::BlockId]) -> Self::PullHeadersFuture;

fn upload_blocks<S>(&mut self, stream: S) -> Self::UploadBlocksFuture
where
S: Stream<Item = Self::Block, Error = Error> + Send + 'static;

/// Establishes a bidirectional subscription for announcing blocks.
///
/// The network protocol implementation passes the node identifier of
@@ -0,0 +1,25 @@
use chain_core::property::{Block, HasHeader};

use std::fmt::{self, Debug};

pub enum BlockEvent<B>
where
B: Block + HasHeader,
{
Announce(<B as HasHeader>::Header),
Solicit(Vec<<B as Block>::Id>),
}

impl<B> Debug for BlockEvent<B>
where
B: Block + HasHeader,
<B as HasHeader>::Header: Debug,
<B as Block>::Id: Debug,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
BlockEvent::Announce(header) => f.debug_tuple("Announce").field(header).finish(),
BlockEvent::Solicit(ids) => f.debug_tuple("Solicit").field(ids).finish(),
}
}
}
@@ -31,6 +31,9 @@ message PullBlocksToTipRequest {
repeated bytes from = 1;
}

// Response message for method UploadBlocks.
message UploadBlocksResponse {}

// Representation of a block.
message Block {
// The serialized content of the block.
@@ -56,6 +59,17 @@ message Gossip {
repeated bytes nodes = 2;
}

// Element of the subscription stream returned by BlockSubscription.
message BlockEvent {
oneof item {
// Announcement of a new block, carrying the block's header.
Header announce = 1;
// Solicitation to upload identified blocks with a method call
// UploadBlocks.
BlockIds solicit = 2;
}
}

service Node {
rpc Tip(TipRequest) returns (TipResponse);
rpc GetBlocks(BlockIds) returns (stream Block) {
@@ -70,9 +84,13 @@ service Node {

rpc PullBlocksToTip(PullBlocksToTipRequest) returns (stream Block);

// Uploads blocks to the service in response to a solicitation item
// received from the BlockSubscription response stream.
rpc UploadBlocks(stream Block) returns (UploadBlocksResponse);

// Establishes a bidirectional stream to exchange information on new
// blocks created or accepted by the peers.
rpc BlockSubscription(stream Header) returns (stream Header);
rpc BlockSubscription(stream Header) returns (stream BlockEvent);

// Establishes a bidirectional stream to exchange information on new
// messages created or accepted by the peers.
@@ -13,6 +13,7 @@ use network_core::{
client::{block::BlockService, gossip::GossipService, P2pService},
error as core_error,
gossip::{self, Gossip, NodeId},
subscription::BlockEvent,
};

use futures::future::Executor;
@@ -105,6 +106,12 @@ type GrpcUnaryFuture<R> = tower_grpc::client::unary::ResponseFuture<
tower_h2::RecvBody,
>;

type GrpcClientStreamingFuture<R> = tower_grpc::client::client_streaming::ResponseFuture<
R,
tower_h2::client::ResponseFuture,
tower_h2::RecvBody,
>;

type GrpcServerStreamingFuture<R> =
tower_grpc::client::server_streaming::ResponseFuture<R, tower_h2::client::ResponseFuture>;

@@ -125,6 +132,16 @@ impl<T, R> ResponseFuture<T, R> {
}
}

pub struct ClientStreamingCompletionFuture<R> {
inner: GrpcClientStreamingFuture<R>,
}

impl<R> ClientStreamingCompletionFuture<R> {
fn new(inner: GrpcClientStreamingFuture<R>) -> Self {
ClientStreamingCompletionFuture { inner }
}
}

pub struct ResponseStreamFuture<T, R> {
inner: GrpcServerStreamingFuture<R>,
_phantom: PhantomData<T>,
@@ -173,6 +190,19 @@ where
}
}

impl<R> Future for ClientStreamingCompletionFuture<R>
where
R: prost::Message + Default,
{
type Item = ();
type Error = core_error::Error;

fn poll(&mut self) -> Poll<(), core_error::Error> {
try_ready!(self.inner.poll().map_err(error_from_grpc));
Ok(Async::Ready(()))
}
}

impl<T, R> Future for ResponseStreamFuture<T, R>
where
R: prost::Message + Default,
@@ -303,8 +333,11 @@ where
type GetBlocksStream = ResponseStream<P::Block, gen::node::Block>;
type GetBlocksFuture = ResponseStreamFuture<P::Block, gen::node::Block>;

type BlockSubscription = ResponseStream<P::Header, gen::node::Header>;
type BlockSubscriptionFuture = SubscriptionFuture<P::Header, Self::NodeId, gen::node::Header>;
type BlockSubscription = ResponseStream<BlockEvent<P::Block>, gen::node::BlockEvent>;
type BlockSubscriptionFuture =
SubscriptionFuture<BlockEvent<P::Block>, Self::NodeId, gen::node::BlockEvent>;

type UploadBlocksFuture = ClientStreamingCompletionFuture<gen::node::UploadBlocksResponse>;

fn tip(&mut self) -> Self::TipFuture {
let req = gen::node::TipRequest {};
@@ -326,6 +359,16 @@ where
ResponseStreamFuture::new(future)
}

fn upload_blocks<S>(&mut self, blocks: S) -> Self::UploadBlocksFuture
where
S: Stream<Item = P::Block> + Send + 'static,
{
let rs = RequestStream::new(blocks);
let req = Request::new(rs);
let future = self.service.upload_blocks(req);
ClientStreamingCompletionFuture::new(future)
}

fn block_subscription<Out>(&mut self, outbound: Out) -> Self::BlockSubscriptionFuture
where
Out: Stream<Item = P::Header> + Send + 'static,
Oops, something went wrong.

0 comments on commit 2af82b4

Please sign in to comment.
You can’t perform that action at this time.