diff --git a/.chloggen/client-move-to-consumer.yaml b/.chloggen/client-move-to-consumer.yaml new file mode 100644 index 00000000000..dd780c19b1a --- /dev/null +++ b/.chloggen/client-move-to-consumer.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: deprecation + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: collector/client + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Deprecates collector/client. Use `consumerconnection` instead. + +# One or more tracking issues or pull requests related to the change +issues: [9996] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/client/client.go b/client/client.go index fa944cad909..b4352f5b187 100644 --- a/client/client.go +++ b/client/client.go @@ -86,6 +86,8 @@ import ( type ctxKey struct{} // Info contains data related to the clients connecting to receivers. +// +// Deprecated: [v0.99.0] Use consumerconnection.Info instead type Info struct { // Addr for the client connecting to this collector. Available in a // best-effort basis, and generally reliable for receivers making use of @@ -103,6 +105,8 @@ type Info struct { // AuthData represents the authentication data as seen by authenticators tied to // the receivers. +// +// Deprecated: [v0.99.0] Use consumerconnection.AuthData instead type AuthData interface { // GetAttribute returns the value for the given attribute. Authenticator // implementations might define different data types for different @@ -114,16 +118,21 @@ type AuthData interface { GetAttributeNames() []string } +// Deprecated: [v0.99.0] Use consumerconnection.MetadataHostName instead const MetadataHostName = "Host" // NewContext takes an existing context and derives a new context with the // client.Info value stored on it. +// +// Deprecated: [v0.99.0] Use consumerconnection.NewContextWithConnection instead func NewContext(ctx context.Context, c Info) context.Context { return context.WithValue(ctx, ctxKey{}, c) } // FromContext takes a context and returns a ClientInfo from it. // When a ClientInfo isn't present, a new empty one is returned. +// +// Deprecated: [v0.99.0] Use consumerconnection.InfoFromContext instead func FromContext(ctx context.Context) Info { c, ok := ctx.Value(ctxKey{}).(Info) if !ok { @@ -133,11 +142,15 @@ func FromContext(ctx context.Context) Info { } // Metadata is an immutable map, meant to contain request metadata. +// +// Deprecated: [v0.99.0] Use consumerconnection.Metadata instead type Metadata struct { data map[string][]string } // NewMetadata creates a new Metadata object to use in Info. +// +// Deprecated: [v0.99.0] Use consumerconnection.NewMetadata instead func NewMetadata(md map[string][]string) Metadata { c := make(map[string][]string, len(md)) for k, v := range md { diff --git a/config/configgrpc/configgrpc.go b/config/configgrpc/configgrpc.go index 62375ff2d8f..d8c909ada34 100644 --- a/config/configgrpc/configgrpc.go +++ b/config/configgrpc/configgrpc.go @@ -24,7 +24,6 @@ import ( "google.golang.org/grpc/metadata" "google.golang.org/grpc/peer" - "go.opentelemetry.io/collector/client" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configauth" "go.opentelemetry.io/collector/config/configcompression" @@ -33,6 +32,7 @@ import ( "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/collector/config/configtls" "go.opentelemetry.io/collector/config/internal" + "go.opentelemetry.io/collector/consumer/consumerconnection" "go.opentelemetry.io/collector/extension/auth" ) @@ -390,7 +390,7 @@ func getGRPCCompressionName(compressionType configcompression.Type) (string, err } // enhanceWithClientInformation intercepts the incoming RPC, replacing the incoming context with one that includes -// a client.Info, potentially with the peer's address. +// a consumerconnection.Info, potentially with the peer's address. func enhanceWithClientInformation(includeMetadata bool) func(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { return func(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { return handler(contextWithClient(ctx, includeMetadata), req) @@ -403,23 +403,23 @@ func enhanceStreamWithClientInformation(includeMetadata bool) func(srv any, ss g } } -// contextWithClient attempts to add the peer address to the client.Info from the context. When no -// client.Info exists in the context, one is created. +// contextWithClient attempts to add the peer address to the consumerconnection.Info from the context. When no +// consumerconnection.Info exists in the context, one is created. func contextWithClient(ctx context.Context, includeMetadata bool) context.Context { - cl := client.FromContext(ctx) + cl := consumerconnection.InfoFromContext(ctx) if p, ok := peer.FromContext(ctx); ok { cl.Addr = p.Addr } if includeMetadata { if md, ok := metadata.FromIncomingContext(ctx); ok { copiedMD := md.Copy() - if len(md[client.MetadataHostName]) == 0 && len(md[":authority"]) > 0 { - copiedMD[client.MetadataHostName] = md[":authority"] + if len(md[consumerconnection.MetadataHostName]) == 0 && len(md[":authority"]) > 0 { + copiedMD[consumerconnection.MetadataHostName] = md[":authority"] } - cl.Metadata = client.NewMetadata(copiedMD) + cl.Metadata = consumerconnection.NewMetadata(copiedMD) } } - return client.NewContext(ctx, cl) + return consumerconnection.NewContextWithInfo(ctx, cl) } func authUnaryServerInterceptor(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler, server auth.Server) (any, error) { diff --git a/config/configgrpc/configgrpc_test.go b/config/configgrpc/configgrpc_test.go index 2a1b34cb4ee..19d9044e278 100644 --- a/config/configgrpc/configgrpc_test.go +++ b/config/configgrpc/configgrpc_test.go @@ -22,7 +22,6 @@ import ( "google.golang.org/grpc/metadata" "google.golang.org/grpc/peer" - "go.opentelemetry.io/collector/client" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configauth" @@ -30,6 +29,7 @@ import ( "go.opentelemetry.io/collector/config/confignet" "go.opentelemetry.io/collector/config/configopaque" "go.opentelemetry.io/collector/config/configtls" + "go.opentelemetry.io/collector/consumer/consumerconnection" "go.opentelemetry.io/collector/extension/auth" "go.opentelemetry.io/collector/extension/auth/authtest" "go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp" @@ -696,21 +696,21 @@ func TestContextWithClient(t *testing.T) { desc string input context.Context doMetadata bool - expected client.Info + expected consumerconnection.Info }{ { desc: "no peer information, empty client", input: context.Background(), - expected: client.Info{}, + expected: consumerconnection.Info{}, }, { desc: "existing client with IP, no peer information", - input: client.NewContext(context.Background(), client.Info{ + input: consumerconnection.NewContextWithInfo(context.Background(), consumerconnection.Info{ Addr: &net.IPAddr{ IP: net.IPv4(1, 2, 3, 4), }, }), - expected: client.Info{ + expected: consumerconnection.Info{ Addr: &net.IPAddr{ IP: net.IPv4(1, 2, 3, 4), }, @@ -723,7 +723,7 @@ func TestContextWithClient(t *testing.T) { IP: net.IPv4(1, 2, 3, 4), }, }), - expected: client.Info{ + expected: consumerconnection.Info{ Addr: &net.IPAddr{ IP: net.IPv4(1, 2, 3, 4), }, @@ -731,7 +731,7 @@ func TestContextWithClient(t *testing.T) { }, { desc: "existing client, existing IP gets overridden with peer information", - input: peer.NewContext(client.NewContext(context.Background(), client.Info{ + input: peer.NewContext(consumerconnection.NewContextWithInfo(context.Background(), consumerconnection.Info{ Addr: &net.IPAddr{ IP: net.IPv4(1, 2, 3, 4), }, @@ -740,7 +740,7 @@ func TestContextWithClient(t *testing.T) { IP: net.IPv4(1, 2, 3, 5), }, }), - expected: client.Info{ + expected: consumerconnection.Info{ Addr: &net.IPAddr{ IP: net.IPv4(1, 2, 3, 5), }, @@ -748,48 +748,48 @@ func TestContextWithClient(t *testing.T) { }, { desc: "existing client with metadata", - input: client.NewContext(context.Background(), client.Info{ - Metadata: client.NewMetadata(map[string][]string{"test-metadata-key": {"test-value"}}), + input: consumerconnection.NewContextWithInfo(context.Background(), consumerconnection.Info{ + Metadata: consumerconnection.NewMetadata(map[string][]string{"test-metadata-key": {"test-value"}}), }), doMetadata: true, - expected: client.Info{ - Metadata: client.NewMetadata(map[string][]string{"test-metadata-key": {"test-value"}}), + expected: consumerconnection.Info{ + Metadata: consumerconnection.NewMetadata(map[string][]string{"test-metadata-key": {"test-value"}}), }, }, { desc: "existing client with metadata in context", input: metadata.NewIncomingContext( - client.NewContext(context.Background(), client.Info{}), + consumerconnection.NewContextWithInfo(context.Background(), consumerconnection.Info{}), metadata.Pairs("test-metadata-key", "test-value"), ), doMetadata: true, - expected: client.Info{ - Metadata: client.NewMetadata(map[string][]string{"test-metadata-key": {"test-value"}}), + expected: consumerconnection.Info{ + Metadata: consumerconnection.NewMetadata(map[string][]string{"test-metadata-key": {"test-value"}}), }, }, { desc: "existing client with metadata in context, no metadata processing", input: metadata.NewIncomingContext( - client.NewContext(context.Background(), client.Info{}), + consumerconnection.NewContextWithInfo(context.Background(), consumerconnection.Info{}), metadata.Pairs("test-metadata-key", "test-value"), ), - expected: client.Info{}, + expected: consumerconnection.Info{}, }, { desc: "existing client with Host and metadata", input: metadata.NewIncomingContext( - client.NewContext(context.Background(), client.Info{}), + consumerconnection.NewContextWithInfo(context.Background(), consumerconnection.Info{}), metadata.Pairs("test-metadata-key", "test-value", ":authority", "localhost:55443"), ), doMetadata: true, - expected: client.Info{ - Metadata: client.NewMetadata(map[string][]string{"test-metadata-key": {"test-value"}, ":authority": {"localhost:55443"}, "Host": {"localhost:55443"}}), + expected: consumerconnection.Info{ + Metadata: consumerconnection.NewMetadata(map[string][]string{"test-metadata-key": {"test-value"}, ":authority": {"localhost:55443"}, "Host": {"localhost:55443"}}), }, }, } for _, tC := range testCases { t.Run(tC.desc, func(t *testing.T) { - cl := client.FromContext(contextWithClient(tC.input, tC.doMetadata)) + cl := consumerconnection.InfoFromContext(contextWithClient(tC.input, tC.doMetadata)) assert.Equal(t, tC.expected, cl) }) } @@ -817,7 +817,7 @@ func TestStreamInterceptorEnhancesClient(t *testing.T) { // verify assert.NoError(t, err) - cl := client.FromContext(outContext) + cl := consumerconnection.InfoFromContext(outContext) assert.Equal(t, "1.1.1.1", cl.Addr.String()) } @@ -901,7 +901,7 @@ func TestClientInfoInterceptors(t *testing.T) { } // verify - cl := client.FromContext(mock.recordedContext) + cl := consumerconnection.InfoFromContext(mock.recordedContext) // the client address is something like 127.0.0.1:41086 assert.Contains(t, cl.Addr.String(), "127.0.0.1") @@ -915,7 +915,7 @@ func TestDefaultUnaryInterceptorAuthSucceeded(t *testing.T) { authCalled := false authFunc := func(context.Context, map[string][]string) (context.Context, error) { authCalled = true - ctx := client.NewContext(context.Background(), client.Info{ + ctx := consumerconnection.NewContextWithInfo(context.Background(), consumerconnection.Info{ Addr: &net.IPAddr{IP: net.IPv4(1, 2, 3, 4)}, }) @@ -923,7 +923,7 @@ func TestDefaultUnaryInterceptorAuthSucceeded(t *testing.T) { } handler := func(ctx context.Context, _ any) (any, error) { handlerCalled = true - cl := client.FromContext(ctx) + cl := consumerconnection.InfoFromContext(ctx) assert.Equal(t, "1.2.3.4", cl.Addr.String()) return nil, nil } @@ -987,14 +987,14 @@ func TestDefaultStreamInterceptorAuthSucceeded(t *testing.T) { authCalled := false authFunc := func(context.Context, map[string][]string) (context.Context, error) { authCalled = true - ctx := client.NewContext(context.Background(), client.Info{ + ctx := consumerconnection.NewContextWithInfo(context.Background(), consumerconnection.Info{ Addr: &net.IPAddr{IP: net.IPv4(1, 2, 3, 4)}, }) return ctx, nil } handler := func(_ any, stream grpc.ServerStream) error { // ensure that the client information is propagated down to the underlying stream - cl := client.FromContext(stream.Context()) + cl := consumerconnection.InfoFromContext(stream.Context()) assert.Equal(t, "1.2.3.4", cl.Addr.String()) handlerCalled = true return nil diff --git a/config/configgrpc/go.mod b/config/configgrpc/go.mod index 8af35bf3197..717d8191acd 100644 --- a/config/configgrpc/go.mod +++ b/config/configgrpc/go.mod @@ -5,7 +5,6 @@ go 1.21 require ( github.com/mostynb/go-grpc-compression v1.2.2 github.com/stretchr/testify v1.9.0 - go.opentelemetry.io/collector v0.98.0 go.opentelemetry.io/collector/component v0.98.0 go.opentelemetry.io/collector/config/configauth v0.98.0 go.opentelemetry.io/collector/config/configcompression v1.5.0 @@ -14,6 +13,7 @@ require ( go.opentelemetry.io/collector/config/configtelemetry v0.98.0 go.opentelemetry.io/collector/config/configtls v0.98.0 go.opentelemetry.io/collector/config/internal v0.98.0 + go.opentelemetry.io/collector/consumer v0.98.0 go.opentelemetry.io/collector/extension/auth v0.98.0 go.opentelemetry.io/collector/pdata v1.5.0 go.opentelemetry.io/collector/pdata/testdata v0.98.0 @@ -49,6 +49,7 @@ require ( github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/common v0.52.3 // indirect github.com/prometheus/procfs v0.12.0 // indirect + go.opentelemetry.io/collector v0.98.0 // indirect go.opentelemetry.io/collector/confmap v0.98.0 // indirect go.opentelemetry.io/collector/extension v0.98.0 // indirect go.opentelemetry.io/collector/featuregate v1.5.0 // indirect diff --git a/config/confighttp/clientinfohandler.go b/config/confighttp/clientinfohandler.go index 464153bc2c7..3e97be30002 100644 --- a/config/confighttp/clientinfohandler.go +++ b/config/confighttp/clientinfohandler.go @@ -8,10 +8,10 @@ import ( "net" "net/http" - "go.opentelemetry.io/collector/client" + "go.opentelemetry.io/collector/consumer/consumerconnection" ) -// clientInfoHandler is an http.Handler that enhances the incoming request context with client.Info. +// clientInfoHandler is an http.Handler that enhances the incoming request context with consumerconnection.Info. type clientInfoHandler struct { next http.Handler @@ -20,16 +20,16 @@ type clientInfoHandler struct { } // ServeHTTP intercepts incoming HTTP requests, replacing the request's context with one that contains -// a client.Info containing the client's IP address. +// a consumerconnection.Info containing the client's IP address. func (h *clientInfoHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { req = req.WithContext(contextWithClient(req, h.includeMetadata)) h.next.ServeHTTP(w, req) } -// contextWithClient attempts to add the client IP address to the client.Info from the context. When no -// client.Info exists in the context, one is created. +// contextWithClient attempts to add the client IP address to the consumerconnection.Info from the context. When no +// consumerconnection.Info exists in the context, one is created. func contextWithClient(req *http.Request, includeMetadata bool) context.Context { - cl := client.FromContext(req.Context()) + cl := consumerconnection.InfoFromContext(req.Context()) ip := parseIP(req.RemoteAddr) if ip != nil { @@ -38,14 +38,14 @@ func contextWithClient(req *http.Request, includeMetadata bool) context.Context if includeMetadata { md := req.Header.Clone() - if len(md.Get(client.MetadataHostName)) == 0 && req.Host != "" { - md.Add(client.MetadataHostName, req.Host) + if len(md.Get(consumerconnection.MetadataHostName)) == 0 && req.Host != "" { + md.Add(consumerconnection.MetadataHostName, req.Host) } - cl.Metadata = client.NewMetadata(md) + cl.Metadata = consumerconnection.NewMetadata(md) } - ctx := client.NewContext(req.Context(), cl) + ctx := consumerconnection.NewContextWithInfo(req.Context(), cl) return ctx } diff --git a/config/confighttp/confighttp.go b/config/confighttp/confighttp.go index 303075571db..bf2b1854ee4 100644 --- a/config/confighttp/confighttp.go +++ b/config/confighttp/confighttp.go @@ -402,7 +402,7 @@ func (hss *ServerConfig) ToServer(_ context.Context, host component.Host, settin // TODO: Consider to use component ID string as prefix for all the operations. handler = otelhttp.NewHandler(handler, "", otelOpts...) - // wrap the current handler in an interceptor that will add client.Info to the request's context + // wrap the current handler in an interceptor that will add consumerconnection.Info to the request's context handler = &clientInfoHandler{ next: handler, includeMetadata: hss.IncludeMetadata, diff --git a/config/confighttp/confighttp_test.go b/config/confighttp/confighttp_test.go index 635f9415a2f..ac5d789a5ee 100644 --- a/config/confighttp/confighttp_test.go +++ b/config/confighttp/confighttp_test.go @@ -21,7 +21,6 @@ import ( "go.uber.org/zap" "go.uber.org/zap/zaptest/observer" - "go.opentelemetry.io/collector/client" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configauth" @@ -29,6 +28,7 @@ import ( "go.opentelemetry.io/collector/config/configopaque" "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/collector/config/configtls" + "go.opentelemetry.io/collector/consumer/consumerconnection" "go.opentelemetry.io/collector/extension/auth" "go.opentelemetry.io/collector/extension/auth/authtest" ) @@ -1114,19 +1114,19 @@ func TestContextWithClient(t *testing.T) { desc string input *http.Request doMetadata bool - expected client.Info + expected consumerconnection.Info }{ { desc: "request without client IP or headers", input: &http.Request{}, - expected: client.Info{}, + expected: consumerconnection.Info{}, }, { desc: "request with client IP", input: &http.Request{ RemoteAddr: "1.2.3.4:55443", }, - expected: client.Info{ + expected: consumerconnection.Info{ Addr: &net.IPAddr{ IP: net.IPv4(1, 2, 3, 4), }, @@ -1138,7 +1138,7 @@ func TestContextWithClient(t *testing.T) { Header: map[string][]string{"x-test-header": {"test-value"}}, }, doMetadata: false, - expected: client.Info{}, + expected: consumerconnection.Info{}, }, { desc: "request with client headers", @@ -1146,8 +1146,8 @@ func TestContextWithClient(t *testing.T) { Header: map[string][]string{"x-test-header": {"test-value"}}, }, doMetadata: true, - expected: client.Info{ - Metadata: client.NewMetadata(map[string][]string{"x-test-header": {"test-value"}}), + expected: consumerconnection.Info{ + Metadata: consumerconnection.NewMetadata(map[string][]string{"x-test-header": {"test-value"}}), }, }, { @@ -1157,15 +1157,15 @@ func TestContextWithClient(t *testing.T) { Host: "localhost:55443", }, doMetadata: true, - expected: client.Info{ - Metadata: client.NewMetadata(map[string][]string{"x-test-header": {"test-value"}, "Host": {"localhost:55443"}}), + expected: consumerconnection.Info{ + Metadata: consumerconnection.NewMetadata(map[string][]string{"x-test-header": {"test-value"}, "Host": {"localhost:55443"}}), }, }, } for _, tC := range testCases { t.Run(tC.desc, func(t *testing.T) { ctx := contextWithClient(tC.input, tC.doMetadata) - assert.Equal(t, tC.expected, client.FromContext(ctx)) + assert.Equal(t, tC.expected, consumerconnection.InfoFromContext(ctx)) }) } } diff --git a/config/confighttp/go.mod b/config/confighttp/go.mod index 44e19a024bd..804ab3fdadd 100644 --- a/config/confighttp/go.mod +++ b/config/confighttp/go.mod @@ -7,7 +7,6 @@ require ( github.com/klauspost/compress v1.17.8 github.com/rs/cors v1.10.1 github.com/stretchr/testify v1.9.0 - go.opentelemetry.io/collector v0.98.0 go.opentelemetry.io/collector/component v0.98.0 go.opentelemetry.io/collector/config/configauth v0.98.0 go.opentelemetry.io/collector/config/configcompression v1.5.0 @@ -15,6 +14,7 @@ require ( go.opentelemetry.io/collector/config/configtelemetry v0.98.0 go.opentelemetry.io/collector/config/configtls v0.98.0 go.opentelemetry.io/collector/config/internal v0.98.0 + go.opentelemetry.io/collector/consumer v0.98.0 go.opentelemetry.io/collector/extension/auth v0.98.0 go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.50.0 go.opentelemetry.io/otel v1.25.0 @@ -44,6 +44,7 @@ require ( github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/common v0.52.3 // indirect github.com/prometheus/procfs v0.12.0 // indirect + go.opentelemetry.io/collector v0.98.0 // indirect go.opentelemetry.io/collector/confmap v0.98.0 // indirect go.opentelemetry.io/collector/extension v0.98.0 // indirect go.opentelemetry.io/collector/featuregate v1.5.0 // indirect diff --git a/consumer/consumerconnection/client.go b/consumer/consumerconnection/client.go new file mode 100644 index 00000000000..00bbb65e702 --- /dev/null +++ b/consumer/consumerconnection/client.go @@ -0,0 +1,166 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// Package consumerconnection contains generic representations of clients connecting to +// different components. Components can make use of this information to make decisions +// related to grouping of batches, tenancy, load balancing, tagging, among others. +// +// The structs defined here are typically used within the context that is +// propagated down the pipeline, with the values being produced by +// authenticators and/or receivers, and consumed by processors and exporters. +// +// # Producers +// +// Receivers are typically responsible for obtaining a consumerconnection.Info from the current +// context and enhancing the consumerconnection.Info with the net.Addr from the peer, +// storing a new consumerconnection.Info into the context that it passes down. For HTTP +// requests, the net.Addr is typically the IP address of the client. +// +// Components should typically delegate this processing to helpers such +// as the confighttp or configgrpc packages: both contain interceptors that will +// enhance the context with the consumerconnection.Info, such that no actions are needed by +// receivers that are built using confighttp.HTTPServerSettings or +// configgrpc.GRPCServerSettings. +// +// Authenticators are responsible for obtaining a consumerconnection.Info from the current +// context, enhancing the consumerconnection.Info with an implementation of consumerconnection.AuthData, +// and storing a new consumerconnection.Info into the context that it passes down. The +// attribute names should be documented with their return types and considered +// part of the public API for the authenticator. +// +// # Consumers +// +// Provided that the pipeline does not contain processors that would discard or +// rewrite the context, such as the batch processor, processors and exporters +// have access to the consumerconnection.Info via consumerconnection.FromContext. Among other usages, +// this data can be used to: +// +// - annotate data points with authentication data (username, tenant, ...) +// +// - route data points based on authentication data +// +// - rate limit client calls based on IP addresses +// +// Processors and exporters relying on the existence of data from the +// consumerconnection.Info, especially consumerconnection.AuthData, should clearly document this as part +// of the component's README file. The expected pattern for consuming data is to +// allow users to specify the attribute name to use in the component. The +// expected data type should also be communicated to users, who should then +// compare this with the authenticators that are part of the pipeline. For +// example, assuming that the OIDC authenticator pushes a "subject" string +// attribute and that we have a hypothetical "authprinter" processor that prints +// the "username" to the console, this is how an OpenTelemetry Collector +// configuration would look like: +// +// extensions: +// oidc: +// issuer_url: http://localhost:8080/auth/realms/opentelemetry +// audience: collector +// receivers: +// otlp: +// protocols: +// grpc: +// auth: +// authenticator: oidc +// processors: +// authprinter: +// attribute: subject +// exporters: +// debug: +// service: +// extensions: [oidc] +// pipelines: +// traces: +// receivers: [otlp] +// processors: [authprinter] +// exporters: [debug] +package consumerconnection // import "go.opentelemetry.io/collector/consumer/consumerconnection" + +import ( + "context" + "net" + "strings" +) + +type ctxKey struct{} + +// Info contains data related to the clients connecting to receivers. +type Info struct { + // Addr for the client connecting to this collector. Available in a + // best-effort basis, and generally reliable for receivers making use of + // confighttp.ToServer and configgrpc.ToServerOption. + Addr net.Addr + + // Auth information from the incoming request as provided by + // configauth.ServerAuthenticator implementations tied to the receiver for + // this connection. + Auth AuthData + + // Metadata is the request metadata from the client connecting to this connector. + Metadata Metadata +} + +// AuthData represents the authentication data as seen by authenticators tied to +// the receivers. +type AuthData interface { + // GetAttribute returns the value for the given attribute. Authenticator + // implementations might define different data types for different + // attributes. While "string" is used most of the time, a key named + // "membership" might return a list of strings. + GetAttribute(string) any + + // GetAttributeNames returns the names of all attributes in this authentication data. + GetAttributeNames() []string +} + +const MetadataHostName = "Host" + +// NewContextWithInfo takes an existing context and derives a new context with the +// consumerconnection.Info value stored on it. +func NewContextWithInfo(ctx context.Context, c Info) context.Context { + return context.WithValue(ctx, ctxKey{}, c) +} + +// InfoFromContext takes a context and returns a ClientInfo from it. +// When a ClientInfo isn't present, a new empty one is returned. +func InfoFromContext(ctx context.Context) Info { + c, ok := ctx.Value(ctxKey{}).(Info) + if !ok { + c = Info{} + } + return c +} + +// Metadata is an immutable map, meant to contain request metadata. +type Metadata struct { + data map[string][]string +} + +// NewMetadata creates a new Metadata object to use in Info. +func NewMetadata(md map[string][]string) Metadata { + c := make(map[string][]string, len(md)) + for k, v := range md { + c[strings.ToLower(k)] = v + } + return Metadata{ + data: c, + } +} + +// Get gets the value of the key from metadata, returning a copy. +// The key lookup is case-insensitive. +func (m Metadata) Get(key string) []string { + if len(m.data) == 0 { + return nil + } + + vals := m.data[strings.ToLower(key)] + if len(vals) == 0 { + return nil + } + + ret := make([]string, len(vals)) + copy(ret, vals) + + return ret +} diff --git a/consumer/consumerconnection/client_test.go b/consumer/consumerconnection/client_test.go new file mode 100644 index 00000000000..907ae76b1f9 --- /dev/null +++ b/consumer/consumerconnection/client_test.go @@ -0,0 +1,97 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// Package client contains generic representations of clients connecting to +// different receivers +package consumerconnection + +import ( + "context" + "net" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestNewContext(t *testing.T) { + testCases := []struct { + desc string + cl Info + }{ + { + desc: "valid client", + cl: Info{ + Addr: &net.IPAddr{ + IP: net.IPv4(1, 2, 3, 4), + }, + }, + }, + { + desc: "nil client", + cl: Info{}, + }, + } + for _, tC := range testCases { + t.Run(tC.desc, func(t *testing.T) { + ctx := NewContextWithInfo(context.Background(), tC.cl) + assert.Equal(t, ctx.Value(ctxKey{}), tC.cl) + }) + } +} + +func TestFromContext(t *testing.T) { + testCases := []struct { + desc string + input context.Context + expected Info + }{ + { + desc: "context with client", + input: context.WithValue(context.Background(), ctxKey{}, Info{ + Addr: &net.IPAddr{ + IP: net.IPv4(1, 2, 3, 4), + }, + }), + expected: Info{ + Addr: &net.IPAddr{ + IP: net.IPv4(1, 2, 3, 4), + }, + }, + }, + { + desc: "context without client", + input: context.Background(), + expected: Info{}, + }, + { + desc: "context with something else in the key", + input: context.WithValue(context.Background(), ctxKey{}, "unexpected!"), + expected: Info{}, + }, + } + for _, tC := range testCases { + t.Run(tC.desc, func(t *testing.T) { + assert.Equal(t, InfoFromContext(tC.input), tC.expected) + }) + } +} + +func TestMetadata(t *testing.T) { + source := map[string][]string{"test-key": {"test-val"}, "TEST-KEY-2": {"test-val"}} + md := NewMetadata(source) + assert.Equal(t, []string{"test-val"}, md.Get("test-key")) + assert.Equal(t, []string{"test-val"}, md.Get("test-KEY")) // case insensitive lookup + assert.Equal(t, []string{"test-val"}, md.Get("test-key-2")) // case insensitive lookup + + // test if copy. In regular use, source cannot change + val := md.Get("test-key") + source["test-key"][0] = "abc" + assert.Equal(t, []string{"test-val"}, val) + + assert.Empty(t, md.Get("non-existent-key")) +} + +func TestUninstantiatedMetadata(t *testing.T) { + i := Info{} + assert.Empty(t, i.Metadata.Get("test")) +} diff --git a/consumer/consumerconnection/doc_test.go b/consumer/consumerconnection/doc_test.go new file mode 100644 index 00000000000..6950e7e56bb --- /dev/null +++ b/consumer/consumerconnection/doc_test.go @@ -0,0 +1,82 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package consumerconnection_test + +import ( + "context" + "fmt" + "net" + + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumerconnection" + "go.opentelemetry.io/collector/pdata/ptrace" +) + +func Example_receiver() { + // Your receiver get a next consumer when it's constructed + var next consumer.Traces + + // You'll convert the incoming data into pipeline data + td := ptrace.NewTraces() + + // You probably have a context with client metadata from your listener or + // scraper + ctx := context.Background() + + // Get the client from the context: if it doesn't exist, FromContext will + // create one + cl := consumerconnection.InfoFromContext(ctx) + + // Extract the client information based on your original context and set it + // to Addr + cl.Addr = &net.IPAddr{ // nolint + IP: net.IPv4(1, 2, 3, 4), + } + + // When you are done, propagate the context down the pipeline to the next + // consumer + next.ConsumeTraces(ctx, td) // nolint +} + +func Example_processor() { + // Your processor or exporter will receive a context, from which you get the + // client information + ctx := context.Background() + cl := consumerconnection.InfoFromContext(ctx) + + // And use the information from the client as you need + fmt.Println(cl.Addr) +} + +func Example_authenticator() { + // Your configauth.AuthenticateFunc receives a context + ctx := context.Background() + + // Get the client from the context: if it doesn't exist, FromContext will + // create one + cl := consumerconnection.InfoFromContext(ctx) + + // After a successful authentication, place the data you want to propagate + // as part of an AuthData implementation of your own + cl.Auth = &exampleAuthData{ + username: "jdoe", + } + + // Your configauth.AuthenticateFunc should return this new context + _ = consumerconnection.NewContextWithInfo(ctx, cl) +} + +type exampleAuthData struct { + username string +} + +func (e *exampleAuthData) GetAttribute(key string) any { + if key == "username" { + return e.username + } + return nil +} +func (e *exampleAuthData) GetAttributeNames() []string { + return []string{"username"} +} diff --git a/consumer/consumerconnection/package_test.go b/consumer/consumerconnection/package_test.go new file mode 100644 index 00000000000..6d8263a5140 --- /dev/null +++ b/consumer/consumerconnection/package_test.go @@ -0,0 +1,14 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package consumerconnection + +import ( + "testing" + + "go.uber.org/goleak" +) + +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m) +} diff --git a/processor/batchprocessor/batch_processor.go b/processor/batchprocessor/batch_processor.go index 60c6fa10ef6..e3b0aee068c 100644 --- a/processor/batchprocessor/batch_processor.go +++ b/processor/batchprocessor/batch_processor.go @@ -16,9 +16,9 @@ import ( "go.opentelemetry.io/otel/attribute" "go.uber.org/zap" - "go.opentelemetry.io/collector/client" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumerconnection" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" @@ -149,8 +149,8 @@ func newBatchProcessor(set processor.CreateSettings, cfg *Config, batchFunc func // newShard gets or creates a batcher corresponding with attrs. func (bp *batchProcessor) newShard(md map[string][]string) *shard { - exportCtx := client.NewContext(context.Background(), client.Info{ - Metadata: client.NewMetadata(md), + exportCtx := consumerconnection.NewContextWithInfo(context.Background(), consumerconnection.Info{ + Metadata: consumerconnection.NewMetadata(md), }) b := &shard{ processor: bp, @@ -295,7 +295,7 @@ type multiShardBatcher struct { func (mb *multiShardBatcher) consume(ctx context.Context, data any) error { // Get each metadata key value, form the corresponding // attribute set for use as a map lookup key. - info := client.FromContext(ctx) + info := consumerconnection.InfoFromContext(ctx) md := map[string][]string{} var attrs []attribute.KeyValue for _, k := range mb.metadataKeys { diff --git a/processor/batchprocessor/batch_processor_test.go b/processor/batchprocessor/batch_processor_test.go index 6e8033fb387..cadf48d4689 100644 --- a/processor/batchprocessor/batch_processor_test.go +++ b/processor/batchprocessor/batch_processor_test.go @@ -14,10 +14,10 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/client" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumerconnection" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/pdata/plog" @@ -884,7 +884,7 @@ func formatTwo(first, second []string) string { } func (mts *metadataTracesSink) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { - info := client.FromContext(ctx) + info := consumerconnection.InfoFromContext(ctx) token1 := info.Metadata.Get("token1") token2 := info.Metadata.Get("token2") mts.lock.Lock() @@ -914,27 +914,27 @@ func TestBatchProcessorSpansBatchedByMetadata(t *testing.T) { bg := context.Background() callCtxs := []context.Context{ - client.NewContext(bg, client.Info{ - Metadata: client.NewMetadata(map[string][]string{ + consumerconnection.NewContextWithInfo(bg, consumerconnection.Info{ + Metadata: consumerconnection.NewMetadata(map[string][]string{ "token1": {"single"}, "token3": {"n/a"}, }), }), - client.NewContext(bg, client.Info{ - Metadata: client.NewMetadata(map[string][]string{ + consumerconnection.NewContextWithInfo(bg, consumerconnection.Info{ + Metadata: consumerconnection.NewMetadata(map[string][]string{ "token1": {"single"}, "token2": {"one", "two"}, "token4": {"n/a"}, }), }), - client.NewContext(bg, client.Info{ - Metadata: client.NewMetadata(map[string][]string{ + consumerconnection.NewContextWithInfo(bg, consumerconnection.Info{ + Metadata: consumerconnection.NewMetadata(map[string][]string{ "token1": nil, "token2": {"single"}, }), }), - client.NewContext(bg, client.Info{ - Metadata: client.NewMetadata(map[string][]string{ + consumerconnection.NewContextWithInfo(bg, consumerconnection.Info{ + Metadata: consumerconnection.NewMetadata(map[string][]string{ "token1": {"one", "two", "three"}, "token2": {"single"}, "token3": {"n/a"}, @@ -978,7 +978,7 @@ func TestBatchProcessorSpansBatchedByMetadata(t *testing.T) { // This test ensures each context had the expected number of spans. require.Equal(t, len(callCtxs), len(sink.spanCountByToken12)) for idx, ctx := range callCtxs { - md := client.FromContext(ctx).Metadata + md := consumerconnection.InfoFromContext(ctx).Metadata exp := formatTwo(md.Get("token1"), md.Get("token2")) require.Equal(t, expectByContext[idx], sink.spanCountByToken12[exp]) } @@ -1008,8 +1008,8 @@ func TestBatchProcessorMetadataCardinalityLimit(t *testing.T) { bg := context.Background() for requestNum := 0; requestNum < cardLimit; requestNum++ { td := testdata.GenerateTraces(1) - ctx := client.NewContext(bg, client.Info{ - Metadata: client.NewMetadata(map[string][]string{ + ctx := consumerconnection.NewContextWithInfo(bg, consumerconnection.Info{ + Metadata: consumerconnection.NewMetadata(map[string][]string{ "token": {fmt.Sprint(requestNum)}, }), }) @@ -1018,8 +1018,8 @@ func TestBatchProcessorMetadataCardinalityLimit(t *testing.T) { } td := testdata.GenerateTraces(1) - ctx := client.NewContext(bg, client.Info{ - Metadata: client.NewMetadata(map[string][]string{ + ctx := consumerconnection.NewContextWithInfo(bg, consumerconnection.Info{ + Metadata: consumerconnection.NewMetadata(map[string][]string{ "token": {"limit_exceeded"}, }), })