Skip to content

Commit

Permalink
backup-stream: don't close the server stream when encountered errors (#…
Browse files Browse the repository at this point in the history
…14432) (#14911)

close #14426, ref #14432

Signed-off-by: hillium <yujuncen@pingcap.com>

Co-authored-by: 山岚 <36239017+YuJuncen@users.noreply.github.com>
Co-authored-by: hillium <yujuncen@pingcap.com>
Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
  • Loading branch information
3 people committed Jun 9, 2023
1 parent 6ad9d09 commit fd5f88a
Show file tree
Hide file tree
Showing 2 changed files with 211 additions and 43 deletions.
247 changes: 205 additions & 42 deletions components/backup-stream/src/checkpoint_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use futures::{
channel::mpsc::{self as async_mpsc, Receiver, Sender},
SinkExt, StreamExt,
};
use grpcio::{RpcStatus, RpcStatusCode, ServerStreamingSink, WriteFlags};
use grpcio::{RpcStatus, RpcStatusCode, WriteFlags};
use kvproto::{
errorpb::{Error as PbError, *},
logbackuppb::{FlushEvent, SubscribeFlushEventResponse},
Expand All @@ -19,7 +19,7 @@ use uuid::Uuid;

use crate::{
annotate,
errors::{Error, ReportableResult, Result},
errors::{Error, Result},
future,
metadata::{store::MetaStore, Checkpoint, CheckpointProvider, MetadataClient},
metrics, try_send, RegionCheckpointOperation, Task,
Expand All @@ -46,9 +46,11 @@ impl std::fmt::Debug for CheckpointManager {
enum SubscriptionOp {
Add(Subscription),
Emit(Box<[FlushEvent]>),
#[cfg(test)]
Inspect(Box<dyn FnOnce(&SubscriptionManager) + Send>),
}

struct SubscriptionManager {
pub struct SubscriptionManager {
subscribers: HashMap<Uuid, Subscription>,
input: Receiver<SubscriptionOp>,
}
Expand All @@ -63,50 +65,62 @@ impl SubscriptionManager {
self.subscribers.insert(Uuid::new_v4(), sub);
}
SubscriptionOp::Emit(events) => {
let mut canceled = vec![];
for (id, sub) in &mut self.subscribers {
let send_all = async {
for es in events.chunks(1024) {
let mut resp = SubscribeFlushEventResponse::new();
resp.set_events(es.to_vec().into());
sub.feed((resp, WriteFlags::default())).await?;
}
sub.flush().await
};

match send_all.await {
Err(grpcio::Error::RemoteStopped) => {
canceled.push(*id);
}
Err(err) => {
Error::from(err).report("sending subscription");
}
_ => {}
}
}
self.emit_events(events).await;
}
#[cfg(test)]
SubscriptionOp::Inspect(f) => {
f(&self);
}
}
}
// NOTE: Maybe close all subscription streams here.
}

for c in canceled {
match self.subscribers.remove(&c) {
Some(mut sub) => {
info!("client is gone, removing subscription"; "id" => %c);
sub.close().await.report_if_err(format_args!(
"during removing subscription {}",
c
))
}
None => {
warn!("BUG: the subscriber has been removed before we are going to remove it."; "id" => %c);
}
}
}
async fn emit_events(&mut self, events: Box<[FlushEvent]>) {
let mut canceled = vec![];
info!("log backup sending events"; "event_len" => %events.len(), "downstream" => %self.subscribers.len());
for (id, sub) in &mut self.subscribers {
let send_all = async {
for es in events.chunks(1024) {
let mut resp = SubscribeFlushEventResponse::new();
resp.set_events(es.to_vec().into());
sub.feed((resp, WriteFlags::default())).await?;
}
sub.flush().await
};

if let Err(err) = send_all.await {
canceled.push(*id);
Error::from(err).report("sending subscription");
}
}

for c in canceled {
self.remove_subscription(&c).await;
}
}

async fn remove_subscription(&mut self, id: &Uuid) {
match self.subscribers.remove(id) {
Some(sub) => {
info!("client is gone, removing subscription"; "id" => %id);
// The stream is an endless stream -- we don't need to close it.
drop(sub);
}
None => {
warn!("BUG: the subscriber has been removed before we are going to remove it."; "id" => %id);
}
}
}
}

// Note: can we make it more generic...?
pub type Subscription = ServerStreamingSink<kvproto::logbackuppb::SubscribeFlushEventResponse>;
#[cfg(not(test))]
pub type Subscription =
grpcio::ServerStreamingSink<kvproto::logbackuppb::SubscribeFlushEventResponse>;

#[cfg(test)]
pub type Subscription = tests::MockSink;

/// The result of getting a checkpoint.
/// The possibility of failed to getting checkpoint is pretty high:
Expand Down Expand Up @@ -284,6 +298,29 @@ impl CheckpointManager {
pub fn get_all(&self) -> Vec<LastFlushTsOfRegion> {
self.items.values().cloned().collect()
}

#[cfg(test)]
fn sync_with_subs_mgr<T: Send + 'static>(
&mut self,
f: impl FnOnce(&SubscriptionManager) -> T + Send + 'static,
) -> T {
use std::sync::Mutex;

let (tx, rx) = std::sync::mpsc::sync_channel(1);
let t = Arc::new(Mutex::new(None));
let tr = Arc::clone(&t);
self.manager_handle
.as_mut()
.unwrap()
.try_send(SubscriptionOp::Inspect(Box::new(move |x| {
*tr.lock().unwrap() = Some(f(x));
tx.send(()).unwrap();
})))
.unwrap();
rx.recv().unwrap();
let mut t = t.lock().unwrap();
t.take().unwrap()
}
}

