Skip to content

Commit

Permalink
change consumer option to get a consumer builder
Browse files Browse the repository at this point in the history
  • Loading branch information
Diogo Behrens authored and db7 committed Jan 8, 2018
1 parent 38a4821 commit a374384
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 34 deletions.
6 changes: 5 additions & 1 deletion kafkamock.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,13 +123,17 @@ func (km *KafkaMock) ProcessorOptions() []ProcessorOption {
WithStorageBuilder(func(topic string, partition int32) (storage.Storage, error) {
return km.storage, nil
}),
WithConsumer(km.consumerMock),
WithConsumerBuilder(km.consumerBuilder),
WithProducer(km.producerMock),
WithTopicManager(km.topicMgrMock),
WithPartitionChannelSize(0),
}
}

func (km *KafkaMock) consumerBuilder(b []string, group, clientID string) (kafka.Consumer, error) {
return km.consumerMock, nil
}

// initProtocol initiates the protocol with the client basically making the KafkaMock
// usable.
func (km *KafkaMock) initProtocol() {
Expand Down
41 changes: 17 additions & 24 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ type UpdateCallback func(s storage.Storage, partition int32, key string, value [
// table. StorageBuilder creates one storage for each partition of the topic.
type StorageBuilder func(topic string, partition int32) (storage.Storage, error)

// ConsumerBuilder creates a Kafka consumer.
type ConsumerBuilder func(brokers []string, group, clientID string) (kafka.Consumer, error)

///////////////////////////////////////////////////////////////////////////////
// default values
///////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -78,15 +81,15 @@ func DefaultHasher() func() hash.Hash32 {

}

type consumerBuilder func(brokers []string, group string, clientID string) (kafka.Consumer, error)
type producerBuilder func(brokers []string) (kafka.Producer, error)
type topicmgrBuilder func(brokers []string) (kafka.TopicManager, error)

func defaultConsumerBuilder(brokers []string, group string, clientID string) (kafka.Consumer, error) {
// DefaultConsumerBuilder creates Kafka Consumer using the sarama library.
func DefaultConsumerBuilder(brokers []string, group, clientID string) (kafka.Consumer, error) {
config := kafka.CreateDefaultSaramaConfig(clientID, nil, metrics.DefaultRegistry)
return kafka.NewSaramaConsumer(brokers, group, config)
}

type producerBuilder func(brokers []string) (kafka.Producer, error)
type topicmgrBuilder func(brokers []string) (kafka.TopicManager, error)

func defaultProducerBuilder(clientID string, hasher func() hash.Hash32, log logger.Logger) producerBuilder {
return func(brokers []string) (kafka.Producer, error) {
partitioner := sarama.NewCustomHashPartitioner(hasher)
Expand Down Expand Up @@ -118,7 +121,7 @@ type poptions struct {

builders struct {
storage StorageBuilder
consumer consumerBuilder
consumer ConsumerBuilder
producer producerBuilder
topicmgr topicmgrBuilder
}
Expand Down Expand Up @@ -158,15 +161,10 @@ func WithTopicManager(tm kafka.TopicManager) ProcessorOption {
}
}

// WithConsumer replaces goka's default consumer.
func WithConsumer(c kafka.Consumer) ProcessorOption {
// WithConsumerBuilder creates a Kafka consumer.
func WithConsumerBuilder(cb ConsumerBuilder) ProcessorOption {
return func(o *poptions) {
o.builders.consumer = func(brokers []string, group string, clientID string) (kafka.Consumer, error) {
if c == nil {
return nil, fmt.Errorf("consumer cannot be nil")
}
return c, nil
}
o.builders.consumer = cb
}
}

Expand Down Expand Up @@ -239,7 +237,7 @@ func (opt *poptions) applyOptions(group string, opts ...ProcessorOption) error {
return fmt.Errorf("StorageBuilder not set")
}
if opt.builders.consumer == nil {
opt.builders.consumer = defaultConsumerBuilder
opt.builders.consumer = DefaultConsumerBuilder
}
if opt.builders.producer == nil {
opt.builders.producer = defaultProducerBuilder(opt.clientID, opt.hasher, opt.log)
Expand Down Expand Up @@ -268,7 +266,7 @@ type voptions struct {

builders struct {
storage StorageBuilder
consumer consumerBuilder
consumer ConsumerBuilder
topicmgr topicmgrBuilder
}
}
Expand Down Expand Up @@ -297,14 +295,9 @@ func WithViewStorageBuilder(sb StorageBuilder) ViewOption {
}

// WithViewConsumer replaces goka's default view consumer. Mainly for testing.
func WithViewConsumer(c kafka.Consumer) ViewOption {
func WithViewConsumerBuilder(cb ConsumerBuilder) ViewOption {
return func(o *voptions) {
o.builders.consumer = func(brokers []string, group string, clientID string) (kafka.Consumer, error) {
if c == nil {
return nil, fmt.Errorf("consumer cannot be nil")
}
return c, nil
}
o.builders.consumer = cb
}
}

Expand Down Expand Up @@ -357,7 +350,7 @@ func (opt *voptions) applyOptions(topic Table, opts ...ViewOption) error {
return fmt.Errorf("StorageBuilder not set")
}
if opt.builders.consumer == nil {
opt.builders.consumer = defaultConsumerBuilder
opt.builders.consumer = DefaultConsumerBuilder
}
if opt.builders.topicmgr == nil {
opt.builders.topicmgr = defaultTopicManagerBuilder
Expand Down
22 changes: 13 additions & 9 deletions processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,11 @@ func syncWith(t *testing.T, ch chan kafka.Event, p ...int32) error {
ch <- &kafka.NOP{Partition: -1}
})
}

func createConsumerBuilder(c kafka.Consumer) ConsumerBuilder {
return func(b []string, g, id string) (kafka.Consumer, error) {
return c, nil
}
}
func createProcessorStateless(ctrl *gomock.Controller, consumer kafka.Consumer, npar int) *Processor {
tm := mock.NewMockTopicManager(ctrl)
producer := mock.NewMockProducer(ctrl)
Expand All @@ -65,7 +69,7 @@ func createProcessorStateless(ctrl *gomock.Controller, consumer kafka.Consumer,
Loop(rawCodec, cb),
),
WithTopicManager(tm),
WithConsumer(consumer),
WithConsumerBuilder(createConsumerBuilder(consumer)),
WithProducer(producer),
WithPartitionChannelSize(0),
)
Expand Down Expand Up @@ -98,7 +102,7 @@ func createProcessor(t *testing.T, ctrl *gomock.Controller, consumer kafka.Consu
Persist(new(codec.String)),
),
WithTopicManager(tm),
WithConsumer(consumer),
WithConsumerBuilder(createConsumerBuilder(consumer)),
WithProducer(producer),
WithStorageBuilder(sb),
WithPartitionChannelSize(0),
Expand Down Expand Up @@ -132,7 +136,7 @@ func createProcessorWithTable(t *testing.T, ctrl *gomock.Controller, consumer ka
Persist(rawCodec),
),
WithTopicManager(tm),
WithConsumer(consumer),
WithConsumerBuilder(createConsumerBuilder(consumer)),
WithProducer(producer),
WithStorageBuilder(sb),
WithPartitionChannelSize(0),
Expand Down Expand Up @@ -381,7 +385,7 @@ func TestNewProcessor(t *testing.T) {
_, err = NewProcessor(nil,
DefineGroup(group, Input(topic, rawCodec, cb)),
WithTopicManager(tm),
WithConsumer(nil),
WithConsumerBuilder(createConsumerBuilder(nil)),
WithProducer(nil),
)
ensure.NotNil(t, err)
Expand All @@ -392,7 +396,7 @@ func TestNewProcessor(t *testing.T) {
_, err = NewProcessor(nil,
DefineGroup(group, Input(topic, rawCodec, cb)),
WithTopicManager(tm),
WithConsumer(consumer),
WithConsumerBuilder(createConsumerBuilder(consumer)),
WithProducer(nil),
)
ensure.NotNil(t, err)
Expand All @@ -411,7 +415,7 @@ func TestNewProcessor(t *testing.T) {
Persist(rawCodec),
),
WithTopicManager(tm),
WithConsumer(consumer),
WithConsumerBuilder(createConsumerBuilder(consumer)),
WithProducer(producer),
)
ensure.Nil(t, err)
Expand All @@ -431,7 +435,7 @@ func TestNewProcessor(t *testing.T) {
Input(topic2, rawCodec, cb),
),
WithTopicManager(tm),
WithConsumer(consumer),
WithConsumerBuilder(createConsumerBuilder(consumer)),
WithProducer(producer),
)
ensure.Nil(t, err)
Expand All @@ -451,7 +455,7 @@ func TestNewProcessor(t *testing.T) {
Join(table, rawCodec),
),
WithTopicManager(tm),
WithConsumer(consumer),
WithConsumerBuilder(createConsumerBuilder(consumer)),
WithProducer(producer),
)
ensure.Nil(t, err)
Expand Down

0 comments on commit a374384

Please sign in to comment.