Skip to content

Commit

Permalink
use default channel size from sarama in kafka package
Browse files Browse the repository at this point in the history
  • Loading branch information
Diogo Behrens authored and db7 committed Jan 4, 2018
1 parent 00d2946 commit 0baf3ad
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 3 deletions.
1 change: 0 additions & 1 deletion kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ func CreateDefaultSaramaConfig(clientID string, partitioner sarama.PartitionerCo

config.Version = sarama.V0_10_1_0
config.ClientID = clientID
config.ChannelBufferSize = defaultChannelBufferSize

// consumer configuration
config.Consumer.Return.Errors = true
Expand Down
8 changes: 6 additions & 2 deletions kafka/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

const (
// size of sarama buffer for consumer and producer
defaultChannelBufferSize = 10
defaultChannelBufferSize = 256

// time sarama-cluster assumes the processing of an event may take
defaultMaxProcessingTime = 1 * time.Second
Expand Down Expand Up @@ -52,7 +52,11 @@ type saramaConsumer struct {

// NewSaramaConsumer creates a new Consumer using sarama
func NewSaramaConsumer(brokers []string, group string, config *cluster.Config) (Consumer, error) {
events := make(chan Event, defaultChannelBufferSize)
chsize := config.Config.ChannelBufferSize
if chsize == 0 {
chsize = defaultChannelBufferSize
}
events := make(chan Event, chsize)

g, err := newGroupConsumer(brokers, group, events, config)
if err != nil {
Expand Down

0 comments on commit 0baf3ad

Please sign in to comment.