backup-stream: don't close the server stream when encountered errors (#14432) #14911

Merged

Conversation

ti-chi-bot
Member

This is an automated cherry-pick of #14432

What is changed and how it works?

Issue Number: Close #14426

What's Changed:
This PR keeps the gRPC server stream open when errors are encountered, rather than closing it.
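In other words: a transient send error (such as UNAVAILABLE) keeps the subscriber registered so the next flush can retry, any other error only drops that subscriber from the map, and in neither case is the underlying server stream explicitly closed. A minimal sketch of that control flow, using hypothetical stand-in types (`Mgr`, `Sub`, and `handle_send_error` are illustrative, not code from this PR; the real handling lives in `SubscriptionManager` in the diff below):

```rust
// Sketch only: illustrates the intended error-handling policy.
use std::collections::HashMap;

use grpcio::RpcStatusCode;
use uuid::Uuid;

struct Sub; // stands in for the per-subscriber sink

struct Mgr {
    subscribers: HashMap<Uuid, Sub>,
}

impl Mgr {
    fn handle_send_error(&mut self, id: Uuid, code: RpcStatusCode) {
        if code == RpcStatusCode::UNAVAILABLE {
            // Transient: keep the subscriber so the next flush can retry
            // (exercised below by test_rpc_retry).
        } else {
            // Fatal for this subscriber: forget it, but don't close the
            // server stream (test_rpc_failure asserts `closed == false`).
            self.subscribers.remove(&id);
        }
    }
}
```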

Check List

Tests

  • Manual test (add detailed scripts or steps below)
    A somewhat tricky unit test has been added. The tests pass, but I'm not sure whether they should be kept in the code.
Details
diff --git a/components/backup-stream/src/checkpoint_manager.rs b/components/backup-stream/src/checkpoint_manager.rs
index 1191b6010..9d0ac9ea7 100644
--- a/components/backup-stream/src/checkpoint_manager.rs
+++ b/components/backup-stream/src/checkpoint_manager.rs
@@ -51,10 +51,12 @@ impl std::fmt::Debug for CheckpointManager {
 enum SubscriptionOp {
     Add(Subscription),
     Emit(Box<[FlushEvent]>),
+    #[cfg(test)]
+    Inspect(Box<dyn FnOnce(&SubscriptionManager) + Send>),
 }
 
-struct SubscriptionManager {
-    subscribers: HashMap<Uuid, Subscription>,
+pub(crate) struct SubscriptionManager {
+    pub(crate) subscribers: HashMap<Uuid, Subscription>,
     input: Receiver<SubscriptionOp>,
 }
 
@@ -72,8 +74,13 @@ impl SubscriptionManager {
                 SubscriptionOp::Emit(events) => {
                     self.emit_events(events).await;
                 }
+                #[cfg(test)]
+                SubscriptionOp::Inspect(f) => {
+                    f(&self);
+                }
             }
         }
+        // NOTE: Maybe close all subscription streams here.
     }
 
     async fn emit_events(&mut self, events: Box<[FlushEvent]>) {
@@ -121,8 +128,12 @@ impl SubscriptionManager {
 }
 
 // Note: can we make it more generic...?
+#[cfg(not(test))]
 pub type Subscription = ServerStreamingSink<kvproto::logbackuppb::SubscribeFlushEventResponse>;
 
+#[cfg(test)]
+pub type Subscription = tests::MockSink;
+
 /// The result of getting a checkpoint.
 /// The possibility of failed to getting checkpoint is pretty high:
 /// because there is a gap between region leader change and flushing.
@@ -326,6 +337,29 @@ impl CheckpointManager {
     pub fn get_resolved_ts(&self) -> Option<TimeStamp> {
         self.resolved_ts.values().map(|x| x.checkpoint).min()
     }
+
+    #[cfg(test)]
+    pub(crate) fn sync_with_subs_mgr<T: Send + 'static>(
+        &mut self,
+        f: impl FnOnce(&SubscriptionManager) -> T + Send + 'static,
+    ) -> T {
+        use std::sync::Mutex;
+
+        let (tx, rx) = std::sync::mpsc::sync_channel(1);
+        let t = Arc::new(Mutex::new(None));
+        let tr = Arc::clone(&t);
+        self.manager_handle
+            .as_mut()
+            .unwrap()
+            .try_send(SubscriptionOp::Inspect(Box::new(move |x| {
+                *tr.lock().unwrap() = Some(f(x));
+                tx.send(()).unwrap();
+            })))
+            .unwrap();
+        rx.recv().unwrap();
+        let mut t = t.lock().unwrap();
+        t.take().unwrap()
+    }
 }
 
 fn not_leader(r: u64) -> PbError {
@@ -525,17 +559,21 @@ pub mod tests {
     use std::{
         assert_matches,
         collections::HashMap,
-        sync::{Arc, RwLock},
+        sync::{Arc, Mutex, RwLock},
         time::Duration,
     };
 
-    use futures::future::ok;
-    use kvproto::metapb::*;
+    use futures::{future::ok, Sink};
+    use grpcio::{RpcStatus, RpcStatusCode};
+    use kvproto::{logbackuppb::SubscribeFlushEventResponse, metapb::*};
     use pd_client::{PdClient, PdFuture};
     use txn_types::TimeStamp;
 
     use super::{BasicFlushObserver, FlushObserver, RegionIdWithVersion};
-    use crate::GetCheckpointResult;
+    use crate::{
+        subscription_track::{CheckpointType, ResolveResult},
+        GetCheckpointResult,
+    };
 
     fn region(id: u64, version: u64, conf_version: u64) -> Region {
         let mut r = Region::new();
@@ -547,6 +585,158 @@ pub mod tests {
         r
     }
 
+    #[derive(Clone)]
+    pub struct MockSink(Arc<Mutex<MockSinkInner>>);
+
+    impl MockSink {
+        fn with_fail_once(code: RpcStatusCode) -> Self {
+            let mut failed = false;
+            let inner = MockSinkInner {
+                items: Vec::default(),
+                closed: false,
+                on_error: Box::new(move || {
+                    if failed {
+                        RpcStatusCode::OK
+                    } else {
+                        failed = true;
+                        code
+                    }
+                }),
+            };
+            Self(Arc::new(Mutex::new(inner)))
+        }
+
+        fn trivial() -> Self {
+            let inner = MockSinkInner {
+                items: Vec::default(),
+                closed: false,
+                on_error: Box::new(|| RpcStatusCode::OK),
+            };
+            Self(Arc::new(Mutex::new(inner)))
+        }
+
+        pub async fn fail(&self, status: RpcStatus) -> crate::errors::Result<()> {
+            panic!("failed in a case should never fail: {}", status);
+        }
+    }
+
+    struct MockSinkInner {
+        items: Vec<SubscribeFlushEventResponse>,
+        closed: bool,
+        on_error: Box<dyn FnMut() -> grpcio::RpcStatusCode + Send>,
+    }
+
+    impl Sink<(SubscribeFlushEventResponse, grpcio::WriteFlags)> for MockSink {
+        type Error = grpcio::Error;
+
+        fn poll_ready(
+            self: std::pin::Pin<&mut Self>,
+            _cx: &mut std::task::Context<'_>,
+        ) -> std::task::Poll<Result<(), Self::Error>> {
+            Ok(()).into()
+        }
+
+        fn start_send(
+            self: std::pin::Pin<&mut Self>,
+            item: (SubscribeFlushEventResponse, grpcio::WriteFlags),
+        ) -> Result<(), Self::Error> {
+            let mut guard = self.0.lock().unwrap();
+            let code = (guard.on_error)();
+            if code != RpcStatusCode::OK {
+                return Err(grpcio::Error::RpcFailure(RpcStatus::new(code)));
+            }
+            guard.items.push(item.0);
+            Ok(())
+        }
+
+        fn poll_flush(
+            self: std::pin::Pin<&mut Self>,
+            _cx: &mut std::task::Context<'_>,
+        ) -> std::task::Poll<Result<(), Self::Error>> {
+            Ok(()).into()
+        }
+
+        fn poll_close(
+            self: std::pin::Pin<&mut Self>,
+            _cx: &mut std::task::Context<'_>,
+        ) -> std::task::Poll<Result<(), Self::Error>> {
+            let mut guard = self.0.lock().unwrap();
+            guard.closed = true;
+            Ok(()).into()
+        }
+    }
+
+    fn simple_resolve_result() -> ResolveResult {
+        let mut region = Region::new();
+        region.set_id(42);
+        ResolveResult {
+            region,
+            checkpoint: 42.into(),
+            checkpoint_type: CheckpointType::MinTs,
+        }
+    }
+
+    #[test]
+    fn test_rpc_sub() {
+        let rt = tokio::runtime::Builder::new_multi_thread()
+            .worker_threads(1)
+            .build()
+            .unwrap();
+        let mut mgr = super::CheckpointManager::default();
+        rt.spawn(mgr.spawn_subscription_mgr());
+
+        let trivial_sink = MockSink::trivial();
+        rt.block_on(mgr.add_subscriber(trivial_sink.clone()))
+            .unwrap();
+
+        mgr.resolve_regions(vec![simple_resolve_result()]);
+        mgr.flush();
+        mgr.sync_with_subs_mgr(|_| {});
+        assert_eq!(trivial_sink.0.lock().unwrap().items.len(), 1);
+    }
+
+    #[test]
+    fn test_rpc_retry() {
+        let rt = tokio::runtime::Builder::new_multi_thread()
+            .worker_threads(1)
+            .build()
+            .unwrap();
+        let mut mgr = super::CheckpointManager::default();
+        rt.spawn(mgr.spawn_subscription_mgr());
+
+        let error_sink = MockSink::with_fail_once(RpcStatusCode::UNAVAILABLE);
+        rt.block_on(mgr.add_subscriber(error_sink.clone())).unwrap();
+
+        mgr.resolve_regions(vec![simple_resolve_result()]);
+        mgr.flush();
+        assert_eq!(mgr.sync_with_subs_mgr(|item| { item.subscribers.len() }), 1);
+        mgr.resolve_regions(vec![simple_resolve_result()]);
+        mgr.flush();
+        mgr.sync_with_subs_mgr(|_| {});
+        assert_eq!(error_sink.0.lock().unwrap().items.len(), 1);
+    }
+
+    #[test]
+    fn test_rpc_failure() {
+        let rt = tokio::runtime::Builder::new_multi_thread()
+            .worker_threads(1)
+            .build()
+            .unwrap();
+        let mut mgr = super::CheckpointManager::default();
+        rt.spawn(mgr.spawn_subscription_mgr());
+
+        let error_sink = MockSink::with_fail_once(RpcStatusCode::INTERNAL);
+        rt.block_on(mgr.add_subscriber(error_sink.clone())).unwrap();
+
+        mgr.resolve_regions(vec![simple_resolve_result()]);
+        mgr.flush();
+        assert_eq!(mgr.sync_with_subs_mgr(|item| { item.subscribers.len() }), 0);
+        let sink = error_sink.0.lock().unwrap();
+        assert_eq!(sink.items.len(), 0);
+        // The stream shouldn't be closed when exit by a failure.
+        assert_eq!(sink.closed, false);
+    }
+
     #[test]
     fn test_flush() {
         let mut mgr = super::CheckpointManager::default();
diff --git a/components/backup-stream/src/service.rs b/components/backup-stream/src/service.rs
index 9d312a984..43d4ede2f 100644
--- a/components/backup-stream/src/service.rs
+++ b/components/backup-stream/src/service.rs
@@ -94,8 +94,13 @@ impl LogBackup for Service {
         &mut self,
         _ctx: grpcio::RpcContext<'_>,
         _req: kvproto::logbackuppb::SubscribeFlushEventRequest,
-        sink: grpcio::ServerStreamingSink<kvproto::logbackuppb::SubscribeFlushEventResponse>,
+        #[allow(unused_variables)] sink: grpcio::ServerStreamingSink<
+            kvproto::logbackuppb::SubscribeFlushEventResponse,
+        >,
     ) {
+        #[cfg(test)]
+        panic!("Service should not be used in an unit test");
+        #[cfg(not(test))]
         try_send!(
             self.endpoint,
             Task::RegionCheckpointsOp(RegionCheckpointOperation::Subscribe(sink))

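Two test-support tricks in the diff are worth noting. The `#[cfg(test)]` type alias swaps the real `ServerStreamingSink` for `tests::MockSink` at compile time (which is also why the service handler panics under `cfg(test)`), and `sync_with_subs_mgr` synchronizes with the background subscription manager by shipping a closure through the new `Inspect` variant and blocking until it has run. A self-contained sketch of that second pattern, with illustrative names that are not from the PR:

```rust
// Sketch of the "send a closure to a worker and wait for its result" trick
// behind sync_with_subs_mgr. All names here are illustrative.
use std::sync::{
    mpsc::{sync_channel, SyncSender},
    Arc, Mutex,
};
use std::thread;

enum Op {
    // Mirrors SubscriptionOp::Inspect: run a closure against worker state.
    Inspect(Box<dyn FnOnce(&Vec<u32>) + Send>),
}

fn sync_with_worker<T: Send + 'static>(
    handle: &SyncSender<Op>,
    f: impl FnOnce(&Vec<u32>) -> T + Send + 'static,
) -> T {
    let (tx, rx) = sync_channel(1);
    let slot = Arc::new(Mutex::new(None));
    let slot2 = Arc::clone(&slot);
    handle
        .send(Op::Inspect(Box::new(move |state| {
            *slot2.lock().unwrap() = Some(f(state));
            tx.send(()).unwrap();
        })))
        .unwrap();
    rx.recv().unwrap(); // block until the worker has run the closure
    let result = slot.lock().unwrap().take().unwrap();
    result
}

fn main() {
    let (op_tx, op_rx) = sync_channel::<Op>(8);
    let worker = thread::spawn(move || {
        let state = vec![1, 2, 3];
        while let Ok(Op::Inspect(f)) = op_rx.recv() {
            f(&state);
        }
    });
    assert_eq!(sync_with_worker(&op_tx, |s| s.len()), 3);
    drop(op_tx); // close the channel so the worker loop exits
    worker.join().unwrap();
}
```

Because the channel round trip completes strictly after any previously queued events, a test can call this to quiesce the manager before asserting on shared state; this is how the new tests observe `subscribers.len()` and the mock sink's items.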
Release note

None

This is a fix for a bug in the master branch.

@ti-chi-bot
Contributor

ti-chi-bot bot commented Jun 9, 2023

[REVIEW NOTIFICATION]

This pull request has been approved by:

  • 3pointer
  • overvenus

To complete the pull request process, please ask the reviewers in the list to review by filling /cc @reviewer in the comment.
After your PR has acquired the required number of LGTMs, you can assign this pull request to the committer in the list by filling /assign @committer in the comment to help you merge this pull request.

The full list of commands accepted by this bot can be found here.

Reviewer can indicate their review by submitting an approval review.
Reviewer can cancel approval by submitting a request changes review.

…ikv#14432)

close tikv#14426

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
Signed-off-by: hillium <yujuncen@pingcap.com>
@ti-chi-bot ti-chi-bot bot added the status/LGT1 Status: PR - There is already 1 approval label Jun 9, 2023
@ti-chi-bot ti-chi-bot added the cherry-pick-approved Cherry pick PR approved by release team. label Jun 9, 2023
@ti-chi-bot ti-chi-bot bot added status/LGT2 Status: PR - There are already 2 approvals and removed status/LGT1 Status: PR - There is already 1 approval labels Jun 9, 2023
@overvenus
Member

/merge

@ti-chi-bot
Contributor

ti-chi-bot bot commented Jun 9, 2023

@overvenus: It seems you want to merge this PR, I will help you trigger all the tests:

/run-all-tests

You only need to trigger /merge once, and if the CI test fails, you just re-trigger the test that failed and the bot will merge the PR for you after the CI passes.

If you have any questions about the PR merge process, please refer to pr process.

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the ti-community-infra/tichi repository.

@ti-chi-bot
Contributor

ti-chi-bot bot commented Jun 9, 2023

This pull request has been accepted and is ready to merge.

Commit hash: 22d714a

@ti-chi-bot ti-chi-bot bot added the status/can-merge Status: Can merge to base branch label Jun 9, 2023
@ti-chi-bot ti-chi-bot bot merged commit fd5f88a into tikv:release-6.5 Jun 9, 2023
1 check passed
@ti-chi-bot ti-chi-bot removed the cherry-pick-approved Cherry pick PR approved by release team. label Jun 14, 2023
@ti-chi-bot ti-chi-bot added the cherry-pick-approved Cherry pick PR approved by release team. label Jun 14, 2023