Skip to content
Permalink
Browse files

schedule the end of epoch and notify the blockchain manager we are

expecting the next epoch to start
  • Loading branch information...
NicolasDP committed May 14, 2019
1 parent 829f6cf commit 08bdf4149aea0eeaf817cef321b057d5dd5dfe72
Showing with 146 additions and 14 deletions.
  1. +22 −13 src/blockchain/chain.rs
  2. +4 −0 src/blockchain/process.rs
  3. +3 −0 src/intercom.rs
  4. +117 −1 src/leadership/process.rs
@@ -308,6 +308,28 @@ pub enum BlockHeaderTriage {
ProcessBlockToState,
}

pub fn handle_end_of_epoch_event(blockchain: &Blockchain) -> Result<(), HandleBlockError> {
let (tip, tip_info) = blockchain.get_block_tip()?;
let state = blockchain.get_ledger(&tip_info.block_hash).unwrap();

// TODO: get the ledger state from 2 epochs ago

blockchain
.epoch_event
.clone() // clone it to get mutability
.try_send(EpochParameters {
epoch: tip.header().date().epoch + 1,

ledger_static_parameters: state.get_static_parameters().clone(),
ledger_parameters: state.get_ledger_parameters(),

time_frame: blockchain.time_frame.clone(),
ledger_reference: state.clone(),
})
.unwrap_or_else(|_| ());
Ok(())
}

