Skip to content

Commit

Permalink
Improve headers implementation (#315)
Browse files Browse the repository at this point in the history
* ignore IntelliJ's .idea directory
* add default headers to the emitter
* add headers to table update callback
* Introduce Headers type.
The explicit type improves readability and adds more flexibility
for future enhancements of the interface.
Co-authored-by: Anas Sulaiman <asulaiman@auvik.com>
  • Loading branch information
mrahs committed May 5, 2021
1 parent d0a4840 commit 48c2ec2
Show file tree
Hide file tree
Showing 23 changed files with 336 additions and 252 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ tmp*
.tags*
vendor
.vscode
.idea
30 changes: 15 additions & 15 deletions context.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@ import (
"context"
"errors"
"fmt"
"github.com/lovoo/goka/headers"
"sync"
"time"

"github.com/Shopify/sarama"
"github.com/lovoo/goka/multierr"
)

type emitter func(topic string, key string, value []byte, headers map[string][]byte) *Promise
type emitter func(topic string, key string, value []byte, hdr headers.Headers) *Promise

// Context provides access to the processor's table and emit capabilities to
// arbitrary topics in kafka.
Expand Down Expand Up @@ -62,7 +63,7 @@ type Context interface {
Value() interface{}

// Headers returns the headers of the input message
Headers() map[string][]byte
Headers() headers.Headers

// SetValue updates the value of the key in the group table.
// It stores the value in the local cache and sends the
Expand Down Expand Up @@ -140,13 +141,15 @@ type cbContext struct {
// commit commits the message in the consumer session
commit func()

emitter emitter
emitter emitter
emitterDefaultHeaders headers.Headers

asyncFailer func(err error)
syncFailer func(err error)

// Headers as passed from sarama. Note that this field will be filled
// lazily after the first call to Headers
headers map[string][]byte
headers headers.Headers

table *PartitionTable
// joins
Expand Down Expand Up @@ -221,9 +224,9 @@ func (ctx *cbContext) Loopback(key string, value interface{}, options ...Context
ctx.emit(l.Topic(), key, data, opts.emitHeaders)
}

func (ctx *cbContext) emit(topic string, key string, value []byte, headers map[string][]byte) {
func (ctx *cbContext) emit(topic string, key string, value []byte, hdr headers.Headers) {
ctx.counters.emits++
ctx.emitter(topic, key, value, headers).Then(func(err error) {
ctx.emitter(topic, key, value, ctx.emitterDefaultHeaders.Merged(hdr)).Then(func(err error) {
if err != nil {
err = fmt.Errorf("error emitting to %s: %v", topic, err)
}
Expand Down Expand Up @@ -283,13 +286,10 @@ func (ctx *cbContext) Partition() int32 {
return ctx.msg.Partition
}

func (ctx *cbContext) Headers() map[string][]byte {
func (ctx *cbContext) Headers() headers.Headers {

if ctx.headers == nil {
ctx.headers = make(map[string][]byte)
for _, header := range ctx.msg.Headers {
ctx.headers[string(header.Key)] = header.Value
}
ctx.headers = headers.FromSarama(ctx.msg.Headers)
}
return ctx.headers
}
Expand Down Expand Up @@ -351,7 +351,7 @@ func (ctx *cbContext) valueForKey(key string) (interface{}, error) {
return value, nil
}

func (ctx *cbContext) deleteKey(key string, headers map[string][]byte) error {
func (ctx *cbContext) deleteKey(key string, hdr headers.Headers) error {
if ctx.graph.GroupTable() == nil {
return fmt.Errorf("Cannot access state in stateless processor")
}
Expand All @@ -362,15 +362,15 @@ func (ctx *cbContext) deleteKey(key string, headers map[string][]byte) error {
}

ctx.counters.emits++
ctx.emitter(ctx.graph.GroupTable().Topic(), key, nil, headers).Then(func(err error) {
ctx.emitter(ctx.graph.GroupTable().Topic(), key, nil, hdr).Then(func(err error) {
ctx.emitDone(err)
})

return nil
}

// setValueForKey sets a value for a key in the processor state.
func (ctx *cbContext) setValueForKey(key string, value interface{}, headers map[string][]byte) error {
func (ctx *cbContext) setValueForKey(key string, value interface{}, hdr headers.Headers) error {
if ctx.graph.GroupTable() == nil {
return fmt.Errorf("Cannot access state in stateless processor")
}
Expand All @@ -391,7 +391,7 @@ func (ctx *cbContext) setValueForKey(key string, value interface{}, headers map[

table := ctx.graph.GroupTable().Topic()
ctx.counters.emits++
ctx.emitter(table, key, encodedValue, headers).ThenWithMessage(func(msg *sarama.ProducerMessage, err error) {
ctx.emitter(table, key, encodedValue, hdr).ThenWithMessage(func(msg *sarama.ProducerMessage, err error) {
if err == nil && msg != nil {
err = ctx.table.storeNewestOffset(msg.Offset)
}
Expand Down
55 changes: 28 additions & 27 deletions context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"github.com/lovoo/goka/headers"
"strings"
"sync"
"testing"
Expand All @@ -18,7 +19,7 @@ import (
)

func newEmitter(err error, done func(err error)) emitter {
return func(topic string, key string, value []byte, headers map[string][]byte) *Promise {
return func(topic string, key string, value []byte, hdr headers.Headers) *Promise {
p := NewPromise()
if done != nil {
p.Then(done)
Expand All @@ -28,7 +29,7 @@ func newEmitter(err error, done func(err error)) emitter {
}

func newEmitterW(wg *sync.WaitGroup, err error, done func(err error)) emitter {
return func(topic string, key string, value []byte, headers map[string][]byte) *Promise {
return func(topic string, key string, value []byte, hdr headers.Headers) *Promise {
wg.Add(1)
p := NewPromise()
if done != nil {
Expand Down Expand Up @@ -59,7 +60,7 @@ func TestContext_Emit(t *testing.T) {
})

ctx.start()
ctx.emit("emit-topic", "key", []byte("value"), map[string][]byte{})
ctx.emit("emit-topic", "key", []byte("value"), headers.Headers{})
ctx.finish(nil)

// we can now for all callbacks -- it should also guarantee a memory fence
Expand Down Expand Up @@ -164,7 +165,7 @@ func TestContext_EmitError(t *testing.T) {
})

ctx.start()
ctx.emit("emit-topic", "key", []byte("value"), map[string][]byte{})
ctx.emit("emit-topic", "key", []byte("value"), headers.Headers{})
ctx.finish(nil)

// we can now for all callbacks -- it should also guarantee a memory fence
Expand Down Expand Up @@ -250,7 +251,7 @@ func TestContext_Delete(t *testing.T) {
ctx.emitter = newEmitter(nil, nil)

ctx.start()
err := ctx.deleteKey(key, map[string][]byte{})
err := ctx.deleteKey(key, headers.Headers{})
test.AssertNil(t, err)
ctx.finish(nil)

Expand Down Expand Up @@ -280,7 +281,7 @@ func TestContext_DeleteStateless(t *testing.T) {
}
ctx.emitter = newEmitter(nil, nil)

err := ctx.deleteKey(key, map[string][]byte{})
err := ctx.deleteKey(key, headers.Headers{})
test.AssertTrue(t, strings.Contains(err.Error(), "Cannot access state in stateless processor"))
}

Expand Down Expand Up @@ -313,7 +314,7 @@ func TestContext_DeleteStorageError(t *testing.T) {

ctx.emitter = newEmitter(nil, nil)

err := ctx.deleteKey(key, map[string][]byte{})
err := ctx.deleteKey(key, headers.Headers{})
test.AssertTrue(t, strings.Contains(err.Error(), "error deleting key (key) from storage: storage error"))
}

Expand Down Expand Up @@ -351,7 +352,7 @@ func TestContext_Set(t *testing.T) {
ctx.emitter = newEmitter(nil, nil)

ctx.start()
err := ctx.setValueForKey(key, value, map[string][]byte{})
err := ctx.setValueForKey(key, value, headers.Headers{})
test.AssertNil(t, err)
ctx.finish(nil)

Expand All @@ -373,7 +374,7 @@ func TestContext_GetSetStateful(t *testing.T) {
group Group = "some-group"
key = "key"
value = "value"
headers = map[string][]byte{"key": []byte("headerValue")}
hdr = headers.Headers{"key": []byte("headerValue")}
offset = int64(123)
wg = new(sync.WaitGroup)
st = NewMockStorage(ctrl)
Expand All @@ -398,12 +399,12 @@ func TestContext_GetSetStateful(t *testing.T) {
graph: graph,
trackOutputStats: func(ctx context.Context, topic string, size int) {},
msg: &sarama.ConsumerMessage{Key: []byte(key), Offset: offset},
emitter: func(tp string, k string, v []byte, h map[string][]byte) *Promise {
emitter: func(tp string, k string, v []byte, h headers.Headers) *Promise {
wg.Add(1)
test.AssertEqual(t, tp, graph.GroupTable().Topic())
test.AssertEqual(t, string(k), key)
test.AssertEqual(t, string(v), value)
test.AssertEqual(t, h, headers)
test.AssertEqual(t, h, hdr)
return NewPromise().finish(nil, nil)
},
ctx: context.Background(),
Expand All @@ -412,7 +413,7 @@ func TestContext_GetSetStateful(t *testing.T) {
val := ctx.Value()
test.AssertTrue(t, val == nil)

ctx.SetValue(value, WithCtxEmitHeaders(headers))
ctx.SetValue(value, WithCtxEmitHeaders(hdr))

val = ctx.Value()
test.AssertEqual(t, val, value)
Expand Down Expand Up @@ -451,17 +452,17 @@ func TestContext_SetErrors(t *testing.T) {
asyncFailer: failer,
}

err := ctx.setValueForKey(key, nil, map[string][]byte{})
err := ctx.setValueForKey(key, nil, headers.Headers{})
test.AssertNotNil(t, err)
test.AssertTrue(t, strings.Contains(err.Error(), "cannot set nil"))

err = ctx.setValueForKey(key, 123, map[string][]byte{}) // cannot encode 123 as string
err = ctx.setValueForKey(key, 123, headers.Headers{}) // cannot encode 123 as string
test.AssertNotNil(t, err)
test.AssertTrue(t, strings.Contains(err.Error(), "error encoding"))

st.EXPECT().Set(key, []byte(value)).Return(errors.New("some-error"))

err = ctx.setValueForKey(key, value, map[string][]byte{})
err = ctx.setValueForKey(key, value, headers.Headers{})
test.AssertNotNil(t, err)
test.AssertTrue(t, strings.Contains(err.Error(), "some-error"))

Expand Down Expand Up @@ -494,28 +495,28 @@ func TestContext_Loopback(t *testing.T) {
defer ctrl.Finish()

var (
key = "key"
value = "value"
headers = map[string][]byte{"key": []byte("headerValue")}
cnt = 0
key = "key"
value = "value"
hdr = headers.Headers{"key": []byte("headerValue")}
cnt = 0
)

graph := DefineGroup("group", Persist(c), Loop(c, cb))
ctx := &cbContext{
graph: graph,
msg: &sarama.ConsumerMessage{},
trackOutputStats: func(ctx context.Context, topic string, size int) {},
emitter: func(tp string, k string, v []byte, h map[string][]byte) *Promise {
emitter: func(tp string, k string, v []byte, h headers.Headers) *Promise {
cnt++
test.AssertEqual(t, tp, graph.LoopStream().Topic())
test.AssertEqual(t, string(k), key)
test.AssertEqual(t, string(v), value)
test.AssertEqual(t, h, headers)
test.AssertEqual(t, h, hdr)
return NewPromise()
},
}

ctx.Loopback(key, value, WithCtxEmitHeaders(headers))
ctx.Loopback(key, value, WithCtxEmitHeaders(hdr))
test.AssertTrue(t, cnt == 1)
}

Expand Down Expand Up @@ -631,9 +632,9 @@ func TestContext_Headers(t *testing.T) {
ctx := &cbContext{
msg: &sarama.ConsumerMessage{Key: []byte("key")},
}
headers := ctx.Headers()
test.AssertNotNil(t, headers)
test.AssertEqual(t, len(headers), 0)
hdr := ctx.Headers()
test.AssertNotNil(t, hdr)
test.AssertEqual(t, len(hdr), 0)

ctx = &cbContext{
msg: &sarama.ConsumerMessage{Key: []byte("key"), Headers: []*sarama.RecordHeader{
Expand All @@ -643,8 +644,8 @@ func TestContext_Headers(t *testing.T) {
},
}},
}
headers = ctx.Headers()
test.AssertEqual(t, headers["key"], []byte("value"))
hdr = ctx.Headers()
test.AssertEqual(t, hdr["key"], []byte("value"))
}

func TestContext_Fail(t *testing.T) {
Expand Down
24 changes: 14 additions & 10 deletions emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package goka
import (
"errors"
"fmt"
"github.com/lovoo/goka/headers"
"sync"
)

Expand All @@ -16,7 +17,8 @@ type Emitter struct {
codec Codec
producer Producer

topic string
topic string
defaultHeaders headers.Headers

wg sync.WaitGroup
mu sync.RWMutex
Expand Down Expand Up @@ -45,17 +47,18 @@ func NewEmitter(brokers []string, topic Stream, codec Codec, options ...EmitterO
}

return &Emitter{
codec: codec,
producer: prod,
topic: string(topic),
done: make(chan struct{}),
codec: codec,
producer: prod,
topic: string(topic),
defaultHeaders: opts.defaultHeaders,
done: make(chan struct{}),
}, nil
}

func (e *Emitter) emitDone(err error) { e.wg.Done() }

// EmitWithHeaders sends a message with the given headers for the passed key using the emitter's codec.
func (e *Emitter) EmitWithHeaders(key string, msg interface{}, headers map[string][]byte) (*Promise, error) {
func (e *Emitter) EmitWithHeaders(key string, msg interface{}, hdr headers.Headers) (*Promise, error) {
var (
err error
data []byte
Expand All @@ -80,10 +83,11 @@ func (e *Emitter) EmitWithHeaders(key string, msg interface{}, headers map[strin
e.mu.RUnlock()
}

if headers == nil {
if hdr == nil && e.defaultHeaders == nil {
return e.producer.Emit(e.topic, key, data).Then(e.emitDone), nil
}
return e.producer.EmitWithHeaders(e.topic, key, data, headers).Then(e.emitDone), nil

return e.producer.EmitWithHeaders(e.topic, key, data, e.defaultHeaders.Merged(hdr)).Then(e.emitDone), nil
}

// Emit sends a message for passed key using the emitter's codec.
Expand All @@ -92,12 +96,12 @@ func (e *Emitter) Emit(key string, msg interface{}) (*Promise, error) {
}

// EmitSyncWithHeaders sends a message with the given headers to passed topic and key.
func (e *Emitter) EmitSyncWithHeaders(key string, msg interface{}, headers map[string][]byte) error {
func (e *Emitter) EmitSyncWithHeaders(key string, msg interface{}, hdr headers.Headers) error {
var (
err error
promise *Promise
)
promise, err = e.EmitWithHeaders(key, msg, headers)
promise, err = e.EmitWithHeaders(key, msg, hdr)

if err != nil {
return err
Expand Down
Loading

0 comments on commit 48c2ec2

Please sign in to comment.