Commit 4704551

This is an automated cherry-pick of #14465
close pingcap/tiflow#8561, ref #11993

Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
overvenus authored and ti-chi-bot committed May 23, 2023
1 parent 68ad571 commit 4704551
Showing 1 changed file with 113 additions and 33 deletions.
146 changes: 113 additions & 33 deletions components/cdc/src/endpoint.rs
@@ -1,6 +1,7 @@
// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0.

use std::{
cell::RefCell,
cmp::{Ord, Ordering as CmpOrdering, PartialOrd, Reverse},
collections::BinaryHeap,
fmt,
@@ -297,16 +298,8 @@ impl ResolvedRegionHeap {
(min_resolved_ts, outliers)
}

fn to_hash_set(&self) -> (TimeStamp, HashSet<u64>) {
let mut min_resolved_ts = TimeStamp::max();
let mut regions = HashSet::with_capacity_and_hasher(self.heap.len(), Default::default());
for resolved_region in &self.heap {
regions.insert(resolved_region.0.region_id);
if min_resolved_ts > resolved_region.0.resolved_ts {
min_resolved_ts = resolved_region.0.resolved_ts;
}
}
(min_resolved_ts, regions)
fn is_empty(&self) -> bool {
self.heap.is_empty()
}

fn clear(&mut self) {
@@ -349,7 +342,7 @@ pub struct Endpoint<T, E> {
sink_memory_quota: MemoryQuota,

old_value_cache: OldValueCache,
resolved_region_heap: ResolvedRegionHeap,
resolved_region_heap: RefCell<ResolvedRegionHeap>,

causal_ts_provider: Option<Arc<CausalTsProviderImpl>>,

@@ -444,9 +437,9 @@ impl<T: 'static + RaftStoreRouter<E>, E: KvEngine> Endpoint<T, E> {
concurrency_manager,
min_resolved_ts: TimeStamp::max(),
min_ts_region_id: 0,
resolved_region_heap: ResolvedRegionHeap {
resolved_region_heap: RefCell::new(ResolvedRegionHeap {
heap: BinaryHeap::new(),
},
}),
old_value_cache,
resolved_region_count: 0,
unresolved_region_count: 0,
@@ -836,7 +829,7 @@ impl<T: 'static + RaftStoreRouter<E>, E: KvEngine> Endpoint<T, E> {

fn on_min_ts(&mut self, regions: Vec<u64>, min_ts: TimeStamp, current_ts: TimeStamp) {
// Reset resolved_regions to empty.
let resolved_regions = &mut self.resolved_region_heap;
let mut resolved_regions = self.resolved_region_heap.borrow_mut();
resolved_regions.clear();

let total_region_count = regions.len();
@@ -882,6 +875,7 @@ impl<T: 'static + RaftStoreRouter<E>, E: KvEngine> Endpoint<T, E> {
"min_resolved_ts" => self.min_resolved_ts,
"min_ts_region_id" => self.min_ts_region_id,
"min_ts" => min_ts,
"lag" => ?Duration::from_millis(lag_millis),
"ok" => advance_ok,
"none" => advance_failed_none,
"stale" => advance_failed_stale,
@@ -895,13 +889,14 @@ impl<T: 'static + RaftStoreRouter<E>, E: KvEngine> Endpoint<T, E> {
// so 1) downstreams know where they should send resolve lock requests,
// and 2) resolved ts of normal regions does not fall back.
//
// Max number of outliers, in most cases, only a few regions are outliers.
// TODO: figure out how to avoid create hashset every time, saving some CPU.
let max_outlier_count = 32;
let (outlier_min_resolved_ts, outlier_regions) = resolved_regions.pop(max_outlier_count);
let (normal_min_resolved_ts, normal_regions) = resolved_regions.to_hash_set();
self.broadcast_resolved_ts(outlier_min_resolved_ts, outlier_regions);
self.broadcast_resolved_ts(normal_min_resolved_ts, normal_regions);
// Regions are separated exponentially to reduce resolved ts events and
// save CPU for both TiKV and TiCDC.
let mut batch_count = 8;
while !resolved_regions.is_empty() {
let (outlier_min_resolved_ts, outlier_regions) = resolved_regions.pop(batch_count);
self.broadcast_resolved_ts(outlier_min_resolved_ts, outlier_regions);
batch_count *= 4;
}
}

fn broadcast_resolved_ts(&self, min_resolved_ts: TimeStamp, regions: HashSet<u64>) {
@@ -1193,6 +1188,7 @@ impl<T: 'static + RaftStoreRouter<E>, E: KvEngine> RunnableWithTimer for Endpoin

// Reclaim resolved_region_heap memory.
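// reset_and_shrink_to clears the heap and shrinks its capacity to the
// current number of captured regions (see the heap unit test below).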
self.resolved_region_heap
.borrow_mut()
.reset_and_shrink_to(self.capture_regions.len());

CDC_CAPTURED_REGION_COUNT.set(self.capture_regions.len() as i64);
@@ -1275,7 +1271,15 @@ mod tests {
};

use super::*;
use crate::{
channel,
delegate::{post_init_downstream, ObservedRange},
recv_timeout,
};

struct TestEndpointSuite {
// The order must ensure `endpoint` be dropped before other fields.
@@ -2458,11 +2462,6 @@ mod tests {
assert!(regions.contains(&5));
assert!(regions.contains(&6));

// Empty regions
let (ts, regions) = heap.to_hash_set();
assert_eq!(ts, TimeStamp::max());
assert!(regions.is_empty());

let mut heap1 = ResolvedRegionHeap {
heap: BinaryHeap::new(),
};
@@ -2476,13 +2475,6 @@
assert_eq!(regions.len(), 1);
assert!(regions.contains(&3));

let (ts, regions) = heap1.to_hash_set();
assert_eq!(ts, 4.into());
assert_eq!(regions.len(), 3);
assert!(regions.contains(&4));
assert!(regions.contains(&5));
assert!(regions.contains(&6));

heap1.reset_and_shrink_to(3);
assert_eq!(3, heap1.heap.capacity());
assert!(heap1.heap.is_empty());
@@ -2491,4 +2483,92 @@
heap1.clear();
assert!(heap1.heap.is_empty());
}

#[test]
fn test_on_min_ts() {
let cfg = CdcConfig {
// Disable automatic advance resolved ts during test.
min_ts_interval: ReadableDuration(Duration::from_secs(1000)),
..Default::default()
};
let mut suite = mock_endpoint(&cfg, None, ApiVersion::V1);
let quota = crate::channel::MemoryQuota::new(usize::MAX);
let (tx, mut rx) = channel::channel(1, quota);
let mut rx = rx.drain();

let conn = Conn::new(tx, String::new());
let conn_id = conn.get_id();
suite.run(Task::OpenConn { conn });
let mut req_header = Header::default();
req_header.set_cluster_id(0);

let mut regions = vec![];
for id in 1..4097 {
regions.push(id);
suite.add_region(id, 100);

let mut req = ChangeDataRequest::default();
req.set_region_id(id);
let region_epoch = req.get_region_epoch().clone();
let downstream = Downstream::new(
"".to_string(),
region_epoch.clone(),
id,
conn_id,
ChangeDataRequestKvApi::TiDb,
false,
ObservedRange::default(),
);
on_init_downstream(&downstream.get_state());
post_init_downstream(&downstream.get_state());
// Enable batch resolved ts in the test.
let version = FeatureGate::batch_resolved_ts();
suite.run(Task::Register {
request: req.clone(),
downstream,
conn_id,
version: version.clone(),
});

let mut resolver = Resolver::new(id);
resolver.track_lock(TimeStamp::compose(0, id), vec![], None);
let mut region = Region::default();
region.id = id;
region.set_region_epoch(region_epoch);
let failed = suite
.capture_regions
.get_mut(&id)
.unwrap()
.on_region_ready(resolver, region);
assert!(failed.is_empty());
}
suite
.task_rx
.recv_timeout(Duration::from_millis(100))
.unwrap_err();

suite.run(Task::MinTs {
regions,
min_ts: TimeStamp::compose(0, 4096),
current_ts: TimeStamp::compose(0, 4096),
});

// There should be at least 3 resolved ts events.
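// Batch sizes grow as 8, 32, 128, 512, 2048, ..., so the 4096 captured
// regions are drained in 6 pops, each producing one resolved ts event.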
let mut last_resolved_ts = 0;
let mut last_batch_count = 0;
for _ in 0..3 {
let event = recv_timeout(&mut rx, Duration::from_millis(100))
.unwrap()
.unwrap()
.0;
assert!(last_resolved_ts < event.resolved_ts().ts, "{:?}", event);
assert!(
last_batch_count < event.resolved_ts().regions.len(),
"{:?}",
event
);
last_resolved_ts = event.resolved_ts().ts;
last_batch_count = event.resolved_ts().regions.len();
}
}
}
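
The core of this change is the new while-loop in on_min_ts: instead of popping a fixed 32 "outlier" regions and then hashing the whole remaining heap into one set, the heap is drained in exponentially growing batches (8, 32, 128, ...), so the number of resolved ts broadcasts grows roughly logarithmically with the number of captured regions. Below is a minimal, self-contained sketch of that draining strategy; the Heap, ResolvedRegion, and broadcast names are simplified stand-ins for the real ResolvedRegionHeap and Endpoint::broadcast_resolved_ts, not the code in this commit.

use std::{
    cmp::Reverse,
    collections::{BinaryHeap, HashSet},
};

// Simplified stand-in for the resolved-ts bookkeeping in endpoint.rs.
#[derive(PartialEq, Eq, PartialOrd, Ord)]
struct ResolvedRegion {
    resolved_ts: u64,
    region_id: u64,
}

// Reverse turns the max-heap into a min-heap keyed by resolved_ts,
// mirroring how ResolvedRegionHeap orders regions.
struct Heap {
    heap: BinaryHeap<Reverse<ResolvedRegion>>,
}

impl Heap {
    // Pop up to `count` regions with the smallest resolved ts, returning
    // their minimum resolved ts and the set of popped region ids.
    fn pop(&mut self, count: usize) -> (u64, HashSet<u64>) {
        let mut min_resolved_ts = u64::MAX;
        let mut regions = HashSet::with_capacity(count);
        while regions.len() < count {
            match self.heap.pop() {
                Some(Reverse(r)) => {
                    min_resolved_ts = min_resolved_ts.min(r.resolved_ts);
                    regions.insert(r.region_id);
                }
                None => break,
            }
        }
        (min_resolved_ts, regions)
    }

    fn is_empty(&self) -> bool {
        self.heap.is_empty()
    }
}

// Hypothetical stub standing in for Endpoint::broadcast_resolved_ts.
fn broadcast(min_resolved_ts: u64, regions: &HashSet<u64>) {
    println!("resolved ts {} for {} regions", min_resolved_ts, regions.len());
}

fn main() {
    let mut resolved_regions = Heap {
        heap: (1..=4096u64)
            .map(|id| Reverse(ResolvedRegion { resolved_ts: id, region_id: id }))
            .collect(),
    };
    // Exponentially growing batches, as in the new while-loop in on_min_ts.
    let mut batch_count = 8;
    while !resolved_regions.is_empty() {
        let (min_ts, regions) = resolved_regions.pop(batch_count);
        broadcast(min_ts, &regions);
        batch_count *= 4;
    }
}

With 4096 regions this prints six events (batches of 8, 32, 128, 512, 2048 and a final 1368), which is the behavior the new test_on_min_ts test relies on when it expects at least three resolved ts events.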
