
Commit

Canonical state cache (master) (#2311)
* State cache

* Reduced copying data between caches

Whitespace and optional symbols

* Set a limit on storage cache

* Style and docs
arkpar committed Sep 27, 2016
1 parent 9d4bee4 commit ad63780
Showing 20 changed files with 702 additions and 211 deletions.
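In outline, this commit swaps the bare Box<JournalDB> state backend for a StateDB wrapper that adds a shared cache of accounts for the canonical chain: clones taken for blocks that extend the best block share that cache (boxed_clone_canon), while clones for side-chain blocks get an empty one (boxed_clone). The following is a minimal, self-contained sketch of that idea, not the actual ethcore implementation; the StateDb name, HashMap backing store, Address/Account aliases and the unbounded cache are illustrative stand-ins (the real cache is bounded via the lru-cache crate added below).

use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Illustrative stand-ins for util::Address and a cached account entry.
type Address = u64;
type Account = Vec<u8>;

// Simplified stand-in for ethcore's StateDB: a backing store shared by all
// handles, plus a canonical-chain account cache that only canonical clones share.
struct StateDb {
    backing: Arc<Mutex<HashMap<Address, Account>>>,
    canon_cache: Arc<Mutex<HashMap<Address, Account>>>,
    local: HashMap<Address, Account>,
}

impl StateDb {
    fn new() -> StateDb {
        StateDb {
            backing: Arc::new(Mutex::new(HashMap::new())),
            canon_cache: Arc::new(Mutex::new(HashMap::new())),
            local: HashMap::new(),
        }
    }

    // Clone for a non-canonical block: shares the backing store but gets an
    // empty cache, so it neither reads stale entries nor pollutes the shared one.
    fn boxed_clone(&self) -> StateDb {
        StateDb {
            backing: self.backing.clone(),
            canon_cache: Arc::new(Mutex::new(HashMap::new())),
            local: HashMap::new(),
        }
    }

    // Clone for a block extending the canonical chain: shares the canonical
    // cache, so recently touched accounts are served from memory.
    fn boxed_clone_canon(&self) -> StateDb {
        StateDb {
            backing: self.backing.clone(),
            canon_cache: self.canon_cache.clone(),
            local: HashMap::new(),
        }
    }

    fn get(&self, addr: &Address) -> Option<Account> {
        if let Some(acc) = self.local.get(addr) {
            return Some(acc.clone());
        }
        if let Some(acc) = self.canon_cache.lock().unwrap().get(addr) {
            return Some(acc.clone());
        }
        self.backing.lock().unwrap().get(addr).cloned()
    }

    // Committing a canonical block writes its changes through to the backing
    // store and promotes them into the shared cache.
    fn commit(&mut self) {
        for (addr, acc) in self.local.drain() {
            self.canon_cache.lock().unwrap().insert(addr, acc.clone());
            self.backing.lock().unwrap().insert(addr, acc);
        }
    }
}

fn main() {
    let mut db = StateDb::new();
    db.local.insert(1, vec![42]);
    db.commit();

    let canon_child = db.boxed_clone_canon();
    assert_eq!(canon_child.get(&1), Some(vec![42])); // hit via the shared cache

    let side_child = db.boxed_clone();
    assert_eq!(side_child.get(&1), Some(vec![42])); // falls back to the backing store
}
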
14 changes: 14 additions & 0 deletions Cargo.lock


1 change: 1 addition & 0 deletions ethcore/Cargo.toml
@@ -37,6 +37,7 @@ ethkey = { path = "../ethkey" }
ethcore-ipc-nano = { path = "../ipc/nano" }
rlp = { path = "../util/rlp" }
rand = "0.3"
lru-cache = "0.0.7"

[dependencies.hyper]
git = "https://github.com/ethcore/hyper"
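The new lru-cache dependency is what backs the "Set a limit on storage cache" item in the commit message: cached entries live in a fixed-capacity, least-recently-used map, so the cache cannot grow without bound. A small sketch of that eviction behaviour, assuming the crate's documented new/insert/get_mut API (the exact surface of the pinned 0.0.7 release may differ slightly; the u32 keys and values are purely illustrative):

extern crate lru_cache;

use lru_cache::LruCache;

fn main() {
    // A cache that holds at most two entries; think "storage key -> value".
    let mut cache: LruCache<u32, u32> = LruCache::new(2);
    cache.insert(1, 100);
    cache.insert(2, 200);

    // Touch key 1 so it becomes the most recently used entry.
    assert_eq!(cache.get_mut(&1).map(|v| *v), Some(100));

    // Inserting a third entry evicts the least recently used one (key 2).
    cache.insert(3, 300);
    assert!(cache.get_mut(&2).is_none());
    assert!(cache.get_mut(&1).is_some());
}
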
49 changes: 27 additions & 22 deletions ethcore/src/block.rs
@@ -19,6 +19,7 @@
use common::*;
use engines::Engine;
use state::*;
use state_db::StateDB;
use verification::PreverifiedBlock;
use trace::FlatTrace;
use factory::Factories;
@@ -179,7 +180,7 @@ pub trait IsBlock {
/// Trait for a object that has a state database.
pub trait Drain {
/// Drop this object and return the underlieing database.
fn drain(self) -> Box<JournalDB>;
fn drain(self) -> StateDB;
}

impl IsBlock for ExecutedBlock {
@@ -231,7 +232,7 @@ impl<'x> OpenBlock<'x> {
engine: &'x Engine,
factories: Factories,
tracing: bool,
db: Box<JournalDB>,
db: StateDB,
parent: &Header,
last_hashes: Arc<LastHashes>,
author: Address,
@@ -474,7 +475,9 @@ impl LockedBlock {

impl Drain for LockedBlock {
/// Drop this object and return the underlieing database.
fn drain(self) -> Box<JournalDB> { self.block.state.drop().1 }
fn drain(self) -> StateDB {
self.block.state.drop().1
}
}

impl SealedBlock {
@@ -490,7 +493,9 @@ impl SealedBlock {

impl Drain for SealedBlock {
/// Drop this object and return the underlieing database.
fn drain(self) -> Box<JournalDB> { self.block.state.drop().1 }
fn drain(self) -> StateDB {
self.block.state.drop().1
}
}

impl IsBlock for SealedBlock {
@@ -505,7 +510,7 @@ pub fn enact(
uncles: &[Header],
engine: &Engine,
tracing: bool,
db: Box<JournalDB>,
db: StateDB,
parent: &Header,
last_hashes: Arc<LastHashes>,
factories: Factories,
@@ -537,7 +542,7 @@ pub fn enact_bytes(
block_bytes: &[u8],
engine: &Engine,
tracing: bool,
db: Box<JournalDB>,
db: StateDB,
parent: &Header,
last_hashes: Arc<LastHashes>,
factories: Factories,
@@ -553,7 +558,7 @@ pub fn enact_verified(
block: &PreverifiedBlock,
engine: &Engine,
tracing: bool,
db: Box<JournalDB>,
db: StateDB,
parent: &Header,
last_hashes: Arc<LastHashes>,
factories: Factories,
@@ -568,7 +573,7 @@ pub fn enact_and_seal(
block_bytes: &[u8],
engine: &Engine,
tracing: bool,
db: Box<JournalDB>,
db: StateDB,
parent: &Header,
last_hashes: Arc<LastHashes>,
factories: Factories,
@@ -588,9 +593,9 @@ mod tests {
use spec::*;
let spec = Spec::new_test();
let genesis_header = spec.genesis_header();
let mut db_result = get_temp_journal_db();
let mut db_result = get_temp_state_db();
let mut db = db_result.take();
spec.ensure_db_good(db.as_hashdb_mut()).unwrap();
spec.ensure_db_good(&mut db).unwrap();
let last_hashes = Arc::new(vec![genesis_header.hash()]);
let b = OpenBlock::new(&*spec.engine, Default::default(), false, db, &genesis_header, last_hashes, Address::zero(), (3141562.into(), 31415620.into()), vec![]).unwrap();
let b = b.close_and_lock();
@@ -604,25 +609,25 @@ mod tests {
let engine = &*spec.engine;
let genesis_header = spec.genesis_header();

let mut db_result = get_temp_journal_db();
let mut db_result = get_temp_state_db();
let mut db = db_result.take();
spec.ensure_db_good(db.as_hashdb_mut()).unwrap();
spec.ensure_db_good(&mut db).unwrap();
let last_hashes = Arc::new(vec![genesis_header.hash()]);
let b = OpenBlock::new(engine, Default::default(), false, db, &genesis_header, last_hashes.clone(), Address::zero(), (3141562.into(), 31415620.into()), vec![]).unwrap()
.close_and_lock().seal(engine, vec![]).unwrap();
let orig_bytes = b.rlp_bytes();
let orig_db = b.drain();

let mut db_result = get_temp_journal_db();
let mut db_result = get_temp_state_db();
let mut db = db_result.take();
spec.ensure_db_good(db.as_hashdb_mut()).unwrap();
spec.ensure_db_good(&mut db).unwrap();
let e = enact_and_seal(&orig_bytes, engine, false, db, &genesis_header, last_hashes, Default::default()).unwrap();

assert_eq!(e.rlp_bytes(), orig_bytes);

let db = e.drain();
assert_eq!(orig_db.keys(), db.keys());
assert!(orig_db.keys().iter().filter(|k| orig_db.get(k.0) != db.get(k.0)).next() == None);
assert_eq!(orig_db.journal_db().keys(), db.journal_db().keys());
assert!(orig_db.journal_db().keys().iter().filter(|k| orig_db.journal_db().get(k.0) != db.journal_db().get(k.0)).next() == None);
}

#[test]
@@ -632,9 +637,9 @@
let engine = &*spec.engine;
let genesis_header = spec.genesis_header();

let mut db_result = get_temp_journal_db();
let mut db_result = get_temp_state_db();
let mut db = db_result.take();
spec.ensure_db_good(db.as_hashdb_mut()).unwrap();
spec.ensure_db_good(&mut db).unwrap();
let last_hashes = Arc::new(vec![genesis_header.hash()]);
let mut open_block = OpenBlock::new(engine, Default::default(), false, db, &genesis_header, last_hashes.clone(), Address::zero(), (3141562.into(), 31415620.into()), vec![]).unwrap();
let mut uncle1_header = Header::new();
@@ -648,9 +653,9 @@
let orig_bytes = b.rlp_bytes();
let orig_db = b.drain();

let mut db_result = get_temp_journal_db();
let mut db_result = get_temp_state_db();
let mut db = db_result.take();
spec.ensure_db_good(db.as_hashdb_mut()).unwrap();
spec.ensure_db_good(&mut db).unwrap();
let e = enact_and_seal(&orig_bytes, engine, false, db, &genesis_header, last_hashes, Default::default()).unwrap();

let bytes = e.rlp_bytes();
@@ -659,7 +664,7 @@
assert_eq!(uncles[1].extra_data(), b"uncle2");

let db = e.drain();
assert_eq!(orig_db.keys(), db.keys());
assert!(orig_db.keys().iter().filter(|k| orig_db.get(k.0) != db.get(k.0)).next() == None);
assert_eq!(orig_db.journal_db().keys(), db.journal_db().keys());
assert!(orig_db.journal_db().keys().iter().filter(|k| orig_db.journal_db().get(k.0) != db.journal_db().get(k.0)).next() == None);
}
}
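The net effect of the block.rs changes above is that Drain now hands back the whole StateDB (journal DB plus cache) instead of only a boxed journal DB, so callers commit through it and the cache stays attached to the state that produced it. A compact sketch of that caller pattern with simplified stand-in types (the real signatures live in ethcore's block.rs and the new state_db.rs):

// Illustrative stand-ins; the real StateDB wraps the journal DB together
// with the account cache.
struct StateDB;

impl StateDB {
    fn commit(&mut self, _block_number: u64) {
        // The real commit journals the changes and also updates the canonical
        // account cache, which is why callers now need the StateDB itself back
        // rather than just a Box<JournalDB>.
    }
}

struct LockedBlock {
    state: StateDB,
}

// The reworked Drain trait from the hunks above, with simplified types.
trait Drain {
    fn drain(self) -> StateDB;
}

impl Drain for LockedBlock {
    fn drain(self) -> StateDB {
        self.state
    }
}

fn main() {
    let block = LockedBlock { state: StateDB };
    // Caller pattern after this change: keep the drained StateDB and commit through it.
    let mut state = block.drain();
    state.commit(1);
}
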
40 changes: 21 additions & 19 deletions ethcore/src/client/client.rs
@@ -23,7 +23,7 @@ use time::precise_time_ns;

// util
use util::{Bytes, PerfTimer, Itertools, Mutex, RwLock};
use util::journaldb::{self, JournalDB};
use util::journaldb;
use util::{U256, H256, Address, H2048, Uint};
use util::TrieFactory;
use util::kvdb::*;
@@ -65,7 +65,7 @@ use miner::{Miner, MinerService};
use snapshot::{self, io as snapshot_io};
use factory::Factories;
use rlp::{View, UntrustedRlp};

use state_db::StateDB;

// re-export
pub use types::blockchain_info::BlockChainInfo;
@@ -125,9 +125,9 @@ pub struct Client {
tracedb: RwLock<TraceDB<BlockChain>>,
engine: Arc<Engine>,
config: ClientConfig,
db: RwLock<Arc<Database>>,
pruning: journaldb::Algorithm,
state_db: RwLock<Box<JournalDB>>,
db: RwLock<Arc<Database>>,
state_db: Mutex<StateDB>,
block_queue: BlockQueue,
report: RwLock<ClientReport>,
import_lock: Mutex<()>,
@@ -171,14 +171,15 @@ impl Client {
let chain = Arc::new(BlockChain::new(config.blockchain.clone(), &gb, db.clone()));
let tracedb = RwLock::new(TraceDB::new(config.tracing.clone(), db.clone(), chain.clone()));

let mut state_db = journaldb::new(db.clone(), config.pruning, ::db::COL_STATE);
if state_db.is_empty() && try!(spec.ensure_db_good(state_db.as_hashdb_mut())) {
let journal_db = journaldb::new(db.clone(), config.pruning, ::db::COL_STATE);
let mut state_db = StateDB::new(journal_db);
if state_db.journal_db().is_empty() && try!(spec.ensure_db_good(&mut state_db)) {
let mut batch = DBTransaction::new(&db);
try!(state_db.commit(&mut batch, 0, &spec.genesis_header().hash(), None));
try!(db.write(batch).map_err(ClientError::Database));
}

if !chain.block_header(&chain.best_block_hash()).map_or(true, |h| state_db.contains(h.state_root())) {
if !chain.block_header(&chain.best_block_hash()).map_or(true, |h| state_db.journal_db().contains(h.state_root())) {
warn!("State root not found for block #{} ({})", chain.best_block_number(), chain.best_block_hash().hex());
}

@@ -207,7 +208,7 @@ impl Client {
verifier: verification::new(config.verifier_type.clone()),
config: config,
db: RwLock::new(db),
state_db: RwLock::new(state_db),
state_db: Mutex::new(state_db),
block_queue: block_queue,
report: RwLock::new(Default::default()),
import_lock: Mutex::new(()),
@@ -298,7 +299,8 @@ impl Client {
// Enact Verified Block
let parent = chain_has_parent.unwrap();
let last_hashes = self.build_last_hashes(header.parent_hash().clone());
let db = self.state_db.read().boxed_clone();
let is_canon = header.parent_hash() == &chain.best_block_hash();
let db = if is_canon { self.state_db.lock().boxed_clone_canon() } else { self.state_db.lock().boxed_clone() };

let enact_result = enact_verified(block, engine, self.tracedb.read().tracing_enabled(), db, &parent, last_hashes, self.factories.clone());
if let Err(e) = enact_result {
@@ -441,7 +443,8 @@ impl Client {
// CHECK! I *think* this is fine, even if the state_root is equal to another
// already-imported block of the same number.
// TODO: Prove it with a test.
block.drain().commit(&mut batch, number, hash, ancient).expect("DB commit failed.");
let mut state = block.drain();
state.commit(&mut batch, number, hash, ancient).expect("DB commit failed.");

let route = chain.insert_block(&mut batch, block_data, receipts);
self.tracedb.read().import(&mut batch, TraceImportRequest {
@@ -454,7 +457,6 @@
// Final commit to the DB
self.db.read().write_buffered(batch);
chain.commit();

self.update_last_hashes(&parent, hash);
route
}
@@ -496,7 +498,7 @@ impl Client {
};

self.block_header(id).and_then(|header| {
let db = self.state_db.read().boxed_clone();
let db = self.state_db.lock().boxed_clone();

// early exit for pruned blocks
if db.is_pruned() && self.chain.read().best_block_number() >= block_number + HISTORY {
@@ -527,7 +529,7 @@ impl Client {
/// Get a copy of the best block's state.
pub fn state(&self) -> State {
State::from_existing(
self.state_db.read().boxed_clone(),
self.state_db.lock().boxed_clone(),
HeaderView::new(&self.best_block_header()).state_root(),
self.engine.account_start_nonce(),
self.factories.clone())
@@ -542,7 +544,7 @@ impl Client {
/// Get the report.
pub fn report(&self) -> ClientReport {
let mut report = self.report.read().clone();
report.state_db_mem = self.state_db.read().mem_used();
report.state_db_mem = self.state_db.lock().mem_used();
report
}

@@ -598,7 +600,7 @@ impl Client {
/// Take a snapshot at the given block.
/// If the ID given is "latest", this will default to 1000 blocks behind.
pub fn take_snapshot<W: snapshot_io::SnapshotWriter + Send>(&self, writer: W, at: BlockID, p: &snapshot::Progress) -> Result<(), EthcoreError> {
let db = self.state_db.read().boxed_clone();
let db = self.state_db.lock().journal_db().boxed_clone();
let best_block_number = self.chain_info().best_block_number;
let block_number = try!(self.block_number(at).ok_or(snapshot::Error::InvalidStartingBlock(at)));

@@ -677,14 +679,14 @@ impl snapshot::DatabaseRestore for Client {
trace!(target: "snapshot", "Replacing client database with {:?}", new_db);

let _import_lock = self.import_lock.lock();
let mut state_db = self.state_db.write();
let mut state_db = self.state_db.lock();
let mut chain = self.chain.write();
let mut tracedb = self.tracedb.write();
self.miner.clear();
let db = self.db.write();
try!(db.restore(new_db));

*state_db = journaldb::new(db.clone(), self.pruning, ::db::COL_STATE);
*state_db = StateDB::new(journaldb::new(db.clone(), self.pruning, ::db::COL_STATE));
*chain = Arc::new(BlockChain::new(self.config.blockchain.clone(), &[], db.clone()));
*tracedb = TraceDB::new(self.config.tracing.clone(), db.clone(), chain.clone());
Ok(())
@@ -908,7 +910,7 @@ impl BlockChainClient for Client {
}

fn state_data(&self, hash: &H256) -> Option<Bytes> {
self.state_db.read().state(hash)
self.state_db.lock().journal_db().state(hash)
}

fn block_receipts(&self, hash: &H256) -> Option<Bytes> {
@@ -1050,7 +1052,7 @@ impl MiningBlockChainClient for Client {
engine,
self.factories.clone(),
false, // TODO: this will need to be parameterised once we want to do immediate mining insertion.
self.state_db.read().boxed_clone(),
self.state_db.lock().boxed_clone(),
&chain.block_header(&h).expect("h is best block hash: so its header must exist: qed"),
self.build_last_hashes(h.clone()),
author,
10 changes: 6 additions & 4 deletions ethcore/src/client/test_client.rs
@@ -42,6 +42,7 @@ use block::{OpenBlock, SealedBlock};
use executive::Executed;
use error::CallError;
use trace::LocalizedTrace;
use state_db::StateDB;

/// Test client.
pub struct TestBlockChainClient {
@@ -283,23 +284,24 @@ impl TestBlockChainClient {
}
}

pub fn get_temp_journal_db() -> GuardedTempResult<Box<JournalDB>> {
pub fn get_temp_state_db() -> GuardedTempResult<StateDB> {
let temp = RandomTempPath::new();
let db = Database::open_default(temp.as_str()).unwrap();
let journal_db = journaldb::new(Arc::new(db), journaldb::Algorithm::EarlyMerge, None);
let state_db = StateDB::new(journal_db);
GuardedTempResult {
_temp: temp,
result: Some(journal_db)
result: Some(state_db)
}
}

impl MiningBlockChainClient for TestBlockChainClient {
fn prepare_open_block(&self, author: Address, gas_range_target: (U256, U256), extra_data: Bytes) -> OpenBlock {
let engine = &*self.spec.engine;
let genesis_header = self.spec.genesis_header();
let mut db_result = get_temp_journal_db();
let mut db_result = get_temp_state_db();
let mut db = db_result.take();
self.spec.ensure_db_good(db.as_hashdb_mut()).unwrap();
self.spec.ensure_db_good(&mut db).unwrap();

let last_hashes = vec![genesis_header.hash()];
let mut open_block = OpenBlock::new(
