Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Breaking Change: Remove usage of ocagent package for oc exporter #1516

Merged
merged 1 commit into from
Aug 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Unreleased

## 🛑 Breaking changes 🛑

- Remove `reconnection_delay` from OpenCensus exporter #1516.

## v0.8.0 Beta

## 🚀 New components 🚀
Expand Down
3 changes: 1 addition & 2 deletions exporter/opencensusexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,9 @@ The following settings can be optionally configured:
[grpc.WithKeepaliveParams()](https://godoc.org/google.golang.org/grpc#WithKeepaliveParams).
- `num_workers` (default = 2): number of workers that send the gRPC requests.
Optional.
- `reconnection_delay` (default = unset): time period between each reconnection
bogdandrutu marked this conversation as resolved.
Show resolved Hide resolved
performed by the exporter.
- `balancer_name`(default = pick_first): Sets the balancer in grpclb_policy to discover the servers.
See [grpc loadbalancing example](https://github.com/grpc/grpc-go/blob/master/examples/features/load_balancing/README.md).
- `wait_for_ready`: Option not supported.

Example:

Expand Down
5 changes: 0 additions & 5 deletions exporter/opencensusexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
package opencensusexporter

import (
"time"

"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/configmodels"
)
Expand All @@ -29,7 +27,4 @@ type Config struct {

// The number of workers that send the gRPC requests.
NumWorkers int `mapstructure:"num_workers"`

// The time period between each reconnection performed by the exporter.
ReconnectionDelay time.Duration `mapstructure:"reconnection_delay,omitempty"`
}
3 changes: 1 addition & 2 deletions exporter/opencensusexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ func TestLoadConfig(t *testing.T) {
WriteBufferSize: 512 * 1024,
BalancerName: "round_robin",
},
NumWorkers: 123,
ReconnectionDelay: 15,
NumWorkers: 123,
})
}
82 changes: 7 additions & 75 deletions exporter/opencensusexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,9 @@
package opencensusexporter

import (
"fmt"
"context"

"contrib.go.opencensus.io/exporter/ocagent"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configgrpc"
Expand Down Expand Up @@ -54,82 +50,18 @@ func (f *Factory) CreateDefaultConfig() configmodels.Exporter {
// We almost read 0 bytes, so no need to tune ReadBufferSize.
WriteBufferSize: 512 * 1024,
},
NumWorkers: 2,
}
}

// CreateTraceExporter creates a trace exporter based on this config.
func (f *Factory) CreateTraceExporter(logger *zap.Logger, config configmodels.Exporter) (component.TraceExporterOld, error) {
ocac := config.(*Config)
opts, err := f.OCAgentOptions(logger, ocac)
if err != nil {
return nil, err
}
return NewTraceExporter(logger, config, opts...)
}

