Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[COR-177] Latency Fixes #72

Merged
merged 4 commits into from
Nov 19, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
166 changes: 93 additions & 73 deletions monitor/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,16 @@ import (
"go.opentelemetry.io/otel/baggage"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/instrument"
"go.opentelemetry.io/otel/metric/unit"
)

type Histogram struct {
name string
description string
type latencyHistogram struct {
meter metric.Meter
histogramName string
histogramDescription string
connectorName string
env string
value int64
}

const (
Expand All @@ -22,102 +27,117 @@ const (
LatencyConnectorKey = "latencyconn"
LatencyKafkaProduceKey = "latencykafkaprod"
LatencyStreamserverConsumeKey = "latencyssconsume"

// Metric keys
rpcLatency = 0
connLatency = 1
coreLatency = 2
systemLatency = 3
e2eLatency = 4
)

var (
latencyObservations = map[string]int64{
LatencyRpcKey: 0,
LatencyConnectorKey: 0,
LatencyKafkaProduceKey: 0,
LatencyStreamserverConsumeKey: 0,
}
latencyMetrics = map[int]int64{
rpcLatency: 0,
connLatency: 0,
coreLatency: 0,
systemLatency: 0,
e2eLatency: 0,
}
histograms = map[int]Histogram{
rpcLatency: {name: "latency.rpc.histogram", description: "Latency from block time to connector reception"},
connLatency: {name: "latency.connector.histogram", description: "Latency from connector reception to kafka produce"},
coreLatency: {name: "latency.core.histogram", description: "Latency from kafka to streamserver"},
systemLatency: {name: "latency.system.histogram", description: "Latency from connector to streamserver"},
e2eLatency: {name: "latency.e2e.histogram", description: "Latency from block time to streamserver"},
// ExportLatencyMetrics exports all latency metrics from baggage in ctx
func ExportLatencyMetrics(ctx context.Context, meter metric.Meter, connectorNmae string, env string) {
latency := latencyHistogram{
meter: meter,
connectorName: connectorName,
env: env,
value: 0,
}
)

// ExportLatencyMetrics exports latency metrics from baggage in ctx
func ExportLatencyMetrics(ctx context.Context, meter metric.Meter, connName string, env string) {
bag := baggage.FromContext(ctx)
recordRpcLatency(ctx, latency)
gunnarsundberg marked this conversation as resolved.
Show resolved Hide resolved
recordConnectorLatency(ctx, latency)
recordCoreLatency(ctx, latency)
recordSystemLatency(ctx, latency)
recordEndLatency(ctx, latency)
}

// Extract latency observations from baggage
for key := range latencyObservations {
latencyObservations[key] = getBaggageLatency(bag, key)
}
func recordRpcLatency(ctx context.Context, latency latencyHistogram) {
latency.histogramName = "latency.rpc.histogram"
latency.histogramDescription = "Latency from block time to connector reception"

// Get observations
rpcObservation := latencyObservations[LatencyRpcKey]
connObservation := latencyObservations[LatencyConnectorKey]
kafkaProduceObservation := latencyObservations[LatencyKafkaProduceKey]
ssConsumeObservation := latencyObservations[LatencyStreamserverConsumeKey]
rpcObservation := getBaggageLatency(ctx, LatencyRpcKey)
connObservation := getBaggageLatency(ctx, LatencyConnectorKey)

// Derive latency metrics from observations
if rpcObservation > 0 && connObservation > 0 {
latencyMetrics[rpcLatency] = connObservation - rpcObservation
} else {
latencyMetrics[rpcLatency] = 0
latency.value = connObservation - rpcObservation
}

recordLatencyHistogram(ctx, latency)
}

func recordConnectorLatency(ctx context.Context, latency latencyHistogram) {
latency.histogramName = "latency.connector.histogram"
latency.histogramDescription = "Latency from connector reception to kafka produce"

connObservation := getBaggageLatency(ctx, LatencyConnectorKey)
kafkaProduceObservation := getBaggageLatency(ctx, LatencyKafkaProduceKey)

if kafkaProduceObservation > 0 && connObservation > 0 {
latencyMetrics[connLatency] = kafkaProduceObservation - connObservation
} else {
latencyMetrics[connLatency] = 0
latency.value = kafkaProduceObservation - connObservation
}

recordLatencyHistogram(ctx, latency)
}

func recordCoreLatency(ctx context.Context, latency latencyHistogram) {
latency.histogramName = "latency.core.histogram"
latency.histogramDescription = "Latency from kafka to streamserver"

kafkaProduceObservation := getBaggageLatency(ctx, LatencyKafkaProduceKey)
ssConsumeObservation := getBaggageLatency(ctx, LatencyStreamserverConsumeKey)

if ssConsumeObservation > 0 && kafkaProduceObservation > 0 {
latencyMetrics[coreLatency] = ssConsumeObservation - kafkaProduceObservation
} else {
latencyMetrics[coreLatency] = 0
latency.value = ssConsumeObservation - kafkaProduceObservation
}

recordLatencyHistogram(ctx, latency)
}

func recordSystemLatency(ctx context.Context, latency latencyHistogram) {
latency.histogramName = "latency.system.histogram"
latency.histogramDescription = "Latency from connector to streamserver"

connObservation := getBaggageLatency(ctx, LatencyConnectorKey)
ssConsumeObservation := getBaggageLatency(ctx, LatencyStreamserverConsumeKey)

if ssConsumeObservation > 0 && connObservation > 0 {
latencyMetrics[systemLatency] = ssConsumeObservation - connObservation
} else {
latencyMetrics[systemLatency] = 0
latency.value = ssConsumeObservation - connObservation
}

recordLatencyHistogram(ctx, latency)
}

func recordEndLatency(ctx context.Context, latency latencyHistogram) {
latency.histogramName = "latency.e2e.histogram"
latency.histogramDescription = "Latency from block time to streamserver"

rpcObservation := getBaggageLatency(ctx, LatencyRpcKey)
ssConsumeObservation := getBaggageLatency(ctx, LatencyStreamserverConsumeKey)

if ssConsumeObservation > 0 && rpcObservation > 0 {
latencyMetrics[e2eLatency] = ssConsumeObservation - rpcObservation
} else {
latencyMetrics[e2eLatency] = 0
latency.value = ssConsumeObservation - rpcObservation
}

// Record histograms
for key, hist := range histograms {
latency := latencyMetrics[key]
if latency > 0 {
histogramMetric, err := meter.SyncInt64().Histogram(
hist.name,
instrument.WithUnit("microseconds"),
instrument.WithDescription(hist.description),
)
if err != nil {
log.Error().Err(err).Str("histogram", hist.name).Msg("Unable to create histogram")
}
histogramMetric.Record(ctx, latency, attribute.String("Connector", connName), attribute.String("Env", env))
recordLatencyHistogram(ctx, latency)
}

func recordLatencyHistogram(ctx context.Context, latency latencyHistogram) {
if latency.value > 0 {
// Convert latency from microseconds to float milliseconds
latencyValue := float64(latency.value) / 1000
histogram, err := latency.meter.SyncFloat64().Histogram(
latency.histogramName,
instrument.WithUnit(unit.Milliseconds),
instrument.WithDescription(latency.histogramDescription),
)
if err != nil {
log.Error().Err(err).Str("connector", latency.connectorName).Str("histogram", latency.histogramName).Msg("Unable to create histogram")
}
attributes := []attribute.KeyValue{
attribute.String("Connector", latency.connectorName),
attribute.String("Env", latency.env),
}
histogram.Record(ctx, latencyValue, attributes...)
}
}

func getBaggageLatency(bag baggage.Baggage, key string) int64 {
func getBaggageLatency(ctx context.Context, key string) int64 {
bag := baggage.FromContext(ctx)

mem := bag.Member(key)
ts, err := strconv.Atoi(mem.Value())
// Baggage key does not exist
Expand Down
10 changes: 6 additions & 4 deletions monitor/metric_test.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package monitor

import (
"context"
"testing"

"go.opentelemetry.io/otel/baggage"
)

func TestGetBaggageLatency(t *testing.T) {
type args struct {
bag baggage.Baggage
ctx context.Context
key string
}
tests := []struct {
Expand Down Expand Up @@ -51,15 +52,16 @@ func TestGetBaggageLatency(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := getBaggageLatency(tt.args.bag, tt.args.key); got != tt.want {
if got := getBaggageLatency(tt.args.ctx, tt.args.key); got != tt.want {
t.Errorf("Get() = %v, want %v", got, tt.want)
}
})
}
}

func createBaggage(key string, value string) baggage.Baggage {
func createBaggage(key string, value string) context.Context {
member, _ := baggage.NewMember(key, value)
bag, _ := baggage.New(member)
return bag
ctx := baggage.ContextWithBaggage(context.TODO(), bag)
return ctx
}