feat: implement conversion for Exemplars and Flags
Closes #266

For each metric table "foo" in the v1 schema, a companion metric table "foo_exemplar" is created.
Also added conversion for flags (logs and metrics) and observed time
(logs).

While making this change, I also reviewed most of the otel2influx code
for other missing otel model properties and made some small refactors.
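
To illustrate the naming rule described above, here is a minimal sketch (not the writer's actual code) built on the new MetricExemplarSuffix constant added to common/common.go below: exemplars for a v1-schema metric table "foo" land in a companion table "foo_exemplar".

package main

import (
	"fmt"

	"github.com/influxdata/influxdb-observability/common"
)

// exemplarMeasurement derives the companion exemplar table name for a metric.
// Hypothetical helper for illustration only; the commit's real writer code is
// in the otel2influx files of this change.
func exemplarMeasurement(metricName string) string {
	return metricName + common.MetricExemplarSuffix // appends "_exemplar"
}

func main() {
	fmt.Println(exemplarMeasurement("foo")) // prints "foo_exemplar"
}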
jacobmarble committed Aug 31, 2023
1 parent c74f672 commit a235dfa
Showing 17 changed files with 721 additions and 701 deletions.
9 changes: 7 additions & 2 deletions common/common.go
@@ -43,13 +43,17 @@ const (
MetricSummaryQuantileKeyV2 = "quantile"
MetricSummaryCountSuffix = "_count"
MetricSummarySumSuffix = "_sum"
MetricExemplarSuffix = "_exemplar"

// These attribute key names are influenced by the proto message keys.
// https://github.com/open-telemetry/opentelemetry-proto/blob/abbf7b7b49a5342d0d6c0e86e91d713bbedb6580/opentelemetry/proto/trace/v1/trace.proto
// https://github.com/open-telemetry/opentelemetry-proto/blob/abbf7b7b49a5342d0d6c0e86e91d713bbedb6580/opentelemetry/proto/metrics/v1/metrics.proto
// https://github.com/open-telemetry/opentelemetry-proto/blob/abbf7b7b49a5342d0d6c0e86e91d713bbedb6580/opentelemetry/proto/logs/v1/logs.proto
AttributeTime = "time"
AttributeStartTimeUnixNano = "start_time_unix_nano"
AttributeTime = "time"
AttributeStartTimeUnixNano = "start_time_unix_nano"
AttributeObservedTimeUnixNano = "observed_time_unix_nano"
// string formatted RFC3339, used by the otel statsd input plugin
AttributeStartTimeStatsd = "start_time"
AttributeTraceID = "trace_id"
AttributeSpanID = "span_id"
AttributeTraceState = "trace_state"
@@ -67,4 +71,5 @@ const (
AttributeSeverityNumber = "severity_number"
AttributeSeverityText = "severity_text"
AttributeBody = "body"
AttributeFlags = "flags"
)
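
The otel2influx side of this change (where these keys are written) is in files not rendered in this view. As a hedged sketch only, the new AttributeObservedTimeUnixNano and AttributeFlags keys might be used roughly like this when flattening a log record into line-protocol fields; the function name and the chosen field value types here are assumptions, not the commit's code.

package otel2influx // hypothetical placement, for illustration only

import (
	"github.com/influxdata/influxdb-observability/common"
	"go.opentelemetry.io/collector/pdata/plog"
)

// logRecordExtraFields sketches how the new keys could carry a log record's
// observed time and flags as line-protocol fields. Illustrative only.
func logRecordExtraFields(lr plog.LogRecord) map[string]interface{} {
	fields := make(map[string]interface{})
	if lr.ObservedTimestamp() != 0 {
		fields[common.AttributeObservedTimeUnixNano] = int64(lr.ObservedTimestamp())
	}
	if lr.Flags() != 0 {
		fields[common.AttributeFlags] = uint64(lr.Flags())
	}
	return fields
}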
20 changes: 10 additions & 10 deletions influx2otel/metrics.go
@@ -92,7 +92,7 @@ func (b *MetricsBatch) lookupMetric(metricName string, tags map[string]string, v
rAttributes.PutStr(k, v)
case k == "temporality" && v == "delta":
isDelta = true
case k == "start_time":
case k == common.AttributeStartTimeStatsd:
default:
mAttributes.PutStr(k, v)
}
@@ -112,10 +112,10 @@ func (b *MetricsBatch) lookupMetric(metricName string, tags map[string]string, v

ilmKey := ilName + ":" + ilVersion
if _, found := b.ilmByRMAttributesAndIL[rKey][ilmKey]; !found {
ilMetrics := resourceMetrics.ScopeMetrics().AppendEmpty()
ilMetrics.Scope().SetName(ilName)
ilMetrics.Scope().SetVersion(ilVersion)
b.ilmByRMAttributesAndIL[rKey][ilmKey] = ilMetrics
isMetrics := resourceMetrics.ScopeMetrics().AppendEmpty()
isMetrics.Scope().SetName(ilName)
isMetrics.Scope().SetVersion(ilVersion)
b.ilmByRMAttributesAndIL[rKey][ilmKey] = isMetrics
b.metricByRMIL[rKey][ilmKey] = make(map[string]pmetric.Metric)
}

@@ -182,9 +182,9 @@ func (b *MetricsBatch) GetMetrics() pmetric.Metrics {
// Ensure that infinity histogram buckets exist.
for _, resourceMetrics := range b.rmByAttributes {
for i := 0; i < resourceMetrics.ScopeMetrics().Len(); i++ {
ilMetrics := resourceMetrics.ScopeMetrics().At(i)
for j := 0; j < ilMetrics.Metrics().Len(); j++ {
metric := ilMetrics.Metrics().At(j)
isMetrics := resourceMetrics.ScopeMetrics().At(i)
for j := 0; j < isMetrics.Metrics().Len(); j++ {
metric := isMetrics.Metrics().At(j)
if metric.Type() == pmetric.MetricTypeHistogram {
for k := 0; k < metric.Histogram().DataPoints().Len(); k++ {
dataPoint := metric.Histogram().DataPoints().At(k)
@@ -217,7 +217,7 @@ func (b *MetricsBatch) addPointWithUnknownSchema(measurement string, tags map[st
}

for k, v := range fields {
if k == "start_time" {
if k == common.AttributeStartTimeStatsd {
continue
}
var floatValue *float64
@@ -245,7 +245,7 @@ func (b *MetricsBatch) addPointWithUnknownSchema(measurement string, tags map[st
dataPoint.SetTimestamp(pcommon.NewTimestampFromTime(ts))
// set start_time, if exists and is RFC3339
// used by statsd input plugin
if startTimeObj, ok := fields["start_time"]; ok {
if startTimeObj, ok := fields[common.AttributeStartTimeStatsd]; ok {
if startTimeStr, ok := startTimeObj.(string); ok {
if t, err := time.Parse(time.RFC3339, startTimeStr); err == nil {
dataPoint.SetStartTimestamp(pcommon.NewTimestampFromTime(t))
40 changes: 20 additions & 20 deletions influx2otel/metrics_statsd_schema_test.go
@@ -38,8 +38,8 @@ func TestStatsdTimingSchema(t *testing.T) {

expect := pmetric.NewMetrics()
rm := expect.ResourceMetrics().AppendEmpty()
ilMetrics := rm.ScopeMetrics().AppendEmpty()
m := ilMetrics.Metrics().AppendEmpty()
isMetrics := rm.ScopeMetrics().AppendEmpty()
m := isMetrics.Metrics().AppendEmpty()
m.SetName("test_service_stage_metrics_biz_success_v4_count")
m.SetEmptyGauge()
dp := m.Gauge().DataPoints().AppendEmpty()
@@ -48,7 +48,7 @@ func TestStatsdTimingSchema(t *testing.T) {
dp.SetTimestamp(pcommon.Timestamp(1395066363000000123))
dp.SetDoubleValue(10)

m = ilMetrics.Metrics().AppendEmpty()
m = isMetrics.Metrics().AppendEmpty()
m.SetName("test_service_stage_metrics_biz_success_v4_lower")
m.SetEmptyGauge()
dp = m.Gauge().DataPoints().AppendEmpty()
@@ -57,7 +57,7 @@ func TestStatsdTimingSchema(t *testing.T) {
dp.SetTimestamp(pcommon.Timestamp(1395066363000000123))
dp.SetDoubleValue(10)

m = ilMetrics.Metrics().AppendEmpty()
m = isMetrics.Metrics().AppendEmpty()
m.SetName("test_service_stage_metrics_biz_success_v4_mean")
m.SetEmptyGauge()
dp = m.Gauge().DataPoints().AppendEmpty()
@@ -66,7 +66,7 @@ func TestStatsdTimingSchema(t *testing.T) {
dp.SetTimestamp(pcommon.Timestamp(1395066363000000123))
dp.SetDoubleValue(10)

m = ilMetrics.Metrics().AppendEmpty()
m = isMetrics.Metrics().AppendEmpty()
m.SetName("test_service_stage_metrics_biz_success_v4_median")
m.SetEmptyGauge()
dp = m.Gauge().DataPoints().AppendEmpty()
@@ -75,7 +75,7 @@ func TestStatsdTimingSchema(t *testing.T) {
dp.SetTimestamp(pcommon.Timestamp(1395066363000000123))
dp.SetDoubleValue(10)

m = ilMetrics.Metrics().AppendEmpty()
m = isMetrics.Metrics().AppendEmpty()
m.SetName("test_service_stage_metrics_biz_success_v4_stddev")
m.SetEmptyGauge()
dp = m.Gauge().DataPoints().AppendEmpty()
@@ -84,7 +84,7 @@ func TestStatsdTimingSchema(t *testing.T) {
dp.SetTimestamp(pcommon.Timestamp(1395066363000000123))
dp.SetDoubleValue(10)

m = ilMetrics.Metrics().AppendEmpty()
m = isMetrics.Metrics().AppendEmpty()
m.SetName("test_service_stage_metrics_biz_success_v4_sum")
m.SetEmptyGauge()
dp = m.Gauge().DataPoints().AppendEmpty()
@@ -93,7 +93,7 @@ func TestStatsdTimingSchema(t *testing.T) {
dp.SetTimestamp(pcommon.Timestamp(1395066363000000123))
dp.SetDoubleValue(100)

m = ilMetrics.Metrics().AppendEmpty()
m = isMetrics.Metrics().AppendEmpty()
m.SetName("test_service_stage_metrics_biz_success_v4_upper")
m.SetEmptyGauge()
dp = m.Gauge().DataPoints().AppendEmpty()
@@ -126,8 +126,8 @@ func TestStatsCounter(t *testing.T) {

expect := pmetric.NewMetrics()
rm := expect.ResourceMetrics().AppendEmpty()
ilMetrics := rm.ScopeMetrics().AppendEmpty()
m := ilMetrics.Metrics().AppendEmpty()
isMetrics := rm.ScopeMetrics().AppendEmpty()
m := isMetrics.Metrics().AppendEmpty()
m.SetName("gorets_value")
m.SetEmptySum()
m.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
@@ -163,8 +163,8 @@ func TestStatsDeltaCounter(t *testing.T) {

expect := pmetric.NewMetrics()
rm := expect.ResourceMetrics().AppendEmpty()
ilMetrics := rm.ScopeMetrics().AppendEmpty()
m := ilMetrics.Metrics().AppendEmpty()
isMetrics := rm.ScopeMetrics().AppendEmpty()
m := isMetrics.Metrics().AppendEmpty()
m.SetName("gorets_value")
m.SetEmptySum()
m.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
@@ -199,8 +199,8 @@ func TestStatsGauge(t *testing.T) {

expect := pmetric.NewMetrics()
rm := expect.ResourceMetrics().AppendEmpty()
ilMetrics := rm.ScopeMetrics().AppendEmpty()
m := ilMetrics.Metrics().AppendEmpty()
isMetrics := rm.ScopeMetrics().AppendEmpty()
m := isMetrics.Metrics().AppendEmpty()
m.SetName("gaugor_value")
m.SetEmptyGauge()
dp := m.Gauge().DataPoints().AppendEmpty()
@@ -235,8 +235,8 @@ func TestStatsdSetsSchema(t *testing.T) {

expect := pmetric.NewMetrics()
rm := expect.ResourceMetrics().AppendEmpty()
ilMetrics := rm.ScopeMetrics().AppendEmpty()
m := ilMetrics.Metrics().AppendEmpty()
isMetrics := rm.ScopeMetrics().AppendEmpty()
m := isMetrics.Metrics().AppendEmpty()
m.SetName("uniques_value")
m.SetEmptyGauge()
dp := m.Gauge().DataPoints().AppendEmpty()
@@ -262,17 +262,17 @@ func TestDeltaTemporalityStatsdCounter(t *testing.T) {
"temporality": "delta",
},
map[string]interface{}{
"value": int64(10),
"start_time": "2023-04-13T22:34:00.000535129+03:00",
"value": int64(10),
common.AttributeStartTimeStatsd: "2023-04-13T22:34:00.000535129+03:00",
},
time.Unix(0, 1395066363000000123),
common.InfluxMetricValueTypeSum)
require.NoError(t, err)

expect := pmetric.NewMetrics()
rm := expect.ResourceMetrics().AppendEmpty()
ilMetrics := rm.ScopeMetrics().AppendEmpty()
m := ilMetrics.Metrics().AppendEmpty()
isMetrics := rm.ScopeMetrics().AppendEmpty()
m := isMetrics.Metrics().AppendEmpty()
m.SetName("gorets_value")
m.SetEmptySum()
m.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
67 changes: 40 additions & 27 deletions influx2otel/metrics_telegraf_prometheus_v1.go
@@ -87,15 +87,18 @@ func (b *MetricsBatch) convertGaugeV1(measurement string, tags map[string]string
dataPoint := metric.Gauge().DataPoints().AppendEmpty()
attributes.CopyTo(dataPoint.Attributes())
dataPoint.SetTimestamp(pcommon.NewTimestampFromTime(ts))
// set start_time, if exists and is RFC3339
// used by statsd input plugin
if startTimeObj, ok := fields["start_time"]; ok {
if startTimeObj, ok := fields[common.AttributeStartTimeStatsd]; ok {
if startTimeStr, ok := startTimeObj.(string); ok {
if t, err := time.Parse(time.RFC3339, startTimeStr); err == nil {
dataPoint.SetStartTimestamp(pcommon.NewTimestampFromTime(t))
}
}
}
if flagsObj, ok := fields[common.AttributeFlags]; ok {
if flagsUint64, ok := flagsObj.(uint64); ok {
dataPoint.SetFlags(pmetric.DataPointFlags(flagsUint64))
}
}

if floatValue != nil {
dataPoint.SetDoubleValue(*floatValue)
@@ -111,9 +114,7 @@ func (b *MetricsBatch) convertGaugeV1(measurement string, tags map[string]string
var floatValue *float64
var intValue *int64

// start_time is a metadata field about the metric,
// provided by statsd plugin.
if k == "start_time" {
if k == common.AttributeStartTimeStatsd || k == common.AttributeFlags {
continue
}
switch typedValue := fieldValue.(type) {
@@ -137,15 +138,18 @@ func (b *MetricsBatch) convertGaugeV1(measurement string, tags map[string]string
dataPoint := metric.Gauge().DataPoints().AppendEmpty()
attributes.CopyTo(dataPoint.Attributes())
dataPoint.SetTimestamp(pcommon.NewTimestampFromTime(ts))
// set start_time, if exists and is RFC3339
// used by statsd input plugin
if startTimeObj, ok := fields["start_time"]; ok {
if startTimeObj, ok := fields[common.AttributeStartTimeStatsd]; ok {
if startTimeStr, ok := startTimeObj.(string); ok {
if t, err := time.Parse(time.RFC3339, startTimeStr); err == nil {
dataPoint.SetStartTimestamp(pcommon.NewTimestampFromTime(t))
}
}
}
if flagsObj, ok := fields[common.AttributeFlags]; ok {
if flagsUint64, ok := flagsObj.(uint64); ok {
dataPoint.SetFlags(pmetric.DataPointFlags(flagsUint64))
}
}

if floatValue != nil {
dataPoint.SetDoubleValue(*floatValue)
@@ -182,15 +186,18 @@ func (b *MetricsBatch) convertSumV1(measurement string, tags map[string]string,
dataPoint := metric.Sum().DataPoints().AppendEmpty()
attributes.CopyTo(dataPoint.Attributes())
dataPoint.SetTimestamp(pcommon.NewTimestampFromTime(ts))
// set start_time, if exists and is RFC3339
// used by statsd input plugin
if startTimeObj, ok := fields["start_time"]; ok {
if startTimeObj, ok := fields[common.AttributeStartTimeStatsd]; ok {
if startTimeStr, ok := startTimeObj.(string); ok {
if t, err := time.Parse(time.RFC3339, startTimeStr); err == nil {
dataPoint.SetStartTimestamp(pcommon.NewTimestampFromTime(t))
}
}
}
if flagsObj, ok := fields[common.AttributeFlags]; ok {
if flagsUint64, ok := flagsObj.(uint64); ok {
dataPoint.SetFlags(pmetric.DataPointFlags(flagsUint64))
}
}

if floatValue != nil {
dataPoint.SetDoubleValue(*floatValue)
@@ -204,9 +211,7 @@ func (b *MetricsBatch) convertSumV1(measurement string, tags map[string]string,
}

for k, fieldValue := range fields {
// start_time is a metadata field about the metric,
// provided by statsd plugin.
if k == "start_time" {
if k == common.AttributeStartTimeStatsd || k == common.AttributeFlags {
continue
}

@@ -233,15 +238,18 @@ func (b *MetricsBatch) convertSumV1(measurement string, tags map[string]string,
dataPoint := metric.Sum().DataPoints().AppendEmpty()
attributes.CopyTo(dataPoint.Attributes())
dataPoint.SetTimestamp(pcommon.NewTimestampFromTime(ts))
// set start_time, if exists and is RFC3339
// used by statsd input plugin
if startTimeObj, ok := fields["start_time"]; ok {
if startTimeObj, ok := fields[common.AttributeStartTimeStatsd]; ok {
if startTimeStr, ok := startTimeObj.(string); ok {
if t, err := time.Parse(time.RFC3339, startTimeStr); err == nil {
dataPoint.SetStartTimestamp(pcommon.NewTimestampFromTime(t))
}
}
}
if flagsObj, ok := fields[common.AttributeFlags]; ok {
if flagsUint64, ok := flagsObj.(uint64); ok {
dataPoint.SetFlags(pmetric.DataPointFlags(flagsUint64))
}
}

if floatValue != nil {
dataPoint.SetDoubleValue(*floatValue)
@@ -287,7 +295,7 @@ func (b *MetricsBatch) convertHistogramV1(measurement string, tags map[string]st
bucketCounts = append(bucketCounts, uint64(vBucketCount))
}

} else if k == "start_time" {
} else if k == common.AttributeStartTimeStatsd || k == common.AttributeFlags {
} else {
b.logger.Debug("skipping unrecognized histogram field", "field", k, "value", vi)
}
@@ -322,15 +330,18 @@ func (b *MetricsBatch) convertHistogramV1(measurement string, tags map[string]st
dataPoint := metric.Histogram().DataPoints().AppendEmpty()
attributes.CopyTo(dataPoint.Attributes())
dataPoint.SetTimestamp(pcommon.NewTimestampFromTime(ts))
// set start_time, if exists and is RFC3339
// used by statsd input plugin
if startTimeObj, ok := fields["start_time"]; ok {
if startTimeObj, ok := fields[common.AttributeStartTimeStatsd]; ok {
if startTimeStr, ok := startTimeObj.(string); ok {
if t, err := time.Parse(time.RFC3339, startTimeStr); err == nil {
dataPoint.SetStartTimestamp(pcommon.NewTimestampFromTime(t))
}
}
}
if flagsObj, ok := fields[common.AttributeFlags]; ok {
if flagsUint64, ok := flagsObj.(uint64); ok {
dataPoint.SetFlags(pmetric.DataPointFlags(flagsUint64))
}
}

dataPoint.SetCount(count)
dataPoint.SetSum(sum)
@@ -371,8 +382,7 @@ func (b *MetricsBatch) convertSummaryV1(measurement string, tags map[string]stri
valueAtQuantile.SetValue(value)
}

} else if k == "start_time" {

} else if k == common.AttributeStartTimeStatsd || k == common.AttributeFlags {
} else {
b.logger.Debug("skipping unrecognized summary field", "field", k, "value", vi)
}
@@ -391,15 +401,18 @@ func (b *MetricsBatch) convertSummaryV1(measurement string, tags map[string]stri
dataPoint := metric.Summary().DataPoints().AppendEmpty()
attributes.CopyTo(dataPoint.Attributes())
dataPoint.SetTimestamp(pcommon.NewTimestampFromTime(ts))
// set start_time, if exists and is RFC3339
// used by statsd input plugin
if startTimeObj, ok := fields["start_time"]; ok {
if startTimeObj, ok := fields[common.AttributeStartTimeStatsd]; ok {
if startTimeStr, ok := startTimeObj.(string); ok {
if t, err := time.Parse(time.RFC3339, startTimeStr); err == nil {
dataPoint.SetStartTimestamp(pcommon.NewTimestampFromTime(t))
}
}
}
if flagsObj, ok := fields[common.AttributeFlags]; ok {
if flagsUint64, ok := flagsObj.(uint64); ok {
dataPoint.SetFlags(pmetric.DataPointFlags(flagsUint64))
}
}

dataPoint.SetCount(count)
dataPoint.SetSum(sum)
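
The same four-line flags check now appears in each convert*V1 data-point path above. As an aside not contained in this commit, the pattern could be factored into one small helper, since every pmetric data-point type exposes SetFlags; a sketch:

package influx2otel // hypothetical placement, for illustration only

import (
	"github.com/influxdata/influxdb-observability/common"
	"go.opentelemetry.io/collector/pdata/pmetric"
)

// setDataPointFlags copies an optional "flags" field (uint64) onto an OTel
// data point, mirroring the block repeated in the converters above.
func setDataPointFlags(dp interface{ SetFlags(pmetric.DataPointFlags) }, fields map[string]interface{}) {
	if flagsObj, ok := fields[common.AttributeFlags]; ok {
		if flagsUint64, ok := flagsObj.(uint64); ok {
			dp.SetFlags(pmetric.DataPointFlags(flagsUint64))
		}
	}
}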
(The remaining 13 changed files are not shown here.)