// OCAgentOptions takes the oc exporter Config and generates ocagent Options
func (f *Factory) OCAgentOptions(logger *zap.Logger, ocac *Config) ([]ocagent.ExporterOption, error) {
if ocac.Endpoint == "" {
return nil, &ocExporterError{
code: errEndpointRequired,
msg: "OpenCensus exporter config requires an Endpoint",
}
}
// TODO(ccaraman): Clean up this usage of gRPC settings apart of PR to address issue #933.
opts := []ocagent.ExporterOption{ocagent.WithAddress(ocac.Endpoint)}
if ocac.Compression != "" {
if compressionKey := configgrpc.GetGRPCCompressionKey(ocac.Compression); compressionKey != configgrpc.CompressionUnsupported {
opts = append(opts, ocagent.UseCompressor(compressionKey))
} else {
return nil, &ocExporterError{
code: errUnsupportedCompressionType,
msg: fmt.Sprintf("OpenCensus exporter unsupported compression type %q", ocac.Compression),
}
}
}
switch {
case ocac.TLSSetting.CAFile != "":
creds, err := credentials.NewClientTLSFromFile(ocac.TLSSetting.CAFile, "")
if err != nil {
return nil, &ocExporterError{
code: errUnableToGetTLSCreds,
msg: fmt.Sprintf("OpenCensus exporter unable to read TLS credentials from pem file %q: %v", ocac.TLSSetting.CAFile, err),
}
}
opts = append(opts, ocagent.WithTLSCredentials(creds))
case !ocac.TLSSetting.Insecure:
tlsConf, err := ocac.TLSSetting.LoadTLSConfig()
if err != nil {
return nil, fmt.Errorf("OpenCensus exporter failed to load TLS config: %w", err)
}
creds := credentials.NewTLS(tlsConf)
opts = append(opts, ocagent.WithTLSCredentials(creds))
default:
opts = append(opts, ocagent.WithInsecure())
}

if len(ocac.Headers) > 0 {
opts = append(opts, ocagent.WithHeaders(ocac.Headers))
}
if ocac.ReconnectionDelay > 0 {
opts = append(opts, ocagent.WithReconnectionPeriod(ocac.ReconnectionDelay))
}
if ocac.Keepalive != nil {
opts = append(opts, ocagent.WithGRPCDialOption(grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: ocac.Keepalive.Time,
Timeout: ocac.Keepalive.Timeout,
PermitWithoutStream: ocac.Keepalive.PermitWithoutStream,
})))
}
return opts, nil
func (f *Factory) CreateTraceExporter(_ *zap.Logger, config configmodels.Exporter) (component.TraceExporterOld, error) {
oCfg := config.(*Config)
return newTraceExporter(context.Background(), oCfg)
}

