Skip to content

Commit

Permalink
Breaking Change: Remove usage of ocagent package for oc exporter
Browse files Browse the repository at this point in the history
    The config for OpenCensus exporter removed .

Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu committed Aug 17, 2020
1 parent cb7a116 commit 761d8d5
Show file tree
Hide file tree
Showing 14 changed files with 537 additions and 382 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Unreleased

## 🛑 Breaking changes 🛑

- Remove `reconnection_delay` from OpenCensus exporter #1516.

## v0.8.0 Beta

## 🚀 New components 🚀
Expand Down
3 changes: 1 addition & 2 deletions exporter/opencensusexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,9 @@ The following settings can be optionally configured:
[grpc.WithKeepaliveParams()](https://godoc.org/google.golang.org/grpc#WithKeepaliveParams).
- `num_workers` (default = 2): number of workers that send the gRPC requests.
Optional.
- `reconnection_delay` (default = unset): time period between each reconnection
performed by the exporter.
- `balancer_name`(default = pick_first): Sets the balancer in grpclb_policy to discover the servers.
See [grpc loadbalancing example](https://github.com/grpc/grpc-go/blob/master/examples/features/load_balancing/README.md).
- `wait_for_ready`: Option not supported.

Example:

Expand Down
5 changes: 0 additions & 5 deletions exporter/opencensusexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
package opencensusexporter

import (
"time"

"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/configmodels"
)
Expand All @@ -29,7 +27,4 @@ type Config struct {

// The number of workers that send the gRPC requests.
NumWorkers int `mapstructure:"num_workers"`

// The time period between each reconnection performed by the exporter.
ReconnectionDelay time.Duration `mapstructure:"reconnection_delay,omitempty"`
}
3 changes: 1 addition & 2 deletions exporter/opencensusexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ func TestLoadConfig(t *testing.T) {
WriteBufferSize: 512 * 1024,
BalancerName: "round_robin",
},
NumWorkers: 123,
ReconnectionDelay: 15,
NumWorkers: 123,
})
}
82 changes: 7 additions & 75 deletions exporter/opencensusexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,9 @@
package opencensusexporter

import (
"fmt"
"context"

"contrib.go.opencensus.io/exporter/ocagent"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configgrpc"
Expand Down Expand Up @@ -54,82 +50,18 @@ func (f *Factory) CreateDefaultConfig() configmodels.Exporter {
// We almost read 0 bytes, so no need to tune ReadBufferSize.
WriteBufferSize: 512 * 1024,
},
NumWorkers: 2,
}
}

// CreateTraceExporter creates a trace exporter based on this config.
func (f *Factory) CreateTraceExporter(logger *zap.Logger, config configmodels.Exporter) (component.TraceExporterOld, error) {
ocac := config.(*Config)
opts, err := f.OCAgentOptions(logger, ocac)
if err != nil {
return nil, err
}
return NewTraceExporter(logger, config, opts...)
}

