Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tester tolerates unconsumed queues #199

Merged
merged 5 commits into from
Aug 14, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 121 additions & 0 deletions integrationtest/processor_stuck.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package main

import (
"context"
"fmt"
"log"
"sync/atomic"
"time"

"github.com/lovoo/goka"
"github.com/lovoo/goka/codec"
"github.com/lovoo/goka/kafka"
)

func main() {

proc, err := goka.NewProcessor([]string{"localhost:9092"},
goka.DefineGroup("processor-stuck-test",
goka.Input("input", new(codec.Int64), func(ctx goka.Context, msg interface{}) {
ctx.SetValue(msg)
ctx.Emit("output", ctx.Key(), msg)
ctx.Emit("output", ctx.Key(), msg)
ctx.Emit("output", ctx.Key(), msg)
ctx.Emit("output", ctx.Key(), msg)
ctx.Emit("output", ctx.Key(), msg)
ctx.Emit("output", ctx.Key(), msg)
ctx.Emit("output", ctx.Key(), msg)
ctx.Emit("output", ctx.Key(), msg)
ctx.Emit("output", ctx.Key(), msg)
ctx.Emit("output", ctx.Key(), msg)
ctx.Emit("output", ctx.Key(), msg)
ctx.Emit("output", ctx.Key(), msg)
ctx.Emit("output", ctx.Key(), msg)
ctx.Emit("output", ctx.Key(), msg)
ctx.Emit("output", ctx.Key(), msg)
ctx.Emit("output", ctx.Key(), msg)
ctx.Emit("output", ctx.Key(), msg)
ctx.Emit("output", ctx.Key(), msg)
ctx.Emit("output", ctx.Key(), msg)
ctx.Emit("output", ctx.Key(), msg)
}),
goka.Output("output", new(codec.Int64)),
goka.Persist(new(codec.Int64)),
))

if err != nil {
log.Fatalf("Cannot start processor: %v", err)
}

ctx, cancel := context.WithCancel(context.Background())
done := make(chan struct{})
go func() {
defer close(done)

log.Printf("Running processor")
procRunErr := proc.Run(ctx)
log.Printf("Processor finished with %v", procRunErr)
}()

log.Printf("wait 5 seconds before starting to emit")
time.Sleep(5 * time.Second)

for i := 0; i < 50; i++ {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These loops could use some more overview imo

go func() {

cfg := kafka.NewConfig()
cfg.Producer.Retry.Max = 0
cfg.Producer.Retry.Backoff = 1 * time.Millisecond
emitter, err := goka.NewEmitter([]string{"localhost:9092"}, "input", new(codec.Int64),
goka.WithEmitterProducerBuilder(
kafka.ProducerBuilderWithConfig(cfg),
),
)
if err != nil {
log.Fatalf("Error creating emitter: %v", err)
}

time.Sleep(2 * time.Second)
defer func() {
log.Printf("finishing")
emitter.Finish()
log.Printf("done")
}()

defer recover()
var done int64
var emitted int64
for i := 0; ; i++ {
if atomic.LoadInt64(&done) > 0 {
break
}

// when the context is done, stop emitting
go func() {
<-ctx.Done()
atomic.AddInt64(&done, 1)
}()
emitted++
if emitted%1000 == 0 {
log.Printf("emitted %d", emitted)
}
prom, err := emitter.Emit(fmt.Sprintf("%d", i), int64(i))
if err != nil {
break
}
prom.Then(func(err error) {
if err != nil {
atomic.AddInt64(&done, 1)
}
})
time.Sleep(10 * time.Millisecond)
}
}()
}

log.Printf("waiting for the processor to shutdown")
<-done
log.Printf("processor is dead. Nice!")

cancel()
}
67 changes: 49 additions & 18 deletions kafka/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package kafka

import (
"fmt"
"sync"
"time"

"github.com/Shopify/sarama"
)
Expand All @@ -17,6 +19,7 @@ type producer struct {
producer sarama.AsyncProducer
stop chan bool
done chan bool
wg sync.WaitGroup
}

// NewProducer creates new kafka producer for passed brokers.
Expand All @@ -32,17 +35,35 @@ func NewProducer(brokers []string, config *sarama.Config) (Producer, error) {
done: make(chan bool),
}

go p.run()
p.run()

return &p, nil
}

