From 479a219dc2c869487c6f1cc979371413d9930c4c Mon Sep 17 00:00:00 2001 From: Diogo Behrens Date: Tue, 27 Mar 2018 20:18:47 +0200 Subject: [PATCH 1/3] moving kafkaMock into mock package --- examples/testing/main_test.go | 5 ++-- kafkamock.go => mock/tester.go | 47 +++++++++++++++++++--------------- options.go | 19 ++++++++++++++ processor_test.go | 45 ++++++++++++++++---------------- 4 files changed, 71 insertions(+), 45 deletions(-) rename kafkamock.go => mock/tester.go (92%) diff --git a/examples/testing/main_test.go b/examples/testing/main_test.go index 2949816f..4fc54888 100644 --- a/examples/testing/main_test.go +++ b/examples/testing/main_test.go @@ -7,6 +7,7 @@ import ( "github.com/golang/mock/gomock" "github.com/lovoo/goka" + "github.com/lovoo/goka/mock" ) func Test_ConsumeScalar(t *testing.T) { @@ -27,8 +28,8 @@ func Test_ConsumeScalar(t *testing.T) { func Test_ConsumeScalar_Integration(t *testing.T) { // ctrl := goka.NewMockController(t) // defer ctrl.Finish() - kafkaMock := goka.NewKafkaMock(t, "consume-scalar") - proc, err := createProcessor(nil, kafkaMock.ProcessorOptions()...) + kafkaMock := mock.NewKafkaMock(t, "consume-scalar") + proc, err := createProcessor(nil, goka.WithTester(kafkaMock)) if err != nil { t.Fatalf("Error creating processor: %v", err) diff --git a/kafkamock.go b/mock/tester.go similarity index 92% rename from kafkamock.go rename to mock/tester.go index 3ada98ea..7b1c2efd 100644 --- a/kafkamock.go +++ b/mock/tester.go @@ -1,4 +1,4 @@ -package goka +package mock import ( "fmt" @@ -15,6 +15,12 @@ import ( "github.com/lovoo/goka/storage" ) +// Codec decodes and encodes from and to []byte +type Codec interface { + Encode(value interface{}) (data []byte, err error) + Decode(data []byte) (value interface{}, err error) +} + // EmitHandler abstracts a function that allows to overwrite kafkamock's Emit function to // simulate producer errors type EmitHandler func(topic string, key string, value []byte) *kafka.Promise @@ -32,8 +38,8 @@ func (gm *gomockPanicker) Fatalf(format string, args ...interface{}) { } // NewMockController returns a *gomock.Controller using a wrapped testing.T (or whatever) -// which panics on a Fatalf. This is necessary when using a mock in kafkamock. Otherwise it will -// freeze on an unexpected call. +// which panics on a Fatalf. This is necessary when using a mock in kafkamock. +// Otherwise it will freeze on an unexpected call. 
func NewMockController(t gomock.TestReporter) *gomock.Controller { return gomock.NewController(&gomockPanicker{reporter: t}) } @@ -76,14 +82,14 @@ type Tester interface { } // NewKafkaMock returns a new testprocessor mocking every external service -func NewKafkaMock(t Tester, groupName Group) *KafkaMock { +func NewKafkaMock(t Tester, groupName string) *KafkaMock { kafkaMock := &KafkaMock{ storage: storage.NewMemory(), t: t, incomingEvents: make(chan kafka.Event), consumerEvents: make(chan kafka.Event), handledTopics: make(map[string]bool), - groupTopic: tableName(groupName), + groupTopic: fmt.Sprintf("%s-table", groupName), codec: new(codec.Bytes), } kafkaMock.consumerMock = newConsumerMock(kafkaMock) @@ -122,28 +128,29 @@ func (km *KafkaMock) SetGroupTableCreator(creator func() (string, []byte)) { // option_c, // )..., // ) -func (km *KafkaMock) ProcessorOptions() []ProcessorOption { - return []ProcessorOption{ - WithStorageBuilder(func(topic string, partition int32) (storage.Storage, error) { - return km.storage, nil - }), - WithConsumerBuilder(km.consumerBuilder), - WithProducerBuilder(km.producerBuilder), - WithTopicManagerBuilder(km.topicManagerBuilder), - WithPartitionChannelSize(0), + +func (km *KafkaMock) TopicManagerBuilder() kafka.TopicManagerBuilder { + return func(brokers []string) (kafka.TopicManager, error) { + return km.topicMgrMock, nil } } -func (km *KafkaMock) topicManagerBuilder(brokers []string) (kafka.TopicManager, error) { - return km.topicMgrMock, nil +func (km *KafkaMock) ConsumerBuilder() kafka.ConsumerBuilder { + return func(b []string, group, clientID string) (kafka.Consumer, error) { + return km.consumerMock, nil + } } -func (km *KafkaMock) producerBuilder(b []string, cid string, hasher func() hash.Hash32) (kafka.Producer, error) { - return km.producerMock, nil +func (km *KafkaMock) ProducerBuilder() kafka.ProducerBuilder { + return func(b []string, cid string, hasher func() hash.Hash32) (kafka.Producer, error) { + return km.producerMock, nil + } } -func (km *KafkaMock) consumerBuilder(b []string, group, clientID string) (kafka.Consumer, error) { - return km.consumerMock, nil +func (km *KafkaMock) StorageBuilder() storage.Builder { + return func(topic string, partition int32) (storage.Storage, error) { + return km.storage, nil + } } // initProtocol initiates the protocol with the client basically making the KafkaMock diff --git a/options.go b/options.go index 405bdd92..dabcb6bb 100644 --- a/options.go +++ b/options.go @@ -168,6 +168,25 @@ func WithNilHandling(nh NilHandling) ProcessorOption { } } +type Tester interface { + StorageBuilder() storage.Builder + ConsumerBuilder() kafka.ConsumerBuilder + ProducerBuilder() kafka.ProducerBuilder + TopicManagerBuilder() kafka.TopicManagerBuilder +} + +// WithTester configures all external connections of a processor, ie, storage, +// consumer and producer +func WithTester(t Tester) ProcessorOption { + return func(o *poptions) { + o.builders.storage = t.StorageBuilder() + o.builders.consumer = t.ConsumerBuilder() + o.builders.producer = t.ProducerBuilder() + o.builders.topicmgr = t.TopicManagerBuilder() + o.partitionChannelSize = 0 + } +} + func (opt *poptions) applyOptions(group string, opts ...ProcessorOption) error { opt.clientID = defaultClientID opt.log = logger.Default() diff --git a/processor_test.go b/processor_test.go index 737df560..4426c7db 100644 --- a/processor_test.go +++ b/processor_test.go @@ -1393,10 +1393,10 @@ func TestProcessor_HasGetStateless(t *testing.T) { } func TestProcessor_StatelessContext(t 
*testing.T) { - ctrl := NewMockController(t) + ctrl := mock.NewMockController(t) defer ctrl.Finish() var ( - km = NewKafkaMock(t, "user-reputation").SetCodec(new(codec.Bytes)) + km = mock.NewKafkaMock(t, "user-reputation").SetCodec(new(codec.Bytes)) //count int64 //wait = make(chan bool) ) @@ -1413,8 +1413,7 @@ func TestProcessor_StatelessContext(t *testing.T) { "stateless-ctx", Input("input-topic", new(codec.Bytes), callPersist), ), - // add kafkamock options - km.ProcessorOptions()..., + WithTester(km), ) ensure.Nil(t, err) done := make(chan bool) @@ -1434,7 +1433,7 @@ func TestProcessor_StatelessContext(t *testing.T) { func TestProcessor_ProducerError(t *testing.T) { t.Run("SetValue", func(t *testing.T) { - km := NewKafkaMock(t, "test") + km := mock.NewKafkaMock(t, "test") km.ReplaceEmitHandler(func(topic, key string, value []byte) *kafka.Promise { return kafka.NewPromise().Finish(errors.New("producer error")) }) @@ -1448,7 +1447,7 @@ func TestProcessor_ProducerError(t *testing.T) { Input("topic", new(codec.String), consume), Persist(new(codec.String)), ), - km.ProcessorOptions()..., + WithTester(km), ) ensure.Nil(t, err) @@ -1468,7 +1467,7 @@ func TestProcessor_ProducerError(t *testing.T) { }) t.Run("Emit", func(t *testing.T) { - km := NewKafkaMock(t, "test") + km := mock.NewKafkaMock(t, "test") km.ReplaceEmitHandler(func(topic, key string, value []byte) *kafka.Promise { return kafka.NewPromise().Finish(errors.New("producer error")) }) @@ -1482,7 +1481,7 @@ func TestProcessor_ProducerError(t *testing.T) { Input("topic", new(codec.String), consume), Persist(new(codec.String)), ), - km.ProcessorOptions()..., + WithTester(km), ) ensure.Nil(t, err) @@ -1503,7 +1502,7 @@ func TestProcessor_ProducerError(t *testing.T) { }) t.Run("Value-stateless", func(t *testing.T) { - km := NewKafkaMock(t, "test") + km := mock.NewKafkaMock(t, "test") km.ReplaceEmitHandler(func(topic, key string, value []byte) *kafka.Promise { return kafka.NewPromise().Finish(errors.New("producer error")) }) @@ -1519,7 +1518,7 @@ func TestProcessor_ProducerError(t *testing.T) { DefineGroup("test", Input("topic", new(codec.String), consume), ), - append(km.ProcessorOptions())..., + WithTester(km), ) ensure.Nil(t, err) @@ -1543,7 +1542,7 @@ func TestProcessor_ProducerError(t *testing.T) { } func TestProcessor_consumeFail(t *testing.T) { - km := NewKafkaMock(t, "test") + km := mock.NewKafkaMock(t, "test") consume := func(ctx Context, msg interface{}) { ctx.Fail(errors.New("consume-failed")) @@ -1553,7 +1552,7 @@ func TestProcessor_consumeFail(t *testing.T) { DefineGroup("test", Input("topic", new(codec.String), consume), ), - append(km.ProcessorOptions())..., + WithTester(km), ) ensure.Nil(t, err) @@ -1574,7 +1573,7 @@ func TestProcessor_consumeFail(t *testing.T) { } func TestProcessor_consumePanic(t *testing.T) { - km := NewKafkaMock(t, "test") + km := mock.NewKafkaMock(t, "test") consume := func(ctx Context, msg interface{}) { panic("panicking") @@ -1584,7 +1583,7 @@ func TestProcessor_consumePanic(t *testing.T) { DefineGroup("test", Input("topic", new(codec.String), consume), ), - append(km.ProcessorOptions())..., + WithTester(km), ) ensure.Nil(t, err) @@ -1661,12 +1660,13 @@ func TestProcessor_consumeNil(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - km := NewKafkaMock(t, "test") + km := mock.NewKafkaMock(t, "test") proc, err := NewProcessor([]string{"broker"}, DefineGroup("test", Input("topic", tc.codec, tc.cb), ), - append(km.ProcessorOptions(), WithNilHandling(tc.handling))..., + 
WithTester(km), + WithNilHandling(tc.handling), ) ensure.Nil(t, err) @@ -1697,7 +1697,7 @@ func TestProcessor_failOnRecover(t *testing.T) { msgToRecover = 100 ) - km := NewKafkaMock(t, "test") + km := mock.NewKafkaMock(t, "test") consume := func(ctx Context, msg interface{}) { log.Println("consuming message..", ctx.Key()) @@ -1717,12 +1717,11 @@ func TestProcessor_failOnRecover(t *testing.T) { Input("topic", new(codec.String), consume), Persist(rawCodec), ), - append(km.ProcessorOptions(), - WithUpdateCallback(func(s storage.Storage, partition int32, key string, value []byte) error { - log.Printf("recovered state: %s: %s", key, string(value)) - return nil - }), - )..., + WithTester(km), + WithUpdateCallback(func(s storage.Storage, partition int32, key string, value []byte) error { + log.Printf("recovered state: %s: %s", key, string(value)) + return nil + }), ) ensure.Nil(t, err) From b76f1cb5694ae7adfbe539965a595a748394305d Mon Sep 17 00:00:00 2001 From: Diogo Behrens Date: Tue, 27 Mar 2018 20:45:30 +0200 Subject: [PATCH 2/3] refactor KafkaMock as Tester in tester subpackage --- examples/testing/main_test.go | 14 +-- mock/controller.go | 26 ++++++ processor_test.go | 55 ++++++------ {mock => tester}/tester.go | 165 +++++++++++++++------------------- 4 files changed, 132 insertions(+), 128 deletions(-) create mode 100644 mock/controller.go rename {mock => tester}/tester.go (74%) diff --git a/examples/testing/main_test.go b/examples/testing/main_test.go index 4fc54888..a62aa061 100644 --- a/examples/testing/main_test.go +++ b/examples/testing/main_test.go @@ -7,7 +7,7 @@ import ( "github.com/golang/mock/gomock" "github.com/lovoo/goka" - "github.com/lovoo/goka/mock" + "github.com/lovoo/goka/tester" ) func Test_ConsumeScalar(t *testing.T) { @@ -28,8 +28,8 @@ func Test_ConsumeScalar(t *testing.T) { func Test_ConsumeScalar_Integration(t *testing.T) { // ctrl := goka.NewMockController(t) // defer ctrl.Finish() - kafkaMock := mock.NewKafkaMock(t, "consume-scalar") - proc, err := createProcessor(nil, goka.WithTester(kafkaMock)) + tester := tester.New(t) + proc, err := createProcessor(nil, goka.WithTester(tester)) if err != nil { t.Fatalf("Error creating processor: %v", err) @@ -47,15 +47,15 @@ func Test_ConsumeScalar_Integration(t *testing.T) { msg := []byte(strconv.FormatInt(1, 10)) // there is no initial value for key "foo" - if val := kafkaMock.ValueForKey("foo"); val != nil { + if val := tester.ValueForKey("foo"); val != nil { t.Errorf("state was not initially empty: %v", val) } // send the message twice - kafkaMock.Consume("scalar", "foo", msg) - kafkaMock.Consume("scalar", "foo", msg) + tester.Consume("scalar", "foo", msg) + tester.Consume("scalar", "foo", msg) - value := string(kafkaMock.ValueForKey("foo").([]byte)) + value := string(tester.ValueForKey("foo").([]byte)) fmt.Printf("%v\n", value) if value != "2" { diff --git a/mock/controller.go b/mock/controller.go new file mode 100644 index 00000000..003116bf --- /dev/null +++ b/mock/controller.go @@ -0,0 +1,26 @@ +package mock + +import ( + "fmt" + + "github.com/golang/mock/gomock" +) + +type gomockPanicker struct { + reporter gomock.TestReporter +} + +func (gm *gomockPanicker) Errorf(format string, args ...interface{}) { + gm.reporter.Errorf(format, args...) +} +func (gm *gomockPanicker) Fatalf(format string, args ...interface{}) { + defer panic(fmt.Sprintf(format, args...)) + gm.reporter.Fatalf(format, args...) +} + +// NewMockController returns a *gomock.Controller using a wrapped testing.T (or whatever) +// which panics on a Fatalf. 
This is necessary when using a mock in kafkamock. +// Otherwise it will freeze on an unexpected call. +func NewMockController(t gomock.TestReporter) *gomock.Controller { + return gomock.NewController(&gomockPanicker{reporter: t}) +} diff --git a/processor_test.go b/processor_test.go index 4426c7db..e3730074 100644 --- a/processor_test.go +++ b/processor_test.go @@ -19,6 +19,7 @@ import ( "github.com/lovoo/goka/mock" "github.com/lovoo/goka/multierr" "github.com/lovoo/goka/storage" + "github.com/lovoo/goka/tester" "github.com/facebookgo/ensure" "github.com/golang/mock/gomock" @@ -1396,7 +1397,7 @@ func TestProcessor_StatelessContext(t *testing.T) { ctrl := mock.NewMockController(t) defer ctrl.Finish() var ( - km = mock.NewKafkaMock(t, "user-reputation").SetCodec(new(codec.Bytes)) + tester = tester.New(t).SetCodec(new(codec.Bytes)) //count int64 //wait = make(chan bool) ) @@ -1413,7 +1414,7 @@ func TestProcessor_StatelessContext(t *testing.T) { "stateless-ctx", Input("input-topic", new(codec.Bytes), callPersist), ), - WithTester(km), + WithTester(tester), ) ensure.Nil(t, err) done := make(chan bool) @@ -1424,7 +1425,7 @@ func TestProcessor_StatelessContext(t *testing.T) { }() err = doTimed(t, func() { // consume a random key/message, the content doesn't matter as this should fail - km.ConsumeString("input-topic", "key", "msg") + tester.ConsumeString("input-topic", "key", "msg") <-done }) ensure.Nil(t, err) @@ -1433,8 +1434,8 @@ func TestProcessor_StatelessContext(t *testing.T) { func TestProcessor_ProducerError(t *testing.T) { t.Run("SetValue", func(t *testing.T) { - km := mock.NewKafkaMock(t, "test") - km.ReplaceEmitHandler(func(topic, key string, value []byte) *kafka.Promise { + tester := tester.New(t) + tester.ReplaceEmitHandler(func(topic, key string, value []byte) *kafka.Promise { return kafka.NewPromise().Finish(errors.New("producer error")) }) @@ -1447,7 +1448,7 @@ func TestProcessor_ProducerError(t *testing.T) { Input("topic", new(codec.String), consume), Persist(new(codec.String)), ), - WithTester(km), + WithTester(tester), ) ensure.Nil(t, err) @@ -1460,15 +1461,15 @@ func TestProcessor_ProducerError(t *testing.T) { close(done) }() - km.ConsumeString("topic", "key", "world") + tester.ConsumeString("topic", "key", "world") proc.Stop() <-done ensure.True(t, processorErrors != nil) }) t.Run("Emit", func(t *testing.T) { - km := mock.NewKafkaMock(t, "test") - km.ReplaceEmitHandler(func(topic, key string, value []byte) *kafka.Promise { + tester := tester.New(t) + tester.ReplaceEmitHandler(func(topic, key string, value []byte) *kafka.Promise { return kafka.NewPromise().Finish(errors.New("producer error")) }) @@ -1481,7 +1482,7 @@ func TestProcessor_ProducerError(t *testing.T) { Input("topic", new(codec.String), consume), Persist(new(codec.String)), ), - WithTester(km), + WithTester(tester), ) ensure.Nil(t, err) @@ -1494,7 +1495,7 @@ func TestProcessor_ProducerError(t *testing.T) { close(done) }() - km.ConsumeString("topic", "key", "world") + tester.ConsumeString("topic", "key", "world") proc.Stop() <-done @@ -1502,8 +1503,8 @@ func TestProcessor_ProducerError(t *testing.T) { }) t.Run("Value-stateless", func(t *testing.T) { - km := mock.NewKafkaMock(t, "test") - km.ReplaceEmitHandler(func(topic, key string, value []byte) *kafka.Promise { + tester := tester.New(t) + tester.ReplaceEmitHandler(func(topic, key string, value []byte) *kafka.Promise { return kafka.NewPromise().Finish(errors.New("producer error")) }) @@ -1518,7 +1519,7 @@ func TestProcessor_ProducerError(t *testing.T) { 
DefineGroup("test", Input("topic", new(codec.String), consume), ), - WithTester(km), + WithTester(tester), ) ensure.Nil(t, err) @@ -1531,7 +1532,7 @@ func TestProcessor_ProducerError(t *testing.T) { close(done) }() - km.ConsumeString("topic", "key", "world") + tester.ConsumeString("topic", "key", "world") // stopping the processor. It should actually not produce results proc.Stop() @@ -1542,7 +1543,7 @@ func TestProcessor_ProducerError(t *testing.T) { } func TestProcessor_consumeFail(t *testing.T) { - km := mock.NewKafkaMock(t, "test") + tester := tester.New(t) consume := func(ctx Context, msg interface{}) { ctx.Fail(errors.New("consume-failed")) @@ -1552,7 +1553,7 @@ func TestProcessor_consumeFail(t *testing.T) { DefineGroup("test", Input("topic", new(codec.String), consume), ), - WithTester(km), + WithTester(tester), ) ensure.Nil(t, err) @@ -1565,7 +1566,7 @@ func TestProcessor_consumeFail(t *testing.T) { close(done) }() - km.ConsumeString("topic", "key", "world") + tester.ConsumeString("topic", "key", "world") proc.Stop() <-done @@ -1573,7 +1574,7 @@ func TestProcessor_consumeFail(t *testing.T) { } func TestProcessor_consumePanic(t *testing.T) { - km := mock.NewKafkaMock(t, "test") + tester := tester.New(t) consume := func(ctx Context, msg interface{}) { panic("panicking") @@ -1583,7 +1584,7 @@ func TestProcessor_consumePanic(t *testing.T) { DefineGroup("test", Input("topic", new(codec.String), consume), ), - WithTester(km), + WithTester(tester), ) ensure.Nil(t, err) @@ -1596,7 +1597,7 @@ func TestProcessor_consumePanic(t *testing.T) { close(done) }() - km.ConsumeString("topic", "key", "world") + tester.ConsumeString("topic", "key", "world") proc.Stop() <-done @@ -1660,12 +1661,12 @@ func TestProcessor_consumeNil(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - km := mock.NewKafkaMock(t, "test") + tester := tester.New(t) proc, err := NewProcessor([]string{"broker"}, DefineGroup("test", Input("topic", tc.codec, tc.cb), ), - WithTester(km), + WithTester(tester), WithNilHandling(tc.handling), ) @@ -1679,7 +1680,7 @@ func TestProcessor_consumeNil(t *testing.T) { close(done) }() - km.Consume("topic", "key", nil) + tester.Consume("topic", "key", nil) proc.Stop() <-done @@ -1697,13 +1698,13 @@ func TestProcessor_failOnRecover(t *testing.T) { msgToRecover = 100 ) - km := mock.NewKafkaMock(t, "test") + tester := tester.New(t) consume := func(ctx Context, msg interface{}) { log.Println("consuming message..", ctx.Key()) } - km.SetGroupTableCreator(func() (string, []byte) { + tester.SetGroupTableCreator(func() (string, []byte) { time.Sleep(10 * time.Millisecond) recovered++ if recovered > msgToRecover { @@ -1717,7 +1718,7 @@ func TestProcessor_failOnRecover(t *testing.T) { Input("topic", new(codec.String), consume), Persist(rawCodec), ), - WithTester(km), + WithTester(tester), WithUpdateCallback(func(s storage.Storage, partition int32, key string, value []byte) error { log.Printf("recovered state: %s: %s", key, string(value)) return nil diff --git a/mock/tester.go b/tester/tester.go similarity index 74% rename from mock/tester.go rename to tester/tester.go index 7b1c2efd..c25a4738 100644 --- a/mock/tester.go +++ b/tester/tester.go @@ -1,4 +1,4 @@ -package mock +package tester import ( "fmt" @@ -7,7 +7,6 @@ import ( "sync" "github.com/facebookgo/ensure" - "github.com/golang/mock/gomock" "github.com/golang/protobuf/proto" "github.com/lovoo/goka/codec" @@ -25,30 +24,16 @@ type Codec interface { // simulate producer errors type EmitHandler func(topic string, key string, 
value []byte) *kafka.Promise -type gomockPanicker struct { - reporter gomock.TestReporter -} - -func (gm *gomockPanicker) Errorf(format string, args ...interface{}) { - gm.reporter.Errorf(format, args...) -} -func (gm *gomockPanicker) Fatalf(format string, args ...interface{}) { - defer panic(fmt.Sprintf(format, args...)) - gm.reporter.Fatalf(format, args...) -} - -// NewMockController returns a *gomock.Controller using a wrapped testing.T (or whatever) -// which panics on a Fatalf. This is necessary when using a mock in kafkamock. -// Otherwise it will freeze on an unexpected call. -func NewMockController(t gomock.TestReporter) *gomock.Controller { - return gomock.NewController(&gomockPanicker{reporter: t}) -} - -// KafkaMock allows interacting with a test processor -type KafkaMock struct { - t Tester - storage storage.Storage +// Tester allows interacting with a test processor +type Tester struct { + t T + consumerMock *consumerMock + producerMock *producerMock + topicMgrMock *topicMgrMock + emitHandler EmitHandler + storage storage.Storage + codec Codec offset int64 tableOffset int64 incomingEvents chan kafka.Event @@ -64,90 +49,82 @@ type KafkaMock struct { groupTableCreator func() (string, []byte) callQueue []func() wg sync.WaitGroup - - consumerMock *consumerMock - - producerMock *producerMock - emitHandler EmitHandler - topicMgrMock *topicMgrMock - codec Codec } -// Tester abstracts the interface we assume from the test case. +// T abstracts the interface we assume from the test case. // Will most likely be *testing.T -type Tester interface { +type T interface { Errorf(format string, args ...interface{}) Fatalf(format string, args ...interface{}) Fatal(a ...interface{}) } -// NewKafkaMock returns a new testprocessor mocking every external service -func NewKafkaMock(t Tester, groupName string) *KafkaMock { - kafkaMock := &KafkaMock{ +// New returns a new testprocessor mocking every external service +// It should be passed as goka.WithTester to goka.NewProcessor. It essentially +// replaces the storage/consumer/producer/topicmanager with a mock. +// For example, a normal call to NewProcessor like this +// goka.NewProcessor(brokers, group, subscriptions, +// option_a, +// option_b, +// option_c, +// ) +// would become in the unit test: +// tester := tester.New(t) +// NewProcessor(brokers, group, subscriptions, +// option_a, +// option_b, +// option_c, +// WithTester(tester), +// ) +func New(t T) *Tester { + tester := &Tester{ storage: storage.NewMemory(), t: t, incomingEvents: make(chan kafka.Event), consumerEvents: make(chan kafka.Event), handledTopics: make(map[string]bool), - groupTopic: fmt.Sprintf("%s-table", groupName), codec: new(codec.Bytes), } - kafkaMock.consumerMock = newConsumerMock(kafkaMock) - kafkaMock.producerMock = newProducerMock(kafkaMock.handleEmit) - kafkaMock.topicMgrMock = newTopicMgrMock(kafkaMock) + tester.consumerMock = newConsumerMock(tester) + tester.producerMock = newProducerMock(tester.handleEmit) + tester.topicMgrMock = newTopicMgrMock(tester) - return kafkaMock + return tester } // SetCodec sets the codec for the group table. -func (km *KafkaMock) SetCodec(codec Codec) *KafkaMock { +func (km *Tester) SetCodec(codec Codec) *Tester { km.codec = codec return km } // SetGroupTableCreator sets a creator for the group table. 
-func (km *KafkaMock) SetGroupTableCreator(creator func() (string, []byte)) { +func (km *Tester) SetGroupTableCreator(creator func() (string, []byte)) { km.groupTableCreator = creator } -// ProcessorOptions returns the options that must be passed to NewProcessor -// to use the Mock. It essentially replaces the consumer/producer/topicmanager with a mock. -// For convenience, the storage is also mocked. -// For example, a normal call to NewProcessor like this -// NewProcessor(brokers, group, subscriptions, -// option_a, -// option_b, -// option_c, -// ) -// would become in the unit test: -// kafkaMock := NewKafkaMock(t) -// NewProcessor(brokers, group, subscriptions, -// append(kafkaMock.ProcessorOptions(), -// option_a, -// option_b, -// option_c, -// )..., -// ) - -func (km *KafkaMock) TopicManagerBuilder() kafka.TopicManagerBuilder { +func (km *Tester) TopicManagerBuilder() kafka.TopicManagerBuilder { return func(brokers []string) (kafka.TopicManager, error) { return km.topicMgrMock, nil } } -func (km *KafkaMock) ConsumerBuilder() kafka.ConsumerBuilder { +func (km *Tester) ConsumerBuilder() kafka.ConsumerBuilder { return func(b []string, group, clientID string) (kafka.Consumer, error) { + if km.groupTopic == "" { + km.groupTopic = fmt.Sprintf("%s-table", group) + } return km.consumerMock, nil } } -func (km *KafkaMock) ProducerBuilder() kafka.ProducerBuilder { +func (km *Tester) ProducerBuilder() kafka.ProducerBuilder { return func(b []string, cid string, hasher func() hash.Hash32) (kafka.Producer, error) { return km.producerMock, nil } } -func (km *KafkaMock) StorageBuilder() storage.Builder { +func (km *Tester) StorageBuilder() storage.Builder { return func(topic string, partition int32) (storage.Storage, error) { return km.storage, nil } @@ -155,7 +132,7 @@ func (km *KafkaMock) StorageBuilder() storage.Builder { // initProtocol initiates the protocol with the client basically making the KafkaMock // usable. -func (km *KafkaMock) initProtocol() { +func (km *Tester) initProtocol() { defer func() { if r := recover(); r != nil { log.Printf("tester: panic initProtocol: %+v", r) @@ -188,7 +165,7 @@ func (km *KafkaMock) initProtocol() { } // ConsumeProto simulates a message on kafka in a topic with a key. -func (km *KafkaMock) ConsumeProto(topic string, key string, msg proto.Message) { +func (km *Tester) ConsumeProto(topic string, key string, msg proto.Message) { data, err := proto.Marshal(msg) if err != nil && km.t != nil { km.t.Errorf("Error marshaling message for consume: %v", err) @@ -197,12 +174,12 @@ func (km *KafkaMock) ConsumeProto(topic string, key string, msg proto.Message) { } // ConsumeString simulates a message with a string payload. -func (km *KafkaMock) ConsumeString(topic string, key string, msg string) { +func (km *Tester) ConsumeString(topic string, key string, msg string) { km.ConsumeData(topic, key, []byte(msg)) } // Consume simulates a message with a byte slice payload. -func (km *KafkaMock) Consume(topic string, key string, msg []byte) { +func (km *Tester) Consume(topic string, key string, msg []byte) { km.ConsumeData(topic, key, msg) } @@ -211,7 +188,7 @@ func (km *KafkaMock) Consume(topic string, key string, msg []byte) { // ConsumeData is a helper function consuming marshalled data. This function is // used by ConsumeProto by the test case as well as any emit calls of the // processor being tested. 
-func (km *KafkaMock) ConsumeData(topic string, key string, data []byte) { +func (km *Tester) ConsumeData(topic string, key string, data []byte) { defer func() { if r := recover(); r != nil { log.Printf("tester: panic ConsumeData: %+v\n", r) @@ -239,14 +216,14 @@ func (km *KafkaMock) ConsumeData(topic string, key string, data []byte) { km.incomingEvents <- &kafka.NOP{Partition: -1} } -func (km *KafkaMock) consumeError(err error) { +func (km *Tester) consumeError(err error) { km.incomingEvents <- &kafka.Error{Err: err} // no need to send NOP (actuallly we can't, otherwise we might panic // as the channels are already closed due to the error first). } // ValueForKey attempts to get a value from KafkaMock's storage. -func (km *KafkaMock) ValueForKey(key string) interface{} { +func (km *Tester) ValueForKey(key string) interface{} { item, err := km.storage.Get(key) ensure.Nil(km.t, err) if item == nil { @@ -258,7 +235,7 @@ func (km *KafkaMock) ValueForKey(key string) interface{} { } // SetValue sets a value in the storage. -func (km *KafkaMock) SetValue(key string, value interface{}) { +func (km *Tester) SetValue(key string, value interface{}) { data, err := km.codec.Encode(value) ensure.Nil(km.t, err) err = km.storage.Set(key, data) @@ -266,13 +243,13 @@ func (km *KafkaMock) SetValue(key string, value interface{}) { } // ReplaceEmitHandler replaces the emitter. -func (km *KafkaMock) ReplaceEmitHandler(emitter EmitHandler) { +func (km *Tester) ReplaceEmitHandler(emitter EmitHandler) { km.producerMock.emitter = emitter } // ExpectEmit ensures a message exists in passed topic and key. The message may be // inspected/unmarshalled by a passed expecter function. -func (km *KafkaMock) ExpectEmit(topic string, key string, expecter func(value []byte)) { +func (km *Tester) ExpectEmit(topic string, key string, expecter func(value []byte)) { for i := 0; i < len(km.emitted); i++ { msg := km.emitted[i] if msg.Topic != topic || msg.Key != key { @@ -292,7 +269,7 @@ func (km *KafkaMock) ExpectEmit(topic string, key string, expecter func(value [] // ExpectAllEmitted calls passed expected-emit-handler function for all emitted values and clears the // emitted values -func (km *KafkaMock) ExpectAllEmitted(handler func(topic string, key string, value []byte)) { +func (km *Tester) ExpectAllEmitted(handler func(topic string, key string, value []byte)) { for _, emitted := range km.emitted { handler(emitted.Topic, emitted.Key, emitted.Value) } @@ -305,7 +282,7 @@ func (km *KafkaMock) ExpectAllEmitted(handler func(topic string, key string, val // Clears the list of emits either case. // This should always be called at the end of a test case to make sure // no emits of prior test cases are stuck in the list and mess with the test results. -func (km *KafkaMock) Finish(fail bool) { +func (km *Tester) Finish(fail bool) { if len(km.emitted) > 0 { if fail { km.t.Errorf("The following emits are still in the list, although it's supposed to be empty:") @@ -320,7 +297,7 @@ func (km *KafkaMock) Finish(fail bool) { // handleEmit handles an Emit-call on the producerMock. 
// This takes care of queueing calls // to handled topics or putting the emitted messages in the emitted-messages-list -func (km *KafkaMock) handleEmit(topic string, key string, value []byte) *kafka.Promise { +func (km *Tester) handleEmit(topic string, key string, value []byte) *kafka.Promise { promise := kafka.NewPromise() if topic == km.groupTopic { return promise.Finish(nil) @@ -342,7 +319,7 @@ func (km *KafkaMock) handleEmit(topic string, key string, value []byte) *kafka.P } // creates a new call being executed after the consume function has run. -func (km *KafkaMock) newCall(call func()) { +func (km *Tester) newCall(call func()) { km.wg.Add(1) km.callQueue = append(km.callQueue, call) } @@ -350,7 +327,7 @@ func (km *KafkaMock) newCall(call func()) { // executes all calls on the call queue. // Executing calls may put new calls on the queue (if they emit something), // so this function executes until no further calls are being made. -func (km *KafkaMock) makeCalls() { +func (km *Tester) makeCalls() { go func() { for len(km.callQueue) > 0 { call := km.callQueue[0] @@ -363,18 +340,18 @@ func (km *KafkaMock) makeCalls() { } type consumerMock struct { - kafkaMock *KafkaMock + tester *Tester } -func newConsumerMock(kafkaMock *KafkaMock) *consumerMock { +func newConsumerMock(tester *Tester) *consumerMock { return &consumerMock{ - kafkaMock: kafkaMock, + tester: tester, } } // Events returns the event channel of the consumer mock func (km *consumerMock) Events() <-chan kafka.Event { - return km.kafkaMock.consumerEvents + return km.tester.consumerEvents } // Subscribe marks the consumer to subscribe to passed topics. @@ -382,9 +359,9 @@ func (km *consumerMock) Events() <-chan kafka.Event { // pass emitted messages back to the processor. func (km *consumerMock) Subscribe(topics map[string]int64) error { for topic := range topics { - km.kafkaMock.handledTopics[topic] = true + km.tester.handledTopics[topic] = true } - go km.kafkaMock.initProtocol() + go km.tester.initProtocol() return nil } @@ -414,14 +391,14 @@ func (km *consumerMock) RemovePartition(topic string, partition int32) error { // Close closes the consumer. // No action required in the mock. func (km *consumerMock) Close() error { - close(km.kafkaMock.incomingEvents) - close(km.kafkaMock.consumerEvents) + close(km.tester.incomingEvents) + close(km.tester.consumerEvents) fmt.Println("closed consumer mock") return nil } type topicMgrMock struct { - kafkaMock *KafkaMock + tester *Tester } // EnsureTableExists checks that a table (log-compacted topic) exists, or create one if possible @@ -437,7 +414,7 @@ func (tm *topicMgrMock) EnsureStreamExists(topic string, npar int) error { // Partitions returns the number of partitions of a topic, that are assigned to the running // instance, i.e. it doesn't represent all partitions of a topic. 
func (tm *topicMgrMock) Partitions(topic string) ([]int32, error) { - tm.kafkaMock.handledTopics[topic] = true + tm.tester.handledTopics[topic] = true return []int32{0}, nil } @@ -447,9 +424,9 @@ func (tm *topicMgrMock) Close() error { return nil } -func newTopicMgrMock(kafkaMock *KafkaMock) *topicMgrMock { +func newTopicMgrMock(tester *Tester) *topicMgrMock { return &topicMgrMock{ - kafkaMock: kafkaMock, + tester: tester, } } From c44716f9cd1079d06cef27c510cdff8446522014 Mon Sep 17 00:00:00 2001 From: franz Date: Wed, 4 Apr 2018 15:48:29 +0200 Subject: [PATCH 3/3] improve testing example to check for Emits --- examples/testing/main.go | 4 +++- examples/testing/main_test.go | 24 +++++++++++++++++++++--- 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/examples/testing/main.go b/examples/testing/main.go index ff82375f..1f44cbe1 100644 --- a/examples/testing/main.go +++ b/examples/testing/main.go @@ -37,7 +37,9 @@ func createProcessor(brokers []string, extraopts ...goka.ProcessorOption) (*goka goka.DefineGroup( goka.Group("consume-scalar"), goka.Persist(new(codec.Int64)), - goka.Input(goka.Stream("scalar"), new(codec.Int64), ConsumeScalarState), + goka.Input(goka.Stream("scalar-state"), new(codec.Int64), ConsumeScalarState), + goka.Input(goka.Stream("scalar"), new(codec.Int64), ConsumeScalar), + goka.Output(goka.Stream("sink"), new(codec.Int64)), ), extraopts..., ) diff --git a/examples/testing/main_test.go b/examples/testing/main_test.go index a62aa061..a77e8207 100644 --- a/examples/testing/main_test.go +++ b/examples/testing/main_test.go @@ -52,16 +52,34 @@ func Test_ConsumeScalar_Integration(t *testing.T) { } // send the message twice - tester.Consume("scalar", "foo", msg) - tester.Consume("scalar", "foo", msg) + tester.Consume("scalar-state", "foo", msg) + tester.Consume("scalar-state", "foo", msg) - value := string(tester.ValueForKey("foo").([]byte)) + fooByte, isByte := tester.ValueForKey("foo").([]byte) + if !isByte { + t.Errorf("state does not exist or is not []byte") + } + value := string(fooByte) fmt.Printf("%v\n", value) if value != "2" { t.Errorf("Expected value %s, got %s", "2", value) } + tester.Consume("scalar", "somekey", msg) + var ( + parsed int64 + parseErr error + ) + // expect that a value was emitted + tester.ExpectEmit("sink", "outgoing", func(value []byte) { + parsed, parseErr = strconv.ParseInt(string(value), 10, 64) + }) + if parseErr != nil || parsed != 2 { + panic(fmt.Errorf("parsing emitted message failed or had a wrong value (%d): %v", parsed, parseErr)) + } + tester.Finish(true) + proc.Stop() <-done }
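
After the series, a complete test of the example processor reads roughly as follows. This is a sketch assembled from the patched examples/testing/main_test.go above: createProcessor comes from examples/testing/main.go, the variable name gkt is chosen here only to avoid shadowing the tester package, and the start/stop wiring (proc.Start in a goroutine, a done channel, proc.Stop at the end) is assumed from the goka API of this version; error handling is trimmed.

    package main

    import (
        "strconv"
        "testing"

        "github.com/lovoo/goka"
        "github.com/lovoo/goka/tester"
    )

    func Test_ConsumeScalar_Sketch(t *testing.T) {
        gkt := tester.New(t)

        // goka.WithTester replaces storage, consumer, producer and topic
        // manager of the processor with the tester's mocks.
        proc, err := createProcessor(nil, goka.WithTester(gkt))
        if err != nil {
            t.Fatalf("error creating processor: %v", err)
        }

        // assumed start/stop wiring for this goka version
        done := make(chan bool)
        go func() {
            defer close(done)
            if err := proc.Start(); err != nil {
                t.Errorf("error running processor: %v", err)
            }
        }()

        msg := []byte(strconv.FormatInt(1, 10))

        // stateful input: consuming twice brings the group table value for "foo" to "2"
        gkt.Consume("scalar-state", "foo", msg)
        gkt.Consume("scalar-state", "foo", msg)
        if val, ok := gkt.ValueForKey("foo").([]byte); !ok || string(val) != "2" {
            t.Errorf("unexpected value for key foo: %v", val)
        }

        // stateless input: the processor emits its result to the "sink" stream
        gkt.Consume("scalar", "somekey", msg)
        gkt.ExpectEmit("sink", "outgoing", func(value []byte) {
            if v, perr := strconv.ParseInt(string(value), 10, 64); perr != nil || v != 2 {
                t.Errorf("unexpected emit %q: %v", value, perr)
            }
        })
        gkt.Finish(true) // fail if any unchecked emits remain

        proc.Stop()
        <-done
    }

Compared to the previous append(km.ProcessorOptions(), ...)... pattern, goka.WithTester composes like any other ProcessorOption, which is exactly what the processor_test.go changes in patch 2 rely on.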