diff --git a/connector/datadogconnector/connector.go b/connector/datadogconnector/connector.go index 703d55eef9ae2..9473027068871 100644 --- a/connector/datadogconnector/connector.go +++ b/connector/datadogconnector/connector.go @@ -6,6 +6,7 @@ package datadogconnector // import "github.com/open-telemetry/opentelemetry-coll import ( "context" "fmt" + "sync" "time" pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/trace" @@ -156,13 +157,17 @@ func (c *traceToMetricConnector) populateContainerTagsCache(traces ptrace.Traces ddContainerTags := attributes.ContainerTagsFromResourceAttributes(attrs) for attr := range c.resourceAttrs { if val, ok := ddContainerTags[attr]; ok { - var cacheVal map[string]struct{} + key := fmt.Sprintf("%s:%s", attr, val) if v, ok := c.containerTagCache.Get(containerID.AsString()); ok { - cacheVal = v.(map[string]struct{}) - cacheVal[fmt.Sprintf("%s:%s", attr, val)] = struct{}{} + cacheVal := v.(*sync.Map) + // check if the key already exists in the cache + if _, ok := cacheVal.Load(key); ok { + continue + } + cacheVal.Store(key, struct{}{}) } else { - cacheVal = make(map[string]struct{}) - cacheVal[fmt.Sprintf("%s:%s", attr, val)] = struct{}{} + cacheVal := &sync.Map{} + cacheVal.Store(key, struct{}{}) c.containerTagCache.Set(containerID.AsString(), cacheVal, cache.DefaultExpiration) } @@ -181,14 +186,16 @@ func (c *traceToMetricConnector) enrichStatsPayload(stats *pb.StatsPayload) { for _, stat := range stats.Stats { if stat.ContainerID != "" { if tags, ok := c.containerTagCache.Get(stat.ContainerID); ok { - tagList := tags.(map[string]struct{}) + + tagList := tags.(*sync.Map) for _, tag := range stat.Tags { - tagList[tag] = struct{}{} - } - stat.Tags = make([]string, 0, len(tagList)) - for tag := range tagList { - stat.Tags = append(stat.Tags, tag) + tagList.Store(tag, struct{}{}) } + stat.Tags = make([]string, 0) + tagList.Range(func(key, value any) bool { + stat.Tags = append(stat.Tags, key.(string)) + return true + }) } } } diff --git a/connector/datadogconnector/connector_test.go b/connector/datadogconnector/connector_test.go index dfac3f7f4ad70..57a1a5625a77e 100644 --- a/connector/datadogconnector/connector_test.go +++ b/connector/datadogconnector/connector_test.go @@ -5,6 +5,7 @@ package datadogconnector import ( "context" + "sync" "testing" "time" @@ -19,7 +20,7 @@ import ( "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" - semconv "go.opentelemetry.io/collector/semconv/v1.17.0" + semconv "go.opentelemetry.io/collector/semconv/v1.5.0" "go.uber.org/zap" "google.golang.org/protobuf/proto" ) @@ -97,19 +98,26 @@ func fillSpanOne(span ptrace.Span) { status.SetMessage("status-cancelled") } -func TestContainerTags(t *testing.T) { +func creteConnector(t *testing.T) (*traceToMetricConnector, *consumertest.MetricsSink) { factory := NewFactory() creationParams := connectortest.NewNopCreateSettings() cfg := factory.CreateDefaultConfig().(*Config) cfg.Traces.ResourceAttributesAsContainerTags = []string{semconv.AttributeCloudAvailabilityZone, semconv.AttributeCloudRegion} + metricsSink := &consumertest.MetricsSink{} traceToMetricsConnector, err := factory.CreateTracesToMetrics(context.Background(), creationParams, cfg, metricsSink) assert.NoError(t, err) connector, ok := traceToMetricsConnector.(*traceToMetricConnector) - err = connector.Start(context.Background(), componenttest.NewNopHost()) + require.True(t, ok) + return connector, metricsSink +} + +func TestContainerTags(t *testing.T) { + connector, metricsSink := creteConnector(t) + err := connector.Start(context.Background(), componenttest.NewNopHost()) if err != nil { t.Errorf("Error starting connector: %v", err) return @@ -118,7 +126,6 @@ func TestContainerTags(t *testing.T) { _ = connector.Shutdown(context.Background()) }() - assert.True(t, ok) // checks if the created connector implements the connectorImp struct trace1 := generateTrace() err = connector.ConsumeTraces(context.Background(), trace1) @@ -130,7 +137,12 @@ func TestContainerTags(t *testing.T) { assert.NoError(t, err) // check if the container tags are added to the cache assert.Equal(t, 1, len(connector.containerTagCache.Items())) - assert.Equal(t, 2, len(connector.containerTagCache.Items()["my-container-id"].Object.(map[string]struct{}))) + count := 0 + connector.containerTagCache.Items()["my-container-id"].Object.(*sync.Map).Range(func(key, value any) bool { + count++ + return true + }) + assert.Equal(t, 2, count) for { if len(metricsSink.AllMetrics()) > 0 { @@ -181,3 +193,44 @@ func newTranslatorWithStatsChannel(t *testing.T, logger *zap.Logger, ch chan []b require.NoError(t, err) return tr } + +func TestDataRace(t *testing.T) { + connector, _ := creteConnector(t) + trace1 := generateTrace() + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-ctx.Done(): + return + default: + connector.populateContainerTagsCache(trace1) + } + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-ctx.Done(): + return + default: + sp := &pb.StatsPayload{ + Stats: []*pb.ClientStatsPayload{ + { + ContainerID: "my-container-id", + }, + }, + } + connector.enrichStatsPayload(sp) + } + } + }() + wg.Wait() +}