Skip to content
This repository has been archived by the owner on Dec 4, 2023. It is now read-only.

Introduce block cache. #1411

Merged
merged 1 commit into from
Nov 14, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
41 changes: 41 additions & 0 deletions blockchain/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,26 @@ impl Block {
}
}

///
/// Unwrap a Micro Block by ref.
///
/// # Panics
///
/// Panics if the block is not Micro Block.
///
pub fn unwrap_micro_ref(&self) -> &MicroBlock {
match self {
Block::MicroBlock(ref micro_block) => micro_block,
Block::MacroBlock(ref macro_block) => {
panic!(
"Expected a micro block: epoch={}, block={}",
macro_block.header.epoch,
Hash::digest(&macro_block)
);
}
}
}

///
/// Unwrap a Micro Block.
///
Expand All @@ -551,6 +571,27 @@ impl Block {
}
}
}

///
/// Unwrap a Micro Block by ref.
///
/// # Panics
///
/// Panics if the block is not Macro Block.
///
pub fn unwrap_macro_ref(&self) -> &MacroBlock {
match self {
Block::MacroBlock(ref macro_block) => macro_block,
Block::MicroBlock(ref micro_block) => {
panic!(
"Expected a micro block: epoch={}, offset={}, block={}",
micro_block.header.epoch,
micro_block.header.offset,
Hash::digest(&micro_block)
);
}
}
}
}

