Skip to content

Commit

Permalink
Merge pull request #118 from lovoo/feature/move-kafkamock
Browse files Browse the repository at this point in the history
Feature/move kafkamock
  • Loading branch information
db7 committed Apr 4, 2018
2 parents 17e8eef + c44716f commit 8b40064
Show file tree
Hide file tree
Showing 6 changed files with 195 additions and 145 deletions.
4 changes: 3 additions & 1 deletion examples/testing/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ func createProcessor(brokers []string, extraopts ...goka.ProcessorOption) (*goka
goka.DefineGroup(
goka.Group("consume-scalar"),
goka.Persist(new(codec.Int64)),
goka.Input(goka.Stream("scalar"), new(codec.Int64), ConsumeScalarState),
goka.Input(goka.Stream("scalar-state"), new(codec.Int64), ConsumeScalarState),
goka.Input(goka.Stream("scalar"), new(codec.Int64), ConsumeScalar),
goka.Output(goka.Stream("sink"), new(codec.Int64)),
),
extraopts...,
)
Expand Down
31 changes: 25 additions & 6 deletions examples/testing/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/golang/mock/gomock"
"github.com/lovoo/goka"
"github.com/lovoo/goka/tester"
)

func Test_ConsumeScalar(t *testing.T) {
Expand All @@ -27,8 +28,8 @@ func Test_ConsumeScalar(t *testing.T) {
func Test_ConsumeScalar_Integration(t *testing.T) {
// ctrl := goka.NewMockController(t)
// defer ctrl.Finish()
kafkaMock := goka.NewKafkaMock(t, "consume-scalar")
proc, err := createProcessor(nil, kafkaMock.ProcessorOptions()...)
tester := tester.New(t)
proc, err := createProcessor(nil, goka.WithTester(tester))

if err != nil {
t.Fatalf("Error creating processor: %v", err)
Expand All @@ -46,21 +47,39 @@ func Test_ConsumeScalar_Integration(t *testing.T) {
msg := []byte(strconv.FormatInt(1, 10))

// there is no initial value for key "foo"
if val := kafkaMock.ValueForKey("foo"); val != nil {
if val := tester.ValueForKey("foo"); val != nil {
t.Errorf("state was not initially empty: %v", val)
}

// send the message twice
kafkaMock.Consume("scalar", "foo", msg)
kafkaMock.Consume("scalar", "foo", msg)
tester.Consume("scalar-state", "foo", msg)
tester.Consume("scalar-state", "foo", msg)

value := string(kafkaMock.ValueForKey("foo").([]byte))
fooByte, isByte := tester.ValueForKey("foo").([]byte)
if !isByte {
t.Errorf("state does not exist or is not []byte")
}
value := string(fooByte)
fmt.Printf("%v\n", value)

if value != "2" {
t.Errorf("Expected value %s, got %s", "2", value)
}

tester.Consume("scalar", "somekey", msg)
var (
parsed int64
parseErr error
)
// expect that a value was emitted
tester.ExpectEmit("sink", "outgoing", func(value []byte) {
parsed, parseErr = strconv.ParseInt(string(value), 10, 64)
})
if parseErr != nil || parsed != 2 {
panic(fmt.Errorf("parsing emitted message failed or had a wrong value (%d): %v", parsed, parseErr))
}
tester.Finish(true)

proc.Stop()
<-done
}
26 changes: 26 additions & 0 deletions mock/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package mock

import (
"fmt"

"github.com/golang/mock/gomock"
)

type gomockPanicker struct {
reporter gomock.TestReporter
}

func (gm *gomockPanicker) Errorf(format string, args ...interface{}) {
gm.reporter.Errorf(format, args...)
}
func (gm *gomockPanicker) Fatalf(format string, args ...interface{}) {
defer panic(fmt.Sprintf(format, args...))
gm.reporter.Fatalf(format, args...)
}

// NewMockController returns a *gomock.Controller using a wrapped testing.T (or whatever)
// which panics on a Fatalf. This is necessary when using a mock in kafkamock.
// Otherwise it will freeze on an unexpected call.
func NewMockController(t gomock.TestReporter) *gomock.Controller {
return gomock.NewController(&gomockPanicker{reporter: t})
}
19 changes: 19 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,25 @@ func WithNilHandling(nh NilHandling) ProcessorOption {
}
}

type Tester interface {
StorageBuilder() storage.Builder
ConsumerBuilder() kafka.ConsumerBuilder
ProducerBuilder() kafka.ProducerBuilder
TopicManagerBuilder() kafka.TopicManagerBuilder
}

// WithTester configures all external connections of a processor, ie, storage,
// consumer and producer
func WithTester(t Tester) ProcessorOption {
return func(o *poptions) {
o.builders.storage = t.StorageBuilder()
o.builders.consumer = t.ConsumerBuilder()
o.builders.producer = t.ProducerBuilder()
o.builders.topicmgr = t.TopicManagerBuilder()
o.partitionChannelSize = 0
}
}

func (opt *poptions) applyOptions(group string, opts ...ProcessorOption) error {
opt.clientID = defaultClientID
opt.log = logger.Default()
Expand Down
68 changes: 34 additions & 34 deletions processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/lovoo/goka/mock"
"github.com/lovoo/goka/multierr"
"github.com/lovoo/goka/storage"
"github.com/lovoo/goka/tester"

"github.com/facebookgo/ensure"
"github.com/golang/mock/gomock"
Expand Down Expand Up @@ -1393,10 +1394,10 @@ func TestProcessor_HasGetStateless(t *testing.T) {
}

func TestProcessor_StatelessContext(t *testing.T) {
ctrl := NewMockController(t)
ctrl := mock.NewMockController(t)
defer ctrl.Finish()
var (
km = NewKafkaMock(t, "user-reputation").SetCodec(new(codec.Bytes))
tester = tester.New(t).SetCodec(new(codec.Bytes))
//count int64
//wait = make(chan bool)
)
Expand All @@ -1413,8 +1414,7 @@ func TestProcessor_StatelessContext(t *testing.T) {
"stateless-ctx",
Input("input-topic", new(codec.Bytes), callPersist),
),
// add kafkamock options
km.ProcessorOptions()...,
WithTester(tester),
)
ensure.Nil(t, err)
done := make(chan bool)
Expand All @@ -1425,7 +1425,7 @@ func TestProcessor_StatelessContext(t *testing.T) {
}()
err = doTimed(t, func() {
// consume a random key/message, the content doesn't matter as this should fail
km.ConsumeString("input-topic", "key", "msg")
tester.ConsumeString("input-topic", "key", "msg")
<-done
})
ensure.Nil(t, err)
Expand All @@ -1434,8 +1434,8 @@ func TestProcessor_StatelessContext(t *testing.T) {
func TestProcessor_ProducerError(t *testing.T) {

t.Run("SetValue", func(t *testing.T) {
km := NewKafkaMock(t, "test")
km.ReplaceEmitHandler(func(topic, key string, value []byte) *kafka.Promise {
tester := tester.New(t)
tester.ReplaceEmitHandler(func(topic, key string, value []byte) *kafka.Promise {
return kafka.NewPromise().Finish(errors.New("producer error"))
})

Expand All @@ -1448,7 +1448,7 @@ func TestProcessor_ProducerError(t *testing.T) {
Input("topic", new(codec.String), consume),
Persist(new(codec.String)),
),
km.ProcessorOptions()...,
WithTester(tester),
)

ensure.Nil(t, err)
Expand All @@ -1461,15 +1461,15 @@ func TestProcessor_ProducerError(t *testing.T) {
close(done)
}()

km.ConsumeString("topic", "key", "world")
tester.ConsumeString("topic", "key", "world")
proc.Stop()
<-done
ensure.True(t, processorErrors != nil)
})

t.Run("Emit", func(t *testing.T) {
km := NewKafkaMock(t, "test")
km.ReplaceEmitHandler(func(topic, key string, value []byte) *kafka.Promise {
tester := tester.New(t)
tester.ReplaceEmitHandler(func(topic, key string, value []byte) *kafka.Promise {
return kafka.NewPromise().Finish(errors.New("producer error"))
})

Expand All @@ -1482,7 +1482,7 @@ func TestProcessor_ProducerError(t *testing.T) {
Input("topic", new(codec.String), consume),
Persist(new(codec.String)),
),
km.ProcessorOptions()...,
WithTester(tester),
)

ensure.Nil(t, err)
Expand All @@ -1495,16 +1495,16 @@ func TestProcessor_ProducerError(t *testing.T) {
close(done)
}()

km.ConsumeString("topic", "key", "world")
tester.ConsumeString("topic", "key", "world")

proc.Stop()
<-done
ensure.True(t, processorErrors != nil)
})

t.Run("Value-stateless", func(t *testing.T) {
km := NewKafkaMock(t, "test")
km.ReplaceEmitHandler(func(topic, key string, value []byte) *kafka.Promise {
tester := tester.New(t)
tester.ReplaceEmitHandler(func(topic, key string, value []byte) *kafka.Promise {
return kafka.NewPromise().Finish(errors.New("producer error"))
})

Expand All @@ -1519,7 +1519,7 @@ func TestProcessor_ProducerError(t *testing.T) {
DefineGroup("test",
Input("topic", new(codec.String), consume),
),
append(km.ProcessorOptions())...,
WithTester(tester),
)

ensure.Nil(t, err)
Expand All @@ -1532,7 +1532,7 @@ func TestProcessor_ProducerError(t *testing.T) {
close(done)
}()

km.ConsumeString("topic", "key", "world")
tester.ConsumeString("topic", "key", "world")

// stopping the processor. It should actually not produce results
proc.Stop()
Expand All @@ -1543,7 +1543,7 @@ func TestProcessor_ProducerError(t *testing.T) {
}

func TestProcessor_consumeFail(t *testing.T) {
km := NewKafkaMock(t, "test")
tester := tester.New(t)

consume := func(ctx Context, msg interface{}) {
ctx.Fail(errors.New("consume-failed"))
Expand All @@ -1553,7 +1553,7 @@ func TestProcessor_consumeFail(t *testing.T) {
DefineGroup("test",
Input("topic", new(codec.String), consume),
),
append(km.ProcessorOptions())...,
WithTester(tester),
)

ensure.Nil(t, err)
Expand All @@ -1566,15 +1566,15 @@ func TestProcessor_consumeFail(t *testing.T) {
close(done)
}()

km.ConsumeString("topic", "key", "world")
tester.ConsumeString("topic", "key", "world")

proc.Stop()
<-done
ensure.True(t, strings.Contains(processorErrors.Error(), "consume-failed"))
}

func TestProcessor_consumePanic(t *testing.T) {
km := NewKafkaMock(t, "test")
tester := tester.New(t)

consume := func(ctx Context, msg interface{}) {
panic("panicking")
Expand All @@ -1584,7 +1584,7 @@ func TestProcessor_consumePanic(t *testing.T) {
DefineGroup("test",
Input("topic", new(codec.String), consume),
),
append(km.ProcessorOptions())...,
WithTester(tester),
)

ensure.Nil(t, err)
Expand All @@ -1597,7 +1597,7 @@ func TestProcessor_consumePanic(t *testing.T) {
close(done)
}()

km.ConsumeString("topic", "key", "world")
tester.ConsumeString("topic", "key", "world")

proc.Stop()
<-done
Expand Down Expand Up @@ -1661,12 +1661,13 @@ func TestProcessor_consumeNil(t *testing.T) {

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
km := NewKafkaMock(t, "test")
tester := tester.New(t)
proc, err := NewProcessor([]string{"broker"},
DefineGroup("test",
Input("topic", tc.codec, tc.cb),
),
append(km.ProcessorOptions(), WithNilHandling(tc.handling))...,
WithTester(tester),
WithNilHandling(tc.handling),
)

ensure.Nil(t, err)
Expand All @@ -1679,7 +1680,7 @@ func TestProcessor_consumeNil(t *testing.T) {
close(done)
}()

km.Consume("topic", "key", nil)
tester.Consume("topic", "key", nil)

proc.Stop()
<-done
Expand All @@ -1697,13 +1698,13 @@ func TestProcessor_failOnRecover(t *testing.T) {
msgToRecover = 100
)

km := NewKafkaMock(t, "test")
tester := tester.New(t)

consume := func(ctx Context, msg interface{}) {
log.Println("consuming message..", ctx.Key())
}

km.SetGroupTableCreator(func() (string, []byte) {
tester.SetGroupTableCreator(func() (string, []byte) {
time.Sleep(10 * time.Millisecond)
recovered++
if recovered > msgToRecover {
Expand All @@ -1717,12 +1718,11 @@ func TestProcessor_failOnRecover(t *testing.T) {
Input("topic", new(codec.String), consume),
Persist(rawCodec),
),
append(km.ProcessorOptions(),
WithUpdateCallback(func(s storage.Storage, partition int32, key string, value []byte) error {
log.Printf("recovered state: %s: %s", key, string(value))
return nil
}),
)...,
WithTester(tester),
WithUpdateCallback(func(s storage.Storage, partition int32, key string, value []byte) error {
log.Printf("recovered state: %s: %s", key, string(value))
return nil
}),
)

ensure.Nil(t, err)
Expand Down
Loading

0 comments on commit 8b40064

Please sign in to comment.