Merge pull request #3 from jgardona/feat/syncmodule
Implement sync module
jgardona committed Dec 5, 2023
2 parents 51910da + f449636 commit 3db7040
Showing 6 changed files with 172 additions and 7 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "rpools"
version = "0.2.4"
version = "0.3.0"
authors = ["jcbritobr <jcbritobr@gmail.com>"]
edition = "2018"
repository = "https://github.com/jgardona/rpools"
33 changes: 29 additions & 4 deletions README.md
@@ -19,8 +19,15 @@

A minimalist Rust worker pool implementation that uses channels to synchronize jobs. It spawns a fixed number of worker threads that wait on a job queue.

## Install

* **Install with cargo**
```shell
$ cargo add rpools
```

## Usage

* **A simple workerpool**
```rust
use rpools::pool::WorkerPool;
use std::sync::mpsc::channel;
@@ -47,8 +54,26 @@
assert_eq!(rx.iter().take(n_jobs).fold(0, |a, b| a + b), 8);
```

* Test

```shell
$ cargo test
```

* **Use the sync module to synchronize your pool**

```rust
use rpools::pool;
use rpools::sync::WaitGroup;
use std::sync::{atomic::{AtomicUsize, Ordering}, Arc};

let njobs = 20;
let nworkers = 3;
let pool = pool::WorkerPool::new(nworkers);
let atomic = Arc::new(AtomicUsize::new(0));
let wg = WaitGroup::default();

// send the jobs to the pool
for _ in 0..njobs {
    let wg = wg.clone();
    let atomic = atomic.clone();
    pool.execute(move || {
        atomic.fetch_add(1, Ordering::Relaxed);
        drop(wg);
    });
}

// wait for the pool to finish
wg.wait();
assert_eq!(njobs, atomic.load(Ordering::Relaxed));
```
1 change: 1 addition & 0 deletions src/lib.rs
@@ -31,3 +31,4 @@

// Imports and makes pool public.
pub mod pool;
pub mod sync;
21 changes: 21 additions & 0 deletions src/pool.rs
@@ -18,6 +18,27 @@ type Handle = thread::JoinHandle<()>;
/// Implements a continuous pool of Rust threads that doesn't stop
/// unless it goes out of scope.
///
/// ### Examples
///
/// ```
/// use rpools::pool;
/// use rpools::sync::WaitGroup;
/// use std::sync::{atomic::{AtomicUsize, Ordering}, Arc};
///
/// let njobs = 20;
/// let nworkers = 3;
/// let pool = pool::WorkerPool::new(nworkers);
/// let atomic = Arc::new(AtomicUsize::new(0));
/// let wg = WaitGroup::default();
///
/// // send the jobs to the pool
/// for _ in 0..njobs {
///     let wg = wg.clone();
///     let atomic = atomic.clone();
///     pool.execute(move || {
///         atomic.fetch_add(1, Ordering::Relaxed);
///         drop(wg);
///     });
/// }
///
/// // wait for the pool to finish
/// wg.wait();
/// assert_eq!(njobs, atomic.load(Ordering::Relaxed));
/// ```
pub struct WorkerPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
91 changes: 91 additions & 0 deletions src/sync.rs
@@ -0,0 +1,91 @@
//! ## Sync
//!
//! This module provides data structures used to synchronize
//! threads. WaitGroup is used to make one thread wait
//! for others to finish.
//!
//! ### Examples
//! ```
//! use rpools::pool::WorkerPool;
//! use std::sync::{atomic::AtomicUsize, atomic::Ordering, Arc};
//! use rpools::sync::WaitGroup;
//!
//! let njobs = 20;
//! let nworkers = 3;
//! let pool = WorkerPool::new(nworkers);
//! let atomic = Arc::new(AtomicUsize::new(0));
//! let wg = WaitGroup::default();
//!
//! // send the jobs to the pool
//! for _ in 0..njobs {
//!     let wg = wg.clone();
//!     let atomic = atomic.clone();
//!     pool.execute(move || {
//!         atomic.fetch_add(1, Ordering::Relaxed);
//!         drop(wg);
//!     });
//! }
//!
//! // wait for the pool to finish
//! wg.wait();
//! assert_eq!(njobs, atomic.load(Ordering::Relaxed));
//! ```

use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc, Condvar, Mutex,
};

/// A data struct that stores a counter, a mutex, and a condvar.
/// It serves as a semaphore to synchronize threads.
#[derive(Default)]
struct Wg {
    counter: AtomicUsize,
    mu: Mutex<bool>,
    condvar: Condvar,
}

/// A public wrapper around Wg. This data structure implements the
/// semaphore logic: it blocks the target thread and waits for
/// signals before it resumes processing.
#[derive(Default)]
pub struct WaitGroup(Arc<Wg>);

impl WaitGroup {
    /// Blocks the current thread until the counter reaches 0,
    /// then lets it resume processing.
    pub fn wait(&self) {
        let mut started = self.0.mu.lock().expect("Can't get the lock");
        while !*started {
            // If every clone has already been dropped, there is nothing to wait for.
            if self.0.counter.load(Ordering::Relaxed) == 0 {
                *started = true;
                break;
            }
            started = self
                .0
                .condvar
                .wait(started)
                .expect("Can't block the current thread");
            if self.0.counter.load(Ordering::Relaxed) == 0 {
                *started = true;
            }
        }
    }
}

/// Implements Clone for WaitGroup
impl Clone for WaitGroup {
    /// Each clone of this struct increments the
    /// counter by one.
    fn clone(&self) -> Self {
        self.0.counter.fetch_add(1, Ordering::Relaxed);
        Self(self.0.clone())
    }
}

/// Implements Drop for WaitGroup
impl Drop for WaitGroup {
    /// When a shared reference goes out of scope,
    /// decrements the counter by one and notifies a waiting thread.
    fn drop(&mut self) {
        self.0.counter.fetch_sub(1, Ordering::Relaxed);
        self.0.condvar.notify_one();
    }
}
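
The clone/drop counting above is the whole protocol: `clone()` adds one to the counter, `drop()` subtracts one and signals, and `wait()` sleeps until the count reaches zero. As a minimal sketch (not part of this commit, and using plain `std::thread` workers instead of the pool), the same `WaitGroup` can coordinate ordinary threads:

```rust
use std::thread;

use rpools::sync::WaitGroup;

fn main() {
    let wg = WaitGroup::default();

    for i in 0..4 {
        // every clone bumps the internal counter by one
        let wg = wg.clone();
        thread::spawn(move || {
            println!("worker {} done", i);
            // dropping the clone decrements the counter and notifies the waiter
            drop(wg);
        });
    }

    // blocks until every clone has been dropped
    wg.wait();
    println!("all workers finished");
}
```

The pool-based examples in the README, the doc comments, and the integration test below all follow the same pattern: one clone per submitted job, dropped when the job body finishes.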
31 changes: 29 additions & 2 deletions tests/integration.rs
@@ -1,6 +1,33 @@
use std::{sync::mpsc, sync::Arc, sync::Mutex};
use std::{
    sync::mpsc,
    sync::{atomic::AtomicUsize, Mutex},
    sync::{atomic::Ordering, Arc},
};

use rpools::pool;
use rpools::{pool, sync::WaitGroup};

#[test]
fn test_waitgroup() {
    let njobs = 20;
    let nworkers = 3;
    let pool = pool::WorkerPool::new(nworkers);
    let atomic = Arc::new(AtomicUsize::new(0));
    let wg = WaitGroup::default();

    // send the jobs to the pool
    for _ in 0..njobs {
        let wg = wg.clone();
        let atomic = atomic.clone();
        pool.execute(move || {
            atomic.fetch_add(1, Ordering::Relaxed);
            drop(wg);
        });
    }

    // wait for the pool to finish
    wg.wait();
    assert_eq!(njobs, atomic.load(Ordering::Relaxed));
}

#[test]
fn pool_should_synchronize_sender_and_receiver_and_fold_results() {
