Skip to content

Commit

Permalink
Add support for Kafka 0.9.0.0
Browse files Browse the repository at this point in the history
  • Loading branch information
horkhe committed Dec 14, 2015
1 parent 4af1e09 commit 0187673
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 4 deletions.
6 changes: 5 additions & 1 deletion admin/admin.go
Expand Up @@ -183,7 +183,11 @@ func (a *T) SetGroupOffsets(group, topic string, offsets []PartitionOffset) erro
return NewErrQuery(err, "failed to get coordinator")
}

req := sarama.OffsetCommitRequest{ConsumerGroup: group, Version: ProtocolVer1}
req := sarama.OffsetCommitRequest{
Version: ProtocolVer1,
ConsumerGroup: group,
ConsumerGroupGeneration: sarama.GroupGenerationUndefined,
}
for _, po := range offsets {
req.AddBlock(topic, po.Partition, po.Offset, sarama.ReceiveTime, po.Metadata)
}
Expand Down
1 change: 0 additions & 1 deletion producer/producer.go
Expand Up @@ -41,7 +41,6 @@ type produceResult struct {
// Spawn creates a producer instance and starts its internal goroutines.
func Spawn(cfg *config.T) (*T, error) {
saramaCfg := sarama.NewConfig()
saramaCfg.ClientID = cfg.ClientID
saramaCfg.ChannelBufferSize = cfg.Producer.ChannelBufferSize
saramaCfg.Producer.RequiredAcks = sarama.WaitForAll
saramaCfg.Producer.Return.Successes = true
Expand Down
2 changes: 1 addition & 1 deletion producer/producer_test.go
Expand Up @@ -170,7 +170,7 @@ func (s *ProducerSuite) failedMessages() []string {
for {
select {
case prodMsg := <-s.deadMessageCh:
b = append(b, string(prodMsg.Value.(sarama.ByteEncoder)))
b = append(b, string(prodMsg.Value.(sarama.StringEncoder)))
default:
goto done
}
Expand Down
12 changes: 11 additions & 1 deletion service/service_test.go
Expand Up @@ -450,7 +450,8 @@ func (s *ServiceSuite) TestSetOffsets(c *C) {
}
}

// It is not an error to set offsets for a topic that does not exist.
// Result of setting offsets for a non-existent topic depends on the Kafka
// version. It is ok for 0.8, but error in 0.9.
func (s *ServiceSuite) TestSetOffsetsNoSuchTopic(c *C) {
// Given
svc, _ := Spawn(s.cfg)
Expand All @@ -461,6 +462,15 @@ func (s *ServiceSuite) TestSetOffsetsNoSuchTopic(c *C) {
"application/json", strings.NewReader(`[{"partition": 0, "offset": 1100, "metadata": "A100"}]`))

// Then
kafkaVersion := os.Getenv("KAFKA_VERSION")
if strings.HasPrefix(kafkaVersion, "0.9") {
c.Assert(err, IsNil)
c.Assert(r.StatusCode, Equals, http.StatusNotFound)
body := ParseJSONBody(c, r).(map[string]interface{})
c.Assert(body["error"], Equals, "Unknown topic")
return
}

c.Assert(err, IsNil)
c.Assert(r.StatusCode, Equals, http.StatusOK)
c.Assert(ParseJSONBody(c, r), DeepEquals, apiserver.EmptyResponse)
Expand Down
1 change: 1 addition & 0 deletions testhelpers/testhelpers.go
Expand Up @@ -79,6 +79,7 @@ func NewKafkaHelper(c *C) *KafkaHelper {
cfg := sarama.NewConfig()
cfg.Producer.Return.Successes = true
cfg.Producer.Return.Errors = true
cfg.Consumer.Offsets.CommitInterval = 50 * time.Millisecond
cfg.ClientID = "unittest-runner"
err := error(nil)
if kh.client, err = sarama.NewClient(KafkaPeers, cfg); err != nil {
Expand Down

0 comments on commit 0187673

Please sign in to comment.