Skip to content

Commit

Permalink
cdc: notify pending tasks if associated regions change (#15947) (#15964)
Browse files Browse the repository at this point in the history
close #15910

Signed-off-by: qupeng <qupeng@pingcap.com>
Signed-off-by: qupeng <onlyqupeng@gmail.com>

Co-authored-by: qupeng <qupeng@pingcap.com>
Co-authored-by: qupeng <onlyqupeng@gmail.com>
Co-authored-by: Ping Yu <shui.yu@126.com>
  • Loading branch information
4 people committed Nov 10, 2023
1 parent c0d7a1b commit 9bece34
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 24 deletions.
7 changes: 5 additions & 2 deletions components/cdc/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -809,7 +809,6 @@ impl<T: 'static + CdcHandle<E>, E: KvEngine, S: StoreRegionMeta> Endpoint<T, E,
sink: conn.get_sink().clone(),
request_id: request.get_request_id(),
downstream_state,
scan_concurrency_semaphore: self.scan_concurrency_semaphore.clone(),
scan_speed_limiter: self.scan_speed_limiter.clone(),
fetch_speed_limiter: self.fetch_speed_limiter.clone(),
max_scan_batch_bytes: self.max_scan_batch_bytes,
Expand All @@ -823,10 +822,14 @@ impl<T: 'static + CdcHandle<E>, E: KvEngine, S: StoreRegionMeta> Endpoint<T, E,
};

