Skip to content

Commit

Permalink
Merge pull request bsm#215 from georgeteo/gteo/fix-reset-offet-npe
Browse files Browse the repository at this point in the history
Fix NPE on ResetOffsets and MarkOffsets
  • Loading branch information
dim committed Feb 8, 2018
2 parents 3001c24 + b33c575 commit cf455bc
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 7 deletions.
35 changes: 28 additions & 7 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,13 +140,19 @@ func (c *Consumer) HighWaterMarks() map[string]map[int32]int64 { return c.consum
// your application crashes. This means that you may end up processing the same
// message twice, and your processing should ideally be idempotent.
func (c *Consumer) MarkOffset(msg *sarama.ConsumerMessage, metadata string) {
c.subs.Fetch(msg.Topic, msg.Partition).MarkOffset(msg.Offset+1, metadata)
sub := c.subs.Fetch(msg.Topic, msg.Partition)
if sub != nil {
sub.MarkOffset(msg.Offset+1, metadata)
}
}

// MarkPartitionOffset marks an offset of the provided topic/partition as processed.
// See MarkOffset for additional explanation.
func (c *Consumer) MarkPartitionOffset(topic string, partition int32, offset int64, metadata string) {
c.subs.Fetch(topic, partition).MarkOffset(offset+1, metadata)
sub := c.subs.Fetch(topic, partition)
if sub != nil {
sub.MarkOffset(offset+1, metadata)
}
}

// MarkOffsets marks stashed offsets as processed.
Expand All @@ -156,7 +162,10 @@ func (c *Consumer) MarkOffsets(s *OffsetStash) {
defer s.mu.Unlock()

for tp, info := range s.offsets {
c.subs.Fetch(tp.Topic, tp.Partition).MarkOffset(info.Offset+1, info.Metadata)
sub := c.subs.Fetch(tp.Topic, tp.Partition)
if sub != nil {
sub.MarkOffset(info.Offset+1, info.Metadata)
}
delete(s.offsets, tp)
}
}
Expand All @@ -168,13 +177,19 @@ func (c *Consumer) MarkOffsets(s *OffsetStash) {
//
// Difference between ResetOffset and MarkOffset is that it allows to rewind to an earlier offset
func (c *Consumer) ResetOffset(msg *sarama.ConsumerMessage, metadata string) {
c.subs.Fetch(msg.Topic, msg.Partition).ResetOffset(msg.Offset+1, metadata)
sub := c.subs.Fetch(msg.Topic, msg.Partition)
if sub != nil {
sub.ResetOffset(msg.Offset+1, metadata)
}
}

// ResetPartitionOffset marks an offset of the provided topic/partition as processed.
// See ResetOffset for additional explanation.
func (c *Consumer) ResetPartitionOffset(topic string, partition int32, offset int64, metadata string) {
c.subs.Fetch(topic, partition).ResetOffset(offset+1, metadata)
sub := c.subs.Fetch(topic, partition)
if sub != nil {
sub.ResetOffset(offset+1, metadata)
}
}

// ResetOffsets marks stashed offsets as processed.
Expand All @@ -184,7 +199,10 @@ func (c *Consumer) ResetOffsets(s *OffsetStash) {
defer s.mu.Unlock()

for tp, info := range s.offsets {
c.subs.Fetch(tp.Topic, tp.Partition).ResetOffset(info.Offset+1, info.Metadata)
sub := c.subs.Fetch(tp.Topic, tp.Partition)
if sub != nil {
sub.ResetOffset(info.Offset+1, info.Metadata)
}
delete(s.offsets, tp)
}
}
Expand Down Expand Up @@ -246,7 +264,10 @@ func (c *Consumer) CommitOffsets() error {
if kerr != sarama.ErrNoError {
err = kerr
} else if state, ok := snap[topicPartition{topic, partition}]; ok {
c.subs.Fetch(topic, partition).MarkCommitted(state.Info.Offset)
sub := c.subs.Fetch(topic, partition)
if sub != nil {
sub.MarkCommitted(state.Info.Offset)
}
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,10 +194,12 @@ var _ = Describe("Consumer", func() {

cs.MarkPartitionOffset("topic-a", 1, 3, "")
cs.MarkPartitionOffset("topic-a", 2, 4, "")
cs.MarkPartitionOffset("topic-b", 1, 2, "") // should not throw NPE
Expect(cs.CommitOffsets()).NotTo(HaveOccurred())

cs.ResetPartitionOffset("topic-a", 1, 2, "")
cs.ResetPartitionOffset("topic-a", 2, 3, "")
cs.ResetPartitionOffset("topic-b", 1, 2, "") // should not throw NPE
Expect(cs.CommitOffsets()).NotTo(HaveOccurred())

offsets, err := cs.fetchOffsets(cs.Subscriptions())
Expand Down

0 comments on commit cf455bc

Please sign in to comment.