Skip to content

Commit

Permalink
Merge pull request #775 from ankon/pr/improve-offset-commit-handling
Browse files Browse the repository at this point in the history
Improve offset commit handling
  • Loading branch information
Nevon committed Jun 23, 2020
2 parents d55d9a9 + f478c69 commit 2df50c1
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 10 deletions.
15 changes: 7 additions & 8 deletions src/consumer/offsetManager/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ module.exports = class OffsetManager {
}

let offset = this.resolvedOffsets[topic][partition]
if (Long.fromValue(offset).equals(-1)) {
if (isInvalidOffset(offset)) {
offset = '0'
}

Expand Down Expand Up @@ -101,23 +101,22 @@ module.exports = class OffsetManager {
* @returns {Long}
*/
countResolvedOffsets() {
const toPartitions = topic => keys(this.resolvedOffsets[topic])
const committedOffsets = this.committedOffsets()

const subtractOffsets = (resolvedOffset, committedOffset) => {
const resolvedOffsetLong = Long.fromValue(resolvedOffset)
return committedOffset === '-1'
return isInvalidOffset(committedOffset)
? resolvedOffsetLong
: resolvedOffsetLong.subtract(Long.fromValue(committedOffset))
}

const subtractPartitionOffsets = (topic, partition) =>
subtractOffsets(
this.resolvedOffsets[topic][partition],
this.committedOffsets()[topic][partition]
const subtractPartitionOffsets = (resolvedTopicOffsets, committedTopicOffsets) =>
keys(resolvedTopicOffsets).map(partition =>
subtractOffsets(resolvedTopicOffsets[partition], committedTopicOffsets[partition])
)

const subtractTopicOffsets = topic =>
toPartitions(topic).map(partition => subtractPartitionOffsets(topic, partition))
subtractPartitionOffsets(this.resolvedOffsets[topic], committedOffsets[topic])

const offsetsDiff = this.topics.map(subtractTopicOffsets)
return flatten(offsetsDiff).reduce((sum, offset) => sum.add(offset), Long.fromValue(0))
Expand Down
3 changes: 1 addition & 2 deletions src/consumer/offsetManager/isInvalidOffset.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
const Long = require('long')
const isNumber = number => /^-?\d+$/.test(number)

module.exports = offset => !isNumber(offset) || Long.fromValue(offset).compare(0) === -1
module.exports = offset => (!offset && offset !== 0) || Long.fromValue(offset).isNegative()

0 comments on commit 2df50c1

Please sign in to comment.