
[Feature]: Client should retry immediately after receiving certain errors #672

Open
bsbds opened this issue Mar 5, 2024 · 5 comments
Labels
enhancement New feature or request

@bsbds
Collaborator
bsbds commented Mar 5, 2024

Description about the feature

The current retry logic in the curp client always waits for a period of time after a propose returns an error. However, this wait is unnecessary for certain errors such as CurpError::WrongClusterVersion or CurpError::Redirect. These errors indicate that the client's local information is outdated; after fetching the latest information from the cluster, the retry can proceed immediately.

async fn retry<'a, R, F>(&'a self, f: impl Fn(&'a Api) -> F) -> Result<R, tonic::Status>
where
    F: Future<Output = Result<R, CurpError>>,
{
    let mut backoff = self.config.init_backoff();
    let mut last_err = None;
    while let Some(delay) = backoff.next_delay() {
        let err = match f(&self.inner).await {
            Ok(res) => return Ok(res),
            Err(err) => err,
        };
        match err {
            // some errors that should not retry
            CurpError::Duplicated(_)
            | CurpError::ShuttingDown(_)
            | CurpError::InvalidConfig(_)
            | CurpError::NodeNotExists(_)
            | CurpError::NodeAlreadyExists(_)
            | CurpError::LearnerNotCatchUp(_) => {
                return Err(tonic::Status::from(err));
            }
            // some errors that could have a retry
            CurpError::ExpiredClientId(_)
            | CurpError::KeyConflict(_)
            | CurpError::Internal(_)
            | CurpError::LeaderTransfer(_) => {}
            // update leader state if we got a rpc transport error
            CurpError::RpcTransport(_) => {
                if let Err(e) = self.inner.fetch_leader_id(true).await {
                    warn!("fetch leader failed, error {e:?}");
                }
            }
            // update the cluster state if got WrongClusterVersion
            CurpError::WrongClusterVersion(_) => {
                // the inner client should automatically update cluster state when fetch_cluster
                if let Err(e) = self.inner.fetch_cluster(true).await {
                    warn!("fetch cluster failed, error {e:?}");
                }
            }
            // update the leader state if got Redirect
            CurpError::Redirect(Redirect { leader_id, term }) => {
                let _ig = self.inner.update_leader(leader_id, term).await;
            }
        }
        #[cfg(feature = "client-metrics")]
        super::metrics::get().client_retry_count.add(1, &[]);
        warn!(
            "got error: {err:?}, retry on {} seconds later",
            delay.as_secs_f32()
        );
        last_err = Some(err);
        tokio::time::sleep(delay).await;
    }
    Err(tonic::Status::deadline_exceeded(format!(
        "request timeout, last error: {:?}",
        last_err.unwrap_or_else(|| unreachable!("last error must be set"))
    )))
}
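For illustration only, here is a rough, self-contained sketch of the requested behaviour. The error enum, the delay list, and the fake propose below are simplified stand-ins, not the real CurpError, backoff config, or client API: errors that mean the client's local view is stale are retried without sleeping, while other retriable errors keep the usual backoff.

use std::time::Duration;

// Simplified stand-in for the retriable errors; only the distinction matters here.
#[derive(Debug)]
enum Error {
    // The client's local view is stale: refresh it and retry immediately.
    WrongClusterVersion,
    Redirect,
    // The server cannot serve the request right now: retry after a delay.
    KeyConflict,
}

#[tokio::main]
async fn main() {
    // Stand-in for the client's backoff sequence.
    let mut delays = [100u64, 200, 400].into_iter().map(Duration::from_millis);
    let mut attempt = 0;

    loop {
        attempt += 1;
        // Fake propose: fail twice with "stale client" errors, then succeed.
        let result: Result<(), Error> = match attempt {
            1 => Err(Error::WrongClusterVersion),
            2 => Err(Error::Redirect),
            _ => Ok(()),
        };

        match result {
            Ok(()) => {
                println!("proposal succeeded on attempt {attempt}");
                return;
            }
            // Client-state errors: refresh local info, then retry with no sleep.
            // (A real implementation would also cap consecutive immediate retries.)
            Err(err @ (Error::WrongClusterVersion | Error::Redirect)) => {
                println!("got {err:?}, refreshing local state and retrying immediately");
            }
            // Everything else keeps the normal backoff.
            Err(err) => {
                let Some(delay) = delays.next() else {
                    println!("retries exhausted, last error: {err:?}");
                    return;
                };
                println!("got {err:?}, retrying in {delay:?}");
                tokio::time::sleep(delay).await;
            }
        }
    }
}

The idea mirrors the loop above: only the delayed branch consumes the backoff iterator, so stale-state errors never burn a backoff slot.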

@bsbds bsbds added the enhancement New feature or request label Mar 5, 2024
@Harsh1s

Harsh1s commented Mar 5, 2024

Hello there @bsbds ! I'd like to work on this issue if that's fine by you. Thank you!

@bsbds
Collaborator Author

bsbds commented Mar 6, 2024

> Hello there @bsbds ! I'd like to work on this issue if that's fine by you. Thank you!

Sure, assigned.

@Harsh1s

Harsh1s commented Mar 8, 2024

Hey @bsbds! I have made the necessary changes to the code. I wanted to ask if there are any more error codes other than CurpError::WrongClusterVersion and CurpError::Redirect that need immediate retrying? Thanks for the help!

@bsbds
Collaborator Author

bsbds commented Mar 8, 2024

> Hey @bsbds! I have made the necessary changes to the code. I wanted to ask if there are any more error codes other than CurpError::WrongClusterVersion and CurpError::Redirect that need immediate retrying? Thanks for the help!

From what I can see, these are the only two kinds of errors that don't need to wait before retrying.

I think the retriable errors can be divided into two types: one indicates that the server is currently unable to process the request, while the other indicates that the server's state is fine and it is the client's own state that is outdated. What do you think?
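For illustration, that split could look roughly like the stand-in below. The enum, its variants, and the retry_kind helper are hypothetical (they only mirror the retriable CurpError variant names) and are not the actual client code:

/// How the retry loop should react to a retriable error.
#[derive(Debug, PartialEq)]
enum RetryKind {
    /// The server cannot serve the request right now: wait for the backoff delay.
    AfterBackoff,
    /// The client's own view (leader / cluster version) is stale: refresh and retry at once.
    Immediate,
}

/// Simplified stand-in mirroring the retriable CurpError variants.
#[allow(dead_code)]
#[derive(Debug)]
enum RetriableError {
    ExpiredClientId,
    KeyConflict,
    Internal,
    LeaderTransfer,
    RpcTransport,
    WrongClusterVersion,
    Redirect,
}

fn retry_kind(err: &RetriableError) -> RetryKind {
    match err {
        RetriableError::WrongClusterVersion | RetriableError::Redirect => RetryKind::Immediate,
        _ => RetryKind::AfterBackoff,
    }
}

fn main() {
    assert_eq!(retry_kind(&RetriableError::Redirect), RetryKind::Immediate);
    assert_eq!(retry_kind(&RetriableError::KeyConflict), RetryKind::AfterBackoff);
}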

@Harsh1s

Harsh1s commented Mar 11, 2024

Hey @bsbds! I'm sorry for the delay on this PR, I had my university exams. I have made some changes to the required function, but I'm getting a "process didn't exit successfully: (signal: 6, SIGABRT: process abort signal)" error when I run cargo test. Here are the changes I made:

/// Takes a function f and run retry.
    async fn retry<'a, R, F>(&'a self, f: impl Fn(&'a Api) -> F) -> Result<R, tonic::Status>
    where
        F: Future<Output = Result<R, CurpError>>,
    {
        let mut backoff = self.config.init_backoff();
        let mut last_err = None;
        let mut consecutive_client_error_count = 0;
        const MAX_CONSECUTIVE_CLIENT_ERROR_COUNT: usize = 5;

        loop {
            let err = match f(&self.inner).await {
                Ok(res) => return Ok(res),
                Err(err) => err,
            };

            match err {
                // Errors that should not retry
                CurpError::Duplicated(_)
                | CurpError::ShuttingDown(_)
                | CurpError::InvalidConfig(_)
                | CurpError::NodeNotExists(_)
                | CurpError::NodeAlreadyExists(_)
                | CurpError::LearnerNotCatchUp(_) => {
                    return Err(tonic::Status::from(err));
                }

                // Server-side errors that should retry after a delay
                CurpError::ExpiredClientId(_)
                | CurpError::KeyConflict(_)
                | CurpError::Internal(_)
                | CurpError::LeaderTransfer(_)
                | CurpError::RpcTransport(_) => {
                    consecutive_client_error_count = 0;
                    let delay = match backoff.next_delay() {
                        Some(delay) => delay,
                        None => break,
                    };
                    if let CurpError::RpcTransport(_) = &err {
                        // update leader state if we got an RPC transport error
                        if let Err(e) = self.inner.fetch_leader_id(true).await {
                            warn!("fetch leader failed, error {e:?}");
                        }
                    }

                    #[cfg(feature = "client-metrics")]
                    super::metrics::get().client_retry_count.add(1, &[]);

                    warn!(
                        "got error: {err:?}, retry on {} seconds later",
                        delay.as_secs_f32()
                    );
                    last_err = Some(err);
                    tokio::time::sleep(delay).await;
                }

                // Client-side errors that should retry immediately
                CurpError::WrongClusterVersion(_) | CurpError::Redirect(_) => {
                    if let CurpError::WrongClusterVersion(_) = &err {
                        // update the cluster state if got WrongClusterVersion
                        if let Err(e) = self.inner.fetch_cluster(true).await {
                            warn!("fetch cluster failed, error {e:?}");
                        }
                    } else if let CurpError::Redirect(Redirect { leader_id, term }) = &err {
                        // update the leader state if got Redirect
                        let _ig = self.inner.update_leader(*leader_id, *term).await;
                    }

                    warn!("got error: {err:?}, retrying immediately", err = err);

                    consecutive_client_error_count += 1;
                    if consecutive_client_error_count >= MAX_CONSECUTIVE_CLIENT_ERROR_COUNT {
                        warn!(
                            "Maximum consecutive client error count reached, not retrying anymore"
                        );
                        last_err = Some(err);
                        break;
                    }
                }
            }
        }

        Err(tonic::Status::deadline_exceeded(format!(
            "request timeout, last error: {:?}",
            last_err.unwrap_or_else(|| unreachable!("last error must be set"))
        )))
    }

I have made the changes so that the function is basically divided into two parts: one handles client-side errors and the other handles server-side errors. For client-side errors we retry immediately, without any delay or backoff. But this brings the danger of an infinite loop, so I introduced a threshold for it, as you can see. Can you help me figure out where I'm going wrong?

I'm sorry for the delay and for so many questions; I'm new to async Rust.
