Skip to content

Commit

Permalink
Merge pull request #63 from mailgun/maxim/develop
Browse files Browse the repository at this point in the history
Add incomplete response test for offset manager
  • Loading branch information
horkhe committed May 3, 2016
2 parents 5cda4e9 + bd8d100 commit ab6db39
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 24 deletions.
8 changes: 7 additions & 1 deletion CHANGELOG.md
@@ -1,7 +1,13 @@
# Changelog

#### Version 0.11.0 (TBD)
#### Version 0.11.0 (2016-05-03)

Major overhaul and refactoring of the implementation to make it easier to
understand how the internal components interact with each other. It is an
important step before implementation of explicit acknowledgements can be
started.

During refactoring the following bugs were detected and fixed:
* [#56](https://github.com/mailgun/kafka-pixy/issues/56) Invalid stored offset makes consumer panic.
* [#59](https://github.com/mailgun/kafka-pixy/issues/59) Messages are skipped by consumer during rebalancing.
* [#62](https://github.com/mailgun/kafka-pixy/issues/62) Messages consumed twice during rebalancing.
Expand Down
4 changes: 2 additions & 2 deletions README.md
Expand Up @@ -262,13 +262,13 @@ pretty much the same on Mac.
### Step 1. Download

```
curl -L https://github.com/mailgun/kafka-pixy/releases/download/v0.10.1/kafka-pixy-v0.10.1-linux-amd64.tar.gz | tar xz
curl -L https://github.com/mailgun/kafka-pixy/releases/download/v0.11.0/kafka-pixy-v0.11.0-linux-amd64.tar.gz | tar xz
```

### Step 2. Start

```
cd kafka-pixy-v0.10.1-linux-amd64
cd kafka-pixy-v0.11.0-linux-amd64
./kafka-pixy --kafkaPeers "<host1>:9092,...,<hostN>:9092" --zookeeperPeers "<host1>:2181,...,<hostM>:2181"
```

Expand Down
2 changes: 1 addition & 1 deletion consumer/offsetmgr/offsetmgr.go
Expand Up @@ -326,7 +326,7 @@ func (om *offsetManager) run() {
return
}
case <-commitTicker.C:
isRequestTimeout := time.Now().UTC().Sub(lastSubmitTime) > (om.f.cfg.Consumer.OffsetsCommitInterval << 2)
isRequestTimeout := time.Now().UTC().Sub(lastSubmitTime) > (om.f.cfg.Consumer.OffsetsCommitInterval << 1)
if isRequestTimeout && !isSameDecoratedOffset(lastSubmitRequest, lastCommittedOffset) {
triggerOrScheduleReassign(ErrRequestTimeout, "offset commit failed")
}
Expand Down
95 changes: 75 additions & 20 deletions consumer/offsetmgr/offsetmgr_test.go
Expand Up @@ -54,17 +54,16 @@ func (s *OffsetMgrSuite) TestInitialOffset(c *C) {
client, err := sarama.NewClient([]string{broker1.Addr()}, nil)
c.Assert(err, IsNil)
f := SpawnFactory(s.ns.NewChild(), cfg, client)
defer f.Stop()

// When
om, err := f.SpawnOffsetManager(s.ns.NewChild("g1", "t1", 8), "g1", "t1", 8)
c.Assert(err, IsNil)
defer om.Stop()

// Then
fo := <-om.InitialOffset()
c.Assert(fo, DeepEquals, DecoratedOffset{2000, "bar"})

om.Stop()
f.Stop()
initialOffset := <-om.InitialOffset()
c.Assert(initialOffset, DeepEquals, DecoratedOffset{2000, "bar"})
}

// A partition offset manager can be closed even while it keeps trying to
Expand All @@ -87,17 +86,16 @@ func (s *OffsetMgrSuite) TestInitialNoCoordinator(c *C) {
client, err := sarama.NewClient([]string{broker1.Addr()}, nil)
c.Assert(err, IsNil)
f := SpawnFactory(s.ns.NewChild(), cfg, client)
defer f.Stop()

// When
om, err := f.SpawnOffsetManager(s.ns.NewChild("g1", "t1", 8), "g1", "t1", 8)
c.Assert(err, IsNil)
defer om.Stop()

// Then
oce := <-om.Errors()
c.Assert(oce, DeepEquals, &OffsetCommitError{"g1", "t1", 8, ErrNoCoordinator})

om.Stop()
f.Stop()
}

// A partition offset manager can be closed even while it keeps trying to
Expand All @@ -122,17 +120,16 @@ func (s *OffsetMgrSuite) TestInitialFetchError(c *C) {
client, err := sarama.NewClient([]string{broker1.Addr()}, nil)
c.Assert(err, IsNil)
f := SpawnFactory(s.ns.NewChild(), cfg, client)
defer f.Stop()

// When
om, err := f.SpawnOffsetManager(s.ns.NewChild("g1", "t1", 7), "g1", "t1", 7)
c.Assert(err, IsNil)
defer om.Stop()

// Then
oce := <-om.Errors()
c.Assert(oce, DeepEquals, &OffsetCommitError{"g1", "t1", 7, sarama.ErrNotLeaderForPartition})

om.Stop()
f.Stop()
}

// If offset commit fails then the corresponding error is sent down to the
Expand Down Expand Up @@ -161,6 +158,7 @@ func (s *OffsetMgrSuite) TestCommitError(c *C) {
c.Assert(err, IsNil)

f := SpawnFactory(s.ns.NewChild(), cfg, client)
defer f.Stop()
om, err := f.SpawnOffsetManager(s.ns.NewChild("g1", "t1", 7), "g1", "t1", 7)
c.Assert(err, IsNil)

Expand All @@ -183,7 +181,65 @@ func (s *OffsetMgrSuite) TestCommitError(c *C) {
wg.Wait()
committedOffset := lastCommittedOffset(broker1, "g1", "t1", 7)
c.Assert(committedOffset, DeepEquals, DecoratedOffset{1000, "foo"})
f.Stop()
}

// If a response received from Kafka for an offset commit request does not
// contain information for a submitted offset, then the offset manager keeps
// retrying until it succeeds.
func (s *OffsetMgrSuite) TestCommitIncompleteResponse(c *C) {
	// Given
	broker1 := sarama.NewMockBroker(c, 101)
	defer broker1.Close()

	// A canned commit response that acknowledges only partition 2 of t1.
	// Commits submitted for partition 1 therefore come back incomplete.
	offsetCommitResponse := sarama.OffsetCommitResponse{
		Errors: map[string]map[int32]sarama.KError{"t1": {2: sarama.ErrNoError}}}

	broker1.SetHandlerByMap(map[string]sarama.MockResponse{
		"MetadataRequest": sarama.NewMockMetadataResponse(c).
			SetBroker(broker1.Addr(), broker1.BrokerID()),
		"ConsumerMetadataRequest": sarama.NewMockConsumerMetadataResponse(c).
			SetCoordinator("g1", broker1),
		"OffsetFetchRequest": sarama.NewMockOffsetFetchResponse(c).
			SetOffset("g1", "t1", 1, 1000, "foo", sarama.ErrNoError).
			SetOffset("g1", "t1", 2, 2000, "bar", sarama.ErrNoError),
		// NewMockWrapper always replays the same fixed response, so every
		// commit attempt for partition 1 is incomplete until the handler
		// map is replaced below.
		"OffsetCommitRequest": sarama.NewMockWrapper(&offsetCommitResponse),
	})

	cfg := testhelpers.NewTestConfig("c1")
	cfg.Consumer.BackOffTimeout = 1000 * time.Millisecond
	cfg.Consumer.OffsetsCommitInterval = 50 * time.Millisecond
	cfg.Consumer.ReturnErrors = true
	client, err := sarama.NewClient([]string{broker1.Addr()}, nil)
	c.Assert(err, IsNil)

	f := SpawnFactory(s.ns.NewChild(), cfg, client)
	defer f.Stop()
	om1, err := f.SpawnOffsetManager(s.ns.NewChild("g1", "t1", 1), "g1", "t1", 1)
	c.Assert(err, IsNil)
	om2, err := f.SpawnOffsetManager(s.ns.NewChild("g1", "t1", 2), "g1", "t1", 2)
	c.Assert(err, IsNil)

	// When: submit an offset for each partition, then stop both managers
	// asynchronously so they flush their pending commits on shutdown.
	om1.SubmitOffset(1001, "foo1")
	om2.SubmitOffset(2001, "bar2")
	var wg sync.WaitGroup
	actor.Spawn(actor.RootID.NewChild("stopper"), &wg, om1.Stop)
	actor.Spawn(actor.RootID.NewChild("stopper"), &wg, om2.Stop)

	// Then: partition 1 reports ErrIncompleteResponse while partition 2,
	// which was acknowledged, commits successfully.
	oce := <-om1.Errors()
	c.Assert(oce, DeepEquals, &OffsetCommitError{"g1", "t1", 1, sarama.ErrIncompleteResponse})
	c.Assert(<-om2.CommittedOffsets(), Equals, DecoratedOffset{2001, "bar2"})

	// Swap in a handler that acknowledges partition 1, so om1's retries
	// can finally succeed and its Stop can complete.
	broker1.SetHandlerByMap(map[string]sarama.MockResponse{
		"ConsumerMetadataRequest": sarama.NewMockConsumerMetadataResponse(c).
			SetCoordinator("g1", broker1),
		"OffsetCommitRequest": sarama.NewMockOffsetCommitResponse(c).
			SetError("g1", "t1", 1, sarama.ErrNoError),
	})

	// Wait for both managers to stop; om1 must have retried until the
	// submitted offset was actually committed.
	wg.Wait()
	c.Assert(<-om1.CommittedOffsets(), Equals, DecoratedOffset{1001, "foo1"})
}

// It is guaranteed that a partition offset manager commits all pending offsets
Expand All @@ -207,6 +263,7 @@ func (s *OffsetMgrSuite) TestCommitBeforeClose(c *C) {
client, err := sarama.NewClient([]string{broker1.Addr()}, saramaCfg)
c.Assert(err, IsNil)
f := SpawnFactory(s.ns.NewChild(), cfg, client)
defer f.Stop()
c.Assert(err, IsNil)
om, err := f.SpawnOffsetManager(s.ns.NewChild("g1", "t1", 7), "g1", "t1", 7)
c.Assert(err, IsNil)
Expand Down Expand Up @@ -273,7 +330,6 @@ func (s *OffsetMgrSuite) TestCommitBeforeClose(c *C) {

committedOffset := lastCommittedOffset(broker1, "g1", "t1", 7)
c.Assert(committedOffset, DeepEquals, DecoratedOffset{1001, "foo"})
f.Stop()
}

// Different consumer groups can keep different offsets for the same
Expand Down Expand Up @@ -302,6 +358,7 @@ func (s *OffsetMgrSuite) TestCommitDifferentGroups(c *C) {
client, err := sarama.NewClient([]string{broker1.Addr()}, nil)
c.Assert(err, IsNil)
f := SpawnFactory(s.ns.NewChild(), cfg, client)
defer f.Stop()
om1, err := f.SpawnOffsetManager(s.ns.NewChild("g1", "t1", 7), "g1", "t1", 7)
c.Assert(err, IsNil)
om2, err := f.SpawnOffsetManager(s.ns.NewChild("g2", "t1", 7), "g2", "t1", 7)
Expand All @@ -322,7 +379,6 @@ func (s *OffsetMgrSuite) TestCommitDifferentGroups(c *C) {
c.Assert(committedOffset1, DeepEquals, DecoratedOffset{1017, "foo3"})
committedOffset2 := lastCommittedOffset(broker1, "g2", "t1", 7)
c.Assert(committedOffset2, DeepEquals, DecoratedOffset{2019, "bar3"})
f.Stop()
}

func (s *OffsetMgrSuite) TestCommitNetworkError(c *C) {
Expand Down Expand Up @@ -351,6 +407,7 @@ func (s *OffsetMgrSuite) TestCommitNetworkError(c *C) {
client, err := sarama.NewClient([]string{broker1.Addr()}, saramaCfg)
c.Assert(err, IsNil)
f := SpawnFactory(s.ns.NewChild(), cfg, client)
defer f.Stop()
om1, err := f.SpawnOffsetManager(s.ns.NewChild("g1", "t1", 7), "g1", "t1", 7)
c.Assert(err, IsNil)
om2, err := f.SpawnOffsetManager(s.ns.NewChild("g1", "t1", 8), "g1", "t1", 8)
Expand Down Expand Up @@ -389,7 +446,6 @@ func (s *OffsetMgrSuite) TestCommitNetworkError(c *C) {
c.Assert(committedOffset2, DeepEquals, DecoratedOffset{2001, "bar2"})
committedOffset3 := lastCommittedOffset(broker1, "g2", "t1", 7)
c.Assert(committedOffset3, DeepEquals, DecoratedOffset{3001, "bar3"})
f.Stop()
}

func (s *OffsetMgrSuite) TestCommittedChannel(c *C) {
Expand All @@ -413,6 +469,7 @@ func (s *OffsetMgrSuite) TestCommittedChannel(c *C) {
client, err := sarama.NewClient([]string{broker1.Addr()}, nil)
c.Assert(err, IsNil)
f := SpawnFactory(s.ns.NewChild(), cfg, client)
defer f.Stop()
om, err := f.SpawnOffsetManager(s.ns.NewChild("g1", "t1", 7), "g1", "t1", 7)
c.Assert(err, IsNil)

Expand All @@ -430,7 +487,6 @@ func (s *OffsetMgrSuite) TestCommittedChannel(c *C) {
committedOffsets = append(committedOffsets, committedOffset)
}
c.Assert(committedOffsets, DeepEquals, []DecoratedOffset{{1005, "bar5"}})
f.Stop()
}

// Test for issue https://github.com/mailgun/kafka-pixy/issues/29. The problem
Expand Down Expand Up @@ -460,6 +516,7 @@ func (s *OffsetMgrSuite) TestBugConnectionRestored(c *C) {
client, err := sarama.NewClient([]string{broker1.Addr()}, saramaCfg)
c.Assert(err, IsNil)
f := SpawnFactory(s.ns.NewChild(), cfg, client)
defer f.Stop()
om, err := f.SpawnOffsetManager(s.ns.NewChild("g1", "t1", 7), "g1", "t1", 7)
c.Assert(err, IsNil)

Expand Down Expand Up @@ -496,6 +553,7 @@ func (s *OffsetMgrSuite) TestBugConnectionRestored(c *C) {
// It will be assigned the broken connection to broker2.
om, err = f.SpawnOffsetManager(s.ns.NewChild("g1", "t1", 7), "g1", "t1", 7)
c.Assert(err, IsNil)
defer om.Stop()

log.Infof(" THEN")
// Then: the new partition offset manager re-establishes connection with
Expand All @@ -507,9 +565,6 @@ func (s *OffsetMgrSuite) TestBugConnectionRestored(c *C) {
case <-time.After(200 * time.Millisecond):
}
c.Assert(do.Offset, Equals, int64(1000), Commentf("Failed to retrieve initial offset: %s", oce.Err))

om.Stop()
f.Stop()
}

// Test for issue https://github.com/mailgun/kafka-pixy/issues/62. The problem
Expand Down Expand Up @@ -537,6 +592,7 @@ func (s *OffsetMgrSuite) TestBugOffsetDroppedOnStop(c *C) {
client, err := sarama.NewClient([]string{broker1.Addr()}, nil)
c.Assert(err, IsNil)
f := SpawnFactory(s.ns.NewChild(), cfg, client)
defer f.Stop()
om, err := f.SpawnOffsetManager(s.ns.NewChild("g1", "t1", 1), "g1", "t1", 1)
c.Assert(err, IsNil)
time.Sleep(100 * time.Millisecond)
Expand All @@ -561,7 +617,6 @@ func (s *OffsetMgrSuite) TestBugOffsetDroppedOnStop(c *C) {
committedOffsets = append(committedOffsets, committedOffset)
}
c.Assert(committedOffsets, DeepEquals, []DecoratedOffset{{1001, "bar1"}, {1002, "bar2"}})
f.Stop()
}

// lastCommittedOffset traverses the mock broker history backwards searching
Expand Down

0 comments on commit ab6db39

Please sign in to comment.