Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose message timestamp and metrics improvements #28

Merged
merged 4 commits into from
Jul 5, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions context.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"sync"
"time"

"github.com/lovoo/goka/kafka"
"github.com/lovoo/goka/storage"
Expand All @@ -27,6 +28,10 @@ type Context interface {
// SetValue updates the value of the key in the group table.
SetValue(value interface{})

// Timestamp returns the timestamp of the input message. If the timestamp is
// invalid, a zero time will be returned.
Timestamp() time.Time

// Join returns the value of key in the copartitioned table.
Join(topic Table) interface{}

Expand Down Expand Up @@ -133,6 +138,11 @@ func (ctx *context) SetValue(value interface{}) {
}
}

// Timestamp returns the timestamp of the input message.
func (ctx *context) Timestamp() time.Time {
return ctx.msg.Timestamp
}

func (ctx *context) Key() string {
return string(ctx.msg.Key)
}
Expand Down
12 changes: 12 additions & 0 deletions context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,18 @@ func TestContext_Emit(t *testing.T) {
ensure.DeepEqual(t, ack, 1)
}

func TestContext_Timestamp(t *testing.T) {
ts := time.Now()

ctx := &context{
msg: &message{
Timestamp: ts,
},
}

ensure.DeepEqual(t, ctx.Timestamp(), ts)
}

func TestContext_EmitError(t *testing.T) {
ack := 0
emitted := 0
Expand Down
11 changes: 11 additions & 0 deletions examples/testing/context_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package main
import (
gomock "github.com/golang/mock/gomock"
goka "github.com/lovoo/goka"
time "time"
)

// Mock of Context interface
Expand Down Expand Up @@ -91,6 +92,16 @@ func (_mr *_MockContextRecorder) SetValue(arg0 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "SetValue", arg0)
}

func (_m *MockContext) Timestamp() time.Time {
ret := _m.ctrl.Call(_m, "Timestamp")
ret0, _ := ret[0].(time.Time)
return ret0
}

func (_mr *_MockContextRecorder) Timestamp() *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "Timestamp")
}

