kafka - Publisher & Consumer for Kafka 0.7.x in Go

Kafka is a distributed publish-subscribe messaging system: (

Go language: (

For Kafka 0.8.x take a look at


May 2015

  • fixed bug handling partial message at end of a fetch response when the payload is < 4 bytes
  • if the the kafka log segment being read is cleaned up, attempt resuming the consumer from the earliest available offset

April 2015

  • added support for Snappy compression
  • fixed handling of partial messages at the end of each fetch response
  • added ProduceFromChannel() method in the publisher, mirroring the ConsumeOnChannel() method in the consumer
  • changed the quit channel type to empty struct{}, adding ability to stop the consumer on demand without race conditions
  • reused connection in BatchPublish(), instead of establishing a brand new connection every time.
  • applied gofmt / golint on the code (renamed Id() to ID() for compliance)
  • added comments
  • better distinction between DEBUG and ERROR logs, with info on how to get the consumer unstuck when the max fetch size is too small

April 2013

  • Merged back from the apache repository & outstanding patches from jira applied

Get up and running

Install go (version 1):
For more info see:

Make sure to set your GOROOT properly ( Also set your GOPATH appropriately:

Build from source:

make kafka
Make the tools (publisher & consumer)
make tools
Start zookeeper, Kafka server
For more info on Kafka, see:


Start a consumer:

   $GOPATH/bin/consumer -topic test -consumeforever
  Consuming Messages :
  From: localhost:9092, topic: test, partition: 0

Now the consumer will just poll until a message is received.

Publish a message:

  $GOPATH/bin/publisher -topic test -message "Hello World"

The consumer should output message.

API Usage


broker := kafka.NewBrokerPublisher("localhost:9092", "mytesttopic", 0)
broker.Publish(kafka.NewMessage([]byte("testing 1 2 3")))

Publishing Compressed Messages

broker := kafka.NewBrokerPublisher("localhost:9092", "mytesttopic", 0)
broker.Publish(kafka.NewCompressedMessage([]byte("testing 1 2 3")))


broker := kafka.NewBrokerConsumer("localhost:9092", "mytesttopic", 0, 0, 1048576)
broker.Consume(func(msg *kafka.Message) { msg.Print() })

Or the consumer can use a channel based approach:

broker := kafka.NewBrokerConsumer("localhost:9092", "mytesttopic", 0, 0, 1048576)
go broker.ConsumeOnChannel(msgChan, 10, quitChan)

Consuming Offsets

broker := kafka.NewBrokerOffsetConsumer("localhost:9092", "mytesttopic", 0)
offsets, err := broker.GetOffsets(-1, 1)


jeffreydamick (at) gmail (dot) com

Big thank you to NeuStar for sponsoring this work.


