From b66537aab1f8dbecd25295bf630526edefd54bc4 Mon Sep 17 00:00:00 2001 From: SHaaD94 Date: Fri, 26 Apr 2024 18:43:02 +0700 Subject: [PATCH] rollback partitioning by specific resource attributes --- exporter/kafkaexporter/README.md | 4 +- exporter/kafkaexporter/config.go | 10 +- exporter/kafkaexporter/config_test.go | 6 +- exporter/kafkaexporter/factory.go | 9 +- exporter/kafkaexporter/kafka_exporter.go | 4 +- exporter/kafkaexporter/marshaler_test.go | 29 +---- exporter/kafkaexporter/pdata_marshaler.go | 19 +-- exporter/kafkaexporter/testdata/config.yaml | 6 +- pkg/pdatautil/hash.go | 20 ---- pkg/pdatautil/hash_test.go | 124 -------------------- 10 files changed, 18 insertions(+), 213 deletions(-) diff --git a/exporter/kafkaexporter/README.md b/exporter/kafkaexporter/README.md index a1b9f718d297d..2ac969d72edf6 100644 --- a/exporter/kafkaexporter/README.md +++ b/exporter/kafkaexporter/README.md @@ -36,9 +36,7 @@ The following settings can be optionally configured: - The following encodings are valid *only* for **logs**. - `raw`: if the log record body is a byte array, it is sent as is. Otherwise, it is serialized to JSON. Resource and record attributes are discarded. - `partition_traces_by_id` (default = false): configures the exporter to include the trace ID as the message key in trace messages sent to kafka. *Please note:* this setting does not have any effect on Jaeger encoding exporters since Jaeger exporters include trace ID as the message key by default. -- `partition_metrics_by_resource_attributes` configures the exporter to include the hash of sorted resource attributes as the message partitioning key in metric messages sent to kafka. - - `enabled`: (default = false) - - `attributes`: (default = []) the list of resource attributes to include in the partitioning key. If empty, all resource attributes are included. +- `partition_metrics_by_resource_attributes` (default = false) configures the exporter to include the hash of sorted resource attributes as the message partitioning key in metric messages sent to kafka. - `auth` - `plain_text` - `username`: The username to use. diff --git a/exporter/kafkaexporter/config.go b/exporter/kafkaexporter/config.go index 5cf7fa403ceec..baefca3ce23ae 100644 --- a/exporter/kafkaexporter/config.go +++ b/exporter/kafkaexporter/config.go @@ -48,7 +48,7 @@ type Config struct { // trace ID as the message key by default. PartitionTracesByID bool `mapstructure:"partition_traces_by_id"` - PartitionMetricsByResourceAttributes PartitionByResourceAttributes `mapstructure:"partition_metrics_by_resource_attributes"` + PartitionMetricsByResourceAttributes bool `mapstructure:"partition_metrics_by_resource_attributes"` // Metadata is the namespace for metadata management properties used by the // Client, and shared by the Producer/Consumer. @@ -61,14 +61,6 @@ type Config struct { Authentication kafka.Authentication `mapstructure:"auth"` } -// PartitionByResourceAttributes defines configuration for partitioning by resource attributes. -type PartitionByResourceAttributes struct { - Enabled bool `mapstructure:"enabled"` - - // The list of resource attributes to use for partitioning, empty by default - Attributes []string `mapstructure:"attributes"` -} - // Metadata defines configuration for retrieving metadata from the broker. type Metadata struct { // Whether to maintain a full set of metadata for all topics, or just diff --git a/exporter/kafkaexporter/config_test.go b/exporter/kafkaexporter/config_test.go index 9e4dba77def46..4b43f948ebe36 100644 --- a/exporter/kafkaexporter/config_test.go +++ b/exporter/kafkaexporter/config_test.go @@ -58,7 +58,7 @@ func TestLoadConfig(t *testing.T) { Topic: "spans", Encoding: "otlp_proto", PartitionTracesByID: true, - PartitionMetricsByResourceAttributes: PartitionByResourceAttributes{Attributes: []string{"k1", "k2"}, Enabled: true}, + PartitionMetricsByResourceAttributes: true, Brokers: []string{"foo:123", "bar:456"}, ClientID: "test_client_id", Authentication: kafka.Authentication{ @@ -113,7 +113,7 @@ func TestLoadConfig(t *testing.T) { Topic: "spans", Encoding: "otlp_proto", PartitionTracesByID: true, - PartitionMetricsByResourceAttributes: PartitionByResourceAttributes{Attributes: []string{"k1", "k2"}, Enabled: true}, + PartitionMetricsByResourceAttributes: true, Brokers: []string{"foo:123", "bar:456"}, ClientID: "test_client_id", Authentication: kafka.Authentication{ @@ -167,7 +167,7 @@ func TestLoadConfig(t *testing.T) { Topic: "spans", Encoding: "otlp_proto", PartitionTracesByID: true, - PartitionMetricsByResourceAttributes: PartitionByResourceAttributes{Attributes: []string{"k1", "k2"}, Enabled: true}, + PartitionMetricsByResourceAttributes: true, Brokers: []string{"foo:123", "bar:456"}, ClientID: "test_client_id", ResolveCanonicalBootstrapServersOnly: true, diff --git a/exporter/kafkaexporter/factory.go b/exporter/kafkaexporter/factory.go index 0909f6884abb3..d990a17dab8d0 100644 --- a/exporter/kafkaexporter/factory.go +++ b/exporter/kafkaexporter/factory.go @@ -99,12 +99,9 @@ func createDefaultConfig() component.Config { Brokers: []string{defaultBroker}, ClientID: defaultClientID, // using an empty topic to track when it has not been set by user, default is based on traces or metrics. - Topic: "", - Encoding: defaultEncoding, - PartitionMetricsByResourceAttributes: PartitionByResourceAttributes{ - Enabled: defaultPartitionMetricsByResourceAttributesEnabled, - Attributes: []string{}, - }, + Topic: "", + Encoding: defaultEncoding, + PartitionMetricsByResourceAttributes: defaultPartitionMetricsByResourceAttributesEnabled, Metadata: Metadata{ Full: defaultMetadataFull, Retry: MetadataRetry{ diff --git a/exporter/kafkaexporter/kafka_exporter.go b/exporter/kafkaexporter/kafka_exporter.go index 0f26df3bc9e90..c6b82dcfc66a7 100644 --- a/exporter/kafkaexporter/kafka_exporter.go +++ b/exporter/kafkaexporter/kafka_exporter.go @@ -211,9 +211,9 @@ func newMetricsExporter(config Config, set exporter.CreateSettings, marshalers m if marshaler == nil { return nil, errUnrecognizedEncoding } - if config.PartitionMetricsByResourceAttributes.Enabled { + if config.PartitionMetricsByResourceAttributes { if keyableMarshaler, ok := marshaler.(KeyableMetricsMarshaler); ok { - keyableMarshaler.Key(config.PartitionMetricsByResourceAttributes.Attributes) + keyableMarshaler.Key() } } diff --git a/exporter/kafkaexporter/marshaler_test.go b/exporter/kafkaexporter/marshaler_test.go index a11506631706c..97b110fd945c3 100644 --- a/exporter/kafkaexporter/marshaler_test.go +++ b/exporter/kafkaexporter/marshaler_test.go @@ -76,48 +76,21 @@ func TestOTLPMetricsJsonMarshaling(t *testing.T) { tests := []struct { name string keyEnabled bool - attributes []string messagePartitionKeys []sarama.Encoder }{ { name: "partitioning_disabled", keyEnabled: false, - attributes: []string{}, - messagePartitionKeys: []sarama.Encoder{nil}, - }, - { - name: "partitioning_disabled_keys_are_not_empty", - keyEnabled: false, - attributes: []string{"service.name"}, messagePartitionKeys: []sarama.Encoder{nil}, }, { name: "partitioning_enabled", keyEnabled: true, - attributes: []string{}, messagePartitionKeys: []sarama.Encoder{ sarama.ByteEncoder{0x62, 0x7f, 0x20, 0x34, 0x85, 0x49, 0x55, 0x2e, 0xfa, 0x93, 0xae, 0xd7, 0xde, 0x91, 0xd7, 0x16}, sarama.ByteEncoder{0x75, 0x6b, 0xb4, 0xd6, 0xff, 0xeb, 0x92, 0x22, 0xa, 0x68, 0x65, 0x48, 0xe0, 0xd3, 0x94, 0x44}, }, }, - { - name: "partitioning_enabled_with_keys", - keyEnabled: true, - attributes: []string{"service.instance.id"}, - messagePartitionKeys: []sarama.Encoder{ - sarama.ByteEncoder{0xf9, 0x1e, 0x59, 0x41, 0xb5, 0x16, 0xfa, 0xdf, 0xc1, 0x79, 0xa3, 0x54, 0x68, 0x1d, 0xb6, 0xc8}, - sarama.ByteEncoder{0x47, 0xac, 0xe2, 0x30, 0xd, 0x72, 0xd1, 0x82, 0xa5, 0xd, 0xe3, 0xa4, 0x64, 0xd3, 0x6b, 0xb5}, - }, - }, - { - name: "partitioning_enabled_keys_do_not_exist", - keyEnabled: true, - attributes: []string{"non_existing_key"}, - messagePartitionKeys: []sarama.Encoder{ - sarama.ByteEncoder{0x99, 0xe9, 0xd8, 0x51, 0x37, 0xdb, 0x46, 0xef, 0xfe, 0x7c, 0x8e, 0x2d, 0x85, 0x35, 0xce, 0xeb}, - sarama.ByteEncoder{0x99, 0xe9, 0xd8, 0x51, 0x37, 0xdb, 0x46, 0xef, 0xfe, 0x7c, 0x8e, 0x2d, 0x85, 0x35, 0xce, 0xeb}, - }, - }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -147,7 +120,7 @@ func TestOTLPMetricsJsonMarshaling(t *testing.T) { keyableMarshaler, ok := standardMarshaler.(KeyableMetricsMarshaler) require.True(t, ok, "Must be a KeyableMetricsMarshaler") if tt.keyEnabled { - keyableMarshaler.Key(tt.attributes) + keyableMarshaler.Key() } msgs, err := standardMarshaler.Marshal(metric, "KafkaTopicX") diff --git a/exporter/kafkaexporter/pdata_marshaler.go b/exporter/kafkaexporter/pdata_marshaler.go index e619fb1d6b39d..415546e6b2050 100644 --- a/exporter/kafkaexporter/pdata_marshaler.go +++ b/exporter/kafkaexporter/pdata_marshaler.go @@ -46,20 +46,18 @@ func newPdataLogsMarshaler(marshaler plog.Marshaler, encoding string) LogsMarsha // for metrics messages type KeyableMetricsMarshaler interface { MetricsMarshaler - Key(attributes []string) + Key() } type pdataMetricsMarshaler struct { - marshaler pmetric.Marshaler - encoding string - keyed bool - keyAttributes []string + marshaler pmetric.Marshaler + encoding string + keyed bool } // Key configures the pdataMetricsMarshaler to set the message key on the kafka messages -func (p *pdataMetricsMarshaler) Key(attributes []string) { +func (p *pdataMetricsMarshaler) Key() { p.keyed = true - p.keyAttributes = attributes } func (p pdataMetricsMarshaler) Marshal(ld pmetric.Metrics, topic string) ([]*sarama.ProducerMessage, error) { @@ -69,12 +67,7 @@ func (p pdataMetricsMarshaler) Marshal(ld pmetric.Metrics, topic string) ([]*sar for i := 0; i < metrics.Len(); i++ { resourceMetrics := metrics.At(i) - var hash [16]byte - if len(p.keyAttributes) > 0 { - hash = pdatautil.MapHashSelectedKeys(resourceMetrics.Resource().Attributes(), p.keyAttributes) - } else { - hash = pdatautil.MapHash(resourceMetrics.Resource().Attributes()) - } + var hash = pdatautil.MapHash(resourceMetrics.Resource().Attributes()) newMetrics := pmetric.NewMetrics() resourceMetrics.MoveTo(newMetrics.ResourceMetrics().AppendEmpty()) diff --git a/exporter/kafkaexporter/testdata/config.yaml b/exporter/kafkaexporter/testdata/config.yaml index 3ceaa60a741b4..7c89bea74ade1 100644 --- a/exporter/kafkaexporter/testdata/config.yaml +++ b/exporter/kafkaexporter/testdata/config.yaml @@ -13,11 +13,7 @@ kafka: required_acks: -1 # WaitForAll timeout: 10s partition_traces_by_id: true - partition_metrics_by_resource_attributes: - enabled: true - attributes: - - k1 - - k2 + partition_metrics_by_resource_attributes: true auth: plain_text: username: jdoe diff --git a/pkg/pdatautil/hash.go b/pkg/pdatautil/hash.go index 8484510c6a6e2..6826de769b898 100644 --- a/pkg/pdatautil/hash.go +++ b/pkg/pdatautil/hash.go @@ -63,26 +63,6 @@ func MapHash(m pcommon.Map) [16]byte { return hw.hashSum128() } -// MapHashSelectedKeys return a hash for the provided map, using values of only provided keys. -// Order of hash calculation is determined by the order of keys. -func MapHashSelectedKeys(m pcommon.Map, keys []string) [16]byte { - if m.Len() == 0 || len(keys) == 0 { - return emptyHash - } - - hw := hashWriterPool.Get().(*hashWriter) - defer hashWriterPool.Put(hw) - hw.byteBuf = hw.byteBuf[:0] - - for _, k := range keys { - if v, ok := m.Get(k); ok { - hw.writeValueHash(v) - } - } - - return hw.hashSum128() -} - // ValueHash return a hash for the provided pcommon.Value. func ValueHash(v pcommon.Value) [16]byte { hw := hashWriterPool.Get().(*hashWriter) diff --git a/pkg/pdatautil/hash_test.go b/pkg/pdatautil/hash_test.go index 1cb655762a743..a725676f37ac1 100644 --- a/pkg/pdatautil/hash_test.go +++ b/pkg/pdatautil/hash_test.go @@ -150,130 +150,6 @@ func TestMapHash(t *testing.T) { } } -func TestMapHashSelectedKeys(t *testing.T) { - tests := []struct { - name string - maps []pcommon.Map - keys [][]string - equal bool - }{ - { - name: "empty_maps", - maps: []pcommon.Map{pcommon.NewMap(), pcommon.NewMap()}, - keys: [][]string{{}, {}}, - equal: true, - }, - { - name: "same_maps_different_order", - keys: [][]string{{"k1", "k2"}, {"k1", "k2"}}, - maps: func() []pcommon.Map { - m := []pcommon.Map{pcommon.NewMap(), pcommon.NewMap()} - m[0].PutStr("k1", "v1") - m[0].PutInt("k2", 1) - m[0].PutDouble("k3", 1) - m[0].PutBool("k4", true) - m[0].PutEmptyBytes("k5").FromRaw([]byte("abc")) - sl := m[0].PutEmptySlice("k6") - sl.AppendEmpty().SetStr("str") - sl.AppendEmpty().SetBool(true) - m0 := m[0].PutEmptyMap("k") - m0.PutInt("k1", 1) - m0.PutDouble("k2", 10) - - m1 := m[1].PutEmptyMap("k") - m1.PutDouble("k2", 10) - m1.PutInt("k1", 1) - m[1].PutEmptyBytes("k5").FromRaw([]byte("abc")) - m[1].PutBool("k4", true) - sl = m[1].PutEmptySlice("k6") - sl.AppendEmpty().SetStr("str") - sl.AppendEmpty().SetBool(true) - m[1].PutInt("k2", 1) - m[1].PutStr("k1", "v1") - m[1].PutDouble("k3", 1) - - return m - }(), - equal: true, - }, - { - name: "different_maps_having_same_keys", - keys: [][]string{{"k2", "k3"}, {"k2", "k3"}}, - maps: func() []pcommon.Map { - m := []pcommon.Map{pcommon.NewMap(), pcommon.NewMap()} - m[0].PutStr("k1", "v1") - m[0].PutInt("k2", 1) - m[0].PutDouble("k3", 1) - - m[1].PutInt("k2", 1) - m[1].PutDouble("k3", 1) - m[1].PutDouble("k4", 1) - - return m - }(), - equal: true, - }, - { - name: "same_maps_different_key_order", - keys: [][]string{{"k2", "k3"}, {"k3", "k2"}}, - maps: func() []pcommon.Map { - m := []pcommon.Map{pcommon.NewMap(), pcommon.NewMap()} - m[0].PutInt("k2", 1) - m[0].PutDouble("k3", 1) - - m[1].PutInt("k2", 1) - m[1].PutDouble("k3", 1) - - return m - }(), - equal: false, - }, - { - name: "same_maps_with_not_existing_keys", - keys: [][]string{{"k10", "k11"}, {"k10", "k11"}}, - maps: func() []pcommon.Map { - m := []pcommon.Map{pcommon.NewMap(), pcommon.NewMap()} - m[0].PutStr("k1", "v1") - - m[1].PutInt("k2", 1) - - return m - }(), - equal: true, - }, - { - name: "different_maps", - keys: [][]string{{"k2", "k3"}, {"k2", "k3"}}, - maps: func() []pcommon.Map { - m := []pcommon.Map{pcommon.NewMap(), pcommon.NewMap()} - m[0].PutInt("k2", 2) - m[0].PutDouble("k3", 2) - - m[1].PutInt("k2", 1) - m[1].PutDouble("k3", 1) - - return m - }(), - equal: false, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - for i := 0; i < len(tt.maps); i++ { - for j := i + 1; j < len(tt.maps); j++ { - if tt.equal { - assert.Equal(t, MapHashSelectedKeys(tt.maps[i], tt.keys[i]), MapHashSelectedKeys(tt.maps[j], tt.keys[j]), - "maps %d %v and %d %v must have the same hash, then calculated with keys %v and %v", i, tt.maps[i].AsRaw(), j, tt.maps[j].AsRaw(), tt.keys[i], tt.keys[j]) - } else { - assert.NotEqual(t, MapHashSelectedKeys(tt.maps[i], tt.keys[i]), MapHashSelectedKeys(tt.maps[j], tt.keys[j]), - "maps %d %v and %d %v must have different hashes, then calculated with keys %v and %v", i, tt.maps[i].AsRaw(), j, tt.maps[j].AsRaw(), tt.keys[i], tt.keys[j]) - } - } - } - }) - } -} - func TestValueHash(t *testing.T) { tests := []struct { name string