Skip to content

Commit

Permalink
Use new allocator API, various fixes, formating (#17)
Browse files Browse the repository at this point in the history
* Various updates

- Update to the new allocator api
- Remove unused crate features
- Remove unused rust features
- Use `cacheline_pad` macro which didn't work previously
- Remove manual opt-out of `impl Sync for Consumer/Producer` as
  that's the automatic behavior now (related: issue #1)
- Move criterion benches to `./benches`. (run them via `cargo bench`)
- Running the two simple throughput benches now requires
  `cargo test --release -- --ignored --nocapture`

* Apply clippy suggestions

* Rustfmt the code

* Remove nightly feature from .travis.yml

* Bump version
  • Loading branch information
Bobo1239 authored and polyfractal committed May 15, 2018
1 parent 2b76215 commit 6402187
Show file tree
Hide file tree
Showing 4 changed files with 282 additions and 319 deletions.
6 changes: 3 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ script:
travis-cargo build &&
travis-cargo test &&
travis-cargo bench &&
travis-cargo --only nightly doc
travis-cargo doc
after_success:
- travis-cargo --only nightly doc-upload
- travis-cargo doc-upload
env:
global:
- TRAVIS_CARGO_NIGHTLY_FEATURE=nightly
- TRAVIS_CARGO_NIGHTLY_FEATURE=""
- secure: hJGlmMnuuwl2C4UvCzt6jr0u7OGuGef6aaNTISa2DMbYOdIaeei2ZKIQMyCOCag5fp7QHJbfKBA7P/5BGsOa1YC7aeBa3M5gSshNpwmocEeSbS5KaieUe2HPbxliEXZMO7A0C/7TcuydJApzBljF8t6nLglckVsLCUZxPaqnpew=
21 changes: 7 additions & 14 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "bounded-spsc-queue"
version = "0.2.0"
version = "0.3.0"
authors = ["Zachary Tong <zacharyjtong@gmail.com>"]
license = "Apache-2.0"
description = "A bounded SPSC queue"
Expand All @@ -15,19 +15,12 @@ readme = "README.md"

[lib]
name = "bounded_spsc_queue"
test = true
doctest = false

[features]
# no features by default
default = []
nightly = []
#benchmark = ["criterion", "time"]
[[bench]]
name = "benchmark"
harness = false

#[dependencies.time]
#version = "0.1.34"
#optional = true

#[dependencies.criterion]
#git = "https://github.com/japaric/criterion.rs.git"
#optional = true
[dev-dependencies]
criterion = "0.2.3"
time = "0.1.39"
169 changes: 169 additions & 0 deletions benches/benchmark.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
extern crate bounded_spsc_queue;
#[macro_use]
extern crate criterion;
extern crate time;

use std::thread;

use criterion::{Bencher, Criterion};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::sync_channel;
use std::sync::Arc;

criterion_group!(
benches,
bench_single_thread_chan,
bench_single_thread_spsc,
bench_threaded_chan,
bench_threaded_spsc,
bench_threaded_reverse_chan,
bench_threaded_reverse_spsc
);
criterion_main!(benches);

fn bench_single_thread_chan(c: &mut Criterion) {
c.bench_function("bench_single_chan", bench_chan);
}

fn bench_single_thread_spsc(c: &mut Criterion) {
c.bench_function("bench_single_spsc", bench_spsc);
}

fn bench_threaded_chan(c: &mut Criterion) {
c.bench_function("bench_threaded_chan", bench_chan_threaded);
}

fn bench_threaded_spsc(c: &mut Criterion) {
c.bench_function("bench_threaded_spsc", bench_spsc_threaded);
}

fn bench_threaded_reverse_chan(c: &mut Criterion) {
c.bench_function("bench_reverse_chan", bench_chan_threaded2);
}

fn bench_threaded_reverse_spsc(c: &mut Criterion) {
c.bench_function("bench_reverse_spsc", bench_spsc_threaded2);
}

fn bench_chan(b: &mut Bencher) {
let (tx, rx) = sync_channel::<u8>(500);
b.iter(|| {
tx.send(1).unwrap();
rx.recv().unwrap()
});
}

fn bench_chan_threaded(b: &mut Bencher) {
let (tx, rx) = sync_channel::<u8>(500);
let flag = AtomicBool::new(false);
let arc_flag = Arc::new(flag);

let flag_clone = arc_flag.clone();
thread::spawn(move || {
while flag_clone.load(Ordering::Acquire) == false {
// Try to do as much work as possible without checking the atomic
for _ in 0..400 {
rx.recv().unwrap();
}
}
});

b.iter(|| tx.send(1));

let flag_clone = arc_flag.clone();
flag_clone.store(true, Ordering::Release);

// We have to loop a minimum of 400 times to guarantee the other thread shuts down
for _ in 0..400 {
let _ = tx.send(1);
}
}

fn bench_chan_threaded2(b: &mut Bencher) {
let (tx, rx) = sync_channel::<u8>(500);
let flag = AtomicBool::new(false);
let arc_flag = Arc::new(flag);

let flag_clone = arc_flag.clone();
thread::spawn(move || {
while flag_clone.load(Ordering::Acquire) == false {
// Try to do as much work as possible without checking the atomic
for _ in 0..400 {
let _ = tx.send(1);
}
}
});

b.iter(|| rx.recv().unwrap());

let flag_clone = arc_flag.clone();
flag_clone.store(true, Ordering::Release);

// We have to loop a minimum of 400 times to guarantee the other thread shuts down
for _ in 0..400 {
let _ = rx.try_recv();
}
}

fn bench_spsc(b: &mut Bencher) {
let (p, c) = bounded_spsc_queue::make(500);

b.iter(|| {
p.push(1);
c.pop()
});
}

fn bench_spsc_threaded(b: &mut Bencher) {
let (p, c) = bounded_spsc_queue::make(500);

let flag = AtomicBool::new(false);
let arc_flag = Arc::new(flag);

let flag_clone = arc_flag.clone();
thread::spawn(move || {
while flag_clone.load(Ordering::Acquire) == false {
// Try to do as much work as possible without checking the atomic
for _ in 0..400 {
c.pop();
}
}
});

b.iter(|| p.push(1));

let flag_clone = arc_flag.clone();
flag_clone.store(true, Ordering::Release);

// We have to loop a minimum of 400 times to guarantee the other thread shuts down
for _ in 0..400 {
p.try_push(1);
}
}

fn bench_spsc_threaded2(b: &mut Bencher) {
let (p, c) = bounded_spsc_queue::make(500);

let flag = AtomicBool::new(false);
let arc_flag = Arc::new(flag);

let flag_clone = arc_flag.clone();
thread::spawn(move || {
while flag_clone.load(Ordering::Acquire) == false {
// Try to do as much work as possible without checking the atomic
for _ in 0..400 {
p.push(1);
}
}
});

b.iter(|| c.pop());

let flag_clone = arc_flag.clone();
flag_clone.store(true, Ordering::Release);

// We have to loop a minimum of 400 times to guarantee the other thread shuts down
for _ in 0..400 {
c.try_pop();
}
}
Loading

0 comments on commit 6402187

Please sign in to comment.