rollback partitioning by specific resource attributes
SHaaD94 committed Apr 26, 2024
1 parent 965631f commit b66537a
Showing 10 changed files with 18 additions and 213 deletions.
4 changes: 1 addition & 3 deletions exporter/kafkaexporter/README.md
@@ -36,9 +36,7 @@ The following settings can be optionally configured:
- The following encodings are valid *only* for **logs**.
- `raw`: if the log record body is a byte array, it is sent as is. Otherwise, it is serialized to JSON. Resource and record attributes are discarded.
- `partition_traces_by_id` (default = false): configures the exporter to include the trace ID as the message key in trace messages sent to kafka. *Please note:* this setting does not have any effect on Jaeger encoding exporters since Jaeger exporters include trace ID as the message key by default.
-- `partition_metrics_by_resource_attributes` configures the exporter to include the hash of sorted resource attributes as the message partitioning key in metric messages sent to kafka.
-  - `enabled`: (default = false)
-  - `attributes`: (default = []) the list of resource attributes to include in the partitioning key. If empty, all resource attributes are included.
+- `partition_metrics_by_resource_attributes` (default = false) configures the exporter to include the hash of sorted resource attributes as the message partitioning key in metric messages sent to kafka.
- `auth`
- `plain_text`
- `username`: The username to use.
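
For reference, a minimal configuration sketch using the rolled-back boolean form of the option. The broker address and topic name below are illustrative placeholders, not values from this commit:

```yaml
exporters:
  kafka:
    brokers:
      - localhost:9092
    topic: otlp_metrics
    encoding: otlp_proto
    # A plain boolean again after this rollback; the `enabled` and
    # `attributes` sub-keys shown as removed above are no longer accepted.
    partition_metrics_by_resource_attributes: true
```
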
10 changes: 1 addition & 9 deletions exporter/kafkaexporter/config.go
@@ -48,7 +48,7 @@ type Config struct {
// trace ID as the message key by default.
PartitionTracesByID bool `mapstructure:"partition_traces_by_id"`

-	PartitionMetricsByResourceAttributes PartitionByResourceAttributes `mapstructure:"partition_metrics_by_resource_attributes"`
+	PartitionMetricsByResourceAttributes bool `mapstructure:"partition_metrics_by_resource_attributes"`

// Metadata is the namespace for metadata management properties used by the
// Client, and shared by the Producer/Consumer.
@@ -61,14 +61,6 @@ type Config struct {
Authentication kafka.Authentication `mapstructure:"auth"`
}

-// PartitionByResourceAttributes defines configuration for partitioning by resource attributes.
-type PartitionByResourceAttributes struct {
-	Enabled bool `mapstructure:"enabled"`
-
-	// The list of resource attributes to use for partitioning, empty by default
-	Attributes []string `mapstructure:"attributes"`
-}
-
// Metadata defines configuration for retrieving metadata from the broker.
type Metadata struct {
// Whether to maintain a full set of metadata for all topics, or just
6 changes: 3 additions & 3 deletions exporter/kafkaexporter/config_test.go
@@ -58,7 +58,7 @@ func TestLoadConfig(t *testing.T) {
Topic: "spans",
Encoding: "otlp_proto",
PartitionTracesByID: true,
-			PartitionMetricsByResourceAttributes: PartitionByResourceAttributes{Attributes: []string{"k1", "k2"}, Enabled: true},
+			PartitionMetricsByResourceAttributes: true,
Brokers: []string{"foo:123", "bar:456"},
ClientID: "test_client_id",
Authentication: kafka.Authentication{
@@ -113,7 +113,7 @@ func TestLoadConfig(t *testing.T) {
Topic: "spans",
Encoding: "otlp_proto",
PartitionTracesByID: true,
-			PartitionMetricsByResourceAttributes: PartitionByResourceAttributes{Attributes: []string{"k1", "k2"}, Enabled: true},
+			PartitionMetricsByResourceAttributes: true,
Brokers: []string{"foo:123", "bar:456"},
ClientID: "test_client_id",
Authentication: kafka.Authentication{
@@ -167,7 +167,7 @@ func TestLoadConfig(t *testing.T) {
Topic: "spans",
Encoding: "otlp_proto",
PartitionTracesByID: true,
-			PartitionMetricsByResourceAttributes: PartitionByResourceAttributes{Attributes: []string{"k1", "k2"}, Enabled: true},
+			PartitionMetricsByResourceAttributes: true,
Brokers: []string{"foo:123", "bar:456"},
ClientID: "test_client_id",
ResolveCanonicalBootstrapServersOnly: true,
9 changes: 3 additions & 6 deletions exporter/kafkaexporter/factory.go
@@ -99,12 +99,9 @@ func createDefaultConfig() component.Config {
Brokers: []string{defaultBroker},
ClientID: defaultClientID,
// using an empty topic to track when it has not been set by user, default is based on traces or metrics.
-		Topic:    "",
-		Encoding: defaultEncoding,
-		PartitionMetricsByResourceAttributes: PartitionByResourceAttributes{
-			Enabled:    defaultPartitionMetricsByResourceAttributesEnabled,
-			Attributes: []string{},
-		},
+		Topic:                                "",
+		Encoding:                             defaultEncoding,
+		PartitionMetricsByResourceAttributes: defaultPartitionMetricsByResourceAttributesEnabled,
Metadata: Metadata{
Full: defaultMetadataFull,
Retry: MetadataRetry{
4 changes: 2 additions & 2 deletions exporter/kafkaexporter/kafka_exporter.go
@@ -211,9 +211,9 @@ func newMetricsExporter(config Config, set exporter.CreateSettings, marshalers m
if marshaler == nil {
return nil, errUnrecognizedEncoding
}
-	if config.PartitionMetricsByResourceAttributes.Enabled {
+	if config.PartitionMetricsByResourceAttributes {
		if keyableMarshaler, ok := marshaler.(KeyableMetricsMarshaler); ok {
-			keyableMarshaler.Key(config.PartitionMetricsByResourceAttributes.Attributes)
+			keyableMarshaler.Key()
}
}

29 changes: 1 addition & 28 deletions exporter/kafkaexporter/marshaler_test.go
@@ -76,48 +76,21 @@ func TestOTLPMetricsJsonMarshaling(t *testing.T) {
tests := []struct {
name string
keyEnabled bool
-		attributes []string
messagePartitionKeys []sarama.Encoder
}{
{
name: "partitioning_disabled",
keyEnabled: false,
-			attributes: []string{},
messagePartitionKeys: []sarama.Encoder{nil},
},
-		{
-			name: "partitioning_disabled_keys_are_not_empty",
-			keyEnabled: false,
-			attributes: []string{"service.name"},
-			messagePartitionKeys: []sarama.Encoder{nil},
-		},
{
name: "partitioning_enabled",
keyEnabled: true,
-			attributes: []string{},
messagePartitionKeys: []sarama.Encoder{
sarama.ByteEncoder{0x62, 0x7f, 0x20, 0x34, 0x85, 0x49, 0x55, 0x2e, 0xfa, 0x93, 0xae, 0xd7, 0xde, 0x91, 0xd7, 0x16},
sarama.ByteEncoder{0x75, 0x6b, 0xb4, 0xd6, 0xff, 0xeb, 0x92, 0x22, 0xa, 0x68, 0x65, 0x48, 0xe0, 0xd3, 0x94, 0x44},
},
},
-		{
-			name: "partitioning_enabled_with_keys",
-			keyEnabled: true,
-			attributes: []string{"service.instance.id"},
-			messagePartitionKeys: []sarama.Encoder{
-				sarama.ByteEncoder{0xf9, 0x1e, 0x59, 0x41, 0xb5, 0x16, 0xfa, 0xdf, 0xc1, 0x79, 0xa3, 0x54, 0x68, 0x1d, 0xb6, 0xc8},
-				sarama.ByteEncoder{0x47, 0xac, 0xe2, 0x30, 0xd, 0x72, 0xd1, 0x82, 0xa5, 0xd, 0xe3, 0xa4, 0x64, 0xd3, 0x6b, 0xb5},
-			},
-		},
-		{
-			name: "partitioning_enabled_keys_do_not_exist",
-			keyEnabled: true,
-			attributes: []string{"non_existing_key"},
-			messagePartitionKeys: []sarama.Encoder{
-				sarama.ByteEncoder{0x99, 0xe9, 0xd8, 0x51, 0x37, 0xdb, 0x46, 0xef, 0xfe, 0x7c, 0x8e, 0x2d, 0x85, 0x35, 0xce, 0xeb},
-				sarama.ByteEncoder{0x99, 0xe9, 0xd8, 0x51, 0x37, 0xdb, 0x46, 0xef, 0xfe, 0x7c, 0x8e, 0x2d, 0x85, 0x35, 0xce, 0xeb},
-			},
-		},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
@@ -147,7 +120,7 @@ func TestOTLPMetricsJsonMarshaling(t *testing.T) {
keyableMarshaler, ok := standardMarshaler.(KeyableMetricsMarshaler)
require.True(t, ok, "Must be a KeyableMetricsMarshaler")
if tt.keyEnabled {
-				keyableMarshaler.Key(tt.attributes)
+				keyableMarshaler.Key()
}

msgs, err := standardMarshaler.Marshal(metric, "KafkaTopicX")
19 changes: 6 additions & 13 deletions exporter/kafkaexporter/pdata_marshaler.go
@@ -46,20 +46,18 @@ func newPdataLogsMarshaler(marshaler plog.Marshaler, encoding string) LogsMarsha
// for metrics messages
type KeyableMetricsMarshaler interface {
MetricsMarshaler
-	Key(attributes []string)
+	Key()
}

type pdataMetricsMarshaler struct {
-	marshaler     pmetric.Marshaler
-	encoding      string
-	keyed         bool
-	keyAttributes []string
+	marshaler pmetric.Marshaler
+	encoding  string
+	keyed     bool
}

// Key configures the pdataMetricsMarshaler to set the message key on the kafka messages
-func (p *pdataMetricsMarshaler) Key(attributes []string) {
+func (p *pdataMetricsMarshaler) Key() {
p.keyed = true
-	p.keyAttributes = attributes
}

func (p pdataMetricsMarshaler) Marshal(ld pmetric.Metrics, topic string) ([]*sarama.ProducerMessage, error) {
@@ -69,12 +67,7 @@ func (p pdataMetricsMarshaler) Marshal(ld pmetric.Metrics, topic string) ([]*sar

for i := 0; i < metrics.Len(); i++ {
resourceMetrics := metrics.At(i)
-		var hash [16]byte
-		if len(p.keyAttributes) > 0 {
-			hash = pdatautil.MapHashSelectedKeys(resourceMetrics.Resource().Attributes(), p.keyAttributes)
-		} else {
-			hash = pdatautil.MapHash(resourceMetrics.Resource().Attributes())
-		}
+		var hash = pdatautil.MapHash(resourceMetrics.Resource().Attributes())

newMetrics := pmetric.NewMetrics()
resourceMetrics.MoveTo(newMetrics.ResourceMetrics().AppendEmpty())
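
To illustrate the narrowed interface, here is a hedged usage sketch, valid only inside the kafkaexporter package. Only `pdataMetricsMarshaler`, `Key()`, and `Marshal` come from the diff above; the helper name, the construction, and the topic string are assumptions for illustration:

```go
// Sketch, not the exporter's actual wiring; usable only in-package.
func marshalKeyedMetrics(md pmetric.Metrics) ([]*sarama.ProducerMessage, error) {
	m := &pdataMetricsMarshaler{
		marshaler: &pmetric.ProtoMarshaler{},
		encoding:  "otlp_proto",
	}
	// Keying is all-or-nothing again: Key() takes no attribute list.
	m.Key()

	// Each returned message is keyed by the 16-byte MapHash of all
	// resource attributes of its ResourceMetrics entry.
	return m.Marshal(md, "otlp_metrics")
}
```
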
6 changes: 1 addition & 5 deletions exporter/kafkaexporter/testdata/config.yaml
@@ -13,11 +13,7 @@ kafka:
required_acks: -1 # WaitForAll
timeout: 10s
partition_traces_by_id: true
-  partition_metrics_by_resource_attributes:
-    enabled: true
-    attributes:
-      - k1
-      - k2
+  partition_metrics_by_resource_attributes: true
auth:
plain_text:
username: jdoe
20 changes: 0 additions & 20 deletions pkg/pdatautil/hash.go
@@ -63,26 +63,6 @@ func MapHash(m pcommon.Map) [16]byte {
return hw.hashSum128()
}

// MapHashSelectedKeys return a hash for the provided map, using values of only provided keys.
// Order of hash calculation is determined by the order of keys.
func MapHashSelectedKeys(m pcommon.Map, keys []string) [16]byte {
if m.Len() == 0 || len(keys) == 0 {
return emptyHash
}

hw := hashWriterPool.Get().(*hashWriter)
defer hashWriterPool.Put(hw)
hw.byteBuf = hw.byteBuf[:0]

for _, k := range keys {
if v, ok := m.Get(k); ok {
hw.writeValueHash(v)
}
}

return hw.hashSum128()
}

// ValueHash return a hash for the provided pcommon.Value.
func ValueHash(v pcommon.Value) [16]byte {
hw := hashWriterPool.Get().(*hashWriter)
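
With `MapHashSelectedKeys` removed, hashing the full attribute map via `MapHash` is the only remaining entry point for this path. A small self-contained sketch of that call (the attribute values are illustrative; the import paths match the repositories involved):

```go
package main

import (
	"fmt"

	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil"
	"go.opentelemetry.io/collector/pdata/pcommon"
)

func main() {
	attrs := pcommon.NewMap()
	attrs.PutStr("service.name", "checkout")
	attrs.PutStr("service.instance.id", "instance-1")

	// MapHash hashes the sorted attribute map, so insertion order does not
	// matter -- but every attribute contributes to the resulting partition
	// key; individual keys can no longer be selected.
	key := pdatautil.MapHash(attrs)
	fmt.Printf("partition key: %x\n", key)
}
```
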
124 changes: 0 additions & 124 deletions pkg/pdatautil/hash_test.go
@@ -150,130 +150,6 @@ func TestMapHash(t *testing.T) {
}
}

func TestMapHashSelectedKeys(t *testing.T) {
tests := []struct {
name string
maps []pcommon.Map
keys [][]string
equal bool
}{
{
name: "empty_maps",
maps: []pcommon.Map{pcommon.NewMap(), pcommon.NewMap()},
keys: [][]string{{}, {}},
equal: true,
},
{
name: "same_maps_different_order",
keys: [][]string{{"k1", "k2"}, {"k1", "k2"}},
maps: func() []pcommon.Map {
m := []pcommon.Map{pcommon.NewMap(), pcommon.NewMap()}
m[0].PutStr("k1", "v1")
m[0].PutInt("k2", 1)
m[0].PutDouble("k3", 1)
m[0].PutBool("k4", true)
m[0].PutEmptyBytes("k5").FromRaw([]byte("abc"))
sl := m[0].PutEmptySlice("k6")
sl.AppendEmpty().SetStr("str")
sl.AppendEmpty().SetBool(true)
m0 := m[0].PutEmptyMap("k")
m0.PutInt("k1", 1)
m0.PutDouble("k2", 10)

m1 := m[1].PutEmptyMap("k")
m1.PutDouble("k2", 10)
m1.PutInt("k1", 1)
m[1].PutEmptyBytes("k5").FromRaw([]byte("abc"))
m[1].PutBool("k4", true)
sl = m[1].PutEmptySlice("k6")
sl.AppendEmpty().SetStr("str")
sl.AppendEmpty().SetBool(true)
m[1].PutInt("k2", 1)
m[1].PutStr("k1", "v1")
m[1].PutDouble("k3", 1)

return m
}(),
equal: true,
},
{
name: "different_maps_having_same_keys",
keys: [][]string{{"k2", "k3"}, {"k2", "k3"}},
maps: func() []pcommon.Map {
m := []pcommon.Map{pcommon.NewMap(), pcommon.NewMap()}
m[0].PutStr("k1", "v1")
m[0].PutInt("k2", 1)
m[0].PutDouble("k3", 1)

m[1].PutInt("k2", 1)
m[1].PutDouble("k3", 1)
m[1].PutDouble("k4", 1)

return m
}(),
equal: true,
},
{
name: "same_maps_different_key_order",
keys: [][]string{{"k2", "k3"}, {"k3", "k2"}},
maps: func() []pcommon.Map {
m := []pcommon.Map{pcommon.NewMap(), pcommon.NewMap()}
m[0].PutInt("k2", 1)
m[0].PutDouble("k3", 1)

m[1].PutInt("k2", 1)
m[1].PutDouble("k3", 1)

return m
}(),
equal: false,
},
{
name: "same_maps_with_not_existing_keys",
keys: [][]string{{"k10", "k11"}, {"k10", "k11"}},
maps: func() []pcommon.Map {
m := []pcommon.Map{pcommon.NewMap(), pcommon.NewMap()}
m[0].PutStr("k1", "v1")

m[1].PutInt("k2", 1)

return m
}(),
equal: true,
},
{
name: "different_maps",
keys: [][]string{{"k2", "k3"}, {"k2", "k3"}},
maps: func() []pcommon.Map {
m := []pcommon.Map{pcommon.NewMap(), pcommon.NewMap()}
m[0].PutInt("k2", 2)
m[0].PutDouble("k3", 2)

m[1].PutInt("k2", 1)
m[1].PutDouble("k3", 1)

return m
}(),
equal: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
for i := 0; i < len(tt.maps); i++ {
for j := i + 1; j < len(tt.maps); j++ {
if tt.equal {
assert.Equal(t, MapHashSelectedKeys(tt.maps[i], tt.keys[i]), MapHashSelectedKeys(tt.maps[j], tt.keys[j]),
"maps %d %v and %d %v must have the same hash, then calculated with keys %v and %v", i, tt.maps[i].AsRaw(), j, tt.maps[j].AsRaw(), tt.keys[i], tt.keys[j])
} else {
assert.NotEqual(t, MapHashSelectedKeys(tt.maps[i], tt.keys[i]), MapHashSelectedKeys(tt.maps[j], tt.keys[j]),
"maps %d %v and %d %v must have different hashes, then calculated with keys %v and %v", i, tt.maps[i].AsRaw(), j, tt.maps[j].AsRaw(), tt.keys[i], tt.keys[j])
}
}
}
})
}
}

func TestValueHash(t *testing.T) {
tests := []struct {
name string
