Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

The Header MMR (One MMR To Rule Them All) #1716

Merged
merged 20 commits into from Oct 15, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 3 additions & 3 deletions api/src/types.rs
Expand Up @@ -97,9 +97,9 @@ impl TxHashSet {
pub fn from_head(head: Arc<chain::Chain>) -> TxHashSet {
let roots = head.get_txhashset_roots();
TxHashSet {
output_root_hash: roots.0.to_hex(),
range_proof_root_hash: roots.1.to_hex(),
kernel_root_hash: roots.2.to_hex(),
output_root_hash: roots.output_root.to_hex(),
range_proof_root_hash: roots.rproof_root.to_hex(),
kernel_root_hash: roots.kernel_root.to_hex(),
}
}
}
Expand Down
163 changes: 106 additions & 57 deletions chain/src/chain.rs
Expand Up @@ -35,7 +35,7 @@ use grin_store::Error::NotFoundErr;
use pipe;
use store;
use txhashset;
use types::{ChainAdapter, NoStatus, Options, Tip, TxHashsetWriteStatus};
use types::{ChainAdapter, NoStatus, Options, Tip, TxHashSetRoots, TxHashsetWriteStatus};
use util::secp::pedersen::{Commitment, RangeProof};
use util::LOGGER;

Expand Down Expand Up @@ -153,6 +153,7 @@ pub struct Chain {
// POW verification function
pow_verifier: fn(&BlockHeader, u8) -> Result<(), pow::Error>,
archive_mode: bool,
genesis: BlockHeader,
}

unsafe impl Sync for Chain {}
Expand All @@ -178,7 +179,7 @@ impl Chain {
// open the txhashset, creating a new one if necessary
let mut txhashset = txhashset::TxHashSet::open(db_root.clone(), store.clone(), None)?;

setup_head(genesis, store.clone(), &mut txhashset)?;
setup_head(genesis.clone(), store.clone(), &mut txhashset)?;

let head = store.head()?;
debug!(
Expand All @@ -199,6 +200,7 @@ impl Chain {
verifier_cache,
block_hashes_cache: Arc::new(RwLock::new(LruCache::new(HASHES_CACHE_SIZE))),
archive_mode,
genesis: genesis.header.clone(),
})
}

Expand Down Expand Up @@ -246,54 +248,52 @@ impl Chain {

Ok(head)
}
Err(e) => {
match e.kind() {
ErrorKind::Orphan => {
let block_hash = b.hash();
let orphan = Orphan {
block: b,
opts: opts,
added: Instant::now(),
};

&self.orphans.add(orphan);
Err(e) => match e.kind() {
ErrorKind::Orphan => {
let block_hash = b.hash();
let orphan = Orphan {
block: b,
opts: opts,
added: Instant::now(),
};

debug!(
LOGGER,
"process_block: orphan: {:?}, # orphans {}{}",
block_hash,
self.orphans.len(),
if self.orphans.len_evicted() > 0 {
format!(", # evicted {}", self.orphans.len_evicted())
} else {
String::new()
},
);
Err(ErrorKind::Orphan.into())
}
ErrorKind::Unfit(ref msg) => {
debug!(
LOGGER,
"Block {} at {} is unfit at this time: {}",
b.hash(),
b.header.height,
msg
);
Err(ErrorKind::Unfit(msg.clone()).into())
}
_ => {
info!(
LOGGER,
"Rejected block {} at {}: {:?}",
b.hash(),
b.header.height,
e
);
add_to_hash_cache(b.hash());
Err(ErrorKind::Other(format!("{:?}", e).to_owned()).into())
}
&self.orphans.add(orphan);

debug!(
LOGGER,
"process_block: orphan: {:?}, # orphans {}{}",
block_hash,
self.orphans.len(),
if self.orphans.len_evicted() > 0 {
format!(", # evicted {}", self.orphans.len_evicted())
} else {
String::new()
},
);
Err(ErrorKind::Orphan.into())
}
}
ErrorKind::Unfit(ref msg) => {
debug!(
LOGGER,
"Block {} at {} is unfit at this time: {}",
b.hash(),
b.header.height,
msg
);
Err(ErrorKind::Unfit(msg.clone()).into())
}
_ => {
info!(
LOGGER,
"Rejected block {} at {}: {:?}",
b.hash(),
b.header.height,
e
);
add_to_hash_cache(b.hash());
Err(ErrorKind::Other(format!("{:?}", e).to_owned()).into())
}
},
}
}

