
Unexpected error "Offset out of range" causes the consumer group to drop messages #1119

Closed
solanoepalacio opened this issue Jun 11, 2021 · 10 comments


@solanoepalacio

Describe the bug
The unexpected error "[ConsumerGroup] Offset out of range, resetting to default offset" is causing the consumer group to jump from an offset with lag to the latest offset, skipping a huge number of messages.

To Reproduce
I haven't been able to reproduce this issue outside of the production environment of the services. Also, it happens only in some of the environments (the ones with higher traffic).

The use case in which we are experiencing this issue is as follows:
We have a producer service that pulls CSV files from an external service and produces one Kafka message for each record in the CSV file.
Many instances of a consumer service are connected to the topic as a consumer group and process the messages according to business logic in a write-intensive workload (which is slower than the producer service).

This means that when the producer picks up some CSV files, lag accumulates (a few million messages in a production environment).

It all works correctly for some time. But after a few hours we see this error:

[ConsumerGroup] Offset out of range, resetting to default offset

This error is thrown during the consumer group fetch, and recoverFromFetch triggers the change of the consumer offset (to "latest").


Expected behaviour
I would expect the consumer to keep fetching messages from the last committed offset until it reaches the latest offset after processing all messages in between.

Observed behaviour
When this error is thrown, we still have many hundreds of thousands of messages to consume in each topic partition. All of those messages are skipped when the consumer "jumps" to the latest offset.

Environment:

  • KafkaJS version: 1.15.0
  • Kafka version: 2.3.1
  • NodeJS version: 12.16

Additional context

  • We are using eachBatch to handle message consumption
  • We have tried both manual and auto-commit to see if manually passing each processed offset to resolveOffset after handling a batch would help (roughly as sketched below), but the outcome was the same.
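
For context, the manual variant we tried looks roughly like this (a sketch with eachBatchAutoResolve disabled and a hypothetical process() function, not our exact code):

await consumer.run({
  eachBatchAutoResolve: false,
  eachBatch: async ({ batch, resolveOffset, heartbeat, commitOffsetsIfNecessary }) => {
    for (const message of batch.messages) {
      await process(message) // hypothetical business logic
      resolveOffset(message.offset) // mark this offset as processed
      await heartbeat()
    }
    await commitOffsetsIfNecessary()
  },
})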

I doubt that the fetched offset is actually erroneous. What else could be causing this? Could I work around it by setting the default offset config to 'none' and handling the error myself (fetching the last committed offset of the topic and seeking to that one)?
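
In case it helps frame the question, the "fetch the last committed offset and seek to it" idea would look roughly like this; an untested sketch only, assuming the KafkaJS 1.x admin.fetchOffsets({ groupId, topic }) shape and hypothetical topic/group names:

const { Kafka } = require('kafkajs')

// Hypothetical names, for illustration only
const kafka = new Kafka({ clientId: 'csv-consumer', brokers: ['localhost:9092'] })
const admin = kafka.admin()
const consumer = kafka.consumer({ groupId: 'csv-import-group' })

const seekToLastCommitted = async topic => {
  await admin.connect()
  // Last committed offsets of the group, one entry per partition
  const committed = await admin.fetchOffsets({ groupId: 'csv-import-group', topic })
  await admin.disconnect()

  for (const { partition, offset } of committed) {
    // Partitions without a committed offset come back as '-1'
    if (Number(offset) >= 0) {
      // seek() is meant to be called after consumer.run()
      consumer.seek({ topic, partition, offset })
    }
  }
}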

@solanoepalacio
Author

solanoepalacio commented Jun 16, 2021

It would help us to get an answer to at least one of the following questions:

What causes the "Out Of Range" error?

  • What is causing the consumer group to throw the Fetch: Offset out of range error?
  • Is there any way this can be related to having one consumer instance consuming many topics?
  • Is there any way this can be related to having two consumers in the same process connected to different Kafka clusters?

Is there any way we could manually handle recovering from the fetch error?

Can this be related to corrupted metadata in zookeeper?

@gpgpublickey

Hi @solanoepalacio, sorry if I'm not answering any of your questions. I don't know much about Kafka, but I investigated the issue to try to help, and I found a related issue that may give you more background or clues about a possible root cause, if you haven't seen it yet: #578 (comment)

In addition, I was looking at the source code of this repo and found that when "Offset out of range" is thrown inside the fetch() method, an exception of type KafkaJSOffsetOutOfRange (which extends KafkaJSProtocolError) is propagated up the promise chain that tries to recover from it (in this case by resetting to the latest offset); see ConsumerGroup.js line 410 (the fetch() method) and line 622 (where the exception is propagated). Maybe you can use these to get more control and implement a custom way to recover a reliable offset.

@Nevon
Collaborator

Nevon commented Jun 17, 2021

I have a hypothesis, but without more information, it's very difficult to know if this is indeed the issue. At least it could be some place to start.

Offset out of range means that your consumer tried to either fetch or commit an offset that is not within the range of offsets maintained by the broker. Meaning if the broker has messages for offsets between 100 - 500, and you try to commit 99 or 501, the committed offset is clearly invalid and cannot be used.

There are a few ways this could happen, but one of them is due to retention. Specifically, I suspect retention.bytes is lower than the amount of bytes that you are pushing into the partition, since you mentioned importing millions of records from some CSV file. In that case, I could see the consumer starting to process offset N, while the broker runs the cleanup process and sees that the partition has more than retention.bytes bytes in it, so it deletes the oldest log segments, which includes offset N that the consumer is currently processing. When the consumer finishes processing the messages, it tries to commit offset N and the broker responds saying that the offset is out of range. Same thing could happen based on age, but it doesn't sound like you're processing very old records, so size is a more likely culprit.

If you don't know the configuration of the topic, you can get it via KafkaJS:

const { Kafka, ConfigResourceTypes } = require('kafkajs')

// Any admin client connected to your cluster will do; replace the brokers
const kafka = new Kafka({ clientId: 'config-check', brokers: ['localhost:9092'] })
const admin = kafka.admin()
await admin.connect()

const { resources } = await admin.describeConfigs({
  includeSynonyms: false,
  resources: [
    {
      type: ConfigResourceTypes.TOPIC,
      name: 'topic-name',
      configNames: ['retention.bytes']
    }
  ]
})

console.log(resources[0].configEntries)

await admin.disconnect()

There's also log.retention.bytes on the cluster level. What speaks against this hypothesis is that both log.retention.bytes and retention.bytes are "infinite" by default, so this would have to be something you've configured on your cluster or topic.
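
If you want to check the cluster-level value too, the same call can (as far as I know) be pointed at a broker resource; a sketch reusing the admin client and ConfigResourceTypes import from the snippet above, assuming broker node id '0':

await admin.describeConfigs({
  includeSynonyms: false,
  resources: [
    {
      type: ConfigResourceTypes.BROKER,
      name: '0', // broker node id, as a string
      configNames: ['log.retention.bytes', 'log.retention.hours']
    }
  ]
})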


As for the specific questions: keep in mind that my answers are to the best of my current knowledge. It's of course possible that there's some unintended interaction that I'm not aware of.

Is there any way this can be related to having one consumer instance consuming many topics?

No. There's not really a difference between consuming from several topics and consuming from one topic but several partitions. The client primarily operates on partitions, not topics, so if this has something to do with it you'd see the same issue even when just consuming from a single topic. We also have tons of users consuming from several topics with no issues.

Is there any way this can be related to having two consumers in the same process connected to different Kafka clusters?

Multiple consumer instances are completely isolated from one another and share nothing except client configuration, so this should be unrelated. The one caveat here is to make sure that they belong to different consumer groups if they are consuming different topics. All members of a consumer group should be identical (or eventually become identical, in the case of a rolling deployment, for example). But whether you have several consumer instances in the same process or different processes doesn't matter, other than them competing for time in the event loop.
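
As an illustration of that caveat (a sketch with hypothetical names, not taken from your setup), two consumers in one process pointed at different clusters stay isolated as long as their group ids and subscriptions don't overlap:

const { Kafka } = require('kafkajs')

const clusterA = new Kafka({ clientId: 'my-service', brokers: ['cluster-a:9092'] })
const clusterB = new Kafka({ clientId: 'my-service', brokers: ['cluster-b:9092'] })

// Different topics, therefore different consumer groups
const ordersConsumer = clusterA.consumer({ groupId: 'orders-processor' })
const auditConsumer = clusterB.consumer({ groupId: 'audit-processor' })

await ordersConsumer.connect()
await ordersConsumer.subscribe({ topic: 'orders', fromBeginning: false })
await ordersConsumer.run({ eachMessage: async ({ message }) => { /* ... */ } })

await auditConsumer.connect()
await auditConsumer.subscribe({ topic: 'audit-events', fromBeginning: false })
await auditConsumer.run({ eachMessage: async ({ message }) => { /* ... */ } })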

Is there any way we could manually handle recovering from the fetch error?

Not in an automated way, as far as I can think of. You could manually reset the consumer group offsets to a known good value - but the question is what value to resume from. The core issue is that the consumer is trying to commit an offset that doesn't exist, so how do you know what offset to reset to? That's why the client falls back to the only reasonable default, which is what the consumer is set to start from if there is no offset to resume from (fromBeginning: true | false).
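
For reference, a manual reset through the admin API might look roughly like this (a sketch with hypothetical names and an offset you would have to determine yourself; the group must have no running members while its offsets are changed):

const { Kafka } = require('kafkajs')

const kafka = new Kafka({ clientId: 'offset-reset', brokers: ['localhost:9092'] })
const admin = kafka.admin()
await admin.connect()

// Point the group at an offset you have verified still exists on the broker.
await admin.setOffsets({
  groupId: 'csv-import-group',
  topic: 'csv-records',
  partitions: [{ partition: 0, offset: '123456' }],
})

await admin.disconnect()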

Can this be related to corrupted metadata in zookeeper?

I can't say with any certainty, but it wouldn't be my first guess. The brokers need to know which log offsets are valid, so they must have this information themselves, rather than having to go to Zookeeper for it every time.

@snird

snird commented Nov 18, 2021

I have the same issue.
I rewrote my consumer from eachBatch to eachMessage, made sure it is the only process reading, used only autocommit, and none of this helped.
I also receive these errors occasionally:
"The requested offset is not within the range of offsets maintained by the server"

@lzhenglin

Excuse me, has this problem been solved in the end?

@snird

snird commented Mar 7, 2022

For me, the issue was using the same consumer group for 2 separate queues.
It works with other implementations of Kafka clients, but here it fails for some reason.

I haven't researched deeper, just created an additional consumer group and called it a day.

@m3co-code

We experienced the same error message, and in our case one consumer group started from the beginning (as per the fromBeginning setting), which was unintended for us.

Could this also be related to the case where a consumer group wants to read messages from a partition, but the partition has moved to another broker and the client/consumer now needs to switch to the new partition? This is only a guess on our side; there were "things" going on with our Kafka cluster, as this happened during a node pool upgrade on a Kubernetes cluster.

The Kafka server version we're using is 3.2.0, currently set up with the Strimzi operator.

@kylevogt

We experienced this same issue while updating one of our busier MSK clusters to 3.3.1. Our CPU usage was quite high during the upgrade, which we think may have contributed to the issue. We had a similar situation to others, though: a handful of partitions got reset back to the beginning (we have fromBeginning: true). We keep 7-day retention, so this resulted in random partitions suddenly being behind by nearly 2M offsets, which was very noticeable in our alerting.

Looking back at the logs after resetting offsets to catch back up, we saw the same "Offset out of range, resetting to default offset" log line others have mentioned.

Would it be reasonable to change fromBeginning away from a boolean and instead have a third option to just crash and rejoin the group to try again? I'm fairly confident this was a transient error where for some reason kafkajs and the broker it was talking to weren't in agreement on what offsets were valid. I think if the consumer had instead crashed and restarted it likely would have reconnected and started consuming again just fine.
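
In the meantime, one rough way to at least detect this quickly is to compare the group's committed offsets against the partition high watermarks and alert on sudden jumps; an untested sketch with hypothetical names, assuming the 1.x admin API shapes:

const { Kafka } = require('kafkajs')

const kafka = new Kafka({ clientId: 'lag-monitor', brokers: ['localhost:9092'] })
const admin = kafka.admin()
await admin.connect()

const topic = 'my-topic'
const committed = await admin.fetchOffsets({ groupId: 'my-group', topic })
const latest = await admin.fetchTopicOffsets(topic)

for (const { partition, offset } of committed) {
  const end = latest.find(p => p.partition === partition)
  const lag = Number(end.offset) - Number(offset)
  // A lag that suddenly collapses to ~0 (reset to latest) or explodes
  // (reset to earliest) right after an "Offset out of range" log line
  // is the signature described in this thread.
  console.log(`partition ${partition}: lag ${lag}`)
}

await admin.disconnect()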

@dorin-musteata

Any updates on this?

@solanoepalacio
Author

We were never able to find a solution to the issue. We had to change our client implementation. I'll close the issue since I couldn't add any more data on the topic, nor reproduce it again.
