Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Use an adaptive number of threads in the verification queue #2445

Merged
merged 12 commits into from
Nov 22, 2016
287 changes: 252 additions & 35 deletions ethcore/src/verification/queue/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ pub mod kind;
const MIN_MEM_LIMIT: usize = 16384;
const MIN_QUEUE_LIMIT: usize = 512;

// maximum possible number of verification threads.
const MAX_VERIFIERS: usize = 8;

/// Type alias for block queue convenience.
pub type BlockQueue = VerificationQueue<self::kind::Blocks>;

Expand All @@ -61,6 +64,37 @@ impl Default for Config {
}
}

struct VerifierHandle {
deleting: Arc<AtomicBool>,
sleep: Arc<AtomicBool>,
thread: JoinHandle<()>,
}

impl VerifierHandle {
// signal to the verifier thread that it should sleep.
fn sleep(&self) {
self.sleep.store(true, AtomicOrdering::SeqCst);
}

// signal to the verifier thread that it should wake up.
fn wake_up(&self) {
self.sleep.store(false, AtomicOrdering::SeqCst);
self.thread.thread().unpark();
}

// signal to the verifier thread that it should conclude its
// operations.
fn conclude(&self) {
self.wake_up();
self.deleting.store(true, AtomicOrdering::Release);
}

// join the verifier thread.
fn join(self) {
self.thread.join().expect("Verifier thread panicked");
}
}