pub fn handle_block(
blockchain: &mut Blockchain,
block: Block,
@@ -355,19 +377,6 @@ fn process_block(
};

if block.header.date().epoch > parent_epoch {
blockchain
.epoch_event
.try_send(EpochParameters {
epoch: block.header().date().epoch,

ledger_static_parameters: state.get_static_parameters().clone(),
ledger_parameters: state.get_ledger_parameters(),

time_frame: blockchain.time_frame.clone(),
ledger_reference: state.clone(),
})
.unwrap_or_else(|_| ());

let leadership = Leadership::new(
block.header.date().epoch,
blockchain.get_ledger(&block.parent_id()).unwrap(),
@@ -27,6 +27,10 @@ pub fn handle_input(
let logger = info.logger().clone();

match bquery {
BlockMsg::LeadershipExpectEndOfEpoch => {
let blockchain = blockchain.lock_read();
chain::handle_end_of_epoch_event(&blockchain).unwrap()
}
BlockMsg::LeadershipBlock(block) => {
let mut blockchain = blockchain.lock_write();
match chain::handle_block(&mut blockchain, block, true).unwrap() {
@@ -270,6 +270,8 @@ impl Debug for ClientMsg {
pub enum BlockMsg {
/// A trusted Block has been received from the leadership task
LeadershipBlock(Block),
/// Leadership process expect a new end of epoch
LeadershipExpectEndOfEpoch,
/// A untrusted block Header has been received from the network task
AnnouncedBlock(Header),
}
@@ -279,6 +281,7 @@ impl Debug for BlockMsg {
use BlockMsg::*;
match self {
LeadershipBlock(block) => f.debug_tuple("LeadershipBlock").field(block).finish(),
LeadershipExpectEndOfEpoch => f.debug_tuple("LeadershipExpectEndOfEpoch").finish(),
AnnouncedBlock(header) => f.debug_tuple("AnnouncedBlock").field(header).finish(),
}
}
@@ -1,15 +1,19 @@
use crate::{
blockcfg::{Epoch, Leader},
blockcfg::{BlockDate, Epoch, Leader},
blockchain::Tip,
intercom::BlockMsg,
leadership::{EpochParameters, Leadership, Task, TaskParameters},
transaction::TPoolR,
utils::{async_msg::MessageBox, task::TokioServiceInfo},
};
use chain_core::property::BlockDate as _;
use chain_time::era::{EpochPosition, EpochSlotOffset};
use slog::Logger;
use std::sync::Arc;
use tokio::{
prelude::*,
sync::{mpsc, watch},
timer::Delay,
};

custom_error! { pub HandleEpochError
@@ -67,6 +71,7 @@ impl Process {
let error_logger = self.service_info.logger().clone();
slog_info!(self.service_info.logger(), "starting");

self.spawn_end_of_epoch_reminder();
for leader in leaders {
self.spawn_leader(leader);
}
@@ -112,6 +117,15 @@ impl Process {
self.service_info.spawn(task.start())
}

fn spawn_end_of_epoch_reminder(&mut self) {
let epoch_receiver = self.epoch_receiver.clone();
let logger = self.service_info.logger().clone();
let block_message = self.block_message_box.clone();
let end_of_epoch_reminder = EndOfEpochReminder::new(epoch_receiver, logger, block_message);

self.service_info.spawn(end_of_epoch_reminder.start())
}

/// handle incoming Epoch
fn handle_epoch(&mut self, epoch_parameters: EpochParameters) -> Result<(), HandleEpochError> {
let leadership =
@@ -130,3 +144,105 @@ impl Process {
.map_err(|_| HandleEpochError::Broadcast)
}
}

custom_error! {pub EndOfEpochReminderError
EpochReceiver { extra: String } = "Cannot continue to receiver new epoch events: {extra}",
DelayFailed { source: tokio::timer::Error } = "Delay to the end of Epoch failed",
}

struct EndOfEpochReminder {
epoch_receiver: watch::Receiver<Option<TaskParameters>>,
logger: Logger,
block_message_box: MessageBox<BlockMsg>,
}
impl EndOfEpochReminder {
fn new(
epoch_receiver: watch::Receiver<Option<TaskParameters>>,
logger: Logger,
block_message_box: MessageBox<BlockMsg>,
) -> Self {
EndOfEpochReminder {
epoch_receiver,
logger: slog::Logger::root(logger, o!("task" => "End Of Epoch Reminder")),
block_message_box,
}
}

fn start(self) -> impl Future<Item = (), Error = ()> {
slog_info!(self.logger, "starting");

let handle_logger = self.logger.clone();
let crit_logger = self.logger;
let block_message = self.block_message_box;

self.epoch_receiver
.map_err(|error| EndOfEpochReminderError::EpochReceiver {
extra: format!("{}", error),
})
// filter_map so we don't have to do the pattern match on `Option::Nothing`.
.filter_map(|task_parameters| task_parameters)
.for_each(move |task_parameters| {
handle_epoch(block_message.clone(), handle_logger.clone(), task_parameters)
})
.map_err(move |error| {
slog_crit!(crit_logger, "critical error in the Leader task" ; "reason" => error.to_string())
})
}
}

fn handle_epoch(
mut block_message: MessageBox<BlockMsg>,
logger: Logger,
task_parameters: TaskParameters,
) -> impl Future<Item = (), Error = EndOfEpochReminderError> {
let era = task_parameters.leadership.era().clone();
let time_frame = task_parameters.time_frame.clone();

let current_slot = time_frame.slot_at(&std::time::SystemTime::now()).expect(
"assume we cannot only get one valid timeline and that the slot duration does not change",
);
let epoch_position = era
.from_slot_to_era(current_slot)
.expect("assume the current time is already in the era");

// TODO: need to handle:
//
// * if too early for the leadership, we need to wait
// * if too late for this leadership, log it and return
assert!(epoch_position.epoch.0 == task_parameters.epoch);

let last_slot_in_epoch = era.slots_per_epoch() - 1;

let slot = era.from_era_to_slot(EpochPosition {
epoch: chain_time::Epoch(task_parameters.epoch),
slot: EpochSlotOffset(last_slot_in_epoch),
});
let slot_system_time = time_frame
.slot_to_systemtime(slot)
.expect("The slot should always be in the given timeframe here");

let date = BlockDate::from_epoch_slot_id(task_parameters.epoch, last_slot_in_epoch);
let now = std::time::SystemTime::now();
let duration = slot_system_time
.duration_since(now)
.expect("time should always be in the future");

slog_debug!(
logger,
"scheduling end of epoch";
"epoch" => date.epoch,
"expected_at" => format!("{:?}", slot_system_time)
);

Delay::new(
std::time::Instant::now()
.checked_add(duration)
.expect("That the duration is positive"),
)
.map_err(|error| EndOfEpochReminderError::DelayFailed { source: error })
.and_then(move |()| {
slog_info!(logger, "End of epoch" ; "epoch" => date.epoch);
block_message.send(BlockMsg::LeadershipExpectEndOfEpoch);
future::ok(())
})
}

0 comments on commit 08bdf41

Please sign in to comment.
You can’t perform that action at this time.