// OCAgentOptions takes the oc exporter Config and generates ocagent Options
func (f *Factory) OCAgentOptions(logger *zap.Logger, ocac *Config) ([]ocagent.ExporterOption, error) {
if ocac.Endpoint == "" {
return nil, &ocExporterError{
code: errEndpointRequired,
msg: "OpenCensus exporter config requires an Endpoint",
}
}
// TODO(ccaraman): Clean up this usage of gRPC settings apart of PR to address issue #933.
opts := []ocagent.ExporterOption{ocagent.WithAddress(ocac.Endpoint)}
if ocac.Compression != "" {
if compressionKey := configgrpc.GetGRPCCompressionKey(ocac.Compression); compressionKey != configgrpc.CompressionUnsupported {
opts = append(opts, ocagent.UseCompressor(compressionKey))
} else {
return nil, &ocExporterError{
code: errUnsupportedCompressionType,
msg: fmt.Sprintf("OpenCensus exporter unsupported compression type %q", ocac.Compression),
}
}
}
switch {
case ocac.TLSSetting.CAFile != "":
creds, err := credentials.NewClientTLSFromFile(ocac.TLSSetting.CAFile, "")
if err != nil {
return nil, &ocExporterError{
code: errUnableToGetTLSCreds,
msg: fmt.Sprintf("OpenCensus exporter unable to read TLS credentials from pem file %q: %v", ocac.TLSSetting.CAFile, err),
}
}
opts = append(opts, ocagent.WithTLSCredentials(creds))
case !ocac.TLSSetting.Insecure:
tlsConf, err := ocac.TLSSetting.LoadTLSConfig()
if err != nil {
return nil, fmt.Errorf("OpenCensus exporter failed to load TLS config: %w", err)
}
creds := credentials.NewTLS(tlsConf)
opts = append(opts, ocagent.WithTLSCredentials(creds))
default:
opts = append(opts, ocagent.WithInsecure())
}

if len(ocac.Headers) > 0 {
opts = append(opts, ocagent.WithHeaders(ocac.Headers))
}
if ocac.ReconnectionDelay > 0 {
opts = append(opts, ocagent.WithReconnectionPeriod(ocac.ReconnectionDelay))
}
if ocac.Keepalive != nil {
opts = append(opts, ocagent.WithGRPCDialOption(grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: ocac.Keepalive.Time,
Timeout: ocac.Keepalive.Timeout,
PermitWithoutStream: ocac.Keepalive.PermitWithoutStream,
})))
}
return opts, nil
func (f *Factory) CreateTraceExporter(_ *zap.Logger, config configmodels.Exporter) (component.TraceExporterOld, error) {
oCfg := config.(*Config)
return newTraceExporter(context.Background(), oCfg)
}