let cdc_handle = self.cdc_handle.clone();
let concurrency_semaphore = self.scan_concurrency_semaphore.clone();
let memory_quota = self.sink_memory_quota.clone();
self.workers.spawn(async move {
CDC_SCAN_TASKS.with_label_values(&["total"]).inc();
match init.initialize(change_cmd, cdc_handle, memory_quota).await {
match init
.initialize(change_cmd, cdc_handle, concurrency_semaphore, memory_quota)
.await
{
Ok(()) => {
CDC_SCAN_TASKS.with_label_values(&["finish"]).inc();
}
Expand Down
41 changes: 32 additions & 9 deletions components/cdc/src/initializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ pub(crate) struct Initializer<E> {
pub(crate) request_id: u64,
pub(crate) checkpoint_ts: TimeStamp,

pub(crate) scan_concurrency_semaphore: Arc<Semaphore>,
pub(crate) scan_speed_limiter: Limiter,
pub(crate) fetch_speed_limiter: Limiter,

Expand All @@ -110,9 +109,11 @@ impl<E: KvEngine> Initializer<E> {
&mut self,
change_observer: ChangeObserver,
cdc_handle: T,
concurrency_semaphore: Arc<Semaphore>,
memory_quota: Arc<MemoryQuota>,
) -> Result<()> {
fail_point!("cdc_before_initialize");
let _permit = concurrency_semaphore.acquire().await;

// To avoid holding too many snapshots and holding them too long,
// we need to acquire scan concurrency permit before taking snapshot.
Expand Down Expand Up @@ -188,8 +189,6 @@ impl<E: KvEngine> Initializer<E> {
region: Region,
memory_quota: Arc<MemoryQuota>,
) -> Result<()> {
let scan_concurrency_semaphore = self.scan_concurrency_semaphore.clone();
let _permit = scan_concurrency_semaphore.acquire().await;
CDC_SCAN_TASKS.with_label_values(&["ongoing"]).inc();
defer!(CDC_SCAN_TASKS.with_label_values(&["ongoing"]).dec());

Expand Down Expand Up @@ -655,7 +654,6 @@ mod tests {
conn_id: ConnId::new(),
request_id: 0,
checkpoint_ts: 1.into(),
scan_concurrency_semaphore: Arc::new(Semaphore::new(1)),
scan_speed_limiter: Limiter::new(scan_limit as _),
fetch_speed_limiter: Limiter::new(fetch_limit as _),
max_scan_batch_bytes: 1024 * 1024,
Expand Down Expand Up @@ -1034,26 +1032,51 @@ mod tests {

let change_cmd = ChangeObserver::from_cdc(1, ObserveHandle::new());
let raft_router = CdcRaftRouter(MockRaftStoreRouter::new());
let concurrency_semaphore = Arc::new(Semaphore::new(1));
let memory_quota = Arc::new(MemoryQuota::new(usize::MAX));

initializer.downstream_state.store(DownstreamState::Stopped);
block_on(initializer.initialize(change_cmd, raft_router.clone(), memory_quota.clone()))
.unwrap_err();
block_on(initializer.initialize(
change_cmd,
raft_router.clone(),
concurrency_semaphore.clone(),
memory_quota.clone(),
))
.unwrap_err();

let (tx, rx) = sync_channel(1);
let concurrency_semaphore_ = concurrency_semaphore.clone();
pool.spawn(async move {
let _permit = concurrency_semaphore_.acquire().await;
tx.send(()).unwrap();
tx.send(()).unwrap();
tx.send(()).unwrap();
});
rx.recv_timeout(Duration::from_millis(200)).unwrap();

let (tx1, rx1) = sync_channel(1);
let change_cmd = ChangeObserver::from_cdc(1, ObserveHandle::new());
pool.spawn(async move {
// Migrated to 2021 migration. This let statement is probably not needed, see
// https://doc.rust-lang.org/edition-guide/rust-2021/disjoint-capture-in-closures.html
let _ = (
&initializer,
&change_cmd,
&raft_router,
&concurrency_semaphore,
);
let res = initializer
.initialize(change_cmd, raft_router, memory_quota)
.initialize(change_cmd, raft_router, concurrency_semaphore, memory_quota)
.await;
tx1.send(res).unwrap();
});
// Must time out because there are not enough permits.
rx1.recv_timeout(Duration::from_millis(200)).unwrap_err();

// Shouldn't timeout, gets an error instead.
// Release the permit
rx.recv_timeout(Duration::from_millis(200)).unwrap();
let res = rx1.recv_timeout(Duration::from_millis(200)).unwrap();
assert!(res.is_err());
res.unwrap_err();

worker.stop();
}
Expand Down
30 changes: 18 additions & 12 deletions components/cdc/src/observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,20 +177,26 @@ impl RegionChangeObserver for CdcObserver {
event: RegionChangeEvent,
_: StateRole,
) {
if let RegionChangeEvent::Destroy = event {
let region_id = ctx.region().get_id();
if let Some(observe_id) = self.is_subscribed(region_id) {
// Unregister all downstreams.
let store_err = RaftStoreError::RegionNotFound(region_id);
let deregister = Deregister::Delegate {
region_id,
observe_id,
err: CdcError::request(store_err.into()),
};
if let Err(e) = self.sched.schedule(Task::Deregister(deregister)) {
error!("cdc schedule cdc task failed"; "error" => ?e);
match event {
RegionChangeEvent::Destroy
| RegionChangeEvent::Update(
RegionChangeReason::Split | RegionChangeReason::CommitMerge,
) => {
let region_id = ctx.region().get_id();
if let Some(observe_id) = self.is_subscribed(region_id) {
// Unregister all downstreams.
let store_err = RaftStoreError::RegionNotFound(region_id);
let deregister = Deregister::Delegate {
region_id,
observe_id,
err: CdcError::request(store_err.into()),
};
if let Err(e) = self.sched.schedule(Task::Deregister(deregister)) {
error!("cdc schedule cdc task failed"; "error" => ?e);
}
}
}
_ => {}
}
}
}
Expand Down
26 changes: 26 additions & 0 deletions components/cdc/tests/failpoints/test_endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -569,3 +569,29 @@ fn test_cdc_stream_multiplexing() {
}
assert!(request_2_ready);
}

// This case tests pending regions can still get region split/merge
// notifications.
#[test]
fn test_cdc_notify_pending_regions() {
    let cluster = new_server_cluster(0, 1);
    cluster.pd_client.disable_default_operator();
    let mut suite = TestSuiteBuilder::new().cluster(cluster).build();
    let region = suite.cluster.get_region(&[]);
    let rid = region.id;
    let (mut req_tx, _, receive_event) = new_event_feed_v2(suite.get_region_cdc_client(rid));

    // Pause initialization so the subscription stays in the pending state
    // when the split happens below.
    fail::cfg("cdc_before_initialize", "pause").unwrap();
    let mut req = suite.new_changedata_request(rid);
    req.request_id = 1;
    block_on(req_tx.send((req, WriteFlags::default()))).unwrap();

    // Give the request time to be registered before splitting the region.
    thread::sleep(Duration::from_millis(100));
    suite.cluster.must_split(&region, b"x");
    let event = receive_event(false);
    // FIX: the original statement discarded the boolean produced by
    // `matches!`, so a missing/incorrect notification could never fail the
    // test. Wrap it in `assert!` so the expected RegionNotFound error is
    // actually verified.
    assert!(matches!(
        event.get_events()[0].event,
        Some(Event_oneof_event::Error(ref e)) if e.has_region_not_found()
    ));
    fail::remove("cdc_before_initialize");
}
6 changes: 5 additions & 1 deletion components/cdc/tests/failpoints/test_register.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,11 @@ fn test_connections_register_impl<F: KvFormat>() {
let mut events = receive_event(false).events.to_vec();
match events.pop().unwrap().event.unwrap() {
Event_oneof_event::Error(err) => {
assert!(err.has_epoch_not_match(), "{:?}", err);
assert!(
err.has_epoch_not_match() || err.has_region_not_found(),
"{:?}",
err
);
}
other => panic!("unknown event {:?}", other),
}
Expand Down

0 comments on commit 9bece34

Please sign in to comment.