Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Sync reorg up to history size #3874

Merged
merged 5 commits into from
Dec 23, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions ethcore/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1205,7 +1205,7 @@ impl BlockChainClient for Client {
}

fn import_block(&self, bytes: Bytes) -> Result<H256, BlockImportError> {
use verification::queue::kind::HasHash;
use verification::queue::kind::BlockLike;
use verification::queue::kind::blocks::Unverified;

// create unverified block here so the `sha3` calculation can be cached.
Expand Down Expand Up @@ -1245,7 +1245,9 @@ impl BlockChainClient for Client {
}

fn chain_info(&self) -> BlockChainInfo {
self.chain.read().chain_info()
let mut chain_info = self.chain.read().chain_info();
chain_info.pending_total_difficulty = chain_info.total_difficulty + self.block_queue.total_difficulty();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can't we often have multiple competing blocks from different forks in the queue? (i.e. this value will often be higher than the actual total difficulty of the best queued block)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is used to make a decision whether to try and sync to a peer for a full sync. When the queue is full we don't care much if this is precise. When it is empty or nearly empty and we are far behind we don't care either. For the up-to date sync to unknown hashes this is not used because there's no way to get other peer's total difficulty. We treat it as unknown and try and download these new blocks anyway.

chain_info
}

fn additional_params(&self) -> BTreeMap<String, String> {
Expand Down Expand Up @@ -1369,6 +1371,7 @@ impl BlockChainClient for Client {
PruningInfo {
earliest_chain: self.chain.read().first_block_number().unwrap_or(1),
earliest_state: self.state_db.lock().journal_db().earliest_era().unwrap_or(0),
state_history_size: Some(self.history),
}
}

Expand Down
9 changes: 9 additions & 0 deletions ethcore/src/client/test_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ pub struct TestBlockChainClient {
pub first_block: RwLock<Option<(H256, u64)>>,
/// Traces to return
pub traces: RwLock<Option<Vec<LocalizedTrace>>>,
/// Pruning history size to report.
pub history: RwLock<Option<u64>>,
}

/// Used for generating test client blocks.
Expand Down Expand Up @@ -154,6 +156,7 @@ impl TestBlockChainClient {
ancient_block: RwLock::new(None),
first_block: RwLock::new(None),
traces: RwLock::new(None),
history: RwLock::new(None),
};
client.add_blocks(1, EachBlockWith::Nothing); // add genesis block
client.genesis_hash = client.last_hash.read().clone();
Expand Down Expand Up @@ -314,6 +317,11 @@ impl TestBlockChainClient {
let res = res.into_iter().next().unwrap().expect("Successful import");
assert_eq!(res, TransactionImportResult::Current);
}

/// Set reported history size.
pub fn set_history(&self, h: Option<u64>) {
*self.history.write() = h;
}
}

pub fn get_temp_state_db() -> GuardedTempResult<StateDB> {
Expand Down Expand Up @@ -704,6 +712,7 @@ impl BlockChainClient for TestBlockChainClient {
PruningInfo {
earliest_chain: 1,
earliest_state: 1,
state_history_size: *self.history.read(),
}
}

Expand Down
2 changes: 2 additions & 0 deletions ethcore/src/types/pruning_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,6 @@ pub struct PruningInfo {
pub earliest_chain: u64,
/// The first block where state requests may be served.
pub earliest_state: u64,
/// State pruning history size.
pub state_history_size: Option<u64>,
}
35 changes: 24 additions & 11 deletions ethcore/src/verification/queue/kind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,21 @@
use engines::Engine;
use error::Error;

use util::{HeapSizeOf, H256};
use util::{HeapSizeOf, H256, U256};

pub use self::blocks::Blocks;
pub use self::headers::Headers;

/// Something which can produce a hash and a parent hash.
pub trait HasHash {
pub trait BlockLike {
/// Get the hash of this item.
fn hash(&self) -> H256;

/// Get the hash of this item's parent.
fn parent_hash(&self) -> H256;

/// Get the difficulty of this item.
fn difficulty(&self) -> U256;
}

/// Defines transitions between stages of verification.
Expand All @@ -45,13 +48,13 @@ pub trait HasHash {
/// consistent.
pub trait Kind: 'static + Sized + Send + Sync {
/// The first stage: completely unverified.
type Input: Sized + Send + HasHash + HeapSizeOf;
type Input: Sized + Send + BlockLike + HeapSizeOf;

/// The second stage: partially verified.
type Unverified: Sized + Send + HasHash + HeapSizeOf;
type Unverified: Sized + Send + BlockLike + HeapSizeOf;

/// The third stage: completely verified.
type Verified: Sized + Send + HasHash + HeapSizeOf;
type Verified: Sized + Send + BlockLike + HeapSizeOf;

/// Attempt to create the `Unverified` item from the input.
fn create(input: Self::Input, engine: &Engine) -> Result<Self::Unverified, Error>;
Expand All @@ -62,14 +65,14 @@ pub trait Kind: 'static + Sized + Send + Sync {

/// The blocks verification module.
pub mod blocks {
use super::{Kind, HasHash};
use super::{Kind, BlockLike};

use engines::Engine;
use error::Error;
use header::Header;
use verification::{PreverifiedBlock, verify_block_basic, verify_block_unordered};

use util::{Bytes, HeapSizeOf, H256};
use util::{Bytes, HeapSizeOf, H256, U256};

/// A mode for verifying blocks.
pub struct Blocks;
Expand Down Expand Up @@ -126,41 +129,51 @@ pub mod blocks {
}
}

impl HasHash for Unverified {
impl BlockLike for Unverified {
fn hash(&self) -> H256 {
self.header.hash()
}

fn parent_hash(&self) -> H256 {
self.header.parent_hash().clone()
}

fn difficulty(&self) -> U256 {
self.header.difficulty().clone()
}
}

impl HasHash for PreverifiedBlock {
impl BlockLike for PreverifiedBlock {
fn hash(&self) -> H256 {
self.header.hash()
}

fn parent_hash(&self) -> H256 {
self.header.parent_hash().clone()
}

fn difficulty(&self) -> U256 {
self.header.difficulty().clone()
}
}
}

/// Verification for headers.
pub mod headers {
use super::{Kind, HasHash};
use super::{Kind, BlockLike};

use engines::Engine;
use error::Error;
use header::Header;
use verification::verify_header_params;

use util::hash::H256;
use util::U256;

impl HasHash for Header {
impl BlockLike for Header {
fn hash(&self) -> H256 { self.hash() }
fn parent_hash(&self) -> H256 { self.parent_hash().clone() }
fn difficulty(&self) -> U256 { self.difficulty().clone() }
}

/// A mode for verifying headers.
Expand Down
55 changes: 46 additions & 9 deletions ethcore/src/verification/queue/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use error::*;
use engines::Engine;
use service::*;

use self::kind::{HasHash, Kind};
use self::kind::{BlockLike, Kind};

pub use types::verification_queue_info::VerificationQueueInfo as QueueInfo;

Expand Down Expand Up @@ -132,13 +132,14 @@ pub struct VerificationQueue<K: Kind> {
deleting: Arc<AtomicBool>,
ready_signal: Arc<QueueSignal>,
empty: Arc<SCondvar>,
processing: RwLock<HashSet<H256>>,
processing: RwLock<HashMap<H256, U256>>, // hash to difficulty
ticks_since_adjustment: AtomicUsize,
max_queue_size: usize,
max_mem_use: usize,
scale_verifiers: bool,
verifier_handles: Vec<JoinHandle<()>>,
state: Arc<(Mutex<State>, Condvar)>,
total_difficulty: RwLock<U256>,
}

struct QueueSignal {
Expand Down Expand Up @@ -269,14 +270,15 @@ impl<K: Kind> VerificationQueue<K> {
more_to_verify: more_to_verify,
verification: verification,
deleting: deleting,
processing: RwLock::new(HashSet::new()),
processing: RwLock::new(HashMap::new()),
empty: empty,
ticks_since_adjustment: AtomicUsize::new(0),
max_queue_size: max(config.max_queue_size, MIN_QUEUE_LIMIT),
max_mem_use: max(config.max_mem_use, MIN_MEM_LIMIT),
scale_verifiers: scale_verifiers,
verifier_handles: verifier_handles,
state: state,
total_difficulty: RwLock::new(0.into()),
}
}

Expand Down Expand Up @@ -434,6 +436,7 @@ impl<K: Kind> VerificationQueue<K> {
sizes.unverified.store(0, AtomicOrdering::Release);
sizes.verifying.store(0, AtomicOrdering::Release);
sizes.verified.store(0, AtomicOrdering::Release);
*self.total_difficulty.write() = 0.into();

self.processing.write().clear();
}
Expand All @@ -448,7 +451,7 @@ impl<K: Kind> VerificationQueue<K> {

/// Check if the item is currently in the queue
pub fn status(&self, hash: &H256) -> Status {
if self.processing.read().contains(hash) {
if self.processing.read().contains_key(hash) {
return Status::Queued;
}
if self.verification.bad.lock().contains(hash) {
Expand All @@ -461,7 +464,7 @@ impl<K: Kind> VerificationQueue<K> {
pub fn import(&self, input: K::Input) -> ImportResult {
let h = input.hash();
{
if self.processing.read().contains(&h) {
if self.processing.read().contains_key(&h) {
return Err(ImportError::AlreadyQueued.into());
}

Expand All @@ -480,7 +483,11 @@ impl<K: Kind> VerificationQueue<K> {
Ok(item) => {
self.verification.sizes.unverified.fetch_add(item.heap_size_of_children(), AtomicOrdering::SeqCst);

self.processing.write().insert(h.clone());
self.processing.write().insert(h.clone(), item.difficulty());
{
let mut td = self.total_difficulty.write();
*td = *td + item.difficulty();
}
self.verification.unverified.lock().push_back(item);
self.more_to_verify.notify_all();
Ok(h)
Expand Down Expand Up @@ -511,7 +518,10 @@ impl<K: Kind> VerificationQueue<K> {
bad.reserve(hashes.len());
for hash in hashes {
bad.insert(hash.clone());
processing.remove(hash);
if let Some(difficulty) = processing.remove(hash) {
let mut td = self.total_difficulty.write();
*td = *td - difficulty;
}
}

let mut new_verified = VecDeque::new();
Expand All @@ -520,7 +530,10 @@ impl<K: Kind> VerificationQueue<K> {
if bad.contains(&output.parent_hash()) {
removed_size += output.heap_size_of_children();
bad.insert(output.hash());
processing.remove(&output.hash());
if let Some(difficulty) = processing.remove(&output.hash()) {
let mut td = self.total_difficulty.write();
*td = *td - difficulty;
}
} else {
new_verified.push_back(output);
}
Expand All @@ -538,7 +551,10 @@ impl<K: Kind> VerificationQueue<K> {
}
let mut processing = self.processing.write();
for hash in hashes {
processing.remove(hash);
if let Some(difficulty) = processing.remove(hash) {
let mut td = self.total_difficulty.write();
*td = *td - difficulty;
}
}
processing.is_empty()
}
Expand Down Expand Up @@ -592,6 +608,11 @@ impl<K: Kind> VerificationQueue<K> {
}
}

/// Get the total difficulty of all the blocks in the queue.
pub fn total_difficulty(&self) -> U256 {
self.total_difficulty.read().clone()
}

/// Get the current number of working verifiers.
pub fn num_verifiers(&self) -> usize {
match *self.state.0.lock() {
Expand Down Expand Up @@ -760,6 +781,22 @@ mod tests {
}
}

#[test]
fn returns_total_difficulty() {
let queue = get_test_queue(false);
let block = get_good_dummy_block();
let hash = BlockView::new(&block).header().hash().clone();
if let Err(e) = queue.import(Unverified::new(block)) {
panic!("error importing block that is valid by definition({:?})", e);
}
queue.flush();
assert_eq!(queue.total_difficulty(), 131072.into());
queue.drain(10);
assert_eq!(queue.total_difficulty(), 131072.into());
queue.mark_as_good(&[ hash ]);
assert_eq!(queue.total_difficulty(), 0.into());
}

#[test]
fn returns_ok_for_drained_duplicates() {
let queue = get_test_queue(false);
Expand Down
2 changes: 1 addition & 1 deletion parity/cli/config.full.toml
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ notify_work = ["http://localhost:3001"]
[footprint]
tracing = "auto"
pruning = "auto"
pruning_history = 64
pruning_history = 1200
cache_size_db = 64
cache_size_blocks = 8
cache_size_queue = 50
Expand Down
4 changes: 2 additions & 2 deletions parity/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ usage! {
or |c: &Config| otry!(c.footprint).tracing.clone(),
flag_pruning: String = "auto",
or |c: &Config| otry!(c.footprint).pruning.clone(),
flag_pruning_history: u64 = 64u64,
flag_pruning_history: u64 = 1200u64,
or |c: &Config| otry!(c.footprint).pruning_history.clone(),
flag_cache_size_db: u32 = 64u32,
or |c: &Config| otry!(c.footprint).cache_size_db.clone(),
Expand Down Expand Up @@ -629,7 +629,7 @@ mod tests {
// -- Footprint Options
flag_tracing: "auto".into(),
flag_pruning: "auto".into(),
flag_pruning_history: 64u64,
flag_pruning_history: 1200u64,
flag_cache_size_db: 64u32,
flag_cache_size_blocks: 8u32,
flag_cache_size_queue: 50u32,
Expand Down
Loading