Confluent kafka go #74

Merged · 2 commits · Jan 8, 2018
10 changes: 10 additions & 0 deletions kafka/confluent/README.md
@@ -0,0 +1,10 @@
This is a wrapper around the confluent-kafka-go library.

To use the library:
- Create the processor or view with the confluent consumer builder, e.g., `goka.WithConsumerBuilder(confluent.NewConsumerBuilder(1000))` (see the sketch below)
- Install `librdkafka` in the compilation environment
- Compile the Go binary with `-tags static`
- Install `libssl1.0.0` and `libsasl2-2` (or equivalents) in the execution environment

Note that this wrapper is experimental and not well tested, and some features are missing (in particular, `auto.commit` is set to true, so `Commit` is a no-op).
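
For reference, here is a minimal sketch of a processor wired to the confluent consumer builder. The broker address, group name, topic, and codec are hypothetical, and the start-up call may differ between goka versions (newer APIs use `Run(ctx)` instead of `Start()`):

```go
package main

import (
	"log"

	"github.com/lovoo/goka"
	"github.com/lovoo/goka/codec"
	"github.com/lovoo/goka/kafka/confluent"
)

func main() {
	// Hypothetical group: consume "example-stream" and log every message.
	g := goka.DefineGroup("example-group",
		goka.Input("example-stream", new(codec.String), func(ctx goka.Context, msg interface{}) {
			log.Printf("key=%s value=%v", ctx.Key(), msg)
		}),
	)

	p, err := goka.NewProcessor([]string{"localhost:9092"}, g,
		// Plug in the confluent consumer with an event channel of size 1000.
		goka.WithConsumerBuilder(confluent.NewConsumerBuilder(1000)),
	)
	if err != nil {
		log.Fatalf("cannot create processor: %v", err)
	}
	if err := p.Start(); err != nil { // blocks until the processor stops
		log.Fatalf("processor failed: %v", err)
	}
}
```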

315 changes: 315 additions & 0 deletions kafka/confluent/confluent.go
@@ -0,0 +1,315 @@
package confluent

import (
"fmt"
"log"
"strings"

rdkafka "github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/lovoo/goka"
"github.com/lovoo/goka/kafka"
)

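// topicPartitionInfo tracks a single table partition: its rdkafka assignment,
// the high-water mark, and whether a BOF event has been emitted for it.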
type topicPartitionInfo struct {
tp rdkafka.TopicPartition
hwm int64
bofSent bool
}

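// confluent implements goka's kafka.Consumer interface on top of the
// confluent-kafka-go (rdkafka) consumer.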
type confluent struct {
tablePartitions map[string]map[int32]topicPartitionInfo
streamPartitions map[int32][]rdkafka.TopicPartition
partitionMap map[int32]bool

consumer confluentConsumer
events chan kafka.Event
groupTopics map[string]int64
cmds chan interface{}
stop chan bool
done chan bool
}

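// addPartition, removePartition, and addGroupPartition are the commands sent
// to the run loop via the cmds channel.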
type addPartition struct {
topic string
partition int32
initialOffset int64
}

type removePartition struct {
topic string
partition int32
}

type addGroupPartition struct {
partition int32
}

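// NewConsumer creates a goka consumer backed by confluent-kafka-go for the
// given brokers and consumer group. bufsize is the size of rdkafka's event channel.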
func NewConsumer(brokers []string, group string, bufsize int) (kafka.Consumer, error) {
consumer, err := rdkafka.NewConsumer(
&rdkafka.ConfigMap{
"bootstrap.servers": strings.Join(brokers, ","),
"group.id": group,
"session.timeout.ms": 6000,
// TODO(diogo): implement Commit()
//"enable.auto.commit": false,
"go.events.channel.size": bufsize,
"go.events.channel.enable": true,
"go.application.rebalance.enable": true,
"default.topic.config": rdkafka.ConfigMap{"auto.offset.reset": "earliest"},
},
)
if err != nil {
return nil, err
}

c := &confluent{
consumer: consumer,
tablePartitions: make(map[string]map[int32]topicPartitionInfo),
streamPartitions: make(map[int32][]rdkafka.TopicPartition),
partitionMap: make(map[int32]bool),
events: make(chan kafka.Event, 1024),
cmds: make(chan interface{}, 1024),
stop: make(chan bool),
done: make(chan bool),
}

// start go routine
go c.run()
return c, nil
}

// NewConsumerBuilder builds confluent-based consumers with channel size.
func NewConsumerBuilder(size int) goka.ConsumerBuilder {
return func(brokers []string, group, clientID string) (kafka.Consumer, error) {
// Return the error to the caller instead of terminating the process.
return NewConsumer(brokers, group, size)
}
}

func (c *confluent) Events() <-chan kafka.Event {
return c.events
}

func (c *confluent) Subscribe(topics map[string]int64) error {
log.Println("%% confluent %%", "subscribe", topics)
c.groupTopics = topics

tops := make([]string, 0, len(topics))
for topic := range topics {
tops = append(tops, topic) // topic is already a string; no conversion needed
}

return c.consumer.SubscribeTopics(tops, nil)
}

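// Commit is currently a no-op: offsets are committed automatically because
// enable.auto.commit is left enabled (see the TODO in NewConsumer).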
func (c *confluent) Commit(topic string, partition int32, offset int64) error { return nil }

func (c *confluent) AddGroupPartition(partition int32) {
select {
case c.cmds <- &addGroupPartition{partition}:
case <-c.stop:
}
}

func (c *confluent) AddPartition(topic string, partition int32, initialOffset int64) {
select {
case c.cmds <- &addPartition{topic, partition, initialOffset}:
case <-c.stop:
}
}

func (c *confluent) RemovePartition(topic string, partition int32) {
select {
case c.cmds <- &removePartition{topic, partition}:
case <-c.stop:
}
}

func (c *confluent) Close() error {
// stop go routines
close(c.stop)
<-c.done

return nil
}

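// run is the consumer's event loop: it translates rdkafka events into goka
// kafka events and serializes partition commands until the consumer is closed.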
func (c *confluent) run() {
defer close(c.done)
for {
select {
case ev := <-c.consumer.Events():
// log.Println("%% confluent %%", "received event", ev)
switch e := ev.(type) {
case rdkafka.AssignedPartitions:
c.events <- c.rebalance(e)

case rdkafka.RevokedPartitions:
c.consumer.Unassign()

case *rdkafka.Message:
var (
topic = *e.TopicPartition.Topic
partition = e.TopicPartition.Partition
)

c.events <- &kafka.Message{
Topic: topic,
Partition: partition,
Offset: int64(e.TopicPartition.Offset),
Key: string(e.Key),
Value: e.Value,
Timestamp: e.Timestamp,
}

case rdkafka.PartitionEOF:
//log.Printf("%%%% confluent %%%% Reached %v\n", e)
c.events <- &kafka.EOF{
Topic: *e.Topic,
Partition: e.Partition,
Hwm: int64(e.Offset),
}

case rdkafka.Error:
c.events <- &kafka.Error{fmt.Errorf("error from rdkafka: %v", e)}

default:
//log.Printf("HANDLE ME: %v", ev)
}

case cmd := <-c.cmds:
switch cmd := cmd.(type) {
case *addPartition:
c.addPartition(cmd.topic, cmd.partition, cmd.initialOffset)
case *removePartition:
c.removePartition(cmd.topic, cmd.partition)
case *addGroupPartition:
c.addGroupPartition(cmd.partition)
default:
c.events <- &kafka.Error{fmt.Errorf("invalid command: %T", cmd)}
}

case <-c.stop:
log.Println("%% confluent %% terminated")
return
}
}
}

func (c *confluent) addGroupPartition(partition int32) {
log.Println("%% confluent %%", "adding group partition", partition)
c.partitionMap[partition] = true
c.reassign()
}

func (c *confluent) addPartition(topic string, partition int32, initialOffset int64) {
log.Println("%% confluent %%", "adding topic partition", topic, partition, initialOffset)
if _, has := c.tablePartitions[topic]; !has {
c.tablePartitions[topic] = make(map[int32]topicPartitionInfo)
}
c.tablePartitions[topic][partition] = topicPartitionInfo{
tp: rdkafka.TopicPartition{
Topic: &topic,
Partition: partition,
Offset: rdkafka.Offset(initialOffset),
Error: nil,
},
}

// send BOF
l, h, err := c.consumer.QueryWatermarkOffsets(topic, partition, 500)
if err != nil {
select {
case c.events <- &kafka.Error{fmt.Errorf("error querying watermarks: %v", err)}:
case <-c.stop:
}
return // do not emit a BOF with unknown watermarks
}
select {
case c.events <- &kafka.BOF{
Topic: topic,
Partition: partition,
Offset: l,
Hwm: h,
}:
case <-c.stop:
return
}

c.reassign()
}

func (c *confluent) removePartition(topic string, partition int32) {
log.Println("%% confluent %%", "remove topic partition", topic, partition)
if _, has := c.tablePartitions[topic]; !has {
return
}
if _, has := c.tablePartitions[topic][partition]; !has {
return
}
delete(c.tablePartitions[topic], partition)
if len(c.tablePartitions[topic]) == 0 {
delete(c.tablePartitions, topic)
}
c.reassign()
}

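// reassign rebuilds the complete assignment from the active stream partitions
// and all table partitions and hands it to the rdkafka consumer.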
func (c *confluent) reassign() {
var tps []rdkafka.TopicPartition
for p, tp := range c.streamPartitions {
if c.partitionMap[p] {
tps = append(tps, tp...)
}
}
for _, m := range c.tablePartitions {
for _, tp := range m {
tps = append(tps, tp.tp)
}
}
c.consumer.Assign(tps)
}

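// rebalance translates newly assigned partitions into a goka assignment,
// resolving missing offsets from the offsets requested in Subscribe.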
func (c *confluent) rebalance(e rdkafka.AssignedPartitions) *kafka.Assignment {
var (
as = make(kafka.Assignment)
pm = c.partitionMap
)

c.partitionMap = make(map[int32]bool)
for _, p := range e.Partitions {
if p.Offset == rdkafka.OffsetInvalid { // -1001: no offset stored for the group
// fall back to the initial offset requested in Subscribe
as[p.Partition] = c.groupTopics[*p.Topic]
} else {
as[p.Partition] = int64(p.Offset)
}

c.streamPartitions[p.Partition] = append(c.streamPartitions[p.Partition], p)
c.partitionMap[p.Partition] = pm[p.Partition] // keep already assigned partitions
}
return &as
}

//go:generate mockgen -package mock -destination ../mock/confluent.go -source=confluent.go confluentConsumer
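// confluentConsumer is the subset of the rdkafka consumer API used by this
// wrapper, extracted as an interface so tests can substitute a mock.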
type confluentConsumer interface {
Assign(partitions []rdkafka.TopicPartition) (err error)
Close() (err error)
Commit() ([]rdkafka.TopicPartition, error)
CommitMessage(m *rdkafka.Message) ([]rdkafka.TopicPartition, error)
CommitOffsets(offsets []rdkafka.TopicPartition) ([]rdkafka.TopicPartition, error)
Events() chan rdkafka.Event
GetMetadata(topic *string, allTopics bool, timeoutMs int) (*rdkafka.Metadata, error)
Poll(timeoutMs int) (event rdkafka.Event)
QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error)
String() string
Subscribe(topic string, rebalanceCb rdkafka.RebalanceCb) error
SubscribeTopics(topics []string, rebalanceCb rdkafka.RebalanceCb) (err error)
Unassign() (err error)
Unsubscribe() (err error)
}
28 changes: 28 additions & 0 deletions kafka/confluent/confluent_test.go
@@ -0,0 +1,28 @@
package confluent

import (
"testing"

"github.com/golang/mock/gomock"
"github.com/lovoo/goka/kafka"
"github.com/lovoo/goka/kafka/mock"
)

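// newMockConfluent builds a confluent instance around a mocked rdkafka consumer.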
func newMockConfluent(consumer confluentConsumer) *confluent {
return &confluent{
consumer: consumer,
events: make(chan kafka.Event),
stop: make(chan bool),
done: make(chan bool),
}
}

func TestConfluent1(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

consumer := mock.NewMockconfluentConsumer(ctrl)
c := newMockConfluent(consumer)

consumer.EXPECT().SubscribeTopics([]string{"t1"}, nil).Return(nil)
if err := c.Subscribe(map[string]int64{"t1": -1}); err != nil {
t.Fatalf("Subscribe failed: %v", err)
}
}
6 changes: 5 additions & 1 deletion kafkamock.go
@@ -123,13 +123,17 @@ func (km *KafkaMock) ProcessorOptions() []ProcessorOption {
WithStorageBuilder(func(topic string, partition int32) (storage.Storage, error) {
return km.storage, nil
}),
-WithConsumer(km.consumerMock),
+WithConsumerBuilder(km.consumerBuilder),
WithProducer(km.producerMock),
WithTopicManager(km.topicMgrMock),
WithPartitionChannelSize(0),
}
}

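// consumerBuilder ignores the brokers, group, and clientID arguments and
// always returns the mock consumer.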
func (km *KafkaMock) consumerBuilder(b []string, group, clientID string) (kafka.Consumer, error) {
return km.consumerMock, nil
}

// initProtocol initiates the protocol with the client basically making the KafkaMock
// usable.
func (km *KafkaMock) initProtocol() {