Skip to content

Commit

Permalink
snap_backup: abort last connection of preparing while there are many (#16388)

Browse files Browse the repository at this point in the history

close #16382

Now, a newly established prepare disk snapshot backup stream will abort the former one.

Signed-off-by: Yu Juncen <yu745514916@live.com>
  • Loading branch information
YuJuncen committed Jan 25, 2024
1 parent 8780c04 commit 5cf15aa
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 20 deletions.
46 changes: 32 additions & 14 deletions components/backup/src/disk_snap.rs
Expand Up @@ -2,6 +2,7 @@
//! This module contains things about disk snapshot.

use std::{
future::Pending,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
Expand All @@ -14,9 +15,9 @@ use futures::future;
use futures_util::{
future::{BoxFuture, FutureExt},
sink::SinkExt,
stream::StreamExt,
stream::{AbortHandle, Abortable, StreamExt},
};
use grpcio::{RpcStatus, WriteFlags};
use grpcio::{RpcStatus, RpcStatusCode, WriteFlags};
use kvproto::{
brpb::{
PrepareSnapshotBackupEventType as PEvnT, PrepareSnapshotBackupRequest as PReq,
Expand Down Expand Up @@ -206,6 +207,7 @@ impl<SR: SnapshotBrHandle> Env<SR> {
/// Event loop state for one `PrepareSnapshotBackup` duplex stream.
pub struct StreamHandleLoop<SR: SnapshotBrHandle + 'static> {
    /// In-flight "wait apply" futures, one per region still being prepared;
    /// each resolves with the region and the outcome of waiting.
    pending_regions: Vec<BoxFuture<'static, (Region, Result<()>)>>,
    /// Shared environment (snapshot handle, active-stream counter, runtime).
    env: Env<SR>,
    /// A never-completing future wrapped in `Abortable`: it only resolves when
    /// the paired `AbortHandle` fires, which lets the server abort this loop
    /// from outside (a newer stream evicting this one).
    aborted: Abortable<Pending<()>>,
}

impl<SR: SnapshotBrHandle + 'static> Drop for StreamHandleLoop<SR> {
Expand All @@ -218,15 +220,19 @@ enum StreamHandleEvent {
Req(PReq),
WaitApplyDone(Region, Result<()>),
ConnectionGone(Option<grpcio::Error>),
Abort,
}

impl<SR: SnapshotBrHandle + 'static> StreamHandleLoop<SR> {
pub fn new(env: Env<SR>) -> Self {
/// Creates a new stream loop over `env`, bumping the active-stream counter.
///
/// Returns the loop together with an `AbortHandle`; firing the handle makes
/// the loop's `aborted` future resolve, which the event loop surfaces as an
/// abort event so the stream can be shut down by the server.
pub fn new(env: Env<SR>) -> (Self, AbortHandle) {
    // `pending()` never completes on its own; wrapping it in `abortable`
    // yields a future that resolves only when `abort_handle.abort()` is called.
    let (never_done, abort_handle) = futures_util::future::abortable(std::future::pending());
    // Record that one more prepare stream is alive.
    env.active_stream.fetch_add(1, Ordering::SeqCst);
    let looper = Self {
        env,
        aborted: never_done,
        pending_regions: vec![],
    };
    (looper, abort_handle)
}

fn async_wait_apply(&mut self, region: &Region) -> BoxFuture<'static, (Region, Result<()>)> {
Expand Down Expand Up @@ -261,20 +267,19 @@ impl<SR: SnapshotBrHandle + 'static> StreamHandleLoop<SR> {
&mut self,
input: &mut (impl Stream<Item = grpcio::Result<PReq>> + Unpin),
) -> StreamHandleEvent {
let pending_regions = &mut self.pending_regions;
let wait_applies = future::poll_fn(|cx| {
let selected =
self.pending_regions
.iter_mut()
.enumerate()
.find_map(|(i, fut)| match fut.poll_unpin(cx) {
Poll::Ready(r) => Some((i, r)),
Poll::Pending => None,
});
let selected = pending_regions.iter_mut().enumerate().find_map(|(i, fut)| {
match fut.poll_unpin(cx) {
Poll::Ready(r) => Some((i, r)),
Poll::Pending => None,
}
});
match selected {
Some((i, region)) => {
// We have polled the future (and make sure it has ready) before, it is
// safe to drop this future directly.
let _ = self.pending_regions.swap_remove(i);
let _ = pending_regions.swap_remove(i);
region.into()
}
None => Poll::Pending,
Expand All @@ -292,6 +297,9 @@ impl<SR: SnapshotBrHandle + 'static> StreamHandleLoop<SR> {
None => StreamHandleEvent::ConnectionGone(None)
}
}
_ = &mut self.aborted => {
StreamHandleEvent::Abort
}
}
}

Expand Down Expand Up @@ -348,6 +356,16 @@ impl<SR: SnapshotBrHandle + 'static> StreamHandleLoop<SR> {
Some(err) => Err(err),
};
}
StreamHandleEvent::Abort => {
warn!("Aborted disk snapshot prepare loop by the server.");
return sink
.0
.fail(RpcStatus::with_message(
RpcStatusCode::CANCELLED,
"the loop has been aborted by server".to_string(),
))
.await;
}
}
}
}
Expand Down
22 changes: 17 additions & 5 deletions components/backup/src/service.rs
@@ -1,8 +1,9 @@
// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0.

