Skip to content

Commit

Permalink
Merge #2505 #2526
Browse files Browse the repository at this point in the history
2505: feat: provide `--overwrite-spec` to override the chain spec in storage r=quake,yangby-cryptape a=keroro520

CKB stores the chain spec hash in the database and compares it against the configured chain spec on startup.

A user may want to overwrite the stored chain spec when they have changed the configured chain spec and are sure the change is acceptable. Therefore this PR adds the `--overwrite-spec` command-line option to overwrite the stored chain spec.

---

* If you just want to skip the chain spec check without overriding the stored one:

  ```shell
  ckb run --skip-spec-check ...
  ```

* If you want to overwrite the stored chain spec with the configured one:

  ```shell
  ckb run --overwrite-spec ...
  ```

2526: feat: multi thread number_hash_mapping migration r=yangby-cryptape,quake a=zhangsoledad

Makes the number_hash_mapping migration multi-threaded, and extracts a reusable multi-thread migration template:

https://github.com/nervosnetwork/ckb/blob/e841287b9484ae325acc64e4e74c5fb705a09bcd/shared/src/migrations/add_number_hash_mapping.rs#L20-L21

Co-authored-by: keroro <keroroxx520@gmail.com>
Co-authored-by: zhangsoledad <787953403@qq.com>
  • Loading branch information
