Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

fix deadlock in queue drop #3903

Merged
merged 1 commit into from
Dec 19, 2016
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 19 additions & 15 deletions ethcore/src/verification/queue/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ pub struct VerificationQueue<K: Kind> {
max_queue_size: usize,
max_mem_use: usize,
scale_verifiers: bool,
verifier_handles: Vec<JoinHandle<()>>,
verifier_handles: Vec<JoinHandle<()>>,
state: Arc<(Mutex<State>, Condvar)>,
}

Expand Down Expand Up @@ -225,8 +225,8 @@ impl<K: Kind> VerificationQueue<K> {

let num_cpus = ::num_cpus::get();
let max_verifiers = min(num_cpus, MAX_VERIFIERS);
let default_amount = max(1, min(max_verifiers, config.verifier_settings.num_verifiers));
let state = Arc::new((Mutex::new(State::Work(default_amount)), Condvar::new()));
let default_amount = max(1, min(max_verifiers, config.verifier_settings.num_verifiers));
let state = Arc::new((Mutex::new(State::Work(default_amount)), Condvar::new()));
let mut verifier_handles = Vec::with_capacity(max_verifiers);

debug!(target: "verification", "Allocating {} verifiers, {} initially active", max_verifiers, default_amount);
Expand All @@ -248,11 +248,11 @@ impl<K: Kind> VerificationQueue<K> {
.spawn(move || {
panic_handler.catch_panic(move || {
VerificationQueue::verify(
verification,
engine,
wait,
ready,
empty,
verification,
engine,
wait,
ready,
empty,
state,
i,
)
Expand Down Expand Up @@ -299,11 +299,11 @@ impl<K: Kind> VerificationQueue<K> {

debug!(target: "verification", "verifier {} sleeping", id);
state.1.wait(&mut cur_state);
debug!(target: "verification", "verifier {} waking up", id);
debug!(target: "verification", "verifier {} waking up", id);
}

if let State::Exit = *cur_state {
debug!(target: "verification", "verifier {} exiting", id);
if let State::Exit = *cur_state {
debug!(target: "verification", "verifier {} exiting", id);
break;
}
}
Expand All @@ -326,7 +326,7 @@ impl<K: Kind> VerificationQueue<K> {
}

if let State::Exit = *state.0.lock() {
debug!(target: "verification", "verifier {} exiting", id);
debug!(target: "verification", "verifier {} exiting", id);
return;
}
}
Expand Down Expand Up @@ -681,8 +681,12 @@ impl<K: Kind> Drop for VerificationQueue<K> {
*self.state.0.lock() = State::Exit;
self.state.1.notify_all();

// wake up all threads waiting for more work.
self.more_to_verify.notify_all();
// acquire this lock to force threads to reach the waiting point
// if they're in-between the exit check and the more_to_verify wait.
{
let _more = self.verification.more_to_verify.lock().unwrap();
self.more_to_verify.notify_all();
}

// wait for all verifier threads to join.
for thread in self.verifier_handles.drain(..) {
Expand Down Expand Up @@ -811,7 +815,7 @@ mod tests {
fn readjust_verifiers() {
let queue = get_test_queue(true);

// put all the verifiers to sleep to ensure
// put all the verifiers to sleep to ensure
// the test isn't timing sensitive.
*queue.state.0.lock() = State::Work(0);

Expand Down