use std::sync::atomic::*;
use std::sync::{atomic::*, Arc, Mutex};

use futures::{channel::mpsc, FutureExt, SinkExt, StreamExt, TryFutureExt};
use futures_util::stream::AbortHandle;
use grpcio::{self, *};
use kvproto::brpb::*;
use raftstore::store::snapshot_backup::SnapshotBrHandle;
Expand All @@ -16,6 +17,7 @@ use crate::disk_snap::{self, StreamHandleLoop};
/// Backup gRPC service: dispatches backup tasks and manages disk-snapshot
/// ("prepare") streams.
pub struct Service<H: SnapshotBrHandle> {
    // Scheduler used to hand backup `Task`s to the backup worker.
    // NOTE(review): worker side not visible in this diff — confirm.
    scheduler: Scheduler<Task>,
    /// Shared environment for disk-snapshot prepare streams.
    snap_br_env: disk_snap::Env<H>,
    /// Abort handle of the most recently accepted prepare stream; when a new
    /// stream arrives it takes this slot and aborts the previous holder, so at
    /// most one prepare stream stays active.
    abort_last_req: Arc<Mutex<Option<AbortHandle>>>,
}

impl<H> Service<H>
Expand All @@ -27,6 +29,7 @@ where
Service {
scheduler,
snap_br_env: env,
abort_last_req: Arc::default(),
}
}
}
Expand Down Expand Up @@ -147,17 +150,26 @@ where
stream: grpcio::RequestStream<PrepareSnapshotBackupRequest>,
sink: grpcio::DuplexSink<PrepareSnapshotBackupResponse>,
) {
let l = StreamHandleLoop::new(self.snap_br_env.clone());
let (l, new_cancel) = StreamHandleLoop::new(self.snap_br_env.clone());
let peer = ctx.peer();
// Note: should we disconnect here once there are more than one stream...?
// Generally once two streams enter here, one may exit
info!("A new prepare snapshot backup stream created!";
"peer" => %ctx.peer(),
"peer" => %peer,
"stream_count" => %self.snap_br_env.active_stream(),
);
let abort_last_req = self.abort_last_req.clone();
self.snap_br_env.get_async_runtime().spawn(async move {
if let Err(err) = l.run(stream, sink.into()).await {
warn!("stream closed; perhaps a problem cannot be retried happens"; "reason" => ?err);
{
let mut lock = abort_last_req.lock().unwrap();
if let Some(cancel) = &*lock {
cancel.abort();
}
*lock = Some(new_cancel);
}
let res = l.run(stream, sink.into()).await;
info!("stream closed; probably everything is done or a problem cannot be retried happens";
"result" => ?res, "peer" => %peer);
});
}
}
Expand Down
6 changes: 5 additions & 1 deletion components/test_backup/src/disk_snap.rs
Expand Up @@ -208,7 +208,11 @@ impl PrepareBackup {
}

pub fn next(&mut self) -> PrepareSnapshotBackupResponse {
block_on(self.rx.next()).unwrap().unwrap()
self.try_next().unwrap()
}

/// Blocks until the next response arrives on the prepare stream and returns
/// the raw gRPC result; an `Err` here usually means the server closed or
/// aborted the stream.
pub fn try_next(&mut self) -> grpcio::Result<PrepareSnapshotBackupResponse> {
    let next_item = block_on(self.rx.next());
    // The stream must still be open here; `None` would indicate a broken
    // test setup rather than a server-side failure.
    next_item.unwrap()
}
}

Expand Down
12 changes: 12 additions & 0 deletions tests/integrations/backup/disk_snap.rs
Expand Up @@ -107,6 +107,18 @@ fn test_prepare_merge() {
assert_failure(&resp);
}

#[test]
fn test_abort_last_one() {
    // One-node cluster: opening a second prepare stream must abort the first.
    let suite = Suite::new(1);
    let mut first = suite.prepare_backup(1);
    first.prepare(10);
    // The second stream takes over the "last request" slot on the server.
    let mut second = suite.prepare_backup(1);
    second.prepare(10);
    // The first stream should now observe a gRPC error (server aborted it)...
    let res = first.try_next();
    assert!(res.is_err(), "{:?}", res);
    // ...while the surviving stream can still finish normally.
    assert!(second.send_finalize());
}

#[test]
fn test_wait_apply() {
let mut suite = Suite::new(3);
Expand Down

0 comments on commit 5cf15aa

Please sign in to comment.