Skip to content

Commit

Permalink
tikv_util: introduce future channel (tikv#13407)
Browse files Browse the repository at this point in the history
ref tikv#12842

It's a super set of batch channel. It can be used as a batch channel
or just a future channel.

This is the first PR to use unified thread pool in apply system for v2.

Signed-off-by: Jay Lee <BusyJayLee@gmail.com>

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
  • Loading branch information
BusyJay and ti-chi-bot committed Sep 15, 2022
1 parent 1179131 commit 7382d4e
Show file tree
Hide file tree
Showing 9 changed files with 462 additions and 547 deletions.
5 changes: 5 additions & 0 deletions components/tikv_util/Cargo.toml
Expand Up @@ -73,3 +73,8 @@ regex = "1.0"
tempfile = "3.0"
toml = "0.5"
utime = "0.2"

[[bench]]
name = "channel"
path = "benches/channel/mod.rs"
test = true
Expand Up @@ -113,8 +113,8 @@ fn bench_crossbeam_channel(b: &mut Bencher) {
}

#[bench]
fn bench_receiver_stream_batch(b: &mut Bencher) {
let (tx, rx) = mpsc::batch::bounded::<i32>(128, 8);
fn bench_receiver_stream_unbounded_batch(b: &mut Bencher) {
let (tx, rx) = mpsc::future::unbounded::<i32>(mpsc::future::WakePolicy::TillReach(8));
for _ in 0..1 {
let tx1 = tx.clone();
thread::spawn(move || {
Expand All @@ -124,12 +124,9 @@ fn bench_receiver_stream_batch(b: &mut Bencher) {
});
}

let mut rx = Some(mpsc::batch::BatchReceiver::new(
rx,
32,
Vec::new,
mpsc::batch::VecCollector,
));
let rx = mpsc::future::BatchReceiver::new(rx, 32, Vec::new, Vec::push);

let mut rx = Some(block_on(rx.into_future()).1);

b.iter(|| {
let mut count = 0;
Expand All @@ -150,8 +147,8 @@ fn bench_receiver_stream_batch(b: &mut Bencher) {
}

#[bench]
fn bench_receiver_stream(b: &mut Bencher) {
let (tx, rx) = mpsc::batch::bounded::<i32>(128, 1);
fn bench_receiver_stream_unbounded_nobatch(b: &mut Bencher) {
let (tx, rx) = mpsc::future::unbounded::<i32>(mpsc::future::WakePolicy::Immediately);
for _ in 0..1 {
let tx1 = tx.clone();
thread::spawn(move || {
Expand All @@ -161,7 +158,7 @@ fn bench_receiver_stream(b: &mut Bencher) {
});
}

let mut rx = Some(rx);
let mut rx = Some(block_on(rx.into_future()).1);
b.iter(|| {
let mut count = 0;
let mut rx1 = rx.take().unwrap();
Expand Down
File renamed without changes.

0 comments on commit 7382d4e

Please sign in to comment.