fix: only spawn threads into a pool, to avoid unlimited thread spawns
dignifiedquire committed Sep 29, 2020
Parent: 27b6281 · Commit: dca4c5a
Showing 3 changed files with 36 additions and 58 deletions.
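The change is easiest to read as a pattern: instead of letting each multiexp call spawn its own scoped OS threads, all parallel work is funneled through one fixed-size thread pool. The sketch below illustrates that pattern only; the pool size, data, and closure are invented for the example and are not bellperson's actual code (it assumes the rayon and num_cpus crates).

// Illustrative only: the shape of the fix, not bellperson's code.
// Before: each call could spawn its own OS threads, so concurrent calls could
// create an unbounded number of threads. After: everything runs inside one
// fixed-size rayon pool, capping the total number of worker threads.
use rayon::prelude::*;

fn main() {
    // One pool, sized to the number of logical CPUs, built once and reused.
    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(num_cpus::get())
        .build()
        .unwrap();

    let chunks: Vec<Vec<u64>> = (0..8u64)
        .map(|i| (i * 100..(i + 1) * 100).collect())
        .collect();

    // `install` runs the closure on the pool, and the nested parallel iterator
    // borrows the pool's threads instead of spawning new ones.
    let total: u64 = pool.install(|| chunks.par_iter().map(|c| c.iter().sum::<u64>()).sum());

    println!("total = {}", total);
}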
Cargo.toml: 3 changes (1 addition, 2 deletions)
@@ -18,7 +18,6 @@ futures = "0.1"
 futures-cpupool = { version = "0.1", optional = true }
 groupy = "0.3.1"
 num_cpus = { version = "1", optional = true }
-crossbeam = { version = "0.7", optional = true }
 paired = { version = "0.20.0", optional = true }
 rand_core = "0.5"
 byteorder = "1"
@@ -44,7 +43,7 @@ criterion = "0.3.2"
 default = ["groth16", "multicore"]
 gpu = ["rust-gpu-tools", "ff-cl-gen", "fs2"]
 groth16 = ["paired"]
-multicore = ["futures-cpupool", "crossbeam", "num_cpus"]
+multicore = ["futures-cpupool", "num_cpus"]
 
 [[test]]
 name = "mimc"
src/gpu/multiexp.rs: 55 changes (24 additions, 31 deletions)
Expand Up @@ -4,7 +4,6 @@ use super::sources;
use super::utils;
use crate::multicore::Worker;
use crate::multiexp::{multiexp as cpu_multiexp, FullDensity};
use crossbeam::thread;
use ff::{PrimeField, ScalarEngine};
use futures::Future;
use groupy::{CurveAffine, CurveProjective};
Expand Down Expand Up @@ -285,27 +284,29 @@ where

let chunk_size = ((n as f64) / (num_devices as f64)).ceil() as usize;

match thread::scope(|s| -> Result<<G as CurveAffine>::Projective, GPUError> {
crate::multicore::THREAD_POOL.install(|| {
use rayon::prelude::*;

let mut acc = <G as CurveAffine>::Projective::zero();
let mut threads = Vec::new();
if n > 0 {
for ((bases, exps), kern) in bases
.chunks(chunk_size)
.zip(exps.chunks(chunk_size))
.zip(self.kernels.iter_mut())
{
threads.push(s.spawn(
move |_| -> Result<<G as CurveAffine>::Projective, GPUError> {
let mut acc = <G as CurveAffine>::Projective::zero();
for (bases, exps) in bases.chunks(kern.n).zip(exps.chunks(kern.n)) {
let result = kern.multiexp(bases, exps, bases.len())?;
acc.add_assign(&result);
}
Ok(acc)
},
));
}
}

let results = if n > 0 {
bases
.par_chunks(chunk_size)
.zip(exps.par_chunks(chunk_size))
.zip(self.kernels.par_iter_mut())
.map(|((bases, exps), kern)| -> Result<<G as CurveAffine>::Projective, GPUError> {
let mut acc = <G as CurveAffine>::Projective::zero();
for (bases, exps) in bases.chunks(kern.n).zip(exps.chunks(kern.n)) {
let result = kern.multiexp(bases, exps, bases.len())?;
acc.add_assign(&result);
}

Ok(acc)
})
.collect::<Vec<_>>()
} else {
Vec::new()
};

let cpu_acc = cpu_multiexp(
&pool,
Expand All @@ -315,20 +316,12 @@ where
&mut None,
);

let mut results = vec![];
for t in threads {
results.push(t.join());
}
for r in results {
acc.add_assign(&r??);
acc.add_assign(&r?);
}

acc.add_assign(&cpu_acc.wait().unwrap());

Ok(acc)
}) {
Ok(res) => res,
Err(e) => Err(GPUError::from(e)),
}
})
}
}
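Two details of the new multiexp path above are worth noting: the per-kernel chunk work is expressed as a rayon parallel iterator running inside the shared pool, and failures come back as plain Result values collected into a Vec, so the double `??` needed to unwrap crossbeam's join results becomes a single `?`. Below is a stripped-down illustration of that join-free shape, with made-up types standing in for GPUError and the kernel call, and rayon's default global pool rather than the crate's THREAD_POOL.

// Simplified illustration only: invented types stand in for GPUError and the
// GPU kernel call; rayon's default global pool is used instead of the
// crate's shared THREAD_POOL.
use rayon::prelude::*;

#[derive(Debug)]
struct KernelError;

// Stand-in for a per-device multiexp call that may fail.
fn run_kernel(chunk: &[u64]) -> Result<u64, KernelError> {
    Ok(chunk.iter().sum())
}

fn main() -> Result<(), KernelError> {
    let data: Vec<u64> = (0..1_000).collect();
    let chunk_size = 250;

    // Each chunk runs on a pool thread; failures come back as Result values,
    // not JoinHandles, so there is nothing to join or double-unwrap.
    let partials: Vec<Result<u64, KernelError>> = data
        .par_chunks(chunk_size)
        .map(run_kernel)
        .collect();

    let mut acc = 0u64;
    for r in partials {
        acc += r?;
    }
    println!("acc = {}", acc);
    Ok(())
}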
src/multicore.rs: 36 changes (11 additions, 25 deletions)
@@ -1,13 +1,12 @@
 //! An interface for dealing with the kinds of parallel computations involved in
 //! `bellperson`. It's currently just a thin wrapper around [`CpuPool`] and
-//! [`crossbeam`] but may be extended in the future to allow for various
+//! [`rayon`] but may be extended in the future to allow for various
 //! parallelism strategies.
 //!
 //! [`CpuPool`]: futures_cpupool::CpuPool
 
 #[cfg(feature = "multicore")]
 mod implementation {
-    use crossbeam::{self, thread::Scope};
     use futures::{Future, IntoFuture, Poll};
     use futures_cpupool::{CpuFuture, CpuPool};
     use lazy_static::lazy_static;
@@ -28,31 +27,19 @@ mod implementation {
             .num_threads(*NUM_CPUS)
             .build()
             .unwrap();
+        static ref CPU_POOL: CpuPool = CpuPool::new(*NUM_CPUS);
     }
 
     #[derive(Clone)]
-    pub struct Worker {
-        cpus: usize,
-        pool: CpuPool,
-    }
+    pub struct Worker {}
 
     impl Worker {
-        // We don't expose this outside the library so that
-        // all `Worker` instances have the same number of
-        // CPUs configured.
-        pub(crate) fn new_with_cpus(cpus: usize) -> Worker {
-            Worker {
-                cpus,
-                pool: CpuPool::new(cpus),
-            }
-        }
-
         pub fn new() -> Worker {
-            Self::new_with_cpus(*NUM_CPUS)
+            Worker {}
        }
 
         pub fn log_num_cpus(&self) -> u32 {
-            log2_floor(self.cpus)
+            log2_floor(*NUM_CPUS)
         }
 
         pub fn compute<F, R>(&self, f: F) -> WorkerFuture<R::Item, R::Error>
@@ -64,23 +51,22 @@ mod implementation {
             R::Error: Send + 'static,
         {
             WorkerFuture {
-                future: self.pool.spawn_fn(f),
+                future: CPU_POOL.spawn_fn(f),
             }
         }
 
         pub fn scope<'a, F, R>(&self, elements: usize, f: F) -> R
         where
-            F: FnOnce(&Scope<'a>, usize) -> R,
+            F: FnOnce(&rayon::Scope<'a>, usize) -> R + Send,
             R: Send,
         {
-            let chunk_size = if elements < self.cpus {
+            let chunk_size = if elements < *NUM_CPUS {
                 1
             } else {
-                elements / self.cpus
+                elements / *NUM_CPUS
             };
 
-            // TODO: Handle case where threads fail
-            crossbeam::scope(|scope| f(scope, chunk_size))
-                .expect("Threads aren't allowed to fail yet")
+            THREAD_POOL.scope(|scope| f(scope, chunk_size))
         }
     }
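For context, the statics referenced above (NUM_CPUS, THREAD_POOL, CPU_POOL) live in a lazy_static! block whose full definition is collapsed out of this diff. The following is an assumed reconstruction based only on the visible lines; the exact bodies and visibilities (for example NUM_CPUS being num_cpus::get(), or THREAD_POOL being pub) are guesses.

// Assumed reconstruction: names come from the visible diff lines, but the
// full lazy_static! block is collapsed, so the exact bodies are guesses.
use futures_cpupool::CpuPool;
use lazy_static::lazy_static;

lazy_static! {
    // Worker-thread count shared by every `Worker` (assumed to be num_cpus::get()).
    static ref NUM_CPUS: usize = num_cpus::get();
    // The single rayon pool all scoped work is spawned into; presumably exported,
    // since gpu/multiexp.rs reaches it as `crate::multicore::THREAD_POOL`.
    pub static ref THREAD_POOL: rayon::ThreadPool = rayon::ThreadPoolBuilder::new()
        .num_threads(*NUM_CPUS)
        .build()
        .unwrap();
    // The single futures-cpupool behind `Worker::compute` (added in the hunk above).
    static ref CPU_POOL: CpuPool = CpuPool::new(*NUM_CPUS);
}

fn main() {
    // All rayon work funnels through the one shared pool.
    let sum: u64 = THREAD_POOL.install(|| (0..1_000u64).sum());
    println!("cpus = {}, sum = {}", *NUM_CPUS, sum);
}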

