Skip to content

Commit

Permalink
Provide better helpers for configgrpc, consistent with confighttp
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu committed Oct 31, 2022
1 parent 3e91f3d commit a2df621
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 60 deletions.
18 changes: 18 additions & 0 deletions .chloggen/providehelpers.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: deprecation

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: config/configgrpc

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Provide better helpers for configgrpc, consistent with confighttp

# One or more tracking issues or pull requests related to the change
issues: [6441]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
- Deprecate `GRPCClientSettings.ToDialOptions` in favor of `GRPCClientSettings.ToClientConn`.
- Deprecate `GRPCServerSettings.ToServerOption` in favor of `GRPCServerSettings.ToServer`.
34 changes: 32 additions & 2 deletions config/configgrpc/configgrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,25 @@ func (gcs *GRPCClientSettings) isSchemeHTTPS() bool {
return strings.HasPrefix(gcs.Endpoint, "https://")
}

// ToDialOptions maps configgrpc.GRPCClientSettings to a slice of dial options for gRPC.
// ToClientConn creates a client connection to the given target. By default, it's
// a non-blocking dial (the function won't wait for connections to be
// established, and connecting happens in the background). To make it a blocking
// dial, use grpc.WithBlock() dial option.
func (gcs *GRPCClientSettings) ToClientConn(ctx context.Context, host component.Host, settings component.TelemetrySettings, extraOpts ...grpc.DialOption) (*grpc.ClientConn, error) {
opts, err := gcs.toDialOptions(host, settings)
if err != nil {
return nil, err
}
opts = append(opts, extraOpts...)
return grpc.DialContext(ctx, gcs.SanitizedEndpoint(), opts...)
}

// Deprecated: [v0.64.0] use ToClientConn.
func (gcs *GRPCClientSettings) ToDialOptions(host component.Host, settings component.TelemetrySettings) ([]grpc.DialOption, error) {
return gcs.toDialOptions(host, settings)
}

func (gcs *GRPCClientSettings) toDialOptions(host component.Host, settings component.TelemetrySettings) ([]grpc.DialOption, error) {
var opts []grpc.DialOption
if configcompression.IsCompressed(gcs.Compression) {
cp, err := getGRPCCompressionName(gcs.Compression)
Expand Down Expand Up @@ -271,8 +288,21 @@ func (gss *GRPCServerSettings) ToListener() (net.Listener, error) {
return gss.NetAddr.Listen()
}

// ToServerOption maps configgrpc.GRPCServerSettings to a slice of server options for gRPC.
func (gss *GRPCServerSettings) ToServer(host component.Host, settings component.TelemetrySettings, extraOpts ...grpc.ServerOption) (*grpc.Server, error) {
opts, err := gss.toServerOption(host, settings)
if err != nil {
return nil, err
}
opts = append(opts, extraOpts...)
return grpc.NewServer(opts...), nil
}

// Deprecated: [v0.64.0] use ToServer.
func (gss *GRPCServerSettings) ToServerOption(host component.Host, settings component.TelemetrySettings) ([]grpc.ServerOption, error) {
return gss.toServerOption(host, settings)
}

func (gss *GRPCServerSettings) toServerOption(host component.Host, settings component.TelemetrySettings) ([]grpc.ServerOption, error) {
var opts []grpc.ServerOption

if gss.TLSSetting != nil {
Expand Down
70 changes: 22 additions & 48 deletions config/configgrpc/configgrpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func TestDefaultGrpcClientSettings(t *testing.T) {
Insecure: true,
},
}
opts, err := gcs.ToDialOptions(componenttest.NewNopHost(), tt.TelemetrySettings)
opts, err := gcs.toDialOptions(componenttest.NewNopHost(), tt.TelemetrySettings)
assert.NoError(t, err)
assert.Len(t, opts, 3)
}
Expand Down Expand Up @@ -154,7 +154,7 @@ func TestAllGrpcClientSettings(t *testing.T) {
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
opts, err := test.settings.ToDialOptions(test.host, tt.TelemetrySettings)
opts, err := test.settings.toDialOptions(test.host, tt.TelemetrySettings)
assert.NoError(t, err)
assert.Len(t, opts, 9)
})
Expand All @@ -163,9 +163,7 @@ func TestAllGrpcClientSettings(t *testing.T) {

func TestDefaultGrpcServerSettings(t *testing.T) {
gss := &GRPCServerSettings{}
opts, err := gss.ToServerOption(componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings())
_ = grpc.NewServer(opts...)

opts, err := gss.toServerOption(componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings())
assert.NoError(t, err)
assert.Len(t, opts, 2)
}
Expand Down Expand Up @@ -198,21 +196,13 @@ func TestAllGrpcServerSettingsExceptAuth(t *testing.T) {
},
},
}
opts, err := gss.ToServerOption(componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings())
_ = grpc.NewServer(opts...)

