#239 integrate sarama consumer group (#260)
* Co-authored-by: frairon <franz.eichhorn@lovoo.com>
Co-authored-by: R053NR07 <jan.bickel@lovoo.com>
* bugfix in shutdown/rebalance: correctly closing joins
* run update/request/response stats in own goroutine
* fix rebalancing by adding a copartitioning rebalance strategy
* updated readme for configuration, added changelog
* Open Storage in PartitionTable when performing Setup
* return trackOutput if stats are nil
* view.get fixed for uninitialized view
added lots of godoc
fixed many linter errors
added Open call when creating storage
* context stats tracking: use queueing mechanism to avoid race conditions
* Add simpleBackoff and add backoff options for processor and view
* added strings to streams helper
* #249 view example
* issue #249: migration guide, #241 panic documentation of context
* #248 exposing partition table's connection state to view
* Migration: describe offsetbug
* partition_table: implement autoreconnect in recovery mode
* bugfix goroutine-leak in statsloop in partition table
* #248: refactored state merger, bugfix race condition when shutting down the signal/observers, created example
* bugfix partition state notification
* remove change in example, updated changelog
* fix readme example and add readme as example
* restore 1-simplest fix, remove readme-example
Co-authored-by: Jan Bickel <jan.bickel@lovoo.com>
frairon committed Jul 9, 2020
1 parent 00d8032 commit 075bc83
Showing 109 changed files with 10,957 additions and 10,115 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -1,3 +1,4 @@
tmp*
*.*~
.tags*
vendor
290 changes: 290 additions & 0 deletions CHANGELOG.md

Large diffs are not rendered by default.

57 changes: 57 additions & 0 deletions MIGRATION.md
@@ -0,0 +1,57 @@
This document sums up issues for migrating from 0.1.4 to 0.9.x.

## restartable/auto reconnect view.Run()

In 0.1.4, a view created as restartable returned from `Run` on errors, but could be restarted by calling `Run` again. The view remained usable even while it was not running and receiving updates.

In 0.9.x, the behavior of that option has changed: `view.Run` now performs the reconnect internally using a configurable backoff. The option was also renamed (the old name has been kept for compatibility reasons).

```go

// create a view
view014, _ := NewView(..., WithViewRestartable())

// reconnect logic in 0.1.4
go func() {
	for running {
		err := view014.Run(context.Background())
		// handle error

		// sleep or simple backoff logic
		time.Sleep(time.Second)
	}
}()

// After migration:
// create a view
view09x, _ := NewView(..., WithViewAutoReconnect())
ctx, cancel := context.WithCancel(context.Background())

// no need for reconnect logic, it's handled by the view internally
go func() {
	err := view09x.Run(ctx)
	// handle shutdown error
}()

// stop view
cancel()

```


## Offset bug in local storage
In 0.1.4 there was a bug that caused the table offset stored in the local cache to always be +1 compared to the actual offset stored in Kafka.
A second bug happened to even it out, so it never became an issue.

In 0.9.x, both bugs are fixed. However, if you upgrade goka and restart a processor using the same cache files that were maintained by the old version, you'll see a warning like this:
```
Error: local offset is higher than partition offset. topic some-topic, partition 0, hwm 1312, local offset 1314. This can have several reasons:
(1) The kafka topic storing the table is gone --> delete the local cache and restart!
(2) the processor crashed last time while writing to disk.
(3) You found a bug!
```
This is because goka sees an offset it is not expecting.
You should see this error only once per partition and processor. The offset will be fixed automatically. If it appears on every start or regularly, it might actually be a bug or some other error and should be investigated further
(or reported to goka :)).
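The recovery described above can be sketched as a plain function. Note this is a hypothetical `checkOffset` illustrating the described behavior, not goka's actual code: when the locally cached offset exceeds the partition's high watermark, warn once and fall back to a valid offset.

```go
package main

import "fmt"

// checkOffset illustrates the sanity check described above (an assumption,
// not goka's real implementation): if the local offset is beyond the
// partition high watermark, reset it to the hwm and emit a warning.
func checkOffset(local, hwm int64) (int64, string) {
	if local > hwm {
		return hwm, fmt.Sprintf(
			"local offset %d is higher than partition hwm %d, resetting", local, hwm)
	}
	return local, ""
}

func main() {
	// mirrors the values from the example warning above
	fixed, warn := checkOffset(1314, 1312)
	fmt.Println(fixed, warn)
}
```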
53 changes: 45 additions & 8 deletions README.md
@@ -39,6 +39,37 @@ You can install Goka by running the following command:

``$ go get -u github.com/lovoo/goka``

## Configuration

Goka relies on [Sarama](https://github.com/Shopify/sarama) to perform the actual communication with Kafka, which offers many configuration settings. The config is documented [here](https://godoc.org/github.com/Shopify/sarama#Config).

In most cases you will need to modify the config, e.g. to set the Kafka version.

```go
cfg := goka.DefaultConfig()
cfg.Version = sarama.V2_4_0_0
goka.ReplaceGlobalConfig(cfg)
```

This makes all goka components use the updated config.

If you need specific configuration for different components, pass customized builders to the
component's constructor, e.g.

```go
cfg := goka.DefaultConfig()
// modify the config with component-specific settings
// use the config by creating a builder which allows to override global config
goka.NewProcessor(// ...,
goka.WithConsumerGroupBuilder(
goka.ConsumerGroupBuilderWithConfig(cfg),
),
// ...
)
```

## Concepts

Goka relies on Kafka for message passing, fault-tolerant state storage and workload partitioning.
@@ -67,11 +98,11 @@ package main

import (
"context"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"

"github.com/lovoo/goka"
"github.com/lovoo/goka/codec"
@@ -83,18 +114,20 @@ var (
group goka.Group = "example-group"
)

// emits a single message and leave
// Emit messages forever every second
func runEmitter() {
emitter, err := goka.NewEmitter(brokers, topic, new(codec.String))
if err != nil {
log.Fatalf("error creating emitter: %v", err)
}
defer emitter.Finish()
err = emitter.EmitSync("some-key", "some-value")
if err != nil {
log.Fatalf("error emitting message: %v", err)
for {
time.Sleep(1 * time.Second)
err = emitter.EmitSync("some-key", "some-value")
if err != nil {
log.Fatalf("error emitting message: %v", err)
}
}
fmt.Println("message emitted")
}

// process messages until ctrl-c is pressed
@@ -132,6 +165,8 @@ func runProcessor() {
defer close(done)
if err = p.Run(ctx); err != nil {
log.Fatalf("error running processor: %v", err)
} else {
log.Printf("Processor shutdown cleanly")
}
}()

@@ -143,10 +178,12 @@ func runProcessor() {
}

func main() {
runEmitter() // emits one message and stops
runProcessor() // press ctrl-c to stop
go runEmitter() // emits a message every second
runProcessor() // press ctrl-c to stop
}

```
A very similar example is also in *1-simplest*. Just run `go run examples/1-simplest/main.go`.

Note that tables have to be configured in Kafka with log compaction.
For details check the [Wiki](https://github.com/lovoo/goka/wiki/Tips#configuring-log-compaction-for-table-topics).
13 changes: 13 additions & 0 deletions assignment.go
@@ -0,0 +1,13 @@
package goka

import (
"fmt"
)

// Assignment represents a partition:offset assignment for the current connection
type Assignment map[int32]int64

func (a *Assignment) string() string {
var am map[int32]int64 = *a
return fmt.Sprintf("Assignment %v", am)
}
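The new `Assignment` type above is a plain partition-to-offset map. A minimal self-contained sketch of its use (with an exported `String` variant instead of the unexported pointer-receiver `string()` from the diff, purely for illustration):

```go
package main

import "fmt"

// Assignment mirrors the type added in assignment.go:
// a partition -> offset map for the current connection.
type Assignment map[int32]int64

// String formats the assignment; converting to a plain map
// avoids recursing into this method.
func (a Assignment) String() string {
	return fmt.Sprintf("Assignment %v", map[int32]int64(a))
}

func main() {
	a := Assignment{0: 100, 1: 200}
	// fmt prints map keys in sorted order (since Go 1.12),
	// so the output is deterministic
	fmt.Println(a) // → Assignment map[0:100 1:200]
}
```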
11 changes: 11 additions & 0 deletions broker.go
@@ -0,0 +1,11 @@
package goka

import "github.com/Shopify/sarama"

// Broker is an interface for the sarama broker
type Broker interface {
Addr() string
Connected() (bool, error)
CreateTopics(request *sarama.CreateTopicsRequest) (*sarama.CreateTopicsResponse, error)
Open(conf *sarama.Config) error
}
99 changes: 99 additions & 0 deletions builders.go
@@ -0,0 +1,99 @@
package goka

import (
"hash"
"time"

"github.com/Shopify/sarama"
)

// ProducerBuilder creates a Kafka producer.
type ProducerBuilder func(brokers []string, clientID string, hasher func() hash.Hash32) (Producer, error)

// DefaultProducerBuilder creates a Kafka producer using the Sarama library.
func DefaultProducerBuilder(brokers []string, clientID string, hasher func() hash.Hash32) (Producer, error) {
config := globalConfig
config.ClientID = clientID
config.Producer.Partitioner = sarama.NewCustomHashPartitioner(hasher)
return NewProducer(brokers, &config)
}

// ProducerBuilderWithConfig creates a Kafka producer using the Sarama library.
func ProducerBuilderWithConfig(config *sarama.Config) ProducerBuilder {
return func(brokers []string, clientID string, hasher func() hash.Hash32) (Producer, error) {
config.ClientID = clientID
config.Producer.Partitioner = sarama.NewCustomHashPartitioner(hasher)
return NewProducer(brokers, config)
}
}

// TopicManagerBuilder creates a TopicManager to check partition counts and
// create tables.
type TopicManagerBuilder func(brokers []string) (TopicManager, error)

// DefaultTopicManagerBuilder creates TopicManager using the Sarama library.
func DefaultTopicManagerBuilder(brokers []string) (TopicManager, error) {
config := globalConfig
config.ClientID = "goka-topic-manager"
return NewTopicManager(brokers, &config, NewTopicManagerConfig())
}

// TopicManagerBuilderWithConfig creates TopicManager using the Sarama library.
func TopicManagerBuilderWithConfig(config *sarama.Config, tmConfig *TopicManagerConfig) TopicManagerBuilder {
return func(brokers []string) (TopicManager, error) {
return NewTopicManager(brokers, config, tmConfig)
}
}

// TopicManagerBuilderWithTopicManagerConfig creates TopicManager using the Sarama library.
func TopicManagerBuilderWithTopicManagerConfig(tmConfig *TopicManagerConfig) TopicManagerBuilder {
return func(brokers []string) (TopicManager, error) {
config := globalConfig
config.ClientID = "goka-topic-manager"
return NewTopicManager(brokers, &config, tmConfig)
}
}

// ConsumerGroupBuilder creates a `sarama.ConsumerGroup`
type ConsumerGroupBuilder func(brokers []string, group, clientID string) (sarama.ConsumerGroup, error)

// DefaultConsumerGroupBuilder creates a Kafka consumer using the Sarama library.
func DefaultConsumerGroupBuilder(brokers []string, group, clientID string) (sarama.ConsumerGroup, error) {
config := globalConfig
config.ClientID = clientID
return sarama.NewConsumerGroup(brokers, group, &config)
}

// ConsumerGroupBuilderWithConfig creates a sarama consumer group using the passed config
func ConsumerGroupBuilderWithConfig(config *sarama.Config) ConsumerGroupBuilder {
return func(brokers []string, group, clientID string) (sarama.ConsumerGroup, error) {
config.ClientID = clientID
return sarama.NewConsumerGroup(brokers, group, config)
}
}

// SaramaConsumerBuilder creates a `sarama.Consumer`
type SaramaConsumerBuilder func(brokers []string, clientID string) (sarama.Consumer, error)

// DefaultSaramaConsumerBuilder creates a Kafka consumer using the Sarama library.
func DefaultSaramaConsumerBuilder(brokers []string, clientID string) (sarama.Consumer, error) {
config := globalConfig
config.ClientID = clientID
return sarama.NewConsumer(brokers, &config)
}

// SaramaConsumerBuilderWithConfig creates a sarama consumer using the passed config
func SaramaConsumerBuilderWithConfig(config *sarama.Config) SaramaConsumerBuilder {
return func(brokers []string, clientID string) (sarama.Consumer, error) {
config.ClientID = clientID
return sarama.NewConsumer(brokers, config)
}
}

// BackoffBuilder creates a backoff
type BackoffBuilder func() (Backoff, error)

// DefaultBackoffBuilder returns a simpleBackoff with 10-second steps
func DefaultBackoffBuilder() (Backoff, error) {
return NewSimpleBackoff(time.Second * 10), nil
}
1 change: 0 additions & 1 deletion codec/codec_test.go

This file was deleted.

58 changes: 58 additions & 0 deletions config.go
@@ -0,0 +1,58 @@
package goka

import (
"time"

"github.com/Shopify/sarama"
)

var (
globalConfig = *DefaultConfig()
)

const (
// size of sarama buffer for consumer and producer
defaultChannelBufferSize = 256

// time sarama-cluster assumes the processing of an event may take
defaultMaxProcessingTime = 1 * time.Second

// producer flush configuration
defaultFlushFrequency = 100 * time.Millisecond
defaultFlushBytes = 64 * 1024
defaultProducerMaxRetries = 10
)

// DefaultConfig creates the default config used by goka.
// Modify it and pass it to `goka.ReplaceGlobalConfig(...)` to change
// goka's global config
func DefaultConfig() *sarama.Config {
config := sarama.NewConfig()
config.Version = sarama.V2_0_0_0

// consumer configuration
config.Consumer.Return.Errors = true
config.Consumer.MaxProcessingTime = defaultMaxProcessingTime
// this configures the initial offset for streams. Tables are always
// consumed from OffsetOldest.
config.Consumer.Offsets.Initial = sarama.OffsetNewest
config.Consumer.Group.Rebalance.Strategy = CopartitioningStrategy
// producer configuration
config.Producer.RequiredAcks = sarama.WaitForLocal
config.Producer.Compression = sarama.CompressionSnappy
config.Producer.Flush.Frequency = defaultFlushFrequency
config.Producer.Flush.Bytes = defaultFlushBytes
config.Producer.Return.Successes = true
config.Producer.Return.Errors = true
config.Producer.Retry.Max = defaultProducerMaxRetries
return config
}

// ReplaceGlobalConfig registers a standard config used during building if no
// other config is specified
func ReplaceGlobalConfig(config *sarama.Config) {
if config == nil {
panic("nil config registered as global config")
}
globalConfig = *config
}
42 changes: 42 additions & 0 deletions config_test.go
@@ -0,0 +1,42 @@
package goka

import (
"testing"

"github.com/Shopify/sarama"
"github.com/lovoo/goka/internal/test"
)

func TestConfig_DefaultConfig(t *testing.T) {
t.Run("equal", func(t *testing.T) {
cfg := DefaultConfig()
test.AssertTrue(t, cfg.Version == sarama.V2_0_0_0)
test.AssertTrue(t, cfg.Consumer.Return.Errors == true)
test.AssertTrue(t, cfg.Consumer.MaxProcessingTime == defaultMaxProcessingTime)
test.AssertTrue(t, cfg.Consumer.Offsets.Initial == sarama.OffsetNewest)
test.AssertTrue(t, cfg.Producer.RequiredAcks == sarama.WaitForLocal)
test.AssertTrue(t, cfg.Producer.Compression == sarama.CompressionSnappy)
test.AssertTrue(t, cfg.Producer.Flush.Frequency == defaultFlushFrequency)
test.AssertTrue(t, cfg.Producer.Flush.Bytes == defaultFlushBytes)
test.AssertTrue(t, cfg.Producer.Return.Successes == true)
test.AssertTrue(t, cfg.Producer.Return.Errors == true)
test.AssertTrue(t, cfg.Producer.Retry.Max == defaultProducerMaxRetries)
})
}

func TestConfig_ReplaceGlobalConfig(t *testing.T) {
t.Run("succeed", func(t *testing.T) {
custom := DefaultConfig()
custom.Version = sarama.V0_8_2_0
ReplaceGlobalConfig(custom)
test.AssertEqual(t, globalConfig.Version, custom.Version)
})
t.Run("panic", func(t *testing.T) {
defer func() {
if r := recover(); r == nil {
t.Fatal("there was no panic")
}
}()
ReplaceGlobalConfig(nil)
})
}
