This is an automated cherry-pick of tikv#16008
close tikv#15414

Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
YuJuncen authored and ti-chi-bot committed Jan 21, 2024
1 parent 776488a commit 29aeb0a
Showing 12 changed files with 869 additions and 201 deletions.
5 changes: 5 additions & 0 deletions components/backup-stream/Cargo.toml
@@ -91,11 +91,16 @@ engine_test = { workspace = true }
grpcio = { workspace = true }
hex = "0.4"
protobuf = { version = "2.8", features = ["bytes"] }
<<<<<<< HEAD
=======
rand = "0.8.0"
>>>>>>> 66301257e4 (log_backup: stop task while memory out of quota (#16008))
tempdir = "0.3"
tempfile = "3.0"
test_pd = { workspace = true }
test_pd_client = { workspace = true }
test_raftstore = { workspace = true }
test_util = { workspace = true }
tokio = { version = "1.5", features = ["test-util"] }
url = "2"
walkdir = "2"
124 changes: 97 additions & 27 deletions components/backup-stream/src/endpoint.rs
@@ -15,7 +15,7 @@ use error_code::ErrorCodeExt;
use futures::{stream::AbortHandle, FutureExt, TryFutureExt};
use kvproto::{
brpb::{StreamBackupError, StreamBackupTaskInfo},
metapb::Region,
metapb::{Region, RegionEpoch},
};
use pd_client::PdClient;
use raft::StateRole;
@@ -39,7 +39,7 @@ use tikv_util::{
use tokio::{
io::Result as TokioResult,
runtime::{Handle, Runtime},
sync::{oneshot, Semaphore},
sync::{mpsc::Sender, oneshot, Semaphore},
};
use tokio_stream::StreamExt;
use txn_types::TimeStamp;
@@ -51,7 +51,7 @@ use crate::{
BasicFlushObserver, CheckpointManager, CheckpointV3FlushObserver, FlushObserver,
GetCheckpointResult, RegionIdWithVersion, Subscription,
},
errors::{Error, Result},
errors::{Error, ReportableResult, Result},
event_loader::InitialDataLoader,
future,
metadata::{store::MetaStore, MetadataClient, MetadataEvent, StreamTask},
@@ -86,7 +86,7 @@ pub struct Endpoint<S, R, E: KvEngine, PDC> {
pub range_router: Router,
observer: BackupStreamObserver,
pool: Runtime,
region_operator: RegionSubscriptionManager<S, R, PDC>,
region_operator: Sender<ObserveOp>,
failover_time: Option<Instant>,
// We hold the config here even though it is unused for now;
// it will probably be useful in the future.
@@ -167,9 +167,7 @@ where
Arc::clone(&initial_scan_semaphore),
),
accessor.clone(),
observer.clone(),
meta_client.clone(),
pd_client.clone(),
((config.num_threads + 1) / 2).max(1),
resolver,
);
@@ -437,13 +435,15 @@ where

/// Convert a batch of events to the cmd batch, and update the resolver
/// status.
fn record_batch(subs: SubscriptionTracer, batch: CmdBatch) -> Option<ApplyEvents> {
fn record_batch(subs: SubscriptionTracer, batch: CmdBatch) -> Result<ApplyEvents> {
let region_id = batch.region_id;
let mut resolver = match subs.get_subscription_of(region_id) {
Some(rts) => rts,
None => {
debug!("the region isn't registered (no resolver found) but sent to backup_batch, maybe stale."; "region_id" => %region_id);
return None;
// Sadly, we know nothing about the epoch in this context. Thankfully this is a
// local error and won't be propagated outside.
return Err(Error::ObserveCanceled(region_id, RegionEpoch::new()));
}
};
// Stale data is acceptable, while stale locks may block the checkpoint
@@ -460,11 +460,11 @@ where
// ```
if batch.pitr_id != resolver.value().handle.id {
debug!("stale command"; "region_id" => %region_id, "now" => ?resolver.value().handle.id, "remote" => ?batch.pitr_id);
return None;
return Err(Error::ObserveCanceled(region_id, RegionEpoch::new()));
}

let kvs = ApplyEvents::from_cmd_batch(batch, resolver.value_mut().resolver());
Some(kvs)
let kvs = ApplyEvents::from_cmd_batch(batch, resolver.value_mut().resolver())?;
Ok(kvs)
}

fn backup_batch(&self, batch: CmdBatch, work: Work) {
@@ -473,13 +473,42 @@ where
let router = self.range_router.clone();
let sched = self.scheduler.clone();
let subs = self.subs.clone();
<<<<<<< HEAD
self.pool.spawn(async move {
=======
let region_op = self.region_operator.clone();
let region = batch.region_id;
let from_idx = batch.cmds.first().map(|c| c.index).unwrap_or(0);
let (to_idx, term) = batch
.cmds
.last()
.map(|c| (c.index, c.term))
.unwrap_or((0, 0));
self.pool.spawn(root!("backup_batch"; async move {
>>>>>>> 66301257e4 (log_backup: stop task while memory out of quota (#16008))
let region_id = batch.region_id;
let kvs = Self::record_batch(subs, batch);
if kvs.as_ref().map(|x| x.is_empty()).unwrap_or(true) {
return;
}
let kvs = kvs.unwrap();
let kvs = match kvs {
Err(Error::OutOfQuota { region_id }) => {
region_op.send(ObserveOp::HighMemUsageWarning { region_id }).await
.map_err(|err| Error::Other(box_err!("failed to send, are we shutting down? {}", err)))
.report_if_err("");
return
}
Err(Error::ObserveCanceled(..)) => {
return;
}
Err(err) => {
err.report(format_args!("unexpected error while handling region event for {}.", region_id));
return;
}
Ok(batch) => {
if batch.is_empty() {
return
}
batch
}
};

HANDLE_EVENT_DURATION_HISTOGRAM
.with_label_values(&["to_stream_event"])
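
Below is a minimal, self-contained sketch of the error-routing pattern introduced in `backup_batch` above, using simplified stand-in types (`MiniError`, `MiniOp`) rather than the real TiKV ones: the recording step now returns a typed error, and an out-of-quota error is forwarded to the region operator as a high-memory-usage notification instead of being silently dropped.

```rust
// Illustration only: `MiniError` and `MiniOp` are hypothetical stand-ins for
// the real `Error` and `ObserveOp` types.
use tokio::sync::mpsc;

#[derive(Debug)]
enum MiniError {
    OutOfQuota { region_id: u64 },
    ObserveCanceled(u64),
    Other(String),
}

#[derive(Debug)]
enum MiniOp {
    HighMemUsageWarning { region_id: u64 },
}

async fn route_record_result(
    result: Result<Vec<u8>, MiniError>,
    region_op: mpsc::Sender<MiniOp>,
    region_id: u64,
) {
    let batch = match result {
        // Out of quota: ask the region operator to relieve the memory pressure.
        Err(MiniError::OutOfQuota { region_id }) => {
            if region_op
                .send(MiniOp::HighMemUsageWarning { region_id })
                .await
                .is_err()
            {
                eprintln!("failed to send, are we shutting down?");
            }
            return;
        }
        // The observation was already canceled; nothing to record.
        Err(MiniError::ObserveCanceled(_)) => return,
        // Anything else is unexpected and only gets reported.
        Err(err) => {
            eprintln!("unexpected error while handling region {region_id}: {err:?}");
            return;
        }
        // Empty batches are skipped.
        Ok(batch) if batch.is_empty() => return,
        Ok(batch) => batch,
    };
    // ... hand `batch` over to the range router here ...
    drop(batch);
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(8);
    route_record_result(Err(MiniError::OutOfQuota { region_id: 1 }), tx, 1).await;
    assert!(matches!(
        rx.recv().await,
        Some(MiniOp::HighMemUsageWarning { region_id: 1 })
    ));
}
```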
@@ -579,6 +608,7 @@ where
.try_for_each(|r| {
tx.blocking_send(ObserveOp::Start {
region: r.region.clone(),
handle: ObserveHandle::new(),
})
});
}),
@@ -593,11 +623,26 @@ where
// Don't reschedule this command: or once the endpoint's mailbox gets
// full, the system might deadlock.
while let Some(cmd) = rx.recv().await {
self.region_operator.request(cmd).await;
self.region_op(cmd).await;
}
Ok(())
}

/// Send an operation request to the manager.
/// The returned future resolves once the send succeeds;
/// the operation itself is executed asynchronously.
async fn region_op(&self, cmd: ObserveOp) {
self.region_operator
.send(cmd)
.await
.map_err(|err| {
Error::Other(
format!("cannot send to region operator, are we shutting down? ({err})").into(),
)
})
.report_if_err("send region cmd")
}

// register task ranges
pub fn on_register(&self, task: StreamTask) {
let name = task.info.name.clone();
@@ -762,7 +807,10 @@ where
}),
min_ts,
};
op.request(req).await;
if let Err(err) = op.send(req).await {
annotate!(err, "BUG: region operator channel closed.")
.report("when executing region op");
}
rx.await
.map_err(|err| annotate!(err, "failed to send request for resolve regions"))
}
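
The `ResolveRegions` request above follows a command-channel pattern: the caller packs a oneshot sender into a boxed callback, sends the command over the mpsc channel, and awaits the oneshot for the result. A hedged miniature of that pattern, with illustrative names only:

```rust
// Hypothetical miniature of the mpsc + oneshot request/response pattern.
use tokio::sync::{mpsc, oneshot};

type Callback = Box<dyn FnOnce(u64) + Send + 'static>;

enum Cmd {
    ResolveRegions { callback: Callback, min_ts: u64 },
}

async fn worker(mut rx: mpsc::Receiver<Cmd>) {
    while let Some(cmd) = rx.recv().await {
        match cmd {
            Cmd::ResolveRegions { callback, min_ts } => {
                // Pretend the resolved timestamp equals the requested min_ts.
                callback(min_ts);
            }
        }
    }
}

async fn resolve(op: &mpsc::Sender<Cmd>, min_ts: u64) -> Option<u64> {
    let (tx, rx) = oneshot::channel();
    let req = Cmd::ResolveRegions {
        callback: Box::new(move |resolved| {
            let _ = tx.send(resolved);
        }),
        min_ts,
    };
    if op.send(req).await.is_err() {
        // The worker is gone, e.g. during shutdown.
        return None;
    }
    rx.await.ok()
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(8);
    tokio::spawn(worker(rx));
    assert_eq!(resolve(&tx, 42).await, Some(42));
}
```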
@@ -906,7 +954,15 @@ where
/// Modify observe over some region.
/// This would register the region to the RaftStore.
pub fn on_modify_observe(&self, op: ObserveOp) {
self.pool.block_on(self.region_operator.request(op));
self.pool
.block_on(self.region_operator.send(op))
.map_err(|err| {
Error::Other(box_err!(
"cannot send to region operator, are we shutting down? ({})",
err
))
})
.report_if_err("during on_modify_observe");
}

fn update_semaphore_capacity(&self, sema: &Arc<Semaphore>, diff: isize) {
@@ -1085,6 +1141,9 @@ pub enum BackupStreamResolver<RT, EK> {
V1(LeadershipResolver),
// for raftstore-v2, it has fewer regions. we use CDCHandler to check leadership of a region.
V2(RT, PhantomData<EK>),
#[cfg(test)]
// for some test cases, it is OK to not check the leader.
Nop,
}

impl<RT, EK> BackupStreamResolver<RT, EK>
@@ -1099,6 +1158,8 @@ where
let x = x.clone();
resolve_by_raft(regions, min_ts, x).await
}
#[cfg(test)]
BackupStreamResolver::Nop => regions,
}
}
}
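
The `Nop` variant illustrates a test-only escape hatch: a `#[cfg(test)]`-gated enum variant that bypasses the leadership check. A compact sketch of the same pattern with simplified stand-in types:

```rust
// Simplified stand-in for BackupStreamResolver; illustration only.
enum Resolver {
    /// Check leadership via the real machinery (elided here).
    Real,
    /// Test-only: trust every region without checking.
    #[cfg(test)]
    Nop,
}

impl Resolver {
    fn resolve(&self, regions: Vec<u64>) -> Vec<u64> {
        match self {
            Resolver::Real => {
                // The real code would filter out regions whose leadership
                // could not be confirmed; here we just pass them through.
                regions
            }
            #[cfg(test)]
            Resolver::Nop => regions,
        }
    }
}

fn main() {
    // Outside of tests only the real resolver exists.
    assert_eq!(Resolver::Real.resolve(vec![1, 2, 3]), vec![1, 2, 3]);
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn nop_returns_everything() {
        assert_eq!(Resolver::Nop.resolve(vec![1, 2, 3]), vec![1, 2, 3]);
    }
}
```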
@@ -1197,6 +1258,7 @@ type ResolveRegionsCallback = Box<dyn FnOnce(ResolvedRegions) + 'static + Send>;
pub enum ObserveOp {
Start {
region: Region,
handle: ObserveHandle,
},
Stop {
region: Region,
@@ -1211,24 +1273,27 @@ pub enum ObserveOp {
RefreshResolver {
region: Region,
},
NotifyFailToStartObserve {
NotifyStartObserveResult {
region: Region,
handle: ObserveHandle,
err: Box<Error>,
has_failed_for: u8,
err: Option<Box<Error>>,
},
ResolveRegions {
callback: ResolveRegionsCallback,
min_ts: TimeStamp,
},
HighMemUsageWarning {
region_id: u64,
},
}

impl std::fmt::Debug for ObserveOp {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Start { region } => f
Self::Start { region, handle } => f
.debug_struct("Start")
.field("region", &utils::debug_region(region))
.field("handle", &handle)
.finish(),
Self::Stop { region } => f
.debug_struct("Stop")
@@ -1242,23 +1307,27 @@ impl std::fmt::Debug for ObserveOp {
.debug_struct("RefreshResolver")
.field("region", &utils::debug_region(region))
.finish(),
Self::NotifyFailToStartObserve {
Self::NotifyStartObserveResult {
region,
handle,
err,
has_failed_for,
} => f
.debug_struct("NotifyFailToStartObserve")
.debug_struct("NotifyStartObserveResult")
.field("region", &utils::debug_region(region))
.field("handle", handle)
.field("err", err)
.field("has_failed_for", has_failed_for)
.finish(),
Self::ResolveRegions { min_ts, .. } => f
.debug_struct("ResolveRegions")
.field("min_ts", min_ts)
.field("callback", &format_args!("fn {{ .. }}"))
.finish(),
Self::HighMemUsageWarning {
region_id: inconsistent_region_id,
} => f
.debug_struct("HighMemUsageWarning")
.field("inconsistent_region", &inconsistent_region_id)
.finish(),
}
}
}
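
The `Debug` implementation above uses `debug_struct` plus `format_args!` to print a placeholder for the boxed callback, which has no `Debug` impl of its own. A small standalone sketch of that technique (types are illustrative):

```rust
// Manual Debug for an enum holding a non-Debug boxed closure.
use std::fmt;

enum Op {
    ResolveRegions {
        callback: Box<dyn FnOnce(u64) + Send>,
        min_ts: u64,
    },
}

impl fmt::Debug for Op {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Op::ResolveRegions { min_ts, .. } => f
                .debug_struct("ResolveRegions")
                .field("min_ts", min_ts)
                // format_args! yields a fmt::Arguments, which is Debug,
                // so the closure can be shown as an opaque placeholder.
                .field("callback", &format_args!("fn {{ .. }}"))
                .finish(),
        }
    }
}

fn main() {
    let op = Op::ResolveRegions {
        callback: Box::new(|_| {}),
        min_ts: 7,
    };
    // Prints: ResolveRegions { min_ts: 7, callback: fn { .. } }
    println!("{op:?}");
}
```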
@@ -1319,8 +1388,9 @@ impl Task {
ObserveOp::Stop { .. } => "modify_observe.stop",
ObserveOp::Destroy { .. } => "modify_observe.destroy",
ObserveOp::RefreshResolver { .. } => "modify_observe.refresh_resolver",
ObserveOp::NotifyFailToStartObserve { .. } => "modify_observe.retry",
ObserveOp::NotifyStartObserveResult { .. } => "modify_observe.retry",
ObserveOp::ResolveRegions { .. } => "modify_observe.resolve",
ObserveOp::HighMemUsageWarning { .. } => "modify_observe.high_mem",
},
Task::ForceFlush(..) => "force_flush",
Task::FatalError(..) => "fatal_error",
21 changes: 19 additions & 2 deletions components/backup-stream/src/errors.rs
@@ -1,7 +1,8 @@
// Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0.

use std::{
error::Error as StdError, fmt::Display, io::Error as IoError, result::Result as StdResult,
error::Error as StdError, fmt::Display, io::Error as IoError, panic::Location,
result::Result as StdResult,
};

use error_code::ErrorCodeExt;
@@ -20,19 +21,29 @@ use crate::{endpoint::Task, metrics};

#[derive(ThisError, Debug)]
pub enum Error {
<<<<<<< HEAD
#[error("gRPC meet error {0}")]
Grpc(#[from] GrpcError),
#[cfg(feature = "metasotre-etcd")]
#[error("Etcd meet error {0}")]
Etcd(#[from] EtcdErrorExt),
#[error("Protobuf meet error {0}")]
Protobuf(#[from] ProtobufError),
=======
>>>>>>> 66301257e4 (log_backup: stop task while memory out of quota (#16008))
#[error("No such task {task_name:?}")]
NoSuchTask { task_name: String },
#[error("Observe have already canceled for region {0} (version = {1:?})")]
ObserveCanceled(u64, RegionEpoch),
#[error("Malformed metadata {0}")]
MalformedMetadata(String),
#[error("Out of quota for region {region_id}")]
OutOfQuota { region_id: u64 },

#[error("gRPC meet error {0}")]
Grpc(#[from] GrpcError),
#[error("Protobuf meet error {0}")]
Protobuf(#[from] ProtobufError),
#[error("I/O Error: {0}")]
Io(#[from] IoError),
#[error("Txn error: {0}")]
@@ -45,6 +56,7 @@ pub enum Error {
RaftRequest(StoreError),
#[error("Error from raftstore: {0}")]
RaftStore(#[from] RaftStoreError),

#[error("{context}: {inner_error}")]
Contextual {
context: String,
@@ -90,6 +102,7 @@ impl ErrorCodeExt for Error {
Error::Other(_) => OTHER,
Error::RaftStore(_) => RAFTSTORE,
Error::ObserveCanceled(..) => OBSERVE_CANCELED,
Error::OutOfQuota { .. } => OUT_OF_QUOTA,
Error::Grpc(_) => GRPC,
}
}
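
For context, a hedged sketch of how a new variant such as `OutOfQuota` slots into this pattern: `thiserror` (used above as `ThisError`) derives `Display` from the `#[error(...)]` attribute, and a small string mapping, standing in for the real `ErrorCodeExt` impl and error-code constants, turns each variant into a stable label:

```rust
// Simplified stand-in for the backup-stream Error enum; labels are illustrative.
use thiserror::Error;

#[derive(Debug, Error)]
enum BackupError {
    #[error("No such task {task_name:?}")]
    NoSuchTask { task_name: String },
    #[error("Observation already canceled for region {0}")]
    ObserveCanceled(u64),
    #[error("Out of quota for region {region_id}")]
    OutOfQuota { region_id: u64 },
}

impl BackupError {
    /// Stable label per variant, mirroring the ErrorCodeExt-style mapping.
    fn kind(&self) -> &'static str {
        match self {
            BackupError::NoSuchTask { .. } => "no_such_task",
            BackupError::ObserveCanceled(..) => "observe_canceled",
            BackupError::OutOfQuota { .. } => "out_of_quota",
        }
    }
}

fn main() {
    let err = BackupError::OutOfQuota { region_id: 42 };
    assert_eq!(err.kind(), "out_of_quota");
    // Display comes from the #[error(...)] attribute.
    println!("{err}"); // Out of quota for region 42
}
```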
@@ -149,6 +162,7 @@ where
Error: From<E>,
{
#[inline(always)]
#[track_caller]
fn report_if_err(self, context: impl ToString) {
if let Err(err) = self {
Error::from(err).report(context.to_string())
@@ -172,8 +186,11 @@ macro_rules! annotate {
}

impl Error {
#[track_caller]
pub fn report(&self, context: impl Display) {
warn!("backup stream meet error"; "context" => %context, "err" => %self, "verbose_err" => ?self);
warn!("backup stream meet error"; "context" => %context, "err" => %self,
"verbose_err" => ?self,
"position" => ?Location::caller());
metrics::STREAM_ERROR
.with_label_values(&[self.kind()])
.inc()
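
The `#[track_caller]` attribute added here makes `std::panic::Location::caller()` resolve to the call site of `report()` rather than to the line inside it, so the logged `position` points at the code that hit the error. A minimal demonstration of the technique (free function, illustrative names):

```rust
use std::panic::Location;

#[track_caller]
fn report(context: &str) {
    // With #[track_caller], this is the caller's file and line,
    // not the location of this statement.
    let pos = Location::caller();
    println!(
        "backup stream error; context={context}; position={}:{}",
        pos.file(),
        pos.line()
    );
}

fn main() {
    report("while flushing"); // position points at this line, not inside report()
}
```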
