This is an automated cherry-pick of tikv#16541
close tikv#16540

Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
overvenus authored and ti-chi-bot committed Mar 18, 2024
1 parent 58d231b commit 6d388a1
Showing 11 changed files with 986 additions and 23 deletions.
155 changes: 155 additions & 0 deletions components/backup-stream/src/checkpoint_manager.rs
@@ -499,6 +499,161 @@ pub mod tests {
r
}

<<<<<<< HEAD
=======
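// A test double for the flush-event subscriber stream: it records every
// response pushed into it and can be configured to fail a single send.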
#[derive(Clone)]
pub struct MockSink(Arc<Mutex<MockSinkInner>>);

impl MockSink {
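// Fail the first `start_send` with `code`; every later send succeeds.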
fn with_fail_once(code: RpcStatusCode) -> Self {
let mut failed = false;
let inner = MockSinkInner {
items: Vec::default(),
closed: false,
on_error: Box::new(move || {
if failed {
RpcStatusCode::OK
} else {
failed = true;
code
}
}),
};
Self(Arc::new(Mutex::new(inner)))
}

fn trivial() -> Self {
let inner = MockSinkInner {
items: Vec::default(),
closed: false,
on_error: Box::new(|| RpcStatusCode::OK),
};
Self(Arc::new(Mutex::new(inner)))
}

#[allow(clippy::unused_async)]
pub async fn fail(&self, status: RpcStatus) -> crate::errors::Result<()> {
panic!("failed in a case that should never fail: {}", status);
}
}

struct MockSinkInner {
items: Vec<SubscribeFlushEventResponse>,
closed: bool,
on_error: Box<dyn FnMut() -> grpcio::RpcStatusCode + Send>,
}

impl Sink<(SubscribeFlushEventResponse, grpcio::WriteFlags)> for MockSink {
type Error = grpcio::Error;

fn poll_ready(
self: std::pin::Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
Ok(()).into()
}

fn start_send(
self: std::pin::Pin<&mut Self>,
item: (SubscribeFlushEventResponse, grpcio::WriteFlags),
) -> Result<(), Self::Error> {
let mut guard = self.0.lock().unwrap();
let code = (guard.on_error)();
if code != RpcStatusCode::OK {
return Err(grpcio::Error::RpcFailure(RpcStatus::new(code)));
}
guard.items.push(item.0);
Ok(())
}

fn poll_flush(
self: std::pin::Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
Ok(()).into()
}

fn poll_close(
self: std::pin::Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
let mut guard = self.0.lock().unwrap();
guard.closed = true;
Ok(()).into()
}
}

fn simple_resolve_result() -> ResolveResult {
let mut region = Region::new();
region.set_id(42);
ResolveResult {
region,
checkpoint: 42.into(),
checkpoint_type: CheckpointType::MinTs,
}
}

#[test]
fn test_rpc_sub() {
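// After resolving a region and flushing, the subscribed sink should have
// received exactly one flush event.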
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(1)
.build()
.unwrap();
let mut mgr = super::CheckpointManager::default();
rt.spawn(mgr.spawn_subscription_mgr());

let trivial_sink = MockSink::trivial();
rt.block_on(mgr.add_subscriber(trivial_sink.clone()))
.unwrap();

mgr.resolve_regions(vec![simple_resolve_result()]);
mgr.flush();
mgr.sync_with_subs_mgr(|_| {});
assert_eq!(trivial_sink.0.lock().unwrap().items.len(), 1);
}

#[test]
fn test_rpc_failure() {
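// A failing send should evict the subscriber (nothing is delivered) but
// must not close the underlying stream.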
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(1)
.build()
.unwrap();
let mut mgr = super::CheckpointManager::default();
rt.spawn(mgr.spawn_subscription_mgr());

let error_sink = MockSink::with_fail_once(RpcStatusCode::INTERNAL);
rt.block_on(mgr.add_subscriber(error_sink.clone())).unwrap();

mgr.resolve_regions(vec![simple_resolve_result()]);
mgr.flush();
assert_eq!(mgr.sync_with_subs_mgr(|item| { item.subscribers.len() }), 0);
let sink = error_sink.0.lock().unwrap();
assert_eq!(sink.items.len(), 0);
// The stream shouldn't be closed when exit by a failure.
assert_eq!(sink.closed, false);
}

#[test]
fn test_flush() {
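// `do_update` only stages checkpoints: they become visible via
// `get_from_region` after `flush()`, and the next `flush()` replaces the
// published set (region 1 turns `NotFound` again below).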
let mut mgr = super::CheckpointManager::default();
mgr.do_update(region(1, 32, 8), TimeStamp::new(8));
mgr.do_update(region(2, 34, 8), TimeStamp::new(15));
mgr.do_update(region(2, 35, 8), TimeStamp::new(16));
mgr.do_update(region(2, 35, 8), TimeStamp::new(14));
let r = mgr.get_from_region(RegionIdWithVersion::new(1, 32));
assert_matches::assert_matches!(r, GetCheckpointResult::NotFound { .. });

mgr.flush();
let r = mgr.get_from_region(RegionIdWithVersion::new(1, 32));
assert_matches::assert_matches!(r, GetCheckpointResult::Ok { checkpoint , .. } if checkpoint.into_inner() == 8);
let r = mgr.get_from_region(RegionIdWithVersion::new(2, 35));
assert_matches::assert_matches!(r, GetCheckpointResult::Ok { checkpoint , .. } if checkpoint.into_inner() == 16);
mgr.flush();
let r = mgr.get_from_region(RegionIdWithVersion::new(1, 32));
assert_matches::assert_matches!(r, GetCheckpointResult::NotFound { .. });
}

>>>>>>> 66847e9c5a (*: remove unnecessary async blocks to save memory (#16541))
#[test]
fn test_mgr() {
let mut mgr = super::CheckpointManager::default();
21 changes: 21 additions & 0 deletions components/backup-stream/src/router.rs
@@ -871,8 +871,13 @@ impl StreamTaskInfo {
// copying.
#[allow(clippy::map_entry)]
if !w.contains_key(&key) {
<<<<<<< HEAD
let path = self.temp_dir.join(key.temp_file_name());
let val = Mutex::new(DataFile::new(path, self.compression_type).await?);
=======
let path = key.temp_file_name();
let val = Mutex::new(DataFile::new(path, &self.temp_file_pool)?);
>>>>>>> 66847e9c5a (*: remove unnecessary async blocks to save memory (#16541))
w.insert(key, val);
}

@@ -1354,7 +1359,11 @@ impl MetadataInfo {
impl DataFile {
/// Creates and opens a log file at the path.
/// Note: if a file with the same name exists, it will be truncated.
<<<<<<< HEAD
async fn new(local_path: impl AsRef<Path>, compression_type: CompressionType) -> Result<Self> {
=======
fn new(local_path: impl AsRef<Path>, files: &Arc<TempFilePool>) -> Result<Self> {
>>>>>>> 66847e9c5a (*: remove unnecessary async blocks to save memory (#16541))
let sha256 = Hasher::new(MessageDigest::sha256())
.map_err(|err| Error::Other(box_err!("openssl hasher failed to init: {}", err)))?;
let inner =
@@ -2289,9 +2298,21 @@ mod tests {
let mut f = File::create(file_path.clone()).await?;
f.write_all("test-data".as_bytes()).await?;

<<<<<<< HEAD
let data_file = DataFile::new(file_path, CompressionType::Zstd)
.await
.unwrap();
=======
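// The reworked test writes the payload through a TempFilePool-backed file
// and hands the pool to `DataFile::new`.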
let file_name = format!("{}", uuid::Uuid::new_v4());
let file_path = Path::new(&file_name);
let tempfile = TempDir::new().unwrap();
let cfg = make_tempfiles_cfg(tempfile.path());
let pool = Arc::new(TempFilePool::new(cfg).unwrap());
let mut f = pool.open_for_write(file_path).unwrap();
f.write_all(b"test-data").await?;
f.done().await?;
let mut data_file = DataFile::new(file_path, &pool).unwrap();
>>>>>>> 66847e9c5a (*: remove unnecessary async blocks to save memory (#16541))
let info = DataFileInfo::new();

let mut meta = MetadataInfo::with_capacity(1);
124 changes: 123 additions & 1 deletion components/backup-stream/src/subscription_manager.rs
@@ -457,7 +457,7 @@ where
let now = Instant::now();
let timedout = self.wait(Duration::from_secs(30)).await;
if timedout {
warn!("waiting for initial scanning done timed out, forcing progress!";
warn!("waiting for initial scanning done timed out, forcing progress!";
"take" => ?now.saturating_elapsed(), "timedout" => %timedout);
}
let regions = leader_checker
@@ -468,17 +468,128 @@ where
// If there isn't any region observed, the `min_ts` can be used as resolved ts
// safely.
let rts = min_region.map(|rs| rs.checkpoint).unwrap_or(min_ts);
<<<<<<< HEAD
info!("getting checkpoint"; "defined_by_region" => ?min_region);
self.subs.warn_if_gap_too_huge(rts);
callback(ResolvedRegions::new(
rts,
cps.into_iter().map(|r| (r.region, r.checkpoint)).collect(),
));
=======
if min_region
.map(|mr| mr.checkpoint_type != CheckpointType::MinTs)
.unwrap_or(false)
{
info!("getting non-trivial checkpoint"; "defined_by_region" => ?min_region);
}
callback(ResolvedRegions::new(rts, cps));
}
ObserveOp::HighMemUsageWarning { region_id } => {
self.on_high_memory_usage(region_id);
>>>>>>> 66847e9c5a (*: remove unnecessary async blocks to save memory (#16541))
}
}
}
}

<<<<<<< HEAD
=======
async fn on_observe_result(
&mut self,
region: Region,
handle: ObserveHandle,
err: Option<Box<Error>>,
) {
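// On success, clear the failure counter and mark phase one done; drop the
// subscription on a non-retryable error; otherwise retry, escalating to a
// fatal error if the retry itself fails.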
let err = match err {
None => {
self.failure_count.remove(&region.id);
let sub = self.subs.get_subscription_of(region.id);
if let Some(mut sub) = sub {
if sub.value().handle.id == handle.id {
sub.value_mut().resolver.phase_one_done();
}
}
return;
}
Some(err) => {
if !should_retry(&err) {
self.failure_count.remove(&region.id);
self.subs
.deregister_region_if(&region, |sub, _| sub.handle.id == handle.id);
return;
}
err
}
};

let region_id = region.id;
match self.retry_observe(region.clone(), handle).await {
Ok(has_resent_req) => {
if !has_resent_req {
self.failure_count.remove(&region_id);
}
}
Err(e) => {
self.issue_fatal_of(
&region,
e.context(format_args!(
"retry encountered error, origin error is {}",
err
)),
);
self.failure_count.remove(&region_id);
}
}
}

fn on_high_memory_usage(&mut self, inconsistent_region_id: u64) {
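// Deregister the region that tripped the memory limit and schedule it to
// be re-observed after a randomized backoff.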
let mut lame_region = Region::new();
lame_region.set_id(inconsistent_region_id);
let mut act_region = None;
self.subs.deregister_region_if(&lame_region, |act, _| {
act_region = Some(act.meta.clone());
true
});
let delay = OOM_BACKOFF_BASE
+ Duration::from_secs(rand::thread_rng().gen_range(0..OOM_BACKOFF_JITTER_SECS));
info!("log backup triggering high memory usage.";
"region" => %inconsistent_region_id,
"mem_usage" => %self.memory_manager.used_ratio(),
"mem_max" => %self.memory_manager.capacity());
if let Some(region) = act_region {
self.schedule_start_observe(delay, region, None);
}
}

fn schedule_start_observe(
&self,
backoff: Duration,
region: Region,
handle: Option<ObserveHandle>,
) {
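// Queue a delayed `ObserveOp::Start` through the manager's own message
// channel; if the channel is already gone we are shutting down, so give up.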
let tx = self.messenger.upgrade();
let region_id = region.id;
if tx.is_none() {
warn!(
"log backup subscription manager: cannot upgrade self-sender, are we shutting down?"
);
return;
}
let tx = tx.unwrap();
// tikv_util::Instant cannot be converted to std::time::Instant :(
let start = std::time::Instant::now();
let scheduled = async move {
tokio::time::sleep_until((start + backoff).into()).await;
let handle = handle.unwrap_or_else(|| ObserveHandle::new());
if let Err(err) = tx.send(ObserveOp::Start { region, handle }).await {
warn!("log backup failed to schedule start observe."; "err" => %err);
}
};
tokio::spawn(root!("scheduled_subscription"; scheduled; "after" = ?backoff, region_id));
}

#[instrument(skip_all, fields(id = region.id))]
>>>>>>> 66847e9c5a (*: remove unnecessary async blocks to save memory (#16541))
async fn refresh_resolver(&self, region: &Region) {
let need_refresh_all = !self.subs.try_update_region(region);

@@ -654,6 +765,17 @@ where
) {
self.subs
.register_region(region, handle.clone(), Some(last_checkpoint));
<<<<<<< HEAD
=======
let feedback_channel = match self.messenger.upgrade() {
Some(ch) => ch,
None => {
warn!("log backup subscription manager is shutting down, aborting new scan.";
utils::slog_region(region), "handle" => ?handle.id);
return;
}
};
>>>>>>> 66847e9c5a (*: remove unnecessary async blocks to save memory (#16541))
self.spawn_scan(ScanCmd {
region: region.clone(),
handle,