/// An item which is in the process of being verified.
pub struct Verifying<K: Kind> {
hash: H256,
Expand Down Expand Up @@ -97,11 +131,12 @@ pub struct VerificationQueue<K: Kind> {
engine: Arc<Engine>,
more_to_verify: Arc<SCondvar>,
verification: Arc<Verification<K>>,
verifiers: Vec<JoinHandle<()>>,
verifiers: Mutex<(Vec<VerifierHandle>, usize)>,
deleting: Arc<AtomicBool>,
ready_signal: Arc<QueueSignal>,
empty: Arc<SCondvar>,
processing: RwLock<HashSet<H256>>,
ticks_since_adjustment: AtomicUsize,
max_queue_size: usize,
max_mem_use: usize,
}
Expand Down Expand Up @@ -157,6 +192,7 @@ struct Verification<K: Kind> {
more_to_verify: SMutex<()>,
empty: SMutex<()>,
sizes: Sizes,
check_seal: bool,
}

impl<K: Kind> VerificationQueue<K> {
Expand All @@ -173,7 +209,8 @@ impl<K: Kind> VerificationQueue<K> {
unverified: AtomicUsize::new(0),
verifying: AtomicUsize::new(0),
verified: AtomicUsize::new(0),
}
},
check_seal: check_seal,
});
let more_to_verify = Arc::new(SCondvar::new());
let deleting = Arc::new(AtomicBool::new(false));
Expand All @@ -185,44 +222,82 @@ impl<K: Kind> VerificationQueue<K> {
let empty = Arc::new(SCondvar::new());
let panic_handler = PanicHandler::new_in_arc();

let mut verifiers: Vec<JoinHandle<()>> = Vec::new();
let thread_count = max(::num_cpus::get(), 3) - 2;
for i in 0..thread_count {
let max_verifiers = min(::num_cpus::get(), MAX_VERIFIERS);
let default_amount = max(::num_cpus::get(), 3) - 2;
let mut verifiers = Vec::with_capacity(max_verifiers);

debug!(target: "verification", "Allocating {} verifiers, {} initially active", max_verifiers, default_amount);

for i in 0..max_verifiers {
debug!(target: "verification", "Adding verification thread #{}", i);

let deleting = deleting.clone();
let panic_handler = panic_handler.clone();
let verification = verification.clone();
let engine = engine.clone();
let more_to_verify = more_to_verify.clone();
let ready_signal = ready_signal.clone();
let wait = more_to_verify.clone();
let ready = ready_signal.clone();
let empty = empty.clone();
let deleting = deleting.clone();
let panic_handler = panic_handler.clone();
verifiers.push(
thread::Builder::new()
.name(format!("Verifier #{}", i))
.spawn(move || {
panic_handler.catch_panic(move || {
VerificationQueue::verify(verification, engine, more_to_verify, ready_signal, deleting, empty, check_seal)
}).unwrap()
})
.expect("Error starting block verification thread")
);

// enable only the first few verifiers.
let sleep = if i < default_amount {
Arc::new(AtomicBool::new(false))
} else {
Arc::new(AtomicBool::new(true))
};

verifiers.push(VerifierHandle {
deleting: deleting.clone(),
sleep: sleep.clone(),
thread: thread::Builder::new()
.name(format!("Verifier #{}", i))
.spawn(move || {
panic_handler.catch_panic(move || {
VerificationQueue::verify(verification, engine, wait, ready, deleting, empty, sleep)
}).unwrap()
})
.expect("Failed to create verifier thread.")
});
}

VerificationQueue {
engine: engine,
panic_handler: panic_handler,
ready_signal: ready_signal.clone(),
more_to_verify: more_to_verify.clone(),
verification: verification.clone(),
verifiers: verifiers,
deleting: deleting.clone(),
ready_signal: ready_signal,
more_to_verify: more_to_verify,
verification: verification,
verifiers: Mutex::new((verifiers, default_amount)),
deleting: deleting,
processing: RwLock::new(HashSet::new()),
empty: empty.clone(),
empty: empty,
ticks_since_adjustment: AtomicUsize::new(0),
max_queue_size: max(config.max_queue_size, MIN_QUEUE_LIMIT),
max_mem_use: max(config.max_mem_use, MIN_MEM_LIMIT),
}
}

fn verify(verification: Arc<Verification<K>>, engine: Arc<Engine>, wait: Arc<SCondvar>, ready: Arc<QueueSignal>, deleting: Arc<AtomicBool>, empty: Arc<SCondvar>, check_seal: bool) {
fn verify(
verification: Arc<Verification<K>>,
engine: Arc<Engine>,
wait: Arc<SCondvar>,
ready: Arc<QueueSignal>,
deleting: Arc<AtomicBool>,
empty: Arc<SCondvar>,
sleep: Arc<AtomicBool>,
) {
while !deleting.load(AtomicOrdering::Acquire) {
{
while sleep.load(AtomicOrdering::SeqCst) {
trace!(target: "verification", "Verifier sleeping");
::std::thread::park();
trace!(target: "verification", "Verifier waking up");

if deleting.load(AtomicOrdering::Acquire) {
return;
}
}
}

{
let mut more_to_verify = verification.more_to_verify.lock().unwrap();

Expand Down Expand Up @@ -255,7 +330,7 @@ impl<K: Kind> VerificationQueue<K> {
};

let hash = item.hash();
let is_ready = match K::verify(item, &*engine, check_seal) {
let is_ready = match K::verify(item, &*engine, verification.check_seal) {
Ok(verified) => {
let mut verifying = verification.verifying.lock();
let mut idx = None;
Expand Down Expand Up @@ -302,9 +377,15 @@ impl<K: Kind> VerificationQueue<K> {
}
}

fn drain_verifying(verifying: &mut VecDeque<Verifying<K>>, verified: &mut VecDeque<K::Verified>, bad: &mut HashSet<H256>, sizes: &Sizes) {
fn drain_verifying(
verifying: &mut VecDeque<Verifying<K>>,
verified: &mut VecDeque<K::Verified>,
bad: &mut HashSet<H256>,
sizes: &Sizes,
) {
let mut removed_size = 0;
let mut inserted_size = 0;

while let Some(output) = verifying.front_mut().and_then(|x| x.output.take()) {
assert!(verifying.pop_front().is_some());
let size = output.heap_size_of_children();
Expand Down Expand Up @@ -487,14 +568,85 @@ impl<K: Kind> VerificationQueue<K> {
}
}

/// Optimise memory footprint of the heap fields.
/// Optimise memory footprint of the heap fields, and adjust the number of threads
/// to better suit the workload.
pub fn collect_garbage(&self) {
{
self.verification.unverified.lock().shrink_to_fit();
// number of ticks to average queue stats over
// when deciding whether to change the number of verifiers.
#[cfg(not(test))]
const READJUSTMENT_PERIOD: usize = 12;

#[cfg(test)]
const READJUSTMENT_PERIOD: usize = 1;

let (u_len, v_len) = {
let u_len = {
let mut q = self.verification.unverified.lock();
q.shrink_to_fit();
q.len()
};
self.verification.verifying.lock().shrink_to_fit();
self.verification.verified.lock().shrink_to_fit();
}

let v_len = {
let mut q = self.verification.verified.lock();
q.shrink_to_fit();
q.len()
};

(u_len as isize, v_len as isize)
};

self.processing.write().shrink_to_fit();

if self.ticks_since_adjustment.fetch_add(1, AtomicOrdering::SeqCst) + 1 >= READJUSTMENT_PERIOD {
self.ticks_since_adjustment.store(0, AtomicOrdering::SeqCst);
} else {
return;
}

let current = self.verifiers.lock().1;

let diff = (v_len - u_len).abs();
let total = v_len + u_len;

self.scale_verifiers(
if u_len < 20 {
1
} else if diff <= total / 10 {
current
} else if v_len > u_len {
current - 1
} else {
current + 1
}
);
}

// wake up or sleep verifiers to get as close to the target as
// possible, never going over the amount of initially allocated threads
// or below 1.
fn scale_verifiers(&self, target: usize) {
let mut verifiers = self.verifiers.lock();
let &mut (ref mut verifiers, ref mut verifier_count) = &mut *verifiers;

let target = min(verifiers.len(), target);
let target = max(1, target);

debug!(target: "verification", "Scaling from {} to {} verifiers", verifier_count, target);

// scaling up
for i in *verifier_count..target {
debug!(target: "verification", "Waking up verifier {}", i);
verifiers[i].wake_up();
}

// scaling down.
for i in target..*verifier_count {
debug!(target: "verification", "Putting verifier {} to sleep", i);
verifiers[i].sleep();
}

*verifier_count = target;
}
}

Expand All @@ -509,10 +661,23 @@ impl<K: Kind> Drop for VerificationQueue<K> {
trace!(target: "shutdown", "[VerificationQueue] Closing...");
self.clear();
self.deleting.store(true, AtomicOrdering::Release);

let mut verifiers = self.verifiers.get_mut();
let mut verifiers = &mut verifiers.0;

// first pass to signal conclusion. must be done before
// notify or deadlock possible.
for handle in verifiers.iter() {
handle.conclude();
}

self.more_to_verify.notify_all();
for t in self.verifiers.drain(..) {
t.join().unwrap();

// second pass to join.
for handle in verifiers.drain(..) {
handle.join();
}

trace!(target: "shutdown", "[VerificationQueue] Closed.");
}
}
Expand Down Expand Up @@ -611,4 +776,56 @@ mod tests {
}
assert!(queue.queue_info().is_full());
}

#[test]
fn scaling_limits() {
use super::MAX_VERIFIERS;

let queue = get_test_queue();
queue.scale_verifiers(MAX_VERIFIERS + 1);

assert!(queue.verifiers.lock().1 < MAX_VERIFIERS + 1);

queue.scale_verifiers(0);

assert!(queue.verifiers.lock().1 == 1);
}

#[test]
fn readjust_verifiers() {
let queue = get_test_queue();

// put all the verifiers to sleep to ensure
// the test isn't timing sensitive.
let num_verifiers = {
let verifiers = queue.verifiers.lock();
for i in 0..verifiers.1 {
verifiers.0[i].sleep();
}

verifiers.1
};

for block in get_good_dummy_block_seq(5000) {
queue.import(Unverified::new(block)).expect("Block good by definition; qed");
}

// almost all unverified == bump verifier count.
queue.collect_garbage();
assert_eq!(queue.verifiers.lock().1, num_verifiers + 1);

// wake them up again and verify everything.
{
let verifiers = queue.verifiers.lock();
for i in 0..verifiers.1 {
verifiers.0[i].wake_up();
}
}

queue.flush();

// nothing to verify == use minimum number of verifiers.
queue.collect_garbage();
assert_eq!(queue.verifiers.lock().1, 1);
}
}