Skip to content
This repository has been archived by the owner on May 13, 2019. It is now read-only.

Commit

Permalink
Merge pull request #57 from wvanbergen/consumer_group_outof_range
Browse files Browse the repository at this point in the history
Handle sarama.ErrOffsetOfRange error somewhat gracefully.
  • Loading branch information
wvanbergen committed Jun 17, 2015
2 parents 4779c58 + 51a0af7 commit e236a65
Showing 1 changed file with 15 additions and 0 deletions.
15 changes: 15 additions & 0 deletions consumergroup/consumer_group.go
Expand Up @@ -358,6 +358,21 @@ func (cg *ConsumerGroup) partitionConsumer(topic string, partition int32, messag
}

consumer, err := cg.consumer.ConsumePartition(topic, partition, nextOffset)
if (err == sarama.ErrOffsetOutOfRange) {
cg.Logf("%s/%d :: Partition consumer offset out of Range.\n", topic, partition)
// if the offset is out of range, simplistically decide whether to use OffsetNewest or OffsetOldest
// if the configuration specified offsetOldest, then switch to the oldest available offset, else
// switch to the newest available offset.
if (cg.config.Offsets.Initial == sarama.OffsetOldest) {
nextOffset = sarama.OffsetOldest
cg.Logf("%s/%d :: Partition consumer offset reset to oldest available offset.\n", topic, partition)
} else {
nextOffset = sarama.OffsetNewest
cg.Logf("%s/%d :: Partition consumer offset reset to newest available offset.\n", topic, partition)
}
// retry the consumePartition with the adjusted offset
consumer, err = cg.consumer.ConsumePartition(topic, partition, nextOffset)
}
if err != nil {
cg.Logf("%s/%d :: FAILED to start partition consumer: %s\n", topic, partition, err)
return
Expand Down

0 comments on commit e236a65

Please sign in to comment.