Skip to content

Commit

Permalink
[chore] [receiver/datadog] Refactor translation files into internal p…
Browse files Browse the repository at this point in the history
…ackage (open-telemetry#34160)

**Description:**
This PR is a follow-up to open-telemetry#33957. It refactors the Datadog receiver
files to remove internal methods and structures from the public API and
into an internal directory.

**Link to tracking Issue:** 
open-telemetry#18278 

**Testing:** 
This is a refactor, so no new unit tests have been added.
  • Loading branch information
carrieedwards authored Jul 19, 2024
1 parent aab1424 commit 74af245
Show file tree
Hide file tree
Showing 11 changed files with 94 additions and 88 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package datadogreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogreceiver"
package translator // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogreceiver/internal/translator"

import (
"go.opentelemetry.io/collector/pdata/pcommon"
Expand All @@ -10,16 +10,16 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity"
)

type Batcher struct {
type batcher struct {
pmetric.Metrics

resourceMetrics map[identity.Resource]pmetric.ResourceMetrics
scopeMetrics map[identity.Scope]pmetric.ScopeMetrics
metrics map[identity.Metric]pmetric.Metric
}

func newBatcher() Batcher {
return Batcher{
func newBatcher() batcher {
return batcher{
Metrics: pmetric.NewMetrics(),
resourceMetrics: make(map[identity.Resource]pmetric.ResourceMetrics),
scopeMetrics: make(map[identity.Scope]pmetric.ScopeMetrics),
Expand All @@ -30,7 +30,7 @@ func newBatcher() Batcher {
// Dimensions stores the properties of the series that are needed in order
// to unique identify the series. This is needed in order to batch metrics by
// resource, scope, and datapoint attributes
type Dimensions struct {
type dimensions struct {
name string
metricType pmetric.MetricType
resourceAttrs pcommon.Map
Expand All @@ -47,9 +47,9 @@ var metricTypeMap = map[string]pmetric.MetricType{
"sketch": pmetric.MetricTypeExponentialHistogram,
}

func parseSeriesProperties(name string, metricType string, tags []string, host string, version string, stringPool *StringPool) Dimensions {
func parseSeriesProperties(name string, metricType string, tags []string, host string, version string, stringPool *StringPool) dimensions {
resourceAttrs, scopeAttrs, dpAttrs := tagsToAttributes(tags, host, stringPool)
return Dimensions{
return dimensions{
name: name,
metricType: metricTypeMap[metricType],
buildInfo: version,
Expand All @@ -59,7 +59,7 @@ func parseSeriesProperties(name string, metricType string, tags []string, host s
}
}

func (b Batcher) Lookup(dim Dimensions) (pmetric.Metric, identity.Metric) {
func (b batcher) Lookup(dim dimensions) (pmetric.Metric, identity.Metric) {
resource := dim.Resource()
resourceID := identity.OfResource(resource)
resourceMetrics, ok := b.resourceMetrics[resourceID]
Expand Down Expand Up @@ -90,21 +90,21 @@ func (b Batcher) Lookup(dim Dimensions) (pmetric.Metric, identity.Metric) {
return metric, metricID
}

func (d Dimensions) Resource() pcommon.Resource {
func (d dimensions) Resource() pcommon.Resource {
resource := pcommon.NewResource()
d.resourceAttrs.CopyTo(resource.Attributes()) // TODO(jesus.vazquez) review this copy
return resource
}

func (d Dimensions) Scope() pcommon.InstrumentationScope {
func (d dimensions) Scope() pcommon.InstrumentationScope {
scope := pcommon.NewInstrumentationScope()
scope.SetName("otelcol/datadogreceiver")
scope.SetVersion(d.buildInfo)
d.scopeAttrs.CopyTo(scope.Attributes())
return scope
}

func (d Dimensions) Metric() pmetric.Metric {
func (d dimensions) Metric() pmetric.Metric {
metric := pmetric.NewMetric()
metric.SetName(d.name)
switch d.metricType {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package datadogreceiver
package translator

import (
"testing"

"github.com/DataDog/datadog-api-client-go/v2/api/datadogV1"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pmetric"
)

Expand Down Expand Up @@ -275,13 +274,8 @@ func TestMetricBatcher(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
mt := newMetricsTranslator()
mt.buildInfo = component.BuildInfo{
Command: "otelcol",
Description: "OpenTelemetry Collector",
Version: "latest",
}
result := mt.translateMetricsV1(tt.series)
mt := createMetricsTranslator()
result := mt.TranslateSeriesV1(tt.series)

tt.expect(t, result)
})
Expand Down
41 changes: 41 additions & 0 deletions receiver/datadogreceiver/internal/translator/metrics_translator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package translator // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogreceiver/internal/translator"

import (
"sync"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pcommon"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity"
)

type MetricsTranslator struct {
sync.RWMutex
buildInfo component.BuildInfo
lastTs map[identity.Stream]pcommon.Timestamp
stringPool *StringPool
}

func NewMetricsTranslator(buildInfo component.BuildInfo) *MetricsTranslator {
return &MetricsTranslator{
buildInfo: buildInfo,
lastTs: make(map[identity.Stream]pcommon.Timestamp),
stringPool: newStringPool(),
}
}

func (mt *MetricsTranslator) streamHasTimestamp(stream identity.Stream) (pcommon.Timestamp, bool) {
mt.RLock()
defer mt.RUnlock()
ts, ok := mt.lastTs[stream]
return ts, ok
}

func (mt *MetricsTranslator) updateLastTsForStream(stream identity.Stream, ts pcommon.Timestamp) {
mt.Lock()
defer mt.Unlock()
mt.lastTs[stream] = ts
}
Original file line number Diff line number Diff line change
@@ -1,47 +1,18 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package datadogreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogreceiver"
package translator // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogreceiver/internal/translator"

import (
"sync"
"time"

datadogV1 "github.com/DataDog/datadog-api-client-go/v2/api/datadogV1"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity"
)

type MetricsTranslator struct {
sync.RWMutex
buildInfo component.BuildInfo
lastTs map[identity.Stream]pcommon.Timestamp
stringPool *StringPool
}

func newMetricsTranslator() *MetricsTranslator {
return &MetricsTranslator{
lastTs: make(map[identity.Stream]pcommon.Timestamp),
stringPool: newStringPool(),
}
}

func (mt *MetricsTranslator) streamHasTimestamp(stream identity.Stream) (pcommon.Timestamp, bool) {
mt.RLock()
defer mt.RUnlock()
ts, ok := mt.lastTs[stream]
return ts, ok
}

func (mt *MetricsTranslator) updateLastTsForStream(stream identity.Stream, ts pcommon.Timestamp) {
mt.Lock()
defer mt.Unlock()
mt.lastTs[stream] = ts
}

const (
TypeGauge string = "gauge"
TypeRate string = "rate"
Expand All @@ -52,7 +23,7 @@ type SeriesList struct {
Series []datadogV1.Series `json:"series"`
}

func (mt *MetricsTranslator) translateMetricsV1(series SeriesList) pmetric.Metrics {
func (mt *MetricsTranslator) TranslateSeriesV1(series SeriesList) pmetric.Metrics {
bt := newBatcher()

for _, serie := range series.Series {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package datadogreceiver
package translator

import (
"testing"
Expand All @@ -28,7 +28,7 @@ func testPointsToDatadogPoints(points []testPoint) [][]*float64 {

}

func TestTranslateMetricsV1(t *testing.T) {
func TestTranslateSeriesV1(t *testing.T) {
tests := []struct {
name string

Expand Down Expand Up @@ -152,7 +152,7 @@ func TestTranslateMetricsV1(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
mt := createMetricsTranslator()
result := mt.translateMetricsV1(tt.series)
result := mt.TranslateSeriesV1(tt.series)

tt.expect(t, result)
})
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package datadogreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogreceiver"
package translator // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogreceiver/internal/translator"

import (
"fmt"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package datadogreceiver
package translator

import (
"testing"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package datadogreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogreceiver"
package translator // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogreceiver/internal/translator"

import (
"testing"
Expand All @@ -13,12 +13,11 @@ import (
)

func createMetricsTranslator() *MetricsTranslator {
mt := newMetricsTranslator()
mt.buildInfo = component.BuildInfo{
mt := NewMetricsTranslator(component.BuildInfo{
Command: "otelcol",
Description: "OpenTelemetry Collector",
Version: "latest",
}
})
return mt
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package datadogreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogreceiver"
package translator // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogreceiver/internal/translator"

import (
"bytes"
Expand Down Expand Up @@ -51,7 +51,7 @@ func upsertHeadersAttributes(req *http.Request, attrs pcommon.Map) {
}
}

func toTraces(payload *pb.TracerPayload, req *http.Request) ptrace.Traces {
func ToTraces(payload *pb.TracerPayload, req *http.Request) ptrace.Traces {
var traces pb.Traces
for _, p := range payload.GetChunks() {
traces = append(traces, p.GetSpans())
Expand Down Expand Up @@ -161,17 +161,17 @@ var bufferPool = sync.Pool{
},
}

func getBuffer() *bytes.Buffer {
func GetBuffer() *bytes.Buffer {
buffer := bufferPool.Get().(*bytes.Buffer)
buffer.Reset()
return buffer
}

func putBuffer(buffer *bytes.Buffer) {
func PutBuffer(buffer *bytes.Buffer) {
bufferPool.Put(buffer)
}

func handleTracesPayload(req *http.Request) (tp []*pb.TracerPayload, err error) {
func HandleTracesPayload(req *http.Request) (tp []*pb.TracerPayload, err error) {
var tracerPayloads []*pb.TracerPayload

defer func() {
Expand All @@ -181,8 +181,8 @@ func handleTracesPayload(req *http.Request) (tp []*pb.TracerPayload, err error)

switch {
case strings.HasPrefix(req.URL.Path, "/v0.7"):
buf := getBuffer()
defer putBuffer(buf)
buf := GetBuffer()
defer PutBuffer(buf)
if _, err = io.Copy(buf, req.Body); err != nil {
return nil, err
}
Expand All @@ -193,8 +193,8 @@ func handleTracesPayload(req *http.Request) (tp []*pb.TracerPayload, err error)

tracerPayloads = append(tracerPayloads, &tracerPayload)
case strings.HasPrefix(req.URL.Path, "/v0.5"):
buf := getBuffer()
defer putBuffer(buf)
buf := GetBuffer()
defer PutBuffer(buf)
if _, err = io.Copy(buf, req.Body); err != nil {
return nil, err
}
Expand Down Expand Up @@ -229,8 +229,8 @@ func handleTracesPayload(req *http.Request) (tp []*pb.TracerPayload, err error)
}
tracerPayloads = append(tracerPayloads, tracerPayload)
case strings.HasPrefix(req.URL.Path, "/api/v0.2"):
buf := getBuffer()
defer putBuffer(buf)
buf := GetBuffer()
defer PutBuffer(buf)
if _, err = io.Copy(buf, req.Body); err != nil {
return nil, err
}
Expand Down Expand Up @@ -265,8 +265,8 @@ func handleTracesPayload(req *http.Request) (tp []*pb.TracerPayload, err error)
func decodeRequest(req *http.Request, dest *pb.Traces) (err error) {
switch mediaType := getMediaType(req); mediaType {
case "application/msgpack":
buf := getBuffer()
defer putBuffer(buf)
buf := GetBuffer()
defer PutBuffer(buf)
_, err = io.Copy(buf, req.Body)
if err != nil {
return err
Expand All @@ -283,8 +283,8 @@ func decodeRequest(req *http.Request, dest *pb.Traces) (err error) {
default:
// do our best
if err1 := json.NewDecoder(req.Body).Decode(&dest); err1 != nil {
buf := getBuffer()
defer putBuffer(buf)
buf := GetBuffer()
defer PutBuffer(buf)
_, err2 := io.Copy(buf, req.Body)
if err2 != nil {
return err2
Expand Down
Loading

0 comments on commit 74af245

Please sign in to comment.