func (_m *MockContext) Topic() goka.Stream {
ret := _m.ctrl.Call(_m, "Topic")
ret0, _ := ret[0].(goka.Stream)
Expand Down
6 changes: 5 additions & 1 deletion kafka/event.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package kafka

import "fmt"
import (
"fmt"
"time"
)

// Event abstracts different types of events from the kafka consumer like BOF/EOF/Error or an actual message
type Event interface {
Expand Down Expand Up @@ -44,6 +47,7 @@ type Message struct {
Topic string
Partition int32
Offset int64
Timestamp time.Time

Key string
Value []byte
Expand Down
1 change: 1 addition & 0 deletions kafka/group_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ func (c *groupConsumer) waitForMessages() bool {
Topic: msg.Topic,
Partition: msg.Partition,
Offset: msg.Offset,
Timestamp: msg.Timestamp,
Key: string(msg.Key),
Value: msg.Value,
}
Expand Down
19 changes: 15 additions & 4 deletions partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ const (
mxConsumedRate = "consumed_messages_rate"
mxConsumedOffset = "consumed_offset"
mxConsumedPending = "consumed_messages_pending"
mxConsumptionDelay = "consumption_delay"
mxRecoverHwm = "recover_hwm"
mxPartitionLoaderHwm = "partition_loader_hwm" // current high water mark of the partition loader.
mxRecoverStartOffset = "recover_start_offset"
mxRecoverCurrentOffset = "recover_current_offset"
mxRecoverRate = "recover_rate"

defaultPartitionChannelSize = 10
syncInterval = 30 * time.Second
Expand Down Expand Up @@ -57,6 +57,7 @@ type partition struct {
process processCallback

// metrics
registry metrics.Registry
mxStatus metrics.Gauge // partition status = ?
mxConsumed metrics.Counter // number of processed messages
mxConsumedRate metrics.Meter // rate of processed messages
Expand All @@ -65,7 +66,6 @@ type partition struct {

mxRecoverStartOffset metrics.Gauge
mxRecoverCurrentOffset metrics.Gauge
mxRecoverRate metrics.Meter

mxRecoverHwm metrics.Gauge
mxPartitionLoaderHwm metrics.Gauge
Expand Down Expand Up @@ -94,6 +94,7 @@ func newPartition(log logger.Logger, topic string, cb processCallback, st *stora
process: cb,

// metrics
registry: reg,
mxConsumed: metrics.GetOrRegisterCounter(mxConsumed, reg),
mxConsumedRate: metrics.GetOrRegisterMeter(mxConsumedRate, reg),
mxConsumedPending: metrics.GetOrRegisterGauge(mxConsumedPending, reg),
Expand All @@ -103,7 +104,6 @@ func newPartition(log logger.Logger, topic string, cb processCallback, st *stora
mxPartitionLoaderHwm: metrics.GetOrRegisterGauge(mxPartitionLoaderHwm, reg),
mxRecoverStartOffset: metrics.GetOrRegisterGauge(mxRecoverStartOffset, reg),
mxRecoverCurrentOffset: metrics.GetOrRegisterGauge(mxRecoverCurrentOffset, reg),
mxRecoverRate: metrics.GetOrRegisterMeter(mxRecoverRate, reg),
}
}

Expand Down Expand Up @@ -161,6 +161,7 @@ func newMessage(ev *kafka.Message) *message {
Topic: string(ev.Topic),
Partition: int32(ev.Partition),
Offset: int64(ev.Offset),
Timestamp: ev.Timestamp,
Data: ev.Value,
Key: string(ev.Key),
}
Expand Down Expand Up @@ -203,6 +204,12 @@ func (p *partition) run() error {
p.mxConsumedPending.Update(int64(len(p.ch)))
p.mxConsumedRate.Mark(1)
p.mxConsumedOffset.Update(msg.Offset)
metrics.GetOrRegisterCounter(fmt.Sprintf("%s.%s", ev.Topic, mxConsumed), p.registry).Inc(1)
metrics.GetOrRegisterMeter(fmt.Sprintf("%s.%s", ev.Topic, mxConsumedRate), p.registry).Mark(1)
metrics.GetOrRegisterGauge(fmt.Sprintf("%s.%s", ev.Topic, mxConsumedOffset), p.registry).Update(msg.Offset)
if !ev.Timestamp.IsZero() {
metrics.GetOrRegisterTimer(fmt.Sprintf("%s.%s", ev.Topic, mxConsumptionDelay), p.registry).UpdateSince(ev.Timestamp)
}

case *kafka.NOP:
// don't do anything but also don't log.
Expand Down Expand Up @@ -301,7 +308,11 @@ func (p *partition) load(catchup bool) error {
}
lastMessage = time.Now()
// update metrics
p.mxRecoverRate.Mark(1)
p.mxConsumedRate.Mark(1)
metrics.GetOrRegisterMeter(fmt.Sprintf("%s.%s", ev.Topic, mxConsumedRate), p.registry).Mark(1)
if !ev.Timestamp.IsZero() {
metrics.GetOrRegisterTimer(fmt.Sprintf("%s.%s", ev.Topic, mxConsumptionDelay), p.registry).UpdateSince(ev.Timestamp)
}
p.mxRecoverCurrentOffset.Update(ev.Offset)
if ev.Offset < p.initialHwm-1 {
p.mxStatus.Update(partitionRecovering)
Expand Down
2 changes: 2 additions & 0 deletions processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"hash/fnv"
"runtime/debug"
"sync"
"time"

"github.com/lovoo/goka/kafka"
"github.com/lovoo/goka/logger"
Expand Down Expand Up @@ -45,6 +46,7 @@ type message struct {
Topic string
Partition int32
Offset int64
Timestamp time.Time
}

// ProcessCallback function is called for every message received by the
Expand Down
Loading