[pdata] Enable the pdata mutation safeguards in the fanout consumers (#8634)

This change enables the runtime assertions that catch unintentional pdata
mutations in components that claim not to mutate pdata. Without these
assertions, runtime errors may still occur, but they are thrown by unrelated
components, which makes them very difficult to troubleshoot.

This required introducing an additional API to expose the pdata mutability state:
- p[metric|trace|log].[Metrics|Traces|Logs].IsReadOnly()

Resolves:
#6794
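
For orientation, a minimal sketch of the new accessor in use (only the pdata calls are real API; the surrounding program is illustrative):

```go
package main

import (
	"fmt"

	"go.opentelemetry.io/collector/pdata/plog"
)

func main() {
	ld := plog.NewLogs()
	ld.ResourceLogs().AppendEmpty() // freshly created data is mutable
	fmt.Println(ld.IsReadOnly())    // false

	// The fanout consumer marks data read-only before sharing it between
	// multiple non-mutating consumers.
	ld.MarkReadOnly()
	fmt.Println(ld.IsReadOnly()) // true

	// A component that needs to modify read-only data must work on its own copy.
	if ld.IsReadOnly() {
		cloned := plog.NewLogs()
		ld.CopyTo(cloned)
		_ = cloned // safe to mutate
	}
}
```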
dmitryax committed Oct 16, 2023
1 parent 35512c4 commit 8a385c2
Showing 15 changed files with 351 additions and 114 deletions.
18 changes: 18 additions & 0 deletions .chloggen/add-is-read-only.yaml
@@ -0,0 +1,18 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: pdata

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add IsReadOnly() method to p[metric|log|trace].[Metrics|Logs|Traces] pdata structs, allowing callers to check whether the struct is read-only.

# One or more tracking issues or pull requests related to the change
issues: [6794]

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
26 changes: 26 additions & 0 deletions .chloggen/enable-mutation-assertions.yaml
@@ -0,0 +1,26 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: fanoutconsumer

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Enable runtime assertions to catch incorrect pdata mutations in components that claim not to mutate pdata.

# One or more tracking issues or pull requests related to the change
issues: [6794]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
This change enables the runtime assertions that catch unintentional pdata mutations in components that claim
not to mutate pdata. Without these assertions, runtime errors may still occur, but they are thrown by unrelated
components, which makes them very difficult to troubleshoot.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
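
To make the effect concrete, here is a hedged sketch (not part of the diff) of what a component that declares `MutatesData: false` but still writes to the data now runs into: the write panics at the point of mutation instead of surfacing later in an unrelated component.

```go
package main

import "go.opentelemetry.io/collector/pdata/plog"

func main() {
	ld := plog.NewLogs()
	record := ld.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()
	record.Body().SetStr("original")

	// The fanout consumer marks the data read-only before handing it to
	// multiple non-mutating consumers.
	ld.MarkReadOnly()

	// With the assertions enabled, this write panics at runtime because the
	// pdata was marked read-only, pointing directly at the offending component.
	record.Body().SetStr("mutated")
}
```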
68 changes: 37 additions & 31 deletions internal/fanoutconsumer/logs.go
@@ -20,36 +20,22 @@ import (
// NewLogs wraps multiple log consumers in a single one.
// It fans out the incoming data to all the consumers, and does smart routing:
// - Clones only to the consumer that needs to mutate the data.
// - If all consumers need to mutate the data, one will get the original data.
// - If all consumers need to mutate the data, one will get the original mutable data.
func NewLogs(lcs []consumer.Logs) consumer.Logs {
if len(lcs) == 1 {
// Don't wrap if no need to do it.
return lcs[0]
}
var pass []consumer.Logs
var clone []consumer.Logs
for i := 0; i < len(lcs)-1; i++ {
if !lcs[i].Capabilities().MutatesData {
pass = append(pass, lcs[i])
lc := &logsConsumer{}
for i := 0; i < len(lcs); i++ {
if lcs[i].Capabilities().MutatesData {
lc.mutable = append(lc.mutable, lcs[i])
} else {
clone = append(clone, lcs[i])
lc.readonly = append(lc.readonly, lcs[i])
}
}
// Give the original data to the last consumer if there is no other read-only consumer,
// otherwise put it in the right bucket. Never share the same data between
// a mutating and a non-mutating consumer, since the non-mutating consumer may process
// the data asynchronously and the mutating consumer may change it first.
if len(pass) == 0 || !lcs[len(lcs)-1].Capabilities().MutatesData {
pass = append(pass, lcs[len(lcs)-1])
} else {
clone = append(clone, lcs[len(lcs)-1])
}
return &logsConsumer{pass: pass, clone: clone}
return lc
}

type logsConsumer struct {
pass []consumer.Logs
clone []consumer.Logs
mutable []consumer.Logs
readonly []consumer.Logs
}

func (lsc *logsConsumer) Capabilities() consumer.Capabilities {
@@ -59,20 +45,40 @@ func (lsc *logsConsumer) Capabilities() consumer.Capabilities {
// ConsumeLogs exports the plog.Logs to all consumers wrapped by the current one.
func (lsc *logsConsumer) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
var errs error
// Initially pass the data to the clone consumers, to avoid the case where the optimization of sending
// the incoming data to a mutating consumer is used and that consumer changes the incoming data before
// it is cloned.
for _, lc := range lsc.clone {
clonedLogs := plog.NewLogs()
ld.CopyTo(clonedLogs)
errs = multierr.Append(errs, lc.ConsumeLogs(ctx, clonedLogs))

if len(lsc.mutable) > 0 {
// Clone the data before sending to all mutating consumers except the last one.
for i := 0; i < len(lsc.mutable)-1; i++ {
errs = multierr.Append(errs, lsc.mutable[i].ConsumeLogs(ctx, cloneLogs(ld)))
}
// Send the data as is to the last mutating consumer only if there are no non-mutating consumers and the
// data is mutable. Never share the same data between a mutating and a non-mutating consumer, since the
// non-mutating consumer may process the data asynchronously and the mutating consumer may change it first.
lastConsumer := lsc.mutable[len(lsc.mutable)-1]
if len(lsc.readonly) == 0 && !ld.IsReadOnly() {
errs = multierr.Append(errs, lastConsumer.ConsumeLogs(ctx, ld))
} else {
errs = multierr.Append(errs, lastConsumer.ConsumeLogs(ctx, cloneLogs(ld)))
}
}
for _, lc := range lsc.pass {

// Mark the data as read-only if it will be sent to more than one read-only consumer.
if len(lsc.readonly) > 1 && !ld.IsReadOnly() {
ld.MarkReadOnly()
}
for _, lc := range lsc.readonly {
errs = multierr.Append(errs, lc.ConsumeLogs(ctx, ld))
}

return errs
}

func cloneLogs(ld plog.Logs) plog.Logs {
clonedLogs := plog.NewLogs()
ld.CopyTo(clonedLogs)
return clonedLogs
}

var _ connector.LogsRouter = (*logsRouter)(nil)

type logsRouter struct {
55 changes: 49 additions & 6 deletions internal/fanoutconsumer/logs_test.go
@@ -20,12 +20,6 @@ import (
"go.opentelemetry.io/collector/pdata/plog"
)

func TestLogsNotMultiplexing(t *testing.T) {
nop := consumertest.NewNop()
lfc := NewLogs([]consumer.Logs{nop})
assert.Same(t, nop, lfc)
}

func TestLogsMultiplexingNonMutating(t *testing.T) {
p1 := new(consumertest.LogsSink)
p2 := new(consumertest.LogsSink)
@@ -57,6 +51,9 @@ func TestLogsMultiplexingNonMutating(t *testing.T) {
assert.True(t, ld == p3.AllLogs()[1])
assert.EqualValues(t, ld, p3.AllLogs()[0])
assert.EqualValues(t, ld, p3.AllLogs()[1])

// The data should be marked as read only.
assert.True(t, ld.IsReadOnly())
}

func TestLogsMultiplexingMutating(t *testing.T) {
@@ -91,6 +88,46 @@ func TestLogsMultiplexingMutating(t *testing.T) {
assert.True(t, ld == p3.AllLogs()[1])
assert.EqualValues(t, ld, p3.AllLogs()[0])
assert.EqualValues(t, ld, p3.AllLogs()[1])

// The data should not be marked as read only.
assert.False(t, ld.IsReadOnly())
}

func TestReadOnlyLogsMultiplexingMutating(t *testing.T) {
p1 := &mutatingLogsSink{LogsSink: new(consumertest.LogsSink)}
p2 := &mutatingLogsSink{LogsSink: new(consumertest.LogsSink)}
p3 := &mutatingLogsSink{LogsSink: new(consumertest.LogsSink)}

lfc := NewLogs([]consumer.Logs{p1, p2, p3})
assert.False(t, lfc.Capabilities().MutatesData)
ldOrig := testdata.GenerateLogs(1)
ld := testdata.GenerateLogs(1)
ld.MarkReadOnly()

for i := 0; i < 2; i++ {
err := lfc.ConsumeLogs(context.Background(), ld)
if err != nil {
t.Errorf("Wanted nil got error")
return
}
}

// All consumers should receive the cloned data.

assert.True(t, ld != p1.AllLogs()[0])
assert.True(t, ld != p1.AllLogs()[1])
assert.EqualValues(t, ldOrig, p1.AllLogs()[0])
assert.EqualValues(t, ldOrig, p1.AllLogs()[1])

assert.True(t, ld != p2.AllLogs()[0])
assert.True(t, ld != p2.AllLogs()[1])
assert.EqualValues(t, ldOrig, p2.AllLogs()[0])
assert.EqualValues(t, ldOrig, p2.AllLogs()[1])

assert.True(t, ld != p3.AllLogs()[0])
assert.True(t, ld != p3.AllLogs()[1])
assert.EqualValues(t, ldOrig, p3.AllLogs()[0])
assert.EqualValues(t, ldOrig, p3.AllLogs()[1])
}

func TestLogsMultiplexingMixLastMutating(t *testing.T) {
@@ -126,6 +163,9 @@ func TestLogsMultiplexingMixLastMutating(t *testing.T) {
assert.True(t, ld != p3.AllLogs()[1])
assert.EqualValues(t, ld, p3.AllLogs()[0])
assert.EqualValues(t, ld, p3.AllLogs()[1])

// The data should not be marked as read only.
assert.False(t, ld.IsReadOnly())
}

func TestLogsMultiplexingMixLastNonMutating(t *testing.T) {
@@ -160,6 +200,9 @@ func TestLogsMultiplexingMixLastNonMutating(t *testing.T) {
assert.True(t, ld == p3.AllLogs()[1])
assert.EqualValues(t, ld, p3.AllLogs()[0])
assert.EqualValues(t, ld, p3.AllLogs()[1])

// The data should not be marked as read only.
assert.False(t, ld.IsReadOnly())
}

func TestLogsWhenErrors(t *testing.T) {
68 changes: 37 additions & 31 deletions internal/fanoutconsumer/metrics.go
@@ -18,36 +18,22 @@ import (
// NewMetrics wraps multiple metrics consumers in a single one.
// It fans out the incoming data to all the consumers, and does smart routing:
// - Clones only to the consumer that needs to mutate the data.
// - If all consumers need to mutate the data, one will get the original data.
// - If all consumers need to mutate the data, one will get the original mutable data.
func NewMetrics(mcs []consumer.Metrics) consumer.Metrics {
if len(mcs) == 1 {
// Don't wrap if no need to do it.
return mcs[0]
}
var pass []consumer.Metrics
var clone []consumer.Metrics
for i := 0; i < len(mcs)-1; i++ {
if !mcs[i].Capabilities().MutatesData {
pass = append(pass, mcs[i])
mc := &metricsConsumer{}
for i := 0; i < len(mcs); i++ {
if mcs[i].Capabilities().MutatesData {
mc.mutable = append(mc.mutable, mcs[i])
} else {
clone = append(clone, mcs[i])
mc.readonly = append(mc.readonly, mcs[i])
}
}
// Give the original data to the last consumer if there is no other read-only consumer,
// otherwise put it in the right bucket. Never share the same data between
// a mutating and a non-mutating consumer, since the non-mutating consumer may process
// the data asynchronously and the mutating consumer may change it first.
if len(pass) == 0 || !mcs[len(mcs)-1].Capabilities().MutatesData {
pass = append(pass, mcs[len(mcs)-1])
} else {
clone = append(clone, mcs[len(mcs)-1])
}
return &metricsConsumer{pass: pass, clone: clone}
return mc
}

type metricsConsumer struct {
pass []consumer.Metrics
clone []consumer.Metrics
mutable []consumer.Metrics
readonly []consumer.Metrics
}

func (msc *metricsConsumer) Capabilities() consumer.Capabilities {
@@ -57,20 +43,40 @@ func (msc *metricsConsumer) Capabilities() consumer.Capabilities {
// ConsumeMetrics exports the pmetric.Metrics to all consumers wrapped by the current one.
func (msc *metricsConsumer) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
var errs error
// Initially pass the data to the clone consumers, to avoid the case where the optimization of sending
// the incoming data to a mutating consumer is used and that consumer changes the incoming data before
// it is cloned.
for _, mc := range msc.clone {
clonedMetrics := pmetric.NewMetrics()
md.CopyTo(clonedMetrics)
errs = multierr.Append(errs, mc.ConsumeMetrics(ctx, clonedMetrics))

if len(msc.mutable) > 0 {
// Clone the data before sending to all mutating consumers except the last one.
for i := 0; i < len(msc.mutable)-1; i++ {
errs = multierr.Append(errs, msc.mutable[i].ConsumeMetrics(ctx, cloneMetrics(md)))
}
// Send the data as is to the last mutating consumer only if there are no non-mutating consumers and the
// data is mutable. Never share the same data between a mutating and a non-mutating consumer, since the
// non-mutating consumer may process the data asynchronously and the mutating consumer may change it first.
lastConsumer := msc.mutable[len(msc.mutable)-1]
if len(msc.readonly) == 0 && !md.IsReadOnly() {
errs = multierr.Append(errs, lastConsumer.ConsumeMetrics(ctx, md))
} else {
errs = multierr.Append(errs, lastConsumer.ConsumeMetrics(ctx, cloneMetrics(md)))
}
}
for _, mc := range msc.pass {

// Mark the data as read-only if it will be sent to more than one read-only consumer.
if len(msc.readonly) > 1 && !md.IsReadOnly() {
md.MarkReadOnly()
}
for _, mc := range msc.readonly {
errs = multierr.Append(errs, mc.ConsumeMetrics(ctx, md))
}

return errs
}

func cloneMetrics(md pmetric.Metrics) pmetric.Metrics {
clonedMetrics := pmetric.NewMetrics()
md.CopyTo(clonedMetrics)
return clonedMetrics
}

var _ connector.MetricsRouter = (*metricsRouter)(nil)

type metricsRouter struct {