
backup-stream: don't close the server stream when encountered errors #14432

Merged (7 commits) on Mar 22, 2023

Conversation

YuJuncen (Contributor) commented Mar 20, 2023

What is changed and how it works?

Issue Number: Close #14426, Close #14910
What's Changed:
This PR ensures we don't close the gRPC server stream when errors are encountered.
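
For readers skimming the diff below, here is a minimal std-only sketch of the idea (not the PR's actual code; FakeSink is a hypothetical stand-in for grpcio::ServerStreamingSink): on a send error the subscriber is removed from the map and its sink is simply dropped, never actively closed, so the server does not tear down the (endless) stream itself.

use std::collections::HashMap;

struct FakeSink {
    healthy: bool,
}

impl FakeSink {
    fn send(&mut self, _ev: &str) -> Result<(), &'static str> {
        if self.healthy { Ok(()) } else { Err("rpc failure") }
    }
}

struct SubscriptionManager {
    subscribers: HashMap<u64, FakeSink>,
}

impl SubscriptionManager {
    fn emit(&mut self, ev: &str) {
        let mut failed = Vec::new();
        for (id, sink) in self.subscribers.iter_mut() {
            if let Err(e) = sink.send(ev) {
                eprintln!("subscriber {} failed: {}; removing it", id, e);
                failed.push(*id);
            }
        }
        for id in failed {
            // Drop the sink without calling close()/fail() on it.
            drop(self.subscribers.remove(&id));
        }
    }
}

fn main() {
    let mut mgr = SubscriptionManager { subscribers: HashMap::new() };
    mgr.subscribers.insert(1, FakeSink { healthy: true });
    mgr.subscribers.insert(2, FakeSink { healthy: false });
    mgr.emit("flush");
    // The failing subscriber was dropped, not closed; the healthy one stays.
    assert_eq!(mgr.subscribers.len(), 1);
}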

Check List

Tests

  • Manual test (add detailed scripts or steps below)
There is a tricky unit test added; these tests pass, but I'm not sure whether they should be added to the codebase.
Details
diff --git a/components/backup-stream/src/checkpoint_manager.rs b/components/backup-stream/src/checkpoint_manager.rs
index 1191b6010..9d0ac9ea7 100644
--- a/components/backup-stream/src/checkpoint_manager.rs
+++ b/components/backup-stream/src/checkpoint_manager.rs
@@ -51,10 +51,12 @@ impl std::fmt::Debug for CheckpointManager {
 enum SubscriptionOp {
     Add(Subscription),
     Emit(Box<[FlushEvent]>),
+    #[cfg(test)]
+    Inspect(Box<dyn FnOnce(&SubscriptionManager) + Send>),
 }
 
-struct SubscriptionManager {
-    subscribers: HashMap<Uuid, Subscription>,
+pub(crate) struct SubscriptionManager {
+    pub(crate) subscribers: HashMap<Uuid, Subscription>,
     input: Receiver<SubscriptionOp>,
 }
 
@@ -72,8 +74,13 @@ impl SubscriptionManager {
                 SubscriptionOp::Emit(events) => {
                     self.emit_events(events).await;
                 }
+                #[cfg(test)]
+                SubscriptionOp::Inspect(f) => {
+                    f(&self);
+                }
             }
         }
+        // NOTE: Maybe close all subscription streams here.
     }
 
     async fn emit_events(&mut self, events: Box<[FlushEvent]>) {
@@ -121,8 +128,12 @@ impl SubscriptionManager {
 }
 
 // Note: can we make it more generic...?
+#[cfg(not(test))]
 pub type Subscription = ServerStreamingSink<kvproto::logbackuppb::SubscribeFlushEventResponse>;
 
+#[cfg(test)]
+pub type Subscription = tests::MockSink;
+
 /// The result of getting a checkpoint.
 /// The possibility of failed to getting checkpoint is pretty high:
 /// because there is a gap between region leader change and flushing.
@@ -326,6 +337,29 @@ impl CheckpointManager {
     pub fn get_resolved_ts(&self) -> Option<TimeStamp> {
         self.resolved_ts.values().map(|x| x.checkpoint).min()
     }
+
+    #[cfg(test)]
+    pub(crate) fn sync_with_subs_mgr<T: Send + 'static>(
+        &mut self,
+        f: impl FnOnce(&SubscriptionManager) -> T + Send + 'static,
+    ) -> T {
+        use std::sync::Mutex;
+
+        let (tx, rx) = std::sync::mpsc::sync_channel(1);
+        let t = Arc::new(Mutex::new(None));
+        let tr = Arc::clone(&t);
+        self.manager_handle
+            .as_mut()
+            .unwrap()
+            .try_send(SubscriptionOp::Inspect(Box::new(move |x| {
+                *tr.lock().unwrap() = Some(f(x));
+                tx.send(()).unwrap();
+            })))
+            .unwrap();
+        rx.recv().unwrap();
+        let mut t = t.lock().unwrap();
+        t.take().unwrap()
+    }
 }
 
 fn not_leader(r: u64) -> PbError {
@@ -525,17 +559,21 @@ pub mod tests {
     use std::{
         assert_matches,
         collections::HashMap,
-        sync::{Arc, RwLock},
+        sync::{Arc, Mutex, RwLock},
         time::Duration,
     };
 
-    use futures::future::ok;
-    use kvproto::metapb::*;
+    use futures::{future::ok, Sink};
+    use grpcio::{RpcStatus, RpcStatusCode};
+    use kvproto::{logbackuppb::SubscribeFlushEventResponse, metapb::*};
     use pd_client::{PdClient, PdFuture};
     use txn_types::TimeStamp;
 
     use super::{BasicFlushObserver, FlushObserver, RegionIdWithVersion};
-    use crate::GetCheckpointResult;
+    use crate::{
+        subscription_track::{CheckpointType, ResolveResult},
+        GetCheckpointResult,
+    };
 
     fn region(id: u64, version: u64, conf_version: u64) -> Region {
         let mut r = Region::new();
@@ -547,6 +585,158 @@ pub mod tests {
         r
     }
 
+    #[derive(Clone)]
+    pub struct MockSink(Arc<Mutex<MockSinkInner>>);
+
+    impl MockSink {
+        fn with_fail_once(code: RpcStatusCode) -> Self {
+            let mut failed = false;
+            let inner = MockSinkInner {
+                items: Vec::default(),
+                closed: false,
+                on_error: Box::new(move || {
+                    if failed {
+                        RpcStatusCode::OK
+                    } else {
+                        failed = true;
+                        code
+                    }
+                }),
+            };
+            Self(Arc::new(Mutex::new(inner)))
+        }
+
+        fn trivial() -> Self {
+            let inner = MockSinkInner {
+                items: Vec::default(),
+                closed: false,
+                on_error: Box::new(|| RpcStatusCode::OK),
+            };
+            Self(Arc::new(Mutex::new(inner)))
+        }
+
+        pub async fn fail(&self, status: RpcStatus) -> crate::errors::Result<()> {
+            panic!("failed in a case should never fail: {}", status);
+        }
+    }
+
+    struct MockSinkInner {
+        items: Vec<SubscribeFlushEventResponse>,
+        closed: bool,
+        on_error: Box<dyn FnMut() -> grpcio::RpcStatusCode + Send>,
+    }
+
+    impl Sink<(SubscribeFlushEventResponse, grpcio::WriteFlags)> for MockSink {
+        type Error = grpcio::Error;
+
+        fn poll_ready(
+            self: std::pin::Pin<&mut Self>,
+            _cx: &mut std::task::Context<'_>,
+        ) -> std::task::Poll<Result<(), Self::Error>> {
+            Ok(()).into()
+        }
+
+        fn start_send(
+            self: std::pin::Pin<&mut Self>,
+            item: (SubscribeFlushEventResponse, grpcio::WriteFlags),
+        ) -> Result<(), Self::Error> {
+            let mut guard = self.0.lock().unwrap();
+            let code = (guard.on_error)();
+            if code != RpcStatusCode::OK {
+                return Err(grpcio::Error::RpcFailure(RpcStatus::new(code)));
+            }
+            guard.items.push(item.0);
+            Ok(())
+        }
+
+        fn poll_flush(
+            self: std::pin::Pin<&mut Self>,
+            _cx: &mut std::task::Context<'_>,
+        ) -> std::task::Poll<Result<(), Self::Error>> {
+            Ok(()).into()
+        }
+
+        fn poll_close(
+            self: std::pin::Pin<&mut Self>,
+            _cx: &mut std::task::Context<'_>,
+        ) -> std::task::Poll<Result<(), Self::Error>> {
+            let mut guard = self.0.lock().unwrap();
+            guard.closed = true;
+            Ok(()).into()
+        }
+    }
+
+    fn simple_resolve_result() -> ResolveResult {
+        let mut region = Region::new();
+        region.set_id(42);
+        ResolveResult {
+            region,
+            checkpoint: 42.into(),
+            checkpoint_type: CheckpointType::MinTs,
+        }
+    }
+
+    #[test]
+    fn test_rpc_sub() {
+        let rt = tokio::runtime::Builder::new_multi_thread()
+            .worker_threads(1)
+            .build()
+            .unwrap();
+        let mut mgr = super::CheckpointManager::default();
+        rt.spawn(mgr.spawn_subscription_mgr());
+
+        let trivial_sink = MockSink::trivial();
+        rt.block_on(mgr.add_subscriber(trivial_sink.clone()))
+            .unwrap();
+
+        mgr.resolve_regions(vec![simple_resolve_result()]);
+        mgr.flush();
+        mgr.sync_with_subs_mgr(|_| {});
+        assert_eq!(trivial_sink.0.lock().unwrap().items.len(), 1);
+    }
+
+    #[test]
+    fn test_rpc_retry() {
+        let rt = tokio::runtime::Builder::new_multi_thread()
+            .worker_threads(1)
+            .build()
+            .unwrap();
+        let mut mgr = super::CheckpointManager::default();
+        rt.spawn(mgr.spawn_subscription_mgr());
+
+        let error_sink = MockSink::with_fail_once(RpcStatusCode::UNAVAILABLE);
+        rt.block_on(mgr.add_subscriber(error_sink.clone())).unwrap();
+
+        mgr.resolve_regions(vec![simple_resolve_result()]);
+        mgr.flush();
+        assert_eq!(mgr.sync_with_subs_mgr(|item| { item.subscribers.len() }), 1);
+        mgr.resolve_regions(vec![simple_resolve_result()]);
+        mgr.flush();
+        mgr.sync_with_subs_mgr(|_| {});
+        assert_eq!(error_sink.0.lock().unwrap().items.len(), 1);
+    }
+
+    #[test]
+    fn test_rpc_failure() {
+        let rt = tokio::runtime::Builder::new_multi_thread()
+            .worker_threads(1)
+            .build()
+            .unwrap();
+        let mut mgr = super::CheckpointManager::default();
+        rt.spawn(mgr.spawn_subscription_mgr());
+
+        let error_sink = MockSink::with_fail_once(RpcStatusCode::INTERNAL);
+        rt.block_on(mgr.add_subscriber(error_sink.clone())).unwrap();
+
+        mgr.resolve_regions(vec![simple_resolve_result()]);
+        mgr.flush();
+        assert_eq!(mgr.sync_with_subs_mgr(|item| { item.subscribers.len() }), 0);
+        let sink = error_sink.0.lock().unwrap();
+        assert_eq!(sink.items.len(), 0);
+        // The stream shouldn't be closed when exit by a failure.
+        assert_eq!(sink.closed, false);
+    }
+
     #[test]
     fn test_flush() {
         let mut mgr = super::CheckpointManager::default();
diff --git a/components/backup-stream/src/service.rs b/components/backup-stream/src/service.rs
index 9d312a984..43d4ede2f 100644
--- a/components/backup-stream/src/service.rs
+++ b/components/backup-stream/src/service.rs
@@ -94,8 +94,13 @@ impl LogBackup for Service {
         &mut self,
         _ctx: grpcio::RpcContext<'_>,
         _req: kvproto::logbackuppb::SubscribeFlushEventRequest,
-        sink: grpcio::ServerStreamingSink<kvproto::logbackuppb::SubscribeFlushEventResponse>,
+        #[allow(unused_variables)] sink: grpcio::ServerStreamingSink<
+            kvproto::logbackuppb::SubscribeFlushEventResponse,
+        >,
     ) {
+        #[cfg(test)]
+        panic!("Service should not be used in an unit test");
+        #[cfg(not(test))]
         try_send!(
             self.endpoint,
             Task::RegionCheckpointsOp(RegionCheckpointOperation::Subscribe(sink))

Release note

None

This is a fix for a bug in the master branch.

Signed-off-by: hillium <yujuncen@pingcap.com>
ti-chi-bot (Member) commented Mar 20, 2023

[REVIEW NOTIFICATION]

This pull request has been approved by:

  • 3pointer
  • BusyJay

To complete the pull request process, please ask the reviewers in the list to review by filling /cc @reviewer in the comment.
After your PR has acquired the required number of LGTMs, you can assign this pull request to the committer in the list by filling /assign @committer in the comment to help you merge this pull request.

The full list of commands accepted by this bot can be found here.

Reviewer can indicate their review by submitting an approval review.
Reviewer can cancel approval by submitting a request changes review.

@YuJuncen YuJuncen changed the title don't close the server stream when encountered errors backup-stream: don't close the server stream when encountered errors Mar 20, 2023
Signed-off-by: hillium <yujuncen@pingcap.com>
Signed-off-by: hillium <yujuncen@pingcap.com>
YuJuncen (Contributor, Author) commented:

The changes that would affect non-test code are only in commit 18867c4.

-        match send_all.await {
-            Err(grpcio::Error::RemoteStopped) => {
+        if let Err(err) = send_all.await {
+            let can_retry = matches!(&err, grpcio::Error::RpcFailure(rpc_err) if rpc_err.code() == RpcStatusCode::UNAVAILABLE);
Member commented:

Why can this error be retried?

YuJuncen (Contributor, Author) commented Mar 21, 2023:

By default grpc-go would retry it. (Should we also retry RESOURCE_EXHAUSTED?)

https://pkg.go.dev/github.com/grpc-ecosystem/go-grpc-middleware/retry#pkg-variables
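
As a hedged illustration of the can_retry check in the hunk above, here is a plain-enum sketch (it compiles without the grpcio crate; the real code matches on grpcio::RpcStatusCode):

// Hypothetical helper mirroring the `can_retry` check in the reviewed hunk.
#[derive(Clone, Copy, Debug, PartialEq)]
enum RpcStatusCode {
    Unavailable,       // transient transport problem: retried by grpc-go by default
    ResourceExhausted, // the open question in this thread
    Internal,
}

fn can_retry(code: RpcStatusCode) -> bool {
    matches!(code, RpcStatusCode::Unavailable)
}

fn main() {
    assert!(can_retry(RpcStatusCode::Unavailable));
    assert!(!can_retry(RpcStatusCode::ResourceExhausted));
    assert!(!can_retry(RpcStatusCode::Internal));
}

grpc-go's retry middleware treats UNAVAILABLE as retryable by default, which is the rationale cited above; whether RESOURCE_EXHAUSTED should join it remains open in this thread.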

YuJuncen (Contributor, Author) commented Mar 21, 2023:

And it seems there is a transparent retry inside the gRPC implementation; perhaps we can remove this? (That is for clients.) What do you think? @BusyJay

Member commented:

I have no idea. I'm just curious why only this error can be retried.

YuJuncen (Contributor, Author) commented:

Well, I have removed the retry, because the client would retry sooner when the connection is closed.

@YuJuncen YuJuncen requested a review from BusyJay March 21, 2023 03:30
Signed-off-by: hillium <yujuncen@pingcap.com>
.await
.report_if_err(format_args!("during removing subscription {}", id))
// The stream is an endless stream -- we don't need to close it.
drop(sub);
Contributor commented:

If we meet a non-gRPC error here, shall we close it manually?

YuJuncen (Contributor, Author) commented:

According to @BusyJay, we should call close iff we have finished sending all items. In fact this stream is an infinite stream, so I think we don't need to close it under any condition (except perhaps when the server is shutting down?).
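
A small sketch of that rule, using a futures mpsc channel as an assumed stand-in for the gRPC sink (not the PR's code): close() only after a finite stream has sent everything; on error, or for an endless stream, just drop the sink, mirroring drop(sub) above.

use futures::{channel::mpsc, SinkExt, StreamExt};

// Finite stream: close() is called only once every item has been sent.
async fn finite_stream(mut sink: mpsc::Sender<u32>, items: Vec<u32>) {
    for it in items {
        if sink.send(it).await.is_err() {
            // Receiver is gone: fall out and drop the sink; no close().
            return;
        }
    }
    // All items sent -- only now does closing make sense.
    let _ = sink.close().await;
}

// Endless stream: there is no "all items sent" point, so the sink is
// never closed explicitly; an error just drops it (cf. drop(sub)).
#[allow(dead_code)]
async fn endless_stream(mut sink: mpsc::Sender<u32>) {
    let mut n = 0;
    loop {
        if sink.send(n).await.is_err() {
            return;
        }
        n += 1;
    }
}

fn main() {
    let (tx, mut rx) = mpsc::channel(4);
    futures::executor::block_on(async {
        futures::join!(finite_stream(tx, vec![1, 2, 3]), async {
            while let Some(x) = rx.next().await {
                println!("got {}", x);
            }
        });
    });
}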

@YuJuncen YuJuncen requested a review from 3pointer March 21, 2023 09:31
BusyJay (Member) left a comment:

The change looks OK to me.

components/backup-stream/src/checkpoint_manager.rs (outdated review thread, resolved)
@ti-chi-bot ti-chi-bot added the status/LGT1 Status: PR - There is already 1 approval label Mar 21, 2023
Signed-off-by: hillium <yujuncen@pingcap.com>
Signed-off-by: hillium <yujuncen@pingcap.com>
@ti-chi-bot ti-chi-bot added status/LGT2 Status: PR - There are already 2 approvals and removed status/LGT1 Status: PR - There is already 1 approval labels Mar 21, 2023
YuJuncen (Contributor, Author) commented:

@BusyJay Hi, could you help to /merge this?

BusyJay (Member) commented Mar 22, 2023:

/merge

ti-chi-bot (Member) commented:

@BusyJay: It seems you want to merge this PR; I will help you trigger all the tests:

/run-all-tests

You only need to trigger /merge once, and if the CI test fails, you just re-trigger the test that failed and the bot will merge the PR for you after the CI passes.

If you have any questions about the PR merge process, please refer to pr process.

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the ti-community-infra/tichi repository.

ti-chi-bot (Member) commented:

This pull request has been accepted and is ready to merge.

Commit hash: 241641f

@ti-chi-bot ti-chi-bot added the status/can-merge Status: Can merge to base branch label Mar 22, 2023
YuJuncen (Contributor, Author) commented:

/test

YuJuncen (Contributor, Author) commented:

thread 'test::resolved_follower' panicked at 'not all keys are recorded: it remains ["7480000000000000ff015f728000000000ff0000ec0000000000faffffffffffffff0b", "7480000000000000ff015f728000000000ff0000020000000000fafffffffffffffff5", "7480000000000000ff015f728000000000ff00005a0000000000faffffffffffffff9d"] (total = 128)', components/backup-stream/tests/mod.rs:583:13

We need to improve the stability of integration tests...

YuJuncen (Contributor, Author) commented Mar 22, 2023:

/test

test::resolved_follower failed again, but I can swear this PR won't affect this case :(

ti-chi-bot (Member) commented:

In response to a cherrypick label: new pull request created to branch release-7.0: #14441.

ti-chi-bot added a commit that referenced this pull request Mar 23, 2023
…14432) (#14441)

close #14426, ref #14432

Signed-off-by: hillium <yujuncen@pingcap.com>

Co-authored-by: hillium <yujuncen@pingcap.com>
YuJuncen (Contributor, Author) commented Jun 9, 2023:

/run-cherry-picker

ti-chi-bot (Member) commented:

In response to a cherrypick label: new pull request created to branch release-6.5: #14911.

ti-chi-bot pushed a commit to ti-chi-bot/tikv that referenced this pull request Jun 9, 2023
close tikv#14426

Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
YuJuncen added a commit to ti-chi-bot/tikv that referenced this pull request Jun 9, 2023
…ikv#14432)

close tikv#14426

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
Signed-off-by: hillium <yujuncen@pingcap.com>
ti-chi-bot bot pushed a commit that referenced this pull request Jun 9, 2023
…14432) (#14911)

close #14426, ref #14432

Signed-off-by: hillium <yujuncen@pingcap.com>

Co-authored-by: 山岚 <36239017+YuJuncen@users.noreply.github.com>
Co-authored-by: hillium <yujuncen@pingcap.com>
Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>