Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 14 additions & 10 deletions go/pdata/metrics/internal/baseotlptostef.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,39 +16,42 @@ type BaseOtlpToStef struct {
Otlp2tef otlptools.Otlp2Stef
}

func AggregationTemporalityToStef(flags pmetric.AggregationTemporality) otelstef.AggregationTemporality {
func AggregationTemporalityToStef(flags pmetric.AggregationTemporality) (otelstef.AggregationTemporality, error) {
switch flags {
case pmetric.AggregationTemporalityDelta:
return otelstef.AggregationTemporalityDelta
return otelstef.AggregationTemporalityDelta, nil
case pmetric.AggregationTemporalityCumulative:
return otelstef.AggregationTemporalityCumulative
return otelstef.AggregationTemporalityCumulative, nil
case pmetric.AggregationTemporalityUnspecified:
return otelstef.AggregationTemporalityUnspecified
return otelstef.AggregationTemporalityUnspecified, nil
default:
panic("unexpected aggregation temporality")
return 0, fmt.Errorf("unexpected aggregation temporality: %v", flags)
}
}

func (c *BaseOtlpToStef) ConvertNumDatapoint(dst *otelstef.Point, src pmetric.NumberDataPoint) {
func (c *BaseOtlpToStef) ConvertNumDatapoint(dst *otelstef.Point, src pmetric.NumberDataPoint) error {
dst.SetTimestamp(uint64(src.Timestamp()))
dst.SetStartTimestamp(uint64(src.StartTimestamp()))

if src.Flags().NoRecordedValue() {
dst.Value().SetType(otelstef.PointValueTypeNone)
return
return nil
}

switch src.ValueType() {
case pmetric.NumberDataPointValueTypeInt:
dst.Value().SetInt64(src.IntValue())
case pmetric.NumberDataPointValueTypeDouble:
dst.Value().SetFloat64(src.DoubleValue())
case pmetric.NumberDataPointValueTypeEmpty:
dst.Value().SetType(otelstef.PointValueTypeNone)
default:
panic("Unsupported number datapoint value type")
return fmt.Errorf("unsupported number datapoint value type: %v", src.ValueType())
}
return nil
}

func (c *BaseOtlpToStef) ConvertExemplars(dst *otelstef.ExemplarArray, src pmetric.ExemplarSlice) {
func (c *BaseOtlpToStef) ConvertExemplars(dst *otelstef.ExemplarArray, src pmetric.ExemplarSlice) error {
dst.EnsureLen(src.Len())

for i := 0; i < src.Len(); i++ {
Expand All @@ -73,9 +76,10 @@ func (c *BaseOtlpToStef) ConvertExemplars(dst *otelstef.ExemplarArray, src pmetr
case pmetric.ExemplarValueTypeEmpty:
dstExemplar.Value().SetType(otelstef.ExemplarValueTypeNone)
default:
panic("unknown Exemplar value type")
return fmt.Errorf("unknown exemplar value type: %v", srcExemplar.ValueType())
}
}
return nil
}

func (c *BaseOtlpToStef) ConvertHistogram(dst *otelstef.Point, src pmetric.HistogramDataPoint) error {
Expand Down
87 changes: 87 additions & 0 deletions go/pdata/metrics/internal/baseotlptostef_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package internal

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/splunk/stef/go/otel/otelstef"
)

func TestConvertNumDatapoint_EmptyValueType(t *testing.T) {
c := &BaseOtlpToStef{}
dst := otelstef.NewPoint()
src := pmetric.NewNumberDataPoint()
src.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
// Don't set any value — ValueType will be NumberDataPointValueTypeEmpty

err := c.ConvertNumDatapoint(dst, src)
require.NoError(t, err)
assert.Equal(t, otelstef.PointValueTypeNone, dst.Value().Type())
}

func TestConvertNumDatapoint_IntValue(t *testing.T) {
c := &BaseOtlpToStef{}
dst := otelstef.NewPoint()
src := pmetric.NewNumberDataPoint()
src.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
src.SetIntValue(42)

err := c.ConvertNumDatapoint(dst, src)
require.NoError(t, err)
assert.Equal(t, int64(42), dst.Value().Int64())
}

func TestConvertNumDatapoint_DoubleValue(t *testing.T) {
c := &BaseOtlpToStef{}
dst := otelstef.NewPoint()
src := pmetric.NewNumberDataPoint()
src.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
src.SetDoubleValue(3.14)

err := c.ConvertNumDatapoint(dst, src)
require.NoError(t, err)
assert.InDelta(t, 3.14, dst.Value().Float64(), 0.001)
}

func TestConvertNumDatapoint_NoRecordedValue(t *testing.T) {
c := &BaseOtlpToStef{}
dst := otelstef.NewPoint()
src := pmetric.NewNumberDataPoint()
src.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
src.SetFlags(pmetric.DefaultDataPointFlags.WithNoRecordedValue(true))

err := c.ConvertNumDatapoint(dst, src)
require.NoError(t, err)
assert.Equal(t, otelstef.PointValueTypeNone, dst.Value().Type())
}

func TestAggregationTemporalityToStef_AllValues(t *testing.T) {
tests := []struct {
name string
input pmetric.AggregationTemporality
expected otelstef.AggregationTemporality
wantErr bool
}{
{name: "delta", input: pmetric.AggregationTemporalityDelta, expected: otelstef.AggregationTemporalityDelta},
{name: "cumulative", input: pmetric.AggregationTemporalityCumulative, expected: otelstef.AggregationTemporalityCumulative},
{name: "unspecified", input: pmetric.AggregationTemporalityUnspecified, expected: otelstef.AggregationTemporalityUnspecified},
{name: "unknown", input: pmetric.AggregationTemporality(99), wantErr: true},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result, err := AggregationTemporalityToStef(tt.input)
if tt.wantErr {
require.Error(t, err)
} else {
require.NoError(t, err)
assert.Equal(t, tt.expected, result)
}
})
}
}
66 changes: 47 additions & 19 deletions go/pdata/metrics/otlp2stef_unsorted.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package metrics

import (
"fmt"

"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/splunk/stef/go/otel/otelstef"
Expand Down Expand Up @@ -28,25 +30,39 @@ func (d *OtlpToStefUnsorted) Convert(src pmetric.Metrics, writer *otelstef.Metri
otlp2stef.ScopeUnsorted(writer.Record.Scope(), smm.Scope(), smm.SchemaUrl())
for k := 0; k < smm.Metrics().Len(); k++ {
m := smm.Metrics().At(k)
metric2metric(m, writer.Record.Metric(), otlp2stef)
if err := metric2metric(m, writer.Record.Metric(), otlp2stef); err != nil {
return err
}
var err error
switch m.Type() {
case pmetric.MetricTypeGauge:
err = d.writeNumeric(writer, m.Gauge().DataPoints())
case pmetric.MetricTypeSum:
writer.Record.Metric().SetAggregationTemporality(internal.AggregationTemporalityToStef(m.Sum().AggregationTemporality()))
at, atErr := internal.AggregationTemporalityToStef(m.Sum().AggregationTemporality())
if atErr != nil {
return atErr
}
writer.Record.Metric().SetAggregationTemporality(at)
writer.Record.Metric().SetMonotonic(m.Sum().IsMonotonic())
err = d.writeNumeric(writer, m.Sum().DataPoints())
case pmetric.MetricTypeHistogram:
writer.Record.Metric().SetAggregationTemporality(internal.AggregationTemporalityToStef(m.Histogram().AggregationTemporality()))
at, atErr := internal.AggregationTemporalityToStef(m.Histogram().AggregationTemporality())
if atErr != nil {
return atErr
}
writer.Record.Metric().SetAggregationTemporality(at)
err = d.writeHistogram(writer, m.Histogram().DataPoints())
case pmetric.MetricTypeExponentialHistogram:
writer.Record.Metric().SetAggregationTemporality(internal.AggregationTemporalityToStef(m.ExponentialHistogram().AggregationTemporality()))
at, atErr := internal.AggregationTemporalityToStef(m.ExponentialHistogram().AggregationTemporality())
if atErr != nil {
return atErr
}
writer.Record.Metric().SetAggregationTemporality(at)
err = d.writeExpHistogram(writer, m.ExponentialHistogram().DataPoints())
case pmetric.MetricTypeSummary:
err = d.writeSummary(writer, m.Summary().DataPoints())
default:
panic("Unsupported metric type")
return fmt.Errorf("unsupported metric type: %v", m.Type())
}
if err != nil {
return err
Expand All @@ -57,44 +73,52 @@ func (d *OtlpToStefUnsorted) Convert(src pmetric.Metrics, writer *otelstef.Metri
return nil
}

func metricType(typ pmetric.MetricType) otelstef.MetricType {
func metricType(typ pmetric.MetricType) (otelstef.MetricType, error) {
switch typ {
case pmetric.MetricTypeGauge:
return otelstef.MetricTypeGauge
return otelstef.MetricTypeGauge, nil
case pmetric.MetricTypeSum:
return otelstef.MetricTypeSum
return otelstef.MetricTypeSum, nil
case pmetric.MetricTypeHistogram:
return otelstef.MetricTypeHistogram
return otelstef.MetricTypeHistogram, nil
case pmetric.MetricTypeExponentialHistogram:
return otelstef.MetricTypeExpHistogram
return otelstef.MetricTypeExpHistogram, nil
case pmetric.MetricTypeSummary:
return otelstef.MetricTypeSummary
return otelstef.MetricTypeSummary, nil
default:
panic("Unsupported metric value")
return 0, fmt.Errorf("unsupported metric type: %v", typ)
}
return 0
}

func metric2metric(
src pmetric.Metric, // histogramBounds []float64,
dst *otelstef.Metric,
otlp2stef *otlptools.Otlp2Stef,
) {
) error {
otlp2stef.MapUnsorted(src.Metadata(), dst.Metadata())
dst.SetName(src.Name())
dst.SetDescription(src.Description())
dst.SetUnit(src.Unit())
dst.SetType(metricType(src.Type()))
mt, err := metricType(src.Type())
if err != nil {
return err
}
dst.SetType(mt)
return nil
}

func (d *OtlpToStefUnsorted) writeNumeric(writer *otelstef.MetricsWriter, src pmetric.NumberDataPointSlice) error {
for i := 0; i < src.Len(); i++ {
srcPoint := src.At(i)
dstPoint := writer.Record.Point()

d.base.ConvertNumDatapoint(dstPoint, srcPoint)
if err := d.base.ConvertNumDatapoint(dstPoint, srcPoint); err != nil {
return err
}
d.base.Otlp2tef.MapUnsorted(srcPoint.Attributes(), writer.Record.Attributes())
d.base.ConvertExemplars(dstPoint.Exemplars(), srcPoint.Exemplars())
if err := d.base.ConvertExemplars(dstPoint.Exemplars(), srcPoint.Exemplars()); err != nil {
return err
}

err := writer.Write()
if err != nil {
Expand All @@ -116,7 +140,9 @@ func (d *OtlpToStefUnsorted) writeHistogram(writer *otelstef.MetricsWriter, src

d.base.Otlp2tef.MapUnsorted(src.Attributes(), writer.Record.Attributes())
writer.Record.Metric().HistogramBounds().CopyFromSlice(src.ExplicitBounds().AsRaw())
d.base.ConvertExemplars(dst.Exemplars(), src.Exemplars())
if err := d.base.ConvertExemplars(dst.Exemplars(), src.Exemplars()); err != nil {
return err
}

err = writer.Write()
if err != nil {
Expand All @@ -139,7 +165,9 @@ func (d *OtlpToStefUnsorted) writeExpHistogram(
}

d.base.Otlp2tef.MapUnsorted(src.Attributes(), writer.Record.Attributes())
d.base.ConvertExemplars(dst.Exemplars(), src.Exemplars())
if err := d.base.ConvertExemplars(dst.Exemplars(), src.Exemplars()); err != nil {
return err
}

err = writer.Write()
if err != nil {
Expand Down
100 changes: 100 additions & 0 deletions go/pdata/metrics/otlp2stef_unsorted_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package metrics

import (
"testing"
"time"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/splunk/stef/go/otel/otelstef"
"github.com/splunk/stef/go/pkg"
)

func TestConvertUnsorted_EmptyNumberDataPointValueType(t *testing.T) {
// Build metrics with a Gauge that has an empty value type data point.
// This previously caused a panic in ConvertNumDatapoint.
metrics := pmetric.NewMetrics()
rm := metrics.ResourceMetrics().AppendEmpty()
sm := rm.ScopeMetrics().AppendEmpty()
m := sm.Metrics().AppendEmpty()
m.SetName("test_gauge")
m.SetEmptyGauge()
dp := m.Gauge().DataPoints().AppendEmpty()
dp.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
// Don't set any value — ValueType will be NumberDataPointValueTypeEmpty

buf := &pkg.MemChunkWriter{}
writer, err := otelstef.NewMetricsWriter(buf, pkg.WriterOptions{})
require.NoError(t, err)

converter := OtlpToStefUnsorted{}
err = converter.Convert(metrics, writer)
require.NoError(t, err, "Convert should not panic or error on empty value type")

err = writer.Flush()
require.NoError(t, err)
}

func TestConvertUnsorted_MixedValueTypes(t *testing.T) {
// Build metrics with a mix of int, double, and empty value types.
metrics := pmetric.NewMetrics()
rm := metrics.ResourceMetrics().AppendEmpty()
sm := rm.ScopeMetrics().AppendEmpty()
m := sm.Metrics().AppendEmpty()
m.SetName("test_gauge")
m.SetEmptyGauge()

// Data point with int value
dp1 := m.Gauge().DataPoints().AppendEmpty()
dp1.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
dp1.SetIntValue(42)

// Data point with empty value (previously caused panic)
dp2 := m.Gauge().DataPoints().AppendEmpty()
dp2.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))

// Data point with double value
dp3 := m.Gauge().DataPoints().AppendEmpty()
dp3.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
dp3.SetDoubleValue(3.14)

buf := &pkg.MemChunkWriter{}
writer, err := otelstef.NewMetricsWriter(buf, pkg.WriterOptions{})
require.NoError(t, err)

converter := OtlpToStefUnsorted{}
err = converter.Convert(metrics, writer)
require.NoError(t, err)

err = writer.Flush()
require.NoError(t, err)

// All 3 data points should have been written
require.EqualValues(t, 3, writer.RecordCount())
}

func TestConvertSorted_EmptyNumberDataPointValueType(t *testing.T) {
// Same test for the sorted converter path.
metrics := pmetric.NewMetrics()
rm := metrics.ResourceMetrics().AppendEmpty()
sm := rm.ScopeMetrics().AppendEmpty()
m := sm.Metrics().AppendEmpty()
m.SetName("test_gauge")
m.SetEmptyGauge()
dp := m.Gauge().DataPoints().AppendEmpty()
dp.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
// Don't set any value — ValueType will be NumberDataPointValueTypeEmpty

buf := &pkg.MemChunkWriter{}
writer, err := otelstef.NewMetricsWriter(buf, pkg.WriterOptions{})
require.NoError(t, err)

converter := OtlpToStefSorted{}
err = converter.Convert(metrics, writer)
require.NoError(t, err, "Convert should not panic or error on empty value type")

err = writer.Flush()
require.NoError(t, err)
}
Loading
Loading