Skip to content

Commit

Permalink
port tip update changes
Browse files Browse the repository at this point in the history
  • Loading branch information
dkijania committed Jan 26, 2022
1 parent 908165b commit caf8915
Show file tree
Hide file tree
Showing 8 changed files with 111 additions and 220 deletions.
Expand Up @@ -40,6 +40,12 @@ impl From<ConsensusLeaderId> for BftLeaderId {
}
}

#[allow(clippy::from_over_into)]
impl Into<PublicKey<Ed25519>> for ConsensusLeaderId {
fn into(self) -> PublicKey<Ed25519> {
self.0.as_public_key().clone()
}
}
impl Serialize for ConsensusLeaderId {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
Expand Down
147 changes: 6 additions & 141 deletions jormungandr/src/blockchain/branch.rs
@@ -1,160 +1,25 @@
use crate::blockchain::Ref;
use futures::stream::{FuturesUnordered, StreamExt};
use std::sync::Arc;
use tokio::sync::RwLock;

#[derive(Clone)]
pub struct Branches {
inner: Arc<RwLock<BranchesData>>,
}

struct BranchesData {
branches: Vec<Branch>,
}

/// the data that is contained in a branch
#[derive(Clone)]
pub struct Branch {
inner: Arc<RwLock<BranchData>>,
}

/// the data that is contained in a branch
struct BranchData {
/// reference to the block where the branch points to
reference: Arc<Ref>,
}

impl Default for Branches {
fn default() -> Self {
Self::new()
}
}

impl Branches {
pub fn new() -> Self {
Branches {
inner: Arc::new(RwLock::new(BranchesData {
branches: Vec::new(),
})),
}
}

pub async fn add(&mut self, branch: Branch) {
let mut guard = self.inner.write().await;
guard.add(branch);
}

pub async fn apply_or_create(&mut self, candidate: Arc<Ref>) -> Branch {
let maybe_branch = self.apply(Arc::clone(&candidate)).await;
match maybe_branch {
Some(branch) => branch,
None => {
let maybe_exists = self
.branches()
.await
.into_iter()
.find(|branch| branch.hash() == candidate.hash());

if let Some(branch) = maybe_exists {
return Branch::new(branch);
}
self.create(candidate).await
}
}
}

pub async fn branches(&self) -> Vec<Arc<Ref>> {
let guard = self.inner.read().await;
guard.branches().await
}

async fn apply(&mut self, candidate: Arc<Ref>) -> Option<Branch> {
let mut guard = self.inner.write().await;
guard.apply(candidate).await
}

async fn create(&mut self, candidate: Arc<Ref>) -> Branch {
let branch = Branch::new(candidate);
self.add(branch.clone()).await;
branch
}
}

impl BranchesData {
fn add(&mut self, branch: Branch) {
self.branches.push(branch)
}

async fn apply(&mut self, candidate: Arc<Ref>) -> Option<Branch> {
let (value, _) = self
.branches
.iter_mut()
.map(|branch| branch.continue_with(Arc::clone(&candidate)))
.collect::<FuturesUnordered<_>>()
.filter_map(|updated| Box::pin(async move { updated }))
.into_future()
.await;
value
}

async fn branches(&self) -> Vec<Arc<Ref>> {
self.branches
.iter()
.map(|b| b.get_ref())
// this is done so that inner futures are only polled when they generate wake-up notifications
.collect::<FuturesUnordered<_>>()
.collect()
.await
}
}

impl Branch {
pub fn new(reference: Arc<Ref>) -> Self {
Branch {
inner: Arc::new(RwLock::new(BranchData::new(reference))),
}
}

pub async fn get_ref(&self) -> Arc<Ref> {
let guard = self.inner.read().await;
guard.reference()
}

pub async fn update_ref(&mut self, new_ref: Arc<Ref>) -> Arc<Ref> {
let mut guard = self.inner.write().await;
guard.update(new_ref)
}

async fn continue_with(&mut self, candidate: Arc<Ref>) -> Option<Self> {
let mut guard = self.inner.write().await;
if guard.continue_with(candidate) {
Some(self.clone())
} else {
None
}
}
}

impl BranchData {
/// create the branch data with the current `last_updated` to
/// the current time this function was called
fn new(reference: Arc<Ref>) -> Self {
BranchData { reference }
}

fn update(&mut self, reference: Arc<Ref>) -> Arc<Ref> {
std::mem::replace(&mut self.reference, reference)
pub fn new(reference: Arc<Ref>) -> Self {
Branch { reference }
}

fn reference(&self) -> Arc<Ref> {
pub fn get_ref(&self) -> Arc<Ref> {
Arc::clone(&self.reference)
}

fn continue_with(&mut self, candidate: Arc<Ref>) -> bool {
if self.reference.hash() == candidate.block_parent_hash() {
let _parent = self.update(candidate);
true
} else {
false
}
pub fn into_ref(self) -> Arc<Ref> {
self.reference
}
}
47 changes: 21 additions & 26 deletions jormungandr/src/blockchain/chain.rs
Expand Up @@ -49,7 +49,7 @@ See Internal documentation for more details: doc/internal_design.md
[`Branch`]: ./struct.Branch.html
*/
#![allow(clippy::large_enum_variant)]
use super::{branch::Branches, reference_cache::RefCache};
use super::reference_cache::RefCache;
use crate::{
blockcfg::{
Block, Block0Error, BlockDate, ChainLength, Epoch, EpochRewardsInfo, Header, HeaderDesc,
Expand All @@ -59,8 +59,8 @@ use crate::{
};
use chain_impl_mockchain::{leadership::Verification, ledger};
use chain_time::TimeFrame;
use futures::{StreamExt, TryStreamExt};
use std::sync::Arc;
use tokio_stream::StreamExt;

#[derive(Debug, thiserror::Error)]
pub enum Error {
Expand Down Expand Up @@ -167,8 +167,6 @@ pub fn pre_verify_link(
///
#[derive(Clone)]
pub struct Blockchain {
branches: Branches,

ref_cache: RefCache,

ledgers: Multiverse<Ledger>,
Expand Down Expand Up @@ -273,7 +271,6 @@ impl Blockchain {
rewards_report_all: bool,
) -> Self {
Blockchain {
branches: Branches::new(),
ref_cache: RefCache::new(cache_capacity),
ledgers: Multiverse::new(),
storage,
Expand All @@ -290,12 +287,15 @@ impl Blockchain {
&self.storage
}

pub fn branches(&self) -> &Branches {
&self.branches
}

pub fn branches_mut(&mut self) -> &mut Branches {
&mut self.branches
pub async fn branches(&self) -> Result<Vec<Branch>> {
futures::stream::iter(self.storage().get_branches()?)
// FIXME: this should always return a valid ref, as the branches
// are directly fetched from the node storage, but account for
// misses in the ref cache which are not handled currently by the node
.filter_map(|branch| async move { self.get_ref(branch).await.transpose() })
.map(|try_ref| try_ref.map(Branch::new))
.try_collect()
.await
}

pub async fn gc(&self, tip: Arc<Ref>) -> Result<()> {
Expand Down Expand Up @@ -631,8 +631,6 @@ impl Blockchain {
let block0_id = block0.header().hash();
let block0_date = block0.header().block_date();

let mut branches = self.branches.clone();

let time_frame = {
use crate::blockcfg::Block0DataSource as _;

Expand Down Expand Up @@ -665,9 +663,7 @@ impl Blockchain {
None,
)
.await;
let b = Branch::new(b);
branches.add(b.clone()).await;
Ok(b)
Ok(Branch::new(b))
}

/// function to do the initial application of the block0 in the `Blockchain` and its
Expand Down Expand Up @@ -761,7 +757,7 @@ impl Blockchain {
return Err(Error::NoTag(MAIN_BRANCH_TAG.to_owned()));
};

let mut branch = self.apply_block0(&block0).await?;
let mut last_ref = self.apply_block0(&block0).await?.get_ref();
let mut reporter = StreamReporter::new(|stream_info| {
let elapsed = stream_info
.last_reported
Expand All @@ -782,18 +778,17 @@ impl Blockchain {

while let Some(block) = block_stream.next().await.transpose()? {
reporter.append_block(&block);
branch
.update_ref(
self.handle_bootstrap_block(block, CheckHeaderProof::SkipFromStorage)
.await?,
)
.await;
// this is a stream of consecutive blocks up to the last known main branch,
// no need for special rules for updating the tip
last_ref = self
.handle_bootstrap_block(block, CheckHeaderProof::SkipFromStorage)
.await?;
}
Ok(Tip::new(branch))
Ok(Tip::new(Branch::new(last_ref)))
}

pub async fn get_checkpoints(&self, branch: &Branch) -> Checkpoints {
Checkpoints::new_from(branch.get_ref().await)
pub fn get_checkpoints(&self, branch: &Branch) -> Checkpoints {
Checkpoints::new_from(branch.get_ref())
}
}

Expand Down
4 changes: 2 additions & 2 deletions jormungandr/src/blockchain/process.rs
Expand Up @@ -29,7 +29,7 @@ use std::{sync::Arc, time::Duration};
type PullHeadersScheduler = FireForgetScheduler<HeaderHash, NodeId, Checkpoints>;
type GetNextBlockScheduler = FireForgetScheduler<HeaderHash, NodeId, ()>;

const TIP_UPDATE_QUEUE_SIZE: usize = 5;
const TIP_UPDATE_QUEUE_SIZE: usize = 10;

const DEFAULT_TIMEOUT_PROCESS_LEADERSHIP: u64 = 5;
const DEFAULT_TIMEOUT_PROCESS_ANNOUNCEMENT: u64 = 5;
Expand Down Expand Up @@ -416,7 +416,7 @@ async fn process_block_announcement(
PreCheckedHeader::MissingParent { header, .. } => {
tracing::debug!("block is missing a locally stored parent");
let to = header.hash();
let from = blockchain.get_checkpoints(blockchain_tip.branch()).await;
let from = blockchain.get_checkpoints(&blockchain_tip.branch().await);
pull_headers_scheduler
.schedule(to, node_id, from)
.unwrap_or_else(move |err| {
Expand Down
8 changes: 8 additions & 0 deletions jormungandr/src/blockchain/storage.rs
Expand Up @@ -107,6 +107,14 @@ impl Storage {
.map_err(Into::into)
}

pub fn get_branches(&self) -> Result<Vec<HeaderHash>, Error> {
self.storage
.get_tips_ids()?
.into_iter()
.map(|branch| HeaderHash::deserialize(branch.as_ref()).map_err(Error::Deserialize))
.collect::<Result<Vec<_>, Error>>()
}

pub fn get_blocks_by_chain_length(&self, chain_length: u32) -> Result<Vec<Block>, Error> {
self.storage
.get_blocks_by_chain_length(chain_length)
Expand Down

0 comments on commit caf8915

Please sign in to comment.