-
Notifications
You must be signed in to change notification settings - Fork 2.2k
/
router.go
155 lines (127 loc) · 4.07 KB
/
router.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package routingconnector // import "github.com/open-telemetry/opentelemetry-collector-contrib/connector/routingconnector"
import (
"errors"
"fmt"
"go.opentelemetry.io/collector/component"
"go.uber.org/zap"
"github.com/open-telemetry/opentelemetry-collector-contrib/connector/routingconnector/internal/common"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlresource"
)
var errPipelineNotFound = errors.New("pipeline not found")
// consumerProvider is a function with a type parameter C (expected to be one
// of consumer.Traces, consumer.Metrics, or Consumer.Logs). returns a
// consumer for the given component ID(s).
type consumerProvider[C any] func(...component.ID) (C, error)
// router registers consumers and default consumers for a pipeline. the type
// parameter C is expected to be one of: consumer.Traces, consumer.Metrics, or
// consumer.Logs.
type router[C any] struct {
logger *zap.Logger
parser ottl.Parser[ottlresource.TransformContext]
table []RoutingTableItem
routes map[string]routingItem[C]
routeSlice []routingItem[C]
defaultConsumer C
consumerProvider consumerProvider[C]
}
// newRouter creates a new router instance with based on type parameters C and K.
// see router struct definition for the allowed types.
func newRouter[C any](
table []RoutingTableItem,
defaultPipelineIDs []component.ID,
provider consumerProvider[C],
settings component.TelemetrySettings,
) (*router[C], error) {
parser, err := ottlresource.NewParser(
common.Functions[ottlresource.TransformContext](),
settings,
)
if err != nil {
return nil, err
}
r := &router[C]{
logger: settings.Logger,
parser: parser,
table: table,
routes: make(map[string]routingItem[C]),
consumerProvider: provider,
}
if err := r.registerConsumers(defaultPipelineIDs); err != nil {
return nil, err
}
return r, nil
}
type routingItem[C any] struct {
consumer C
statement *ottl.Statement[ottlresource.TransformContext]
}
func (r *router[C]) registerConsumers(defaultPipelineIDs []component.ID) error {
// register default pipelines
err := r.registerDefaultConsumer(defaultPipelineIDs)
if err != nil {
return err
}
// register pipelines for each route
err = r.registerRouteConsumers()
if err != nil {
return err
}
return nil
}
// registerDefaultConsumer registers a consumer for the default
// pipelines configured
func (r *router[C]) registerDefaultConsumer(pipelineIDs []component.ID) error {
if len(pipelineIDs) == 0 {
return nil
}
consumer, err := r.consumerProvider(pipelineIDs...)
if err != nil {
return fmt.Errorf("%w: %s", errPipelineNotFound, err.Error())
}
r.defaultConsumer = consumer
return nil
}
// registerRouteConsumers registers a consumer for the pipelines configured
// for each route
func (r *router[C]) registerRouteConsumers() error {
for _, item := range r.table {
statement, err := r.getStatementFrom(item)
if err != nil {
return err
}
route, ok := r.routes[key(item)]
if !ok {
route.statement = statement
}
consumer, err := r.consumerProvider(item.Pipelines...)
if err != nil {
return fmt.Errorf("%w: %s", errPipelineNotFound, err.Error())
}
route.consumer = consumer
if !ok {
r.routeSlice = append(r.routeSlice, route)
}
r.routes[key(item)] = route
}
return nil
}
// getStatementFrom builds a routing OTTL statement from the provided
// routing table entry configuration. If the routing table entry configuration
// does not contain a valid OTTL statement then nil is returned.
func (r *router[C]) getStatementFrom(item RoutingTableItem) (*ottl.Statement[ottlresource.TransformContext], error) {
var statement *ottl.Statement[ottlresource.TransformContext]
if item.Statement != "" {
var err error
statement, err = r.parser.ParseStatement(item.Statement)
if err != nil {
return statement, err
}
}
return statement, nil
}
func key(entry RoutingTableItem) string {
return entry.Statement
}