Skip to content
This repository has been archived by the owner on May 13, 2019. It is now read-only.

Commit

Permalink
Merge pull request #68 from ChannelMeter/retry-claim
Browse files Browse the repository at this point in the history
Retry claiming partitions if the partition is already claims. See #62
  • Loading branch information
wvanbergen committed Aug 17, 2015
2 parents e3a6879 + 22b7a7e commit 7c09a42
Showing 1 changed file with 9 additions and 4 deletions.
13 changes: 9 additions & 4 deletions consumergroup/consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,10 +333,15 @@ func (cg *ConsumerGroup) partitionConsumer(topic string, partition int32, messag
default:
}

err := cg.instance.ClaimPartition(topic, partition)
if err != nil {
cg.Logf("%s/%d :: FAILED to claim the partition: %s\n", topic, partition, err)
return
for maxRetries, tries := 3, 0; tries < maxRetries; tries++ {
if err := cg.instance.ClaimPartition(topic, partition); err == nil {
break
} else if err == kazoo.ErrPartitionClaimedByOther && tries+1 < maxRetries {
time.Sleep(1 * time.Second)
} else {
cg.Logf("%s/%d :: FAILED to claim the partition: %s\n", topic, partition, err)
return
}
}
defer cg.instance.ReleasePartition(topic, partition)

Expand Down

0 comments on commit 7c09a42

Please sign in to comment.