fn not_leader(r: u64) -> PbError {
Expand Down Expand Up @@ -477,12 +514,13 @@ pub mod tests {
use std::{
assert_matches,
collections::HashMap,
sync::{Arc, RwLock},
sync::{Arc, Mutex, RwLock},
time::Duration,
};

use futures::future::ok;
use kvproto::metapb::*;
use futures::{future::ok, Sink};
use grpcio::{RpcStatus, RpcStatusCode};
use kvproto::{logbackuppb::SubscribeFlushEventResponse, metapb::*};
use pd_client::{PdClient, PdFuture};
use txn_types::TimeStamp;

Expand All @@ -499,6 +537,131 @@ pub mod tests {
r
}

#[derive(Clone)]
pub struct MockSink(Arc<Mutex<MockSinkInner>>);

impl MockSink {
fn with_fail_once(code: RpcStatusCode) -> Self {
let mut failed = false;
let inner = MockSinkInner {
items: Vec::default(),
closed: false,
on_error: Box::new(move || {
if failed {
RpcStatusCode::OK
} else {
failed = true;
code
}
}),
};
Self(Arc::new(Mutex::new(inner)))
}

fn trivial() -> Self {
let inner = MockSinkInner {
items: Vec::default(),
closed: false,
on_error: Box::new(|| RpcStatusCode::OK),
};
Self(Arc::new(Mutex::new(inner)))
}

pub async fn fail(&self, status: RpcStatus) -> crate::errors::Result<()> {
panic!("failed in a case should never fail: {}", status);
}
}

struct MockSinkInner {
items: Vec<SubscribeFlushEventResponse>,
closed: bool,
on_error: Box<dyn FnMut() -> grpcio::RpcStatusCode + Send>,
}

impl Sink<(SubscribeFlushEventResponse, grpcio::WriteFlags)> for MockSink {
type Error = grpcio::Error;

fn poll_ready(
self: std::pin::Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
Ok(()).into()
}

fn start_send(
self: std::pin::Pin<&mut Self>,
item: (SubscribeFlushEventResponse, grpcio::WriteFlags),
) -> Result<(), Self::Error> {
let mut guard = self.0.lock().unwrap();
let code = (guard.on_error)();
if code != RpcStatusCode::OK {
return Err(grpcio::Error::RpcFailure(RpcStatus::new(code)));
}
guard.items.push(item.0);
Ok(())
}

fn poll_flush(
self: std::pin::Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
Ok(()).into()
}

fn poll_close(
self: std::pin::Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
let mut guard = self.0.lock().unwrap();
guard.closed = true;
Ok(()).into()
}
}

fn simple_resolve_result() -> (Region, TimeStamp) {
let mut region = Region::new();
region.set_id(42);
(region, 42.into())
}

#[test]
fn test_rpc_sub() {
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(1)
.build()
.unwrap();
let mut mgr = super::CheckpointManager::default();
rt.spawn(mgr.spawn_subscription_mgr());

let trivial_sink = MockSink::trivial();
rt.block_on(mgr.add_subscriber(trivial_sink.clone()))
.unwrap();

mgr.update_region_checkpoints(vec![simple_resolve_result()]);
mgr.sync_with_subs_mgr(|_| {});
assert_eq!(trivial_sink.0.lock().unwrap().items.len(), 1);
}

#[test]
fn test_rpc_failure() {
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(1)
.build()
.unwrap();
let mut mgr = super::CheckpointManager::default();
rt.spawn(mgr.spawn_subscription_mgr());

let error_sink = MockSink::with_fail_once(RpcStatusCode::INTERNAL);
rt.block_on(mgr.add_subscriber(error_sink.clone())).unwrap();

mgr.update_region_checkpoints(vec![simple_resolve_result()]);
assert_eq!(mgr.sync_with_subs_mgr(|item| { item.subscribers.len() }), 0);
let sink = error_sink.0.lock().unwrap();
assert_eq!(sink.items.len(), 0);
// The stream shouldn't be closed when exit by a failure.
assert_eq!(sink.closed, false);
}

#[test]
fn test_mgr() {
let mut mgr = super::CheckpointManager::default();
Expand Down
7 changes: 6 additions & 1 deletion components/backup-stream/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,13 @@ impl LogBackup for Service {
&mut self,
_ctx: grpcio::RpcContext<'_>,
_req: kvproto::logbackuppb::SubscribeFlushEventRequest,
sink: grpcio::ServerStreamingSink<kvproto::logbackuppb::SubscribeFlushEventResponse>,
#[allow(unused_variables)] sink: grpcio::ServerStreamingSink<
kvproto::logbackuppb::SubscribeFlushEventResponse,
>,
) {
#[cfg(test)]
panic!("Service should not be used in an unit test");
#[cfg(not(test))]
try_send!(
self.endpoint,
Task::RegionCheckpointsOp(RegionCheckpointOperation::Subscribe(sink))
Expand Down

0 comments on commit fd5f88a

Please sign in to comment.