Skip to content

Commit

Permalink
Merge pull request #13 from maxthomas/simpler-new-sin
Browse files Browse the repository at this point in the history
expose simpler way of creating sink without *viper.Config
  • Loading branch information
maxthomas committed Dec 12, 2019
2 parents d3f7cf9 + 308cece commit dae2dfe
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 16 deletions.
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,17 @@ cd frafka
go build
```

## Basic API usage

### Sink

Create a new sink with `NewSink`:

``` golang
// error omitted - handle in proper code
sink, _ := frafka.NewSink("broker1:15151,broker2:15151", 16 * 1024)
```

## Running the tests

Frafka has integration tests which require a kafka broker to test against. `KAFKA_BROKERS` environment variable is
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,5 @@ require (
github.com/spf13/viper v1.2.1
github.com/stretchr/testify v1.2.2
)

go 1.13
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/alexcesaro/statsd v2.0.0+incompatible h1:HG17k1Qk8V1F4UOoq6tx+IUoAbOcI5PHzzEUGeDD72w=
github.com/alexcesaro/statsd v2.0.0+incompatible/go.mod h1:vNepIbQAiyLe1j480173M6NYYaAsGwEcvuDTU3OCUGY=
github.com/confluentinc/confluent-kafka-go v0.11.4 h1:uH5doflVcMn+2G/ECv0wxpgmVkvEpTwYFW57V2iLqHo=
github.com/confluentinc/confluent-kafka-go v0.11.4/go.mod h1:u2zNLny2xq+5rWeTQjFHbDzzNuba4P1vo31r9r4uAdg=
github.com/confluentinc/confluent-kafka-go v0.11.6 h1:rEblubnNXCjRThwAGnFSzLKYIRAoXLDC3A9r4ciziHU=
github.com/confluentinc/confluent-kafka-go v0.11.6/go.mod h1:u2zNLny2xq+5rWeTQjFHbDzzNuba4P1vo31r9r4uAdg=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
Expand Down
33 changes: 19 additions & 14 deletions sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,14 @@ type Sink struct {
evtChan chan frizzle.Event
}

// InitSink initializes a basic Sink
func InitSink(config *viper.Viper) (*Sink, error) {
if !config.IsSet("kafka_brokers") {
return nil, errors.New("brokers must be set for kafka Sink")
}
brokers := strings.Join(config.GetStringSlice("kafka_brokers"), ",")

config.SetDefault("kafka_max_buffer_kb", 16384) // 16MB
maxBufferKB := config.GetInt("kafka_max_buffer_kb")

kCfg := kafka.ConfigMap{
"bootstrap.servers": brokers,
"queued.max.messages.kbytes": maxBufferKB, // limit memory usage for the consumer prefetch buffer; note there is one buffer per topic+partition
// NewSink sets up a frafka sink.
func NewSink(brokerString string, bufferSize int) (*Sink, error) {
cfg := kafka.ConfigMap{
"bootstrap.servers": brokerString,
"queued.max.messages.kbytes": bufferSize, // limit memory usage for the consumer prefetch buffer; note there is one buffer per topic+partition
}

p, err := kafka.NewProducer(&kCfg)
p, err := kafka.NewProducer(&cfg)
if err != nil {
return nil, err
}
Expand All @@ -60,6 +52,19 @@ func InitSink(config *viper.Viper) (*Sink, error) {
return s, nil
}

// InitSink initializes a basic Sink via *viper.Config.
func InitSink(config *viper.Viper) (*Sink, error) {
if !config.IsSet("kafka_brokers") {
return nil, errors.New("brokers must be set for kafka Sink")
}
brokers := strings.Join(config.GetStringSlice("kafka_brokers"), ",")

config.SetDefault("kafka_max_buffer_kb", 16384) // 16MB
maxBufferKB := config.GetInt("kafka_max_buffer_kb")

return NewSink(brokers, maxBufferKB)
}

// deliveryReports receives async events from kafka Producer about whether
// message delivery is successful, any errors from broker, etc
func (s *Sink) deliveryReports() {
Expand Down

0 comments on commit dae2dfe

Please sign in to comment.