Skip to content

Go library providing abstraction over Shopify's Sarama library.

License

Notifications You must be signed in to change notification settings

Jaskaranbir/go-kafkaproxy

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

6 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

KafkaProxy for Sarama library

This is just a simple Go library providing abstraction over Shopify's Sarama library.

Usage:

  • Install dep dependencies:
dep ensure
  • Import/Use in your code!
import github.com/Jaskaranbir/go-kafkaproxy/pkg/consumer // Import Consumer
import github.com/Jaskaranbir/go-kafkaproxy/pkg/producer // Import Producer

Docs:

Minimal examples:

Consumer:

msgHandler := func(msg *sarama.ConsumerMessage, c *consumer.Consumer) {
  // Convert from []byte to string
  println("Received message: ", string(msg.Value))
  consumer := c.Get()
  if !c.IsClosed() {
    consumer.MarkOffset(msg, "")
  } else {
    log.Fatalln("Consumer was closed before offsets could be marked.")
  }
}

errHandler := func(e *error) {
  log.Fatalln((*e).Error())
}

config := consumer.Config{
  ConsumerGroup: "test",
  ErrHandler:    errHandler,
  KafkaBrokers:  []string{"localhost:9092"},
  MsgHandler:    msgHandler,
  Topics:        []string{"test"},
}

proxyconsumer, _ := consumer.New(&config)
proxyconsumer.EnableLogging()

// Temporary hack for simplicity. Use channels/app-logic in actual application.
time.Sleep(100000 * time.Millisecond)

Producer:

errHandler := func(err *sarama.ProducerError) {
  errs := *err
  fmt.Println(errs.Error())
}
config := producer.Config{
  ErrHandler:   errHandler,
  KafkaBrokers: []string{"localhost:9092"},
}
asyncProducer, err := producer.New(&config)
asyncProducer.EnableLogging()

strTime := strconv.Itoa(int(time.Now().Unix()))
msg := asyncProducer.CreateKeyMessage("test", strTime, "testValue")

if err != nil {
  panic(err)
}
input, _ := asyncProducer.Input()
input <- msg

// Temporary hack for simplicity. Use channels/app-logic in actual application.
time.Sleep(2000 * time.Millisecond)

About

Go library providing abstraction over Shopify's Sarama library.

Topics

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages