Improve offset commit handling #775
Commits on Jun 23, 2020
Only call `committedOffsets()` once
This function "does something", and it is called inside a tight loop in a critical area. Calling it once avoids the call overhead and the condition check on every invocation.
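The effect of hoisting the call out of the loop can be sketched as follows. This is a minimal illustration rather than the KafkaJS code: `makeOffsetSource`, `countInLoop`, and `countHoisted` are hypothetical names, and only `committedOffsets()` is taken from the commit title.

```javascript
// Hypothetical stand-in for the object that owns committedOffsets().
// It counts calls so the difference between the two variants is visible.
function makeOffsetSource(committed) {
  let calls = 0
  return {
    committedOffsets() {
      calls += 1
      return committed
    },
    callCount: () => calls,
  }
}

// Before: the accessor runs once per partition inside the loop.
function countInLoop(source, topic, partitions) {
  let known = 0
  for (const partition of partitions) {
    if (source.committedOffsets()[topic][partition] !== undefined) known += 1
  }
  return known
}

// After: the accessor runs once and its result is reused.
function countHoisted(source, topic, partitions) {
  const committed = source.committedOffsets()
  let known = 0
  for (const partition of partitions) {
    if (committed[topic][partition] !== undefined) known += 1
  }
  return known
}
```

Both variants return the same count; only the number of `committedOffsets()` invocations differs.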
Commit fdcec54
Work on the complete partition offsets array at once
This skips a bunch of redundant property lookups: we can grab the needed partition offsets array immediately for each topic, and then have the subtraction logic simply process each partition of that array.
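A rough sketch of the idea, under stated assumptions: the offsets are modelled here as plain objects of the shape `{ [topic]: { [partition]: offsetString } }`, which the source does not confirm, and `BigInt` stands in for the `Long` type so the example has no dependencies. `countResolved` is a hypothetical name.

```javascript
// Assumed shape (not confirmed by the source): { [topic]: { [partition]: '123' } }.
// BigInt stands in for Long so the sketch runs without extra packages.
function countResolved(resolved, committed) {
  let total = 0n
  for (const topic of Object.keys(resolved)) {
    // Look up both per-topic collections once, instead of once per partition.
    const resolvedPartitions = resolved[topic]
    const committedPartitions = committed[topic] || {}
    for (const partition of Object.keys(resolvedPartitions)) {
      // Assumption of this sketch: a missing committed offset counts as '0'.
      const committedOffset = committedPartitions[partition] || '0'
      total += BigInt(resolvedPartitions[partition]) - BigInt(committedOffset)
    }
  }
  return total
}
```

The inner loop only touches the two per-topic collections, so the topic lookup is not repeated for each partition.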
Commit 922880a
Don't bother applying a regular expression to what should be a number
KafkaJS keeps track of offsets as strings and Longs to avoid problems with 64-bit offsets. This requires frequent string->Long and Long->string conversions so that the Long parts don't leak into the interface. In the offset manager, offsets may also be undefined/null or empty strings, and `isInvalidOffset` needs to be able to detect these. But: we don't expect arbitrary strings here, and figuring out whether a value is undefined/null/'' doesn't need a regular expression that checks the complete string; a simple `!offset` is just fine.
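The falsy check can be illustrated with a small sketch. `isMissingOffset` is a hypothetical name for just this part of the check, and the sketch assumes offsets arrive only as strings, `undefined`, or `null`.

```javascript
// Sketch: true for undefined, null, and the empty string.
// Note that the string '0' is truthy, so a valid offset of zero is not
// flagged. A numeric 0 would be, which is why this relies on the
// assumption that offsets are kept as strings.
const isMissingOffset = offset => !offset
```

For example, `isMissingOffset('')` is `true`, while `isMissingOffset('0')` and `isMissingOffset('13435')` are `false`.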
Commit f73ac9a
Check for "offset is negative" directly
Long.compare does a lot more than what we need here: We just want to know whether the offset is negative (-1 for undefined, or potentially one of the special values).
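As a dependency-free sketch of a direct sign check, the following uses `BigInt` as a stand-in for `Long`. With the `long` package, the corresponding call would presumably be `isNegative()` on the converted value; `isNegativeOffset` is a hypothetical name.

```javascript
// BigInt stands in for Long here. The check only asks about the sign
// instead of performing a full three-way comparison against another value.
const isNegativeOffset = offset => BigInt(offset) < 0n
```

`isNegativeOffset('-1')` is `true`, while `isNegativeOffset('0')` and `isNegativeOffset('13435')` are `false`.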
Commit 45af98d
Use `isInvalidOffset` to check whether the committed offset is known
In some cases it seems possible that the committed offset isn't '-1', but rather undefined:
~~~~
kafkajs.Runner - Error when calling eachBatch ({"topic":"carrier-events","partition":0,"offset":"13435","stack":"TypeError: Cannot read property 'low' of undefined\n at Function.fromValue (/app/node_modules/long/src/long.js:286:25)\n at subtractOffsets (/app/node_modules/@collaborne/kafkajs/src/consumer/offsetManager/index.js:110:44)\n at subtractPartitionOffsets (/app/node_modules/@collaborne/kafkajs/src/consumer/offsetManager/index.js:114:7)\n at /app/node_modules/@collaborne/kafkajs/src/consumer/offsetManager/index.js:120:44\n at Array.map (<anonymous>)\n at subtractTopicOffsets (/app/node_modules/@collaborne/kafkajs/src/consumer/offsetManager/index.js:120:27)\n at Array.map (<anonymous>)\n at OffsetManager.countResolvedOffsets (/app/node_modules/@collaborne/kafkajs/src/consumer/offsetManager/index.js:122:37)\n at OffsetManager.commitOffsetsIfNecessary (/app/node_modules/@collaborne/kafkajs/src/consumer/offsetManager/index.js:189:12)\n at ConsumerGroup.commitOffsetsIfNecessary (/app/node_modules/@collaborne/kafkajs/src/consumer/consumerGroup.js:295:30)\n at commitOffsetsIfNecessary (/app/node_modules/@collaborne/kafkajs/src/consumer/runner.js:235:34)\n at messageGenerator (/app/node_modules/@collaborne/event-stream-library/src/client/shared-consumer.ts:341:10)\n at runMicrotasks (<anonymous>)\n at runNextTicks (internal/process/task_queues.js:62:5)\n at processImmediate (internal/timers.js:429:9)\n at process.topLevelDomainCallback (domain.js:137:15)"})
~~~~
From what I can see this might be related to `seek` and `resolve` operations overlapping, which is possible: `seek` is only possible when the consumer is running, and when the consumer is running it will do resolving/committing. This is likely a bigger issue when the consumer group contains many topics.
After reading the code for quite a while I think the fix here is trivial: Instead of checking explicitly for `-1` we simply should use the helper function created for that purpose.
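The difference between the explicit comparison and the helper can be sketched like this. Everything here is illustrative: the `isInvalidOffset` body is a guess that combines the two checks from the earlier commits, `BigInt` stands in for `Long`, and returning the resolved offset unchanged when the committed offset is unknown is an assumption of this sketch, not confirmed behaviour.

```javascript
// Hypothetical helper body, not the actual KafkaJS implementation.
const isInvalidOffset = offset => !offset || BigInt(offset) < 0n

// Explicit comparison: does not catch an undefined committed offset.
const isUnknownByComparison = committed => committed === '-1'

// Hypothetical subtraction guarded by the helper.
function resolvedSinceCommit(resolved, committed) {
  // Assumption: with no known committed offset there is nothing to subtract.
  if (isInvalidOffset(committed)) return BigInt(resolved)
  return BigInt(resolved) - BigInt(committed)
}
```

With the explicit comparison, an `undefined` committed offset falls through to the conversion step, which is where the `TypeError` in the log above originates. The helper-based guard handles `undefined`, `null`, `''`, and negative values in one place.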
Commit 3faa4ff
Commit a9f8fe8
Commit f478c69