opts, err := gss.toServerOption(componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings())
assert.NoError(t, err)
assert.Len(t, opts, 9)
}

func TestGrpcServerAuthSettings(t *testing.T) {
gss := &GRPCServerSettings{}

// sanity check
_, err := gss.ToServerOption(componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings())
require.NoError(t, err)

// test
gss.Auth = &configauth.Authentication{
AuthenticatorID: config.NewComponentID("mock"),
}
Expand All @@ -221,12 +211,9 @@ func TestGrpcServerAuthSettings(t *testing.T) {
config.NewComponentID("mock"): configauth.NewServerAuthenticator(),
},
}
opts, err := gss.ToServerOption(host, componenttest.NewNopTelemetrySettings())
_ = grpc.NewServer(opts...)

// verify
srv, err := gss.ToServer(host, componenttest.NewNopTelemetrySettings())
assert.NoError(t, err)
assert.NotNil(t, opts)
assert.NotNil(t, srv)
}

func TestGRPCClientSettingsError(t *testing.T) {
Expand Down Expand Up @@ -345,8 +332,7 @@ func TestGRPCClientSettingsError(t *testing.T) {
}
for _, test := range tests {
t.Run(test.err, func(t *testing.T) {
opts, err := test.settings.ToDialOptions(test.host, tt.TelemetrySettings)
assert.Nil(t, opts)
_, err := test.settings.ToClientConn(context.Background(), test.host, tt.TelemetrySettings)
assert.Error(t, err)
assert.Regexp(t, test.err, err)
})
Expand All @@ -365,7 +351,7 @@ func TestUseSecure(t *testing.T) {
TLSSetting: configtls.TLSClientSetting{},
Keepalive: nil,
}
dialOpts, err := gcs.ToDialOptions(componenttest.NewNopHost(), tt.TelemetrySettings)
dialOpts, err := gcs.toDialOptions(componenttest.NewNopHost(), tt.TelemetrySettings)
assert.NoError(t, err)
assert.Len(t, dialOpts, 3)
}
Expand Down Expand Up @@ -418,9 +404,7 @@ func TestGRPCServerSettingsError(t *testing.T) {
}
for _, test := range tests {
t.Run(test.err, func(t *testing.T) {
opts, err := test.settings.ToServerOption(componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings())
_ = grpc.NewServer(opts...)

_, err := test.settings.ToServer(componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings())
assert.Regexp(t, test.err, err)
})
}
Expand Down Expand Up @@ -563,9 +547,8 @@ func TestHttpReception(t *testing.T) {
}
ln, err := gss.ToListener()
assert.NoError(t, err)
opts, err := gss.ToServerOption(componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings())
s, err := gss.ToServer(componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings())
assert.NoError(t, err)
s := grpc.NewServer(opts...)
ptraceotlp.RegisterGRPCServer(s, &grpcTraceServer{})

go func() {
Expand All @@ -576,13 +559,11 @@ func TestHttpReception(t *testing.T) {
Endpoint: ln.Addr().String(),
TLSSetting: *test.tlsClientCreds,
}
clientOpts, errClient := gcs.ToDialOptions(componenttest.NewNopHost(), tt.TelemetrySettings)
grpcClientConn, errClient := gcs.ToClientConn(context.Background(), componenttest.NewNopHost(), tt.TelemetrySettings)
assert.NoError(t, errClient)
grpcClientConn, errDial := grpc.Dial(gcs.Endpoint, clientOpts...)
assert.NoError(t, errDial)
client := ptraceotlp.NewGRPCClient(grpcClientConn)
c := ptraceotlp.NewGRPCClient(grpcClientConn)
ctx, cancelFunc := context.WithTimeout(context.Background(), 2*time.Second)
resp, errResp := client.Export(ctx, ptraceotlp.NewExportRequest(), grpc.WaitForReady(true))
resp, errResp := c.Export(ctx, ptraceotlp.NewExportRequest(), grpc.WaitForReady(true))
if test.hasError {
assert.Error(t, errResp)
} else {
Expand Down Expand Up @@ -612,13 +593,12 @@ func TestReceiveOnUnixDomainSocket(t *testing.T) {
}
ln, err := gss.ToListener()
assert.NoError(t, err)
opts, err := gss.ToServerOption(componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings())
srv, err := gss.ToServer(componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings())
assert.NoError(t, err)
s := grpc.NewServer(opts...)
ptraceotlp.RegisterGRPCServer(s, &grpcTraceServer{})
ptraceotlp.RegisterGRPCServer(srv, &grpcTraceServer{})

go func() {
_ = s.Serve(ln)
_ = srv.Serve(ln)
}()

gcs := &GRPCClientSettings{
Expand All @@ -627,17 +607,15 @@ func TestReceiveOnUnixDomainSocket(t *testing.T) {
Insecure: true,
},
}
clientOpts, errClient := gcs.ToDialOptions(componenttest.NewNopHost(), tt.TelemetrySettings)
grpcClientConn, errClient := gcs.ToClientConn(context.Background(), componenttest.NewNopHost(), tt.TelemetrySettings)
assert.NoError(t, errClient)
grpcClientConn, errDial := grpc.Dial(gcs.Endpoint, clientOpts...)
assert.NoError(t, errDial)
client := ptraceotlp.NewGRPCClient(grpcClientConn)
c := ptraceotlp.NewGRPCClient(grpcClientConn)
ctx, cancelFunc := context.WithTimeout(context.Background(), 2*time.Second)
resp, errResp := client.Export(ctx, ptraceotlp.NewExportRequest(), grpc.WaitForReady(true))
resp, errResp := c.Export(ctx, ptraceotlp.NewExportRequest(), grpc.WaitForReady(true))
assert.NoError(t, errResp)
assert.NotNil(t, resp)
cancelFunc()
s.Stop()
srv.Stop()
}

func TestContextWithClient(t *testing.T) {
Expand Down Expand Up @@ -808,9 +786,8 @@ func TestClientInfoInterceptors(t *testing.T) {
Transport: "tcp",
},
}
opts, err := gss.ToServerOption(componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings())
srv, err := gss.ToServer(componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings())
require.NoError(t, err)
srv := grpc.NewServer(opts...)
ptraceotlp.RegisterGRPCServer(srv, mock)

defer srv.Stop()
Expand Down Expand Up @@ -838,12 +815,9 @@ func TestClientInfoInterceptors(t *testing.T) {
require.NoError(t, tt.Shutdown(context.Background()))
}()

clientOpts, errClient := gcs.ToDialOptions(componenttest.NewNopHost(), tt.TelemetrySettings)
grpcClientConn, errClient := gcs.ToClientConn(context.Background(), componenttest.NewNopHost(), tt.TelemetrySettings)
require.NoError(t, errClient)

grpcClientConn, errDial := grpc.Dial(gcs.Endpoint, clientOpts...)
require.NoError(t, errDial)

cl := ptraceotlp.NewGRPCClient(grpcClientConn)
ctx, cancelFunc := context.WithTimeout(context.Background(), 2*time.Second)
defer cancelFunc()
Expand Down
8 changes: 1 addition & 7 deletions exporter/otlpexporter/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,7 @@ func newExporter(cfg config.Exporter, set component.ExporterCreateSettings) (*ex
// start actually creates the gRPC connection. The client construction is deferred till this point as this
// is the only place we get hold of Extensions which are required to construct auth round tripper.
func (e *exporter) start(ctx context.Context, host component.Host) (err error) {
dialOpts, err := e.config.GRPCClientSettings.ToDialOptions(host, e.settings)
if err != nil {
return err
}
dialOpts = append(dialOpts, grpc.WithUserAgent(e.userAgent))

if e.clientConn, err = grpc.DialContext(ctx, e.config.GRPCClientSettings.SanitizedEndpoint(), dialOpts...); err != nil {
if e.clientConn, err = e.config.GRPCClientSettings.ToClientConn(ctx, host, e.settings, grpc.WithUserAgent(e.userAgent)); err != nil {
return err
}

Expand Down
4 changes: 1 addition & 3 deletions receiver/otlpreceiver/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,10 @@ func (r *otlpReceiver) startHTTPServer(cfg *confighttp.HTTPServerSettings, host
func (r *otlpReceiver) startProtocolServers(host component.Host) error {
var err error
if r.cfg.GRPC != nil {
var opts []grpc.ServerOption
opts, err = r.cfg.GRPC.ToServerOption(host, r.settings.TelemetrySettings)
r.serverGRPC, err = r.cfg.GRPC.ToServer(host, r.settings.TelemetrySettings)
if err != nil {
return err
}
r.serverGRPC = grpc.NewServer(opts...)

if r.traceReceiver != nil {
ptraceotlp.RegisterGRPCServer(r.serverGRPC, r.traceReceiver)
Expand Down

0 comments on commit a2df621

Please sign in to comment.