// Close stops the producer and waits for the Success/Error channels to drain.
// Emitting to a closing/closed producer results in a write-to-closed-channel
// panic, so callers must stop emitting before calling Close.
func (p *producer) Close() error {
	// Close asynchronously so sarama still delivers the remaining
	// success/error events and no promise is left unfinished.
	p.producer.AsyncClose()

	// Wait (bounded) for the two drain goroutines started in run() to exit;
	// they terminate once sarama closes the Successes/Errors channels.
	done := make(chan struct{})
	go func() {
		p.wg.Wait()
		close(done)
	}()

	select {
	case <-done:
	case <-time.After(60 * time.Second):
		// Surface the timeout instead of silently returning nil: some
		// promises may never be finished in this case.
		return fmt.Errorf("closing producer timed out after 60s; promises may be left unfinished")
	}

	return nil
}

// Emit emits a key-value pair to topic and returns a Promise that
// can be checked for errors asynchronously
func (p *producer) Emit(topic string, key string, value []byte) *Promise {
promise := NewPromise()
p.producer.Input() <- &sarama.ProducerMessage{
Expand All @@ -56,19 +77,29 @@ func (p *producer) Emit(topic string, key string, value []byte) *Promise {

// resolve or reject a promise in the message's metadata on Success or Error
func (p *producer) run() {
defer close(p.done)
for {
select {
case <-p.stop:
return

case err := <-p.producer.Errors():
promise := err.Msg.Metadata.(*Promise)
promise.Finish(err.Err)

case msg := <-p.producer.Successes():
promise := msg.Metadata.(*Promise)
promise.Finish(nil)
p.wg.Add(2)
go func() {
defer p.wg.Done()
for {
err, ok := <-p.producer.Errors()

// channel closed, the producer is stopping
if !ok {
return
}
err.Msg.Metadata.(*Promise).Finish(err.Err)
}
}
}()

go func() {
defer p.wg.Done()
for {
msg, ok := <-p.producer.Successes()
// channel closed, the producer is stopping
if !ok {
return
}
msg.Metadata.(*Promise).Finish(nil)
}
}()
}
68 changes: 68 additions & 0 deletions kafka/producer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// +build kafka

package kafka

import (
"log"
"sync/atomic"
"testing"
"time"
)

// This tests how a producer behaves when being shutdown to make sure
// no promises stay unfinished.
// To run the test, get a local kafka-container running (e.g. go to
// examples-directory and do `make restart`), then run the tests with
// `go test -v github.com/lovoo/goka/kafka/ -tags=kafka`
func TestProducerError(t *testing.T) {
	cfg := NewConfig().Config
	p, err := NewProducer([]string{"localhost:9092"}, &cfg)

	if err != nil {
		t.Fatalf("error creating producer: %v", err)
	}

	var (
		promises     []*Promise
		donePromises int64
		emitted      int64
		done         = make(chan bool)
	)

	go func() {
		// Emitting to a closing producer panics; recover so the goroutine
		// still closes `done` and the test can finish.
		defer func() {
			recover()
		}()
		defer close(done)
		for {
			promise := p.Emit("test", "test", []byte{})
			promise.Then(func(err error) {
				atomic.AddInt64(&donePromises, 1)
				if err != nil {
					log.Printf("error producing message: %v", err)
				}
			})
			promises = append(promises, promise)
			emitted++
			time.Sleep(20 * time.Millisecond)
		}
	}()

	// let it run for 1 second
	time.Sleep(1000 * time.Millisecond)

	// close the producer
	err = p.Close()
	if err != nil {
		log.Printf("Error closing producer: %v", err)
	}
	// wait for the emitter goroutine to finish before reading its counters
	<-done

	if len(promises) != int(emitted) {
		t.Errorf("Promises/Emits do not match: promises: %d, emitted: %d", len(promises), emitted)
	}
	// donePromises is incremented from promise callbacks on other goroutines;
	// read it atomically to avoid a data race if Close timed out.
	if len(promises) != int(atomic.LoadInt64(&donePromises)) {
		t.Errorf("Promises/Done promises do not match: promises: %d, done: %d", len(promises), atomic.LoadInt64(&donePromises))
	}
}
18 changes: 17 additions & 1 deletion partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package goka
import (
"context"
"fmt"
"log"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -139,7 +140,20 @@ func newMessage(ev *kafka.Message) *message {
func (p *partition) run(ctx context.Context) error {
var wg sync.WaitGroup
p.proxy.AddGroup()
defer wg.Wait()

defer func() {
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()

select {
case <-done:
case <-time.NewTimer(10 * time.Second).C:
log.Printf("partition shutdown timed out. Will stop waiting.")
}
}()

for {
select {
Expand Down Expand Up @@ -185,10 +199,12 @@ func (p *partition) run(ctx context.Context) error {
select {
case p.responseStats <- p.lastStats:
case <-ctx.Done():
p.log.Printf("Partitioning exiting, context is cancelled")
return nil
}

case <-ctx.Done():
p.log.Printf("Partitioning exiting, context is cancelled (outer)")
return nil
}

Expand Down
Loading