Skip to content

Commit

Permalink
Breaking Change: Remove usage of ocagent package for oc exporter
Browse files Browse the repository at this point in the history
    The config for OpenCensus exporter removed .

Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu committed Aug 13, 2020
1 parent 442c6df commit c85ea21
Show file tree
Hide file tree
Showing 13 changed files with 391 additions and 337 deletions.
2 changes: 0 additions & 2 deletions exporter/opencensusexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ The following settings can be optionally configured:
[grpc.WithKeepaliveParams()](https://godoc.org/google.golang.org/grpc#WithKeepaliveParams).
- `num_workers` (default = 2): number of workers that send the gRPC requests.
Optional.
- `reconnection_delay` (default = unset): time period between each reconnection
performed by the exporter.
- `balancer_name`(default = pick_first): Sets the balancer in grpclb_policy to discover the servers.
See [grpc loadbalancing example](https://github.com/grpc/grpc-go/blob/master/examples/features/load_balancing/README.md).

Expand Down
5 changes: 0 additions & 5 deletions exporter/opencensusexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
package opencensusexporter

import (
"time"

"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/configmodels"
)
Expand All @@ -29,7 +27,4 @@ type Config struct {

// The number of workers that send the gRPC requests.
NumWorkers int `mapstructure:"num_workers"`

// The time period between each reconnection performed by the exporter.
ReconnectionDelay time.Duration `mapstructure:"reconnection_delay,omitempty"`
}
3 changes: 1 addition & 2 deletions exporter/opencensusexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ func TestLoadConfig(t *testing.T) {
WriteBufferSize: 512 * 1024,
BalancerName: "round_robin",
},
NumWorkers: 123,
ReconnectionDelay: 15,
NumWorkers: 123,
})
}
82 changes: 7 additions & 75 deletions exporter/opencensusexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,9 @@
package opencensusexporter

import (
"fmt"
"context"

"contrib.go.opencensus.io/exporter/ocagent"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configgrpc"
Expand Down Expand Up @@ -54,82 +50,18 @@ func (f *Factory) CreateDefaultConfig() configmodels.Exporter {
// We almost read 0 bytes, so no need to tune ReadBufferSize.
WriteBufferSize: 512 * 1024,
},
NumWorkers: 2,
}
}

// CreateTraceExporter creates a trace exporter based on this config.
func (f *Factory) CreateTraceExporter(logger *zap.Logger, config configmodels.Exporter) (component.TraceExporterOld, error) {
ocac := config.(*Config)
opts, err := f.OCAgentOptions(logger, ocac)
if err != nil {
return nil, err
}
return NewTraceExporter(logger, config, opts...)
}

// OCAgentOptions takes the oc exporter Config and generates ocagent Options
func (f *Factory) OCAgentOptions(logger *zap.Logger, ocac *Config) ([]ocagent.ExporterOption, error) {
if ocac.Endpoint == "" {
return nil, &ocExporterError{
code: errEndpointRequired,
msg: "OpenCensus exporter config requires an Endpoint",
}
}
// TODO(ccaraman): Clean up this usage of gRPC settings apart of PR to address issue #933.
opts := []ocagent.ExporterOption{ocagent.WithAddress(ocac.Endpoint)}
if ocac.Compression != "" {
if compressionKey := configgrpc.GetGRPCCompressionKey(ocac.Compression); compressionKey != configgrpc.CompressionUnsupported {
opts = append(opts, ocagent.UseCompressor(compressionKey))
} else {
return nil, &ocExporterError{
code: errUnsupportedCompressionType,
msg: fmt.Sprintf("OpenCensus exporter unsupported compression type %q", ocac.Compression),
}
}
}
switch {
case ocac.TLSSetting.CAFile != "":
creds, err := credentials.NewClientTLSFromFile(ocac.TLSSetting.CAFile, "")
if err != nil {
return nil, &ocExporterError{
code: errUnableToGetTLSCreds,
msg: fmt.Sprintf("OpenCensus exporter unable to read TLS credentials from pem file %q: %v", ocac.TLSSetting.CAFile, err),
}
}
opts = append(opts, ocagent.WithTLSCredentials(creds))
case !ocac.TLSSetting.Insecure:
tlsConf, err := ocac.TLSSetting.LoadTLSConfig()
if err != nil {
return nil, fmt.Errorf("OpenCensus exporter failed to load TLS config: %w", err)
}
creds := credentials.NewTLS(tlsConf)
opts = append(opts, ocagent.WithTLSCredentials(creds))
default:
opts = append(opts, ocagent.WithInsecure())
}

if len(ocac.Headers) > 0 {
opts = append(opts, ocagent.WithHeaders(ocac.Headers))
}
if ocac.ReconnectionDelay > 0 {
opts = append(opts, ocagent.WithReconnectionPeriod(ocac.ReconnectionDelay))
}
if ocac.Keepalive != nil {
opts = append(opts, ocagent.WithGRPCDialOption(grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: ocac.Keepalive.Time,
Timeout: ocac.Keepalive.Timeout,
PermitWithoutStream: ocac.Keepalive.PermitWithoutStream,
})))
}
return opts, nil
func (f *Factory) CreateTraceExporter(_ *zap.Logger, config configmodels.Exporter) (component.TraceExporterOld, error) {
oCfg := config.(*Config)
return newTraceExporter(context.Background(), oCfg)
}

