Skip to content

Commit

Permalink
Merge pull request #28 from lovoo/metrics
Browse files Browse the repository at this point in the history
Expose message timestamp and metrics improvements
  • Loading branch information
SamiHiltunen committed Jul 5, 2017
2 parents 52701c2 + 1001c9f commit 4264155
Show file tree
Hide file tree
Showing 9 changed files with 78 additions and 23 deletions.
10 changes: 10 additions & 0 deletions context.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"sync"
"time"

"github.com/lovoo/goka/kafka"
"github.com/lovoo/goka/storage"
Expand All @@ -27,6 +28,10 @@ type Context interface {
// SetValue updates the value of the key in the group table.
SetValue(value interface{})

// Timestamp returns the timestamp of the input message. If the timestamp is
// invalid, a zero time will be returned.
Timestamp() time.Time

// Join returns the value of key in the copartitioned table.
Join(topic Table) interface{}

Expand Down Expand Up @@ -133,6 +138,11 @@ func (ctx *context) SetValue(value interface{}) {
}
}

// Timestamp returns the timestamp of the input message.
func (ctx *context) Timestamp() time.Time {
return ctx.msg.Timestamp
}

func (ctx *context) Key() string {
return string(ctx.msg.Key)
}
Expand Down
12 changes: 12 additions & 0 deletions context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,18 @@ func TestContext_Emit(t *testing.T) {
ensure.DeepEqual(t, ack, 1)
}

func TestContext_Timestamp(t *testing.T) {
ts := time.Now()

ctx := &context{
msg: &message{
Timestamp: ts,
},
}

ensure.DeepEqual(t, ctx.Timestamp(), ts)
}

func TestContext_EmitError(t *testing.T) {
ack := 0
emitted := 0
Expand Down
11 changes: 11 additions & 0 deletions examples/testing/context_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package main
import (
gomock "github.com/golang/mock/gomock"
goka "github.com/lovoo/goka"
time "time"
)

// Mock of Context interface
Expand Down Expand Up @@ -91,6 +92,16 @@ func (_mr *_MockContextRecorder) SetValue(arg0 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "SetValue", arg0)
}

func (_m *MockContext) Timestamp() time.Time {
ret := _m.ctrl.Call(_m, "Timestamp")
ret0, _ := ret[0].(time.Time)
return ret0
}

func (_mr *_MockContextRecorder) Timestamp() *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "Timestamp")
}

