Improve offset commit handling #775

Merged
merged 7 commits into from
Jun 23, 2020

@ankon ankon commented Jun 23, 2020

The main part of this PR fixes a problem where batch processing sometimes fails with this or a similar stack trace:

TypeError: Cannot read property 'low' of undefined
        at Function.fromValue (/app/node_modules/long/src/long.js:286:25)
        at subtractOffsets (/app/node_modules/@collaborne/kafkajs/src/consumer/offsetManager/index.js:110:44)
        at subtractPartitionOffsets (/app/node_modules/@collaborne/kafkajs/src/consumer/offsetManager/index.js:114:7)
        at /app/node_modules/@collaborne/kafkajs/src/consumer/offsetManager/index.js:120:44
        at Array.map (<anonymous>)
        at subtractTopicOffsets (/app/node_modules/@collaborne/kafkajs/src/consumer/offsetManager/index.js:120:27)
        at Array.map (<anonymous>)
        at OffsetManager.countResolvedOffsets (/app/node_modules/@collaborne/kafkajs/src/consumer/offsetManager/index.js:122:37)
        at OffsetManager.commitOffsetsIfNecessary (/app/node_modules/@collaborne/kafkajs/src/consumer/offsetManager/index.js:189:12)
        at ConsumerGroup.commitOffsetsIfNecessary (/app/node_modules/@collaborne/kafkajs/src/consumer/consumerGroup.js:295:30)
        at commitOffsetsIfNecessary (/app/node_modules/@collaborne/kafkajs/src/consumer/runner.js:235:34)

Note that this is using @collaborne/kafkajs, our own fork of KafkaJS, which contains a number of PRs that are still under review but that we found work well in practice. We have also seen the same errors with stock KafkaJS versions.

After reviewing this error and the code in the mentioned areas, the conclusion is fairly simple: this is a race between seeking and committing/resolving. The race is hard to reproduce in a controlled way, but the fix looks trivial:

3faa4ff Tue, 23 Jun 2020 12:29:15 +0200 Use `isInvalidOffset` to check whether the committed offset is known [Andreas Kohn]

diff --git a/src/consumer/offsetManager/index.js b/src/consumer/offsetManager/index.js
index 577b092..80a1a32 100644
--- a/src/consumer/offsetManager/index.js
+++ b/src/consumer/offsetManager/index.js
@@ -105,7 +105,7 @@ module.exports = class OffsetManager {
 
     const subtractOffsets = (resolvedOffset, committedOffset) => {
       const resolvedOffsetLong = Long.fromValue(resolvedOffset)
-      return committedOffset === '-1'
+      return isInvalidOffset(committedOffset)
         ? resolvedOffsetLong
         : resolvedOffsetLong.subtract(Long.fromValue(committedOffset))
     }

There might be other ways to prevent the race, for example by carefully going over the state changes of the committed and resolved offsets. But given that the code already checked for the "-1" special value, replacing that check with `isInvalidOffset` (which also checks for undefined/null) looks simple enough.
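The effect of the one-line change can be illustrated with a minimal sketch. It uses the built-in `BigInt` as a stand-in for `Long` so that it runs without dependencies; the body of `isInvalidOffset` and the names `subtractOffsetsStrict`, `subtractOffsetsGuarded` and `attempt` are assumptions made for illustration, not the KafkaJS source.

```javascript
// Hypothetical stand-in for the KafkaJS helper: missing or negative offsets are invalid.
const isInvalidOffset = (offset) => !offset || BigInt(offset) < 0n

// Old check: only the literal string '-1' means "nothing committed yet".
const subtractOffsetsStrict = (resolvedOffset, committedOffset) =>
  committedOffset === '-1'
    ? BigInt(resolvedOffset)
    : BigInt(resolvedOffset) - BigInt(committedOffset)

// New check: undefined, null and '' are handled like '-1'.
const subtractOffsetsGuarded = (resolvedOffset, committedOffset) =>
  isInvalidOffset(committedOffset)
    ? BigInt(resolvedOffset)
    : BigInt(resolvedOffset) - BigInt(committedOffset)

// Run a subtraction and report either the result or the error name.
const attempt = (fn, resolved, committed) => {
  try {
    return fn(resolved, committed).toString()
  } catch (e) {
    return e.name
  }
}

console.log(attempt(subtractOffsetsStrict, '13435', undefined))  // conversion of undefined throws
console.log(attempt(subtractOffsetsGuarded, '13435', undefined)) // falls back to the resolved offset
console.log(attempt(subtractOffsetsGuarded, '13435', '13400'))   // ordinary subtraction
```

With an undefined committed offset the strict variant fails during the conversion, much like `Long.fromValue(undefined)` in the stack trace above, while the guarded variant treats the partition as having no committed offset yet.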

The other commits here are just "fall-out" from reading the code:

  1. Avoid some potentially expensive operations (regular expressions, mainly)
  2. Be more consistent

This function "does something" and is called inside a tight loop in a critical
area. We can avoid the call overhead and the condition check in each invocation
here.

This skips a bunch of redundant property lookups: we can grab the needed
partition offsets array immediately for each topic, and then have the subtraction
logic simply process each partition of that.

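As a rough sketch of the lookup hoisting described here: the nested topic/partition shape, the sample data, and the names `diff` and `countResolvedOffsets` as written below are assumptions for illustration, and `BigInt` again stands in for `Long`.

```javascript
// Hypothetical data shapes: offsets keyed by topic, then by partition.
const resolvedOffsets = { 'topic-a': { 0: '10', 1: '7' }, 'topic-b': { 0: '3' } }
const committedOffsets = { 'topic-a': { 0: '4', 1: '-1' }, 'topic-b': { 0: '3' } }

// Difference for one partition; a missing or negative committed offset counts as "none".
const diff = (resolved, committed) =>
  (!committed || BigInt(committed) < 0n)
    ? BigInt(resolved)
    : BigInt(resolved) - BigInt(committed)

const countResolvedOffsets = (resolvedByTopic, committedByTopic) => {
  let total = 0n
  for (const topic of Object.keys(resolvedByTopic)) {
    // Fetch both per-topic maps once, instead of once per partition.
    const resolved = resolvedByTopic[topic]
    const committed = committedByTopic[topic] || {}
    for (const partition of Object.keys(resolved)) {
      total += diff(resolved[partition], committed[partition])
    }
  }
  return total
}

console.log(countResolvedOffsets(resolvedOffsets, committedOffsets).toString())
```

The inner loop only touches the two per-topic objects, so the per-partition work is reduced to two property reads and one subtraction.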
KafkaJS keeps track of offsets as strings and Longs to avoid problems with 64-bit offsets. This
requires frequent string->Long and Long->string conversions so that the Long parts don't leak
into the interface.

In the case of offsets in the offset manager we might also have undefined/null or empty offset
strings, and isInvalidOffset needs to be able to detect these. But we don't expect to have arbitrary
strings here, and figuring out whether a thing is undefined/null/'' doesn't need a regular expression
that checks the complete string; a simple '!offset' is just fine.

Long.compare does a lot more than what we need here: we just want to know whether
the offset is negative (-1 for undefined, or potentially one of the special values).

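To make the two simplifications above concrete, here is a hedged comparison of a regular-expression-based check with the cheaper truthiness check. Both function bodies are hypothetical reconstructions rather than the KafkaJS source, and `BigInt` replaces `Long` so the snippet is self-contained (with long.js, the sign test would presumably be `Long.fromValue(offset).isNegative()`).

```javascript
// Hypothetical "before": a regular expression detects blank strings.
const isInvalidOffsetRegex = (offset) =>
  offset == null || /^\s*$/.test(offset) || BigInt(offset) < 0n

// Hypothetical "after": a truthiness check covers undefined, null and ''.
const isInvalidOffsetCheap = (offset) => !offset || BigInt(offset) < 0n

// Offsets are expected to be strings, so '0' is truthy and remains valid.
const samples = [undefined, null, '', '-1', '-2', '0', '42', '9007199254740993']

// Both variants should agree on every input the offset manager is expected to see.
const agree = samples.every(
  (offset) => isInvalidOffsetRegex(offset) === isInvalidOffsetCheap(offset)
)
console.log(agree)
```

The two checks differ only on inputs such as whitespace-only strings or the number `0`, which the commit message says are not expected here.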
In some cases it seems possible that the committed offset isn't '-1', but rather undefined:
~~~~
kafkajs.Runner - Error when calling eachBatch ({"topic":"carrier-events","partition":0,"offset":"13435","stack":"TypeError: Cannot read property 'low' of undefined\n    at Function.fromValue (/app/node_modules/long/src/long.js:286:25)\n    at subtractOffsets (/app/node_modules/@collaborne/kafkajs/src/consumer/offsetManager/index.js:110:44)\n    at subtractPartitionOffsets (/app/node_modules/@collaborne/kafkajs/src/consumer/offsetManager/index.js:114:7)\n    at /app/node_modules/@collaborne/kafkajs/src/consumer/offsetManager/index.js:120:44\n    at Array.map (<anonymous>)\n    at subtractTopicOffsets (/app/node_modules/@collaborne/kafkajs/src/consumer/offsetManager/index.js:120:27)\n    at Array.map (<anonymous>)\n    at OffsetManager.countResolvedOffsets (/app/node_modules/@collaborne/kafkajs/src/consumer/offsetManager/index.js:122:37)\n    at OffsetManager.commitOffsetsIfNecessary (/app/node_modules/@collaborne/kafkajs/src/consumer/offsetManager/index.js:189:12)\n    at ConsumerGroup.commitOffsetsIfNecessary (/app/node_modules/@collaborne/kafkajs/src/consumer/consumerGroup.js:295:30)\n    at commitOffsetsIfNecessary (/app/node_modules/@collaborne/kafkajs/src/consumer/runner.js:235:34)\n    at messageGenerator (/app/node_modules/@collaborne/event-stream-library/src/client/shared-consumer.ts:341:10)\n    at runMicrotasks (<anonymous>)\n    at runNextTicks (internal/process/task_queues.js:62:5)\n    at processImmediate (internal/timers.js:429:9)\n    at process.topLevelDomainCallback (domain.js:137:15)"})
~~~~

From what I can see this might be related to `seek` and `resolve` operations overlapping, which
is possible: `seek` is only possible when the consumer is running, and when the consumer is running
it will do resolving/committing. This is likely a bigger issue when the consumer group contains
many topics.

After reading the code for quite a while I think the fix here is trivial: instead of checking
explicitly for `-1` we should simply use the helper function created for that purpose.

@Nevon merged commit 2df50c1 into tulios:master Jun 23, 2020