Skip to content

Commit

Permalink
Allow connectors to request consumers to specific routes (#7179)
Browse files Browse the repository at this point in the history
* Allow connectors to route data to specific consumers

* Allow connectors to request consumers for specific routes
  • Loading branch information
djaglowski committed Mar 4, 2023
1 parent 7318c14 commit 0674cb5
Show file tree
Hide file tree
Showing 13 changed files with 1,102 additions and 27 deletions.
16 changes: 16 additions & 0 deletions .chloggen/connectors-routing-v2.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: connectors

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Provide connectors with a mechanism to route data to specific pipelines

# One or more tracking issues or pull requests related to the change
issues: [7152]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
21 changes: 20 additions & 1 deletion connector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ type Traces interface {
consumer.Traces
}

// TracesRouter feeds the first consumer.Traces in each of the specified pipelines.
// The router will create a fanout consumer for the set of pipelines and return a uuid
type TracesRouter interface {
Consumer(...component.ID) (consumer.Traces, error)
PipelineIDs() []component.ID
}

// A Metrics connector acts as an exporter from a metrics pipeline and a receiver
// to one or more traces, metrics, or logs pipelines.
// Metrics feeds a consumer.Traces, consumer.Metrics, or consumer.Logs with data.
Expand All @@ -57,9 +64,15 @@ type Metrics interface {
consumer.Metrics
}

// MetricsRouter feeds the first consumer.Metrics in each of the specified pipelines.
type MetricsRouter interface {
Consumer(...component.ID) (consumer.Metrics, error)
PipelineIDs() []component.ID
}

// A Logs connector acts as an exporter from a logs pipeline and a receiver
// to one or more traces, metrics, or logs pipelines.
// Logs feeds a consumer.Traces, consumer.Metrics, or consumer.Logs with data.
// Logs feeds a consumer.Logs, consumer.Metrics, or consumer.Logs with data.
//
// Examples:
// - Structured logs containing span information could be consumed and emitted as traces.
Expand All @@ -72,6 +85,12 @@ type Logs interface {
consumer.Logs
}

// LogsRouter feeds the first consumer.Logs in each of the specified pipelines.
type LogsRouter interface {
Consumer(...component.ID) (consumer.Logs, error)
PipelineIDs() []component.ID
}

// CreateSettings configures Connector creators.
type CreateSettings struct {
// ID returns the ID of the component that will be created.
Expand Down
14 changes: 10 additions & 4 deletions service/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,18 +178,24 @@ func (g *pipelinesGraph) buildComponents(ctx context.Context, set pipelinesSetti
n.Component, err = buildConnector(ctx, n.componentID, set.Telemetry, set.BuildInfo, set.ConnectorBuilder,
n.exprPipelineType, n.rcvrPipelineType, g.nextConsumers(n.ID()))
case *capabilitiesNode:
cap := consumer.Capabilities{}
cap := consumer.Capabilities{MutatesData: false}
for _, proc := range g.pipelines[n.pipelineID].processors {
cap.MutatesData = cap.MutatesData || proc.getConsumer().Capabilities().MutatesData
}
next := g.nextConsumers(n.ID())[0]
switch n.pipelineID.Type() {
case component.DataTypeTraces:
n.baseConsumer = capabilityconsumer.NewTraces(next.(consumer.Traces), cap)
cc := capabilityconsumer.NewTraces(next.(consumer.Traces), cap)
n.baseConsumer = cc
n.ConsumeTracesFunc = cc.ConsumeTraces
case component.DataTypeMetrics:
n.baseConsumer = capabilityconsumer.NewMetrics(next.(consumer.Metrics), cap)
cc := capabilityconsumer.NewMetrics(next.(consumer.Metrics), cap)
n.baseConsumer = cc
n.ConsumeMetricsFunc = cc.ConsumeMetrics
case component.DataTypeLogs:
n.baseConsumer = capabilityconsumer.NewLogs(next.(consumer.Logs), cap)
cc := capabilityconsumer.NewLogs(next.(consumer.Logs), cap)
n.baseConsumer = cc
n.ConsumeLogsFunc = cc.ConsumeLogs
}
case *fanOutNode:
nexts := g.nextConsumers(n.ID())
Expand Down
182 changes: 182 additions & 0 deletions service/graph_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -810,6 +810,188 @@ func TestConnectorPipelinesGraph(t *testing.T) {
}
}

func TestConnectorRouter(t *testing.T) {
rcvrID := component.NewID("examplereceiver")
routeTracesID := component.NewIDWithName("examplerouter", "traces")
routeMetricsID := component.NewIDWithName("examplerouter", "metrics")
routeLogsID := component.NewIDWithName("examplerouter", "logs")
expRightID := component.NewIDWithName("exampleexporter", "right")
expLeftID := component.NewIDWithName("exampleexporter", "left")

tracesInID := component.NewIDWithName("traces", "in")
tracesRightID := component.NewIDWithName("traces", "right")
tracesLeftID := component.NewIDWithName("traces", "left")

metricsInID := component.NewIDWithName("metrics", "in")
metricsRightID := component.NewIDWithName("metrics", "right")
metricsLeftID := component.NewIDWithName("metrics", "left")

logsInID := component.NewIDWithName("logs", "in")
logsRightID := component.NewIDWithName("logs", "right")
logsLeftID := component.NewIDWithName("logs", "left")

ctx := context.Background()
set := pipelinesSettings{
Telemetry: componenttest.NewNopTelemetrySettings(),
BuildInfo: component.NewDefaultBuildInfo(),
ReceiverBuilder: receiver.NewBuilder(
map[component.ID]component.Config{
rcvrID: testcomponents.ExampleReceiverFactory.CreateDefaultConfig(),
},
map[component.Type]receiver.Factory{
testcomponents.ExampleReceiverFactory.Type(): testcomponents.ExampleReceiverFactory,
},
),
ExporterBuilder: exporter.NewBuilder(
map[component.ID]component.Config{
expRightID: testcomponents.ExampleExporterFactory.CreateDefaultConfig(),
expLeftID: testcomponents.ExampleExporterFactory.CreateDefaultConfig(),
},
map[component.Type]exporter.Factory{
testcomponents.ExampleExporterFactory.Type(): testcomponents.ExampleExporterFactory,
},
),
ConnectorBuilder: connector.NewBuilder(
map[component.ID]component.Config{
routeTracesID: testcomponents.ExampleRouterConfig{
Traces: &testcomponents.LeftRightConfig{
Right: tracesRightID,
Left: tracesLeftID,
},
},
routeMetricsID: testcomponents.ExampleRouterConfig{
Metrics: &testcomponents.LeftRightConfig{
Right: metricsRightID,
Left: metricsLeftID,
},
},
routeLogsID: testcomponents.ExampleRouterConfig{
Logs: &testcomponents.LeftRightConfig{
Right: logsRightID,
Left: logsLeftID,
},
},
},
map[component.Type]connector.Factory{
testcomponents.ExampleRouterFactory.Type(): testcomponents.ExampleRouterFactory,
},
),
PipelineConfigs: map[component.ID]*PipelineConfig{
tracesInID: {
Receivers: []component.ID{rcvrID},
Exporters: []component.ID{routeTracesID},
},
tracesRightID: {
Receivers: []component.ID{routeTracesID},
Exporters: []component.ID{expRightID},
},
tracesLeftID: {
Receivers: []component.ID{routeTracesID},
Exporters: []component.ID{expLeftID},
},
metricsInID: {
Receivers: []component.ID{rcvrID},
Exporters: []component.ID{routeMetricsID},
},
metricsRightID: {
Receivers: []component.ID{routeMetricsID},
Exporters: []component.ID{expRightID},
},
metricsLeftID: {
Receivers: []component.ID{routeMetricsID},
Exporters: []component.ID{expLeftID},
},
logsInID: {
Receivers: []component.ID{rcvrID},
Exporters: []component.ID{routeLogsID},
},
logsRightID: {
Receivers: []component.ID{routeLogsID},
Exporters: []component.ID{expRightID},
},
logsLeftID: {
Receivers: []component.ID{routeLogsID},
Exporters: []component.ID{expLeftID},
},
},
}

pg, err := buildPipelinesGraph(ctx, set)
require.NoError(t, err)

allReceivers := pg.getReceivers()
allExporters := pg.GetExporters()

assert.Equal(t, len(set.PipelineConfigs), len(pg.pipelines))

// Get a handle for the traces receiver and both exporters
tracesReceiver := allReceivers[component.DataTypeTraces][rcvrID].(*testcomponents.ExampleReceiver)
tracesRight := allExporters[component.DataTypeTraces][expRightID].(*testcomponents.ExampleExporter)
tracesLeft := allExporters[component.DataTypeTraces][expLeftID].(*testcomponents.ExampleExporter)

// Consume 1, validate it went right
assert.NoError(t, tracesReceiver.ConsumeTraces(ctx, testdata.GenerateTraces(1)))
assert.Equal(t, 1, len(tracesRight.Traces))
assert.Equal(t, 0, len(tracesLeft.Traces))

// Consume 1, validate it went left
assert.NoError(t, tracesReceiver.ConsumeTraces(ctx, testdata.GenerateTraces(1)))
assert.Equal(t, 1, len(tracesRight.Traces))
assert.Equal(t, 1, len(tracesLeft.Traces))

// Consume 3, validate 2 went right, 1 went left
assert.NoError(t, tracesReceiver.ConsumeTraces(ctx, testdata.GenerateTraces(1)))
assert.NoError(t, tracesReceiver.ConsumeTraces(ctx, testdata.GenerateTraces(1)))
assert.NoError(t, tracesReceiver.ConsumeTraces(ctx, testdata.GenerateTraces(1)))
assert.Equal(t, 3, len(tracesRight.Traces))
assert.Equal(t, 2, len(tracesLeft.Traces))

// Get a handle for the metrics receiver and both exporters
metricsReceiver := allReceivers[component.DataTypeMetrics][rcvrID].(*testcomponents.ExampleReceiver)
metricsRight := allExporters[component.DataTypeMetrics][expRightID].(*testcomponents.ExampleExporter)
metricsLeft := allExporters[component.DataTypeMetrics][expLeftID].(*testcomponents.ExampleExporter)

// Consume 1, validate it went right
assert.NoError(t, metricsReceiver.ConsumeMetrics(ctx, testdata.GenerateMetrics(1)))
assert.Equal(t, 1, len(metricsRight.Metrics))
assert.Equal(t, 0, len(metricsLeft.Metrics))

// Consume 1, validate it went left
assert.NoError(t, metricsReceiver.ConsumeMetrics(ctx, testdata.GenerateMetrics(1)))
assert.Equal(t, 1, len(metricsRight.Metrics))
assert.Equal(t, 1, len(metricsLeft.Metrics))

// Consume 3, validate 2 went right, 1 went left
assert.NoError(t, metricsReceiver.ConsumeMetrics(ctx, testdata.GenerateMetrics(1)))
assert.NoError(t, metricsReceiver.ConsumeMetrics(ctx, testdata.GenerateMetrics(1)))
assert.NoError(t, metricsReceiver.ConsumeMetrics(ctx, testdata.GenerateMetrics(1)))
assert.Equal(t, 3, len(metricsRight.Metrics))
assert.Equal(t, 2, len(metricsLeft.Metrics))

// Get a handle for the logs receiver and both exporters
logsReceiver := allReceivers[component.DataTypeLogs][rcvrID].(*testcomponents.ExampleReceiver)
logsRight := allExporters[component.DataTypeLogs][expRightID].(*testcomponents.ExampleExporter)
logsLeft := allExporters[component.DataTypeLogs][expLeftID].(*testcomponents.ExampleExporter)

// Consume 1, validate it went right
assert.NoError(t, logsReceiver.ConsumeLogs(ctx, testdata.GenerateLogs(1)))
assert.Equal(t, 1, len(logsRight.Logs))
assert.Equal(t, 0, len(logsLeft.Logs))

// Consume 1, validate it went left
assert.NoError(t, logsReceiver.ConsumeLogs(ctx, testdata.GenerateLogs(1)))
assert.Equal(t, 1, len(logsRight.Logs))
assert.Equal(t, 1, len(logsLeft.Logs))

// Consume 3, validate 2 went right, 1 went left
assert.NoError(t, logsReceiver.ConsumeLogs(ctx, testdata.GenerateLogs(1)))
assert.NoError(t, logsReceiver.ConsumeLogs(ctx, testdata.GenerateLogs(1)))
assert.NoError(t, logsReceiver.ConsumeLogs(ctx, testdata.GenerateLogs(1)))
assert.Equal(t, 3, len(logsRight.Logs))
assert.Equal(t, 2, len(logsLeft.Logs))

}

func TestGraphBuildErrors(t *testing.T) {
nopReceiverFactory := receivertest.NewNopFactory()
nopProcessorFactory := processortest.NewNopFactory()
Expand Down
50 changes: 50 additions & 0 deletions service/internal/fanoutconsumer/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@ package fanoutconsumer // import "go.opentelemetry.io/collector/service/internal

import (
"context"
"fmt"

"go.uber.org/multierr"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/connector"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/plog"
)
Expand Down Expand Up @@ -80,3 +83,50 @@ func (lsc *logsConsumer) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
}
return errs
}

var _ connector.LogsRouter = (*logsRouter)(nil)

type logsRouter struct {
consumer.Logs
consumers map[component.ID]consumer.Logs
}

func NewLogsRouter(cm map[component.ID]consumer.Logs) consumer.Logs {
consumers := make([]consumer.Logs, 0, len(cm))
for _, consumer := range cm {
consumers = append(consumers, consumer)
}
return &logsRouter{
Logs: NewLogs(consumers),
consumers: cm,
}
}

func (r *logsRouter) PipelineIDs() []component.ID {
ids := make([]component.ID, 0, len(r.consumers))
for id := range r.consumers {
ids = append(ids, id)
}
return ids
}

func (r *logsRouter) Consumer(pipelineIDs ...component.ID) (consumer.Logs, error) {
if len(pipelineIDs) == 0 {
return nil, fmt.Errorf("missing consumers")
}
consumers := make([]consumer.Logs, 0, len(pipelineIDs))
var errors error
for _, pipelineID := range pipelineIDs {
c, ok := r.consumers[pipelineID]
if ok {
consumers = append(consumers, c)
} else {
errors = multierr.Append(errors, fmt.Errorf("missing consumer: %q", pipelineID))
}
}
if errors != nil {
// TODO potentially this could return a NewLogs with the valid consumers
return nil, errors
}
return NewLogs(consumers), nil
}

0 comments on commit 0674cb5

Please sign in to comment.