// CreateMetricsExporter creates a metrics exporter based on this config.
func (f *Factory) CreateMetricsExporter(logger *zap.Logger, config configmodels.Exporter) (component.MetricsExporterOld, error) {
func (f *Factory) CreateMetricsExporter(_ *zap.Logger, config configmodels.Exporter) (component.MetricsExporterOld, error) {
oCfg := config.(*Config)
opts, err := f.OCAgentOptions(logger, oCfg)
if err != nil {
return nil, err
}
return NewMetricsExporter(logger, config, opts...)
return newMetricsExporter(context.Background(), oCfg)
}
93 changes: 38 additions & 55 deletions exporter/opencensusexporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,9 @@ import (
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configcheck"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/receiver/opencensusreceiver"
"go.opentelemetry.io/collector/testutil"
)

Expand All @@ -51,25 +48,7 @@ func TestCreateMetricsExporter(t *testing.T) {
}

func TestCreateTraceExporter(t *testing.T) {
// This test is about creating the exporter and stopping it. However, the
// exporter keeps trying to update its connection state in the background
// so unless there is a receiver enabled the stop call can return different
// results. Standing up a receiver to ensure that stop don't report errors.
rcvFactory := opencensusreceiver.NewFactory()
require.NotNil(t, rcvFactory)
rcvCfg := rcvFactory.CreateDefaultConfig().(*opencensusreceiver.Config)
rcvCfg.NetAddr.Endpoint = testutil.GetAvailableLocalAddress(t)

rcv, err := rcvFactory.CreateTraceReceiver(
context.Background(),
component.ReceiverCreateParams{Logger: zap.NewNop()},
rcvCfg,
new(exportertest.SinkTraceExporter))
require.NotNil(t, rcv)
require.Nil(t, err)
require.Nil(t, rcv.Start(context.Background(), componenttest.NewNopHost()))
defer rcv.Shutdown(context.Background())

endpoint := testutil.GetAvailableLocalAddress(t)
tests := []struct {
name string
config Config
Expand All @@ -81,106 +60,109 @@ func TestCreateTraceExporter(t *testing.T) {
GRPCClientSettings: configgrpc.GRPCClientSettings{
Endpoint: "",
},
NumWorkers: 3,
},
mustFail: true,
},
{
name: "UseSecure",
name: "ZeroNumWorkers",
config: Config{
GRPCClientSettings: configgrpc.GRPCClientSettings{
Endpoint: rcvCfg.NetAddr.Endpoint,
Endpoint: endpoint,
TLSSetting: configtls.TLSClientSetting{
Insecure: false,
},
},
NumWorkers: 0,
},
mustFail: true,
},
{
name: "ReconnectionDelay",
name: "UseSecure",
config: Config{
GRPCClientSettings: configgrpc.GRPCClientSettings{
Endpoint: rcvCfg.NetAddr.Endpoint,
Endpoint: endpoint,
TLSSetting: configtls.TLSClientSetting{
Insecure: false,
},
},
ReconnectionDelay: 5 * time.Second,
NumWorkers: 3,
},
},
{
name: "Keepalive",
config: Config{
GRPCClientSettings: configgrpc.GRPCClientSettings{
Endpoint: rcvCfg.NetAddr.Endpoint,
Endpoint: endpoint,
Keepalive: &configgrpc.KeepaliveClientConfig{
Time: 30 * time.Second,
Timeout: 25 * time.Second,
PermitWithoutStream: true,
},
},
NumWorkers: 3,
},
},
{
name: "Compression",
config: Config{
GRPCClientSettings: configgrpc.GRPCClientSettings{
Endpoint: rcvCfg.NetAddr.Endpoint,
Endpoint: endpoint,
Compression: configgrpc.CompressionGzip,
},
NumWorkers: 3,
},
},
{
name: "Headers",
config: Config{
GRPCClientSettings: configgrpc.GRPCClientSettings{
Endpoint: rcvCfg.NetAddr.Endpoint,
Endpoint: endpoint,
Headers: map[string]string{
"hdr1": "val1",
"hdr2": "val2",
},
},
},
},
{
name: "NumConsumers",
config: Config{
GRPCClientSettings: configgrpc.GRPCClientSettings{
Endpoint: rcvCfg.NetAddr.Endpoint,
},
NumWorkers: 3,
},
},
{
name: "CompressionError",
config: Config{
GRPCClientSettings: configgrpc.GRPCClientSettings{
Endpoint: rcvCfg.NetAddr.Endpoint,
Endpoint: endpoint,
Compression: "unknown compression",
},
NumWorkers: 3,
},
mustFail: true,
},
{
name: "CaCert",
config: Config{
GRPCClientSettings: configgrpc.GRPCClientSettings{
Endpoint: rcvCfg.NetAddr.Endpoint,
Endpoint: endpoint,
TLSSetting: configtls.TLSClientSetting{
TLSSetting: configtls.TLSSetting{
CAFile: "testdata/test_cert.pem",
},
},
},
NumWorkers: 3,
},
},
{
name: "CertPemFileError",
config: Config{
GRPCClientSettings: configgrpc.GRPCClientSettings{
Endpoint: rcvCfg.NetAddr.Endpoint,
Endpoint: endpoint,
TLSSetting: configtls.TLSClientSetting{
TLSSetting: configtls.TLSSetting{
CAFile: "nosuchfile",
},
},
},
NumWorkers: 3,
},
mustFail: true,
},
Expand All @@ -189,21 +171,22 @@ func TestCreateTraceExporter(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
factory := &Factory{}
consumer, err := factory.CreateTraceExporter(zap.NewNop(), &tt.config)

if tt.mustFail {
assert.NotNil(t, err)
} else {
assert.NoError(t, err)
assert.NotNil(t, consumer)

err = consumer.Shutdown(context.Background())
if err != nil {
// Since the endpoint of opencensus exporter doesn't actually exist,
// exporter may already stop because it cannot connect.
assert.Equal(t, err.Error(), "rpc error: code = Canceled desc = grpc: the client connection is closing")
}
}
tReceiver, tErr := factory.CreateTraceExporter(zap.NewNop(), &tt.config)
checkErrorsAndShutdown(t, tReceiver, tErr, tt.mustFail)
mReceiver, mErr := factory.CreateMetricsExporter(zap.NewNop(), &tt.config)
checkErrorsAndShutdown(t, mReceiver, mErr, tt.mustFail)
})
}
}

func checkErrorsAndShutdown(t *testing.T, receiver component.Receiver, err error, mustFail bool) {
if mustFail {
assert.NotNil(t, err)
} else {
assert.NoError(t, err)
assert.NotNil(t, receiver)

require.NoError(t, receiver.Shutdown(context.Background()))
}
}
Loading

0 comments on commit 761d8d5

Please sign in to comment.