Commit d3f7cf9

Merge pull request #11 from JoshKCarroll/configure-buffer-size

Add `KAFKA_MAX_BUFFER_KB` to configure librdkafka buffer size

JoshuaC215 committed May 20, 2019 · 2 parents 36ae99e + 435394d
Showing 3 changed files with 52 additions and 35 deletions.
44 changes: 29 additions & 15 deletions README.md
@@ -10,9 +10,10 @@
Frafka is a Kafka implementation for [Frizzle](https://github.com/qntfy/frizzle) based on [confluent-go-kafka](https://github.com/confluentinc/confluent-kafka-go).

Frizzle is a magic message (`Msg`) bus designed for parallel processing with many goroutines.

* `Receive()` messages from a configured `Source`
* Do your processing, possibly `Send()` each `Msg` on to one or more `Sink` destinations
* `Ack()` (or `Fail()`) the `Msg` to notify the `Source` that processing completed

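A minimal sketch of that lifecycle, with `source` and `sink` already initialized; `"output-topic"` is a placeholder, and `Receive()` returning a channel is an assumption here (see the frizzle docs for exact signatures):

```golang
for msg := range source.Receive() {
	// do your processing here, then pass the Msg along
	if err := sink.Send(msg, "output-topic"); err != nil {
		source.Fail(msg) // processing failed; notify the Source
		continue
	}
	source.Ack(msg) // success; the Source can mark the Msg complete
}
```
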
## Prereqs / Build instructions

@@ -22,20 +23,22 @@

As of Go 1.11, frafka uses [go mod](https://github.com/golang/go/wiki/Modules) for dependency management.

### Install librdkafka

Frafka depends on C library `librdkafka` (>=`v0.11.6`). For Debian 9+ (which includes golang docker images),
it has to be built from source. Fortunately, there's a script for that.

```sh
# Install librdkafka
curl --silent -OL https://raw.githubusercontent.com/confluentinc/confluent-kafka-go/v0.11.6/mk/bootstrap-librdkafka.sh
bash bootstrap-librdkafka.sh v0.11.6 /usr/local
ldconfig
```

Once that is installed, you should be good to go:

```sh
go get github.com/qntfy/frafka
cd frafka
go build
```

## Running the tests
@@ -44,17 +47,19 @@

Frafka has integration tests which require a kafka broker to test against. The `KAFKA_BROKERS` environment
variable is used by the tests. [simplesteph/kafka-stack-docker-compose](https://github.com/simplesteph/kafka-stack-docker-compose)
has a great, simple docker-compose setup that is currently used in frafka CI.

```sh
curl --silent -L -o kafka.yml https://raw.githubusercontent.com/simplesteph/kafka-stack-docker-compose/v5.1.0/zk-single-kafka-single.yml
DOCKER_HOST_IP=127.0.0.1 docker-compose -f kafka.yml up -d
# takes a while to initialize; can use a tool like wait-for-it.sh in scripting
export KAFKA_BROKERS=127.0.0.1:9092
go test -v --cover ./...
```

## Configuration

Frafka Sources and Sinks are configured using [Viper](https://godoc.org/github.com/spf13/viper).

```golang
func InitSink(config *viper.Viper) (*Sink, error)

func InitSource(config *viper.Viper) (*Source, error)
```

@@ -70,13 +75,22 @@ use a prefix before the below values.

| Variable | Scope | Description | Default |
| --- | --- | --- | --- |
| KAFKA_TOPICS | source | topic(s) to read from | |
| KAFKA_CONSUMER_GROUP | source | consumer group value for coordinating multiple clients | |
| KAFKA_CONSUME_LATEST_FIRST | source (optional) | start at the beginning or end of topic | earliest |
| KAFKA_MAX_BUFFER_KB | optional | how large a buffer to allow for prefetching and batch producing kafka messages* | 16384 |

*`KAFKA_MAX_BUFFER_KB` is passed through to librdkafka; the default is 16MB.
The corresponding librdkafka config values are `queue.buffering.max.kbytes` (Producer) and `queued.max.messages.kbytes`
(Consumer). Note that librdkafka creates one buffer for the Producer (Sink) and one for each topic+partition
being consumed by the Source. E.g. with the 16MB default, if you are consuming from 4 partitions and also
producing, the theoretical max memory usage from these buffers would be `16*(4+1) = 80` MB.
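For illustration, here is a minimal sketch of initializing a `Source` from environment configuration. The import path, the `AutomaticEnv` usage, the `4096` override, and the `Close()` call are assumptions for the example; `KAFKA_BROKERS` and the other required variables must be set in the environment:

```golang
package main

import (
	"log"

	"github.com/qntfy/frafka"
	"github.com/spf13/viper"
)

func main() {
	v := viper.New()
	v.AutomaticEnv() // picks up KAFKA_BROKERS, KAFKA_TOPICS, KAFKA_MAX_BUFFER_KB, etc.
	v.SetDefault("kafka_max_buffer_kb", 4096) // shrink the 16MB default if memory is tight

	source, err := frafka.InitSource(v)
	if err != nil {
		log.Fatal(err)
	}
	defer source.Close()
}
```
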

## Async Error Handling

Since records are sent in batches, Kafka may report errors or other information asynchronously.
Events can be recovered via the channels returned by the `Sink.Events()` and `Source.Events()` methods.
Partition changes and EOF are reported as non-error Events; other errors conform to the `error` interface.
Where possible, Events retain their underlying type from [confluent-kafka-go](https://github.com/confluentinc/confluent-kafka-go)
in case more information is desired.
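For example, a minimal sketch of draining a source's events channel in the background (assuming a `Source` initialized as `source`):

```golang
go func() {
	for ev := range source.Events() {
		if err, ok := ev.(error); ok {
			log.Printf("async kafka error: %v", err)
			continue
		}
		log.Printf("kafka event: %v", ev) // e.g. partition changes or EOF
	}
}()
```
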

## Contributing

Contributions welcome! Take a look at open issues.
26 changes: 14 additions & 12 deletions sink.go
@@ -35,13 +35,15 @@ func InitSink(config *viper.Viper) (*Sink, error) {
}
brokers := strings.Join(config.GetStringSlice("kafka_brokers"), ",")

config.SetDefault("kafka_max_buffer_kb", 16384) // 16MB
maxBufferKB := config.GetInt("kafka_max_buffer_kb")

kCfg := kafka.ConfigMap{
"bootstrap.servers": brokers,
"queued.max.messages.kbytes": maxBufferKB, // limit memory usage for the consumer prefetch buffer; note there is one buffer per topic+partition
}

p, err := kafka.NewProducer(&kCfg)
if err != nil {
return nil, err
}
@@ -103,16 +105,16 @@ func (s *Sink) Send(m frizzle.Msg, topic string) error {

// Close the Sink after flushing any Msgs not fully sent
func (s *Sink) Close() error {
// check if already closed, return if so
if s.quitChan == nil {
return nil
}

// Flush any messages still pending send
if remaining := s.prod.Flush(flushTimeoutMS); remaining > 0 {
return fmt.Errorf("there are still %d messages which have not been delivered after %d milliseconds", remaining, flushTimeoutMS)
}

// tell deliveryReports() goroutine to finish if running
close(s.quitChan)
// wait for it to finish
17 changes: 9 additions & 8 deletions source.go
@@ -46,22 +46,23 @@ func InitSource(config *viper.Viper) (*Source, error) {
}
brokers := strings.Join(config.GetStringSlice("kafka_brokers"), ",")

config.SetDefault("kafka_max_buffer_kb", 16384) // 16MB
maxBufferKB := config.GetInt("kafka_max_buffer_kb")

kCfg := kafka.ConfigMap{
"bootstrap.servers": brokers, // expects CSV
"group.id": config.GetString("kafka_consumer_group"),
"session.timeout.ms": kafkaSessionTimeoutMS,
"go.events.channel.enable": true, // support c.Events()
"go.events.channel.size": kafkaEventChannelSize,
"go.application.rebalance.enable": true, // we handle partition updates (needed for offset management)
"go.application.rebalance.enable": true, // we handle partition updates (needed for offset management)
"queued.max.messages.kbytes": maxBufferKB, // limit memory usage for the consumer prefetch buffer; note there is one buffer per topic+partition
"default.topic.config": kafka.ConfigMap{
"auto.offset.reset": startOffset,
},
}

c, err := kafka.NewConsumer(&kCfg)
if err != nil {
return nil, err
}