// CreateMetricsExporter creates a metrics exporter based on this config.
func (f *Factory) CreateMetricsExporter(logger *zap.Logger, config configmodels.Exporter) (component.MetricsExporterOld, error) {
func (f *Factory) CreateMetricsExporter(_ *zap.Logger, config configmodels.Exporter) (component.MetricsExporterOld, error) {
oCfg := config.(*Config)
opts, err := f.OCAgentOptions(logger, oCfg)
if err != nil {
return nil, err
}
return NewMetricsExporter(logger, config, opts...)
return newMetricsExporter(context.Background(), oCfg)
}
93 changes: 38 additions & 55 deletions exporter/opencensusexporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,9 @@ import (
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configcheck"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/receiver/opencensusreceiver"
"go.opentelemetry.io/collector/testutil"
)

Expand All @@ -51,25 +48,7 @@ func TestCreateMetricsExporter(t *testing.T) {
}

func TestCreateTraceExporter(t *testing.T) {
// This test is about creating the exporter and stopping it. However, the
// exporter keeps trying to update its connection state in the background
// so unless there is a receiver enabled the stop call can return different
// results. Standing up a receiver to ensure that stop don't report errors.
rcvFactory := opencensusreceiver.NewFactory()
require.NotNil(t, rcvFactory)
rcvCfg := rcvFactory.CreateDefaultConfig().(*opencensusreceiver.Config)
rcvCfg.NetAddr.Endpoint = testutil.GetAvailableLocalAddress(t)

rcv, err := rcvFactory.CreateTraceReceiver(
context.Background(),
component.ReceiverCreateParams{Logger: zap.NewNop()},
rcvCfg,
new(exportertest.SinkTraceExporter))
require.NotNil(t, rcv)
require.Nil(t, err)
require.Nil(t, rcv.Start(context.Background(), componenttest.NewNopHost()))
defer rcv.Shutdown(context.Background())

endpoint := testutil.GetAvailableLocalAddress(t)
tests := []struct {
name string
config Config
Expand All @@ -81,106 +60,109 @@ func TestCreateTraceExporter(t *testing.T) {
GRPCClientSettings: configgrpc.GRPCClientSettings{
Endpoint: "",
},
NumWorkers: 3,
},
mustFail: true,
},
{
name: "UseSecure",
name: "ZeroNumWorkers",
config: Config{
GRPCClientSettings: configgrpc.GRPCClientSettings{
Endpoint: rcvCfg.NetAddr.Endpoint,
Endpoint: endpoint,
TLSSetting: configtls.TLSClientSetting{
Insecure: false,
},
},
NumWorkers: 0,
},
mustFail: true,
},
{
name: "ReconnectionDelay",
name: "UseSecure",
config: Config{
GRPCClientSettings: configgrpc.GRPCClientSettings{
Endpoint: rcvCfg.NetAddr.Endpoint,
Endpoint: endpoint,
TLSSetting: configtls.TLSClientSetting{
Insecure: false,
},
},
ReconnectionDelay: 5 * time.Second,
NumWorkers: 3,
},
},
{
name: "Keepalive",
config: Config{
GRPCClientSettings: configgrpc.GRPCClientSettings{
Endpoint: rcvCfg.NetAddr.Endpoint,
Endpoint: endpoint,
Keepalive: &configgrpc.KeepaliveClientConfig{
Time: 30 * time.Second,
Timeout: 25 * time.Second,
PermitWithoutStream: true,
},
},
NumWorkers: 3,
},
},
{
name: "Compression",
config: Config{
GRPCClientSettings: configgrpc.GRPCClientSettings{
Endpoint: rcvCfg.NetAddr.Endpoint,
Endpoint: endpoint,
Compression: configgrpc.CompressionGzip,
},
NumWorkers: 3,
},
},
{
name: "Headers",
config: Config{
GRPCClientSettings: configgrpc.GRPCClientSettings{
Endpoint: rcvCfg.NetAddr.Endpoint,
Endpoint: endpoint,
Headers: map[string]string{
"hdr1": "val1",
"hdr2": "val2",
},
},
},
},
{
name: "NumConsumers",
config: Config{
GRPCClientSettings: configgrpc.GRPCClientSettings{
Endpoint: rcvCfg.NetAddr.Endpoint,
},
NumWorkers: 3,
},
},
{
name: "CompressionError",
config: Config{
GRPCClientSettings: configgrpc.GRPCClientSettings{
Endpoint: rcvCfg.NetAddr.Endpoint,
Endpoint: endpoint,
Compression: "unknown compression",
},
NumWorkers: 3,
},
mustFail: true,
},
{
name: "CaCert",
config: Config{
GRPCClientSettings: configgrpc.GRPCClientSettings{
Endpoint: rcvCfg.NetAddr.Endpoint,
Endpoint: endpoint,
TLSSetting: configtls.TLSClientSetting{
TLSSetting: configtls.TLSSetting{
CAFile: "testdata/test_cert.pem",
},
},
},
NumWorkers: 3,
},
},
{
name: "CertPemFileError",
config: Config{
GRPCClientSettings: configgrpc.GRPCClientSettings{
Endpoint: rcvCfg.NetAddr.Endpoint,
Endpoint: endpoint,
TLSSetting: configtls.TLSClientSetting{
TLSSetting: configtls.TLSSetting{
CAFile: "nosuchfile",
},
},
},
NumWorkers: 3,
},
mustFail: true,
},
Expand All @@ -189,21 +171,22 @@ func TestCreateTraceExporter(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
factory := &Factory{}
consumer, err := factory.CreateTraceExporter(zap.NewNop(), &tt.config)

if tt.mustFail {
assert.NotNil(t, err)
} else {
assert.NoError(t, err)
assert.NotNil(t, consumer)

err = consumer.Shutdown(context.Background())
if err != nil {
// Since the endpoint of opencensus exporter doesn't actually exist,
// exporter may already stop because it cannot connect.
assert.Equal(t, err.Error(), "rpc error: code = Canceled desc = grpc: the client connection is closing")
}
}
tReceiver, tErr := factory.CreateTraceExporter(zap.NewNop(), &tt.config)
checkErrorsAndShutdown(t, tReceiver, tErr, tt.mustFail)
mReceiver, mErr := factory.CreateMetricsExporter(zap.NewNop(), &tt.config)
checkErrorsAndShutdown(t, mReceiver, mErr, tt.mustFail)
})
}
}

func checkErrorsAndShutdown(t *testing.T, receiver component.Receiver, err error, mustFail bool) {
if mustFail {
assert.NotNil(t, err)
} else {
assert.NoError(t, err)
assert.NotNil(t, receiver)

require.NoError(t, receiver.Shutdown(context.Background()))
}
}
Loading