Expand Down Expand Up @@ -494,11 +494,15 @@ impl Chain {
Ok((extension.roots(), extension.sizes()))
})?;

// Carefully destructure these correctly...
// TODO - Maybe sizes should be a struct to add some type safety here...
let (_, output_mmr_size, _, kernel_mmr_size) = sizes;

b.header.output_root = roots.output_root;
b.header.range_proof_root = roots.rproof_root;
b.header.kernel_root = roots.kernel_root;
b.header.output_mmr_size = sizes.0;
b.header.kernel_mmr_size = sizes.2;
b.header.output_mmr_size = output_mmr_size;
b.header.kernel_mmr_size = kernel_mmr_size;
Ok(())
}

Expand Down Expand Up @@ -526,7 +530,7 @@ impl Chain {
}

/// Returns current txhashset roots
pub fn get_txhashset_roots(&self) -> (Hash, Hash, Hash) {
pub fn get_txhashset_roots(&self) -> TxHashSetRoots {
let mut txhashset = self.txhashset.write().unwrap();
txhashset.roots()
}
Expand Down Expand Up @@ -594,6 +598,40 @@ impl Chain {
Ok(())
}

/// Rebuild the sync MMR so it reflects the current header_head.
/// This runs when we first enter sync mode, to ensure we have an MMR
/// we can safely rewind against the headers a peer sends us.
/// TODO - think about how to optimize this.
pub fn rebuild_sync_mmr(&self, head: &Tip) -> Result<(), Error> {
	// Take the txhashset write lock for the duration of the rebuild.
	let mut hashset = self.txhashset.write().unwrap();

	// All MMR mutations go through a single db batch, committed atomically below.
	let mut db_batch = self.store.batch()?;
	txhashset::sync_extending(&mut hashset, &mut db_batch, |extension| {
		// Rebuild from genesis up to the provided head.
		extension.rebuild(head, &self.genesis)?;
		Ok(())
	})?;
	db_batch.commit()?;
	Ok(())
}

/// Rebuild the header MMR so it reflects the current header_head.
/// We rebuild the header MMR after receiving a txhashset from a peer:
/// the txhashset contains the output, rangeproof and kernel MMRs, but the
/// header MMR must be constructed locally from headers already in our db.
/// TODO - think about how to optimize this.
fn rebuild_header_mmr(
	&self,
	head: &Tip,
	txhashset: &mut txhashset::TxHashSet,
) -> Result<(), Error> {
	// All MMR mutations go through a single db batch, committed atomically below.
	let mut db_batch = self.store.batch()?;
	txhashset::header_extending(txhashset, &mut db_batch, |extension| {
		// Rebuild from genesis up to the provided head.
		extension.rebuild(head, &self.genesis)?;
		Ok(())
	})?;
	db_batch.commit()?;
	Ok(())
}

/// Writes a reading view on a txhashset state that's been provided to us.
/// If we're willing to accept that new state, the data stream will be
/// read as a zip file, unzipped and the resulting state files should be
Expand Down Expand Up @@ -621,6 +659,10 @@ impl Chain {
let mut txhashset =
txhashset::TxHashSet::open(self.db_root.clone(), self.store.clone(), Some(&header))?;

// The txhashset.zip contains the output, rangeproof and kernel MMRs.
// We must rebuild the header MMR ourselves based on the headers in our db.
self.rebuild_header_mmr(&Tip::from_block(&header), &mut txhashset)?;

// Validate the full kernel history (kernel MMR root for every block header).
self.validate_kernel_history(&header, &txhashset)?;

Expand Down Expand Up @@ -983,6 +1025,15 @@ fn setup_head(
// to match the provided block header.
let header = batch.get_block_header(&head.last_block_h)?;

// If we have no header MMR then rebuild as necessary.
// Supports old nodes with no header MMR.
txhashset::header_extending(txhashset, &mut batch, |extension| {
if extension.size() == 0 {
extension.rebuild(&head, &genesis.header)?;
}
Ok(())
})?;

let res = txhashset::extending(txhashset, &mut batch, |extension| {
extension.rewind(&header)?;
extension.validate_roots()?;
Expand Down Expand Up @@ -1042,17 +1093,15 @@ fn setup_head(
batch.save_head(&tip)?;
batch.setup_height(&genesis.header, &tip)?;

// Apply the genesis block to our empty MMRs.
txhashset::extending(txhashset, &mut batch, |extension| {
extension.apply_block(&genesis)?;

// Save the block_sums to the db for use later.
extension
.batch
.save_block_sums(&genesis.hash(), &BlockSums::default())?;

Ok(())
})?;

// Save the block_sums to the db for use later.
batch.save_block_sums(&genesis.hash(), &BlockSums::default())?;

info!(LOGGER, "chain: init: saved genesis: {:?}", genesis.hash());
}
Err(e) => return Err(ErrorKind::StoreErr(e, "chain init load head".to_owned()))?,
Expand Down
48 changes: 28 additions & 20 deletions chain/src/pipe.rs
Expand Up @@ -91,10 +91,19 @@ pub fn process_block(b: &Block, ctx: &mut BlockContext) -> Result<Option<Tip>, E

// Check if this block is already known due to it being in the current set of orphan blocks.
check_known_orphans(&b.header, ctx)?;

// Check we have *this* block in the store.
// Stop if we have processed this block previously (it is in the store).
// This is more expensive than the earlier check_known() as we hit the store.
check_known_store(&b.header, ctx)?;
}

// Header specific processing.
handle_block_header(&b.header, ctx)?;
{
validate_header(&b.header, ctx)?;
add_block_header(&b.header, ctx)?;
update_header_head(&b.header, ctx)?;
}

// Check if are processing the "next" block relative to the current chain head.
let head = ctx.batch.head()?;
Expand All @@ -104,11 +113,6 @@ pub fn process_block(b: &Block, ctx: &mut BlockContext) -> Result<Option<Tip>, E
// * special case where this is the first fast sync full block
// Either way we can proceed (and we know the block is new and unprocessed).
} else {
// Check we have *this* block in the store.
// Stop if we have processed this block previously (it is in the store).
// This is more expensive than the earlier check_known() as we hit the store.
check_known_store(&b.header, ctx)?;

// At this point it looks like this is a new block that we have not yet processed.
// Check we have the *previous* block in the store.
// If we do not then treat this block as an orphan.
Expand All @@ -128,7 +132,7 @@ pub fn process_block(b: &Block, ctx: &mut BlockContext) -> Result<Option<Tip>, E
if is_next_block(&b.header, &head) {
// No need to rewind if we are processing the next block.
} else {
// Rewind the re-apply blocks on the forked chain to
// Rewind and re-apply blocks on the forked chain to
// put the txhashset in the correct forked state
// (immediately prior to this new block).
rewind_and_apply_fork(b, extension)?;
Expand Down Expand Up @@ -174,12 +178,8 @@ pub fn process_block(b: &Block, ctx: &mut BlockContext) -> Result<Option<Tip>, E
// Add the newly accepted block and header to our index.
add_block(b, ctx)?;

// Update the chain head (and header_head) if total work is increased.
let res = {
let _ = update_header_head(&b.header, ctx)?;
let res = update_head(b, ctx)?;
res
};
// Update the chain head if total work is increased.
let res = update_head(b, ctx)?;
Ok(res)
}

Expand Down Expand Up @@ -209,8 +209,22 @@ pub fn sync_block_headers(

if !all_known {
for header in headers {
handle_block_header(header, ctx)?;
validate_header(header, ctx)?;
add_block_header(header, ctx)?;
}

let first_header = headers.first().unwrap();
let prev_header = ctx.batch.get_block_header(&first_header.previous)?;
txhashset::sync_extending(&mut ctx.txhashset, &mut ctx.batch, |extension| {
// TODO - optimize this: the rewind is unnecessary when the first header is the "next" header.
extension.rewind(&prev_header)?;

for header in headers {
extension.apply_header(header)?;
}

Ok(())
})?;
}

// Update header_head (if most work) and sync_head (regardless) in all cases,
Expand All @@ -231,12 +245,6 @@ pub fn sync_block_headers(
}
}

/// Validate a block header and, if valid, add it to our header index.
/// Fails fast on the validation step so invalid headers are never stored.
fn handle_block_header(header: &BlockHeader, ctx: &mut BlockContext) -> Result<(), Error> {
	// Validation must succeed before we persist anything.
	validate_header(header, ctx)?;
	add_block_header(header, ctx)
}

/// Process block header as part of "header first" block propagation.
/// We validate the header but we do not store it or update header head based
/// on this. We will update these once we get the block back after requesting
Expand Down