From 8ec79e2cee21ce1d56bc120647babce98d9acabc Mon Sep 17 00:00:00 2001 From: Gunnar Sundberg Date: Thu, 17 Nov 2022 16:23:24 -0500 Subject: [PATCH 1/4] refactor and convert histograms to milliseconds --- monitor/metric.go | 178 +++++++++++++++++++++++++++------------------- 1 file changed, 105 insertions(+), 73 deletions(-) diff --git a/monitor/metric.go b/monitor/metric.go index daab64f..19f4446 100644 --- a/monitor/metric.go +++ b/monitor/metric.go @@ -9,11 +9,16 @@ import ( "go.opentelemetry.io/otel/baggage" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/instrument" + "go.opentelemetry.io/otel/metric/unit" ) -type Histogram struct { - name string - description string +type latencyHistogram struct { + meter metric.Meter + histogramName string + histogramDescription string + connectorName string + env string + value int64 } const ( @@ -22,102 +27,129 @@ const ( LatencyConnectorKey = "latencyconn" LatencyKafkaProduceKey = "latencykafkaprod" LatencyStreamserverConsumeKey = "latencyssconsume" - - // Metric keys - rpcLatency = 0 - connLatency = 1 - coreLatency = 2 - systemLatency = 3 - e2eLatency = 4 -) - -var ( - latencyObservations = map[string]int64{ - LatencyRpcKey: 0, - LatencyConnectorKey: 0, - LatencyKafkaProduceKey: 0, - LatencyStreamserverConsumeKey: 0, - } - latencyMetrics = map[int]int64{ - rpcLatency: 0, - connLatency: 0, - coreLatency: 0, - systemLatency: 0, - e2eLatency: 0, - } - histograms = map[int]Histogram{ - rpcLatency: {name: "latency.rpc.histogram", description: "Latency from block time to connector reception"}, - connLatency: {name: "latency.connector.histogram", description: "Latency from connector reception to kafka produce"}, - coreLatency: {name: "latency.core.histogram", description: "Latency from kafka to streamserver"}, - systemLatency: {name: "latency.system.histogram", description: "Latency from connector to streamserver"}, - e2eLatency: {name: "latency.e2e.histogram", description: "Latency from block time to streamserver"}, - } ) // ExportLatencyMetrics exports latency metrics from baggage in ctx -func ExportLatencyMetrics(ctx context.Context, meter metric.Meter, connName string, env string) { - bag := baggage.FromContext(ctx) - // Extract latency observations from baggage - for key := range latencyObservations { - latencyObservations[key] = getBaggageLatency(bag, key) +func RecordRpcLatency(ctx context.Context, meter metric.Meter, connName string, env string) { + rpcLatency := latencyHistogram{ + meter: meter, + histogramName: "latency.rpc.histogram", + histogramDescription: "Latency from block time to connector reception", + connectorName: connName, + env: env, + value: 0, } - // Get observations - rpcObservation := latencyObservations[LatencyRpcKey] - connObservation := latencyObservations[LatencyConnectorKey] - kafkaProduceObservation := latencyObservations[LatencyKafkaProduceKey] - ssConsumeObservation := latencyObservations[LatencyStreamserverConsumeKey] + rpcObservation := getBaggageLatency(ctx, LatencyRpcKey) + connObservation := getBaggageLatency(ctx, LatencyConnectorKey) - // Derive latency metrics from observations if rpcObservation > 0 && connObservation > 0 { - latencyMetrics[rpcLatency] = connObservation - rpcObservation - } else { - latencyMetrics[rpcLatency] = 0 + rpcLatency.value = connObservation - rpcObservation + } + + recordLatencyHistogram(ctx, rpcLatency) +} + +func RecordConnectorLatency(ctx context.Context, meter metric.Meter, connName string, env string) { + connectorLatency := latencyHistogram{ + meter: meter, + histogramName: "latency.connector.histogram", + histogramDescription: "Latency from connector reception to kafka produce", + connectorName: connName, + env: env, + value: 0, } + connObservation := getBaggageLatency(ctx, LatencyConnectorKey) + kafkaProduceObservation := getBaggageLatency(ctx, LatencyKafkaProduceKey) + if kafkaProduceObservation > 0 && connObservation > 0 { - latencyMetrics[connLatency] = kafkaProduceObservation - connObservation - } else { - latencyMetrics[connLatency] = 0 + connectorLatency.value = kafkaProduceObservation - connObservation + } + + recordLatencyHistogram(ctx, connectorLatency) +} + +func RecordCoreLatency(ctx context.Context, meter metric.Meter, connName string, env string) { + coreLatency := latencyHistogram{ + meter: meter, + histogramName: "latency.core.histogram", + histogramDescription: "Latency from kafka to streamserver", + connectorName: connName, + env: env, + value: 0, } + kafkaProduceObservation := getBaggageLatency(ctx, LatencyKafkaProduceKey) + ssConsumeObservation := getBaggageLatency(ctx, LatencyStreamserverConsumeKey) + if ssConsumeObservation > 0 && kafkaProduceObservation > 0 { - latencyMetrics[coreLatency] = ssConsumeObservation - kafkaProduceObservation - } else { - latencyMetrics[coreLatency] = 0 + coreLatency.value = ssConsumeObservation - kafkaProduceObservation } + recordLatencyHistogram(ctx, coreLatency) +} + +func RecordSystemLatency(ctx context.Context, meter metric.Meter, connName string, env string) { + systemLatency := latencyHistogram{ + meter: meter, + histogramName: "latency.system.histogram", + histogramDescription: "Latency from connector to streamserver", + connectorName: connName, + env: env, + value: 0, + } + + connObservation := getBaggageLatency(ctx, LatencyConnectorKey) + ssConsumeObservation := getBaggageLatency(ctx, LatencyStreamserverConsumeKey) + if ssConsumeObservation > 0 && connObservation > 0 { - latencyMetrics[systemLatency] = ssConsumeObservation - connObservation - } else { - latencyMetrics[systemLatency] = 0 + systemLatency.value = ssConsumeObservation - connObservation + } + + recordLatencyHistogram(ctx, systemLatency) +} + +func RecordEndLatency(ctx context.Context, meter metric.Meter, connName string, env string) { + e2eLatency := latencyHistogram{ + meter: meter, + histogramName: "latency.e2e.histogram", + histogramDescription: "Latency from block time to streamserver", + connectorName: connName, + env: env, + value: 0, } + rpcObservation := getBaggageLatency(ctx, LatencyRpcKey) + ssConsumeObservation := getBaggageLatency(ctx, LatencyStreamserverConsumeKey) + if ssConsumeObservation > 0 && rpcObservation > 0 { - latencyMetrics[e2eLatency] = ssConsumeObservation - rpcObservation - } else { - latencyMetrics[e2eLatency] = 0 + e2eLatency.value = ssConsumeObservation - rpcObservation } - // Record histograms - for key, hist := range histograms { - latency := latencyMetrics[key] - if latency > 0 { - histogramMetric, err := meter.SyncInt64().Histogram( - hist.name, - instrument.WithUnit("microseconds"), - instrument.WithDescription(hist.description), - ) - if err != nil { - log.Error().Err(err).Str("histogram", hist.name).Msg("Unable to create histogram") - } - histogramMetric.Record(ctx, latency, attribute.String("Connector", connName), attribute.String("Env", env)) + recordLatencyHistogram(ctx, e2eLatency) +} + +func recordLatencyHistogram(ctx context.Context, obs latencyHistogram) { + if obs.value > 0 { + // Convert latency from microseconds to float milliseconds + latency := float64(obs.value) / 1000 + histogram, err := obs.meter.SyncFloat64().Histogram( + obs.histogramName, + instrument.WithUnit(unit.Milliseconds), + instrument.WithDescription(obs.histogramDescription), + ) + if err != nil { + log.Error().Err(err).Str("connector", obs.connectorName).Str("histogram", obs.histogramName).Msg("Unable to create histogram") } + histogram.Record(ctx, latency, attribute.String("Connector", obs.connectorName), attribute.String("Env", obs.env)) } } -func getBaggageLatency(bag baggage.Baggage, key string) int64 { +func getBaggageLatency(ctx context.Context, key string) int64 { + bag := baggage.FromContext(ctx) + mem := bag.Member(key) ts, err := strconv.Atoi(mem.Value()) // Baggage key does not exist From 1db99cf83abc52202414f6b1717c2fddd41291dc Mon Sep 17 00:00:00 2001 From: Gunnar Sundberg Date: Thu, 17 Nov 2022 16:23:43 -0500 Subject: [PATCH 2/4] update test to use ctx --- monitor/metric_test.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/monitor/metric_test.go b/monitor/metric_test.go index d629f96..15f7ec6 100644 --- a/monitor/metric_test.go +++ b/monitor/metric_test.go @@ -1,6 +1,7 @@ package monitor import ( + "context" "testing" "go.opentelemetry.io/otel/baggage" @@ -8,7 +9,7 @@ import ( func TestGetBaggageLatency(t *testing.T) { type args struct { - bag baggage.Baggage + ctx context.Context key string } tests := []struct { @@ -51,15 +52,16 @@ func TestGetBaggageLatency(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if got := getBaggageLatency(tt.args.bag, tt.args.key); got != tt.want { + if got := getBaggageLatency(tt.args.ctx, tt.args.key); got != tt.want { t.Errorf("Get() = %v, want %v", got, tt.want) } }) } } -func createBaggage(key string, value string) baggage.Baggage { +func createBaggage(key string, value string) context.Context { member, _ := baggage.NewMember(key, value) bag, _ := baggage.New(member) - return bag + ctx := baggage.ContextWithBaggage(context.TODO(), bag) + return ctx } From 435baf00c016324f72cb6dd3ea46f159ff537947 Mon Sep 17 00:00:00 2001 From: Gunnar Sundberg Date: Thu, 17 Nov 2022 17:03:32 -0500 Subject: [PATCH 3/4] keep latency observations backwards compatible --- monitor/metric.go | 116 +++++++++++++++++++++------------------------- 1 file changed, 52 insertions(+), 64 deletions(-) diff --git a/monitor/metric.go b/monitor/metric.go index 19f4446..757b516 100644 --- a/monitor/metric.go +++ b/monitor/metric.go @@ -29,121 +29,109 @@ const ( LatencyStreamserverConsumeKey = "latencyssconsume" ) -// ExportLatencyMetrics exports latency metrics from baggage in ctx - -func RecordRpcLatency(ctx context.Context, meter metric.Meter, connName string, env string) { - rpcLatency := latencyHistogram{ - meter: meter, - histogramName: "latency.rpc.histogram", - histogramDescription: "Latency from block time to connector reception", - connectorName: connName, - env: env, - value: 0, +// ExportLatencyMetrics exports all latency metrics from baggage in ctx +func ExportLatencyMetrics(ctx context.Context, meter metric.Meter, connectorNmae string, env string) { + latency := latencyHistogram{ + meter: meter, + connectorName: connectorName, + env: env, + value: 0, } + recordRpcLatency(ctx, latency) + recordConnectorLatency(ctx, latency) + recordCoreLatency(ctx, latency) + recordSystemLatency(ctx, latency) + recordEndLatency(ctx, latency) +} + +func recordRpcLatency(ctx context.Context, latency latencyHistogram) { + latency.histogramName = "latency.rpc.histogram" + latency.histogramDescription = "Latency from block time to connector reception" + rpcObservation := getBaggageLatency(ctx, LatencyRpcKey) connObservation := getBaggageLatency(ctx, LatencyConnectorKey) if rpcObservation > 0 && connObservation > 0 { - rpcLatency.value = connObservation - rpcObservation + latency.value = connObservation - rpcObservation } - recordLatencyHistogram(ctx, rpcLatency) + recordLatencyHistogram(ctx, latency) } -func RecordConnectorLatency(ctx context.Context, meter metric.Meter, connName string, env string) { - connectorLatency := latencyHistogram{ - meter: meter, - histogramName: "latency.connector.histogram", - histogramDescription: "Latency from connector reception to kafka produce", - connectorName: connName, - env: env, - value: 0, - } +func recordConnectorLatency(ctx context.Context, latency latencyHistogram) { + latency.histogramName = "latency.connector.histogram" + latency.histogramDescription = "Latency from connector reception to kafka produce" connObservation := getBaggageLatency(ctx, LatencyConnectorKey) kafkaProduceObservation := getBaggageLatency(ctx, LatencyKafkaProduceKey) if kafkaProduceObservation > 0 && connObservation > 0 { - connectorLatency.value = kafkaProduceObservation - connObservation + latency.value = kafkaProduceObservation - connObservation } - recordLatencyHistogram(ctx, connectorLatency) + recordLatencyHistogram(ctx, latency) } -func RecordCoreLatency(ctx context.Context, meter metric.Meter, connName string, env string) { - coreLatency := latencyHistogram{ - meter: meter, - histogramName: "latency.core.histogram", - histogramDescription: "Latency from kafka to streamserver", - connectorName: connName, - env: env, - value: 0, - } +func recordCoreLatency(ctx context.Context, latency latencyHistogram) { + latency.histogramName = "latency.core.histogram" + latency.histogramDescription = "Latency from kafka to streamserver" kafkaProduceObservation := getBaggageLatency(ctx, LatencyKafkaProduceKey) ssConsumeObservation := getBaggageLatency(ctx, LatencyStreamserverConsumeKey) if ssConsumeObservation > 0 && kafkaProduceObservation > 0 { - coreLatency.value = ssConsumeObservation - kafkaProduceObservation + latency.value = ssConsumeObservation - kafkaProduceObservation } - recordLatencyHistogram(ctx, coreLatency) + recordLatencyHistogram(ctx, latency) } -func RecordSystemLatency(ctx context.Context, meter metric.Meter, connName string, env string) { - systemLatency := latencyHistogram{ - meter: meter, - histogramName: "latency.system.histogram", - histogramDescription: "Latency from connector to streamserver", - connectorName: connName, - env: env, - value: 0, - } +func recordSystemLatency(ctx context.Context, latency latencyHistogram) { + latency.histogramName = "latency.system.histogram" + latency.histogramDescription = "Latency from connector to streamserver" connObservation := getBaggageLatency(ctx, LatencyConnectorKey) ssConsumeObservation := getBaggageLatency(ctx, LatencyStreamserverConsumeKey) if ssConsumeObservation > 0 && connObservation > 0 { - systemLatency.value = ssConsumeObservation - connObservation + latency.value = ssConsumeObservation - connObservation } - recordLatencyHistogram(ctx, systemLatency) + recordLatencyHistogram(ctx, latency) } -func RecordEndLatency(ctx context.Context, meter metric.Meter, connName string, env string) { - e2eLatency := latencyHistogram{ - meter: meter, - histogramName: "latency.e2e.histogram", - histogramDescription: "Latency from block time to streamserver", - connectorName: connName, - env: env, - value: 0, - } +func recordEndLatency(ctx context.Context, latency latencyHistogram) { + latency.histogramName = "latency.e2e.histogram" + latency.histogramDescription = "Latency from block time to streamserver" rpcObservation := getBaggageLatency(ctx, LatencyRpcKey) ssConsumeObservation := getBaggageLatency(ctx, LatencyStreamserverConsumeKey) if ssConsumeObservation > 0 && rpcObservation > 0 { - e2eLatency.value = ssConsumeObservation - rpcObservation + latency.value = ssConsumeObservation - rpcObservation } - recordLatencyHistogram(ctx, e2eLatency) + recordLatencyHistogram(ctx, latency) } -func recordLatencyHistogram(ctx context.Context, obs latencyHistogram) { - if obs.value > 0 { +func recordLatencyHistogram(ctx context.Context, latency latencyHistogram) { + if latency.value > 0 { // Convert latency from microseconds to float milliseconds - latency := float64(obs.value) / 1000 - histogram, err := obs.meter.SyncFloat64().Histogram( - obs.histogramName, + latencyValue := float64(latency.value) / 1000 + histogram, err := latency.meter.SyncFloat64().Histogram( + latency.histogramName, instrument.WithUnit(unit.Milliseconds), - instrument.WithDescription(obs.histogramDescription), + instrument.WithDescription(latency.histogramDescription), ) if err != nil { - log.Error().Err(err).Str("connector", obs.connectorName).Str("histogram", obs.histogramName).Msg("Unable to create histogram") + log.Error().Err(err).Str("connector", latency.connectorName).Str("histogram", latency.histogramName).Msg("Unable to create histogram") + } + attributes := []attribute.KeyValue{ + attribute.String("Connector", latency.connectorName), + attribute.String("Env", latency.env), } - histogram.Record(ctx, latency, attribute.String("Connector", obs.connectorName), attribute.String("Env", obs.env)) + histogram.Record(ctx, latencyValue, attributes...) } } From 0f8a14cf8e935fc7619592f0feeaab41076b927d Mon Sep 17 00:00:00 2001 From: Gunnar Sundberg Date: Fri, 18 Nov 2022 18:14:13 -0500 Subject: [PATCH 4/4] update function design --- monitor/metric.go | 91 +++++++++++++++++++++++++++++------------------ 1 file changed, 56 insertions(+), 35 deletions(-) diff --git a/monitor/metric.go b/monitor/metric.go index 757b516..7c62dfe 100644 --- a/monitor/metric.go +++ b/monitor/metric.go @@ -13,7 +13,6 @@ import ( ) type latencyHistogram struct { - meter metric.Meter histogramName string histogramDescription string connectorName string @@ -30,24 +29,26 @@ const ( ) // ExportLatencyMetrics exports all latency metrics from baggage in ctx -func ExportLatencyMetrics(ctx context.Context, meter metric.Meter, connectorNmae string, env string) { - latency := latencyHistogram{ - meter: meter, - connectorName: connectorName, - env: env, - value: 0, +func ExportLatencyMetrics(ctx context.Context, meter metric.Meter, connectorName string, env string) { + for _, latency := range []latencyHistogram{ + getRPCLatency(ctx, connectorName, env), + getConnectorLatency(ctx, connectorName, env), + getCoreLatency(ctx, connectorName, env), + getSystemLatency(ctx, connectorName, env), + getEndLatency(ctx, connectorName, env), + } { + recordLatencyHistogram(ctx, meter, latency) } - - recordRpcLatency(ctx, latency) - recordConnectorLatency(ctx, latency) - recordCoreLatency(ctx, latency) - recordSystemLatency(ctx, latency) - recordEndLatency(ctx, latency) } -func recordRpcLatency(ctx context.Context, latency latencyHistogram) { - latency.histogramName = "latency.rpc.histogram" - latency.histogramDescription = "Latency from block time to connector reception" +func getRPCLatency(ctx context.Context, connectorName string, env string) latencyHistogram { + latency := latencyHistogram{ + histogramName: "latency.rpc.histogram", + histogramDescription: "Latency from block time to connector reception", + connectorName: connectorName, + env: env, + value: 0, + } rpcObservation := getBaggageLatency(ctx, LatencyRpcKey) connObservation := getBaggageLatency(ctx, LatencyConnectorKey) @@ -56,12 +57,17 @@ func recordRpcLatency(ctx context.Context, latency latencyHistogram) { latency.value = connObservation - rpcObservation } - recordLatencyHistogram(ctx, latency) + return latency } -func recordConnectorLatency(ctx context.Context, latency latencyHistogram) { - latency.histogramName = "latency.connector.histogram" - latency.histogramDescription = "Latency from connector reception to kafka produce" +func getConnectorLatency(ctx context.Context, connectorName string, env string) latencyHistogram { + latency := latencyHistogram{ + histogramName: "latency.connector.histogram", + histogramDescription: "Latency from connector reception to kafka produce", + connectorName: connectorName, + env: env, + value: 0, + } connObservation := getBaggageLatency(ctx, LatencyConnectorKey) kafkaProduceObservation := getBaggageLatency(ctx, LatencyKafkaProduceKey) @@ -70,12 +76,17 @@ func recordConnectorLatency(ctx context.Context, latency latencyHistogram) { latency.value = kafkaProduceObservation - connObservation } - recordLatencyHistogram(ctx, latency) + return latency } -func recordCoreLatency(ctx context.Context, latency latencyHistogram) { - latency.histogramName = "latency.core.histogram" - latency.histogramDescription = "Latency from kafka to streamserver" +func getCoreLatency(ctx context.Context, connectorName string, env string) latencyHistogram { + latency := latencyHistogram{ + histogramName: "latency.core.histogram", + histogramDescription: "Latency from kafka to streamserver", + connectorName: connectorName, + env: env, + value: 0, + } kafkaProduceObservation := getBaggageLatency(ctx, LatencyKafkaProduceKey) ssConsumeObservation := getBaggageLatency(ctx, LatencyStreamserverConsumeKey) @@ -84,12 +95,17 @@ func recordCoreLatency(ctx context.Context, latency latencyHistogram) { latency.value = ssConsumeObservation - kafkaProduceObservation } - recordLatencyHistogram(ctx, latency) + return latency } -func recordSystemLatency(ctx context.Context, latency latencyHistogram) { - latency.histogramName = "latency.system.histogram" - latency.histogramDescription = "Latency from connector to streamserver" +func getSystemLatency(ctx context.Context, connectorName string, env string) latencyHistogram { + latency := latencyHistogram{ + histogramName: "latency.system.histogram", + histogramDescription: "Latency from connector to streamserver", + connectorName: connectorName, + env: env, + value: 0, + } connObservation := getBaggageLatency(ctx, LatencyConnectorKey) ssConsumeObservation := getBaggageLatency(ctx, LatencyStreamserverConsumeKey) @@ -98,12 +114,17 @@ func recordSystemLatency(ctx context.Context, latency latencyHistogram) { latency.value = ssConsumeObservation - connObservation } - recordLatencyHistogram(ctx, latency) + return latency } -func recordEndLatency(ctx context.Context, latency latencyHistogram) { - latency.histogramName = "latency.e2e.histogram" - latency.histogramDescription = "Latency from block time to streamserver" +func getEndLatency(ctx context.Context, connectorName string, env string) latencyHistogram { + latency := latencyHistogram{ + histogramName: "latency.e2e.histogram", + histogramDescription: "Latency from block time to streamserver", + connectorName: connectorName, + env: env, + value: 0, + } rpcObservation := getBaggageLatency(ctx, LatencyRpcKey) ssConsumeObservation := getBaggageLatency(ctx, LatencyStreamserverConsumeKey) @@ -112,14 +133,14 @@ func recordEndLatency(ctx context.Context, latency latencyHistogram) { latency.value = ssConsumeObservation - rpcObservation } - recordLatencyHistogram(ctx, latency) + return latency } -func recordLatencyHistogram(ctx context.Context, latency latencyHistogram) { +func recordLatencyHistogram(ctx context.Context, meter metric.Meter, latency latencyHistogram) { if latency.value > 0 { // Convert latency from microseconds to float milliseconds latencyValue := float64(latency.value) / 1000 - histogram, err := latency.meter.SyncFloat64().Histogram( + histogram, err := meter.SyncFloat64().Histogram( latency.histogramName, instrument.WithUnit(unit.Milliseconds), instrument.WithDescription(latency.histogramDescription),