log-backup: added intervally resolve regions #14180

Merged
merged 15 commits into from Mar 13, 2023
234 changes: 151 additions & 83 deletions components/backup-stream/src/checkpoint_manager.rs
@@ -1,10 +1,11 @@
// Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0.

use std::{collections::HashMap, sync::Arc, time::Duration};
use std::{cell::RefCell, collections::HashMap, sync::Arc, time::Duration};

use futures::{
channel::mpsc::{self as async_mpsc, Receiver, Sender},
SinkExt, StreamExt,
future::BoxFuture,
FutureExt, SinkExt, StreamExt,
};
use grpcio::{RpcStatus, RpcStatusCode, ServerStreamingSink, WriteFlags};
use kvproto::{
@@ -13,7 +14,7 @@ use kvproto::{
metapb::Region,
};
use pd_client::PdClient;
use tikv_util::{box_err, defer, info, warn, worker::Scheduler};
use tikv_util::{box_err, defer, info, time::Instant, warn, worker::Scheduler};
use txn_types::TimeStamp;
use uuid::Uuid;

@@ -22,7 +23,9 @@ use crate::{
errors::{Error, ReportableResult, Result},
future,
metadata::{store::MetaStore, Checkpoint, CheckpointProvider, MetadataClient},
metrics, try_send, RegionCheckpointOperation, Task,
metrics,
subscription_track::ResolveResult,
try_send, RegionCheckpointOperation, Task,
};

/// A manager for maintaining the last flush ts.
@@ -31,14 +34,15 @@ use crate::{
/// checkpoint then advancing the global checkpoint.
#[derive(Default)]
pub struct CheckpointManager {
items: HashMap<u64, LastFlushTsOfRegion>,
checkpoint_ts: HashMap<u64, LastFlushTsOfRegion>,
resolved_ts: HashMap<u64, LastFlushTsOfRegion>,
manager_handle: Option<Sender<SubscriptionOp>>,
}

impl std::fmt::Debug for CheckpointManager {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("CheckpointManager")
.field("items", &self.items)
.field("items", &self.checkpoint_ts)
.finish()
}
}
@@ -60,49 +64,59 @@ impl SubscriptionManager {
while let Some(msg) = self.input.next().await {
match msg {
SubscriptionOp::Add(sub) => {
self.subscribers.insert(Uuid::new_v4(), sub);
let uid = Uuid::new_v4();
info!("log backup adding new subscriber"; "id" => %uid);
Contributor:

will it cause too many logs?

Contributor Author:

Generally, one advancer in its lifetime should only subscribe once. 🤔

self.subscribers.insert(uid, sub);
}
SubscriptionOp::Emit(events) => {
let mut canceled = vec![];
for (id, sub) in &mut self.subscribers {
let send_all = async {
for es in events.chunks(1024) {
let mut resp = SubscribeFlushEventResponse::new();
resp.set_events(es.to_vec().into());
sub.feed((resp, WriteFlags::default())).await?;
}
sub.flush().await
};

match send_all.await {
Err(grpcio::Error::RemoteStopped) => {
canceled.push(*id);
}
Err(err) => {
Error::from(err).report("sending subscription");
}
_ => {}
}
}

for c in canceled {
match self.subscribers.remove(&c) {
Some(mut sub) => {
info!("client is gone, removing subscription"; "id" => %c);
sub.close().await.report_if_err(format_args!(
"during removing subscription {}",
c
))
}
None => {
warn!("BUG: the subscriber has been removed before we are going to remove it."; "id" => %c);
}
}
}
self.emit_events(events).await;
}
}
}
}

async fn emit_events(&mut self, events: Box<[FlushEvent]>) {
let mut canceled = vec![];
info!("log backup sending events"; "event_len" => %events.len(), "downstream" => %self.subscribers.len());
for (id, sub) in &mut self.subscribers {
let send_all = async {
for es in events.chunks(1024) {
let mut resp = SubscribeFlushEventResponse::new();
resp.set_events(es.to_vec().into());
sub.feed((resp, WriteFlags::default())).await?;
}
sub.flush().await
};

match send_all.await {
Err(grpcio::Error::RemoteStopped) => {
canceled.push(*id);
}
Err(err) => {
Error::from(err).report("sending subscription");
}
_ => {}
}
}

for c in canceled {
self.remove_subscription(&c).await;
}
}

async fn remove_subscription(&mut self, id: &Uuid) {
match self.subscribers.remove(id) {
Some(mut sub) => {
info!("client is gone, removing subscription"; "id" => %id);
sub.close()
.await
.report_if_err(format_args!("during removing subscription {}", id))
}
None => {
warn!("BUG: the subscriber has been removed before we are going to remove it."; "id" => %id);
}
}
}
}

// Note: can we make it more generic...?
@@ -154,11 +168,6 @@ impl GetCheckpointResult {
}

impl CheckpointManager {
/// clear the manager.
pub fn clear(&mut self) {
self.items.clear();
}

pub fn spawn_subscription_mgr(&mut self) -> future![()] {
let (tx, rx) = async_mpsc::channel(1024);
let sub = SubscriptionManager {
@@ -169,25 +178,67 @@ impl CheckpointManager {
sub.main_loop()
}

pub fn update_region_checkpoints(&mut self, region_and_checkpoint: Vec<(Region, TimeStamp)>) {
for (region, checkpoint) in &region_and_checkpoint {
self.do_update(region, *checkpoint);
pub fn resolve_regions(&mut self, region_and_checkpoint: Vec<ResolveResult>) {
for res in region_and_checkpoint {
self.do_update(res.region, res.checkpoint);
}
}

self.notify(region_and_checkpoint.into_iter());
pub fn flush(&mut self) {
info!("log backup checkpoint manager flushing."; "resolved_ts_len" => %self.resolved_ts.len(), "resolved_ts" => ?self.get_resolved_ts());
self.checkpoint_ts = std::mem::take(&mut self.resolved_ts);
// Clippy doesn't know this iterator borrows `self.checkpoint_ts` :(
#[allow(clippy::needless_collect)]
let items = self
.checkpoint_ts
.values()
.cloned()
.map(|x| (x.region, x.checkpoint))
.collect::<Vec<_>>();
self.notify(items.into_iter());
}
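
For orientation, a stripped-down sketch (hypothetical, simplified types — not code from this PR) of the staging/promotion idea behind the two maps: resolved timestamps accumulate in `resolved_ts` and only become queryable checkpoints once `flush` promotes them into `checkpoint_ts`.

use std::collections::HashMap;

#[derive(Default)]
struct StagedCheckpoints {
    checkpoint_ts: HashMap<u64, u64>, // published checkpoints, visible to queries
    resolved_ts: HashMap<u64, u64>,   // staged results of the latest resolve pass
}

impl StagedCheckpoints {
    fn resolve(&mut self, region_id: u64, ts: u64) {
        // Staged only: callers querying checkpoints do not see this yet.
        self.resolved_ts.insert(region_id, ts);
    }

    fn flush(&mut self) {
        // Promote everything staged and start the next cycle with an empty map.
        self.checkpoint_ts = std::mem::take(&mut self.resolved_ts);
    }
}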

/// update a region checkpoint in need.
#[cfg(test)]
pub fn update_region_checkpoint(&mut self, region: &Region, checkpoint: TimeStamp) {
self.do_update(region, checkpoint);
self.notify(std::iter::once((region.clone(), checkpoint)));
Self::update_ts(&mut self.checkpoint_ts, region.clone(), checkpoint)
}

fn update_ts(
container: &mut HashMap<u64, LastFlushTsOfRegion>,
region: Region,
checkpoint: TimeStamp,
) {
let e = container.entry(region.get_id());
let ver = region.get_region_epoch().get_version();
// A hacky way to allow the two closures move out the region.
// It is safe given the two closures would only be called once.
let r = RefCell::new(Some(region));
e.and_modify(|old_cp| {
let old_ver = old_cp.region.get_region_epoch().get_version();
let checkpoint_is_newer = old_cp.checkpoint < checkpoint;
if old_ver < ver || (old_ver == ver && checkpoint_is_newer) {
*old_cp = LastFlushTsOfRegion {
checkpoint,
region: r.borrow_mut().take().expect(
"unreachable: `and_modify` and `or_insert_with` called at the same time.",
),
};
}
})
.or_insert_with(|| LastFlushTsOfRegion {
checkpoint,
region: r
.borrow_mut()
.take()
.expect("unreachable: `and_modify` and `or_insert_with` called at the same time."),
});
}
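
As a side note on the `RefCell<Option<_>>` trick above, a minimal self-contained sketch (hypothetical names, not part of this PR): both closures borrow the same slot, and whichever one the map entry actually runs moves the value out exactly once.

use std::{cell::RefCell, collections::HashMap};

fn upsert(map: &mut HashMap<u64, String>, key: u64, value: String) {
    // `and_modify` and `or_insert_with` both need to own `value`, but only
    // one of them will ever run, so stash it in a RefCell<Option<_>>.
    let slot = RefCell::new(Some(value));
    map.entry(key)
        .and_modify(|old| {
            *old = slot.borrow_mut().take().expect("value taken twice");
        })
        .or_insert_with(|| {
            slot.borrow_mut().take().expect("value taken twice")
        });
}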

pub fn add_subscriber(&mut self, sub: Subscription) -> future![Result<()>] {
pub fn add_subscriber(&mut self, sub: Subscription) -> BoxFuture<'static, Result<()>> {
let mgr = self.manager_handle.as_ref().cloned();
let initial_data = self
.items
.checkpoint_ts
.values()
.map(|v| FlushEvent {
start_key: v.region.start_key.clone(),
@@ -225,6 +276,7 @@ impl CheckpointManager {
})?;
Ok(())
}
.boxed()
}

fn notify(&mut self, items: impl Iterator<Item = (Region, TimeStamp)>) {
@@ -248,28 +300,13 @@ impl CheckpointManager {
}
}

fn do_update(&mut self, region: &Region, checkpoint: TimeStamp) {
let e = self.items.entry(region.get_id());
e.and_modify(|old_cp| {
if old_cp.checkpoint < checkpoint
&& old_cp.region.get_region_epoch().get_version()
<= region.get_region_epoch().get_version()
{
*old_cp = LastFlushTsOfRegion {
checkpoint,
region: region.clone(),
};
}
})
.or_insert_with(|| LastFlushTsOfRegion {
checkpoint,
region: region.clone(),
});
fn do_update(&mut self, region: Region, checkpoint: TimeStamp) {
Self::update_ts(&mut self.resolved_ts, region, checkpoint)
}

/// get checkpoint from a region.
pub fn get_from_region(&self, region: RegionIdWithVersion) -> GetCheckpointResult {
let checkpoint = self.items.get(&region.region_id);
let checkpoint = self.checkpoint_ts.get(&region.region_id);
if checkpoint.is_none() {
return GetCheckpointResult::not_found(region);
}
@@ -282,7 +319,11 @@ impl CheckpointManager {

/// get all checkpoints stored.
pub fn get_all(&self) -> Vec<LastFlushTsOfRegion> {
self.items.values().cloned().collect()
self.checkpoint_ts.values().cloned().collect()
}

pub fn get_resolved_ts(&self) -> Option<TimeStamp> {
self.resolved_ts.values().map(|x| x.checkpoint).min()
}
}

@@ -333,7 +374,7 @@ pub struct LastFlushTsOfRegion {
#[async_trait::async_trait]
pub trait FlushObserver: Send + 'static {
/// The callback when the flush has advanced the resolver.
async fn before(&mut self, checkpoints: Vec<(Region, TimeStamp)>);
async fn before(&mut self, checkpoints: Vec<ResolveResult>);
/// The callback when the flush is done. (Files are fully written to
/// external storage.)
async fn after(&mut self, task: &str, rts: u64) -> Result<()>;
@@ -363,7 +404,7 @@ impl<PD> BasicFlushObserver<PD> {

#[async_trait::async_trait]
impl<PD: PdClient + 'static> FlushObserver for BasicFlushObserver<PD> {
async fn before(&mut self, _checkpoints: Vec<(Region, TimeStamp)>) {}
async fn before(&mut self, _checkpoints: Vec<ResolveResult>) {}

async fn after(&mut self, task: &str, rts: u64) -> Result<()> {
if let Err(err) = self
Expand Down Expand Up @@ -401,8 +442,9 @@ pub struct CheckpointV3FlushObserver<S, O> {
sched: Scheduler<Task>,
meta_cli: MetadataClient<S>,

checkpoints: Vec<(Region, TimeStamp)>,
checkpoints: Vec<ResolveResult>,
global_checkpoint_cache: HashMap<String, Checkpoint>,
start_time: Instant,
}

impl<S, O> CheckpointV3FlushObserver<S, O> {
@@ -414,6 +456,7 @@ impl<S, O> CheckpointV3FlushObserver<S, O> {
// We almost always have only one entry.
global_checkpoint_cache: HashMap::with_capacity(1),
baseline,
start_time: Instant::now(),
}
}
}
@@ -443,15 +486,19 @@
S: MetaStore + 'static,
O: FlushObserver + Send,
{
async fn before(&mut self, checkpoints: Vec<(Region, TimeStamp)>) {
async fn before(&mut self, checkpoints: Vec<ResolveResult>) {
self.checkpoints = checkpoints;
}

async fn after(&mut self, task: &str, _rts: u64) -> Result<()> {
let t = Task::RegionCheckpointsOp(RegionCheckpointOperation::Update(std::mem::take(
&mut self.checkpoints,
)));
try_send!(self.sched, t);
let resolve_task = Task::RegionCheckpointsOp(RegionCheckpointOperation::Resolved {
checkpoints: std::mem::take(&mut self.checkpoints),
start_time: self.start_time,
});
let flush_task = Task::RegionCheckpointsOp(RegionCheckpointOperation::Flush);
try_send!(self.sched, resolve_task);
try_send!(self.sched, flush_task);

let global_checkpoint = self.get_checkpoint(task).await?;
info!("getting global checkpoint from cache for updating."; "checkpoint" => ?global_checkpoint);
self.baseline
@@ -499,6 +546,26 @@ pub mod tests {
r
}

#[test]
fn test_flush() {
let mut mgr = super::CheckpointManager::default();
mgr.do_update(region(1, 32, 8), TimeStamp::new(8));
mgr.do_update(region(2, 34, 8), TimeStamp::new(15));
mgr.do_update(region(2, 35, 8), TimeStamp::new(16));
mgr.do_update(region(2, 35, 8), TimeStamp::new(14));
let r = mgr.get_from_region(RegionIdWithVersion::new(1, 32));
assert_matches::assert_matches!(r, GetCheckpointResult::NotFound { .. });

mgr.flush();
let r = mgr.get_from_region(RegionIdWithVersion::new(1, 32));
assert_matches::assert_matches!(r, GetCheckpointResult::Ok { checkpoint , .. } if checkpoint.into_inner() == 8);
let r = mgr.get_from_region(RegionIdWithVersion::new(2, 35));
assert_matches::assert_matches!(r, GetCheckpointResult::Ok { checkpoint , .. } if checkpoint.into_inner() == 16);
mgr.flush();
let r = mgr.get_from_region(RegionIdWithVersion::new(1, 32));
assert_matches::assert_matches!(r, GetCheckpointResult::NotFound { .. });
}

#[test]
fn test_mgr() {
let mut mgr = super::CheckpointManager::default();
@@ -510,6 +577,7 @@
assert_matches::assert_matches!(r, GetCheckpointResult::EpochNotMatch { .. });
let r = mgr.get_from_region(RegionIdWithVersion::new(3, 44));
assert_matches::assert_matches!(r, GetCheckpointResult::NotFound { .. });

mgr.update_region_checkpoint(&region(1, 30, 8), TimeStamp::new(16));
let r = mgr.get_from_region(RegionIdWithVersion::new(1, 32));
assert_matches::assert_matches!(r, GetCheckpointResult::Ok{checkpoint, ..} if checkpoint.into_inner() == 8);