From b80f5e6fd9b6f0d71aa0e1bc71f8ab516764103d Mon Sep 17 00:00:00 2001 From: hillium Date: Tue, 7 Feb 2023 15:19:35 +0800 Subject: [PATCH 1/8] added intervally resolve regions Signed-off-by: hillium --- .../backup-stream/src/checkpoint_manager.rs | 234 +++++++++++------- components/backup-stream/src/endpoint.rs | 88 ++++++- components/backup-stream/src/metrics.rs | 6 + .../backup-stream/src/subscription_manager.rs | 22 +- components/backup-stream/tests/mod.rs | 61 ++++- 5 files changed, 302 insertions(+), 109 deletions(-) diff --git a/components/backup-stream/src/checkpoint_manager.rs b/components/backup-stream/src/checkpoint_manager.rs index 5cf4292faa3..e9cce2430e3 100644 --- a/components/backup-stream/src/checkpoint_manager.rs +++ b/components/backup-stream/src/checkpoint_manager.rs @@ -1,10 +1,11 @@ // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. -use std::{collections::HashMap, sync::Arc, time::Duration}; +use std::{cell::RefCell, collections::HashMap, sync::Arc, time::Duration}; use futures::{ channel::mpsc::{self as async_mpsc, Receiver, Sender}, - SinkExt, StreamExt, + future::BoxFuture, + FutureExt, SinkExt, StreamExt, }; use grpcio::{RpcStatus, RpcStatusCode, ServerStreamingSink, WriteFlags}; use kvproto::{ @@ -13,7 +14,7 @@ use kvproto::{ metapb::Region, }; use pd_client::PdClient; -use tikv_util::{box_err, defer, info, warn, worker::Scheduler}; +use tikv_util::{box_err, defer, info, time::Instant, warn, worker::Scheduler}; use txn_types::TimeStamp; use uuid::Uuid; @@ -22,7 +23,9 @@ use crate::{ errors::{Error, ReportableResult, Result}, future, metadata::{store::MetaStore, Checkpoint, CheckpointProvider, MetadataClient}, - metrics, try_send, RegionCheckpointOperation, Task, + metrics, + subscription_track::ResolveResult, + try_send, RegionCheckpointOperation, Task, }; /// A manager for maintaining the last flush ts. @@ -31,14 +34,15 @@ use crate::{ /// checkpoint then advancing the global checkpoint. 
#[derive(Default)] pub struct CheckpointManager { - items: HashMap, + checkpoint_ts: HashMap, + resolved_ts: HashMap, manager_handle: Option>, } impl std::fmt::Debug for CheckpointManager { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("CheckpointManager") - .field("items", &self.items) + .field("items", &self.checkpoint_ts) .finish() } } @@ -60,49 +64,59 @@ impl SubscriptionManager { while let Some(msg) = self.input.next().await { match msg { SubscriptionOp::Add(sub) => { - self.subscribers.insert(Uuid::new_v4(), sub); + let uid = Uuid::new_v4(); + info!("log backup adding new subscriber"; "id" => %uid); + self.subscribers.insert(uid, sub); } SubscriptionOp::Emit(events) => { - let mut canceled = vec![]; - for (id, sub) in &mut self.subscribers { - let send_all = async { - for es in events.chunks(1024) { - let mut resp = SubscribeFlushEventResponse::new(); - resp.set_events(es.to_vec().into()); - sub.feed((resp, WriteFlags::default())).await?; - } - sub.flush().await - }; - - match send_all.await { - Err(grpcio::Error::RemoteStopped) => { - canceled.push(*id); - } - Err(err) => { - Error::from(err).report("sending subscription"); - } - _ => {} - } - } - - for c in canceled { - match self.subscribers.remove(&c) { - Some(mut sub) => { - info!("client is gone, removing subscription"; "id" => %c); - sub.close().await.report_if_err(format_args!( - "during removing subscription {}", - c - )) - } - None => { - warn!("BUG: the subscriber has been removed before we are going to remove it."; "id" => %c); - } - } - } + self.emit_events(events).await; } } } } + + async fn emit_events(&mut self, events: Box<[FlushEvent]>) { + let mut canceled = vec![]; + info!("log backup sending events"; "event_len" => %events.len(), "downstream" => %self.subscribers.len()); + for (id, sub) in &mut self.subscribers { + let send_all = async { + for es in events.chunks(1024) { + let mut resp = SubscribeFlushEventResponse::new(); + resp.set_events(es.to_vec().into()); + sub.feed((resp, WriteFlags::default())).await?; + } + sub.flush().await + }; + + match send_all.await { + Err(grpcio::Error::RemoteStopped) => { + canceled.push(*id); + } + Err(err) => { + Error::from(err).report("sending subscription"); + } + _ => {} + } + } + + for c in canceled { + self.remove_subscription(&c).await; + } + } + + async fn remove_subscription(&mut self, id: &Uuid) { + match self.subscribers.remove(id) { + Some(mut sub) => { + info!("client is gone, removing subscription"; "id" => %id); + sub.close() + .await + .report_if_err(format_args!("during removing subscription {}", id)) + } + None => { + warn!("BUG: the subscriber has been removed before we are going to remove it."; "id" => %id); + } + } + } } // Note: can we make it more generic...? @@ -154,11 +168,6 @@ impl GetCheckpointResult { } impl CheckpointManager { - /// clear the manager. 
- pub fn clear(&mut self) { - self.items.clear(); - } - pub fn spawn_subscription_mgr(&mut self) -> future![()] { let (tx, rx) = async_mpsc::channel(1024); let sub = SubscriptionManager { @@ -169,25 +178,67 @@ impl CheckpointManager { sub.main_loop() } - pub fn update_region_checkpoints(&mut self, region_and_checkpoint: Vec<(Region, TimeStamp)>) { - for (region, checkpoint) in ®ion_and_checkpoint { - self.do_update(region, *checkpoint); + pub fn resolve_regions(&mut self, region_and_checkpoint: Vec) { + for res in region_and_checkpoint { + self.do_update(res.region, res.checkpoint); } + } - self.notify(region_and_checkpoint.into_iter()); + pub fn flush(&mut self) { + info!("log backup checkpoint manager flushing."; "resolved_ts_len" => %self.resolved_ts.len(), "resolved_ts" => ?self.get_resolved_ts()); + self.checkpoint_ts = std::mem::take(&mut self.resolved_ts); + // Clippy doesn't know this iterator borrows `self.checkpoint_ts` :( + #[allow(clippy::needless_collect)] + let items = self + .checkpoint_ts + .values() + .cloned() + .map(|x| (x.region, x.checkpoint)) + .collect::>(); + self.notify(items.into_iter()); } /// update a region checkpoint in need. #[cfg(test)] pub fn update_region_checkpoint(&mut self, region: &Region, checkpoint: TimeStamp) { - self.do_update(region, checkpoint); - self.notify(std::iter::once((region.clone(), checkpoint))); + Self::update_ts(&mut self.checkpoint_ts, region.clone(), checkpoint) + } + + fn update_ts( + container: &mut HashMap, + region: Region, + checkpoint: TimeStamp, + ) { + let e = container.entry(region.get_id()); + let ver = region.get_region_epoch().get_version(); + // A hacky way to allow the two closures move out the region. + // It is safe given the two closures would only be called once. + let r = RefCell::new(Some(region)); + e.and_modify(|old_cp| { + let old_ver = old_cp.region.get_region_epoch().get_version(); + let checkpoint_is_newer = old_cp.checkpoint < checkpoint; + if old_ver < ver || (old_ver == ver && checkpoint_is_newer) { + *old_cp = LastFlushTsOfRegion { + checkpoint, + region: r.borrow_mut().take().expect( + "unreachable: `and_modify` and `or_insert_with` called at the same time.", + ), + }; + } + }) + .or_insert_with(|| LastFlushTsOfRegion { + checkpoint, + region: r + .borrow_mut() + .take() + .expect("unreachable: `and_modify` and `or_insert_with` called at the same time."), + }); } - pub fn add_subscriber(&mut self, sub: Subscription) -> future![Result<()>] { + pub fn add_subscriber(&mut self, sub: Subscription) -> BoxFuture<'static, Result<()>> { let mgr = self.manager_handle.as_ref().cloned(); let initial_data = self - .items + .checkpoint_ts .values() .map(|v| FlushEvent { start_key: v.region.start_key.clone(), @@ -225,6 +276,7 @@ impl CheckpointManager { })?; Ok(()) } + .boxed() } fn notify(&mut self, items: impl Iterator) { @@ -248,28 +300,13 @@ impl CheckpointManager { } } - fn do_update(&mut self, region: &Region, checkpoint: TimeStamp) { - let e = self.items.entry(region.get_id()); - e.and_modify(|old_cp| { - if old_cp.checkpoint < checkpoint - && old_cp.region.get_region_epoch().get_version() - <= region.get_region_epoch().get_version() - { - *old_cp = LastFlushTsOfRegion { - checkpoint, - region: region.clone(), - }; - } - }) - .or_insert_with(|| LastFlushTsOfRegion { - checkpoint, - region: region.clone(), - }); + fn do_update(&mut self, region: Region, checkpoint: TimeStamp) { + Self::update_ts(&mut self.resolved_ts, region, checkpoint) } /// get checkpoint from a region. 
pub fn get_from_region(&self, region: RegionIdWithVersion) -> GetCheckpointResult { - let checkpoint = self.items.get(®ion.region_id); + let checkpoint = self.checkpoint_ts.get(®ion.region_id); if checkpoint.is_none() { return GetCheckpointResult::not_found(region); } @@ -282,7 +319,11 @@ impl CheckpointManager { /// get all checkpoints stored. pub fn get_all(&self) -> Vec { - self.items.values().cloned().collect() + self.checkpoint_ts.values().cloned().collect() + } + + pub fn get_resolved_ts(&self) -> Option { + self.resolved_ts.values().map(|x| x.checkpoint).min() } } @@ -333,7 +374,7 @@ pub struct LastFlushTsOfRegion { #[async_trait::async_trait] pub trait FlushObserver: Send + 'static { /// The callback when the flush has advanced the resolver. - async fn before(&mut self, checkpoints: Vec<(Region, TimeStamp)>); + async fn before(&mut self, checkpoints: Vec); /// The callback when the flush is done. (Files are fully written to /// external storage.) async fn after(&mut self, task: &str, rts: u64) -> Result<()>; @@ -363,7 +404,7 @@ impl BasicFlushObserver { #[async_trait::async_trait] impl FlushObserver for BasicFlushObserver { - async fn before(&mut self, _checkpoints: Vec<(Region, TimeStamp)>) {} + async fn before(&mut self, _checkpoints: Vec) {} async fn after(&mut self, task: &str, rts: u64) -> Result<()> { if let Err(err) = self @@ -401,8 +442,9 @@ pub struct CheckpointV3FlushObserver { sched: Scheduler, meta_cli: MetadataClient, - checkpoints: Vec<(Region, TimeStamp)>, + checkpoints: Vec, global_checkpoint_cache: HashMap, + start_time: Instant, } impl CheckpointV3FlushObserver { @@ -414,6 +456,7 @@ impl CheckpointV3FlushObserver { // We almost always have only one entry. global_checkpoint_cache: HashMap::with_capacity(1), baseline, + start_time: Instant::now(), } } } @@ -443,15 +486,19 @@ where S: MetaStore + 'static, O: FlushObserver + Send, { - async fn before(&mut self, checkpoints: Vec<(Region, TimeStamp)>) { + async fn before(&mut self, checkpoints: Vec) { self.checkpoints = checkpoints; } async fn after(&mut self, task: &str, _rts: u64) -> Result<()> { - let t = Task::RegionCheckpointsOp(RegionCheckpointOperation::Update(std::mem::take( - &mut self.checkpoints, - ))); - try_send!(self.sched, t); + let resolve_task = Task::RegionCheckpointsOp(RegionCheckpointOperation::Resolved { + checkpoints: std::mem::take(&mut self.checkpoints), + start_time: self.start_time, + }); + let flush_task = Task::RegionCheckpointsOp(RegionCheckpointOperation::Flush); + try_send!(self.sched, resolve_task); + try_send!(self.sched, flush_task); + let global_checkpoint = self.get_checkpoint(task).await?; info!("getting global checkpoint from cache for updating."; "checkpoint" => ?global_checkpoint); self.baseline @@ -499,6 +546,26 @@ pub mod tests { r } + #[test] + fn test_flush() { + let mut mgr = super::CheckpointManager::default(); + mgr.do_update(region(1, 32, 8), TimeStamp::new(8)); + mgr.do_update(region(2, 34, 8), TimeStamp::new(15)); + mgr.do_update(region(2, 35, 8), TimeStamp::new(16)); + mgr.do_update(region(2, 35, 8), TimeStamp::new(14)); + let r = mgr.get_from_region(RegionIdWithVersion::new(1, 32)); + assert_matches::assert_matches!(r, GetCheckpointResult::NotFound { .. }); + + mgr.flush(); + let r = mgr.get_from_region(RegionIdWithVersion::new(1, 32)); + assert_matches::assert_matches!(r, GetCheckpointResult::Ok { checkpoint , .. 
} if checkpoint.into_inner() == 8); + let r = mgr.get_from_region(RegionIdWithVersion::new(2, 35)); + assert_matches::assert_matches!(r, GetCheckpointResult::Ok { checkpoint , .. } if checkpoint.into_inner() == 16); + mgr.flush(); + let r = mgr.get_from_region(RegionIdWithVersion::new(1, 32)); + assert_matches::assert_matches!(r, GetCheckpointResult::NotFound { .. }); + } + #[test] fn test_mgr() { let mut mgr = super::CheckpointManager::default(); @@ -510,6 +577,7 @@ pub mod tests { assert_matches::assert_matches!(r, GetCheckpointResult::EpochNotMatch { .. }); let r = mgr.get_from_region(RegionIdWithVersion::new(3, 44)); assert_matches::assert_matches!(r, GetCheckpointResult::NotFound { .. }); + mgr.update_region_checkpoint(®ion(1, 30, 8), TimeStamp::new(16)); let r = mgr.get_from_region(RegionIdWithVersion::new(1, 32)); assert_matches::assert_matches!(r, GetCheckpointResult::Ok{checkpoint, ..} if checkpoint.into_inner() == 8); diff --git a/components/backup-stream/src/endpoint.rs b/components/backup-stream/src/endpoint.rs index ff380551b90..37488c1ff88 100644 --- a/components/backup-stream/src/endpoint.rs +++ b/components/backup-stream/src/endpoint.rs @@ -56,7 +56,7 @@ use crate::{ observer::BackupStreamObserver, router::{ApplyEvents, Router, TaskSelector}, subscription_manager::{RegionSubscriptionManager, ResolvedRegions}, - subscription_track::SubscriptionTracer, + subscription_track::{ResolveResult, SubscriptionTracer}, try_send, utils::{self, CallbackWaitGroup, StopWatch, Work}, }; @@ -184,7 +184,7 @@ where pool.spawn(op_loop); let mut checkpoint_mgr = CheckpointManager::default(); pool.spawn(checkpoint_mgr.spawn_subscription_mgr()); - Endpoint { + let ep = Endpoint { meta_client, range_router, scheduler, @@ -203,7 +203,9 @@ where failover_time: None, config, checkpoint_mgr, - } + }; + ep.pool.spawn(ep.min_ts_worker()); + ep } } @@ -759,7 +761,7 @@ where let mut resolved = get_rts.await?; let mut new_rts = resolved.global_checkpoint(); fail::fail_point!("delay_on_flush"); - flush_ob.before(resolved.take_region_checkpoints()).await; + flush_ob.before(resolved.take_resolve_result()).await; if let Some(rewritten_rts) = flush_ob.rewrite_resolved_ts(&task).await { info!("rewriting resolved ts"; "old" => %new_rts, "new" => %rewritten_rts); new_rts = rewritten_rts.min(new_rts); @@ -915,13 +917,30 @@ where } } + fn min_ts_worker(&self) -> future![()] { + let sched = self.scheduler.clone(); + async move { + loop { + tokio::time::sleep(Duration::from_secs(10)).await; + try_send!( + sched, + Task::RegionCheckpointsOp(RegionCheckpointOperation::PrepareMinTsForResolve) + ); + } + } + } + pub fn handle_region_checkpoints_op(&mut self, op: RegionCheckpointOperation) { match op { - RegionCheckpointOperation::Update(u) => { - // Let's clear all stale checkpoints first. - // Or they may slow down the global checkpoint. 
- self.checkpoint_mgr.clear(); - self.checkpoint_mgr.update_region_checkpoints(u); + RegionCheckpointOperation::Resolved { + checkpoints, + start_time, + } => { + self.checkpoint_mgr.resolve_regions(checkpoints); + metrics::MIN_TS_RESOLVE_DURATION.observe(start_time.saturating_elapsed_secs()); + } + RegionCheckpointOperation::Flush => { + self.checkpoint_mgr.flush(); } RegionCheckpointOperation::Get(g, cb) => { let _guard = self.pool.handle().enter(); @@ -949,6 +968,34 @@ where } }); } + RegionCheckpointOperation::PrepareMinTsForResolve => { + let min_ts = self.pool.block_on(self.prepare_min_ts()); + let start_time = Instant::now(); + try_send!( + self.scheduler, + Task::RegionCheckpointsOp(RegionCheckpointOperation::Resolve { + min_ts, + start_time + }) + ); + } + RegionCheckpointOperation::Resolve { min_ts, start_time } => { + let sched = self.scheduler.clone(); + try_send!( + self.scheduler, + Task::ModifyObserve(ObserveOp::ResolveRegions { + callback: Box::new(move |mut resolved| { + let t = + Task::RegionCheckpointsOp(RegionCheckpointOperation::Resolved { + checkpoints: resolved.take_resolve_result(), + start_time, + }); + try_send!(sched, t); + }), + min_ts + }) + ); + } } } @@ -993,7 +1040,16 @@ pub enum RegionSet { } pub enum RegionCheckpointOperation { - Update(Vec<(Region, TimeStamp)>), + Flush, + PrepareMinTsForResolve, + Resolve { + min_ts: TimeStamp, + start_time: Instant, + }, + Resolved { + checkpoints: Vec, + start_time: Instant, + }, Get(RegionSet, Box) + Send>), Subscribe(Subscription), } @@ -1001,9 +1057,17 @@ pub enum RegionCheckpointOperation { impl fmt::Debug for RegionCheckpointOperation { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - Self::Update(arg0) => f.debug_tuple("Update").field(arg0).finish(), + Self::Flush => f.debug_tuple("Flush").finish(), Self::Get(arg0, _) => f.debug_tuple("Get").field(arg0).finish(), + Self::Subscribe(_) => f.debug_tuple("Subscription").finish(), + Self::Resolved { checkpoints, .. } => { + f.debug_tuple("Resolve").field(checkpoints).finish() + } + Self::PrepareMinTsForResolve => f.debug_tuple("PrepareMinTsForResolve").finish(), + Self::Resolve { min_ts, .. } => { + f.debug_struct("Resolve").field("min_ts", min_ts).finish() + } } } } @@ -1181,7 +1245,7 @@ impl Task { ObserveOp::NotifyFailToStartObserve { .. } => "modify_observe.retry", ObserveOp::ResolveRegions { .. } => "modify_observe.resolve", }, - Task::ForceFlush(_) => "force_flush", + Task::ForceFlush(..) => "force_flush", Task::FatalError(..) => "fatal_error", Task::Sync(..) => "sync", Task::MarkFailover(_) => "mark_failover", diff --git a/components/backup-stream/src/metrics.rs b/components/backup-stream/src/metrics.rs index 0805dae5f77..d7f836833b0 100644 --- a/components/backup-stream/src/metrics.rs +++ b/components/backup-stream/src/metrics.rs @@ -160,4 +160,10 @@ lazy_static! 
{ "The regions that lost leadership during resolving" ) .unwrap(); + pub static ref MIN_TS_RESOLVE_DURATION: Histogram = register_histogram!( + "tikv_log_backup_resolve_duration_sec", + "The duration of resolving.", + exponential_buckets(0.001, 2.0, 16).unwrap() + ) + .unwrap(); } diff --git a/components/backup-stream/src/subscription_manager.rs b/components/backup-stream/src/subscription_manager.rs index a31a43980b5..4f75423a241 100644 --- a/components/backup-stream/src/subscription_manager.rs +++ b/components/backup-stream/src/subscription_manager.rs @@ -38,7 +38,7 @@ use crate::{ metrics, observer::BackupStreamObserver, router::{Router, TaskSelector}, - subscription_track::SubscriptionTracer, + subscription_track::{ResolveResult, SubscriptionTracer}, try_send, utils::{self, CallbackWaitGroup, Work}, Task, @@ -58,7 +58,7 @@ struct ScanCmd { /// The response of requesting resolve the new checkpoint of regions. pub struct ResolvedRegions { - items: Vec<(Region, TimeStamp)>, + items: Vec, checkpoint: TimeStamp, } @@ -67,7 +67,7 @@ impl ResolvedRegions { /// Note: Maybe we can compute the global checkpoint internal and getting /// the interface clear. However we must take the `min_ts` or we cannot /// provide valid global checkpoint if there isn't any region checkpoint. - pub fn new(checkpoint: TimeStamp, checkpoints: Vec<(Region, TimeStamp)>) -> Self { + pub fn new(checkpoint: TimeStamp, checkpoints: Vec) -> Self { Self { items: checkpoints, checkpoint, @@ -75,7 +75,16 @@ impl ResolvedRegions { } /// take the region checkpoints from the structure. + #[deprecated = "please use `take_resolve_result` instead."] pub fn take_region_checkpoints(&mut self) -> Vec<(Region, TimeStamp)> { + std::mem::take(&mut self.items) + .into_iter() + .map(|x| (x.region, x.checkpoint)) + .collect() + } + + /// take the resolve result from this struct. + pub fn take_resolve_result(&mut self) -> Vec { std::mem::take(&mut self.items) } @@ -455,7 +464,7 @@ where } ObserveOp::ResolveRegions { callback, min_ts } => { let now = Instant::now(); - let timedout = self.wait(Duration::from_secs(30)).await; + let timedout = self.wait(Duration::from_secs(5)).await; if timedout { warn!("waiting for initial scanning done timed out, forcing progress!"; "take" => ?now.saturating_elapsed(), "timedout" => %timedout); @@ -470,10 +479,7 @@ where let rts = min_region.map(|rs| rs.checkpoint).unwrap_or(min_ts); info!("getting checkpoint"; "defined_by_region" => ?min_region); self.subs.warn_if_gap_too_huge(rts); - callback(ResolvedRegions::new( - rts, - cps.into_iter().map(|r| (r.region, r.checkpoint)).collect(), - )); + callback(ResolvedRegions::new(rts, cps)); } } } diff --git a/components/backup-stream/tests/mod.rs b/components/backup-stream/tests/mod.rs index b7afcd1441f..0d4e6f144b6 100644 --- a/components/backup-stream/tests/mod.rs +++ b/components/backup-stream/tests/mod.rs @@ -21,7 +21,7 @@ use backup_stream::{ router::Router, Endpoint, GetCheckpointResult, RegionCheckpointOperation, RegionSet, Service, Task, }; -use futures::{executor::block_on, AsyncWriteExt, Future, Stream, StreamExt, TryStreamExt}; +use futures::{executor::block_on, AsyncWriteExt, Future, Stream, StreamExt}; use grpcio::{ChannelBuilder, Server, ServerBuilder}; use kvproto::{ brpb::{CompressionType, Local, Metadata, StorageBackend}, @@ -275,7 +275,10 @@ impl Suite { /// create a subscription stream. this has simply asserted no error, because /// in theory observing flushing should not emit error. change that if /// needed. 
- fn flush_stream(&self) -> impl Stream { + fn flush_stream( + &self, + panic_while_fail: bool, + ) -> impl Stream { let streams = self .log_backup_cli .iter() @@ -288,8 +291,18 @@ impl Suite { }) .unwrap_or_else(|err| panic!("failed to subscribe on {} because {}", id, err)); let id = *id; - stream.map_ok(move |x| (id, x)).map(move |x| { - x.unwrap_or_else(move |err| panic!("failed to rec from {} because {}", id, err)) + stream.filter_map(move |x| { + futures::future::ready(match x { + Ok(x) => Some((id, x)), + Err(err) => { + if panic_while_fail { + panic!("failed to rec from {} because {}", id, err) + } else { + println!("[WARN] failed to rec from {} because {}", id, err); + None + } + } + }) }) }) .collect::>(); @@ -463,7 +476,8 @@ impl Suite { } fn force_flush_files(&self, task: &str) { - self.run(|| Task::ForceFlush(task.to_owned())); + // TODO: use the callback to make the test more stable. + self.run(|| Task::ForceFlush(task.to_owned(), Box::new(|| {}))); self.sync(); } @@ -1264,7 +1278,7 @@ mod test { #[test] fn subscribe_flushing() { let mut suite = super::SuiteBuilder::new_named("sub_flush").build(); - let stream = suite.flush_stream(); + let stream = suite.flush_stream(true); for i in 1..10 { let split_key = make_split_key_at_record(1, i * 20); suite.must_split(&split_key); @@ -1306,6 +1320,41 @@ mod test { )); } + #[test] + fn resolved_follower() { + let mut suite = super::SuiteBuilder::new_named("r").build(); + let round1 = run_async_test(suite.write_records(0, 128, 1)); + suite.must_register_task(1, "r"); + suite.run(|| Task::RegionCheckpointsOp(RegionCheckpointOperation::PrepareMinTsForResolve)); + suite.sync(); + std::thread::sleep(Duration::from_secs(1)); + + let leader = suite.cluster.leader_of_region(1).unwrap(); + suite.must_shuffle_leader(1); + let round2 = run_async_test(suite.write_records(256, 128, 1)); + suite + .endpoints + .get(&leader.store_id) + .unwrap() + .scheduler() + .schedule(Task::ForceFlush("r".to_owned(), Box::new(|| {}))) + .unwrap(); + suite.sync(); + std::thread::sleep(Duration::from_secs(1)); + run_async_test(suite.check_for_write_records( + suite.flushed_files.path(), + round1.iter().map(|x| x.as_slice()), + )); + assert!(suite.global_checkpoint() > 256); + suite.force_flush_files("r"); + suite.wait_for_flush(); + assert!(suite.global_checkpoint() > 512); + run_async_test(suite.check_for_write_records( + suite.flushed_files.path(), + round1.union(&round2).map(|x| x.as_slice()), + )); + } + #[test] fn network_partition() { let mut suite = super::SuiteBuilder::new_named("network_partition") From 23044e49707a2dc873aa2813a63f8ff0a793d5fe Mon Sep 17 00:00:00 2001 From: hillium Date: Wed, 15 Feb 2023 11:22:25 +0800 Subject: [PATCH 2/8] make clippy happy Signed-off-by: hillium --- components/backup-stream/tests/mod.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/components/backup-stream/tests/mod.rs b/components/backup-stream/tests/mod.rs index 0d4e6f144b6..d6dfb2b2839 100644 --- a/components/backup-stream/tests/mod.rs +++ b/components/backup-stream/tests/mod.rs @@ -477,7 +477,7 @@ impl Suite { fn force_flush_files(&self, task: &str) { // TODO: use the callback to make the test more stable. 
- self.run(|| Task::ForceFlush(task.to_owned(), Box::new(|| {}))); + self.run(|| Task::ForceFlush(task.to_owned())); self.sync(); } @@ -1337,7 +1337,7 @@ mod test { .get(&leader.store_id) .unwrap() .scheduler() - .schedule(Task::ForceFlush("r".to_owned(), Box::new(|| {}))) + .schedule(Task::ForceFlush("r".to_owned())) .unwrap(); suite.sync(); std::thread::sleep(Duration::from_secs(1)); @@ -1360,7 +1360,7 @@ mod test { let mut suite = super::SuiteBuilder::new_named("network_partition") .nodes(3) .build(); - let stream = suite.flush_stream(); + let stream = suite.flush_stream(true); suite.must_register_task(1, "network_partition"); let leader = suite.cluster.leader_of_region(1).unwrap(); let round1 = run_async_test(suite.write_records(0, 64, 1));
From c603e86fb3b14edfbcc8d7bbf00a0e77da6b7b10 Mon Sep 17 00:00:00 2001 From: hillium Date: Tue, 28 Feb 2023 13:50:38 +0800 Subject: [PATCH 4/8] added some comments Signed-off-by: hillium --- components/backup-stream/src/endpoint.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/components/backup-stream/src/endpoint.rs b/components/backup-stream/src/endpoint.rs index 37488c1ff88..669150224a6 100644 --- a/components/backup-stream/src/endpoint.rs +++ b/components/backup-stream/src/endpoint.rs @@ -971,6 +971,9 @@ where RegionCheckpointOperation::PrepareMinTsForResolve => { let min_ts = self.pool.block_on(self.prepare_min_ts()); let start_time = Instant::now(); + // We need to reschedule the `Resolve` task to queue, because the subscription + // is asynchronous -- there may be transactions committed before + // the min_ts we prepared but haven't been observed yet.
try_send!( self.scheduler, Task::RegionCheckpointsOp(RegionCheckpointOperation::Resolve { From 6e4f144085f156c605b107e683c65575c88dad71 Mon Sep 17 00:00:00 2001 From: hillium Date: Thu, 9 Mar 2023 15:35:23 +0800 Subject: [PATCH 5/8] make min_ts interval configurable Signed-off-by: hillium --- components/backup-stream/src/endpoint.rs | 4 ++-- src/config/mod.rs | 10 ++++++++++ tests/integrations/config/mod.rs | 1 + 3 files changed, 13 insertions(+), 2 deletions(-) diff --git a/components/backup-stream/src/endpoint.rs b/components/backup-stream/src/endpoint.rs index 66b52ac3745..91c0220a6c8 100644 --- a/components/backup-stream/src/endpoint.rs +++ b/components/backup-stream/src/endpoint.rs @@ -93,7 +93,6 @@ pub struct Endpoint { failover_time: Option, // We holds the config before, even it is useless for now, // however probably it would be useful in the future. - #[allow(dead_code)] config: BackupStreamConfig, checkpoint_mgr: CheckpointManager, } @@ -923,9 +922,10 @@ where fn min_ts_worker(&self) -> future![()] { let sched = self.scheduler.clone(); + let interval = self.config.min_ts_interval.0; async move { loop { - tokio::time::sleep(Duration::from_secs(10)).await; + tokio::time::sleep(interval).await; try_send!( sched, Task::RegionCheckpointsOp(RegionCheckpointOperation::PrepareMinTsForResolve) diff --git a/src/config/mod.rs b/src/config/mod.rs index 4be54665443..80d333bdcae 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -2568,6 +2568,8 @@ impl Default for BackupConfig { #[serde(default)] #[serde(rename_all = "kebab-case")] pub struct BackupStreamConfig { + #[online_config(skip)] + pub min_ts_interval: ReadableDuration, #[online_config(skip)] pub max_flush_interval: ReadableDuration, #[online_config(skip)] @@ -2595,6 +2597,13 @@ impl BackupStreamConfig { ); self.num_threads = default_cfg.num_threads; } + if self.max_flush_interval < ReadableDuration::secs(1) { + return Err(format!( + "the max_flush_interval is too small, it is {}, and should be greater than 1s.", + self.max_flush_interval + ) + .into()); + } Ok(()) } } @@ -2605,6 +2614,7 @@ impl Default for BackupStreamConfig { let total_mem = SysQuota::memory_limit_in_bytes(); let quota_size = (total_mem as f64 * 0.1).min(ReadableSize::mb(512).0 as _); Self { + min_ts_interval: ReadableDuration::secs(10), max_flush_interval: ReadableDuration::minutes(3), // use at most 50% of vCPU by default num_threads: (cpu_num * 0.5).clamp(2.0, 12.0) as usize, diff --git a/tests/integrations/config/mod.rs b/tests/integrations/config/mod.rs index 351e9d74ca0..b368c32066b 100644 --- a/tests/integrations/config/mod.rs +++ b/tests/integrations/config/mod.rs @@ -780,6 +780,7 @@ fn test_serde_custom_tikv_config() { file_size_limit: ReadableSize::gb(5), initial_scan_pending_memory_quota: ReadableSize::kb(2), initial_scan_rate_limit: ReadableSize::mb(3), + min_ts_interval: ReadableDuration::secs(2), }; value.import = ImportConfig { num_threads: 123, From 6144c3ad0cea9c79ce15dd4507f9e3c97a33be5e Mon Sep 17 00:00:00 2001 From: hillium Date: Thu, 9 Mar 2023 16:07:40 +0800 Subject: [PATCH 6/8] address comments Signed-off-by: hillium --- components/backup-stream/src/checkpoint_manager.rs | 3 ++- src/config/mod.rs | 11 +++++++++-- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/components/backup-stream/src/checkpoint_manager.rs b/components/backup-stream/src/checkpoint_manager.rs index 92210bda0ea..50a6ac27864 100644 --- a/components/backup-stream/src/checkpoint_manager.rs +++ b/components/backup-stream/src/checkpoint_manager.rs @@ -42,7 
+42,8 @@ pub struct CheckpointManager { impl std::fmt::Debug for CheckpointManager { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("CheckpointManager") - .field("items", &self.checkpoint_ts) + .field("checkpoints", &self.checkpoint_ts) + .field("resolved-ts", &self.resolved_ts) .finish() } } diff --git a/src/config/mod.rs b/src/config/mod.rs index 80d333bdcae..f37d8d48c62 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -2597,13 +2597,20 @@ impl BackupStreamConfig { ); self.num_threads = default_cfg.num_threads; } - if self.max_flush_interval < ReadableDuration::secs(1) { + if self.max_flush_interval < ReadableDuration::secs(10) { return Err(format!( - "the max_flush_interval is too small, it is {}, and should be greater than 1s.", + "the max_flush_interval is too small, it is {}, and should be greater than 10s.", self.max_flush_interval ) .into()); } + if self.min_ts_interval < ReadableDuration::secs(1) { + return Err(format!( + "the min_ts_interval is too small, it is {}, and should be greater than 1s.", + self.min_ts_interval + ) + .into()); + } Ok(()) } } From 060eabbc37d8fd060407479235740abfda4e2664 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B1=B1=E5=B2=9A?= <36239017+YuJuncen@users.noreply.github.com> Date: Thu, 9 Mar 2023 17:35:36 +0800 Subject: [PATCH 7/8] Update components/backup-stream/src/endpoint.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: 3pointer Signed-off-by: 山岚 <36239017+YuJuncen@users.noreply.github.com> --- components/backup-stream/src/endpoint.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/components/backup-stream/src/endpoint.rs b/components/backup-stream/src/endpoint.rs index 91c0220a6c8..49ca811285b 100644 --- a/components/backup-stream/src/endpoint.rs +++ b/components/backup-stream/src/endpoint.rs @@ -1069,7 +1069,7 @@ impl fmt::Debug for RegionCheckpointOperation { Self::Subscribe(_) => f.debug_tuple("Subscription").finish(), Self::Resolved { checkpoints, .. } => { - f.debug_tuple("Resolve").field(checkpoints).finish() + f.debug_tuple("Resolved").field(checkpoints).finish() } Self::PrepareMinTsForResolve => f.debug_tuple("PrepareMinTsForResolve").finish(), Self::Resolve { min_ts, .. } => { From 96a0b080bb6b3c798682b548d19a094d046a2d1a Mon Sep 17 00:00:00 2001 From: hillium Date: Mon, 13 Mar 2023 15:38:54 +0800 Subject: [PATCH 8/8] fix test Signed-off-by: hillium --- tests/integrations/config/test-custom.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/integrations/config/test-custom.toml b/tests/integrations/config/test-custom.toml index ecab04350b6..7f5dbfa1db7 100644 --- a/tests/integrations/config/test-custom.toml +++ b/tests/integrations/config/test-custom.toml @@ -632,6 +632,7 @@ s3-multi-part-size = "15MB" sst-max-size = "789MB" [log-backup] +min-ts-interval = "2s" max-flush-interval = "11s" num-threads = 7 enable = true
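For readers following the series: below is a minimal, self-contained sketch (not part of the patches themselves) of the checkpoint flow these commits introduce. The periodic resolve pass stages per-region resolved timestamps, and only a flush promotes them into the checkpoints that clients can observe (compare `CheckpointManager::resolve_regions`, `CheckpointManager::flush`, and the `test_flush` case in PATCH 1/8). The `CheckpointModel` type and its `u64` aliases here are simplified stand-ins rather than the actual TiKV types, and the update rule ignores region epochs, which the real `update_ts` does check. The cadence of the resolve pass is what the new `log-backup.min-ts-interval` option controls (default 10s, validated to be at least 1s in PATCH 6/8).

use std::collections::HashMap;

type RegionId = u64;
type Ts = u64;

#[derive(Default)]
struct CheckpointModel {
    // Staged by every periodic resolve pass.
    resolved_ts: HashMap<RegionId, Ts>,
    // Advanced only when a flush succeeds; this is what subscribers see.
    checkpoint_ts: HashMap<RegionId, Ts>,
}

impl CheckpointModel {
    fn resolve(&mut self, region: RegionId, ts: Ts) {
        // Keep the newest resolved ts per region (epoch checks omitted here).
        let entry = self.resolved_ts.entry(region).or_insert(ts);
        if *entry < ts {
            *entry = ts;
        }
    }

    fn flush(&mut self) {
        // Promote the staged timestamps and clear the staging area,
        // mirroring `CheckpointManager::flush` in PATCH 1/8.
        self.checkpoint_ts = std::mem::take(&mut self.resolved_ts);
    }

    fn global_checkpoint(&self) -> Option<Ts> {
        self.checkpoint_ts.values().copied().min()
    }
}

fn main() {
    let mut m = CheckpointModel::default();
    m.resolve(1, 8);
    m.resolve(2, 16);
    assert_eq!(m.global_checkpoint(), None); // nothing visible before the first flush
    m.flush();
    assert_eq!(m.global_checkpoint(), Some(8));
    m.flush();
    assert_eq!(m.global_checkpoint(), None); // a flush with no new resolve clears the view
    println!("staged checkpoints become visible only after a flush");
}

The same behaviour is what the new `test_flush` case asserts: a region resolved before any flush reports NotFound, the first flush exposes the staged timestamps, and a second flush with no resolve in between reports NotFound again.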