From a2e8b4588b69e3b2e3c89e514ff4cdef2d0b17b4 Mon Sep 17 00:00:00 2001 From: quake wang Date: Thu, 11 Apr 2019 10:11:01 +0900 Subject: [PATCH] chore: resolve some error handle issue --- chain/src/chain.rs | 22 +++--- db/src/lib.rs | 3 +- db/src/memorydb.rs | 6 +- db/src/rocksdb.rs | 31 ++++----- shared/src/index.rs | 20 +++--- shared/src/shared.rs | 4 +- shared/src/store.rs | 133 +++++++++++++++++++------------------ shared/src/tests/shared.rs | 14 ++-- 8 files changed, 114 insertions(+), 119 deletions(-) diff --git a/chain/src/chain.rs b/chain/src/chain.rs index 4284d9571c..e0a5002892 100644 --- a/chain/src/chain.rs +++ b/chain/src/chain.rs @@ -172,7 +172,6 @@ impl ChainService { Ok(()) } - #[allow(clippy::op_ref)] pub(crate) fn insert_block(&self, block: Arc) -> Result<(), FailureError> { let mut new_best_block = false; let mut total_difficulty = U256::zero(); @@ -208,8 +207,8 @@ impl ChainService { txs_verified: None, }; - let mut batch = self.shared.store().new_batch(); - batch.insert_block(&block); + let mut batch = self.shared.store().new_batch()?; + batch.insert_block(&block)?; if (cannon_total_difficulty > current_total_difficulty) || ((current_total_difficulty == cannon_total_difficulty) && (block.header().hash() < tip_hash)) @@ -223,16 +222,16 @@ impl ChainService { self.find_fork(&mut fork, tip_number, &block, ext); cell_set_diff = self.reconcile_main_chain(&mut batch, &mut fork, &mut chain_state)?; - self.update_index(&mut batch, &fork.detached_blocks, &fork.attached_blocks); + self.update_index(&mut batch, &fork.detached_blocks, &fork.attached_blocks)?; self.update_proposal_ids(&mut chain_state, &fork); - batch.insert_tip_header(&block.header()); + batch.insert_tip_header(&block.header())?; new_best_block = true; total_difficulty = cannon_total_difficulty; } else { - batch.insert_block_ext(&block.header().hash(), &ext); + batch.insert_block_ext(&block.header().hash(), &ext)?; } - batch.commit(); + batch.commit()?; if new_best_block { let tip_header = block.header().clone(); @@ -266,14 +265,15 @@ impl ChainService { batch: &mut StoreBatch, detached_blocks: &[Block], attached_blocks: &[Block], - ) { + ) -> Result<(), FailureError> { for block in detached_blocks { - batch.detach_block(block); + batch.detach_block(block)?; } for block in attached_blocks { - batch.attach_block(block); + batch.attach_block(block)?; } + Ok(()) } fn alignment_fork( @@ -465,7 +465,7 @@ impl ChainService { .zip(fork.attached_blocks().iter()) .rev() { - batch.insert_block_ext(&b.header().hash(), ext); + batch.insert_block_ext(&b.header().hash(), ext)?; } if let Some(err) = found_error { diff --git a/db/src/lib.rs b/db/src/lib.rs index a148265062..b7f7bde318 100644 --- a/db/src/lib.rs +++ b/db/src/lib.rs @@ -18,11 +18,10 @@ pub use crate::memorydb::MemoryKeyValueDB; pub use crate::rocksdb::RocksDB; pub type Col = u32; -pub type Error = ErrorKind; pub type Result = result::Result; #[derive(Clone, Debug, PartialEq, Eq, Fail)] -pub enum ErrorKind { +pub enum Error { #[fail(display = "DBError {}", _0)] DBError(String), } diff --git a/db/src/memorydb.rs b/db/src/memorydb.rs index bdc50df646..742937fa2f 100644 --- a/db/src/memorydb.rs +++ b/db/src/memorydb.rs @@ -1,5 +1,5 @@ // for unit test -use crate::{Col, DbBatch, ErrorKind, KeyValueDB, Result}; +use crate::{Col, DbBatch, Error, KeyValueDB, Result}; use ckb_util::RwLock; use fnv::FnvHashMap; use std::ops::Range; @@ -33,7 +33,7 @@ impl KeyValueDB for MemoryKeyValueDB { let db = self.db.read(); match db.get(&col) { - None => Err(ErrorKind::DBError(format!("column {:?} not found ", col))), + None => Err(Error::DBError(format!("column {} not found ", col))), Some(map) => Ok(map.get(key).cloned()), } } @@ -42,7 +42,7 @@ impl KeyValueDB for MemoryKeyValueDB { let db = self.db.read(); match db.get(&col) { - None => Err(ErrorKind::DBError(format!("column {:?} not found ", col))), + None => Err(Error::DBError(format!("column {} not found ", col))), Some(map) => Ok(map .get(key) .and_then(|data| data.get(range.start..range.end)) diff --git a/db/src/rocksdb.rs b/db/src/rocksdb.rs index 2cd8f40fcf..187c1bd18e 100644 --- a/db/src/rocksdb.rs +++ b/db/src/rocksdb.rs @@ -1,6 +1,6 @@ -use crate::{Col, DBConfig, DbBatch, Error, ErrorKind, KeyValueDB, Result}; +use crate::{Col, DBConfig, DbBatch, Error, KeyValueDB, Result}; use log::warn; -use rocksdb::{Error as RdbError, Options, WriteBatch, DB}; +use rocksdb::{ColumnFamily, Error as RdbError, Options, WriteBatch, DB}; use std::ops::Range; use std::sync::Arc; @@ -47,14 +47,16 @@ impl RocksDB { } } +fn cf_handle(db: &DB, col: Col) -> Result { + db.cf_handle(&col.to_string()) + .ok_or_else(|| Error::DBError(format!("column {} not found", col))) +} + impl KeyValueDB for RocksDB { type Batch = RocksdbBatch; fn read(&self, col: Col, key: &[u8]) -> Result>> { - let cf = self - .inner - .cf_handle(&col.to_string()) - .expect("column not found"); + let cf = cf_handle(&self.inner, col)?; self.inner .get_cf(cf, &key) .map(|v| v.map(|vi| vi.to_vec())) @@ -62,10 +64,7 @@ impl KeyValueDB for RocksDB { } fn partial_read(&self, col: Col, key: &[u8], range: &Range) -> Result>> { - let cf = self - .inner - .cf_handle(&col.to_string()) - .expect("column not found"); + let cf = cf_handle(&self.inner, col)?; self.inner .get_pinned_cf(cf, &key) .map(|v| v.and_then(|vi| vi.get(range.start..range.end).map(|slice| slice.to_vec()))) @@ -87,19 +86,13 @@ pub struct RocksdbBatch { impl DbBatch for RocksdbBatch { fn insert(&mut self, col: Col, key: &[u8], value: &[u8]) -> Result<()> { - let cf = self - .db - .cf_handle(&col.to_string()) - .expect("column not found"); + let cf = cf_handle(&self.db, col)?; self.wb.put_cf(cf, key, value)?; Ok(()) } fn delete(&mut self, col: Col, key: &[u8]) -> Result<()> { - let cf = self - .db - .cf_handle(&col.to_string()) - .expect("column not found"); + let cf = cf_handle(&self.db, col)?; self.wb.delete_cf(cf, &key)?; Ok(()) } @@ -112,7 +105,7 @@ impl DbBatch for RocksdbBatch { impl From for Error { fn from(err: RdbError) -> Error { - ErrorKind::DBError(err.into()) + Error::DBError(err.into()) } } diff --git a/shared/src/index.rs b/shared/src/index.rs index 0a7104fa67..a9fa14292a 100644 --- a/shared/src/index.rs +++ b/shared/src/index.rs @@ -5,14 +5,14 @@ use ckb_core::block::Block; use ckb_core::extras::{BlockExt, TransactionAddress}; use ckb_core::header::{BlockNumber, Header}; use ckb_core::transaction::{Transaction, TransactionBuilder}; -use ckb_db::KeyValueDB; +use ckb_db::{Error, KeyValueDB}; use numext_fixed_hash::H256; const META_TIP_HEADER_KEY: &[u8] = b"TIP_HEADER"; // maintain chain index, extend chainstore pub trait ChainIndex: ChainStore { - fn init(&self, genesis: &Block); + fn init(&self, genesis: &Block) -> Result<(), Error>; fn get_block_hash(&self, number: BlockNumber) -> Option; fn get_block_number(&self, hash: &H256) -> Option; fn get_tip_header(&self) -> Option
; @@ -21,8 +21,8 @@ pub trait ChainIndex: ChainStore { } impl ChainIndex for ChainKVStore { - fn init(&self, genesis: &Block) { - let mut batch = self.new_batch(); + fn init(&self, genesis: &Block) -> Result<(), Error> { + let mut batch = self.new_batch()?; let genesis_hash = genesis.header().hash(); let ext = BlockExt { received_at: genesis.header().timestamp(), @@ -44,11 +44,11 @@ impl ChainIndex for ChainKVStore { cells.push((ins, outs)); } - batch.insert_block(genesis); - batch.insert_block_ext(&genesis_hash, &ext); - batch.insert_tip_header(&genesis.header()); - batch.attach_block(genesis); - batch.commit(); + batch.insert_block(genesis)?; + batch.insert_block_ext(&genesis_hash, &ext)?; + batch.insert_tip_header(&genesis.header())?; + batch.attach_block(genesis)?; + batch.commit() } fn get_block_hash(&self, number: BlockNumber) -> Option { @@ -110,7 +110,7 @@ mod tests { let consensus = Consensus::default(); let block = consensus.genesis_block(); let hash = block.header().hash(); - store.init(&block); + store.init(&block).unwrap(); assert_eq!(&hash, &store.get_block_hash(0).unwrap()); assert_eq!( diff --git a/shared/src/shared.rs b/shared/src/shared.rs index 42a90dbb98..f2b5393ee2 100644 --- a/shared/src/shared.rs +++ b/shared/src/shared.rs @@ -55,7 +55,9 @@ impl Shared { match store.get_tip_header() { Some(h) => h, None => { - store.init(&genesis); + store + .init(&genesis) + .expect("init genesis block should be ok"); genesis.header().clone() } } diff --git a/shared/src/store.rs b/shared/src/store.rs index f5dd438024..1ad3238d3a 100644 --- a/shared/src/store.rs +++ b/shared/src/store.rs @@ -10,8 +10,7 @@ use ckb_core::extras::{BlockExt, TransactionAddress}; use ckb_core::header::{Header, HeaderBuilder}; use ckb_core::transaction::{ProposalShortId, Transaction, TransactionBuilder}; use ckb_core::uncle::UncleBlock; -use ckb_db::Col; -use ckb_db::{DbBatch, KeyValueDB}; +use ckb_db::{Col, DbBatch, Error, KeyValueDB}; use numext_fixed_hash::H256; use serde::Serialize; use std::ops::Range; @@ -40,7 +39,7 @@ impl ChainKVStore { pub trait ChainStore: Sync + Send { type Batch: StoreBatch; - fn new_batch(&self) -> Self::Batch; + fn new_batch(&self) -> Result; fn get_block(&self, block_hash: &H256) -> Option; fn get_header(&self, block_hash: &H256) -> Option
; @@ -51,23 +50,23 @@ pub trait ChainStore: Sync + Send { } pub trait StoreBatch { - fn insert_block(&mut self, block: &Block); - fn insert_block_ext(&mut self, block_hash: &H256, ext: &BlockExt); - fn insert_tip_header(&mut self, header: &Header); + fn insert_block(&mut self, block: &Block) -> Result<(), Error>; + fn insert_block_ext(&mut self, block_hash: &H256, ext: &BlockExt) -> Result<(), Error>; + fn insert_tip_header(&mut self, header: &Header) -> Result<(), Error>; - fn attach_block(&mut self, block: &Block); - fn detach_block(&mut self, block: &Block); + fn attach_block(&mut self, block: &Block) -> Result<(), Error>; + fn detach_block(&mut self, block: &Block) -> Result<(), Error>; - fn commit(self); + fn commit(self) -> Result<(), Error>; } impl ChainStore for ChainKVStore { type Batch = DefaultStoreBatch; - fn new_batch(&self) -> Self::Batch { - DefaultStoreBatch { - inner: self.db.batch().expect("new db batch should be ok"), - } + fn new_batch(&self) -> Result { + Ok(DefaultStoreBatch { + inner: self.db.batch()?, + }) } fn get_block(&self, h: &H256) -> Option { @@ -98,18 +97,19 @@ impl ChainStore for ChainKVStore { fn get_block_uncles(&self, h: &H256) -> Option> { // TODO Q use builder self.get(COLUMN_BLOCK_UNCLE, h.as_bytes()) - .map(|raw| deserialize(&raw[..]).unwrap()) + .map(|raw| deserialize(&raw[..]).expect("deserialize uncle should be ok")) } fn get_block_proposal_txs_ids(&self, h: &H256) -> Option> { self.get(COLUMN_BLOCK_PROPOSAL_IDS, h.as_bytes()) - .map(|raw| deserialize(&raw[..]).unwrap()) + .map(|raw| deserialize(&raw[..]).expect("deserialize proposal txs id should be ok")) } fn get_block_body(&self, h: &H256) -> Option> { self.get(COLUMN_BLOCK_TRANSACTION_ADDRESSES, h.as_bytes()) .and_then(|serialized_addresses| { - let addresses: Vec
= deserialize(&serialized_addresses).unwrap(); + let addresses: Vec
= + deserialize(&serialized_addresses).expect("deserialize address should be ok"); self.get(COLUMN_BLOCK_BODY, h.as_bytes()) .map(|serialized_body| { let txs: Vec = addresses @@ -129,7 +129,7 @@ impl ChainStore for ChainKVStore { fn get_block_ext(&self, block_hash: &H256) -> Option { self.get(COLUMN_EXT, block_hash.as_bytes()) - .map(|raw| deserialize(&raw[..]).unwrap()) + .map(|raw| deserialize(&raw[..]).expect("deserialize block ext should be ok")) } } @@ -139,83 +139,84 @@ pub struct DefaultStoreBatch { /// helper methods impl DefaultStoreBatch { - fn insert_raw(&mut self, col: Col, key: &[u8], value: &[u8]) { - self.inner - .insert(col, key, value) - .expect("batch insert_raw should be ok") + fn insert_raw(&mut self, col: Col, key: &[u8], value: &[u8]) -> Result<(), Error> { + self.inner.insert(col, key, value) } - fn insert_serialize(&mut self, col: Col, key: &[u8], item: &T) { - self.inner - .insert( - col, - key, - &serialize(item).expect("serializing should be ok"), - ) - .expect("batch insert_serialize should be ok") + fn insert_serialize( + &mut self, + col: Col, + key: &[u8], + item: &T, + ) -> Result<(), Error> { + self.inner.insert( + col, + key, + &serialize(item).expect("serializing should be ok"), + ) } - fn delete(&mut self, col: Col, key: &[u8]) { - self.inner - .delete(col, key) - .expect("batch delete should be ok") + fn delete(&mut self, col: Col, key: &[u8]) -> Result<(), Error> { + self.inner.delete(col, key) } } impl StoreBatch for DefaultStoreBatch { - fn insert_block(&mut self, b: &Block) { + fn insert_block(&mut self, b: &Block) -> Result<(), Error> { let hash = b.header().hash(); - self.insert_serialize(COLUMN_BLOCK_HEADER, hash.as_bytes(), b.header()); - self.insert_serialize(COLUMN_BLOCK_UNCLE, hash.as_bytes(), b.uncles()); + self.insert_serialize(COLUMN_BLOCK_HEADER, hash.as_bytes(), b.header())?; + self.insert_serialize(COLUMN_BLOCK_UNCLE, hash.as_bytes(), b.uncles())?; self.insert_serialize( COLUMN_BLOCK_PROPOSAL_IDS, hash.as_bytes(), b.proposal_transactions(), - ); - let (block_data, block_addresses) = flat_serialize(b.commit_transactions().iter()).unwrap(); - self.insert_raw(COLUMN_BLOCK_BODY, hash.as_bytes(), &block_data); + )?; + let (block_data, block_addresses) = + flat_serialize(b.commit_transactions().iter()).expect("flat serialize should be ok"); + self.insert_raw(COLUMN_BLOCK_BODY, hash.as_bytes(), &block_data)?; self.insert_serialize( COLUMN_BLOCK_TRANSACTION_ADDRESSES, hash.as_bytes(), &block_addresses, - ); + ) } - fn insert_block_ext(&mut self, block_hash: &H256, ext: &BlockExt) { - self.insert_serialize(COLUMN_EXT, block_hash.as_bytes(), ext); + fn insert_block_ext(&mut self, block_hash: &H256, ext: &BlockExt) -> Result<(), Error> { + self.insert_serialize(COLUMN_EXT, block_hash.as_bytes(), ext) } - fn attach_block(&mut self, block: &Block) { + fn attach_block(&mut self, block: &Block) -> Result<(), Error> { let hash = block.header().hash(); - let number = block.header().number().to_le_bytes(); - self.insert_raw(COLUMN_INDEX, &number, hash.as_bytes()); - self.insert_raw(COLUMN_INDEX, hash.as_bytes(), &number); - - let addresses = serialized_addresses(block.commit_transactions().iter()).unwrap(); + let addresses = serialized_addresses(block.commit_transactions().iter()) + .expect("serialize addresses should be ok"); for (id, tx) in block.commit_transactions().iter().enumerate() { let address = TransactionAddress { block_hash: hash.clone(), offset: addresses[id].offset, length: addresses[id].length, }; - self.insert_serialize(COLUMN_TRANSACTION_ADDR, tx.hash().as_bytes(), &address); + self.insert_serialize(COLUMN_TRANSACTION_ADDR, tx.hash().as_bytes(), &address)?; } + + let number = block.header().number().to_le_bytes(); + self.insert_raw(COLUMN_INDEX, &number, hash.as_bytes())?; + self.insert_raw(COLUMN_INDEX, hash.as_bytes(), &number) } - fn detach_block(&mut self, block: &Block) { - self.delete(COLUMN_INDEX, &block.header().number().to_le_bytes()); - self.delete(COLUMN_INDEX, block.header().hash().as_bytes()); + fn detach_block(&mut self, block: &Block) -> Result<(), Error> { for tx in block.commit_transactions() { - self.delete(COLUMN_TRANSACTION_ADDR, tx.hash().as_bytes()); + self.delete(COLUMN_TRANSACTION_ADDR, tx.hash().as_bytes())?; } + self.delete(COLUMN_INDEX, &block.header().number().to_le_bytes())?; + self.delete(COLUMN_INDEX, block.header().hash().as_bytes()) } - fn insert_tip_header(&mut self, h: &Header) { - self.insert_raw(COLUMN_META, META_TIP_HEADER_KEY, h.hash().as_bytes()); + fn insert_tip_header(&mut self, h: &Header) -> Result<(), Error> { + self.insert_raw(COLUMN_META, META_TIP_HEADER_KEY, h.hash().as_bytes()) } - fn commit(self) { - self.inner.commit().expect("batch commit should be ok"); + fn commit(self) -> Result<(), Error> { + self.inner.commit() } } @@ -246,9 +247,9 @@ mod tests { let block = consensus.genesis_block(); let hash = block.header().hash(); - let mut batch = store.new_batch(); - batch.insert_block(&block); - batch.commit(); + let mut batch = store.new_batch().unwrap(); + batch.insert_block(&block).unwrap(); + batch.commit().unwrap(); assert_eq!(block, &store.get_block(&hash).unwrap()); } @@ -263,9 +264,9 @@ mod tests { .build(); let hash = block.header().hash(); - let mut batch = store.new_batch(); - batch.insert_block(&block); - batch.commit(); + let mut batch = store.new_batch().unwrap(); + batch.insert_block(&block).unwrap(); + batch.commit().unwrap(); assert_eq!(block, store.get_block(&hash).unwrap()); } @@ -284,9 +285,9 @@ mod tests { }; let hash = block.header().hash(); - let mut batch = store.new_batch(); - batch.insert_block_ext(&hash, &ext); - batch.commit(); + let mut batch = store.new_batch().unwrap(); + batch.insert_block_ext(&hash, &ext).unwrap(); + batch.commit().unwrap(); assert_eq!(ext, store.get_block_ext(&hash).unwrap()); } } diff --git a/shared/src/tests/shared.rs b/shared/src/tests/shared.rs index e5c5180fe9..39ef7d4e68 100644 --- a/shared/src/tests/shared.rs +++ b/shared/src/tests/shared.rs @@ -11,10 +11,10 @@ fn new_shared() -> Shared> { SharedBuilder::::new().build() } -fn insert_block_timestamps(store: &ChainKVStore, timestamps: &[u64]) -> Vec -where - T: KeyValueDB + 'static, -{ +fn insert_block_timestamps( + store: &ChainKVStore, + timestamps: &[u64], +) -> Vec { let mut blocks = Vec::with_capacity(timestamps.len()); let mut hashes = Vec::with_capacity(timestamps.len()); let mut parent_hash = H256::zero(); @@ -27,11 +27,11 @@ where hashes.push(parent_hash.clone()); blocks.push(BlockBuilder::default().header(header).build()); } - let mut batch = store.new_batch(); + let mut batch = store.new_batch().unwrap(); for b in blocks { - batch.insert_block(&b); + batch.insert_block(&b).unwrap(); } - batch.commit(); + batch.commit().unwrap(); hashes }