Skip to content

Commit

Permalink
Merge branch 'release-7.5' into cherry-pick-16605-to-release-7.5
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot[bot] committed May 28, 2024
2 parents 9737e2a + 8fbb6d9 commit 4818976
Show file tree
Hide file tree
Showing 17 changed files with 546 additions and 356 deletions.
1 change: 1 addition & 0 deletions cmd/tikv-ctl/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ pub trait DebugExecutor {
json!({
"region": json!({
"id": r.get_id(),
"state": format!("{:?}", s.get_state()),
"start_key": hex::encode_upper(r.get_start_key()),
"end_key": hex::encode_upper(r.get_end_key()),
"region_epoch": json!({
Expand Down
125 changes: 109 additions & 16 deletions components/backup-stream/src/checkpoint_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use kvproto::{
metapb::Region,
};
use pd_client::PdClient;
use tikv_util::{box_err, defer, info, time::Instant, warn, worker::Scheduler};
use tikv_util::{box_err, defer, info, warn, worker::Scheduler};
use txn_types::TimeStamp;
use uuid::Uuid;

Expand All @@ -35,6 +35,7 @@ use crate::{
/// Keeps the checkpoint bookkeeping of log backup in three maps that a
/// resolve result moves through: `resolved_ts` -> `frozen_resolved_ts` ->
/// `checkpoint_ts`.
///
/// NOTE(review): the `u64` keys are presumably region IDs — confirm against
/// `do_update`.
#[derive(Default)]
pub struct CheckpointManager {
/// The checkpoints of the last finished flush. Replaced wholesale by the
/// content of `frozen_resolved_ts` in `flush_and_notify`.
checkpoint_ts: HashMap<u64, LastFlushTsOfRegion>,
/// The resolved ts captured by `freeze`, i.e. the progress that belongs to
/// the data currently being flushed.
frozen_resolved_ts: HashMap<u64, LastFlushTsOfRegion>,
/// The "dynamic" part, fed by `resolve_regions` and emptied by `freeze`.
resolved_ts: HashMap<u64, LastFlushTsOfRegion>,
/// NOTE(review): presumably the channel used to send `SubscriptionOp`s to
/// the subscription manager loop; `None` by default — confirm at the
/// place where it is populated.
manager_handle: Option<Sender<SubscriptionOp>>,
}
Expand Down Expand Up @@ -185,15 +186,29 @@ impl CheckpointManager {
sub.main_loop()
}

/// Updates the "dynamic" part, which is `resolved_ts`.
///
/// It is called "dynamic" because it tracks the incoming data, in contrast
/// to the flushing data, which is about to be written to external storage
/// and can no longer be appended to.
///
/// Every resolve result is forwarded, in order, to `do_update` together
/// with its region and checkpoint.
pub fn resolve_regions(&mut self, region_and_checkpoint: Vec<ResolveResult>) {
    region_and_checkpoint.into_iter().for_each(|resolved| {
        self.do_update(resolved.region, resolved.checkpoint);
    });
}

pub fn flush(&mut self) {
info!("log backup checkpoint manager flushing."; "resolved_ts_len" => %self.resolved_ts.len(), "resolved_ts" => ?self.get_resolved_ts());
self.checkpoint_ts = std::mem::take(&mut self.resolved_ts);
/// flush the `frozen_resolved_ts` to `checkpoint_ts`, and notify the
/// subscribers, with a possible final update to the checkpoint ts.
/// You may provide some extra resolve result from the `last_dive` argument.
/// They will be applied directly to the final checkpoint ts. It is the
/// caller's duty to make sure the resolve result is safe (i.e. All events
/// are surely flushed.)
pub fn flush_and_notify(&mut self, last_dive: Vec<ResolveResult>) {
info!("Notifying the flush result."; "last_dive_len" => last_dive.len());
for rr in last_dive {
Self::update_ts(&mut self.frozen_resolved_ts, rr.region, rr.checkpoint);
}
// Replace the storage directly with the content of this run.
self.checkpoint_ts = std::mem::take(&mut self.frozen_resolved_ts);
// Clippy doesn't know this iterator borrows `self.checkpoint_ts` :(
#[allow(clippy::needless_collect)]
let items = self
Expand All @@ -205,6 +220,38 @@ impl CheckpointManager {
self.notify(items.into_iter());
}

/// "Freezes" the current resolved ts: the content of `resolved_ts` is moved
/// into `frozen_resolved_ts`, and `resolved_ts` is left empty.
/// This is usually called before we are going to flush and after freezing
/// the current batch of mutations.
///
/// When a flush of the data collector is triggered:
///
/// ```text
/// ----------------------|----------------->
///                      ^^^
///        Flushing data-+|+- Incoming data.
///                       |
///               Flush Freeze Tempfiles
/// ```
///
/// Resolving over incoming data shouldn't advance the checkpoint of the
/// flushing data, so the current progress should be frozen when we are
/// about to flush.
///
/// Any previous content of `frozen_resolved_ts` is overwritten.
pub fn freeze(&mut self) {
// Log first: `resolved_ts` is emptied by the `mem::take` below.
// Note that the "frozen" field reports the size of `checkpoint_ts`.
info!("log backup checkpoint manager freezing.";
"resolved_ts_len" => %self.resolved_ts.len(),
"resolved_ts" => ?self.get_resolved_ts(),
"frozen" => self.checkpoint_ts.len(),
);
// Move the map out and leave an empty (default) map behind.
self.frozen_resolved_ts = std::mem::take(&mut self.resolved_ts);
}

/// Test-only shortcut: freeze the current resolved ts and flush it right
/// away, without any extra "last dive" resolve results.
#[cfg(test)]
fn freeze_and_flush(&mut self) {
    self.freeze();
    let no_last_dive = Vec::new();
    self.flush_and_notify(no_last_dive);
}

/// update a region checkpoint in need.
#[cfg(test)]
fn update_region_checkpoint(&mut self, region: &Region, checkpoint: TimeStamp) {
Expand All @@ -224,6 +271,15 @@ impl CheckpointManager {
e.and_modify(|old_cp| {
let old_ver = old_cp.region.get_region_epoch().get_version();
let checkpoint_is_newer = old_cp.checkpoint < checkpoint;
if !checkpoint_is_newer {
warn!("received older checkpoint, maybe region merge.";
"region_id" => old_cp.region.get_id(),
"old_ver" => old_ver,
"new_ver" => ver,
"old_checkpoint" => old_cp.checkpoint.into_inner(),
"new_checkpoint" => checkpoint.into_inner()
);
}
if old_ver < ver || (old_ver == ver && checkpoint_is_newer) {
*old_cp = LastFlushTsOfRegion {
checkpoint,
Expand Down Expand Up @@ -474,7 +530,6 @@ pub struct CheckpointV3FlushObserver<S, O> {

checkpoints: Vec<ResolveResult>,
global_checkpoint_cache: HashMap<String, Checkpoint>,
start_time: Instant,
}

impl<S, O> CheckpointV3FlushObserver<S, O> {
Expand All @@ -486,7 +541,6 @@ impl<S, O> CheckpointV3FlushObserver<S, O> {
// We almost always have only one entry.
global_checkpoint_cache: HashMap::with_capacity(1),
baseline,
start_time: Instant::now(),
}
}
}
Expand Down Expand Up @@ -521,12 +575,9 @@ where
}

async fn after(&mut self, task: &str, _rts: u64) -> Result<()> {
let resolve_task = Task::RegionCheckpointsOp(RegionCheckpointOperation::Resolved {
checkpoints: std::mem::take(&mut self.checkpoints),
start_time: self.start_time,
});
let flush_task = Task::RegionCheckpointsOp(RegionCheckpointOperation::Flush);
try_send!(self.sched, resolve_task);
let flush_task = Task::RegionCheckpointsOp(RegionCheckpointOperation::FlushWith(
std::mem::take(&mut self.checkpoints),
));
try_send!(self.sched, flush_task);

let global_checkpoint = self.get_checkpoint(task).await?;
Expand Down Expand Up @@ -685,7 +736,7 @@ pub mod tests {
.unwrap();

mgr.resolve_regions(vec![simple_resolve_result()]);
mgr.flush();
mgr.freeze_and_flush();
mgr.sync_with_subs_mgr(|_| {});
assert_eq!(trivial_sink.0.lock().unwrap().items.len(), 1);
}
Expand All @@ -703,7 +754,7 @@ pub mod tests {
rt.block_on(mgr.add_subscriber(error_sink.clone())).unwrap();

mgr.resolve_regions(vec![simple_resolve_result()]);
mgr.flush();
mgr.freeze_and_flush();
assert_eq!(mgr.sync_with_subs_mgr(|item| { item.subscribers.len() }), 0);
let sink = error_sink.0.lock().unwrap();
assert_eq!(sink.items.len(), 0);
Expand All @@ -721,12 +772,12 @@ pub mod tests {
let r = mgr.get_from_region(RegionIdWithVersion::new(1, 32));
assert_matches::assert_matches!(r, GetCheckpointResult::NotFound { .. });

mgr.flush();
mgr.freeze_and_flush();
let r = mgr.get_from_region(RegionIdWithVersion::new(1, 32));
assert_matches::assert_matches!(r, GetCheckpointResult::Ok { checkpoint , .. } if checkpoint.into_inner() == 8);
let r = mgr.get_from_region(RegionIdWithVersion::new(2, 35));
assert_matches::assert_matches!(r, GetCheckpointResult::Ok { checkpoint , .. } if checkpoint.into_inner() == 16);
mgr.flush();
mgr.freeze_and_flush();
let r = mgr.get_from_region(RegionIdWithVersion::new(1, 32));
assert_matches::assert_matches!(r, GetCheckpointResult::NotFound { .. });
}
Expand Down Expand Up @@ -758,6 +809,48 @@ pub mod tests {
assert_matches::assert_matches!(r, GetCheckpointResult::Ok{checkpoint, ..} if checkpoint.into_inner() == 24);
}

/// Checks the freeze -> flush pipeline: frozen progress is not queryable
/// until flushed, resolve results arriving after the freeze do not leak
/// into the flushed checkpoint, and the "last dive" results passed to
/// `flush_and_notify` are applied to the final checkpoint.
#[test]
fn test_mgr_freeze() {
let mut mgr = super::CheckpointManager::default();
mgr.resolve_regions(vec![
ResolveResult {
region: region(1, 32, 8),
checkpoint: TimeStamp::new(8),
checkpoint_type: CheckpointType::MinTs,
},
ResolveResult {
region: region(2, 34, 8),
checkpoint: TimeStamp::new(15),
checkpoint_type: CheckpointType::MinTs,
},
]);

// Frozen, but not flushed yet: nothing should be queryable.
mgr.freeze();
let r = mgr.get_from_region(RegionIdWithVersion::new(1, 32));
assert_matches::assert_matches!(r, GetCheckpointResult::NotFound { .. });
let r = mgr.get_from_region(RegionIdWithVersion::new(2, 34));
assert_matches::assert_matches!(r, GetCheckpointResult::NotFound { .. });
// Resolved after the freeze: this must not reach the frozen progress
// (region 1 is expected to stay at 8 after the flush below).
mgr.resolve_regions(vec![ResolveResult {
region: region(1, 32, 8),
checkpoint: TimeStamp::new(16),
checkpoint_type: CheckpointType::MinTs,
}]);

// Flush done, the frozen progress should be queryable now. The "last dive"
// result advances region 2 from 15 to 17.
mgr.flush_and_notify(vec![ResolveResult {
region: region(2, 34, 8),
checkpoint: TimeStamp::new(17),
checkpoint_type: CheckpointType::MinTs,
}]);

let r = mgr.get_from_region(RegionIdWithVersion::new(1, 32));
assert_matches::assert_matches!(r, GetCheckpointResult::Ok{checkpoint, ..} if checkpoint.into_inner() == 8);
let r = mgr.get_from_region(RegionIdWithVersion::new(2, 34));
assert_matches::assert_matches!(r, GetCheckpointResult::Ok{checkpoint, ..} if checkpoint.into_inner() == 17);
}

/// A mock PD client used by the tests in this module.
pub struct MockPdClient {
// NOTE(review): presumably maps a service/task name to the safepoint
// registered for it — confirm against the `PdClient` impl of this mock.
safepoint: RwLock<HashMap<String, TimeStamp>>,
}
Expand Down
Loading

0 comments on commit 4818976

Please sign in to comment.