Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
435: refactor: store module

BREAKING CHANGE: This PR is a breaking change since we changed RocksDB's column family names and the column list; the old database needs to be deleted.

This PR refactors the db and chain store, making it easy to switch between different key-value store implementations.
- [x] Refactor `Batch` to associated type
- [x] Refactor `ChainIndex`
- [x] Remove some static lifetime bound for `ChainIndex`
- [x] Change `Col` from Option to u32
- [x] Move `CacheDB` to db module

Co-authored-by: quake wang <quake.wang@gmail.com>
  • Loading branch information
doitian and quake committed Apr 11, 2019
2 parents 1a759e2 + a2e8b45 commit 60b8c1a
Show file tree
Hide file tree
Showing 29 changed files with 450 additions and 636 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions benches/benches/process_block.rs
Expand Up @@ -6,9 +6,8 @@ use ckb_core::script::Script;
use ckb_core::transaction::{
CellInput, CellOutput, OutPoint, ProposalShortId, Transaction, TransactionBuilder,
};
use ckb_db::{diskdb::RocksDB, DBConfig};
use ckb_db::{CacheDB, DBConfig, RocksDB};
use ckb_notify::NotifyService;
use ckb_shared::cachedb::CacheDB;
use ckb_shared::shared::{Shared, SharedBuilder};
use ckb_shared::store::ChainKVStore;
use ckb_traits::ChainProvider;
Expand Down
101 changes: 33 additions & 68 deletions chain/src/chain.rs
Expand Up @@ -6,13 +6,13 @@ use ckb_core::extras::BlockExt;
use ckb_core::service::{Request, DEFAULT_CHANNEL_SIZE, SIGNAL_CHANNEL_SIZE};
use ckb_core::transaction::ProposalShortId;
use ckb_core::BlockNumber;
use ckb_db::batch::Batch;
use ckb_notify::NotifyController;
use ckb_shared::cell_set::CellSetDiff;
use ckb_shared::chain_state::ChainState;
use ckb_shared::error::SharedError;
use ckb_shared::index::ChainIndex;
use ckb_shared::shared::Shared;
use ckb_shared::store::StoreBatch;
use ckb_traits::ChainProvider;
use ckb_verification::{BlockVerifier, TransactionsVerifier, Verifier};
use crossbeam_channel::{self, select, Receiver, Sender};
Expand Down Expand Up @@ -172,7 +172,6 @@ impl<CI: ChainIndex + 'static> ChainService<CI> {
Ok(())
}

#[allow(clippy::op_ref)]
pub(crate) fn insert_block(&self, block: Arc<Block>) -> Result<(), FailureError> {
let mut new_best_block = false;
let mut total_difficulty = U256::zero();
Expand Down Expand Up @@ -208,38 +207,31 @@ impl<CI: ChainIndex + 'static> ChainService<CI> {
txs_verified: None,
};

self.shared.store().save_with_batch(|batch| {
self.shared.store().insert_block(batch, &block);

if (cannon_total_difficulty > current_total_difficulty)
|| ((current_total_difficulty == cannon_total_difficulty)
&& (block.header().hash() < tip_hash))
{
debug!(
target: "chain",
"new best block found: {} => {}, difficulty diff = {}",
block.header().number(), block.header().hash(),
&cannon_total_difficulty - &current_total_difficulty
);

self.find_fork(&mut fork, tip_number, &block, ext);
cell_set_diff = self.reconcile_main_chain(batch, &mut fork, &mut chain_state)?;
self.update_index(batch, &fork.detached_blocks, &fork.attached_blocks);
self.update_proposal_ids(&mut chain_state, &fork);
self.shared
.store()
.insert_tip_header(batch, &block.header());

new_best_block = true;

total_difficulty = cannon_total_difficulty;
} else {
self.shared
.store()
.insert_block_ext(batch, &block.header().hash(), &ext);
}
Ok(())
})?;
let mut batch = self.shared.store().new_batch()?;
batch.insert_block(&block)?;
if (cannon_total_difficulty > current_total_difficulty)
|| ((current_total_difficulty == cannon_total_difficulty)
&& (block.header().hash() < tip_hash))
{
debug!(
target: "chain",
"new best block found: {} => {}, difficulty diff = {}",
block.header().number(), block.header().hash(),
&cannon_total_difficulty - &current_total_difficulty
);

self.find_fork(&mut fork, tip_number, &block, ext);
cell_set_diff = self.reconcile_main_chain(&mut batch, &mut fork, &mut chain_state)?;
self.update_index(&mut batch, &fork.detached_blocks, &fork.attached_blocks)?;
self.update_proposal_ids(&mut chain_state, &fork);
batch.insert_tip_header(&block.header())?;
new_best_block = true;

total_difficulty = cannon_total_difficulty;
} else {
batch.insert_block_ext(&block.header().hash(), &ext)?;
}
batch.commit()?;

