clone and index traits #25

Merged · 1 commit · Aug 28, 2024

13 changes: 7 additions & 6 deletions Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "orx-concurrent-vec"
version = "2.5.0"
version = "2.6.0"
edition = "2021"
authors = ["orxfun <orx.ugur.arikan@gmail.com>"]
description = "An efficient, convenient and lightweight grow-only read & write concurrent data structure allowing high performance concurrent collection."
@@ -11,19 +11,20 @@ categories = ["data-structures", "concurrency", "rust-patterns"]

[dependencies]
orx-pseudo-default = "1.4"
orx-pinned-vec = "3.4"
orx-fixed-vec = "3.4"
orx-split-vec = "3.4"
orx-pinned-concurrent-col = "2.4"
orx-pinned-vec = "3.7"
orx-fixed-vec = "3.7"
orx-split-vec = "3.7"
orx-pinned-concurrent-col = "2.6"
orx-concurrent-option = "1.1"

[dev-dependencies]
orx-concurrent-bag = "2.4"
orx-concurrent-bag = "2.6"
criterion = "0.5.1"
rand = "0.8.5"
rayon = "1.9.0"
test-case = "3.3.1"
append-only-vec = "0.1.5"
boxcar = "0.2.5"

[[bench]]
name = "collect_with_extend"
71 changes: 11 additions & 60 deletions README.md
@@ -96,6 +96,11 @@ assert_eq!(measurements.len(), 100);
assert_eq!(averages.len(), 10);
```

You may find more concurrent grow & read examples in the respective test files:
* [tests/concurrent_push_read.rs](https://github.com/orxfun/orx-concurrent-vec/blob/main/tests/concurrent_push_read.rs)
* [tests/concurrent_extend_read.rs](https://github.com/orxfun/orx-concurrent-vec/blob/main/tests/concurrent_extend_read.rs)
* [tests/concurrent_clone.rs](https://github.com/orxfun/orx-concurrent-vec/blob/main/tests/concurrent_clone.rs)
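
For a quick impression, below is a minimal grow & read sketch in the spirit of these tests; the element values, thread structure, and assertions are illustrative rather than taken from the test files:

```rust
use orx_concurrent_vec::ConcurrentVec;

fn main() {
    let vec = ConcurrentVec::new();

    std::thread::scope(|s| {
        // one thread grows the vec ...
        s.spawn(|| {
            for i in 0..100 {
                vec.push(i);
            }
        });

        // ... while another safely reads at the same time: `get` returns
        // Some only for positions that are already initialized
        s.spawn(|| {
            let mut sum: i32 = 0;
            for i in 0..100 {
                if let Some(x) = vec.get(i) {
                    sum += *x;
                }
            }
            assert!(sum <= (0..100).sum::<i32>());
        });
    });

    assert_eq!(vec.len(), 100);
}
```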

## Properties of the Concurrent Model

ConcurrentVec wraps a [`PinnedVec`](https://crates.io/crates/orx-pinned-vec) of [`ConcurrentOption`](https://crates.io/crates/orx-concurrent-option) elements. This composition leads to the following safety guarantees:
@@ -118,35 +123,15 @@ Together, the concurrency model of `ConcurrentVec` has the following properties:

*You may find the details of the benchmarks at [benches/collect_with_push.rs](https://github.com/orxfun/orx-concurrent-vec/blob/main/benches/collect_with_push.rs).*

-In the experiment, `rayon`s parallel iterator, `AppendOnlyVec`s and `ConcurrentVec`s `push` methods are used to collect results from multiple threads. Further, different underlying pinned vectors of the `ConcurrentVec` are tested. We observe that:
+In the experiment, `rayon`'s parallel iterator and the `push` methods of `AppendOnlyVec`, `boxcar::Vec`, and `ConcurrentVec` are used to collect results from multiple threads. Further, different underlying pinned vectors of `ConcurrentVec` are evaluated.

<img src="https://raw.githubusercontent.com/orxfun/orx-concurrent-vec/main/docs/img/bench_collect_with_push.PNG" alt="https://raw.githubusercontent.com/orxfun/orx-concurrent-vec/main/docs/img/bench_collect_with_push.PNG" />

We observe that:
* The default `Doubling` growth strategy leads to efficient concurrent collection of results. Note that this variant requires no input to construct.
* On the other hand, the `Linear` growth strategy performs significantly better. Its argument of 12 means that each fragment of the underlying `SplitVec` will have a capacity of 2^12 (4096) elements. The improvement is likely due to less wasted capacity; `Linear` can therefore be preferred when we have even rough knowledge of the size of the data to be pushed.
* Finally, the `Fixed` growth strategy is the least flexible: it requires perfect knowledge of the hard capacity constraint, and the vector panics if this capacity is exceeded. Since it does not outperform `Linear`, there is no reason to prefer `Fixed` even when we have such knowledge. Construction of all three variants is sketched below.
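
For reference, a minimal construction sketch of the three growth strategies. The constructor calls match those used in the benchmark code of this PR; the capacity arguments and element values are illustrative assumptions:

```rust
use orx_concurrent_vec::ConcurrentVec;

fn main() {
    // default doubling growth; requires no input to construct
    let vec = ConcurrentVec::with_doubling_growth();
    vec.push('a');

    // linear growth: each fragment of the underlying SplitVec has a capacity
    // of 2^12 = 4096 elements; the second argument (assumed here to bound the
    // number of fragments) is an arbitrary example value
    let vec = ConcurrentVec::with_linear_growth(12, 64);
    vec.push('b');

    // fixed capacity: a hard upper bound; pushing the 1025th element panics
    let vec = ConcurrentVec::with_fixed_capacity(1024);
    vec.push('c');
}
```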

```bash
rayon/num_threads=8,num_items_per_thread-type=[16384]
time: [16.057 ms 16.390 ms 16.755 ms]
append_only_vec/num_threads=8,num_items_per_thread-type=[16384]
time: [23.679 ms 24.480 ms 25.439 ms]
concurrent_vec(Doubling)/num_threads=8,num_items_per_thread-type=[16384]
time: [14.055 ms 14.287 ms 14.526 ms]
concurrent_vec(Linear(12))/num_threads=8,num_items_per_thread-type=[16384]
time: [8.4686 ms 8.6396 ms 8.8373 ms]
concurrent_vec(Fixed)/num_threads=8,num_items_per_thread-type=[16384]
time: [9.8297 ms 9.9945 ms 10.151 ms]

rayon/num_threads=8,num_items_per_thread-type=[65536]
time: [43.118 ms 44.065 ms 45.143 ms]
append_only_vec/num_threads=8,num_items_per_thread-type=[65536]
time: [110.66 ms 114.09 ms 117.94 ms]
concurrent_vec(Doubling)/num_threads=8,num_items_per_thread-type=[65536]
time: [61.461 ms 62.547 ms 63.790 ms]
concurrent_vec(Linear(12))/num_threads=8,num_items_per_thread-type=[65536]
time: [37.420 ms 37.740 ms 38.060 ms]
concurrent_vec(Fixed)/num_threads=8,num_items_per_thread-type=[65536]
time: [43.017 ms 43.584 ms 44.160 ms]
```

Performance can be further improved by using the `extend` method instead of `push`. You may see the results in the next subsection and the details in the [performance notes](https://docs.rs/orx-concurrent-bag/2.3.0/orx_concurrent_bag/#section-performance-notes) of `ConcurrentBag`, which has similar characteristics.

### Performance with `extend`
@@ -158,41 +143,7 @@ The only difference in this follow up experiment is that we use `extend` rather
* There is no significant difference between extending in batches of 64 elements and batches of 65536 elements. We do not need a well-tuned number; any sufficiently large batch size seems to be fine.
* Not all scenarios allow extending in batches; however, the significant performance improvement makes it preferable whenever possible. The pattern is sketched below.
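
A minimal sketch of the batched `extend` pattern, simplified from the `with_concurrent_vec` benchmark function in this PR; the thread count, item count, and batch size are illustrative:

```rust
use orx_concurrent_vec::ConcurrentVec;

fn main() {
    let (num_threads, num_items_per_thread, batch_size) = (8, 4096, 64);
    let vec = ConcurrentVec::new();

    std::thread::scope(|s| {
        for _ in 0..num_threads {
            s.spawn(|| {
                for begin in (0..num_items_per_thread).step_by(batch_size) {
                    // a single `extend` call reserves `batch_size` positions
                    // at once, instead of one reservation per `push`
                    let batch = (begin..(begin + batch_size)).map(|i| i as i64);
                    vec.extend(batch);
                }
            });
        }
    });

    assert_eq!(vec.len(), num_threads * num_items_per_thread);
}
```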

```bash
rayon/num_threads=8,num_items_per_thread-type=[16384]
time: [16.102 ms 16.379 ms 16.669 ms]
append_only_vec/num_threads=8,num_items_per_thread-type=[16384]
time: [27.922 ms 28.611 ms 29.356 ms]
concurrent_vec(Doubling) | batch-size=64/num_threads=8,num_items_per_thread-type=[16384]
time: [8.7361 ms 8.8347 ms 8.9388 ms]
concurrent_vec(Linear(12)) | batch-size=64/num_threads=8,num_items_per_thread-type=[16384]
time: [4.2035 ms 4.2975 ms 4.4012 ms]
concurrent_vec(Fixed) | batch-size=64/num_threads=8,num_items_per_thread-type=[16384]
time: [4.9670 ms 5.0928 ms 5.2217 ms]
concurrent_vec(Doubling) | batch-size=16384/num_threads=8,num_items_per_thread-type=[16384]
time: [9.2441 ms 9.3988 ms 9.5594 ms]
concurrent_vec(Linear(12)) | batch-size=16384/num_threads=8,num_items_per_thread-type=[16384]
time: [3.5663 ms 3.6527 ms 3.7405 ms]
concurrent_vec(Fixed) | batch-size=16384/num_threads=8,num_items_per_thread-type=[16384]
time: [5.0839 ms 5.2169 ms 5.3576 ms]

rayon/num_threads=8,num_items_per_thread-type=[65536]
time: [47.861 ms 48.836 ms 49.843 ms]
append_only_vec/num_threads=8,num_items_per_thread-type=[65536]
time: [125.52 ms 128.89 ms 132.41 ms]
concurrent_vec(Doubling) | batch-size=64/num_threads=8,num_items_per_thread-type=[65536]
time: [42.516 ms 43.097 ms 43.715 ms]
concurrent_vec(Linear(12)) | batch-size=64/num_threads=8,num_items_per_thread-type=[65536]
time: [20.025 ms 20.269 ms 20.521 ms]
concurrent_vec(Fixed) | batch-size=64/num_threads=8,num_items_per_thread-type=[65536]
time: [25.284 ms 25.818 ms 26.375 ms]
concurrent_vec(Doubling) | batch-size=65536/num_threads=8,num_items_per_thread-type=[65536]
time: [39.371 ms 39.887 ms 40.470 ms]
concurrent_vec(Linear(12)) | batch-size=65536/num_threads=8,num_items_per_thread-type=[65536]
time: [17.808 ms 17.923 ms 18.046 ms]
concurrent_vec(Fixed) | batch-size=65536/num_threads=8,num_items_per_thread-type=[65536]
time: [24.291 ms 24.702 ms 25.133 ms]
```
<img src="https://raw.githubusercontent.com/orxfun/orx-concurrent-vec/main/docs/img/bench_collect_with_extend.PNG" alt="https://raw.githubusercontent.com/orxfun/orx-concurrent-vec/main/docs/img/bench_collect_with_extend.PNG" />

## Concurrent Friend Collections

51 changes: 42 additions & 9 deletions benches/collect_with_extend.rs
@@ -43,8 +43,7 @@ fn with_concurrent_vec<T: Sync, P: IntoConcurrentPinnedVec<ConcurrentOption<T>>>
for _ in 0..num_threads {
s.spawn(|| {
for j in (0..num_items_per_thread).step_by(batch_size) {
-let into_iter =
-(j..(j + batch_size)).map(|j| std::hint::black_box(compute(j, j + 1)));
+let into_iter = (j..(j + batch_size)).map(|j| compute(j, j + 1));
vec.extend(into_iter);
}
});
@@ -65,7 +64,7 @@ fn with_rayon<T: Send + Sync + Clone + Copy>(
.into_par_iter()
.flat_map(|_| {
(0..num_items_per_thread)
-.map(move |j| std::hint::black_box(compute(j, j + 1)))
+.map(move |j| compute(j, j + 1))
.collect::<Vec<_>>()
})
.collect();
@@ -83,7 +82,26 @@ fn with_append_only_vec<T: Send + Sync + Clone + Copy>(
for _ in 0..num_threads {
s.spawn(|| {
for j in 0..num_items_per_thread {
-vec.push(std::hint::black_box(compute(j, j + 1)));
+vec.push(compute(j, j + 1));
}
});
}
});

vec
}

fn with_boxcar<T: Send + Sync + Clone + Copy>(
num_threads: usize,
num_items_per_thread: usize,
compute: fn(usize, usize) -> T,
vec: boxcar::Vec<T>,
) -> boxcar::Vec<T> {
std::thread::scope(|s| {
for _ in 0..num_threads {
s.spawn(|| {
for j in 0..num_items_per_thread {
vec.push(compute(j, j + 1));
}
});
}
@@ -105,14 +123,16 @@ fn bench_grow(c: &mut Criterion) {

let max_len = num_threads * num_items_per_thread;

let compute = compute_large_data;

// rayon

group.bench_with_input(BenchmarkId::new("rayon", &treatment), &(), |b, _| {
b.iter(|| {
black_box(with_rayon(
black_box(num_threads),
black_box(num_items_per_thread),
-compute_large_data,
+compute,
))
})
});
@@ -127,13 +147,26 @@
black_box(with_append_only_vec(
black_box(num_threads),
black_box(num_items_per_thread),
-compute_large_data,
+compute,
AppendOnlyVec::new(),
))
})
},
);

// BOXCAR

group.bench_with_input(BenchmarkId::new("boxcar", &treatment), &(), |b, _| {
b.iter(|| {
black_box(with_boxcar(
black_box(num_threads),
black_box(num_items_per_thread),
compute,
boxcar::Vec::new(),
))
})
});

// ConcurrentVec

let batch_sizes = vec![64, num_items_per_thread];
@@ -154,7 +187,7 @@
black_box(with_concurrent_vec(
black_box(num_threads),
black_box(num_items_per_thread),
-compute_large_data,
+compute,
batch_size,
ConcurrentVec::with_doubling_growth(),
))
@@ -172,7 +205,7 @@
black_box(with_concurrent_vec(
black_box(num_threads),
black_box(num_items_per_thread),
-compute_large_data,
+compute,
batch_size,
ConcurrentVec::with_linear_growth(12, num_linear_fragments),
))
@@ -185,7 +218,7 @@
black_box(with_concurrent_vec(
black_box(num_threads),
black_box(num_items_per_thread),
-compute_large_data,
+compute,
batch_size,
ConcurrentVec::with_fixed_capacity(num_threads * num_items_per_thread),
))
38 changes: 35 additions & 3 deletions benches/collect_with_push.rs
@@ -42,7 +42,7 @@ fn with_concurrent_vec<T: Sync, P: IntoConcurrentPinnedVec<ConcurrentOption<T>>>
for _ in 0..num_threads {
s.spawn(|| {
for j in 0..num_items_per_thread {
-vec.push(std::hint::black_box(compute(j, j + 1)));
+vec.push(compute(j, j + 1));
}
});
}
@@ -62,7 +62,7 @@ fn with_rayon<T: Send + Sync + Clone + Copy>(
.into_par_iter()
.flat_map(|_| {
(0..num_items_per_thread)
-.map(move |j| std::hint::black_box(compute(j, j + 1)))
+.map(move |j| compute(j, j + 1))
.collect::<Vec<_>>()
})
.collect();
@@ -80,7 +80,26 @@ fn with_append_only_vec<T: Send + Sync + Clone + Copy>(
for _ in 0..num_threads {
s.spawn(|| {
for j in 0..num_items_per_thread {
-vec.push(std::hint::black_box(compute(j, j + 1)));
+vec.push(compute(j, j + 1));
}
});
}
});

vec
}

fn with_boxcar<T: Send + Sync + Clone + Copy>(
num_threads: usize,
num_items_per_thread: usize,
compute: fn(usize, usize) -> T,
vec: boxcar::Vec<T>,
) -> boxcar::Vec<T> {
std::thread::scope(|s| {
for _ in 0..num_threads {
s.spawn(|| {
for j in 0..num_items_per_thread {
vec.push(compute(j, j + 1));
}
});
}
@@ -133,6 +152,19 @@ fn bench_grow(c: &mut Criterion) {
},
);

// BOXCAR

group.bench_with_input(BenchmarkId::new("boxcar", &treatment), &(), |b, _| {
b.iter(|| {
black_box(with_boxcar(
black_box(num_threads),
black_box(num_items_per_thread),
compute,
boxcar::Vec::new(),
))
})
});

// WITH-SCOPE

group.bench_with_input(
Binary file modified benches/results/collect.xlsx
Binary file modified docs/img/bench_collect_with_extend.PNG
Binary file modified docs/img/bench_collect_with_push.PNG
14 changes: 14 additions & 0 deletions src/common_traits/clone.rs
@@ -0,0 +1,14 @@
use crate::ConcurrentVec;
use orx_concurrent_option::ConcurrentOption;
use orx_fixed_vec::IntoConcurrentPinnedVec;

impl<T, P> Clone for ConcurrentVec<T, P>
where
P: IntoConcurrentPinnedVec<ConcurrentOption<T>>,
T: Clone,
{
fn clone(&self) -> Self {
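// observe the current number of initialized elements and clone the
// underlying core up to exactly that length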
let core = unsafe { self.core.clone_with_len(self.len()) };
Self { core }
}
}
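
A minimal usage sketch for this new `Clone` implementation; the element values are illustrative:

```rust
use orx_concurrent_vec::ConcurrentVec;

fn main() {
    let vec = ConcurrentVec::new();
    vec.extend([1, 2, 3]);

    // the clone is an independent vector holding the elements that
    // were initialized at the time of the call
    let copy = vec.clone();

    vec.push(4);
    assert_eq!(vec.len(), 4);
    assert_eq!(copy.len(), 3);
}
```

Note that `clone` takes `&self` like the rest of the API, so it can be called while other threads keep growing the original.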
46 changes: 46 additions & 0 deletions src/common_traits/debug.rs
@@ -0,0 +1,46 @@
use crate::ConcurrentVec;
use orx_concurrent_option::ConcurrentOption;
use orx_fixed_vec::IntoConcurrentPinnedVec;
use std::fmt::Debug;

const ELEM_PER_LINE: usize = 8;

impl<T, P> Debug for ConcurrentVec<T, P>
where
P: IntoConcurrentPinnedVec<ConcurrentOption<T>>,
T: Debug,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let len = self.len();
let capacity = self.capacity();

write!(f, "ConcurrentVec {{")?;
write!(f, "\n len: {},", len)?;
write!(f, "\n capacity: {},", capacity)?;
write!(f, "\n data: [,")?;
for i in 0..len {
if i % ELEM_PER_LINE == 0 {
write!(f, "\n ")?;
}
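// a position may be reserved but not yet initialized during concurrent
// growth; such pending elements are rendered as `*`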
match self.get(i) {
Some(x) => write!(f, "{:?}, ", x)?,
None => write!(f, "*, ")?,
}
}
write!(f, "\n ],")?;
write!(f, "\n}}")
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn debug() {
let vec = ConcurrentVec::new();
vec.extend([0, 4, 1, 2, 5, 6, 32, 5, 1, 121, 2, 42]);
let dbg_str = format!("{:?}", &vec);
assert_eq!(dbg_str, "ConcurrentVec {\n len: 12,\n capacity: 12,\n data: [,\n 0, 4, 1, 2, 5, 6, 32, 5, \n 1, 121, 2, 42, \n ],\n}");
}
}