Skip to content

Commit

Permalink
basic grpc-based subscription re-implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
ecioppettini committed Jun 8, 2021
1 parent a09e3af commit 2da8591
Show file tree
Hide file tree
Showing 11 changed files with 410 additions and 21 deletions.
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion jormungandr/Cargo.toml
Expand Up @@ -22,7 +22,7 @@ chain-time = { git = "https://github.com/input-output-hk/chain-libs.git", b
chain-vote = { git = "https://github.com/input-output-hk/chain-libs.git", branch = "master" }
cardano-legacy-address = { git = "https://github.com/input-output-hk/chain-libs.git", branch = "master" }
imhamt = { git = "https://github.com/input-output-hk/chain-libs.git", branch = "master" }

chain-watch = { git = "https://github.com/input-output-hk/chain-libs.git", branch = "heterogeneous-client-api" }
arc-swap = "^1.1.0"
async-trait = "0.1.50"
async-graphql = "2.5.1"
Expand Down
74 changes: 68 additions & 6 deletions jormungandr/src/blockchain/process.rs
Expand Up @@ -7,7 +7,9 @@ use super::{
use crate::{
blockcfg::{Block, FragmentId, Header, HeaderHash},
blockchain::Checkpoints,
intercom::{self, BlockMsg, ExplorerMsg, NetworkMsg, PropagateMsg, TransactionMsg},
intercom::{
self, BlockMsg, ExplorerMsg, NetworkMsg, NotifierMsg, PropagateMsg, TransactionMsg,
},
network::p2p::Address,
stats_counter::StatsCounter,
utils::{
Expand Down Expand Up @@ -58,6 +60,7 @@ pub struct Process {
pub network_msgbox: MessageBox<NetworkMsg>,
pub fragment_msgbox: MessageBox<TransactionMsg>,
pub explorer_msgbox: Option<MessageBox<ExplorerMsg>>,
pub notifier_msgbox: MessageBox<NotifierMsg>,
pub garbage_collection_interval: Duration,
}

Expand Down Expand Up @@ -92,6 +95,7 @@ impl Process {
let blockchain_tip = self.blockchain_tip.clone();
let network_msg_box = self.network_msgbox.clone();
let explorer_msg_box = self.explorer_msgbox.clone();
let event_notifier_msg_box = self.notifier_msgbox.clone();
let tx_msg_box = self.fragment_msgbox.clone();
let stats_counter = self.stats_counter.clone();

Expand All @@ -117,6 +121,7 @@ impl Process {
tx_msg_box,
network_msg_box,
explorer_msg_box,
event_notifier_msg_box,
leadership_block,
stats_counter,
)
Expand Down Expand Up @@ -163,6 +168,7 @@ impl Process {
tx_msg_box,
network_msg_box,
explorer_msg_box,
event_notifier_msg_box,
get_next_block_scheduler,
handle,
stats_counter,
Expand Down Expand Up @@ -194,10 +200,19 @@ impl Process {
let blockchain = self.blockchain.clone();
let explorer = self.explorer_msgbox.clone();

let notifier = self.notifier_msgbox.clone();

info.run_periodic_fallible(
"branch reprocessing",
BRANCH_REPROCESSING_INTERVAL,
move || reprocess_tip(blockchain.clone(), tip.clone(), explorer.clone()),
move || {
reprocess_tip(
blockchain.clone(),
tip.clone(),
explorer.clone(),
notifier.clone(),
)
},
)
}

Expand Down Expand Up @@ -282,6 +297,7 @@ async fn reprocess_tip(
mut blockchain: Blockchain,
tip: Tip,
explorer_msg_box: Option<MessageBox<ExplorerMsg>>,
notifier_msg_box: MessageBox<NotifierMsg>,
) -> Result<(), Error> {
let branches: Vec<Arc<Ref>> = blockchain.branches().branches().await;

Expand All @@ -298,6 +314,7 @@ async fn reprocess_tip(
tip.clone(),
Arc::clone(other),
explorer_msg_box.clone(),
Some(notifier_msg_box.clone()),
)
.await?
}
Expand All @@ -319,6 +336,7 @@ pub async fn process_new_ref(
mut tip: Tip,
candidate: Arc<Ref>,
explorer_msg_box: Option<MessageBox<ExplorerMsg>>,
notifier_msg_box: Option<MessageBox<NotifierMsg>>,
) -> Result<(), Error> {
let candidate_hash = candidate.hash();
let tip_ref = tip.get_ref().await;
Expand Down Expand Up @@ -365,6 +383,15 @@ pub async fn process_new_ref(
.await
.unwrap_or_else(|err| {
tracing::error!("cannot send new tip to explorer: {}", err)
})
}

if let Some(mut msg_box) = notifier_msg_box {
msg_box
.send(NotifierMsg::NewTip(candidate_hash))
.await
.unwrap_or_else(|err| {
tracing::error!("cannot notify new block to subscribers: {}", err)
});
}
}
Expand All @@ -379,11 +406,19 @@ async fn process_and_propagate_new_ref(
new_block_ref: Arc<Ref>,
mut network_msg_box: MessageBox<NetworkMsg>,
explorer_msg_box: Option<MessageBox<ExplorerMsg>>,
notifier_msg_box: MessageBox<NotifierMsg>,
) -> chain::Result<()> {
let header = new_block_ref.header().clone();
tracing::debug!("processing the new block and propagating");

process_new_ref(blockchain, tip, new_block_ref, explorer_msg_box).await?;
process_new_ref(
blockchain,
tip,
new_block_ref,
explorer_msg_box,
Some(notifier_msg_box),
)
.await?;

tracing::debug!("propagating block to the network");

Expand All @@ -401,6 +436,7 @@ async fn process_leadership_block(
mut tx_msg_box: MessageBox<TransactionMsg>,
network_msg_box: MessageBox<NetworkMsg>,
explorer_msg_box: Option<MessageBox<ExplorerMsg>>,
mut notifier_msg_box: MessageBox<crate::notifier::Message>,
leadership_block: LeadershipBlock,
stats_counter: StatsCounter,
) -> chain::Result<()> {
Expand All @@ -418,12 +454,18 @@ async fn process_leadership_block(
Arc::clone(&new_block_ref),
network_msg_box,
explorer_msg_box.clone(),
notifier_msg_box.clone(),
)
.await?;

// Track block as new new tip block
stats_counter.set_tip_block(Arc::new(block.clone()));

notifier_msg_box
.send(NotifierMsg::NewBlock(block.clone()))
.await
.map_err(|_| "Cannot propagate block to blockchain event notifier")?;

if let Some(mut msg_box) = explorer_msg_box {
msg_box.send(ExplorerMsg::NewBlock(block)).await?;
}
Expand Down Expand Up @@ -498,6 +540,7 @@ async fn process_network_blocks(
mut tx_msg_box: MessageBox<TransactionMsg>,
network_msg_box: MessageBox<NetworkMsg>,
mut explorer_msg_box: Option<MessageBox<ExplorerMsg>>,
mut notifier_msg_box: MessageBox<NotifierMsg>,
mut get_next_block_scheduler: GetNextBlockScheduler,
handle: intercom::RequestStreamHandle<Block, ()>,
stats_counter: StatsCounter,
Expand All @@ -516,6 +559,7 @@ async fn process_network_blocks(
block.clone(),
&mut tx_msg_box,
explorer_msg_box.as_mut(),
&mut notifier_msg_box,
&mut get_next_block_scheduler,
)
.await;
Expand Down Expand Up @@ -554,6 +598,7 @@ async fn process_network_blocks(
Arc::clone(&new_block_ref),
network_msg_box,
explorer_msg_box,
notifier_msg_box,
)
.await?;

Expand All @@ -572,6 +617,7 @@ async fn process_network_block(
block: Block,
tx_msg_box: &mut MessageBox<TransactionMsg>,
explorer_msg_box: Option<&mut MessageBox<ExplorerMsg>>,
event_notifier_msg_box: &mut MessageBox<NotifierMsg>,
get_next_block_scheduler: &mut GetNextBlockScheduler,
) -> Result<Option<Arc<Ref>>, chain::Error> {
get_next_block_scheduler
Expand Down Expand Up @@ -602,9 +648,15 @@ async fn process_network_block(
Err(Error::MissingParentBlock(parent_hash))
}
PreCheckedHeader::HeaderWithCache { parent_ref, .. } => {
let r =
check_and_apply_block(blockchain, parent_ref, block, tx_msg_box, explorer_msg_box)
.await;
let r = check_and_apply_block(
blockchain,
parent_ref,
block,
tx_msg_box,
explorer_msg_box,
event_notifier_msg_box,
)
.await;
r
}
}
Expand All @@ -616,6 +668,7 @@ async fn check_and_apply_block(
block: Block,
tx_msg_box: &mut MessageBox<TransactionMsg>,
explorer_msg_box: Option<&mut MessageBox<ExplorerMsg>>,
event_notifier_msg_box: &mut MessageBox<NotifierMsg>,
) -> Result<Option<Arc<Ref>>, chain::Error> {
let explorer_enabled = explorer_msg_box.is_some();
let post_checked = blockchain
Expand All @@ -634,6 +687,8 @@ async fn check_and_apply_block(
} else {
None
};
let block_for_subscribers = block.clone();

let fragment_ids = block.fragments().map(|f| f.id()).collect::<Vec<_>>();
let applied_block = blockchain
.apply_and_store_block(post_checked, block)
Expand All @@ -654,6 +709,13 @@ async fn check_and_apply_block(
.try_send(ExplorerMsg::NewBlock(block_for_explorer.take().unwrap()))
.unwrap_or_else(|err| tracing::error!("cannot add block to explorer: {}", err));
}

event_notifier_msg_box
.try_send(NotifierMsg::NewBlock(block_for_subscribers))
.unwrap_or_else(|err| {
tracing::error!("cannot notify new block to subscribers: {}", err)
});

Ok(Some(block_ref))
} else {
tracing::debug!(
Expand Down
27 changes: 23 additions & 4 deletions jormungandr/src/fragment/logs.rs
@@ -1,4 +1,4 @@
use crate::fragment::FragmentId;
use crate::{fragment::FragmentId, intercom::NotifierMsg, utils::async_msg::MessageBox};
use jormungandr_lib::{
crypto::hash::Hash,
interfaces::{FragmentLog, FragmentOrigin, FragmentStatus},
Expand All @@ -8,12 +8,14 @@ use std::collections::HashMap;

pub struct Logs {
entries: LruCache<Hash, FragmentLog>,
notifier: MessageBox<NotifierMsg>,
}

impl Logs {
pub fn new(max_entries: usize) -> Self {
pub fn new(max_entries: usize, notifier: MessageBox<NotifierMsg>) -> Self {
Logs {
entries: LruCache::new(max_entries),
notifier,
}
}

Expand All @@ -32,10 +34,17 @@ impl Logs {
/// Returns true if fragment was registered
pub fn insert(&mut self, log: FragmentLog) -> bool {
let fragment_id = *log.fragment_id();

if self.entries.contains(&fragment_id) {
false
} else {
self.entries.put(fragment_id, log);
self.entries.put(fragment_id, log.clone());

let _ = self.notifier.try_send(NotifierMsg::FragmentLog(
fragment_id.into_hash(),
log.status().clone(),
));

true
}
}
Expand All @@ -52,8 +61,12 @@ impl Logs {
let fragment_id: Hash = fragment_id.into();
match self.entries.get_mut(&fragment_id) {
Some(entry) => {
if !entry.modify(status) {
if !entry.modify(status.clone()) {
tracing::debug!("the fragment log update was refused: cannot mark the fragment as invalid if it was already committed to a block");
} else {
let _ = self
.notifier
.try_send(NotifierMsg::FragmentLog(fragment_id.into_hash(), status));
}
}
None => {
Expand All @@ -72,6 +85,12 @@ impl Logs {
FragmentLog::new(fragment_id.clone().into_hash(), FragmentOrigin::Network);
entry.modify(status);
self.entries.put(fragment_id, entry);

// TODO: not sure about this, this could be outside the branches also, but that
// wouldn't cover the refused case above
let _ = self
.notifier
.try_send(NotifierMsg::FragmentLog(fragment_id.into_hash(), status));
}
}
}
Expand Down
4 changes: 3 additions & 1 deletion jormungandr/src/fragment/process.rs
@@ -1,6 +1,6 @@
use crate::{
fragment::{Logs, Pools},
intercom::{NetworkMsg, TransactionMsg},
intercom::{NetworkMsg, NotifierMsg, TransactionMsg},
stats_counter::StatsCounter,
utils::{
async_msg::{MessageBox, MessageQueue},
Expand Down Expand Up @@ -39,7 +39,9 @@ impl Process {
pool_max_entries: usize,
logs_max_entries: usize,
network_msg_box: MessageBox<NetworkMsg>,
notifier_msg_box: MessageBox<NotifierMsg>,
) -> Self {
let logs = Logs::new(logs_max_entries, notifier_msg_box);
Process {
pool_max_entries,
logs_max_entries,
Expand Down
7 changes: 7 additions & 0 deletions jormungandr/src/intercom.rs
Expand Up @@ -627,5 +627,12 @@ pub enum ExplorerMsg {
NewTip(HeaderHash),
}

/// Messages to the notifier task
pub enum NotifierMsg {
NewBlock(Block),
NewTip(HeaderHash),
FragmentLog(FragmentId, FragmentStatus),
}

#[cfg(test)]
mod tests {}

0 comments on commit 2da8591

Please sign in to comment.