Refactor JobMarket and handle checker threads panicking (#66)
jeffa5 committed Aug 27, 2023
1 parent 91f98ef commit a24534a
Showing 5 changed files with 219 additions and 238 deletions.
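The bulk of the change replaces the ad-hoc `JobMarket` struct and `Condvar` that each checker previously managed by hand with a shared `JobMarket` type from a `src/job_market.rs` module (not shown on this page; presumably among the five changed files). Judging only from the call sites in the diffs below (`new`, `push`, `pop`, `clone`, `split_and_push`, `is_closed`), its surface might look roughly like the sketch here; the field layout, the internal condition variable, and the exact blocking behaviour of `pop` are assumptions rather than the commit's actual implementation.

```rust
use std::collections::VecDeque;
use std::sync::Arc;

use parking_lot::{Condvar, Mutex};

/// Hypothetical reconstruction of the shared work queue, inferred from the
/// call sites in bfs.rs and dfs.rs below. The real type lives in
/// src/job_market.rs, which is not part of this excerpt.
pub struct JobMarket<Job> {
    thread_count: usize,
    inner: Arc<(Mutex<Inner<Job>>, Condvar)>,
}

struct Inner<Job> {
    /// Threads currently blocked in `pop` (or shut down after it returned empty).
    wait_count: usize,
    /// Batches of pending work, each destined for one thread.
    jobs: Vec<VecDeque<Job>>,
}

impl<Job> Clone for JobMarket<Job> {
    fn clone(&self) -> Self {
        JobMarket { thread_count: self.thread_count, inner: Arc::clone(&self.inner) }
    }
}

impl<Job> JobMarket<Job> {
    /// Create a market shared by `thread_count` worker threads.
    pub fn new(thread_count: usize) -> Self {
        let inner = Inner { wait_count: 0, jobs: Vec::new() };
        JobMarket { thread_count, inner: Arc::new((Mutex::new(inner), Condvar::new())) }
    }

    /// Publish a batch of work and wake one waiting thread.
    pub fn push(&mut self, batch: VecDeque<Job>) {
        let (lock, cvar) = &*self.inner;
        lock.lock().jobs.push(batch);
        cvar.notify_one();
    }

    /// Block until a batch is available. Returns an empty batch once every
    /// thread is waiting and no jobs remain, i.e. the market has closed.
    pub fn pop(&mut self) -> VecDeque<Job> {
        let (lock, cvar) = &*self.inner;
        let mut inner = lock.lock();
        inner.wait_count += 1;
        loop {
            if let Some(batch) = inner.jobs.pop() {
                inner.wait_count -= 1;
                return batch;
            }
            if inner.wait_count == self.thread_count {
                // Everyone is idle and the queue is empty: close the market.
                cvar.notify_all();
                return VecDeque::new();
            }
            cvar.wait(&mut inner);
        }
    }

    /// Split the caller's pending work across idle threads, keeping one share
    /// for the caller. Mirrors the arithmetic previously inlined in each checker.
    pub fn split_and_push(&mut self, pending: &mut VecDeque<Job>) {
        let (lock, cvar) = &*self.inner;
        let mut inner = lock.lock();
        let pieces = 1 + inner.wait_count.min(pending.len());
        let size = pending.len() / pieces;
        if size == 0 {
            return; // nothing worth handing out
        }
        for _ in 1..pieces {
            inner.jobs.push(pending.split_off(pending.len() - size));
            cvar.notify_one();
        }
    }

    /// True once all threads are idle and the queue is empty.
    pub fn is_closed(&self) -> bool {
        let inner = self.inner.0.lock();
        inner.jobs.is_empty() && inner.wait_count == self.thread_count
    }
}
```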
98 changes: 20 additions & 78 deletions src/checker/bfs.rs
@@ -1,11 +1,11 @@
//! Private module for selective re-export.

use crate::checker::{Checker, EventuallyBits, Expectation, Path};
use crate::job_market::JobMarket;
use crate::{fingerprint, CheckerBuilder, CheckerVisitor, Fingerprint, Model, Property};
use dashmap::mapref::entry::Entry;
use dashmap::DashMap;
use nohash_hasher::NoHashHasher;
use parking_lot::{Condvar, Mutex};
use std::collections::{HashMap, VecDeque};
use std::hash::{BuildHasherDefault, Hash};
use std::num::NonZeroUsize;
@@ -19,22 +19,17 @@ use std::thread::JoinHandle;
pub(crate) struct BfsChecker<M: Model> {
// Immutable state.
model: Arc<M>,
thread_count: usize,
handles: Vec<std::thread::JoinHandle<()>>,

// Mutable state.
job_market: Arc<Mutex<JobMarket<M::State>>>,
job_market: JobMarket<Job<M::State>>,
state_count: Arc<AtomicUsize>,
max_depth: Arc<AtomicUsize>,
generated:
Arc<DashMap<Fingerprint, Option<Fingerprint>, BuildHasherDefault<NoHashHasher<u64>>>>,
discoveries: Arc<DashMap<&'static str, Fingerprint>>,
}
struct JobMarket<State> {
wait_count: usize,
jobs: Vec<Job<State>>,
}
type Job<State> = VecDeque<(State, Fingerprint, EventuallyBits, NonZeroUsize)>;
type Job<State> = (State, Fingerprint, EventuallyBits, NonZeroUsize);

impl<M> BfsChecker<M>
where
@@ -86,16 +81,12 @@ where
let discoveries = Arc::new(DashMap::default());
let mut handles = Vec::new();

let has_new_job = Arc::new(Condvar::new());
let job_market = Arc::new(Mutex::new(JobMarket {
wait_count: thread_count,
jobs: vec![pending],
}));
let mut job_market = JobMarket::new(thread_count);
job_market.push(pending);
for t in 0..thread_count {
let model = Arc::clone(&model);
let visitor = Arc::clone(&visitor);
let has_new_job = Arc::clone(&has_new_job);
let job_market = Arc::clone(&job_market);
let mut job_market = job_market.clone();
let state_count = Arc::clone(&state_count);
let max_depth = Arc::clone(&max_depth);
let generated = Arc::clone(&generated);
@@ -110,39 +101,17 @@
// Step 1: Do work.
if pending.is_empty() {
pending = {
let mut job_market = job_market.lock();
match job_market.jobs.pop() {
None => {
// Done if all are waiting.
if job_market.wait_count == thread_count {
log::debug!(
"{}: No more work. Shutting down... gen={}",
t,
generated.len()
);
has_new_job.notify_all();
return;
}

// Otherwise more work may become available.
log::trace!(
"{}: No jobs. Awaiting. blocked={}",
t,
job_market.wait_count
);
has_new_job.wait(&mut job_market);
continue;
}
Some(job) => {
job_market.wait_count -= 1;
log::trace!(
"{}: Job found. size={}, blocked={}",
t,
job.len(),
job_market.wait_count
);
job
}
let jobs = job_market.pop();
if jobs.is_empty() {
log::debug!(
"{}: No more work. Shutting down... gen={}",
t,
generated.len()
);
return;
} else {
log::trace!("{}: Job found. size={}", t, jobs.len());
jobs
}
};
}
@@ -163,10 +132,6 @@
t,
generated.len()
);
let mut job_market = job_market.lock();
job_market.wait_count += 1;
drop(job_market);
has_new_job.notify_all();
return;
}
if let Some(target_state_count) = target_state_count {
@@ -182,27 +147,7 @@

// Step 2: Share work.
if pending.len() > 1 && thread_count > 1 {
let mut job_market = job_market.lock();
let pieces = 1 + std::cmp::min(
job_market.wait_count,
pending.len(),
);
let size = pending.len() / pieces;
for _ in 1..pieces {
log::trace!(
"{}: Sharing work. blocked={}, size={}",
t,
job_market.wait_count,
size
);
job_market
.jobs
.push(pending.split_off(pending.len() - size));
has_new_job.notify_one();
}
} else if pending.is_empty() {
let mut job_market = job_market.lock();
job_market.wait_count += 1;
job_market.split_and_push(&mut pending);
}
}
})
@@ -211,7 +156,6 @@
}
BfsChecker {
model,
thread_count,
handles,
job_market,
state_count,
@@ -230,7 +174,7 @@
Option<Fingerprint>,
BuildHasherDefault<NoHashHasher<u64>>,
>,
pending: &mut Job<M::State>,
pending: &mut VecDeque<Job<M::State>>,
discoveries: &DashMap<&'static str, Fingerprint>,
visitor: &Option<Box<dyn CheckerVisitor<M> + Send + Sync>>,
mut max_count: usize,
@@ -421,9 +365,7 @@
}

fn is_done(&self) -> bool {
let job_market = self.job_market.lock();
job_market.jobs.is_empty() && job_market.wait_count == self.thread_count
|| self.discoveries.len() == self.model.properties().len()
self.job_market.is_closed() || self.discoveries.len() == self.model.properties().len()
}
}

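Taken together, the added lines above give the checker threads a much shorter shape, and the dfs.rs diff below follows the same pattern. A condensed reconstruction is shown here; `worker_loop` and `process` are illustrative names (`process` stands in for `Self::check_block`), not the checker's actual signatures, and the Step 2 branch condition is copied from the surrounding context lines in the diff.

```rust
use std::collections::VecDeque;

/// Condensed sketch of the per-thread loop after this refactor (bfs.rs and
/// dfs.rs differ only in the Job type and in what `process` does).
fn worker_loop<Job>(
    t: usize,
    thread_count: usize,
    mut job_market: JobMarket<Job>,
    mut process: impl FnMut(&mut VecDeque<Job>),
) {
    let mut pending: VecDeque<Job> = VecDeque::new();
    loop {
        // Step 1: Do work.
        if pending.is_empty() {
            let jobs = job_market.pop();
            if jobs.is_empty() {
                // An empty batch means the market is closed: shut down.
                log::debug!("{}: No more work. Shutting down...", t);
                return;
            }
            log::trace!("{}: Job found. size={}", t, jobs.len());
            pending = jobs;
        }

        process(&mut pending);

        // Step 2: Share work. The splitting arithmetic that used to live
        // here is now encapsulated in JobMarket::split_and_push.
        if pending.len() > 1 && thread_count > 1 {
            job_market.split_and_push(&mut pending);
        }
    }
}
```

Before this change every exit path had to increment `wait_count` and notify the condition variable by hand; with the market owning that bookkeeping, the only coordination left in the checker is "pop, work, maybe share".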
106 changes: 24 additions & 82 deletions src/checker/dfs.rs
@@ -1,10 +1,10 @@
//! Private module for selective re-export.

use crate::checker::{Checker, EventuallyBits, Expectation, Path};
use crate::job_market::JobMarket;
use crate::{fingerprint, CheckerBuilder, CheckerVisitor, Fingerprint, Model, Property};
use dashmap::{DashMap, DashSet};
use nohash_hasher::NoHashHasher;
use parking_lot::{Condvar, Mutex};
use std::collections::{HashMap, VecDeque};
use std::hash::{BuildHasherDefault, Hash};
use std::num::NonZeroUsize;
@@ -18,21 +18,16 @@ use std::thread::JoinHandle;
pub(crate) struct DfsChecker<M: Model> {
// Immutable state.
model: Arc<M>,
thread_count: usize,
handles: Vec<std::thread::JoinHandle<()>>,

// Mutable state.
job_market: Arc<Mutex<JobMarket<M::State>>>,
job_market: JobMarket<Job<M::State>>,
state_count: Arc<AtomicUsize>,
max_depth: Arc<AtomicUsize>,
generated: Arc<DashSet<Fingerprint, BuildHasherDefault<NoHashHasher<u64>>>>,
discoveries: Arc<DashMap<&'static str, Vec<Fingerprint>>>,
}
struct JobMarket<State> {
wait_count: usize,
jobs: Vec<Job<State>>,
}
type Job<State> = Vec<(State, Vec<Fingerprint>, EventuallyBits, NonZeroUsize)>;
type Job<State> = (State, Vec<Fingerprint>, EventuallyBits, NonZeroUsize);

impl<M> DfsChecker<M>
where
@@ -79,7 +74,7 @@ where
}
ebits
};
let pending: Vec<_> = init_states
let pending: VecDeque<_> = init_states
.into_iter()
.map(|s| {
let fp = fingerprint(&s);
@@ -89,16 +84,12 @@ where
let discoveries = Arc::new(DashMap::default());
let mut handles = Vec::new();

let has_new_job = Arc::new(Condvar::new());
let job_market = Arc::new(Mutex::new(JobMarket {
wait_count: thread_count,
jobs: vec![pending],
}));
let mut job_market = JobMarket::new(thread_count);
job_market.push(pending);
for t in 0..thread_count {
let model = Arc::clone(&model);
let visitor = Arc::clone(&visitor);
let has_new_job = Arc::clone(&has_new_job);
let job_market = Arc::clone(&job_market);
let mut job_market = job_market.clone();
let state_count = Arc::clone(&state_count);
let max_depth = Arc::clone(&max_depth);
let generated = Arc::clone(&generated);
@@ -108,44 +99,22 @@ where
.name(format!("checker-{}", t))
.spawn(move || {
log::debug!("{}: Thread started.", t);
let mut pending = Vec::new();
let mut pending = VecDeque::new();
loop {
// Step 1: Do work.
if pending.is_empty() {
pending = {
let mut job_market = job_market.lock();
match job_market.jobs.pop() {
None => {
// Done if all are waiting.
if job_market.wait_count == thread_count {
log::debug!(
"{}: No more work. Shutting down... gen={}",
t,
generated.len()
);
has_new_job.notify_all();
return;
}

// Otherwise more work may become available.
log::trace!(
"{}: No jobs. Awaiting. blocked={}",
t,
job_market.wait_count
);
has_new_job.wait(&mut job_market);
continue;
}
Some(job) => {
job_market.wait_count -= 1;
log::trace!(
"{}: Job found. size={}, blocked={}",
t,
job.len(),
job_market.wait_count
);
job
}
let jobs = job_market.pop();
if jobs.is_empty() {
log::debug!(
"{}: No more work. Shutting down... gen={}",
t,
generated.len()
);
return;
} else {
log::trace!("{}: Job found. size={}", t, jobs.len());
jobs
}
};
}
@@ -167,10 +136,6 @@
t,
generated.len()
);
let mut job_market = job_market.lock();
job_market.wait_count += 1;
drop(job_market);
has_new_job.notify_all();
return;
}
if let Some(target_state_count) = target_state_count {
@@ -186,27 +151,7 @@

// Step 2: Share work.
if pending.len() > 1 && thread_count > 1 {
let mut job_market = job_market.lock();
let pieces = 1 + std::cmp::min(
job_market.wait_count,
pending.len(),
);
let size = pending.len() / pieces;
for _ in 1..pieces {
log::trace!(
"{}: Sharing work. blocked={}, size={}",
t,
job_market.wait_count,
size
);
job_market
.jobs
.push(pending.split_off(pending.len() - size));
has_new_job.notify_one();
}
} else if pending.is_empty() {
let mut job_market = job_market.lock();
job_market.wait_count += 1;
job_market.split_and_push(&mut pending);
}
}
})
@@ -215,7 +160,6 @@
}
DfsChecker {
model,
thread_count,
handles,
job_market,
state_count,
@@ -231,7 +175,7 @@
model: &M,
state_count: &AtomicUsize,
generated: &DashSet<Fingerprint, BuildHasherDefault<NoHashHasher<u64>>>,
pending: &mut Job<M::State>,
pending: &mut VecDeque<Job<M::State>>,
discoveries: &DashMap<&'static str, Vec<Fingerprint>>,
visitor: &Option<Box<dyn CheckerVisitor<M> + Send + Sync>>,
mut max_count: usize,
Expand All @@ -251,7 +195,7 @@ where
max_count -= 1;

// Done if none pending.
let (state, fingerprints, mut ebits, max_depth) = match pending.pop() {
let (state, fingerprints, mut ebits, max_depth) = match pending.pop_back() {
None => return,
Some(pair) => pair,
};
@@ -388,7 +332,7 @@
next_fingerprints.push(*f);
}
next_fingerprints.push(next_fingerprint);
pending.push((
pending.push_back((
next_state,
next_fingerprints,
ebits.clone(),
@@ -445,9 +389,7 @@
}

fn is_done(&self) -> bool {
let job_market = self.job_market.lock();
job_market.jobs.is_empty() && job_market.wait_count == self.thread_count
|| self.discoveries.len() == self.model.properties().len()
self.job_market.is_closed() || self.discoveries.len() == self.model.properties().len()
}
}

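The commit title also promises to handle checker threads that panic, but none of that machinery is visible in the two files above; it presumably lives in src/job_market.rs or in the spawning code, neither of which is shown in full here. Purely as a guess at one way it could work (this is an assumption, not the commit's code): give the worker's `JobMarket` handle a `Drop` implementation, so a thread that unwinds from a panic while it still holds work is counted as permanently idle, letting the remaining threads observe "everyone is waiting" and shut down instead of blocking forever.

```rust
// Hypothetical sketch only: this Drop-based bookkeeping is an assumption,
// not code from this commit. It builds on the equally hypothetical
// JobMarket sketch near the top of this page.
impl<Job> Drop for JobMarket<Job> {
    fn drop(&mut self) {
        // Only react when the handle is dropped during a panic's unwind;
        // a normal shutdown already leaves wait_count in the right state.
        if std::thread::panicking() {
            let (lock, cvar) = &*self.inner;
            let mut inner = lock.lock();
            // Saturate at thread_count so a late panic cannot over-count.
            inner.wait_count = self.thread_count.min(inner.wait_count + 1);
            cvar.notify_all();
        }
    }
}
```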
