Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement sync module #3

Merged
merged 1 commit into from
Dec 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "rpools"
version = "0.2.4"
version = "0.3.0"
authors = ["jcbritobr <jcbritobr@gmail.com>"]
edition = "2018"
repository = "https://github.com/jgardona/rpools"
Expand Down
33 changes: 29 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,15 @@

A minimalist rust workerpool implementation that uses channels to synchronize the jobs. It can spawn a fixed number of worker threads, that waits for a job queue.

## Install

* Use
```
$ cargo add rpools
```

## Usage

* **A simple workerpool**
```rust
use rpools::pool::WorkerPool;
use std::sync::mpsc::channel;
Expand All @@ -47,8 +54,26 @@ A minimalist rust workerpool implementation that uses channels to synchronize th
assert_eq!(rx.iter().take(n_jobs).fold(0, |a, b| a + b), 8);
```

* Test
* **Use sync module to synchronize your pool**

```rust
let njobs = 20;
let nworkers = 3;
let pool = pool::WorkerPool::new(nworkers);
let atomic = Arc::new(AtomicUsize::new(0));
let wg = WaitGroup::default();

// send the jobs to the pool
for _ in 0..njobs {
let wg = wg.clone();
let atomic = atomic.clone();
pool.execute(move || {
atomic.fetch_add(1, Ordering::Relaxed);
drop(wg);
});
}

```shell
$ cargo test
// wait for the pool finnishes
wg.wait();
assert_eq!(njobs, atomic.load(Ordering::Relaxed));
```
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,4 @@

// Imports and makes pool public.
pub mod pool;
pub mod sync;
21 changes: 21 additions & 0 deletions src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,27 @@ type Handle = thread::JoinHandle<()>;
/// Implements a continuous pool of rust threads thats doesn't stops
/// unless it gets out of scope.
///
/// ### Examples
///
/// let njobs = 20;
/// let nworkers = 3;
/// let pool = pool::WorkerPool::new(nworkers);
/// let atomic = Arc::new(AtomicUsize::new(0));
/// let wg = WaitGroup::default();
///
/// // send the jobs to the pool
/// for _ in 0..njobs {
/// let wg = wg.clone();
/// let atomic = atomic.clone();
/// pool.execute(move || {
/// atomic.fetch_add(1, Ordering::Relaxed);
/// drop(wg);
/// });
/// }
///
/// // wait for the pool finnishes
/// wg.wait();
/// assert_eq!(njobs, atomic.load(Ordering::Relaxed));
pub struct WorkerPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
Expand Down
91 changes: 91 additions & 0 deletions src/sync.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
//! ## Sync
//!
//! This module has data structures used to synchronize
//! threads. WaitGroup is used to make a thread to wait
//! others.
//!
//! ### Examples
//! ```
//! use rpools::pool::WorkerPool;
//! use std::sync::{atomic::AtomicUsize, atomic::Ordering, Arc, Mutex};
//! use rpools::sync::WaitGroup;
//!
//! let njobs = 20;
//! let nworkers = 3;
//! let pool = WorkerPool::new(nworkers);
//! let atomic = Arc::new(AtomicUsize::new(0));
//! let wg = WaitGroup::default();
//!
//! // send the jobs to the pool
//! for _ in 0..njobs {
//! let wg = wg.clone();
//! let atomic = atomic.clone();
//! pool.execute(move || {
//! atomic.fetch_add(1, Ordering::Relaxed);
//! drop(wg);
//! });
//! }
//!
//! // wait for the pool finnishes
//! wg.wait();
//! assert_eq!(njobs, atomic.load(Ordering::Relaxed));
//! ```

use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc, Condvar, Mutex,
};


/// A data struct to store a counter, a mutex and a condvar.
/// It is responsible and serves as semaphore to synchronize threads.
#[derive(Default)]
struct Wg {
counter: AtomicUsize,
mu: Mutex<bool>,
condvar: Condvar,
}

/// A public wrapper above Wg. This data structure is responsible
/// to do the logics of the semaphore, block the target thread and
/// wait for signals to continue processing.
#[derive(Default)]
pub struct WaitGroup(Arc<Wg>);

impl WaitGroup {
/// Blocks the current thread and waits until counter becomes 0. If
/// counter is 0, start processing again.
pub fn wait(&self) {
let mut started = self.0.mu.lock().expect("Cant get the lock");
while !*started {
started = self
.0
.condvar
.wait(started)
.expect("Cant block the current thread");
if self.0.counter.load(Ordering::Relaxed) == 0 {
*started = true;
}
}
}
}

/// Implements Clone for WaitGroup
impl Clone for WaitGroup {
/// For each clone of this struct, increments the
/// counter in one.
fn clone(&self) -> Self {
self.0.counter.fetch_add(1, Ordering::Relaxed);
Self(self.0.clone())
}
}

/// Implements Drop for WaitGroup
impl Drop for WaitGroup {
/// When a shared reference goes out of scope,
/// decrement the counter in one.
fn drop(&mut self) {
self.0.counter.fetch_sub(1, Ordering::Relaxed);
self.0.condvar.notify_one();
}
}
31 changes: 29 additions & 2 deletions tests/integration.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,33 @@
use std::{sync::mpsc, sync::Arc, sync::Mutex};
use std::{
sync::mpsc,
sync::{atomic::AtomicUsize, Mutex},
sync::{atomic::Ordering, Arc},
};

use rpools::pool;
use rpools::{pool, sync::WaitGroup};

#[test]
fn test_waitgroup() {
let njobs = 20;
let nworkers = 3;
let pool = pool::WorkerPool::new(nworkers);
let atomic = Arc::new(AtomicUsize::new(0));
let wg = WaitGroup::default();

// send the jobs to the pool
for _ in 0..njobs {
let wg = wg.clone();
let atomic = atomic.clone();
pool.execute(move || {
atomic.fetch_add(1, Ordering::Relaxed);
drop(wg);
});
}

// wait for the pool finnishes
wg.wait();
assert_eq!(njobs, atomic.load(Ordering::Relaxed));
}

#[test]
fn pool_should_synchronize_sender_and_receiver_and_fold_results() {
Expand Down