Skip to content

Commit

Permalink
cdc: reduce resolved ts message size (tikv#10666) (tikv#10679)
Browse files Browse the repository at this point in the history
* cherry pick tikv#10666 to release-5.1

Signed-off-by: ti-srebot <ti-srebot@pingcap.com>

* resolve conflicts

Signed-off-by: Neil Shen <overvenus@gmail.com>

* cdc: fix EventBatcher statistics

Signed-off-by: Neil Shen <overvenus@gmail.com>

Co-authored-by: Neil Shen <overvenus@gmail.com>
Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
  • Loading branch information
3 people committed Sep 14, 2021
1 parent efca0c6 commit 8af4302
Show file tree
Hide file tree
Showing 10 changed files with 346 additions and 53 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions components/cdc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ test_util = { path = "../test_util", default-features = false }
panic_hook = { path = "../panic_hook" }
raft = { version = "0.6.0-alpha", default-features = false }
time = "0.1"
criterion = "0.3"

[[test]]
name = "integrations"
Expand All @@ -99,3 +100,8 @@ path = "tests/integrations/mod.rs"
name = "failpoints"
path = "tests/failpoints/mod.rs"
required-features = ["failpoints"]

[[bench]]
name = "cdc_event"
harness = false
required-features = ["protobuf-codec"]
50 changes: 50 additions & 0 deletions components/cdc/benches/cdc_event/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0.

use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion};
use kvproto::cdcpb::ResolvedTs;
use protobuf::Message;

use cdc::CdcEvent;

fn bench_cdc_event_size(c: &mut Criterion) {
let mut group = c.benchmark_group("bench_cdc_event_size");

// A typical region id.
let region_id = 4194304;
// A typical ts.
let ts = 426624231625982140;
// Benchmark from 1 region id to 131,072 region ids.
for i in 0..18 {
let len = 2usize.pow(i);
let mut resolved_ts = ResolvedTs::default();
resolved_ts.ts = ts;
resolved_ts.regions = vec![region_id; len];

let message_compute_size = resolved_ts.clone();
group.bench_with_input(
BenchmarkId::new("protobuf::Message::compute_size", len),
&message_compute_size,
|b, message_compute_size| {
b.iter(|| {
black_box(message_compute_size.compute_size());
});
},
);

let cdc_event_size = CdcEvent::ResolvedTs(resolved_ts);
group.bench_with_input(
BenchmarkId::new("CdcEvent::ResolvedTs::size", len),
&cdc_event_size,
|b, cdc_event_size| {
b.iter(move || {
black_box(cdc_event_size.size());
});
},
);
}

group.finish();
}

criterion_group!(benches, bench_cdc_event_size);
criterion_main!(benches);
14 changes: 10 additions & 4 deletions components/cdc/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
use std::sync::{atomic::AtomicUsize, atomic::Ordering, Arc};
use std::time::Duration;

use tikv_util::time::Instant;

use futures::{
channel::mpsc::{
channel as bounded, unbounded, Receiver, SendError as FuturesSendError, Sender,
Expand All @@ -16,8 +14,10 @@ use futures::{
use grpcio::WriteFlags;
use kvproto::cdcpb::ChangeDataEvent;

use tikv_util::time::Instant;
use tikv_util::{impl_display_as_debug, warn};

use crate::metrics::*;
use crate::service::{CdcEvent, EventBatcher};

const CDC_MSG_MAX_BATCH_SIZE: usize = 128;
Expand Down Expand Up @@ -207,6 +207,10 @@ impl<'a> Drain {
where
S: futures::Sink<(ChangeDataEvent, WriteFlags), Error = E> + Unpin,
{
let total_event_bytes = CDC_GRPC_ACCUMULATE_MESSAGE_BYTES.with_label_values(&["event"]);
let total_resolved_ts_bytes =
CDC_GRPC_ACCUMULATE_MESSAGE_BYTES.with_label_values(&["resolved_ts"]);

let memory_quota = self.memory_quota.clone();
let mut chunks = self.drain().ready_chunks(CDC_MSG_MAX_BATCH_SIZE);
while let Some(events) = chunks.next().await {
Expand All @@ -216,6 +220,7 @@ impl<'a> Drain {
bytes += size;
batcher.push(e);
});
let (event_bytes, resolved_ts_bytes) = batcher.statistics();
let resps = batcher.build();
let last_idx = resps.len() - 1;
// Events are about to be sent, free pending events memory counter.
Expand All @@ -226,6 +231,8 @@ impl<'a> Drain {
sink.feed((e, write_flags)).await?;
}
sink.flush().await?;
total_event_bytes.inc_by(event_bytes as i64);
total_resolved_ts_bytes.inc_by(resolved_ts_bytes as i64);
}
Ok(())
}
Expand Down Expand Up @@ -253,15 +260,14 @@ impl Drop for Drain {
}
}

#[cfg(test)]
#[allow(clippy::result_unit_err)]
pub fn recv_timeout<S, I>(s: &mut S, dur: std::time::Duration) -> Result<Option<I>, ()>
where
S: Stream<Item = I> + Unpin,
{
poll_timeout(&mut s.next(), dur)
}

#[cfg(test)]
pub fn poll_timeout<F, I>(fut: &mut F, dur: std::time::Duration) -> Result<I, ()>
where
F: std::future::Future<Output = I> + Unpin,
Expand Down
Loading

0 comments on commit 8af4302

Please sign in to comment.