impl Hashable for Block {
Expand Down
109 changes: 94 additions & 15 deletions blockchain/src/blockchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ use log::*;
use rocksdb;
use rocksdb::{ColumnFamily, Snapshot, WriteBatch};
use serde_derive::{Deserialize, Serialize};
use std::collections::{BTreeMap, HashMap};
use std::borrow::Cow;
use std::collections::{BTreeMap, HashMap, VecDeque};
use std::path::Path;
use stegos_crypto::bulletproofs::fee_a;
use stegos_crypto::hash::*;
Expand Down Expand Up @@ -268,6 +269,9 @@ pub struct Blockchain {
//
awards: Awards,
epoch_activity: ValidatorsActivity,

// Block ache
cache: VecDeque<Block>,
}

impl Blockchain {
Expand Down Expand Up @@ -322,6 +326,8 @@ impl Blockchain {
//
let awards = Awards::new(cfg.awards_difficulty);
let epoch_activity = MultiVersionedMap::new();
// Block cache.
let cache = VecDeque::with_capacity(cfg.stake_epochs as usize + 1);

let mut blockchain = Blockchain {
cfg,
Expand All @@ -344,6 +350,7 @@ impl Blockchain {
last_block_hash,
awards,
epoch_activity,
cache,
};

blockchain.init(genesis, timestamp, consistency_check)?;
Expand Down Expand Up @@ -423,7 +430,7 @@ impl Blockchain {
recover_map!(cf_escrow, escrow, lsn);
self.escrow.escrow = escrow;

let block = self.macro_block(lsn.0)?;
let block = self.macro_block(lsn.0)?.into_owned();

let block_hash = Hash::digest(&block);

Expand Down Expand Up @@ -491,6 +498,26 @@ impl Blockchain {
.set(*stake);
}

self.with_snapshot(|this, snapshot| {
let epoch = lsn.0.saturating_sub(this.cfg.stake_epochs as u64 + 1);
let lsn = LSN(epoch, lsn.1);
let blocks = snapshot
.iterator(rocksdb::IteratorMode::From(
&Self::block_key(lsn),
rocksdb::Direction::Forward,
))
.map(|(_, v)| Block::from_buffer(&*v).expect("couldn't deserialize block."))
.filter_map(|b| match b {
Block::MicroBlock(_b) => None,
Block::MacroBlock(b) => Some(b),
});

for block in blocks {
debug!("Recovered block to cache: epoch={}", block.header.epoch);
this.cache_push_block(block.into())
}
});

Ok(true)
}

Expand Down Expand Up @@ -651,11 +678,12 @@ impl Blockchain {
match self.output_by_hash.get(output_hash) {
Some(OutputKey::MacroBlock { epoch, output_id }) => {
let block = self.macro_block(*epoch)?;
assert_eq!(block.header.epoch, *epoch);
if let Some(output) = block.outputs.get(*output_id as usize) {
let result = OutputRecovery {
output: output.clone(),
epoch: block.header.epoch,
block_hash: Hash::digest(&block),
block_hash: Hash::digest(block.as_ref()),
is_final: true,
timestamp: block.header.timestamp,
};
Expand All @@ -682,7 +710,7 @@ impl Blockchain {
let result = OutputRecovery {
output: output.clone(),
epoch: block.header.epoch,
block_hash: Hash::digest(&block),
block_hash: Hash::digest(block.as_ref()),
is_final: false,
timestamp: block.header.timestamp,
};
Expand Down Expand Up @@ -768,21 +796,39 @@ impl Blockchain {
}

/// Get a block by position.
fn block(&self, lsn: LSN) -> Result<Block, StorageError> {
fn block(&self, lsn: LSN) -> Result<Cow<Block>, StorageError> {
if lsn.1 == MACRO_BLOCK_OFFSET {
if let Some(b) = self.get_block_cached(lsn.0) {
return Ok(Cow::Borrowed(b));
}
}
match self.database.get(&Self::block_key(lsn))? {
Some(buffer) => Ok(Block::from_buffer(&buffer).expect("couldn't deserialize block.")),
Some(buffer) => Ok(Cow::Owned(
Block::from_buffer(&buffer).expect("couldn't deserialize block."),
)),
None => panic!("Block must exists"),
}
}

/// Get a micro block by offset.
pub fn micro_block(&self, epoch: u64, offset: u32) -> Result<MicroBlock, StorageError> {
Ok(self.block(LSN(epoch, offset))?.unwrap_micro())
pub fn micro_block(&self, epoch: u64, offset: u32) -> Result<Cow<MicroBlock>, StorageError> {
let block = self.block(LSN(epoch, offset))?;

let res = match block {
Cow::Owned(b) => Cow::Owned(b.unwrap_micro()),
Cow::Borrowed(b) => Cow::Borrowed(b.unwrap_micro_ref()),
};
Ok(res)
}

/// Get a block by offset.
pub fn macro_block(&self, epoch: u64) -> Result<MacroBlock, StorageError> {
Ok(self.block(LSN(epoch, MACRO_BLOCK_OFFSET))?.unwrap_macro())
pub fn macro_block(&self, epoch: u64) -> Result<Cow<MacroBlock>, StorageError> {
let block = self.block(LSN(epoch, MACRO_BLOCK_OFFSET))?;
let res = match block {
Cow::Owned(b) => Cow::Owned(b.unwrap_macro()),
Cow::Borrowed(b) => Cow::Borrowed(b.unwrap_macro_ref()),
};
Ok(res)
}

/// Returns iterator over saved blocks.
Expand Down Expand Up @@ -1343,7 +1389,7 @@ impl Blockchain {
"Registering a macro block: epoch={}, block={}",
epoch, &block_hash
);

let block_clone = block.clone();
//
// Prepare inputs and outputs.
//
Expand Down Expand Up @@ -1428,7 +1474,10 @@ impl Blockchain {
//
// Update metadata.
//

self.epoch += 1;

self.cache_push_block(block_clone.into());
self.offset = 0;
self.last_block_timestamp = block.header.timestamp;
self.last_block_hash = block_hash;
Expand Down Expand Up @@ -1947,17 +1996,17 @@ impl Blockchain {
//
// Remove from the disk.
//
let block = self.micro_block(self.epoch, offset)?;
let block = self.micro_block(self.epoch, offset)?.into_owned();
let (previous, lsn, last_block_timestamp) = if offset == 0 {
// Previous block is Macro Block.
let block = self.macro_block(self.epoch - 1)?;
let lsn = LSN(self.epoch - 1, MACRO_BLOCK_OFFSET);
(Hash::digest(&block), lsn, block.header.timestamp)
(Hash::digest(block.as_ref()), lsn, block.header.timestamp)
} else {
// Previous block is Micro Block.
let block = self.micro_block(self.epoch, offset - 1)?;
let lsn = LSN(self.epoch, offset - 1);
(Hash::digest(&block), lsn, block.header.timestamp)
(Hash::digest(block.as_ref()), lsn, block.header.timestamp)
};
self.database
.delete(&Self::block_key(LSN(self.epoch, offset)))?;
Expand Down Expand Up @@ -2080,6 +2129,31 @@ impl Blockchain {
}
Ok(())
}

/// Insert block into cache.
fn cache_push_block(&mut self, block: Block) {
trace!("Cache block = {}", block.unwrap_macro_ref().header.epoch);
if self.cache.len() == self.cfg.stake_epochs as usize + 1 {
trace!("Removed block");
assert!(self.cache.pop_front().is_some())
}
self.cache.push_back(block)
}

/// Try get block from cache.
fn get_block_cached(&self, epoch: u64) -> Option<&Block> {
trace!("get block = {}", epoch);
if epoch > self.epoch {
return None;
}
let lower_epoch = self.epoch.saturating_sub(self.cfg.stake_epochs + 1);
if epoch < lower_epoch {
return None;
} else {
let idx = epoch - lower_epoch;
self.cache.get(idx as usize)
}
}
}

#[cfg(test)]
Expand Down Expand Up @@ -2142,7 +2216,12 @@ pub mod tests {
assert!(!blockchain.contains_block(&Hash::digest("test")));

assert_eq!(
Hash::digest(&blockchain.block(LSN(0, MACRO_BLOCK_OFFSET)).unwrap()),
Hash::digest(
blockchain
.block(LSN(0, MACRO_BLOCK_OFFSET))
.unwrap()
.as_ref()
),
Hash::digest(&block1)
);

Expand Down
8 changes: 4 additions & 4 deletions node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -752,7 +752,7 @@ impl NodeService {
let remote_hash = Hash::digest(&remote);
let remote_view_change = remote.header.view_change;
let local = self.chain.micro_block(epoch, offset)?;
let local_hash = Hash::digest(&local);
let local_hash = Hash::digest(local.as_ref());

// check multiple blocks with same view_change
if remote.header.view_change == local.header.view_change {
Expand Down Expand Up @@ -784,7 +784,7 @@ impl NodeService {

metrics::MICRO_BLOCKS_CHEATS.inc();

let proof = SlashingProof::new_unchecked(remote.clone(), local);
let proof = SlashingProof::new_unchecked(remote.clone(), local.into_owned());

if let Some(_proof) = self.cheating_proofs.insert(leader, proof) {
sdebug!(self, "Cheater was already detected: cheater={}", leader);
Expand Down Expand Up @@ -2165,7 +2165,7 @@ impl NodeService {
return Err(format_err!("Macro block doesn't exists: epoch={}", epoch));
}

let block = self.chain.macro_block(epoch)?;
let block = self.chain.macro_block(epoch)?.into_owned();
let epoch_info = self.chain.epoch_info(epoch)?.unwrap().clone();
let msg = ExtendedMacroBlock {
block,
Expand All @@ -2187,7 +2187,7 @@ impl NodeService {
offset
));
}
let block = self.chain.micro_block(epoch, offset)?;
let block = self.chain.micro_block(epoch, offset)?.into_owned();
let msg = ExtendedMicroBlock {
block,
transaction_statuses: HashMap::new(),
Expand Down
28 changes: 24 additions & 4 deletions node/src/test/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,12 @@ fn finalized_slashing_with_service_award() {
node.node_service.chain.last_macro_block_hash()
);
let block_hash = node.node_service.chain.last_block_hash();
let block = node.node_service.chain.macro_block(epoch).unwrap();
let block = node
.node_service
.chain
.macro_block(epoch)
.unwrap()
.into_owned();
assert_eq!(Hash::digest(&block), block_hash);
let mut outputs = Vec::new();
for output in block.outputs {
Expand Down Expand Up @@ -445,7 +450,12 @@ fn finalized_slashing_with_service_award_for_auditor() {
node.node_service.chain.last_macro_block_hash()
);
let block_hash = node.node_service.chain.last_block_hash();
let block = node.node_service.chain.macro_block(epoch).unwrap();
let block = node
.node_service
.chain
.macro_block(epoch)
.unwrap()
.into_owned();
assert_eq!(Hash::digest(&block), block_hash);
let mut outputs = Vec::new();
for output in block.outputs {
Expand Down Expand Up @@ -533,7 +543,12 @@ fn service_award_round_normal(s: &mut Sandbox, service_award_budget: i64) {
node.node_service.chain.last_macro_block_hash()
);
let block_hash = node.node_service.chain.last_block_hash();
let block = node.node_service.chain.macro_block(epoch).unwrap();
let block = node
.node_service
.chain
.macro_block(epoch)
.unwrap()
.into_owned();
assert_eq!(Hash::digest(&block), block_hash);
let mut outputs = Vec::new();
for output in block.outputs {
Expand Down Expand Up @@ -692,7 +707,12 @@ fn service_award_round_without_participants(s: &mut Sandbox) {
node.node_service.chain.last_macro_block_hash()
);
let block_hash = node.node_service.chain.last_block_hash();
let block = node.node_service.chain.macro_block(epoch).unwrap();
let block = node
.node_service
.chain
.macro_block(epoch)
.unwrap()
.into_owned();
assert_eq!(Hash::digest(&block), block_hash);
let mut outputs = Vec::new();
for output in block.outputs {
Expand Down