if new_best_block {
let tip_header = block.header().clone();
Expand Down Expand Up @@ -270,43 +262,18 @@ impl<CI: ChainIndex + 'static> ChainService<CI> {

pub(crate) fn update_index(
&self,
batch: &mut Batch,
batch: &mut StoreBatch,
detached_blocks: &[Block],
attached_blocks: &[Block],
) {
let old_number = match detached_blocks.get(0) {
Some(b) => b.header().number(),
None => 0,
};

let new_number = attached_blocks[0].header().number();

) -> Result<(), FailureError> {
for block in detached_blocks {
self.shared
.store()
.delete_block_number(batch, &block.header().hash());
self.shared
.store()
.delete_transaction_address(batch, block.commit_transactions());
batch.detach_block(block)?;
}

for block in attached_blocks {
let number = block.header().number();
let hash = block.header().hash();
self.shared.store().insert_block_hash(batch, number, &hash);
self.shared
.store()
.insert_block_number(batch, &hash, number);
self.shared.store().insert_transaction_address(
batch,
&hash,
block.commit_transactions(),
);
}

for n in new_number..old_number {
self.shared.store().delete_block_hash(batch, n + 1);
batch.attach_block(block)?;
}
Ok(())
}

fn alignment_fork(
Expand Down Expand Up @@ -423,7 +390,7 @@ impl<CI: ChainIndex + 'static> ChainService<CI> {
// we found new best_block total_difficulty > old_chain.total_difficulty
pub(crate) fn reconcile_main_chain(
&self,
batch: &mut Batch,
batch: &mut StoreBatch,
fork: &mut ForkChanges,
chain_state: &mut ChainState<CI>,
) -> Result<CellSetDiff, FailureError> {
Expand Down Expand Up @@ -498,9 +465,7 @@ impl<CI: ChainIndex + 'static> ChainService<CI> {
.zip(fork.attached_blocks().iter())
.rev()
{
self.shared
.store()
.insert_block_ext(batch, &b.header().hash(), ext);
batch.insert_block_ext(&b.header().hash(), ext)?;
}

if let Some(err) = found_error {
Expand Down
2 changes: 1 addition & 1 deletion db/Cargo.toml
Expand Up @@ -6,7 +6,6 @@ authors = ["Nervos Core Dev <dev@nervos.org>"]
edition = "2018"

[dependencies]
bincode = "1.1"
ckb-util = { path = "../util" }
rocksdb = "0.12.1"
fnv = "1.0.3"
Expand All @@ -15,6 +14,7 @@ serde_derive = "1.0"
serde_json = "1.0"
failure = "0.1.5"
log = "0.4"
lru-cache = { git = "https://github.com/nervosnetwork/lru-cache" }

[dev-dependencies]
tempfile = "3.0"
45 changes: 0 additions & 45 deletions db/src/batch.rs

This file was deleted.

26 changes: 7 additions & 19 deletions shared/src/cachedb.rs → db/src/cachedb.rs
@@ -1,5 +1,4 @@
use ckb_db::batch::{Batch, Col, Operation};
use ckb_db::kvdb::{KeyValueDB, Result};
use crate::{Col, KeyValueDB, Result};
use ckb_util::RwLock;
use fnv::FnvHashMap;
use lru_cache::LruCache;
Expand All @@ -23,7 +22,7 @@ where
pub fn new(db: T, cols: &[CacheCols]) -> Self {
let mut table = FnvHashMap::with_capacity_and_hasher(cols.len(), Default::default());
for (idx, capacity) in cols {
table.insert(Some(*idx), LruCache::new(*capacity));
table.insert(*idx, LruCache::new(*capacity));
}
CacheDB {
db,
Expand All @@ -36,22 +35,7 @@ impl<T> KeyValueDB for CacheDB<T>
where
T: KeyValueDB,
{
fn write(&self, batch: Batch) -> Result<()> {
let mut cache_guard = self.cache.write();
batch.operations.iter().for_each(|op| match op {
Operation::Insert { col, key, value } => {
if let Some(lru) = cache_guard.get_mut(&col) {
lru.insert(key.clone(), value.clone());
}
}
Operation::Delete { col, key } => {
if let Some(lru) = cache_guard.get_mut(&col) {
lru.remove(key);
}
}
});
self.db.write(batch)
}
type Batch = T::Batch;

fn read(&self, col: Col, key: &[u8]) -> Result<Option<Vec<u8>>> {
let cache_guard = self.cache.read();
Expand All @@ -72,4 +56,8 @@ where
}
self.db.partial_read(col, key, range)
}

/// Creates a write batch by delegating directly to the wrapped backend.
/// NOTE(review): batched writes do not appear to be reflected into this
/// wrapper's LRU cache here — presumably staleness is handled elsewhere
/// (or reads fall through to the backend); confirm against callers.
fn batch(&self) -> Result<Self::Batch> {
self.db.batch()
}
}
39 changes: 0 additions & 39 deletions db/src/kvdb.rs

This file was deleted.

35 changes: 30 additions & 5 deletions db/src/lib.rs
Expand Up @@ -3,13 +3,38 @@
//! This Library contains the `KeyValueDB` traits
//! which provides key-value store interface

pub mod batch;
use failure::Fail;
use std::ops::Range;
use std::result;

pub mod cachedb;
pub mod config;
pub mod diskdb;
pub mod kvdb;
pub mod memorydb;
pub mod rocksdb;

pub use crate::cachedb::CacheDB;
pub use crate::config::DBConfig;
pub use crate::diskdb::RocksDB;
pub use crate::kvdb::KeyValueDB;
pub use crate::memorydb::MemoryKeyValueDB;
pub use crate::rocksdb::RocksDB;

pub type Col = u32;
pub type Result<T> = result::Result<T, Error>;

/// Errors returned by key-value store operations in this crate.
#[derive(Clone, Debug, PartialEq, Eq, Fail)]
pub enum Error {
/// A backend failure, carrying the underlying store's error message.
#[fail(display = "DBError {}", _0)]
DBError(String),
}

/// Abstraction over a column-addressed key-value store backend.
///
/// Implementations (e.g. RocksDB-backed, in-memory, or cached wrappers)
/// provide reads keyed by column and byte key, and produce write batches
/// through the associated `Batch` type so callers can swap storage engines.
pub trait KeyValueDB: Sync + Send {
/// The write-batch type produced by this backend; see `DbBatch`.
type Batch: DbBatch;
/// Reads the value stored under `key` in column `col`; `Ok(None)` if absent.
fn read(&self, col: Col, key: &[u8]) -> Result<Option<Vec<u8>>>;
/// Reads only the byte `range` of the value stored under `key` in column
/// `col` — presumably to avoid copying large values; confirm with backends.
fn partial_read(&self, col: Col, key: &[u8], range: &Range<usize>) -> Result<Option<Vec<u8>>>;
/// Creates a new, empty write batch for this backend.
fn batch(&self) -> Result<Self::Batch>;
}

/// A buffered set of writes applied to the underlying store on `commit`.
pub trait DbBatch {
/// Queues an insert of `value` under `key` in column `col`.
fn insert(&mut self, col: Col, key: &[u8], value: &[u8]) -> Result<()>;
/// Queues a deletion of `key` in column `col`.
fn delete(&mut self, col: Col, key: &[u8]) -> Result<()>;
/// Consumes the batch and writes all queued operations to the store.
fn commit(self) -> Result<()>;
}

0 comments on commit 60b8c1a

Please sign in to comment.