
backup-stream: don't close the server stream when encountered errors (#14432) #14441

Merged

Conversation

ti-chi-bot (Member)

This is an automated cherry-pick of #14432

What is changed and how it works?

Issue Number: Close #14426

What's Changed:
This PR makes sure we do not close the gRPC server stream when errors are encountered.
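
The idea behind the fix, as exercised by the tests below, is to classify send errors on a subscriber's sink: a transient error (e.g. `UNAVAILABLE`) keeps the subscriber registered for a retry on the next flush, while a fatal error (e.g. `INTERNAL`) only removes the subscriber from the manager's map; in neither case is the underlying stream explicitly closed. The following is a minimal, self-contained sketch of that decision logic; the types and names here (`SendError`, `Subscribers`) are illustrative stand-ins, not the TiKV API.

```rust
use std::collections::HashMap;

// Stand-in for the two gRPC status classes the tests below exercise.
#[derive(Debug, Clone, Copy)]
enum SendError {
    Unavailable, // transient: the peer may recover
    Internal,    // fatal: give up on this subscriber
}

// Stand-in for the SubscriptionManager's subscriber map (real code keys by Uuid).
struct Subscribers {
    subs: HashMap<u64, String>,
}

impl Subscribers {
    /// Handle a send failure for subscriber `id`.
    /// Returns whether the subscriber is still registered afterwards.
    fn on_send_error(&mut self, id: u64, err: SendError) -> bool {
        match err {
            // Transient: keep the subscriber so the next flush retries the send.
            SendError::Unavailable => true,
            // Fatal: forget the subscriber. Crucially, it is only dropped from
            // the map -- the server stream itself is never closed here.
            SendError::Internal => {
                self.subs.remove(&id);
                false
            }
        }
    }
}

fn main() {
    let mut s = Subscribers {
        subs: HashMap::from([(1, "sink".to_string())]),
    };
    // A transient failure leaves the subscriber in place...
    assert!(s.on_send_error(1, SendError::Unavailable));
    assert_eq!(s.subs.len(), 1);
    // ...while a fatal failure drops it (without closing any stream).
    assert!(!s.on_send_error(1, SendError::Internal));
    assert!(s.subs.is_empty());
    println!("ok");
}
```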

Check List

Tests

  • Manual test (add detailed scripts or steps below)
    A somewhat tricky unit test has been added; the tests pass, but I'm not sure whether they should be added to the code.
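
Part of what makes the test tricky is the `#[cfg(test)]` type substitution visible in the diff: the `Subscription` alias resolves to the real `ServerStreamingSink` in production builds and to a `MockSink` in test builds, so the manager code compiles unchanged against either. A minimal sketch of that pattern, with stand-in types rather than the TiKV ones:

```rust
// Production builds see the real sink type; test builds see the mock.
#[cfg(not(test))]
pub type Subscription = RealSink;
#[cfg(test)]
pub type Subscription = MockSink;

pub struct RealSink; // stands in for grpcio::ServerStreamingSink<_>

// The mock records what was sent so assertions can inspect it later.
#[derive(Default)]
pub struct MockSink {
    pub items: Vec<String>,
}

impl MockSink {
    pub fn send(&mut self, item: &str) {
        self.items.push(item.to_string());
    }
}

fn main() {
    // Code that only names the `Subscription` alias compiles identically in
    // both configurations; tests additionally get to inspect `items`.
    let mut mock = MockSink::default();
    mock.send("flush-event");
    assert_eq!(mock.items.len(), 1);
    println!("mock recorded {} item(s)", mock.items.len());
}
```

The trade-off, which the author hints at, is that the substitution leaks test-only types into the production module's source, which is why the real patch guards every related arm with `#[cfg(test)]`.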
Details
diff --git a/components/backup-stream/src/checkpoint_manager.rs b/components/backup-stream/src/checkpoint_manager.rs
index 1191b6010..9d0ac9ea7 100644
--- a/components/backup-stream/src/checkpoint_manager.rs
+++ b/components/backup-stream/src/checkpoint_manager.rs
@@ -51,10 +51,12 @@ impl std::fmt::Debug for CheckpointManager {
 enum SubscriptionOp {
     Add(Subscription),
     Emit(Box<[FlushEvent]>),
+    #[cfg(test)]
+    Inspect(Box<dyn FnOnce(&SubscriptionManager) + Send>),
 }
 
-struct SubscriptionManager {
-    subscribers: HashMap<Uuid, Subscription>,
+pub(crate) struct SubscriptionManager {
+    pub(crate) subscribers: HashMap<Uuid, Subscription>,
     input: Receiver<SubscriptionOp>,
 }
 
@@ -72,8 +74,13 @@ impl SubscriptionManager {
                 SubscriptionOp::Emit(events) => {
                     self.emit_events(events).await;
                 }
+                #[cfg(test)]
+                SubscriptionOp::Inspect(f) => {
+                    f(&self);
+                }
             }
         }
+        // NOTE: Maybe close all subscription streams here.
     }
 
     async fn emit_events(&mut self, events: Box<[FlushEvent]>) {
@@ -121,8 +128,12 @@ impl SubscriptionManager {
 }
 
 // Note: can we make it more generic...?
+#[cfg(not(test))]
 pub type Subscription = ServerStreamingSink<kvproto::logbackuppb::SubscribeFlushEventResponse>;
 
+#[cfg(test)]
+pub type Subscription = tests::MockSink;
+
 /// The result of getting a checkpoint.
 /// The possibility of failed to getting checkpoint is pretty high:
 /// because there is a gap between region leader change and flushing.
@@ -326,6 +337,29 @@ impl CheckpointManager {
     pub fn get_resolved_ts(&self) -> Option<TimeStamp> {
         self.resolved_ts.values().map(|x| x.checkpoint).min()
     }
+
+    #[cfg(test)]
+    pub(crate) fn sync_with_subs_mgr<T: Send + 'static>(
+        &mut self,
+        f: impl FnOnce(&SubscriptionManager) -> T + Send + 'static,
+    ) -> T {
+        use std::sync::Mutex;
+
+        let (tx, rx) = std::sync::mpsc::sync_channel(1);
+        let t = Arc::new(Mutex::new(None));
+        let tr = Arc::clone(&t);
+        self.manager_handle
+            .as_mut()
+            .unwrap()
+            .try_send(SubscriptionOp::Inspect(Box::new(move |x| {
+                *tr.lock().unwrap() = Some(f(x));
+                tx.send(()).unwrap();
+            })))
+            .unwrap();
+        rx.recv().unwrap();
+        let mut t = t.lock().unwrap();
+        t.take().unwrap()
+    }
 }
 
 fn not_leader(r: u64) -> PbError {
@@ -525,17 +559,21 @@ pub mod tests {
     use std::{
         assert_matches,
         collections::HashMap,
-        sync::{Arc, RwLock},
+        sync::{Arc, Mutex, RwLock},
         time::Duration,
     };
 
-    use futures::future::ok;
-    use kvproto::metapb::*;
+    use futures::{future::ok, Sink};
+    use grpcio::{RpcStatus, RpcStatusCode};
+    use kvproto::{logbackuppb::SubscribeFlushEventResponse, metapb::*};
     use pd_client::{PdClient, PdFuture};
     use txn_types::TimeStamp;
 
     use super::{BasicFlushObserver, FlushObserver, RegionIdWithVersion};
-    use crate::GetCheckpointResult;
+    use crate::{
+        subscription_track::{CheckpointType, ResolveResult},
+        GetCheckpointResult,
+    };
 
     fn region(id: u64, version: u64, conf_version: u64) -> Region {
         let mut r = Region::new();
@@ -547,6 +585,158 @@ pub mod tests {
         r
     }
 
+    #[derive(Clone)]
+    pub struct MockSink(Arc<Mutex<MockSinkInner>>);
+
+    impl MockSink {
+        fn with_fail_once(code: RpcStatusCode) -> Self {
+            let mut failed = false;
+            let inner = MockSinkInner {
+                items: Vec::default(),
+                closed: false,
+                on_error: Box::new(move || {
+                    if failed {
+                        RpcStatusCode::OK
+                    } else {
+                        failed = true;
+                        code
+                    }
+                }),
+            };
+            Self(Arc::new(Mutex::new(inner)))
+        }
+
+        fn trivial() -> Self {
+            let inner = MockSinkInner {
+                items: Vec::default(),
+                closed: false,
+                on_error: Box::new(|| RpcStatusCode::OK),
+            };
+            Self(Arc::new(Mutex::new(inner)))
+        }
+
+        pub async fn fail(&self, status: RpcStatus) -> crate::errors::Result<()> {
+            panic!("failed in a case should never fail: {}", status);
+        }
+    }
+
+    struct MockSinkInner {
+        items: Vec<SubscribeFlushEventResponse>,
+        closed: bool,
+        on_error: Box<dyn FnMut() -> grpcio::RpcStatusCode + Send>,
+    }
+
+    impl Sink<(SubscribeFlushEventResponse, grpcio::WriteFlags)> for MockSink {
+        type Error = grpcio::Error;
+
+        fn poll_ready(
+            self: std::pin::Pin<&mut Self>,
+            _cx: &mut std::task::Context<'_>,
+        ) -> std::task::Poll<Result<(), Self::Error>> {
+            Ok(()).into()
+        }
+
+        fn start_send(
+            self: std::pin::Pin<&mut Self>,
+            item: (SubscribeFlushEventResponse, grpcio::WriteFlags),
+        ) -> Result<(), Self::Error> {
+            let mut guard = self.0.lock().unwrap();
+            let code = (guard.on_error)();
+            if code != RpcStatusCode::OK {
+                return Err(grpcio::Error::RpcFailure(RpcStatus::new(code)));
+            }
+            guard.items.push(item.0);
+            Ok(())
+        }
+
+        fn poll_flush(
+            self: std::pin::Pin<&mut Self>,
+            _cx: &mut std::task::Context<'_>,
+        ) -> std::task::Poll<Result<(), Self::Error>> {
+            Ok(()).into()
+        }
+
+        fn poll_close(
+            self: std::pin::Pin<&mut Self>,
+            _cx: &mut std::task::Context<'_>,
+        ) -> std::task::Poll<Result<(), Self::Error>> {
+            let mut guard = self.0.lock().unwrap();
+            guard.closed = true;
+            Ok(()).into()
+        }
+    }
+
+    fn simple_resolve_result() -> ResolveResult {
+        let mut region = Region::new();
+        region.set_id(42);
+        ResolveResult {
+            region,
+            checkpoint: 42.into(),
+            checkpoint_type: CheckpointType::MinTs,
+        }
+    }
+
+    #[test]
+    fn test_rpc_sub() {
+        let rt = tokio::runtime::Builder::new_multi_thread()
+            .worker_threads(1)
+            .build()
+            .unwrap();
+        let mut mgr = super::CheckpointManager::default();
+        rt.spawn(mgr.spawn_subscription_mgr());
+
+        let trivial_sink = MockSink::trivial();
+        rt.block_on(mgr.add_subscriber(trivial_sink.clone()))
+            .unwrap();
+
+        mgr.resolve_regions(vec![simple_resolve_result()]);
+        mgr.flush();
+        mgr.sync_with_subs_mgr(|_| {});
+        assert_eq!(trivial_sink.0.lock().unwrap().items.len(), 1);
+    }
+
+    #[test]
+    fn test_rpc_retry() {
+        let rt = tokio::runtime::Builder::new_multi_thread()
+            .worker_threads(1)
+            .build()
+            .unwrap();
+        let mut mgr = super::CheckpointManager::default();
+        rt.spawn(mgr.spawn_subscription_mgr());
+
+        let error_sink = MockSink::with_fail_once(RpcStatusCode::UNAVAILABLE);
+        rt.block_on(mgr.add_subscriber(error_sink.clone())).unwrap();
+
+        mgr.resolve_regions(vec![simple_resolve_result()]);
+        mgr.flush();
+        assert_eq!(mgr.sync_with_subs_mgr(|item| { item.subscribers.len() }), 1);
+        mgr.resolve_regions(vec![simple_resolve_result()]);
+        mgr.flush();
+        mgr.sync_with_subs_mgr(|_| {});
+        assert_eq!(error_sink.0.lock().unwrap().items.len(), 1);
+    }
+
+    #[test]
+    fn test_rpc_failure() {
+        let rt = tokio::runtime::Builder::new_multi_thread()
+            .worker_threads(1)
+            .build()
+            .unwrap();
+        let mut mgr = super::CheckpointManager::default();
+        rt.spawn(mgr.spawn_subscription_mgr());
+
+        let error_sink = MockSink::with_fail_once(RpcStatusCode::INTERNAL);
+        rt.block_on(mgr.add_subscriber(error_sink.clone())).unwrap();
+
+        mgr.resolve_regions(vec![simple_resolve_result()]);
+        mgr.flush();
+        assert_eq!(mgr.sync_with_subs_mgr(|item| { item.subscribers.len() }), 0);
+        let sink = error_sink.0.lock().unwrap();
+        assert_eq!(sink.items.len(), 0);
+        // The stream shouldn't be closed when exit by a failure.
+        assert_eq!(sink.closed, false);
+    }
+
     #[test]
     fn test_flush() {
         let mut mgr = super::CheckpointManager::default();
diff --git a/components/backup-stream/src/service.rs b/components/backup-stream/src/service.rs
index 9d312a984..43d4ede2f 100644
--- a/components/backup-stream/src/service.rs
+++ b/components/backup-stream/src/service.rs
@@ -94,8 +94,13 @@ impl LogBackup for Service {
         &mut self,
         _ctx: grpcio::RpcContext<'_>,
         _req: kvproto::logbackuppb::SubscribeFlushEventRequest,
-        sink: grpcio::ServerStreamingSink<kvproto::logbackuppb::SubscribeFlushEventResponse>,
+        #[allow(unused_variables)] sink: grpcio::ServerStreamingSink<
+            kvproto::logbackuppb::SubscribeFlushEventResponse,
+        >,
     ) {
+        #[cfg(test)]
+        panic!("Service should not be used in an unit test");
+        #[cfg(not(test))]
         try_send!(
             self.endpoint,
             Task::RegionCheckpointsOp(RegionCheckpointOperation::Subscribe(sink))

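The `sync_with_subs_mgr` helper in the diff uses a small rendezvous pattern worth calling out: it ships a boxed closure to the manager task, blocks on a `sync_channel` until the closure has run, and reads the result back through an `Arc<Mutex<Option<T>>>` slot. A generic, self-contained sketch of the same pattern using a plain thread in place of the tokio task (names here are illustrative):

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

type Job = Box<dyn FnOnce() + Send>;

fn main() {
    let (job_tx, job_rx) = mpsc::channel::<Job>();

    // Worker loop standing in for the SubscriptionManager task.
    let worker = thread::spawn(move || {
        for job in job_rx {
            job();
        }
    });

    // The rendezvous: a result slot plus a completion signal, mirroring the
    // Arc<Mutex<Option<T>>> + sync_channel(1) pair in the patch.
    let slot = Arc::new(Mutex::new(None::<usize>));
    let (done_tx, done_rx) = mpsc::sync_channel(1);
    let slot2 = Arc::clone(&slot);
    job_tx
        .send(Box::new(move || {
            // Runs on the worker; stash the result and signal completion.
            *slot2.lock().unwrap() = Some(40 + 2);
            done_tx.send(()).unwrap();
        }))
        .unwrap();

    // Block until the worker has executed the closure, then take the result.
    done_rx.recv().unwrap();
    let result = slot.lock().unwrap().take().unwrap();
    assert_eq!(result, 42);
    println!("result = {}", result);

    drop(job_tx); // close the channel so the worker loop exits
    worker.join().unwrap();
}
```

The slot is needed because the closure's return value cannot travel back through the `FnOnce()` signature; the `sync_channel(1)` guarantees the caller does not read the slot before the worker has written it.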
Release note

None

This is a fix for a bug in the master branch.

Signed-off-by: hillium <yujuncen@pingcap.com>
@ti-chi-bot (Member Author)

ti-chi-bot commented Mar 22, 2023

[REVIEW NOTIFICATION]

This pull request has been approved by:

  • YuJuncen
  • tonyxuqqi

To complete the pull request process, please ask the reviewers in the list to review by filling in /cc @reviewer in a comment.
After your PR has acquired the required number of LGTMs, you can assign this pull request to a committer in the list by filling in /assign @committer in a comment to help you merge this pull request.

The full list of commands accepted by this bot can be found here.

Reviewer can indicate their review by submitting an approval review.
Reviewer can cancel approval by submitting a request changes review.

@ti-chi-bot ti-chi-bot added do-not-merge/cherry-pick-not-approved release-note-none Denotes a PR that doesn't merit a release note. size/L Denotes a PR that changes 100-499 lines, ignoring generated files. type/cherry-pick-for-release-7.0 labels Mar 22, 2023
@VelocityLight VelocityLight added the cherry-pick-approved Cherry pick PR approved by release team. label Mar 22, 2023
@YuJuncen (Contributor)

/test

@ti-chi-bot ti-chi-bot added the status/LGT1 Indicates that a PR has LGTM 1. label Mar 22, 2023
@ti-chi-bot ti-chi-bot added status/LGT2 Indicates that a PR has LGTM 2. and removed status/LGT1 Indicates that a PR has LGTM 1. labels Mar 23, 2023
@tonyxuqqi (Contributor)

/merge

@ti-chi-bot (Member Author)

@tonyxuqqi: It seems you want to merge this PR, I will help you trigger all the tests:

/run-all-tests

You only need to trigger /merge once, and if the CI test fails, you just re-trigger the test that failed and the bot will merge the PR for you after the CI passes.

If you have any questions about the PR merge process, please refer to pr process.

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the ti-community-infra/tichi repository.

@ti-chi-bot (Member Author)

This pull request has been accepted and is ready to merge.

Commit hash: fae59f2

@ti-chi-bot ti-chi-bot added the status/can-merge Indicates a PR has been approved by a committer. label Mar 23, 2023
@ti-chi-bot ti-chi-bot merged commit 412614b into tikv:release-7.0 Mar 23, 2023
@VelocityLight VelocityLight added do-not-merge/cherry-pick-not-approved cherry-pick-approved Cherry pick PR approved by release team. and removed cherry-pick-approved Cherry pick PR approved by release team. do-not-merge/cherry-pick-not-approved labels Mar 30, 2023