Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 89 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
* [Fallible Parallel Iterators](#fallible-parallel-iterators)
* [Using Mutable Variables](#using-mutable-variables)
* [Configurations](#configurations)
* [Underlying Approach and Parallel Runners](#underlying-approach-and-parallel-runners)
* [Runner: Pools and Executors](#runner-pools-and-executors)
* [Contributing](#contributing)

## Parallel Computation by Iterators
Expand Down Expand Up @@ -420,16 +420,97 @@ This is guaranteed by the fact that both consuming computation calls and configu
Additionally, maximum number of threads that can be used by parallel computations can be globally bounded by the environment variable `ORX_PARALLEL_MAX_NUM_THREADS`. Please see the corresponding [example](https://github.com/orxfun/orx-parallel/blob/main/examples/max_num_threads_config.rs) for details.


## Underlying Approach and Parallel Runners
## Runner: Pools and Executors

This crate defines parallel computation by combining two basic components.

* Pulling **inputs** in parallel is achieved through [`ConcurrentIter`](https://crates.io/crates/orx-concurrent-iter). Concurrent iterator implementations are lock-free, efficient and support pull-by-chunks optimization to reduce the parallelization overhead. A thread can pull any number of inputs from the concurrent iterator every time it becomes idle. This provides the means to dynamically decide on the chunk sizes.
* Writing **outputs** in parallel is handled using thread-safe containers such as [`ConcurrentBag`](https://crates.io/crates/orx-concurrent-bag) and [`ConcurrentOrderedBag`](https://crates.io/crates/orx-concurrent-ordered-bag). Similarly, these are lock-free collections that aim for high performance collection of results.
**Pulling inputs**
* Pulling inputs in parallel is achieved through [`ConcurrentIter`](https://crates.io/crates/orx-concurrent-iter). Concurrent iterator implementations are lock-free, efficient and support pull-by-chunks optimization to reduce the parallelization overhead. A thread can pull any number of inputs from the concurrent iterator every time it becomes idle. This provides the means to dynamically decide on the chunk sizes.
* Furthermore, this allows to reduce the overhead of defining creating tasks. To illustrate, provided that the computation will be handled by `n` threads, a closure holding a reference to the input concurrent iterator is defined to represent the computation. This same closure is passed to `n` threads; i.e., `n` spawn calls are made. Each of these threads keep pulling elements from the input until the computation is completed, without requiring to define another task.

Finally, [`ParallelRunner`](https://docs.rs/orx-parallel/latest/orx_parallel/runner/trait.ParallelRunner.html) trait manages parallelization of the given computation with desired configuration. The objective of the parallel runner is to optimize the chunk sizes to solve the tradeoff between impact of heterogeneity of individual computations and overhead of parallelization.
**Writing outputs**
* When we collect results, writing outputs is handled using lock-free containers such as [`ConcurrentBag`](https://crates.io/crates/orx-concurrent-bag) and [`ConcurrentOrderedBag`](https://crates.io/crates/orx-concurrent-ordered-bag) which aim for high performance collection of results.

Since it is a trait, parallel runner is customizable. It is possible to implement and use your *own runner* by calling [`with_runner`](https://docs.rs/orx-parallel/latest/orx_parallel/trait.ParIter.html#tymethod.with_runner) transformation method on the parallel iterator. Default parallel runner targets to be efficient in general. When we have a use case with special characteristics, we can implement a `ParallelRunner` optimized for this scenario and use with the parallel iterators.
There are two main decisions to be taken while executing these components:
* how many threads do we use?
* what is the chunk size; i.e., how many input items does a thread pull each time?

A [`ParallelRunner`](https://docs.rs/orx-parallel/latest/orx_parallel/trait.ParallelRunner) is a combination of a `ParThreadPool` and a `ParallelExecutor` that are responsible for these decisions, respectively.

### ParThreadPool: number of threads

[`ParThreadPool`](https://docs.rs/orx-parallel/latest/orx_parallel/trait.ParThreadPool) trait generalizes thread pools that can be used for parallel computations. This allows the parallel computation to be generic over thread pools.

When not explicitly set, [`DefaultPool`](https://docs.rs/orx-parallel/latest/orx_parallel/type.DefaultPool) is used:
* When **std** feature is enabled, default pool is the [`StdDefaultPool`](https://docs.rs/orx-parallel/latest/orx_parallel/struct.StdDefaultPool). In other words, all available native threads can be used by the parallel computation. This number can globally bounded by "ORX_PARALLEL_MAX_NUM_THREADS" environment variable when set.
* When working in a **no-std** environment, default pool is the [`SequentialPool`](https://docs.rs/orx-parallel/latest/orx_parallel/struct.SequentialPool). As the name suggests, this pool executes the parallel computation sequentially on the main thread. It can be considered as a placeholder to be overwritten by `with_pool` or `with_runner` methods to achieve parallelism.

*Note that thread pool defines the resource, or upper bound. This upper bound can further be bounded by the [`num_threads`](https://docs.rs/orx-parallel/latest/orx_parallel/trait.ParIter.html#tymethod.num_threads) configuration. Finally, parallel executor might choose not to use all available threads if it decides that the computation is small enough.*

To overwrite the defaults and explicitly set the thread pool to be used for the computation, [`with_pool`](https://docs.rs/orx-parallel/latest/orx_parallel/trait.ParIter.html#tymethod.with_pool) or [`with_runner`](https://docs.rs/orx-parallel/latest/orx_parallel/trait.ParIter.html#tymethod.with_runner) methods are used.

```rust
use orx_parallel::*;

let inputs: Vec<_> = (0..42).collect();

// uses the DefaultPool
// assuming "std" enabled, StdDefaultPool will be used; i.e., native threads
let sum = inputs.par().sum();

// equivalent to:
let sum2 = inputs.par().with_pool(StdDefaultPool::default()).sum();
assert_eq!(sum, sum2);

#[cfg(feature = "scoped_threadpool")]
{
let mut pool = scoped_threadpool::Pool::new(8);
// uses the scoped_threadpool::Pool created with 8 threads
let sum2 = inputs.par().with_pool(&mut pool).sum();
assert_eq!(sum, sum2);
}

#[cfg(feature = "rayon-core")]
{
let pool = rayon_core::ThreadPoolBuilder::new()
.num_threads(8)
.build()
.unwrap();
// uses the rayon-core::ThreadPool created with 8 threads
let sum2 = inputs.par().with_pool(&pool).sum();
assert_eq!(sum, sum2);
}

#[cfg(feature = "yastl")]
{
let pool = YastlPool::new(8);
// uses the yastl::Pool created with 8 threads
let sum2 = inputs.par().with_pool(&pool).sum();
assert_eq!(sum, sum2);
}
```

`ParThreadPool` implementations of several thread pools are provided in this crate as optional features (see [features](#features) section). Provided that the pool supports scoped computations, it is trivial to implement this trait in most cases (see [implementations](https://github.com/orxfun/orx-parallel/tree/main/src/runner/implementations) for examples).

### ParallelExecutor: chunk size

Once thread pool provides the computation resources, it is [`ParallelExecutor`](https://docs.rs/orx-parallel/latest/orx_parallel/trait.ParallelExecutor)'s task to distribute work to available threads. As mentioned above, all threads receive exactly the same closure. This closure continues to pull elements from the input concurrent iterator and operate on the inputs until all elements are processed.

The critical decision that parallel executor makes is the chunk size. Depending on the state of the computation, it can dynamically decide on number of elements to pull from the input iterator. The tradeoff it tries to solve is as follows:

* the larger the chunk size,
* the smaller the parallelization overhead; but also
* the larger the risk of imbalance in cases of heterogeneity.

## Features

* **std**: This is a **no-std** crate while *std* is included as a default feature. Please use `--no-default-features` flag for no-std use cases. **std** feature enables `StdDefaultPool` as the default thread provider which uses native threads.
* **rayon-core**: This feature enables using `rayon_core::ThreadPool` for parallel computations.
* **scoped_threadpool**: This feature enables using `scoped_threadpool::Pool`.
* **scoped-pool**: This feature enables using `scoped-pool::Pool`.
* **yastl**: This feature enables using `yastl::Pool`.
* **pond**: This feature enables using `pond::Pool`.
* **poolite**: This feature enables using `poolite::Pool`.

## Contributing

Expand All @@ -439,7 +520,8 @@ Please open an [issue](https://github.com/orxfun/orx-parallel/issues/new) or cre

* if you notice an error,
* have a question or think something could be improved,
* have an input collection or generator that needs to be parallelized, or
* have an input collection or generator that needs to be parallelized,
* want to use a particular thread pool with parallel iterators,
* having trouble representing a particular parallel computation with parallel iterators,
* or anything else:)

Expand Down
2 changes: 1 addition & 1 deletion src/executor/fixed_chunk_executor/parallel_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ impl ParallelExecutor for FixedChunkRunner {
I: ConcurrentIter,
{
let num_spawned = num_spawned.into_inner();
if num_spawned % LAG_PERIODICITY == 0 {
if num_spawned.is_multiple_of(LAG_PERIODICITY) {
match self.next_chunk(num_spawned, iter.try_get_len()) {
Some(c) => self.current_chunk_size.store(c, Ordering::Relaxed),
None => return false,
Expand Down