3 people committed Jan 27, 2021
3 parents cbcd97b + 7135a0a + e841287 commit b031268
Show file tree
Hide file tree
Showing 13 changed files with 205 additions and 121 deletions.
10 changes: 9 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ members = [
"verification",
"tx-pool",
"shared",
"shared/migration-template",
"chain",
"sync",
"util/instrument",
Expand Down
60 changes: 41 additions & 19 deletions ckb-bin/src/subcommand/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ use ckb_network_alert::alert_relayer::AlertRelayer;
use ckb_resource::Resource;
use ckb_rpc::{RpcServer, ServiceBuilder};
use ckb_shared::shared::{Shared, SharedBuilder};
use ckb_store::ChainStore;
use ckb_store::{ChainDB, ChainStore};
use ckb_sync::{NetTimeProtocol, Relayer, SyncShared, Synchronizer};
use ckb_types::packed::Byte32;
use ckb_types::{core::cell::setup_system_cell_cache, prelude::*};
use ckb_verification::{GenesisVerifier, Verifier};
use std::sync::Arc;
Expand Down Expand Up @@ -193,33 +194,54 @@ fn verify_genesis(shared: &Shared) -> Result<(), ExitCode> {

fn check_spec(shared: &Shared, args: &RunArgs) -> Result<(), ExitCode> {
let store = shared.store();
if let Some(spec_hash) = store.get_chain_spec_hash() {
if args.chain_spec_hash != spec_hash && !args.skip_chain_spec_check {
eprintln!(
"chain_spec_hash mismatch Config({}) storage({}), pass command line argument --skip-spec-check if you are sure that the two different chains are compatible.",
args.chain_spec_hash, spec_hash
);
return Err(ExitCode::Config);
}
} else {
store
.put_chain_spec_hash(&args.chain_spec_hash)
.map_err(|err| {
eprintln!(
"Touch chain_spec_hash {} error: {}",
args.chain_spec_hash, err
);
ExitCode::IO
})?;
let stored_spec_hash = store.get_chain_spec_hash();

if stored_spec_hash.is_none() {
// fresh yet
write_chain_spec_hash(store, &args.chain_spec_hash)?;
info_target!(
crate::LOG_TARGET_MAIN,
"Touch chain spec hash: {}",
args.chain_spec_hash
);
} else if stored_spec_hash.as_ref() == Some(&args.chain_spec_hash) {
// stored == configured
// do nothing
} else if args.overwrite_chain_spec {
// stored != configured with --overwrite-spec
write_chain_spec_hash(store, &args.chain_spec_hash)?;
info_target!(
crate::LOG_TARGET_MAIN,
"Overwrite chain spec hash from {} to {}",
stored_spec_hash.expect("checked"),
args.overwrite_chain_spec,
);
} else if args.skip_chain_spec_check {
// stored != configured with --skip-spec-check
// do nothing
} else {
// stored != configured
eprintln!(
"chain_spec_hash mismatch Config({}) storage({}), pass command line argument \
--skip-spec-check if you are sure that the two different chains are compatible; \
or pass --overwrite-spec to force overriding stored chain spec with configured chain spec",
args.chain_spec_hash, stored_spec_hash.expect("checked")
);
return Err(ExitCode::Config);
}
Ok(())
}

/// Persists `chain_spec_hash` into the store.
///
/// On failure, prints the error to stderr and maps it to `ExitCode::IO`.
fn write_chain_spec_hash(store: &ChainDB, chain_spec_hash: &Byte32) -> Result<(), ExitCode> {
    match store.put_chain_spec_hash(chain_spec_hash) {
        Ok(ok) => Ok(ok),
        Err(err) => {
            eprintln!(
                "store.put_chain_spec_hash {} error: {}",
                chain_spec_hash, err
            );
            Err(ExitCode::IO)
        }
    }
}

fn sanitize_block_assembler_config(
args: &RunArgs,
) -> Result<Option<BlockAssemblerConfig>, ExitCode> {
Expand Down
2 changes: 1 addition & 1 deletion shared/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,5 @@ ckb-db-schema = { path = "../db-schema", version = "= 0.40.0-pre" }
ckb-async-runtime = { path = "../util/runtime", version = "= 0.40.0-pre" }
ckb-stop-handler = { path = "../util/stop-handler", version = "= 0.40.0-pre" }
ckb-channel = { path = "../util/channel", version = "= 0.40.0-pre" }
ckb-chain-iter = { path = "../util/chain-iter", version = "= 0.40.0-pre" }
ckb-migration-template = { path = "migration-template", version = "= 0.40.0-pre" }
num_cpus = "1.10"
16 changes: 16 additions & 0 deletions shared/migration-template/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
[package]
name = "ckb-migration-template"
version = "0.40.0-pre"
license = "MIT"
authors = ["Nervos <dev@nervos.org>"]
edition = "2018"
description = "Provide proc-macros to setup migration."
homepage = "https://github.com/nervosnetwork/ckb"
repository = "https://github.com/nervosnetwork/ckb"

[lib]
proc-macro = true

[dependencies]
quote = "1.0"
syn = "1.0"
75 changes: 75 additions & 0 deletions shared/migration-template/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
//! Provide proc-macros to setup migration.

extern crate proc_macro;

use proc_macro::TokenStream;
use quote::quote;
use syn::parse_macro_input;

/// Multi-thread migration template.
///
/// Takes a single braced block and expands it into a scaffold that:
/// 1. wraps the in-scope `db` in a `ChainDB` and reads the tip block number,
/// 2. splits the block-number range into one contiguous chunk per worker
///    thread (thread count is `num_cpus::get()` clamped to
///    `[MIN_THREAD, MAX_THREAD]`; the last chunk absorbs the remainder and
///    the tip block itself),
/// 3. spawns one thread per chunk, each with its own progress bar (`pbi`)
///    and write batch (`wb`), splicing the caller's block in as the
///    per-chunk body, and
/// 4. joins all threads and evaluates to `Ok(chain_db.into_inner())`.
///
/// NOTE(review): the expansion is intentionally unhygienic — it expects the
/// call site to have `db`, `pb`, `ChainDB`, `StoreConfig`, `ProgressStyle`,
/// `Arc`, and `num_cpus` in scope, and the spliced block may use the local
/// bindings `i`, `chunk_size`, `end`, `chain_db`, `wb`, `pbi`, and `barrier`.
#[proc_macro]
pub fn multi_thread_migration(input: TokenStream) -> TokenStream {
    // The macro input must parse as a single block expression: `{ ... }`.
    let block_expr = parse_macro_input!(input as syn::ExprBlock);
    let expanded = quote! {
        // Bounds on the worker-thread count and the write-batch flush threshold.
        const MAX_THREAD: u64 = 6;
        const MIN_THREAD: u64 = 2;
        const BATCH: usize = 1_000;

        let chain_db = ChainDB::new(db, StoreConfig::default());
        let tip = chain_db.get_tip_header().expect("db tip header index");
        let tip_number = tip.number();

        // Thread count: clamp the CPU count into [MIN_THREAD, MAX_THREAD].
        let tb_num = std::cmp::max(MIN_THREAD, num_cpus::get() as u64);
        let tb_num = std::cmp::min(tb_num, MAX_THREAD);
        let chunk_size = tip_number / tb_num;
        let remainder = tip_number % tb_num;
        // Barrier sized to the thread count, handed to the spliced block as
        // `barrier`; this scaffold itself never waits on it.
        let _barrier = ::std::sync::Arc::new(::std::sync::Barrier::new(tb_num as usize));

        let handles: Vec<_> = (0..tb_num).map(|i| {
            let chain_db = chain_db.clone();
            let pb = ::std::sync::Arc::clone(&pb);
            let barrier = Arc::clone(&_barrier);

            // The last chunk also covers the division remainder, and its end
            // bound is tip_number + 1 so the tip block itself is included.
            let last = i == (tb_num - 1);
            let size = if last {
                chunk_size + remainder
            } else {
                chunk_size
            };
            let end = if last {
                tip_number + 1
            } else {
                (i + 1) * chunk_size
            };

            // Per-thread progress bar. NOTE(review): length is `size * 2` —
            // presumably two ticks per block; confirm against the callers.
            let pbi = pb(size * 2);
            pbi.set_style(
                ProgressStyle::default_bar()
                    .template(
                        "{prefix:.bold.dim} {spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} ({eta}) {msg}",
                    )
                    .progress_chars("#>-"),
            );
            pbi.set_position(0);
            pbi.enable_steady_tick(5000);
            ::std::thread::spawn(move || {
                let mut wb = chain_db.new_write_batch();

                // Caller-provided per-chunk migration body is spliced here.
                #block_expr

                // Flush whatever the spliced block left in the batch.
                if !wb.is_empty() {
                    chain_db.write(&wb).unwrap();
                }
                pbi.finish_with_message("done!");
            })
        }).collect();

        // Wait for other threads to finish.
        for handle in handles {
            handle.join().unwrap();
        }
        Ok(chain_db.into_inner())
    };

    TokenStream::from(expanded)
}
5 changes: 5 additions & 0 deletions shared/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
//! TODO(doc): @quake

// num_cpus is used in proc_macro
// declare here for mute ./devtools/ci/check-cargotoml.sh error
extern crate num_cpus;

mod migrations;
pub mod shared;

Expand Down
76 changes: 37 additions & 39 deletions shared/src/migrations/add_number_hash_mapping.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
use ckb_app_config::StoreConfig;
use ckb_chain_iter::ChainIterator;
use ckb_db::{Result, RocksDB};
use ckb_db::{Direction, IteratorMode, Result, RocksDB};
use ckb_db_migration::{Migration, ProgressBar, ProgressStyle};
use ckb_db_schema::COLUMN_NUMBER_HASH;
use ckb_store::ChainDB;
use ckb_types::{packed, prelude::*};
use ckb_db_schema::{COLUMN_BLOCK_BODY, COLUMN_INDEX, COLUMN_NUMBER_HASH};
use ckb_migration_template::multi_thread_migration;
use ckb_store::{ChainDB, ChainStore};
use ckb_types::{molecule::io::Write, packed, prelude::*};
use std::sync::Arc;

const BATCH: usize = 1_000;

pub struct AddNumberHashMapping;

const VERSION: &str = "20200710181855";
Expand All @@ -19,40 +17,40 @@ impl Migration for AddNumberHashMapping {
db: RocksDB,
pb: Arc<dyn Fn(u64) -> ProgressBar + Send + Sync>,
) -> Result<RocksDB> {
let chain_db = ChainDB::new(db, StoreConfig::default());
let iter = ChainIterator::new(&chain_db);
let pb = pb(iter.len());
pb.set_style(
ProgressStyle::default_bar()
.template(
"{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} ({eta})",
)
.progress_chars("#>-"),
);
let mut batch = chain_db.new_write_batch();
for block in iter {
let txs_len: packed::Uint32 = (block.transactions().len() as u32).pack();
batch.put(
COLUMN_NUMBER_HASH,
packed::NumberHash::new_builder()
.number(block.number().pack())
.block_hash(block.header().hash())
.build()
.as_slice(),
txs_len.as_slice(),
)?;

if batch.len() > BATCH {
chain_db.write(&batch)?;
batch.clear()?;
multi_thread_migration! {
{
for number in i * chunk_size..end {
let block_number: packed::Uint64 = number.pack();
let raw_hash = chain_db.get(COLUMN_INDEX, block_number.as_slice()).expect("DB data integrity");
let txs_len = chain_db.get_iter(
COLUMN_BLOCK_BODY,
IteratorMode::From(&raw_hash, Direction::Forward),
)
.take_while(|(key, _)| key.starts_with(&raw_hash))
.count();

let raw_txs_len: packed::Uint32 = (txs_len as u32).pack();

let mut raw_key = Vec::with_capacity(40);
raw_key.write_all(block_number.as_slice()).expect("write_all block_number");
raw_key.write_all(&raw_hash).expect("write_all hash");
let key = packed::NumberHash::new_unchecked(raw_key.into());

wb.put(
COLUMN_NUMBER_HASH,
key.as_slice(),
raw_txs_len.as_slice(),
)
.expect("put number_hash");

if wb.len() > BATCH {
chain_db.write(&wb).expect("write db batch");
wb.clear().unwrap();
}
pbi.inc(1);
}
}
pb.inc(1);
}
if !batch.is_empty() {
chain_db.write(&batch)?;
}
pb.finish_with_message("finish");
Ok(chain_db.into_inner())
}

fn version(&self) -> &str {
Expand Down

0 comments on commit b031268

Please sign in to comment.