
Pubsub was disconnected without closing iterator. #34

Open
simonsso opened this issue Jul 4, 2023 · 1 comment

@simonsso
Contributor

simonsso commented Jul 4, 2023

I know this is a very vague error report and we cannot reproduce it, but I want to share our observations in case someone has seen something similar in their own projects.

Background
We are running a Google Cloud-deployed service that handles a burst of data from Pub/Sub subscriptions every four hours. Messages are fetched with a loop like this:

use futures::StreamExt;

loop {
    let subscription = subscriber.stream_subscription(
        transaction_subscription_name.clone(),
        pubsub::StreamSubscriptionConfig::default(),
    );
    // Pin the stream so that next() can be called on it.
    futures::pin_mut!(subscription);

    while let Some(event) = subscription.next().await {
        // Handle messages here.
    }
    // Log that the stream terminated and needs to be restarted.
}

During normal operation we get a recoverable error every 30-50 minutes:

"Status { code: Unavailable, message: \"The service was unable to fulfill your request. Please try again. [code=8a75]\", metadata: MetadataMap { headers: {\"subscription\": \"chain-transactions-prod\"} }, source: None }"

This error is caught by the outer loop, which restarts the subscription.

The issue: messages piling up without being delivered
Last weekend we observed an outage where messages were piling up in our Pub/Sub subscription without being handled. The service was manually restarted four times. Checking the logs from the incident, I noticed that during the entire outage, from 2023-06-30 07:12:40.865 UTC to 2023-07-01 11:40:57.218 UTC, not a single code=8a75 occurred, and still no messages were consumed by our loop.
After each restart, messages were immediately handled, but after some time the service was silently disconnected again.

One theory is that ya-gcp got disconnected from the Pub/Sub service but was not able to notify the consumer by terminating the iterator.

The questions are:

  1. Is there a possible execution path in the lower layers of ya-gcp where such a disconnect could occur without being propagated?
  2. Should we use the API in a different way?

We have not been able to reproduce the problem; the same code had been running without issues for several months, and no issues have been observed in the last few days.

@rnarubin
Collaborator

rnarubin commented Jul 5, 2023

It's always tough with non-reproducible bugs; let me see if I can help.

  1. What version of the lib are you using?
  2. Is your application instrumented to emit tracing logs? You could try enabling the debug level for ya_gcp, which is still relatively light logging. Among these messages will be internal connection and retry attempts.
  3. Can you describe more about what happens in your message handling? There are a few considerations here: PubSub has limits on the number of outstanding messages, outstanding bytes, etc. Are all of the messages getting acked appropriately?
  4. Have you tried adding a timeout around subscription.next()? If so, does reconnecting after such a timeout resume message ingestion? (A sketch follows this list.)
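
For reference, here is a minimal sketch of such a timeout, assuming tokio's timer and an arbitrary 10-minute cutoff (subscriber and transaction_subscription_name are the names from your example):

use std::time::Duration;
use futures::StreamExt;

loop {
    let subscription = subscriber.stream_subscription(
        transaction_subscription_name.clone(),
        pubsub::StreamSubscriptionConfig::default(),
    );
    futures::pin_mut!(subscription);

    loop {
        // Rebuild the stream if it ends or if no message arrives for 10 minutes.
        match tokio::time::timeout(Duration::from_secs(600), subscription.next()).await {
            Ok(Some(event)) => {
                // Handle the message here.
            }
            Ok(None) => break,      // stream terminated; reconnect
            Err(_elapsed) => break, // timed out waiting; reconnect
        }
    }
}

Note that dropping and rebuilding the stream means any pulled-but-unacked messages will be redelivered once their ack deadline expires, so the handler should tolerate duplicates.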

Should we use the API in a different way?

Broadly speaking, the code you have is reasonable. It's possible to remove the reconnect loop, but whether that is simpler depends on how much introspection you need. The lib already runs a reconnect loop internally, much like the one in your example:

'reconnect: loop {

The reason this doesn't reconnect forever is that the default retry policy has a retry limit. The PubSub servers return Unavailable when there aren't messages in the subscription (among other reasons) -- I'm not sure if this is always code=8a75, maybe that means "no messages". You could consider increasing the retry limit, or tuning the retry policy to better reflect your message workflow, and then let the lib handle retries for you. If you enable debug logging, you'll likely see messages like:

message = "retrying after error",
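
For instance, a minimal sketch of enabling that debug level, assuming the tracing-subscriber crate with its env-filter feature (the exact filter string is just an example):

use tracing_subscriber::EnvFilter;

fn init_logging() {
    // Keep general logging at info, but surface ya_gcp's internal
    // connection/retry messages at debug.
    tracing_subscriber::fmt()
        .with_env_filter(EnvFilter::new("info,ya_gcp=debug"))
        .init();
}

Alternatively, build the filter with EnvFilter::from_default_env() and run the service with RUST_LOG=info,ya_gcp=debug.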

One of our applications, for example, sets an unbounded retry limit:

let retry_forever_config = retry_policy::exponential_backoff::Config {
    max_retries: None,
    // other values loaded from config files
    ..file_config.retry.backoff
};
let exponential_backoff_with_infinite_retries = retry_policy::ExponentialBackoff::new(
    PubSubRetryCheck::default(),
    retry_forever_config,
);
