From fd5f88a7fdda1bf70dcb0d239f60137110c54d46 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Fri, 9 Jun 2023 17:56:47 +0800 Subject: [PATCH] backup-stream: don't close the server stream when encountered errors (#14432) (#14911) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit close tikv/tikv#14426, ref tikv/tikv#14432 Signed-off-by: hillium Co-authored-by: 山岚 <36239017+YuJuncen@users.noreply.github.com> Co-authored-by: hillium Co-authored-by: Ti Chi Robot --- .../backup-stream/src/checkpoint_manager.rs | 247 +++++++++++++++--- components/backup-stream/src/service.rs | 7 +- 2 files changed, 211 insertions(+), 43 deletions(-) diff --git a/components/backup-stream/src/checkpoint_manager.rs b/components/backup-stream/src/checkpoint_manager.rs index 47ec34d2113..ddff9e3a11e 100644 --- a/components/backup-stream/src/checkpoint_manager.rs +++ b/components/backup-stream/src/checkpoint_manager.rs @@ -6,7 +6,7 @@ use futures::{ channel::mpsc::{self as async_mpsc, Receiver, Sender}, SinkExt, StreamExt, }; -use grpcio::{RpcStatus, RpcStatusCode, ServerStreamingSink, WriteFlags}; +use grpcio::{RpcStatus, RpcStatusCode, WriteFlags}; use kvproto::{ errorpb::{Error as PbError, *}, logbackuppb::{FlushEvent, SubscribeFlushEventResponse}, @@ -19,7 +19,7 @@ use uuid::Uuid; use crate::{ annotate, - errors::{Error, ReportableResult, Result}, + errors::{Error, Result}, future, metadata::{store::MetaStore, Checkpoint, CheckpointProvider, MetadataClient}, metrics, try_send, RegionCheckpointOperation, Task, @@ -46,9 +46,11 @@ impl std::fmt::Debug for CheckpointManager { enum SubscriptionOp { Add(Subscription), Emit(Box<[FlushEvent]>), + #[cfg(test)] + Inspect(Box), } -struct SubscriptionManager { +pub struct SubscriptionManager { subscribers: HashMap, input: Receiver, } @@ -63,50 +65,62 @@ impl SubscriptionManager { self.subscribers.insert(Uuid::new_v4(), sub); } SubscriptionOp::Emit(events) => { - let mut canceled = vec![]; - for (id, sub) in &mut self.subscribers { - let send_all = async { - for es in events.chunks(1024) { - let mut resp = SubscribeFlushEventResponse::new(); - resp.set_events(es.to_vec().into()); - sub.feed((resp, WriteFlags::default())).await?; - } - sub.flush().await - }; - - match send_all.await { - Err(grpcio::Error::RemoteStopped) => { - canceled.push(*id); - } - Err(err) => { - Error::from(err).report("sending subscription"); - } - _ => {} - } - } + self.emit_events(events).await; + } + #[cfg(test)] + SubscriptionOp::Inspect(f) => { + f(&self); + } + } + } + // NOTE: Maybe close all subscription streams here. + } - for c in canceled { - match self.subscribers.remove(&c) { - Some(mut sub) => { - info!("client is gone, removing subscription"; "id" => %c); - sub.close().await.report_if_err(format_args!( - "during removing subscription {}", - c - )) - } - None => { - warn!("BUG: the subscriber has been removed before we are going to remove it."; "id" => %c); - } - } - } + async fn emit_events(&mut self, events: Box<[FlushEvent]>) { + let mut canceled = vec![]; + info!("log backup sending events"; "event_len" => %events.len(), "downstream" => %self.subscribers.len()); + for (id, sub) in &mut self.subscribers { + let send_all = async { + for es in events.chunks(1024) { + let mut resp = SubscribeFlushEventResponse::new(); + resp.set_events(es.to_vec().into()); + sub.feed((resp, WriteFlags::default())).await?; } + sub.flush().await + }; + + if let Err(err) = send_all.await { + canceled.push(*id); + Error::from(err).report("sending subscription"); + } + } + + for c in canceled { + self.remove_subscription(&c).await; + } + } + + async fn remove_subscription(&mut self, id: &Uuid) { + match self.subscribers.remove(id) { + Some(sub) => { + info!("client is gone, removing subscription"; "id" => %id); + // The stream is an endless stream -- we don't need to close it. + drop(sub); + } + None => { + warn!("BUG: the subscriber has been removed before we are going to remove it."; "id" => %id); } } } } // Note: can we make it more generic...? -pub type Subscription = ServerStreamingSink; +#[cfg(not(test))] +pub type Subscription = + grpcio::ServerStreamingSink; + +#[cfg(test)] +pub type Subscription = tests::MockSink; /// The result of getting a checkpoint. /// The possibility of failed to getting checkpoint is pretty high: @@ -284,6 +298,29 @@ impl CheckpointManager { pub fn get_all(&self) -> Vec { self.items.values().cloned().collect() } + + #[cfg(test)] + fn sync_with_subs_mgr( + &mut self, + f: impl FnOnce(&SubscriptionManager) -> T + Send + 'static, + ) -> T { + use std::sync::Mutex; + + let (tx, rx) = std::sync::mpsc::sync_channel(1); + let t = Arc::new(Mutex::new(None)); + let tr = Arc::clone(&t); + self.manager_handle + .as_mut() + .unwrap() + .try_send(SubscriptionOp::Inspect(Box::new(move |x| { + *tr.lock().unwrap() = Some(f(x)); + tx.send(()).unwrap(); + }))) + .unwrap(); + rx.recv().unwrap(); + let mut t = t.lock().unwrap(); + t.take().unwrap() + } } fn not_leader(r: u64) -> PbError { @@ -477,12 +514,13 @@ pub mod tests { use std::{ assert_matches, collections::HashMap, - sync::{Arc, RwLock}, + sync::{Arc, Mutex, RwLock}, time::Duration, }; - use futures::future::ok; - use kvproto::metapb::*; + use futures::{future::ok, Sink}; + use grpcio::{RpcStatus, RpcStatusCode}; + use kvproto::{logbackuppb::SubscribeFlushEventResponse, metapb::*}; use pd_client::{PdClient, PdFuture}; use txn_types::TimeStamp; @@ -499,6 +537,131 @@ pub mod tests { r } + #[derive(Clone)] + pub struct MockSink(Arc>); + + impl MockSink { + fn with_fail_once(code: RpcStatusCode) -> Self { + let mut failed = false; + let inner = MockSinkInner { + items: Vec::default(), + closed: false, + on_error: Box::new(move || { + if failed { + RpcStatusCode::OK + } else { + failed = true; + code + } + }), + }; + Self(Arc::new(Mutex::new(inner))) + } + + fn trivial() -> Self { + let inner = MockSinkInner { + items: Vec::default(), + closed: false, + on_error: Box::new(|| RpcStatusCode::OK), + }; + Self(Arc::new(Mutex::new(inner))) + } + + pub async fn fail(&self, status: RpcStatus) -> crate::errors::Result<()> { + panic!("failed in a case should never fail: {}", status); + } + } + + struct MockSinkInner { + items: Vec, + closed: bool, + on_error: Box grpcio::RpcStatusCode + Send>, + } + + impl Sink<(SubscribeFlushEventResponse, grpcio::WriteFlags)> for MockSink { + type Error = grpcio::Error; + + fn poll_ready( + self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + Ok(()).into() + } + + fn start_send( + self: std::pin::Pin<&mut Self>, + item: (SubscribeFlushEventResponse, grpcio::WriteFlags), + ) -> Result<(), Self::Error> { + let mut guard = self.0.lock().unwrap(); + let code = (guard.on_error)(); + if code != RpcStatusCode::OK { + return Err(grpcio::Error::RpcFailure(RpcStatus::new(code))); + } + guard.items.push(item.0); + Ok(()) + } + + fn poll_flush( + self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + Ok(()).into() + } + + fn poll_close( + self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let mut guard = self.0.lock().unwrap(); + guard.closed = true; + Ok(()).into() + } + } + + fn simple_resolve_result() -> (Region, TimeStamp) { + let mut region = Region::new(); + region.set_id(42); + (region, 42.into()) + } + + #[test] + fn test_rpc_sub() { + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(1) + .build() + .unwrap(); + let mut mgr = super::CheckpointManager::default(); + rt.spawn(mgr.spawn_subscription_mgr()); + + let trivial_sink = MockSink::trivial(); + rt.block_on(mgr.add_subscriber(trivial_sink.clone())) + .unwrap(); + + mgr.update_region_checkpoints(vec![simple_resolve_result()]); + mgr.sync_with_subs_mgr(|_| {}); + assert_eq!(trivial_sink.0.lock().unwrap().items.len(), 1); + } + + #[test] + fn test_rpc_failure() { + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(1) + .build() + .unwrap(); + let mut mgr = super::CheckpointManager::default(); + rt.spawn(mgr.spawn_subscription_mgr()); + + let error_sink = MockSink::with_fail_once(RpcStatusCode::INTERNAL); + rt.block_on(mgr.add_subscriber(error_sink.clone())).unwrap(); + + mgr.update_region_checkpoints(vec![simple_resolve_result()]); + assert_eq!(mgr.sync_with_subs_mgr(|item| { item.subscribers.len() }), 0); + let sink = error_sink.0.lock().unwrap(); + assert_eq!(sink.items.len(), 0); + // The stream shouldn't be closed when exit by a failure. + assert_eq!(sink.closed, false); + } + #[test] fn test_mgr() { let mut mgr = super::CheckpointManager::default(); diff --git a/components/backup-stream/src/service.rs b/components/backup-stream/src/service.rs index 9d312a984d1..43d4ede2f27 100644 --- a/components/backup-stream/src/service.rs +++ b/components/backup-stream/src/service.rs @@ -94,8 +94,13 @@ impl LogBackup for Service { &mut self, _ctx: grpcio::RpcContext<'_>, _req: kvproto::logbackuppb::SubscribeFlushEventRequest, - sink: grpcio::ServerStreamingSink, + #[allow(unused_variables)] sink: grpcio::ServerStreamingSink< + kvproto::logbackuppb::SubscribeFlushEventResponse, + >, ) { + #[cfg(test)] + panic!("Service should not be used in an unit test"); + #[cfg(not(test))] try_send!( self.endpoint, Task::RegionCheckpointsOp(RegionCheckpointOperation::Subscribe(sink))