Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

log_backup: store pending region info when giving up retry #16624

Merged
merged 6 commits into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
143 changes: 120 additions & 23 deletions components/backup-stream/src/subscription_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,19 @@ async fn scan_executor_loop(init: impl InitialScan, mut cmds: Receiver<ScanCmd>)
}
}

/// spawn the executors to some runtime.
#[cfg(test)]
fn spawn_executors_to(
init: impl InitialScan + Send + Sync + 'static,
handle: &tokio::runtime::Handle,
) -> ScanPoolHandle {
let (tx, rx) = tokio::sync::mpsc::channel(MESSAGE_BUFFER_SIZE);
handle.spawn(async move {
scan_executor_loop(init, rx).await;
});
ScanPoolHandle { tx }
}

/// spawn the executors in the scan pool.
fn spawn_executors(
init: impl InitialScan + Send + Sync + 'static,
Expand Down Expand Up @@ -477,10 +490,11 @@ where
return;
}
Some(err) => {
self.subs
.set_pending_if(&region, |sub, _| sub.handle.id == handle.id);
if !should_retry(&err) {
self.failure_count.remove(&region.id);
self.subs
.deregister_region_if(&region, |sub, _| sub.handle.id == handle.id);
// The pending record will be cleaned up by `Stop` command.
return;
}
err
Expand Down Expand Up @@ -542,10 +556,10 @@ where
}
let tx = tx.unwrap();
// tikv_util::Instant cannot be converted to std::time::Instant :(
let start = std::time::Instant::now();
let start = tokio::time::Instant::now();
debug!("Scheduing subscription."; utils::slog_region(&region), "after" => ?backoff, "handle" => ?handle);
let scheduled = async move {
tokio::time::sleep_until((start + backoff).into()).await;
tokio::time::sleep_until(start + backoff).await;
let handle = handle.unwrap_or_else(|| ObserveHandle::new());
if let Err(err) = tx.send(ObserveOp::Start { region, handle }).await {
warn!("log backup failed to schedule start observe."; "err" => %err);
Expand Down Expand Up @@ -820,7 +834,7 @@ mod test {
atomic::{AtomicBool, Ordering},
Arc, Mutex,
},
time::{Duration, Instant},
time::Duration,
};

use engine_test::{kv::KvTestEngine, raft::RaftTestEngine};
Expand All @@ -834,11 +848,11 @@ mod test {
RegionInfo,
};
use tikv::{config::BackupStreamConfig, storage::Statistics};
use tikv_util::{info, memory::MemoryQuota, worker::dummy_scheduler};
use tikv_util::{box_err, info, memory::MemoryQuota, worker::dummy_scheduler};
use tokio::{sync::mpsc::Sender, task::JoinHandle};
use txn_types::TimeStamp;

use super::{spawn_executors, InitialScan, RegionSubscriptionManager};
use super::{spawn_executors_to, InitialScan, RegionSubscriptionManager};
use crate::{
errors::Error,
metadata::{store::SlashEtcStore, MetadataClient, StreamTask},
Expand Down Expand Up @@ -1018,6 +1032,7 @@ mod test {
let task_name = "test";
let task_start_ts = TimeStamp::new(42);
let pool = tokio::runtime::Builder::new_current_thread()
.start_paused(true)
.enable_all()
.build()
.unwrap();
Expand Down Expand Up @@ -1057,7 +1072,7 @@ mod test {
failure_count: Default::default(),
memory_manager,
messenger: tx.downgrade(),
scan_pool_handle: spawn_executors(init, 2),
scan_pool_handle: spawn_executors_to(init, pool.handle()),
scans: CallbackWaitGroup::new(),
};
let events = Arc::new(Mutex::new(vec![]));
Expand Down Expand Up @@ -1176,20 +1191,30 @@ mod test {
.unwrap();
}

fn sync(&self) {
self.rt.block_on(async {
let (tx, rx) = tokio::sync::oneshot::channel();
self.handle
.as_ref()
.unwrap()
.send(ObserveOp::ResolveRegions {
callback: Box::new(move |_result| {
tx.send(()).unwrap();
}),
min_ts: self.task_start_ts.next(),
})
.await
.unwrap();
rx.await.unwrap();
})
}

#[track_caller]
fn wait_initial_scan_all_finish(&self, expected_region: usize) {
info!("[TEST] Start waiting initial scanning finish.");
self.rt.block_on(async move {
let max_wait = Duration::from_secs(1);
let start = Instant::now();
loop {
for _ in 0..200 {
let (tx, rx) = tokio::sync::oneshot::channel();
if start.elapsed() > max_wait {
panic!(
"wait initial scan takes too long! events = {:?}",
self.events
);
}
self.handle
.as_ref()
.unwrap()
Expand All @@ -1212,6 +1237,10 @@ mod test {
// Advance the global timer in case of someone is waiting for timer.
tokio::time::advance(Duration::from_secs(16)).await;
}
panic!(
"wait initial scan takes too long! events = {:?}",
self.events
);
})
}

Expand All @@ -1223,7 +1252,6 @@ mod test {

#[test]
fn test_basic_retry() {
test_util::init_log_for_test();
use ObserveEvent::*;
let failed = Arc::new(AtomicBool::new(false));
let mut suite = Suite::new(FuncInitialScan(move |r, _, _| {
Expand All @@ -1234,7 +1262,6 @@ mod test {
Err(Error::OutOfQuota { region_id: r.id })
}));
let _guard = suite.rt.enter();
tokio::time::pause();
suite.insert_and_start_region(suite.region(1, 1, 1, b"a", b"b"));
suite.insert_and_start_region(suite.region(2, 1, 1, b"b", b"c"));
suite.wait_initial_scan_all_finish(2);
Expand All @@ -1256,10 +1283,9 @@ mod test {
fn test_on_high_mem() {
let mut suite = Suite::new(FuncInitialScan(|_, _, _| Ok(Statistics::default())));
let _guard = suite.rt.enter();
tokio::time::pause();
suite.insert_and_start_region(suite.region(1, 1, 1, b"a", b"b"));
suite.insert_and_start_region(suite.region(2, 1, 1, b"b", b"c"));
suite.advance_ms(0);
suite.sync();
let mut rs = suite.subs.current_regions();
rs.sort();
assert_eq!(rs, [1, 2]);
Expand Down Expand Up @@ -1293,10 +1319,8 @@ mod test {

#[test]
fn test_region_split_inflight() {
test_util::init_log_for_test();
let mut suite = Suite::new(FuncInitialScan(|_, _, _| Ok(Statistics::default())));
let _guard = suite.rt.enter();
tokio::time::pause();
suite.insert_region(suite.region(1, 1, 1, b"a", b"b"));
// Region split..?
suite.insert_region(suite.region(1, 2, 1, b"a", b"az"));
Expand All @@ -1312,4 +1336,77 @@ mod test {
&[Start(1), RefreshObs(1), StartResult(1, true)]
);
}

#[test]
fn test_unretryable_failure() {
let mut suite = Suite::new(FuncInitialScan(move |region, _, _| {
if region.get_region_epoch().get_version() != 2 {
let mut r2 = region.clone();
r2.mut_region_epoch().version = 2;
*r2.mut_end_key() = b"az".to_vec();
Err(Error::RaftStore(raftstore::Error::EpochNotMatch(
"Testing Testing".to_string(),
vec![r2],
)))
} else {
Ok(Statistics::default())
}
}));
suite.insert_and_start_region(suite.region(1, 1, 1, b"a", b"b"));
suite.sync();
// The region has been updated!
suite.insert_region(suite.region(1, 2, 1, b"a", b"az"));
suite.run(ObserveOp::RefreshResolver {
region: suite.region(1, 2, 1, b"a", b"az"),
});
suite.wait_initial_scan_all_finish(1);
suite.wait_shutdown();
use ObserveEvent::*;
assert_eq!(
*suite.events.lock().unwrap(),
[
Start(1),
StartResult(1, false),
RefreshObs(1),
StartResult(1, true)
]
);
}

#[test]
fn test_always_failure_initial_scan() {
let start_time = tokio::time::Instant::now();
let target = start_time + Duration::from_secs(300);
let init = FuncInitialScan(move |_, _, _| {
let now = tokio::time::Instant::now();
if now < target {
return Err(Error::Other(box_err!(
"work in progress now... please wait more {:?}",
target - now
)));
}
Ok(Statistics::default())
});
let mut suite = Suite::new(init);
let _g = suite.rt.enter();
suite.insert_region(suite.region(1, 1, 1, b"a", b"b"));
suite.start_region(suite.region(1, 1, 1, b"a", b"b"));
suite.wait_initial_scan_all_finish(1);
suite.wait_shutdown();
fn consume_many<'a, T: Eq>(mut slice: &'a [T], pat: &[T]) -> (&'a [T], usize) {
assert!(!pat.is_empty());
let mut n = 0;
while slice.starts_with(pat) {
slice = &slice[pat.len()..];
n += 1;
}
(slice, n)
}
let events_lock = suite.events.lock().unwrap();
let events = events_lock.as_slice();
use ObserveEvent::*;
let (rem, count) = consume_many(events, &[Start(1), StartResult(1, false)]);
assert!(count > 0);
assert_eq!(rem, [Start(1), StartResult(1, true)]);
}
}
43 changes: 43 additions & 0 deletions components/backup-stream/src/subscription_track.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,17 @@ impl SubscriptionTracer {
/// there are still tiny impure things need to do. (e.g. getting the
/// checkpoint of this region.)
///
/// A typical state machine of a region:
///
/// ```text
/// +-----[Start(Err)]------+
/// +----+ +--------------+
/// v |
/// Absent --------[Start]------> Pending --[Start(OK)]--> Active
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it could become running too?

/// ^ | |
/// +-------------[Stop]-------------+--------[Stop]---------+
/// ```
///
/// This state is a placeholder for those regions: once they failed in the
/// impure operations, this would be the evidence proofing they were here.
///
Expand Down Expand Up @@ -263,6 +274,38 @@ impl SubscriptionTracer {
.collect()
}

pub fn set_pending_if(
&self,
region: &Region,
if_cond: impl FnOnce(&ActiveSubscription, &Region) -> bool,
) -> bool {
let region_id = region.get_id();
let remove_result = self.0.entry(region_id);
match remove_result {
Entry::Vacant(_) => false,
Entry::Occupied(mut o) => match o.get_mut() {
SubscribeState::Pending(r) => {
info!("remove pending subscription"; "region_id"=> %region_id, utils::slog_region(r));

o.remove();
true
}
SubscribeState::Running(s) => {
if if_cond(s, region) {
let r = s.meta.clone();
TRACK_REGION.dec();
s.stop();
info!("Inactivating subscription."; "observer" => ?s, "region_id"=> %region_id);

*o.get_mut() = SubscribeState::Pending(r);
return true;
}
false
}
},
}
}

/// try to mark a region no longer be tracked by this observer.
/// returns whether success (it failed if the region hasn't been observed
/// when calling this.)
Expand Down