A simple thread pool for stateful parallel task execution

workerpool

A worker threadpool used to execute a number of jobs atop stateful workers in parallel. It spawns a specified number of worker threads and replenishes the pool if any worker threads panic.

Each Worker runs in its own thread. To define one, implement the trait:

pub trait Worker : Default {
    type Input: Send;
    type Output: Send;

    fn execute(&mut self, inp: Self::Input) -> Self::Output;
}
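
To make the trait concrete, here is a minimal sketch of a stateful worker. The trait is restated locally so the snippet compiles without the crate. `RunningTotal` and `run_on_one_worker` are hypothetical names invented for illustration, and driving the worker by hand only stands in for what a pool thread would do.

```rust
// Local restatement of the trait shown above, so this sketch compiles on its own.
pub trait Worker: Default {
    type Input: Send;
    type Output: Send;

    fn execute(&mut self, inp: Self::Input) -> Self::Output;
}

// Hypothetical worker: keeps a running total across calls.
#[derive(Default)]
pub struct RunningTotal {
    total: i64,
}

impl Worker for RunningTotal {
    type Input = i64;
    type Output = i64;

    fn execute(&mut self, inp: Self::Input) -> Self::Output {
        // State survives between jobs handled by the same worker.
        self.total += inp;
        self.total
    }
}

// Hypothetical helper: feeds every input to one worker, in order,
// and collects the outputs.
pub fn run_on_one_worker<W: Worker>(inputs: Vec<W::Input>) -> Vec<W::Output> {
    let mut worker = W::default();
    inputs.into_iter().map(|inp| worker.execute(inp)).collect()
}

fn main() {
    let outputs = run_on_one_worker::<RunningTotal>(vec![1, 2, 3]);
    println!("{:?}", outputs);
}
```

Since each worker runs in its own thread, any state like `total` is per worker rather than shared across the pool.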

Usage

[dependencies]
workerpool = "1.2"

To use crossbeam's channels instead of std::sync::mpsc, enable the crossbeam feature:

[dependencies]
workerpool = { version = "1.2", features = ["crossbeam"] }

This crate provides Pool<W> where W: Worker. With a pool, there are four primary functions of interest:

  • Pool::<MyWorker>::new(n_threads) creates a new pool for a particular Worker.
  • pool.execute(inp) runs the worker on inp without blocking the caller and discards the return value.
  • pool.execute_to(tx, inp) runs the worker on inp without blocking the caller and sends the return value to the given Sender.
  • pool.join() blocks until all tasks (from execute and execute_to) have completed.
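
The idea behind these functions can be pictured with a small std-only sketch. It is not the crate's implementation: `MiniPool`, `Job`, `Doubler`, and `sum_of_doubles` are hypothetical names, there is no panic recovery and no `join`, and the `Worker` trait is restated locally. It only illustrates that each thread owns one `Default`-constructed worker and that `execute_to` hands results back through a `Sender`.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread;

// Local restatement of the crate's trait so the sketch compiles on its own.
pub trait Worker: Default {
    type Input: Send;
    type Output: Send;
    fn execute(&mut self, inp: Self::Input) -> Self::Output;
}

// Hypothetical job record: an input plus the channel its output goes to.
struct Job<W: Worker> {
    inp: W::Input,
    tx: Sender<W::Output>,
}

// Hypothetical miniature pool; NOT how workerpool itself is implemented.
pub struct MiniPool<W: Worker + 'static> {
    jobs: Sender<Job<W>>,
}

impl<W: Worker + 'static> MiniPool<W> {
    pub fn new(n_threads: usize) -> Self {
        let (jobs, job_rx) = channel::<Job<W>>();
        let job_rx: Arc<Mutex<Receiver<Job<W>>>> = Arc::new(Mutex::new(job_rx));
        for _ in 0..n_threads {
            let job_rx = Arc::clone(&job_rx);
            thread::spawn(move || {
                // Each thread owns exactly one worker for its whole lifetime.
                let mut worker = W::default();
                loop {
                    // The lock is released at the end of this statement,
                    // so jobs themselves execute in parallel.
                    let next = job_rx.lock().unwrap().recv();
                    match next {
                        Ok(Job { inp, tx }) => {
                            // A send error only means the caller dropped its receiver.
                            let _ = tx.send(worker.execute(inp));
                        }
                        Err(_) => break, // pool dropped, no more jobs
                    }
                }
            });
        }
        MiniPool { jobs }
    }

    // Queues the job and returns immediately; the output arrives on `tx`.
    pub fn execute_to(&self, tx: Sender<W::Output>, inp: W::Input) {
        self.jobs
            .send(Job { inp, tx })
            .expect("all worker threads have exited");
    }
}

// Hypothetical worker used only to exercise the sketch.
#[derive(Default)]
pub struct Doubler;

impl Worker for Doubler {
    type Input = u32;
    type Output = u32;
    fn execute(&mut self, inp: u32) -> u32 {
        inp * 2
    }
}

pub fn sum_of_doubles(n_threads: usize, inputs: Vec<u32>) -> u32 {
    let pool = MiniPool::<Doubler>::new(n_threads);
    let (tx, rx) = channel();
    let n = inputs.len();
    for inp in inputs {
        pool.execute_to(tx.clone(), inp);
    }
    // Results arrive in completion order, so only order-insensitive
    // reductions such as a sum are safe here.
    rx.iter().take(n).sum()
}

fn main() {
    println!("{}", sum_of_doubles(3, vec![1, 2, 3, 4]));
}
```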

The crate ships one ready-made worker in workerpool::thunk: the stateless ThunkWorker<T>. Its inputs are Thunk<T> values, effectively argumentless functions that are Sized + Send. A thunk is created by wrapping a function that returns T with Thunk::of.
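
As a rough illustration of what "an argumentless function that is Send" means, the sketch below defines a hypothetical `BoxedThunk<T>`. It is an assumption made for this example and not the crate's actual `Thunk<T>` definition; `squares` is likewise an invented helper. Each thunk is moved to another thread and run there, which is exactly what the `Send` bound permits.

```rust
use std::thread;

// Hypothetical stand-in for a thunk: a boxed closure that takes no
// arguments, returns T, and may be moved to another thread.
pub struct BoxedThunk<T> {
    f: Box<dyn FnOnce() -> T + Send>,
}

impl<T> BoxedThunk<T> {
    // Wraps any suitable closure, erasing its concrete type.
    pub fn of<F>(f: F) -> Self
    where
        F: FnOnce() -> T + Send + 'static,
    {
        BoxedThunk { f: Box::new(f) }
    }

    // Consumes the thunk and produces its value.
    pub fn run(self) -> T {
        (self.f)()
    }
}

// Builds one thunk per index, runs each on its own thread, and
// collects the results in index order.
pub fn squares(n: i32) -> Vec<i32> {
    let handles: Vec<_> = (0..n)
        .map(|i| BoxedThunk::of(move || i * i))
        .map(|t| thread::spawn(move || t.run()))
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}

fn main() {
    println!("{:?}", squares(4));
}
```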

extern crate workerpool;

use workerpool::Pool;
use workerpool::thunk::{Thunk, ThunkWorker};
use std::sync::mpsc::channel;

fn main() {
    let n_workers = 4;
    let n_jobs = 8;
    let pool = Pool::<ThunkWorker<i32>>::new(n_workers);
    
    let (tx, rx) = channel();
    for i in 0..n_jobs {
        pool.execute_to(tx.clone(), Thunk::of(move || i * i));
    }
    
    // 0 + 1 + 4 + ... + 49 = 140; the explicit type keeps inference unambiguous.
    assert_eq!(140, rx.iter().take(n_jobs as usize).sum::<i32>());
}

For stateful workers, you have to implement Worker yourself.

Suppose you want to run a line-delimited process, such as cat or tr, on several threads and use those processes as a pool. You can write a worker that keeps each process's stdin and stdout as its state:

extern crate workerpool;

use workerpool::{Worker, Pool};
use std::process::{Command, ChildStdin, ChildStdout, Stdio};
use std::io::prelude::*;
use std::io::{self, BufReader};
use std::sync::mpsc::channel;

struct LineDelimitedProcess {
    stdin: ChildStdin,
    stdout: BufReader<ChildStdout>,
}
impl Default for LineDelimitedProcess {
    fn default() -> Self {
        let child = Command::new("cat")
            .stdin(Stdio::piped())
            .stdout(Stdio::piped())
            .stderr(Stdio::inherit())
            .spawn()
            .unwrap();
        Self {
            stdin: child.stdin.unwrap(),
            stdout: BufReader::new(child.stdout.unwrap()),
        }
    }
}
impl Worker for LineDelimitedProcess {
    type Input = Box<[u8]>;
    type Output = io::Result<String>;

    fn execute(&mut self, inp: Self::Input) -> Self::Output {
        // Write the input to the child's stdin as a single line.
        self.stdin.write_all(&*inp)?;
        self.stdin.write_all(b"\n")?;
        self.stdin.flush()?;
        // Read one line of the child's response from its stdout.
        let mut s = String::new();
        self.stdout.read_line(&mut s)?;
        s.pop(); // exclude newline
        Ok(s)
    }
}

fn main() {
    let n_workers = 4;
    let n_jobs = 8;
    let pool = Pool::<LineDelimitedProcess>::new(n_workers);
    
    let (tx, rx) = channel();
    for i in 0..n_jobs {
        let inp = Box::new([97 + i]);
        pool.execute_to(tx.clone(), inp);
    }
    
    // output is a permutation of "abcdefgh"
    let mut output = rx.iter()
        .take(n_jobs as usize)
        .fold(String::new(), |mut a, b| {
            a.push_str(&b.unwrap());
            a
        })
        .into_bytes();
    output.sort();
    assert_eq!(output, b"abcdefgh");
}

Similar libraries

License

This work is derivative of threadpool.

Licensed under either of

  • Apache License, Version 2.0 (LICENSE-APACHE)
  • MIT license (LICENSE-MIT)

at your option.

Contribution

Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions.