func (_m *MockContext) Topic() goka.Stream {
ret := _m.ctrl.Call(_m, "Topic")
ret0, _ := ret[0].(goka.Stream)
Expand Down
6 changes: 5 additions & 1 deletion kafka/event.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package kafka

import "fmt"
import (
"fmt"
"time"
)

// Event abstracts different types of events from the kafka consumer like BOF/EOF/Error or an actual message
type Event interface {
Expand Down Expand Up @@ -44,6 +47,7 @@ type Message struct {
Topic string
Partition int32
Offset int64
Timestamp time.Time

Key string
Value []byte
Expand Down
1 change: 1 addition & 0 deletions kafka/group_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ func (c *groupConsumer) waitForMessages() bool {
Topic: msg.Topic,
Partition: msg.Partition,
Offset: msg.Offset,
Timestamp: msg.Timestamp,
Key: string(msg.Key),
Value: msg.Value,
}
Expand Down
19 changes: 15 additions & 4 deletions partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ const (
mxConsumedRate = "consumed_messages_rate"
mxConsumedOffset = "consumed_offset"
mxConsumedPending = "consumed_messages_pending"
mxConsumptionDelay = "consumption_delay"
mxRecoverHwm = "recover_hwm"
mxPartitionLoaderHwm = "partition_loader_hwm" // current high water mark of the partition loader.
mxRecoverStartOffset = "recover_start_offset"
mxRecoverCurrentOffset = "recover_current_offset"
mxRecoverRate = "recover_rate"

defaultPartitionChannelSize = 10
syncInterval = 30 * time.Second
Expand Down Expand Up @@ -57,6 +57,7 @@ type partition struct {
process processCallback

// metrics
registry metrics.Registry
mxStatus metrics.Gauge // partition status = ?
mxConsumed metrics.Counter // number of processed messages
mxConsumedRate metrics.Meter // rate of processed messages
Expand All @@ -65,7 +66,6 @@ type partition struct {

mxRecoverStartOffset metrics.Gauge
mxRecoverCurrentOffset metrics.Gauge
mxRecoverRate metrics.Meter

mxRecoverHwm metrics.Gauge
mxPartitionLoaderHwm metrics.Gauge
Expand Down Expand Up @@ -94,6 +94,7 @@ func newPartition(log logger.Logger, topic string, cb processCallback, st *stora
process: cb,

// metrics
registry: reg,
mxConsumed: metrics.GetOrRegisterCounter(mxConsumed, reg),
mxConsumedRate: metrics.GetOrRegisterMeter(mxConsumedRate, reg),
mxConsumedPending: metrics.GetOrRegisterGauge(mxConsumedPending, reg),
Expand All @@ -103,7 +104,6 @@ func newPartition(log logger.Logger, topic string, cb processCallback, st *stora
mxPartitionLoaderHwm: metrics.GetOrRegisterGauge(mxPartitionLoaderHwm, reg),
mxRecoverStartOffset: metrics.GetOrRegisterGauge(mxRecoverStartOffset, reg),
mxRecoverCurrentOffset: metrics.GetOrRegisterGauge(mxRecoverCurrentOffset, reg),
mxRecoverRate: metrics.GetOrRegisterMeter(mxRecoverRate, reg),
}
}

Expand Down Expand Up @@ -161,6 +161,7 @@ func newMessage(ev *kafka.Message) *message {
Topic: string(ev.Topic),
Partition: int32(ev.Partition),
Offset: int64(ev.Offset),
Timestamp: ev.Timestamp,
Data: ev.Value,
Key: string(ev.Key),
}
Expand Down Expand Up @@ -203,6 +204,12 @@ func (p *partition) run() error {
p.mxConsumedPending.Update(int64(len(p.ch)))
p.mxConsumedRate.Mark(1)
p.mxConsumedOffset.Update(msg.Offset)
metrics.GetOrRegisterCounter(fmt.Sprintf("%s.%s", ev.Topic, mxConsumed), p.registry).Inc(1)
metrics.GetOrRegisterMeter(fmt.Sprintf("%s.%s", ev.Topic, mxConsumedRate), p.registry).Mark(1)
metrics.GetOrRegisterGauge(fmt.Sprintf("%s.%s", ev.Topic, mxConsumedOffset), p.registry).Update(msg.Offset)
if !ev.Timestamp.IsZero() {
metrics.GetOrRegisterTimer(fmt.Sprintf("%s.%s", ev.Topic, mxConsumptionDelay), p.registry).UpdateSince(ev.Timestamp)
}

case *kafka.NOP:
// don't do anything but also don't log.
Expand Down Expand Up @@ -301,7 +308,11 @@ func (p *partition) load(catchup bool) error {
}
lastMessage = time.Now()
// update metrics
p.mxRecoverRate.Mark(1)
p.mxConsumedRate.Mark(1)
metrics.GetOrRegisterMeter(fmt.Sprintf("%s.%s", ev.Topic, mxConsumedRate), p.registry).Mark(1)
if !ev.Timestamp.IsZero() {
metrics.GetOrRegisterTimer(fmt.Sprintf("%s.%s", ev.Topic, mxConsumptionDelay), p.registry).UpdateSince(ev.Timestamp)
}
p.mxRecoverCurrentOffset.Update(ev.Offset)
if ev.Offset < p.initialHwm-1 {
p.mxStatus.Update(partitionRecovering)
Expand Down
2 changes: 2 additions & 0 deletions processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"hash/fnv"
"runtime/debug"
"sync"
"time"

"github.com/lovoo/goka/kafka"
"github.com/lovoo/goka/logger"
Expand Down Expand Up @@ -45,6 +46,7 @@ type message struct {
Topic string
Partition int32
Offset int64
Timestamp time.Time
}

// ProcessCallback function is called for every message received by the
Expand Down
Loading

0 comments on commit 4264155

Please sign in to comment.