Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

JournalDB can now operate in "archive" mode #589

Merged
merged 5 commits into from
Mar 4, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
18 changes: 15 additions & 3 deletions ethcore/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,24 @@ pub enum BlockStatus {
}

/// Client configuration. Includes configs for all sub-systems.
#[derive(Debug, Default)]
#[derive(Debug)]
pub struct ClientConfig {
/// Block queue configuration.
pub queue: BlockQueueConfig,
/// Blockchain configuration.
pub blockchain: BlockChainConfig,
/// Prefer journal rather than archive.
pub prefer_journal: bool,
}

impl Default for ClientConfig {
fn default() -> ClientConfig {
ClientConfig {
queue: Default::default(),
blockchain: Default::default(),
prefer_journal: false,
}
}
}

/// Information about the blockchain gathered together.
Expand Down Expand Up @@ -212,15 +224,15 @@ impl Client {
let mut dir = path.to_path_buf();
dir.push(H64::from(spec.genesis_header().hash()).hex());
//TODO: sec/fat: pruned/full versioning
dir.push(format!("v{}-sec-pruned", CLIENT_DB_VER_STR));
dir.push(format!("v{}-sec-{}", CLIENT_DB_VER_STR, if config.prefer_journal { "pruned" } else { "archive" }));
let path = dir.as_path();
let gb = spec.genesis_block();
let chain = Arc::new(RwLock::new(BlockChain::new(config.blockchain, &gb, path)));
let mut state_path = path.to_path_buf();
state_path.push("state");

let engine = Arc::new(try!(spec.to_engine()));
let mut state_db = JournalDB::new(state_path.to_str().unwrap());
let mut state_db = JournalDB::from_prefs(state_path.to_str().unwrap(), config.prefer_journal);
if state_db.is_empty() && engine.spec().ensure_db_good(&mut state_db) {
state_db.commit(0, &engine.spec().genesis_header().hash(), None).expect("Error commiting genesis state to state DB");
}
Expand Down
3 changes: 3 additions & 0 deletions parity/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ Usage:
Options:
--chain CHAIN Specify the blockchain type. CHAIN may be either a JSON chain specification file
or frontier, mainnet, morden, or testnet [default: frontier].
--archive Client should not prune the state/storage trie.
-d --db-path PATH Specify the database & configuration directory path [default: $HOME/.parity]
--keys-path PATH Specify the path for JSON key files to be found [default: $HOME/.web3/keys]

Expand Down Expand Up @@ -102,6 +103,7 @@ struct Args {
flag_chain: String,
flag_db_path: String,
flag_keys_path: String,
flag_archive: bool,
flag_no_bootstrap: bool,
flag_listen_address: String,
flag_public_address: Option<String>,
Expand Down Expand Up @@ -311,6 +313,7 @@ impl Configuration {
let mut client_config = ClientConfig::default();
client_config.blockchain.pref_cache_size = self.args.flag_cache_pref_size;
client_config.blockchain.max_cache_size = self.args.flag_cache_max_size;
client_config.prefer_journal = !self.args.flag_archive;
client_config.queue.max_mem_use = self.args.flag_queue_max_size;
let mut service = ClientService::start(client_config, spec, net_settings, &Path::new(&self.path())).unwrap();
let client = service.client().clone();
Expand Down
103 changes: 72 additions & 31 deletions util/src/journaldb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ use kvdb::{Database, DBTransaction, DatabaseConfig};
use std::env;

/// Implementation of the HashDB trait for a disk-backed database with a memory overlay
/// and latent-removal semantics.
/// and, possibly, latent-removal semantics.
///
/// If `counters` is `None`, then it behaves exactly like OverlayDB. If not it behaves
/// differently:
///
/// Like OverlayDB, there is a memory overlay; `commit()` must be called in order to
/// write operations out to disk. Unlike OverlayDB, `remove()` operations do not take effect
Expand All @@ -34,7 +37,7 @@ use std::env;
pub struct JournalDB {
overlay: MemoryDB,
backing: Arc<Database>,
counters: Arc<RwLock<HashMap<H256, i32>>>,
counters: Option<Arc<RwLock<HashMap<H256, i32>>>>,
}

impl Clone for JournalDB {
Expand All @@ -48,36 +51,50 @@ impl Clone for JournalDB {
}

// all keys must be at least 12 bytes
const LATEST_ERA_KEY : [u8; 12] = [ b'l', b'a', b's', b't', 0, 0, 0, 0, 0, 0, 0, 0 ];
const VERSION_KEY : [u8; 12] = [ b'j', b'v', b'e', b'r', 0, 0, 0, 0, 0, 0, 0, 0 ];
const LATEST_ERA_KEY : [u8; 12] = [ b'l', b'a', b's', b't', 0, 0, 0, 0, 0, 0, 0, 0 ];
const VERSION_KEY : [u8; 12] = [ b'j', b'v', b'e', b'r', 0, 0, 0, 0, 0, 0, 0, 0 ];

const DB_VERSION: u32 = 3;
const DB_VERSION : u32 = 3;
const DB_VERSION_NO_JOURNAL : u32 = 3 + 256;

const PADDING : [u8; 10] = [ 0u8; 10 ];

impl JournalDB {

/// Create a new instance from file
pub fn new(path: &str) -> JournalDB {
Self::from_prefs(path, true)
}

/// Create a new instance from file
pub fn from_prefs(path: &str, prefer_journal: bool) -> JournalDB {
let opts = DatabaseConfig {
prefix_size: Some(12) //use 12 bytes as prefix, this must match account_db prefix
};
let backing = Database::open(&opts, path).unwrap_or_else(|e| {
panic!("Error opening state db: {}", e);
});
let with_journal;
if !backing.is_empty() {
match backing.get(&VERSION_KEY).map(|d| d.map(|v| decode::<u32>(&v))) {
Ok(Some(DB_VERSION)) => {},
Ok(Some(DB_VERSION)) => { with_journal = true; },
Ok(Some(DB_VERSION_NO_JOURNAL)) => { with_journal = false; },
v => panic!("Incompatible DB version, expected {}, got {:?}", DB_VERSION, v)
}
} else {
backing.put(&VERSION_KEY, &encode(&DB_VERSION)).expect("Error writing version to database");
backing.put(&VERSION_KEY, &encode(&(if prefer_journal { DB_VERSION } else { DB_VERSION_NO_JOURNAL }))).expect("Error writing version to database");
with_journal = prefer_journal;
}
let counters = JournalDB::read_counters(&backing);

let counters = if with_journal {
Some(Arc::new(RwLock::new(JournalDB::read_counters(&backing))))
} else {
None
};
JournalDB {
overlay: MemoryDB::new(),
backing: Arc::new(backing),
counters: Arc::new(RwLock::new(counters)),
counters: counters,
}
}

Expand All @@ -94,9 +111,47 @@ impl JournalDB {
self.backing.get(&LATEST_ERA_KEY).expect("Low level database error").is_none()
}

/// Commit all recent insert operations.
pub fn commit(&mut self, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError> {
let have_counters = self.counters.is_some();
if have_counters {
self.commit_with_counters(now, id, end)
} else {
self.commit_without_counters()
}
}

/// Drain the overlay and place it into a batch for the DB.
fn batch_overlay_insertions(overlay: &mut MemoryDB, batch: &DBTransaction) -> (usize, usize) {
let mut ret = 0usize;
let mut deletes = 0usize;
for i in overlay.drain().into_iter() {
let (key, (value, rc)) = i;
if rc > 0 {
assert!(rc == 1);
batch.put(&key.bytes(), &value).expect("Low-level database error. Some issue with your hard disk?");
ret += 1;
}
if rc < 0 {
assert!(rc == -1);
ret += 1;
deletes += 1;
}
}
(ret, deletes)
}

/// Just commit the overlay into the backing DB.
fn commit_without_counters(&mut self) -> Result<u32, UtilError> {
let batch = DBTransaction::new();
let (ret, _) = Self::batch_overlay_insertions(&mut self.overlay, &batch);
try!(self.backing.write(batch));
Ok(ret as u32)
}

/// Commit all recent insert operations and historical removals from the old era
/// to the backing database.
pub fn commit(&mut self, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError> {
fn commit_with_counters(&mut self, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError> {
// journal format:
// [era, 0] => [ id, [insert_0, ...], [remove_0, ...] ]
// [era, 1] => [ id, [insert_0, ...], [remove_0, ...] ]
Expand All @@ -121,9 +176,9 @@ impl JournalDB {
// and the key is safe to delete.

// record new commit's details.
debug!("commit: #{} ({}), end era: {:?}", now, id, end);
trace!("commit: #{} ({}), end era: {:?}", now, id, end);
let mut counters = self.counters.as_ref().unwrap().write().unwrap();
let batch = DBTransaction::new();
let mut counters = self.counters.write().unwrap();
{
let mut index = 0usize;
let mut last;
Expand Down Expand Up @@ -192,29 +247,15 @@ impl JournalDB {
try!(batch.delete(&h));
deletes += 1;
}
debug!("commit: Delete journal for time #{}.{}, (canon was {}): {} entries", end_era, index, canon_id, deletes);
trace!("commit: Delete journal for time #{}.{}, (canon was {}): {} entries", end_era, index, canon_id, deletes);
}

// Commit overlay insertions
let mut ret = 0u32;
let mut deletes = 0usize;
for i in self.overlay.drain().into_iter() {
let (key, (value, rc)) = i;
if rc > 0 {
assert!(rc == 1);
batch.put(&key.bytes(), &value).expect("Low-level database error. Some issue with your hard disk?");
ret += 1;
}
if rc < 0 {
assert!(rc == -1);
ret += 1;
deletes += 1;
}
}
let (ret, deletes) = Self::batch_overlay_insertions(&mut self.overlay, &batch);

try!(self.backing.write(batch));
debug!("commit: Deleted {} nodes", deletes);
Ok(ret)
trace!("commit: Deleted {} nodes", deletes);
Ok(ret as u32)
}


Expand Down Expand Up @@ -262,7 +303,7 @@ impl JournalDB {
era -= 1;
}
}
debug!("Recovered {} counters", res.len());
trace!("Recovered {} counters", res.len());
res
}
}
Expand Down
2 changes: 1 addition & 1 deletion util/src/overlaydb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ impl OverlayDB {
})
}

/// Get the refs and value of the given key.
/// Put the refs and value of the given key, possibly deleting it from the db.
fn put_payload(&self, key: &H256, payload: (Bytes, u32)) -> bool {
if payload.1 > 0 {
let mut s = RlpStream::new_list(2);
Expand Down