Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/move kafkamock #118

Merged
merged 3 commits into from
Apr 4, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion examples/testing/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ func createProcessor(brokers []string, extraopts ...goka.ProcessorOption) (*goka
goka.DefineGroup(
goka.Group("consume-scalar"),
goka.Persist(new(codec.Int64)),
goka.Input(goka.Stream("scalar"), new(codec.Int64), ConsumeScalarState),
goka.Input(goka.Stream("scalar-state"), new(codec.Int64), ConsumeScalarState),
goka.Input(goka.Stream("scalar"), new(codec.Int64), ConsumeScalar),
goka.Output(goka.Stream("sink"), new(codec.Int64)),
),
extraopts...,
)
Expand Down
31 changes: 25 additions & 6 deletions examples/testing/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/golang/mock/gomock"
"github.com/lovoo/goka"
"github.com/lovoo/goka/tester"
)

func Test_ConsumeScalar(t *testing.T) {
Expand All @@ -27,8 +28,8 @@ func Test_ConsumeScalar(t *testing.T) {
func Test_ConsumeScalar_Integration(t *testing.T) {
// ctrl := goka.NewMockController(t)
// defer ctrl.Finish()
kafkaMock := goka.NewKafkaMock(t, "consume-scalar")
proc, err := createProcessor(nil, kafkaMock.ProcessorOptions()...)
tester := tester.New(t)
proc, err := createProcessor(nil, goka.WithTester(tester))

if err != nil {
t.Fatalf("Error creating processor: %v", err)
Expand All @@ -46,21 +47,39 @@ func Test_ConsumeScalar_Integration(t *testing.T) {
msg := []byte(strconv.FormatInt(1, 10))

// there is no initial value for key "foo"
if val := kafkaMock.ValueForKey("foo"); val != nil {
if val := tester.ValueForKey("foo"); val != nil {
t.Errorf("state was not initially empty: %v", val)
}

// send the message twice
kafkaMock.Consume("scalar", "foo", msg)
kafkaMock.Consume("scalar", "foo", msg)
tester.Consume("scalar-state", "foo", msg)
tester.Consume("scalar-state", "foo", msg)

value := string(kafkaMock.ValueForKey("foo").([]byte))
fooByte, isByte := tester.ValueForKey("foo").([]byte)
if !isByte {
t.Errorf("state does not exist or is not []byte")
}
value := string(fooByte)
fmt.Printf("%v\n", value)

if value != "2" {
t.Errorf("Expected value %s, got %s", "2", value)
}

tester.Consume("scalar", "somekey", msg)
var (
parsed int64
parseErr error
)
// expect that a value was emitted
tester.ExpectEmit("sink", "outgoing", func(value []byte) {
parsed, parseErr = strconv.ParseInt(string(value), 10, 64)
})
if parseErr != nil || parsed != 2 {
panic(fmt.Errorf("parsing emitted message failed or had a wrong value (%d): %v", parsed, parseErr))
}
tester.Finish(true)

proc.Stop()
<-done
}
26 changes: 26 additions & 0 deletions mock/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package mock

import (
"fmt"

"github.com/golang/mock/gomock"
)

type gomockPanicker struct {
reporter gomock.TestReporter
}

func (gm *gomockPanicker) Errorf(format string, args ...interface{}) {
gm.reporter.Errorf(format, args...)
}
func (gm *gomockPanicker) Fatalf(format string, args ...interface{}) {
defer panic(fmt.Sprintf(format, args...))
gm.reporter.Fatalf(format, args...)
}

// NewMockController returns a *gomock.Controller using a wrapped testing.T (or whatever)
// which panics on a Fatalf. This is necessary when using a mock in kafkamock.
// Otherwise it will freeze on an unexpected call.
func NewMockController(t gomock.TestReporter) *gomock.Controller {
return gomock.NewController(&gomockPanicker{reporter: t})
}
19 changes: 19 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,25 @@ func WithNilHandling(nh NilHandling) ProcessorOption {
}
}

type Tester interface {
StorageBuilder() storage.Builder
ConsumerBuilder() kafka.ConsumerBuilder
ProducerBuilder() kafka.ProducerBuilder
TopicManagerBuilder() kafka.TopicManagerBuilder
}

// WithTester configures all external connections of a processor, ie, storage,
// consumer and producer
func WithTester(t Tester) ProcessorOption {
return func(o *poptions) {
o.builders.storage = t.StorageBuilder()
o.builders.consumer = t.ConsumerBuilder()
o.builders.producer = t.ProducerBuilder()
o.builders.topicmgr = t.TopicManagerBuilder()
o.partitionChannelSize = 0
}
}

func (opt *poptions) applyOptions(group string, opts ...ProcessorOption) error {
opt.clientID = defaultClientID
opt.log = logger.Default()
Expand Down
68 changes: 34 additions & 34 deletions processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/lovoo/goka/mock"
"github.com/lovoo/goka/multierr"
"github.com/lovoo/goka/storage"
"github.com/lovoo/goka/tester"

"github.com/facebookgo/ensure"
"github.com/golang/mock/gomock"
Expand Down Expand Up @@ -1393,10 +1394,10 @@ func TestProcessor_HasGetStateless(t *testing.T) {
}

func TestProcessor_StatelessContext(t *testing.T) {
ctrl := NewMockController(t)
ctrl := mock.NewMockController(t)
defer ctrl.Finish()
var (
km = NewKafkaMock(t, "user-reputation").SetCodec(new(codec.Bytes))
tester = tester.New(t).SetCodec(new(codec.Bytes))
//count int64
//wait = make(chan bool)
)
Expand All @@ -1413,8 +1414,7 @@ func TestProcessor_StatelessContext(t *testing.T) {
"stateless-ctx",
Input("input-topic", new(codec.Bytes), callPersist),
),
// add kafkamock options
km.ProcessorOptions()...,
WithTester(tester),
)
ensure.Nil(t, err)
done := make(chan bool)
Expand All @@ -1425,7 +1425,7 @@ func TestProcessor_StatelessContext(t *testing.T) {
}()
err = doTimed(t, func() {
// consume a random key/message, the content doesn't matter as this should fail
km.ConsumeString("input-topic", "key", "msg")
tester.ConsumeString("input-topic", "key", "msg")
<-done
})
ensure.Nil(t, err)
Expand All @@ -1434,8 +1434,8 @@ func TestProcessor_StatelessContext(t *testing.T) {
func TestProcessor_ProducerError(t *testing.T) {

t.Run("SetValue", func(t *testing.T) {
km := NewKafkaMock(t, "test")
km.ReplaceEmitHandler(func(topic, key string, value []byte) *kafka.Promise {
tester := tester.New(t)
tester.ReplaceEmitHandler(func(topic, key string, value []byte) *kafka.Promise {
return kafka.NewPromise().Finish(errors.New("producer error"))
})

Expand All @@ -1448,7 +1448,7 @@ func TestProcessor_ProducerError(t *testing.T) {
Input("topic", new(codec.String), consume),
Persist(new(codec.String)),
),
km.ProcessorOptions()...,
WithTester(tester),
)

ensure.Nil(t, err)
Expand All @@ -1461,15 +1461,15 @@ func TestProcessor_ProducerError(t *testing.T) {
close(done)
}()

km.ConsumeString("topic", "key", "world")
tester.ConsumeString("topic", "key", "world")
proc.Stop()
<-done
ensure.True(t, processorErrors != nil)
})

t.Run("Emit", func(t *testing.T) {
km := NewKafkaMock(t, "test")
km.ReplaceEmitHandler(func(topic, key string, value []byte) *kafka.Promise {
tester := tester.New(t)
tester.ReplaceEmitHandler(func(topic, key string, value []byte) *kafka.Promise {
return kafka.NewPromise().Finish(errors.New("producer error"))
})

Expand All @@ -1482,7 +1482,7 @@ func TestProcessor_ProducerError(t *testing.T) {
Input("topic", new(codec.String), consume),
Persist(new(codec.String)),
),
km.ProcessorOptions()...,
WithTester(tester),
)

ensure.Nil(t, err)
Expand All @@ -1495,16 +1495,16 @@ func TestProcessor_ProducerError(t *testing.T) {
close(done)
}()

km.ConsumeString("topic", "key", "world")
tester.ConsumeString("topic", "key", "world")

proc.Stop()
<-done
ensure.True(t, processorErrors != nil)
})

t.Run("Value-stateless", func(t *testing.T) {
km := NewKafkaMock(t, "test")
km.ReplaceEmitHandler(func(topic, key string, value []byte) *kafka.Promise {
tester := tester.New(t)
tester.ReplaceEmitHandler(func(topic, key string, value []byte) *kafka.Promise {
return kafka.NewPromise().Finish(errors.New("producer error"))
})

Expand All @@ -1519,7 +1519,7 @@ func TestProcessor_ProducerError(t *testing.T) {
DefineGroup("test",
Input("topic", new(codec.String), consume),
),
append(km.ProcessorOptions())...,
WithTester(tester),
)

ensure.Nil(t, err)
Expand All @@ -1532,7 +1532,7 @@ func TestProcessor_ProducerError(t *testing.T) {
close(done)
}()

km.ConsumeString("topic", "key", "world")
tester.ConsumeString("topic", "key", "world")

// stopping the processor. It should actually not produce results
proc.Stop()
Expand All @@ -1543,7 +1543,7 @@ func TestProcessor_ProducerError(t *testing.T) {
}

func TestProcessor_consumeFail(t *testing.T) {
km := NewKafkaMock(t, "test")
tester := tester.New(t)

consume := func(ctx Context, msg interface{}) {
ctx.Fail(errors.New("consume-failed"))
Expand All @@ -1553,7 +1553,7 @@ func TestProcessor_consumeFail(t *testing.T) {
DefineGroup("test",
Input("topic", new(codec.String), consume),
),
append(km.ProcessorOptions())...,
WithTester(tester),
)

ensure.Nil(t, err)
Expand All @@ -1566,15 +1566,15 @@ func TestProcessor_consumeFail(t *testing.T) {
close(done)
}()

km.ConsumeString("topic", "key", "world")
tester.ConsumeString("topic", "key", "world")

proc.Stop()
<-done
ensure.True(t, strings.Contains(processorErrors.Error(), "consume-failed"))
}

func TestProcessor_consumePanic(t *testing.T) {
km := NewKafkaMock(t, "test")
tester := tester.New(t)

consume := func(ctx Context, msg interface{}) {
panic("panicking")
Expand All @@ -1584,7 +1584,7 @@ func TestProcessor_consumePanic(t *testing.T) {
DefineGroup("test",
Input("topic", new(codec.String), consume),
),
append(km.ProcessorOptions())...,
WithTester(tester),
)

ensure.Nil(t, err)
Expand All @@ -1597,7 +1597,7 @@ func TestProcessor_consumePanic(t *testing.T) {
close(done)
}()

km.ConsumeString("topic", "key", "world")
tester.ConsumeString("topic", "key", "world")

proc.Stop()
<-done
Expand Down Expand Up @@ -1661,12 +1661,13 @@ func TestProcessor_consumeNil(t *testing.T) {

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
km := NewKafkaMock(t, "test")
tester := tester.New(t)
proc, err := NewProcessor([]string{"broker"},
DefineGroup("test",
Input("topic", tc.codec, tc.cb),
),
append(km.ProcessorOptions(), WithNilHandling(tc.handling))...,
WithTester(tester),
WithNilHandling(tc.handling),
)

ensure.Nil(t, err)
Expand All @@ -1679,7 +1680,7 @@ func TestProcessor_consumeNil(t *testing.T) {
close(done)
}()

km.Consume("topic", "key", nil)
tester.Consume("topic", "key", nil)

proc.Stop()
<-done
Expand All @@ -1697,13 +1698,13 @@ func TestProcessor_failOnRecover(t *testing.T) {
msgToRecover = 100
)

km := NewKafkaMock(t, "test")
tester := tester.New(t)

consume := func(ctx Context, msg interface{}) {
log.Println("consuming message..", ctx.Key())
}

km.SetGroupTableCreator(func() (string, []byte) {
tester.SetGroupTableCreator(func() (string, []byte) {
time.Sleep(10 * time.Millisecond)
recovered++
if recovered > msgToRecover {
Expand All @@ -1717,12 +1718,11 @@ func TestProcessor_failOnRecover(t *testing.T) {
Input("topic", new(codec.String), consume),
Persist(rawCodec),
),
append(km.ProcessorOptions(),
WithUpdateCallback(func(s storage.Storage, partition int32, key string, value []byte) error {
log.Printf("recovered state: %s: %s", key, string(value))
return nil
}),
)...,
WithTester(tester),
WithUpdateCallback(func(s storage.Storage, partition int32, key string, value []byte) error {
log.Printf("recovered state: %s: %s", key, string(value))
return nil
}),
)

ensure.Nil(t, err)
Expand Down
Loading