Skip to content

Commit

Permalink
[fanoutconsumer] [chore] Do not wrap one read-only consumer (#8689)
Browse files Browse the repository at this point in the history
A follow-up optimization after merging
#8634.

There is no need to create a fanout consumer for only one read-only
consumer. This introduces a behavior that closely resembles its previous
state, before the introduction of the readonly/mutable states.
  • Loading branch information
dmitryax committed Oct 17, 2023
1 parent 8a385c2 commit ec07258
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 0 deletions.
5 changes: 5 additions & 0 deletions internal/fanoutconsumer/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ import (
// - Clones only to the consumer that needs to mutate the data.
// - If all consumers needs to mutate the data one will get the original mutable data.
func NewLogs(lcs []consumer.Logs) consumer.Logs {
// Don't wrap if there is only one non-mutating consumer.
if len(lcs) == 1 && !lcs[0].Capabilities().MutatesData {
return lcs[0]
}

lc := &logsConsumer{}
for i := 0; i < len(lcs); i++ {
if lcs[i].Capabilities().MutatesData {
Expand Down
6 changes: 6 additions & 0 deletions internal/fanoutconsumer/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ import (
"go.opentelemetry.io/collector/pdata/plog"
)

func TestLogsNotMultiplexing(t *testing.T) {
nop := consumertest.NewNop()
lfc := NewLogs([]consumer.Logs{nop})
assert.Same(t, nop, lfc)
}

func TestLogsMultiplexingNonMutating(t *testing.T) {
p1 := new(consumertest.LogsSink)
p2 := new(consumertest.LogsSink)
Expand Down
5 changes: 5 additions & 0 deletions internal/fanoutconsumer/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ import (
// - Clones only to the consumer that needs to mutate the data.
// - If all consumers needs to mutate the data one will get the original mutable data.
func NewMetrics(mcs []consumer.Metrics) consumer.Metrics {
// Don't wrap if there is only one non-mutating consumer.
if len(mcs) == 1 && !mcs[0].Capabilities().MutatesData {
return mcs[0]
}

mc := &metricsConsumer{}
for i := 0; i < len(mcs); i++ {
if mcs[i].Capabilities().MutatesData {
Expand Down
6 changes: 6 additions & 0 deletions internal/fanoutconsumer/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ import (
"go.opentelemetry.io/collector/pdata/pmetric"
)

func TestMetricsNotMultiplexing(t *testing.T) {
nop := consumertest.NewNop()
mfc := NewMetrics([]consumer.Metrics{nop})
assert.Same(t, nop, mfc)
}

func TestMetricsMultiplexingNonMutating(t *testing.T) {
p1 := new(consumertest.MetricsSink)
p2 := new(consumertest.MetricsSink)
Expand Down
5 changes: 5 additions & 0 deletions internal/fanoutconsumer/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ import (
// - Clones only to the consumer that needs to mutate the data.
// - If all consumers needs to mutate the data one will get the original mutable data.
func NewTraces(tcs []consumer.Traces) consumer.Traces {
// Don't wrap if there is only one non-mutating consumer.
if len(tcs) == 1 && !tcs[0].Capabilities().MutatesData {
return tcs[0]
}

tc := &tracesConsumer{}
for i := 0; i < len(tcs); i++ {
if tcs[i].Capabilities().MutatesData {
Expand Down
6 changes: 6 additions & 0 deletions internal/fanoutconsumer/traces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ import (
"go.opentelemetry.io/collector/pdata/ptrace"
)

func TestTracesNotMultiplexing(t *testing.T) {
nop := consumertest.NewNop()
tfc := NewTraces([]consumer.Traces{nop})
assert.Same(t, nop, tfc)
}

func TestTracesMultiplexingNonMutating(t *testing.T) {
p1 := new(consumertest.TracesSink)
p2 := new(consumertest.TracesSink)
Expand Down

0 comments on commit ec07258

Please sign in to comment.