From 5cf15aacef1df5af22a29637ee33b742e02ec2da Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B1=B1=E5=B2=9A?= <36239017+YuJuncen@users.noreply.github.com> Date: Thu, 25 Jan 2024 15:43:51 +0800 Subject: [PATCH] snap_backup: abort last connection of preparing while there are many (#16388) close tikv/tikv#16382 Now, a newly established prepare disk snapshot backup stream will abort the former one. Signed-off-by: Yu Juncen --- components/backup/src/disk_snap.rs | 46 +++++++++++++++++-------- components/backup/src/service.rs | 22 +++++++++--- components/test_backup/src/disk_snap.rs | 6 +++- tests/integrations/backup/disk_snap.rs | 12 +++++++ 4 files changed, 66 insertions(+), 20 deletions(-) diff --git a/components/backup/src/disk_snap.rs b/components/backup/src/disk_snap.rs index 27c5b2e2b19..94d956cc11c 100644 --- a/components/backup/src/disk_snap.rs +++ b/components/backup/src/disk_snap.rs @@ -2,6 +2,7 @@ //! This module contains things about disk snapshot. use std::{ + future::Pending, sync::{ atomic::{AtomicU64, Ordering}, Arc, @@ -14,9 +15,9 @@ use futures::future; use futures_util::{ future::{BoxFuture, FutureExt}, sink::SinkExt, - stream::StreamExt, + stream::{AbortHandle, Abortable, StreamExt}, }; -use grpcio::{RpcStatus, WriteFlags}; +use grpcio::{RpcStatus, RpcStatusCode, WriteFlags}; use kvproto::{ brpb::{ PrepareSnapshotBackupEventType as PEvnT, PrepareSnapshotBackupRequest as PReq, @@ -206,6 +207,7 @@ impl Env { pub struct StreamHandleLoop { pending_regions: Vec<BoxFuture<'static, (Region, Result<()>)>>, env: Env, + aborted: Abortable<Pending<()>>, } impl Drop for StreamHandleLoop { @@ -218,15 +220,19 @@ enum StreamHandleEvent { Req(PReq), WaitApplyDone(Region, Result<()>), ConnectionGone(Option), + Abort, } impl StreamHandleLoop { - pub fn new(env: Env) -> Self { + pub fn new(env: Env) -> (Self, AbortHandle) { + let (aborted, handle) = futures_util::future::abortable(std::future::pending()); env.active_stream.fetch_add(1, Ordering::SeqCst); - Self { + let this = Self { env, + aborted, pending_regions: 
vec![], - } + }; + (this, handle) } fn async_wait_apply(&mut self, region: &Region) -> BoxFuture<'static, (Region, Result<()>)> { @@ -261,20 +267,19 @@ impl StreamHandleLoop { &mut self, input: &mut (impl Stream> + Unpin), ) -> StreamHandleEvent { + let pending_regions = &mut self.pending_regions; let wait_applies = future::poll_fn(|cx| { - let selected = - self.pending_regions - .iter_mut() - .enumerate() - .find_map(|(i, fut)| match fut.poll_unpin(cx) { - Poll::Ready(r) => Some((i, r)), - Poll::Pending => None, - }); + let selected = pending_regions.iter_mut().enumerate().find_map(|(i, fut)| { + match fut.poll_unpin(cx) { + Poll::Ready(r) => Some((i, r)), + Poll::Pending => None, + } + }); match selected { Some((i, region)) => { // We have polled the future (and make sure it has ready) before, it is // safe to drop this future directly. - let _ = self.pending_regions.swap_remove(i); + let _ = pending_regions.swap_remove(i); region.into() } None => Poll::Pending, @@ -292,6 +297,9 @@ impl StreamHandleLoop { None => StreamHandleEvent::ConnectionGone(None) } } + _ = &mut self.aborted => { + StreamHandleEvent::Abort + } } } @@ -348,6 +356,16 @@ impl StreamHandleLoop { Some(err) => Err(err), }; } + StreamHandleEvent::Abort => { + warn!("Aborted disk snapshot prepare loop by the server."); + return sink + .0 + .fail(RpcStatus::with_message( + RpcStatusCode::CANCELLED, + "the loop has been aborted by server".to_string(), + )) + .await; + } } } } diff --git a/components/backup/src/service.rs b/components/backup/src/service.rs index 04d996944a4..7e38093df53 100644 --- a/components/backup/src/service.rs +++ b/components/backup/src/service.rs @@ -1,8 +1,9 @@ // Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0. 
-use std::sync::atomic::*; +use std::sync::{atomic::*, Arc, Mutex}; use futures::{channel::mpsc, FutureExt, SinkExt, StreamExt, TryFutureExt}; +use futures_util::stream::AbortHandle; use grpcio::{self, *}; use kvproto::brpb::*; use raftstore::store::snapshot_backup::SnapshotBrHandle; @@ -16,6 +17,7 @@ use crate::disk_snap::{self, StreamHandleLoop}; pub struct Service { scheduler: Scheduler, snap_br_env: disk_snap::Env, + abort_last_req: Arc<Mutex<Option<AbortHandle>>>, } impl Service @@ -27,6 +29,7 @@ where Service { scheduler, snap_br_env: env, + abort_last_req: Arc::default(), } } } @@ -147,17 +150,26 @@ where stream: grpcio::RequestStream, sink: grpcio::DuplexSink, ) { - let l = StreamHandleLoop::new(self.snap_br_env.clone()); + let (l, new_cancel) = StreamHandleLoop::new(self.snap_br_env.clone()); + let peer = ctx.peer(); // Note: should we disconnect here once there are more than one stream...? // Generally once two streams enter here, one may exit info!("A new prepare snapshot backup stream created!"; - "peer" => %ctx.peer(), + "peer" => %peer, "stream_count" => %self.snap_br_env.active_stream(), ); + let abort_last_req = self.abort_last_req.clone(); self.snap_br_env.get_async_runtime().spawn(async move { - if let Err(err) = l.run(stream, sink.into()).await { - warn!("stream closed; perhaps a problem cannot be retried happens"; "reason" => ?err); + { + let mut lock = abort_last_req.lock().unwrap(); + if let Some(cancel) = &*lock { + cancel.abort(); + } + *lock = Some(new_cancel); } + let res = l.run(stream, sink.into()).await; + info!("stream closed; probably everything is done or a problem cannot be retried happens"; + "result" => ?res, "peer" => %peer); }); } } diff --git a/components/test_backup/src/disk_snap.rs b/components/test_backup/src/disk_snap.rs index aa1c94f8e5e..c252f68d09d 100644 --- a/components/test_backup/src/disk_snap.rs +++ b/components/test_backup/src/disk_snap.rs @@ -208,7 +208,11 @@ impl PrepareBackup { } pub fn next(&mut self) -> PrepareSnapshotBackupResponse { 
- block_on(self.rx.next()).unwrap().unwrap() + self.try_next().unwrap() + } + + pub fn try_next(&mut self) -> grpcio::Result<PrepareSnapshotBackupResponse> { + block_on(self.rx.next()).unwrap() } } diff --git a/tests/integrations/backup/disk_snap.rs b/tests/integrations/backup/disk_snap.rs index bdef242b1a1..23a61a937e9 100644 --- a/tests/integrations/backup/disk_snap.rs +++ b/tests/integrations/backup/disk_snap.rs @@ -107,6 +107,18 @@ fn test_prepare_merge() { assert_failure(&resp); } +#[test] +fn test_abort_last_one() { + let suite = Suite::new(1); + let mut call = suite.prepare_backup(1); + call.prepare(10); + let mut call2 = suite.prepare_backup(1); + call2.prepare(10); + let should_err = call.try_next(); + assert!(should_err.is_err(), "{:?}", should_err); + assert!(call2.send_finalize()); +} + #[test] fn test_wait_apply() { let mut suite = Suite::new(3);