Skip to content

Commit

Permalink
Move client features to consumerconnection
Browse files Browse the repository at this point in the history
  • Loading branch information
TylerHelmuth committed Apr 18, 2024
1 parent fcdfdaa commit bc7f2f5
Show file tree
Hide file tree
Showing 15 changed files with 477 additions and 78 deletions.
25 changes: 25 additions & 0 deletions .chloggen/client-move-to-consumer.yaml
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: deprecation

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: collector/client

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Deprecates collector/client. Use `consumerconnection` instead.

# One or more tracking issues or pull requests related to the change
issues: [9996]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
13 changes: 13 additions & 0 deletions client/client.go
Expand Up @@ -86,6 +86,8 @@ import (
type ctxKey struct{}

// Info contains data related to the clients connecting to receivers.
//
// Deprecated: [v0.99.0] Use consumerconnection.Info instead
type Info struct {
// Addr for the client connecting to this collector. Available in a
// best-effort basis, and generally reliable for receivers making use of
Expand All @@ -103,6 +105,8 @@ type Info struct {

// AuthData represents the authentication data as seen by authenticators tied to
// the receivers.
//
// Deprecated: [v0.99.0] Use consumerconnection.AuthData instead
type AuthData interface {
// GetAttribute returns the value for the given attribute. Authenticator
// implementations might define different data types for different
Expand All @@ -114,16 +118,21 @@ type AuthData interface {
GetAttributeNames() []string
}

// Deprecated: [v0.99.0] Use consumerconnection.MetadataHostName instead
const MetadataHostName = "Host"

// NewContext takes an existing context and derives a new context with the
// client.Info value stored on it.
//
// Deprecated: [v0.99.0] Use consumerconnection.NewContextWithConnection instead
func NewContext(ctx context.Context, c Info) context.Context {
return context.WithValue(ctx, ctxKey{}, c)
}

// FromContext takes a context and returns a ClientInfo from it.
// When a ClientInfo isn't present, a new empty one is returned.
//
// Deprecated: [v0.99.0] Use consumerconnection.InfoFromContext instead
func FromContext(ctx context.Context) Info {
c, ok := ctx.Value(ctxKey{}).(Info)
if !ok {
Expand All @@ -133,11 +142,15 @@ func FromContext(ctx context.Context) Info {
}

// Metadata is an immutable map, meant to contain request metadata.
//
// Deprecated: [v0.99.0] Use consumerconnection.Metadata instead
type Metadata struct {
data map[string][]string
}

// NewMetadata creates a new Metadata object to use in Info.
//
// Deprecated: [v0.99.0] Use consumerconnection.NewMetadata instead
func NewMetadata(md map[string][]string) Metadata {
c := make(map[string][]string, len(md))
for k, v := range md {
Expand Down
18 changes: 9 additions & 9 deletions config/configgrpc/configgrpc.go
Expand Up @@ -24,7 +24,6 @@ import (
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"

"go.opentelemetry.io/collector/client"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configauth"
"go.opentelemetry.io/collector/config/configcompression"
Expand All @@ -33,6 +32,7 @@ import (
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/config/internal"
"go.opentelemetry.io/collector/consumer/consumerconnection"
"go.opentelemetry.io/collector/extension/auth"
)

Expand Down Expand Up @@ -390,7 +390,7 @@ func getGRPCCompressionName(compressionType configcompression.Type) (string, err
}

// enhanceWithClientInformation intercepts the incoming RPC, replacing the incoming context with one that includes
// a client.Info, potentially with the peer's address.
// a consumerconnection.Info, potentially with the peer's address.
func enhanceWithClientInformation(includeMetadata bool) func(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
return func(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
return handler(contextWithClient(ctx, includeMetadata), req)
Expand All @@ -403,23 +403,23 @@ func enhanceStreamWithClientInformation(includeMetadata bool) func(srv any, ss g
}
}

// contextWithClient attempts to add the peer address to the client.Info from the context. When no
// client.Info exists in the context, one is created.
// contextWithClient attempts to add the peer address to the consumerconnection.Info from the context. When no
// consumerconnection.Info exists in the context, one is created.
func contextWithClient(ctx context.Context, includeMetadata bool) context.Context {
cl := client.FromContext(ctx)
cl := consumerconnection.InfoFromContext(ctx)
if p, ok := peer.FromContext(ctx); ok {
cl.Addr = p.Addr
}
if includeMetadata {
if md, ok := metadata.FromIncomingContext(ctx); ok {
copiedMD := md.Copy()
if len(md[client.MetadataHostName]) == 0 && len(md[":authority"]) > 0 {
copiedMD[client.MetadataHostName] = md[":authority"]
if len(md[consumerconnection.MetadataHostName]) == 0 && len(md[":authority"]) > 0 {
copiedMD[consumerconnection.MetadataHostName] = md[":authority"]
}
cl.Metadata = client.NewMetadata(copiedMD)
cl.Metadata = consumerconnection.NewMetadata(copiedMD)
}
}
return client.NewContext(ctx, cl)
return consumerconnection.NewContextWithInfo(ctx, cl)
}

func authUnaryServerInterceptor(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler, server auth.Server) (any, error) {
Expand Down
54 changes: 27 additions & 27 deletions config/configgrpc/configgrpc_test.go
Expand Up @@ -22,14 +22,14 @@ import (
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"

"go.opentelemetry.io/collector/client"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configauth"
"go.opentelemetry.io/collector/config/configcompression"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/config/configopaque"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/consumer/consumerconnection"
"go.opentelemetry.io/collector/extension/auth"
"go.opentelemetry.io/collector/extension/auth/authtest"
"go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp"
Expand Down Expand Up @@ -696,21 +696,21 @@ func TestContextWithClient(t *testing.T) {
desc string
input context.Context
doMetadata bool
expected client.Info
expected consumerconnection.Info
}{
{
desc: "no peer information, empty client",
input: context.Background(),
expected: client.Info{},
expected: consumerconnection.Info{},
},
{
desc: "existing client with IP, no peer information",
input: client.NewContext(context.Background(), client.Info{
input: consumerconnection.NewContextWithInfo(context.Background(), consumerconnection.Info{
Addr: &net.IPAddr{
IP: net.IPv4(1, 2, 3, 4),
},
}),
expected: client.Info{
expected: consumerconnection.Info{
Addr: &net.IPAddr{
IP: net.IPv4(1, 2, 3, 4),
},
Expand All @@ -723,15 +723,15 @@ func TestContextWithClient(t *testing.T) {
IP: net.IPv4(1, 2, 3, 4),
},
}),
expected: client.Info{
expected: consumerconnection.Info{
Addr: &net.IPAddr{
IP: net.IPv4(1, 2, 3, 4),
},
},
},
{
desc: "existing client, existing IP gets overridden with peer information",
input: peer.NewContext(client.NewContext(context.Background(), client.Info{
input: peer.NewContext(consumerconnection.NewContextWithInfo(context.Background(), consumerconnection.Info{
Addr: &net.IPAddr{
IP: net.IPv4(1, 2, 3, 4),
},
Expand All @@ -740,56 +740,56 @@ func TestContextWithClient(t *testing.T) {
IP: net.IPv4(1, 2, 3, 5),
},
}),
expected: client.Info{
expected: consumerconnection.Info{
Addr: &net.IPAddr{
IP: net.IPv4(1, 2, 3, 5),
},
},
},
{
desc: "existing client with metadata",
input: client.NewContext(context.Background(), client.Info{
Metadata: client.NewMetadata(map[string][]string{"test-metadata-key": {"test-value"}}),
input: consumerconnection.NewContextWithInfo(context.Background(), consumerconnection.Info{
Metadata: consumerconnection.NewMetadata(map[string][]string{"test-metadata-key": {"test-value"}}),
}),
doMetadata: true,
expected: client.Info{
Metadata: client.NewMetadata(map[string][]string{"test-metadata-key": {"test-value"}}),
expected: consumerconnection.Info{
Metadata: consumerconnection.NewMetadata(map[string][]string{"test-metadata-key": {"test-value"}}),
},
},
{
desc: "existing client with metadata in context",
input: metadata.NewIncomingContext(
client.NewContext(context.Background(), client.Info{}),
consumerconnection.NewContextWithInfo(context.Background(), consumerconnection.Info{}),
metadata.Pairs("test-metadata-key", "test-value"),
),
doMetadata: true,
expected: client.Info{
Metadata: client.NewMetadata(map[string][]string{"test-metadata-key": {"test-value"}}),
expected: consumerconnection.Info{
Metadata: consumerconnection.NewMetadata(map[string][]string{"test-metadata-key": {"test-value"}}),
},
},
{
desc: "existing client with metadata in context, no metadata processing",
input: metadata.NewIncomingContext(
client.NewContext(context.Background(), client.Info{}),
consumerconnection.NewContextWithInfo(context.Background(), consumerconnection.Info{}),
metadata.Pairs("test-metadata-key", "test-value"),
),
expected: client.Info{},
expected: consumerconnection.Info{},
},
{
desc: "existing client with Host and metadata",
input: metadata.NewIncomingContext(
client.NewContext(context.Background(), client.Info{}),
consumerconnection.NewContextWithInfo(context.Background(), consumerconnection.Info{}),
metadata.Pairs("test-metadata-key", "test-value", ":authority", "localhost:55443"),
),
doMetadata: true,
expected: client.Info{
Metadata: client.NewMetadata(map[string][]string{"test-metadata-key": {"test-value"}, ":authority": {"localhost:55443"}, "Host": {"localhost:55443"}}),
expected: consumerconnection.Info{
Metadata: consumerconnection.NewMetadata(map[string][]string{"test-metadata-key": {"test-value"}, ":authority": {"localhost:55443"}, "Host": {"localhost:55443"}}),
},
},
}
for _, tC := range testCases {
t.Run(tC.desc, func(t *testing.T) {
cl := client.FromContext(contextWithClient(tC.input, tC.doMetadata))
cl := consumerconnection.InfoFromContext(contextWithClient(tC.input, tC.doMetadata))
assert.Equal(t, tC.expected, cl)
})
}
Expand Down Expand Up @@ -817,7 +817,7 @@ func TestStreamInterceptorEnhancesClient(t *testing.T) {
// verify
assert.NoError(t, err)

cl := client.FromContext(outContext)
cl := consumerconnection.InfoFromContext(outContext)
assert.Equal(t, "1.1.1.1", cl.Addr.String())
}

Expand Down Expand Up @@ -901,7 +901,7 @@ func TestClientInfoInterceptors(t *testing.T) {
}

// verify
cl := client.FromContext(mock.recordedContext)
cl := consumerconnection.InfoFromContext(mock.recordedContext)

// the client address is something like 127.0.0.1:41086
assert.Contains(t, cl.Addr.String(), "127.0.0.1")
Expand All @@ -915,15 +915,15 @@ func TestDefaultUnaryInterceptorAuthSucceeded(t *testing.T) {
authCalled := false
authFunc := func(context.Context, map[string][]string) (context.Context, error) {
authCalled = true
ctx := client.NewContext(context.Background(), client.Info{
ctx := consumerconnection.NewContextWithInfo(context.Background(), consumerconnection.Info{
Addr: &net.IPAddr{IP: net.IPv4(1, 2, 3, 4)},
})

return ctx, nil
}
handler := func(ctx context.Context, _ any) (any, error) {
handlerCalled = true
cl := client.FromContext(ctx)
cl := consumerconnection.InfoFromContext(ctx)
assert.Equal(t, "1.2.3.4", cl.Addr.String())
return nil, nil
}
Expand Down Expand Up @@ -987,14 +987,14 @@ func TestDefaultStreamInterceptorAuthSucceeded(t *testing.T) {
authCalled := false
authFunc := func(context.Context, map[string][]string) (context.Context, error) {
authCalled = true
ctx := client.NewContext(context.Background(), client.Info{
ctx := consumerconnection.NewContextWithInfo(context.Background(), consumerconnection.Info{
Addr: &net.IPAddr{IP: net.IPv4(1, 2, 3, 4)},
})
return ctx, nil
}
handler := func(_ any, stream grpc.ServerStream) error {
// ensure that the client information is propagated down to the underlying stream
cl := client.FromContext(stream.Context())
cl := consumerconnection.InfoFromContext(stream.Context())
assert.Equal(t, "1.2.3.4", cl.Addr.String())
handlerCalled = true
return nil
Expand Down
3 changes: 2 additions & 1 deletion config/configgrpc/go.mod
Expand Up @@ -5,7 +5,6 @@ go 1.21
require (
github.com/mostynb/go-grpc-compression v1.2.2
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/collector v0.98.0
go.opentelemetry.io/collector/component v0.98.0
go.opentelemetry.io/collector/config/configauth v0.98.0
go.opentelemetry.io/collector/config/configcompression v1.5.0
Expand All @@ -14,6 +13,7 @@ require (
go.opentelemetry.io/collector/config/configtelemetry v0.98.0
go.opentelemetry.io/collector/config/configtls v0.98.0
go.opentelemetry.io/collector/config/internal v0.98.0
go.opentelemetry.io/collector/consumer v0.98.0
go.opentelemetry.io/collector/extension/auth v0.98.0
go.opentelemetry.io/collector/pdata v1.5.0
go.opentelemetry.io/collector/pdata/testdata v0.98.0
Expand Down Expand Up @@ -49,6 +49,7 @@ require (
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.52.3 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
go.opentelemetry.io/collector v0.98.0 // indirect
go.opentelemetry.io/collector/confmap v0.98.0 // indirect
go.opentelemetry.io/collector/extension v0.98.0 // indirect
go.opentelemetry.io/collector/featuregate v1.5.0 // indirect
Expand Down
20 changes: 10 additions & 10 deletions config/confighttp/clientinfohandler.go
Expand Up @@ -8,10 +8,10 @@ import (
"net"
"net/http"

"go.opentelemetry.io/collector/client"
"go.opentelemetry.io/collector/consumer/consumerconnection"
)

// clientInfoHandler is an http.Handler that enhances the incoming request context with client.Info.
// clientInfoHandler is an http.Handler that enhances the incoming request context with consumerconnection.Info.
type clientInfoHandler struct {
next http.Handler

Expand All @@ -20,16 +20,16 @@ type clientInfoHandler struct {
}

// ServeHTTP intercepts incoming HTTP requests, replacing the request's context with one that contains
// a client.Info containing the client's IP address.
// a consumerconnection.Info containing the client's IP address.
func (h *clientInfoHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
req = req.WithContext(contextWithClient(req, h.includeMetadata))
h.next.ServeHTTP(w, req)
}

// contextWithClient attempts to add the client IP address to the client.Info from the context. When no
// client.Info exists in the context, one is created.
// contextWithClient attempts to add the client IP address to the consumerconnection.Info from the context. When no
// consumerconnection.Info exists in the context, one is created.
func contextWithClient(req *http.Request, includeMetadata bool) context.Context {
cl := client.FromContext(req.Context())
cl := consumerconnection.InfoFromContext(req.Context())

ip := parseIP(req.RemoteAddr)
if ip != nil {
Expand All @@ -38,14 +38,14 @@ func contextWithClient(req *http.Request, includeMetadata bool) context.Context

if includeMetadata {
md := req.Header.Clone()
if len(md.Get(client.MetadataHostName)) == 0 && req.Host != "" {
md.Add(client.MetadataHostName, req.Host)
if len(md.Get(consumerconnection.MetadataHostName)) == 0 && req.Host != "" {
md.Add(consumerconnection.MetadataHostName, req.Host)
}

cl.Metadata = client.NewMetadata(md)
cl.Metadata = consumerconnection.NewMetadata(md)
}

ctx := client.NewContext(req.Context(), cl)
ctx := consumerconnection.NewContextWithInfo(req.Context(), cl)
return ctx
}

Expand Down

0 comments on commit bc7f2f5

Please sign in to comment.