Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[exporter/kafka] Implement partitioning for OTLP metrics #31315

Merged
merged 49 commits into from
Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
1380a2a
Implement partitioning for OTLP metrics
SHaaD94 Feb 19, 2024
5123395
Merge branch 'main' of https://github.com/open-telemetry/opentelemetr…
SHaaD94 Feb 25, 2024
25a8d77
Merge branch 'main' into balance-metrics-by-resources
SHaaD94 Feb 27, 2024
a813b9f
Merge branch 'main' into balance-metrics-by-resources
SHaaD94 Mar 4, 2024
c773a31
Merge branch 'main' into balance-metrics-by-resources
SHaaD94 Mar 11, 2024
d6745b2
Merge branch 'main' into balance-metrics-by-resources
SHaaD94 Mar 12, 2024
e8a2ce6
Merge branch 'main' of https://github.com/open-telemetry/opentelemetr…
SHaaD94 Mar 14, 2024
a78e71a
Merge branch 'main' of https://github.com/open-telemetry/opentelemetr…
SHaaD94 Mar 18, 2024
82c9e64
1
SHaaD94 Mar 18, 2024
740ec99
Merge branch 'main' of https://github.com/open-telemetry/opentelemetr…
SHaaD94 Mar 20, 2024
deca155
Use existing hash function
SHaaD94 Mar 20, 2024
ccd1fef
Add github issue
SHaaD94 Mar 20, 2024
65bcb4a
Merge branch 'main' into balance-metrics-by-resources
SHaaD94 Mar 22, 2024
e536f98
Merge branch 'main' into balance-metrics-by-resources
SHaaD94 Mar 24, 2024
25c61a5
add partitioning by specific tags
SHaaD94 Mar 24, 2024
9b4bea7
Add more tests
SHaaD94 Mar 24, 2024
ca9f993
Merge branch 'main' into balance-metrics-by-resources
SHaaD94 Mar 25, 2024
a11531b
Merge branch 'main' into balance-metrics-by-resources
SHaaD94 Mar 25, 2024
244ca03
Update exporter/kafkaexporter/README.md
SHaaD94 Mar 26, 2024
b4e20e9
Update pkg/pdatautil/hash_test.go
SHaaD94 Mar 26, 2024
925ed8b
add missed %v
SHaaD94 Mar 26, 2024
1f43023
Merge branch 'main' into balance-metrics-by-resources
SHaaD94 Mar 26, 2024
593702b
Merge branch 'main' into balance-metrics-by-resources
SHaaD94 Mar 26, 2024
b602a8b
Merge branch 'main' into balance-metrics-by-resources
SHaaD94 Mar 26, 2024
088c4ae
Merge branch 'main' into balance-metrics-by-resources
SHaaD94 Mar 27, 2024
4390dfe
Merge branch 'main' into balance-metrics-by-resources
SHaaD94 Mar 28, 2024
7802c17
Merge branch 'main' into balance-metrics-by-resources
SHaaD94 Mar 30, 2024
fb81f66
Merge branch 'main' into balance-metrics-by-resources
SHaaD94 Mar 31, 2024
dcc32bb
Merge branch 'main' into balance-metrics-by-resources
SHaaD94 Apr 3, 2024
d4ae3df
Merge branch 'main' into balance-metrics-by-resources
SHaaD94 Apr 4, 2024
db57263
Merge branch 'main' into balance-metrics-by-resources
SHaaD94 Apr 6, 2024
9df70f1
Merge branch 'main' into balance-metrics-by-resources
SHaaD94 Apr 9, 2024
6e34560
Merge branch 'main' into balance-metrics-by-resources
SHaaD94 Apr 10, 2024
3ba71b2
Merge branch 'main' into balance-metrics-by-resources
SHaaD94 Apr 11, 2024
9474705
Merge branch 'main' into balance-metrics-by-resources
SHaaD94 Apr 12, 2024
13d6284
Merge branch 'main' into balance-metrics-by-resources
SHaaD94 Apr 17, 2024
e09d644
Merge branch 'main' into balance-metrics-by-resources
SHaaD94 Apr 17, 2024
965631f
Merge branch 'main' of https://github.com/open-telemetry/opentelemetr…
SHaaD94 Apr 26, 2024
b66537a
rollback partitioning by specific resource attributes
SHaaD94 Apr 26, 2024
5c5502d
Bump go version to 1.21.9 to fix GO-2024-2687
SHaaD94 Apr 26, 2024
6203927
Update exporter/kafkaexporter/go.mod
SHaaD94 Apr 26, 2024
2628a01
Merge branch 'main' into balance-metrics-by-resources
SHaaD94 Apr 26, 2024
e56a50e
lint
SHaaD94 Apr 29, 2024
c74f6c7
Merge branch 'main' of https://github.com/open-telemetry/opentelemetr…
SHaaD94 Apr 29, 2024
4e5ee2f
Merge branch 'balance-metrics-by-resources' of github.com:SHaaD94/ope…
SHaaD94 Apr 29, 2024
8784607
tidy
SHaaD94 Apr 29, 2024
d161c6b
do not mutate messages
SHaaD94 Apr 29, 2024
f14f588
Merge branch 'main' into balance-metrics-by-resources
SHaaD94 Apr 29, 2024
22dd715
Merge branch 'main' into balance-metrics-by-resources
SHaaD94 Apr 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/kafka-exporter-key-by-metric-resources.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: kafkaexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: add an ability to publish kafka messages with message key based on metric resource attributes - it will allow partitioning metrics in Kafka.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [29433, 30666]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user, api]
1 change: 1 addition & 0 deletions exporter/kafkaexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ The following settings can be optionally configured:
- The following encodings are valid *only* for **logs**.
- `raw`: if the log record body is a byte array, it is sent as is. Otherwise, it is serialized to JSON. Resource and record attributes are discarded.
- `partition_traces_by_id` (default = false): configures the exporter to include the trace ID as the message key in trace messages sent to kafka. *Please note:* this setting does not have any effect on Jaeger encoding exporters since Jaeger exporters include trace ID as the message key by default.
- `partition_metrics_by_resource_attributes` (default = false): configures the exporter to include the hash of sorted resource attributes as the message partitioning key in metric messages sent to kafka.
dmitryax marked this conversation as resolved.
Show resolved Hide resolved
- `auth`
- `plain_text`
- `username`: The username to use.
Expand Down
2 changes: 2 additions & 0 deletions exporter/kafkaexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ type Config struct {
// trace ID as the message key by default.
PartitionTracesByID bool `mapstructure:"partition_traces_by_id"`

PartitionMetricsByResourceAttributes bool `mapstructure:"partition_metrics_by_resource_attributes"`

// Metadata is the namespace for metadata management properties used by the
// Client, and shared by the Producer/Consumer.
Metadata Metadata `mapstructure:"metadata"`
Expand Down
23 changes: 13 additions & 10 deletions exporter/kafkaexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,12 @@ func TestLoadConfig(t *testing.T) {
NumConsumers: 2,
QueueSize: 10,
},
Topic: "spans",
Encoding: "otlp_proto",
PartitionTracesByID: true,
Brokers: []string{"foo:123", "bar:456"},
ClientID: "test_client_id",
Topic: "spans",
Encoding: "otlp_proto",
PartitionTracesByID: true,
PartitionMetricsByResourceAttributes: true,
Brokers: []string{"foo:123", "bar:456"},
ClientID: "test_client_id",
Authentication: kafka.Authentication{
PlainText: &kafka.PlainTextConfig{
Username: "jdoe",
Expand Down Expand Up @@ -109,11 +110,12 @@ func TestLoadConfig(t *testing.T) {
NumConsumers: 2,
QueueSize: 10,
},
Topic: "spans",
Encoding: "otlp_proto",
PartitionTracesByID: true,
Brokers: []string{"foo:123", "bar:456"},
ClientID: "test_client_id",
Topic: "spans",
Encoding: "otlp_proto",
PartitionTracesByID: true,
PartitionMetricsByResourceAttributes: true,
Brokers: []string{"foo:123", "bar:456"},
ClientID: "test_client_id",
Authentication: kafka.Authentication{
PlainText: &kafka.PlainTextConfig{
Username: "jdoe",
Expand Down Expand Up @@ -165,6 +167,7 @@ func TestLoadConfig(t *testing.T) {
Topic: "spans",
Encoding: "otlp_proto",
PartitionTracesByID: true,
PartitionMetricsByResourceAttributes: true,
Brokers: []string{"foo:123", "bar:456"},
ClientID: "test_client_id",
ResolveCanonicalBootstrapServersOnly: true,
Expand Down
6 changes: 6 additions & 0 deletions exporter/kafkaexporter/kafka_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,12 @@ func newMetricsExporter(config Config, set exporter.CreateSettings, marshalers m
if marshaler == nil {
return nil, errUnrecognizedEncoding
}
if config.PartitionMetricsByResourceAttributes {
if keyableMarshaler, ok := marshaler.(KeyableMetricsMarshaler); ok {
keyableMarshaler.Key()
}
}

return &kafkaMetricsProducer{
cfg: config,
topic: config.Topic,
Expand Down
44 changes: 44 additions & 0 deletions exporter/kafkaexporter/marshaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package kafkaexporter
import (
"encoding/json"
"fmt"
"go.opentelemetry.io/collector/pdata/pmetric"
"testing"
"time"

Expand Down Expand Up @@ -71,6 +72,49 @@ func TestDefaultLogsMarshalers(t *testing.T) {
}
}

// TestOTLPMetricsJsonMarshaling verifies that the otlp_json metrics marshaler
// produces a single key-less message by default, and that after enabling keyed
// partitioning via Key() it splits the batch into one message per resource,
// keyed by the hash of that resource's attributes.
func TestOTLPMetricsJsonMarshaling(t *testing.T) {
	t.Parallel()

	now := time.Unix(1, 0)

	metric := pmetric.NewMetrics()
	r := pcommon.NewResource()
	r.Attributes().PutStr("service.name", "my_service_name")
	r.Attributes().PutStr("service.instance.id", "kek_x_1")
	r.CopyTo(metric.ResourceMetrics().AppendEmpty().Resource())

	rm := metric.ResourceMetrics().At(0)
	rm.SetSchemaUrl(conventions.SchemaURL)

	// One gauge data point under the first resource.
	sm := rm.ScopeMetrics().AppendEmpty()
	m := sm.Metrics().AppendEmpty()
	m.SetEmptyGauge()
	m.Gauge().DataPoints().AppendEmpty().SetStartTimestamp(pcommon.NewTimestampFromTime(now))
	m.Gauge().DataPoints().At(0).Attributes().PutStr("gauage_attribute", "attr")
	m.Gauge().DataPoints().At(0).SetDoubleValue(1.0)

	// Second resource: same service.name, different instance id, so the two
	// resources must hash to different partition keys.
	r1 := pcommon.NewResource()
	r1.Attributes().PutStr("service.name", "my_service_name")
	r1.Attributes().PutStr("service.instance.id", "kek_x_2")
	r1.CopyTo(metric.ResourceMetrics().AppendEmpty().Resource())

	standardMarshaler := metricsMarshalers()["otlp_json"]
	msgs, err := standardMarshaler.Marshal(metric, "KafkaTopicX")
	require.NoError(t, err, "Must have marshaled the data without error")
	require.Len(t, msgs, 1, "Expected number of messages in the message")
	require.Nil(t, msgs[0].Key, "Must not set a message key when partitioning is disabled")

	keyableMarshaler, ok := standardMarshaler.(KeyableMetricsMarshaler)
	require.True(t, ok, "Must be a KeyableMetricsMarshaler")
	keyableMarshaler.Key()

	msgs, err = keyableMarshaler.Marshal(metric, "KafkaTopicX")
	require.NoError(t, err, "Must have marshaled the data without error")
	require.Len(t, msgs, 2, "Expected number of messages in the message")

	require.Equal(t, sarama.ByteEncoder("90e74a8334a89993bd3f6ad05f9ca02438032a78d4399fb6fecf6c94fcdb13ef"), msgs[0].Key)
	require.Equal(t, sarama.ByteEncoder("55e1113a2eace57b91ef58911d811c28e936365f03ac068e8ce23090d9ea748f"), msgs[1].Key)
}

func TestOTLPTracesJsonMarshaling(t *testing.T) {
t.Parallel()

Expand Down
62 changes: 49 additions & 13 deletions exporter/kafkaexporter/pdata_marshaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ package kafkaexporter // import "github.com/open-telemetry/opentelemetry-collect

import (
"github.com/IBM/sarama"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/resourceutil"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/traceutil"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/traceutil"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal"
)

type pdataLogsMarshaler struct {
Expand Down Expand Up @@ -42,36 +42,72 @@ func newPdataLogsMarshaler(marshaler plog.Marshaler, encoding string) LogsMarsha
}
}

// KeyableMetricsMarshaler is an extension of the MetricsMarshaler interface intended to provide partition key capabilities
// for metrics messages
type KeyableMetricsMarshaler interface {
	MetricsMarshaler
	// Key switches the marshaler into keyed mode: subsequent Marshal calls
	// set a partitioning key on the produced Kafka messages.
	Key()
}

// pdataMetricsMarshaler marshals pmetric.Metrics into Kafka producer messages
// using a pluggable pmetric.Marshaler.
type pdataMetricsMarshaler struct {
	marshaler pmetric.Marshaler // underlying OTLP (proto/JSON) metrics marshaler
	encoding  string            // encoding name reported by Encoding()
	keyed     bool              // when true, Marshal keys messages per resource; see Key()
}

// Key configures the pdataMetricsMarshaler to set the message key on the kafka messages
// (one message per ResourceMetrics entry, keyed by the resource-attributes hash).
func (p *pdataMetricsMarshaler) Key() {
	p.keyed = true
}
Comment on lines +59 to 62
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This control is super confusing. It's unclear what Key is doing and why it's needed. I believe the flag can be simply sent to the newPdataMetricsMarshaler constructor. And there is no need to expose the KeyableMetricsMarshaler API here.

I understand that you are doing it consistently with what's already done for tracing. We can keep it as is, but should be refactored later for both metrics and traces. Feel free to submit an issue if you agree.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you don't mind, I would prefer to leave it as is for now and refactor it in the next PR. I am planning to raise it shortly for logs partitioning by resources.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's what I'm suggesting


func (p pdataMetricsMarshaler) Marshal(ld pmetric.Metrics, topic string) ([]*sarama.ProducerMessage, error) {
bts, err := p.marshaler.MarshalMetrics(ld)
if err != nil {
return nil, err
}
return []*sarama.ProducerMessage{
{
var msgs []*sarama.ProducerMessage
if p.keyed {
metrics := ld.ResourceMetrics()

for i := 0; i < metrics.Len(); i++ {
resourceMetrics := metrics.At(i)
hash := resourceutil.CalculateResourceAttributesHash(resourceMetrics.Resource())

newMetrics := pmetric.NewMetrics()
resourceMetrics.MoveTo(newMetrics.ResourceMetrics().AppendEmpty())
dmitryax marked this conversation as resolved.
Show resolved Hide resolved

bts, err := p.marshaler.MarshalMetrics(newMetrics)
if err != nil {
return nil, err
}
msgs = append(msgs, &sarama.ProducerMessage{
Topic: topic,
Value: sarama.ByteEncoder(bts),
Key: sarama.ByteEncoder(hash),
})
}
} else {
bts, err := p.marshaler.MarshalMetrics(ld)
if err != nil {
return nil, err
}
msgs = append(msgs, &sarama.ProducerMessage{
Topic: topic,
Value: sarama.ByteEncoder(bts),
},
}, nil
})
}

return msgs, nil
}

// Encoding returns the configured encoding name for this marshaler.
func (p pdataMetricsMarshaler) Encoding() string {
	return p.encoding
}

func newPdataMetricsMarshaler(marshaler pmetric.Marshaler, encoding string) MetricsMarshaler {
return pdataMetricsMarshaler{
return &pdataMetricsMarshaler{
marshaler: marshaler,
encoding: encoding,
}
}

// KeyableTracesMarshaler is an extension of the TracesMarshaler interface inteded to provide partition key capabilities
// KeyableTracesMarshaler is an extension of the TracesMarshaler interface intended to provide partition key capabilities
// for trace messages
type KeyableTracesMarshaler interface {
TracesMarshaler
Expand Down
1 change: 1 addition & 0 deletions exporter/kafkaexporter/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ kafka:
required_acks: -1 # WaitForAll
timeout: 10s
partition_traces_by_id: true
partition_metrics_by_resource_attributes: true
auth:
plain_text:
username: jdoe
Expand Down
32 changes: 32 additions & 0 deletions internal/coreinternal/resourceutil/resourceutil.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package resourceutil

import (
"crypto/sha256"
"encoding/hex"
"go.opentelemetry.io/collector/pdata/pcommon"
"sort"
)

// keyValueLabelPair holds one resource attribute as a string key/value pair,
// used as an intermediate, sortable representation while hashing.
type keyValueLabelPair struct {
	Key   string
	Value string
}

func CalculateResourceAttributesHash(resourceMetrics pcommon.Resource) string {
SHaaD94 marked this conversation as resolved.
Show resolved Hide resolved
var pairs []keyValueLabelPair
resourceMetrics.Attributes().Range(func(k string, v pcommon.Value) bool {
pairs = append(pairs, keyValueLabelPair{Key: k, Value: v.AsString()})
return true
})

sort.SliceStable(pairs, func(i, j int) bool {
return pairs[i].Key < pairs[j].Key
})

h := sha256.New()
for _, pair := range pairs {
h.Write([]byte(pair.Key))
h.Write([]byte(pair.Value))
}
return hex.EncodeToString(h.Sum(nil))
}
47 changes: 47 additions & 0 deletions internal/coreinternal/resourceutil/resourceutil_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package resourceutil

import (
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/pdata/pcommon"
"testing"
)

// TestHashEmptyResource checks that a resource without attributes hashes to
// the well-known SHA-256 digest of empty input.
func TestHashEmptyResource(t *testing.T) {
	const emptySHA256 = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
	res := pcommon.NewResource()
	assert.EqualValues(t, emptySHA256, CalculateResourceAttributesHash(res))
}

// TestHashSimpleResource pins the hash of a small, fixed attribute set so that
// accidental changes to the hashing scheme are caught.
func TestHashSimpleResource(t *testing.T) {
	const want = "3590bbad8f8a328dbbd5d01c35d8a5fab92c3588cf7e468e995c31d45a51cbef"

	res := pcommon.NewResource()
	attrs := res.Attributes()
	attrs.PutStr("k1", "v1")
	attrs.PutStr("k2", "v2")

	assert.EqualValues(t, want, CalculateResourceAttributesHash(res))
}

// TestHashReorderedAttributes verifies the hash is insertion-order
// insensitive: the same attributes added in different order must match.
func TestHashReorderedAttributes(t *testing.T) {
	first := pcommon.NewResource()
	first.Attributes().PutStr("k1", "v1")
	first.Attributes().PutStr("k2", "v2")

	second := pcommon.NewResource()
	second.Attributes().PutStr("k2", "v2")
	second.Attributes().PutStr("k1", "v1")

	assert.EqualValues(t, CalculateResourceAttributesHash(first), CalculateResourceAttributesHash(second))
}

// TestHashDifferentAttributeValues pins the hash over one attribute of each
// pcommon value type (bool, double, empty, bytes, map, slice, int, string),
// exercising the AsString conversion for every type.
func TestHashDifferentAttributeValues(t *testing.T) {
	const want = "46852adab1751045942d67dace7c88665ec0e68b7f4b81a33bb05e5b954a8e57"

	res := pcommon.NewResource()
	attrs := res.Attributes()
	attrs.PutBool("k1", false)
	attrs.PutDouble("k2", 1.0)
	attrs.PutEmpty("k3")
	attrs.PutEmptyBytes("k4")
	attrs.PutEmptyMap("k5")
	attrs.PutEmptySlice("k6")
	attrs.PutInt("k7", 1)
	attrs.PutStr("k8", "v8")

	assert.EqualValues(t, want, CalculateResourceAttributesHash(res))
}
Loading