// CreateMetricsExporter creates a metrics exporter based on this config.
func (f *Factory) CreateMetricsExporter(logger *zap.Logger, config configmodels.Exporter) (component.MetricsExporterOld, error) {
func (f *Factory) CreateMetricsExporter(_ *zap.Logger, config configmodels.Exporter) (component.MetricsExporterOld, error) {
oCfg := config.(*Config)
opts, err := f.OCAgentOptions(logger, oCfg)
if err != nil {
return nil, err
}
return NewMetricsExporter(logger, config, opts...)
return newMetricsExporter(context.Background(), oCfg)
}
55 changes: 30 additions & 25 deletions exporter/opencensusexporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configcheck"
"go.opentelemetry.io/collector/config/configgrpc"
Expand Down Expand Up @@ -80,27 +81,33 @@ func TestCreateTraceExporter(t *testing.T) {
GRPCClientSettings: configgrpc.GRPCClientSettings{
Endpoint: "",
},
NumWorkers: 3,
},
mustFail: true,
},
{
name: "UseSecure",
name: "ZeroNumWorkers",
config: Config{
GRPCClientSettings: configgrpc.GRPCClientSettings{
Endpoint: rcvCfg.NetAddr.Endpoint,
TLSSetting: configtls.TLSClientSetting{
Insecure: false,
},
},
NumWorkers: 0,
},
mustFail: true,
},
{
name: "ReconnectionDelay",
name: "UseSecure",
config: Config{
GRPCClientSettings: configgrpc.GRPCClientSettings{
Endpoint: rcvCfg.NetAddr.Endpoint,
TLSSetting: configtls.TLSClientSetting{
Insecure: false,
},
},
ReconnectionDelay: 5 * time.Second,
NumWorkers: 3,
},
},
{
Expand All @@ -114,6 +121,7 @@ func TestCreateTraceExporter(t *testing.T) {
PermitWithoutStream: true,
},
},
NumWorkers: 3,
},
},
{
Expand All @@ -123,6 +131,7 @@ func TestCreateTraceExporter(t *testing.T) {
Endpoint: rcvCfg.NetAddr.Endpoint,
Compression: configgrpc.CompressionGzip,
},
NumWorkers: 3,
},
},
{
Expand All @@ -135,14 +144,6 @@ func TestCreateTraceExporter(t *testing.T) {
"hdr2": "val2",
},
},
},
},
{
name: "NumConsumers",
config: Config{
GRPCClientSettings: configgrpc.GRPCClientSettings{
Endpoint: rcvCfg.NetAddr.Endpoint,
},
NumWorkers: 3,
},
},
Expand All @@ -153,6 +154,7 @@ func TestCreateTraceExporter(t *testing.T) {
Endpoint: rcvCfg.NetAddr.Endpoint,
Compression: "unknown compression",
},
NumWorkers: 3,
},
mustFail: true,
},
Expand All @@ -167,6 +169,7 @@ func TestCreateTraceExporter(t *testing.T) {
},
},
},
NumWorkers: 3,
},
},
{
Expand All @@ -180,6 +183,7 @@ func TestCreateTraceExporter(t *testing.T) {
},
},
},
NumWorkers: 3,
},
mustFail: true,
},
Expand All @@ -188,21 +192,22 @@ func TestCreateTraceExporter(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
factory := &Factory{}
consumer, err := factory.CreateTraceExporter(zap.NewNop(), &tt.config)

if tt.mustFail {
assert.NotNil(t, err)
} else {
assert.NoError(t, err)
assert.NotNil(t, consumer)

err = consumer.Shutdown(context.Background())
if err != nil {
// Since the endpoint of opencensus exporter doesn't actually exist,
// exporter may already stop because it cannot connect.
assert.Equal(t, err.Error(), "rpc error: code = Canceled desc = grpc: the client connection is closing")
}
}
tReceiver, tErr := factory.CreateTraceExporter(zap.NewNop(), &tt.config)
checkErrorsAndShutdown(t, tReceiver, tErr, tt.mustFail)
mReceiver, mErr := factory.CreateMetricsExporter(zap.NewNop(), &tt.config)
checkErrorsAndShutdown(t, mReceiver, mErr, tt.mustFail)
})
}
}

func checkErrorsAndShutdown(t *testing.T, receiver component.Receiver, err error, mustFail bool) {
if mustFail {
assert.NotNil(t, err)
} else {
assert.NoError(t, err)
assert.NotNil(t, receiver)

require.NoError(t, receiver.Shutdown(context.Background()))
}
}
Loading

0 comments on commit c85ea21

Please sign in to comment.