diff --git a/CHANGELOG.md b/CHANGELOG.md index 29a7a7fb560..f08abbc7a66 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,7 +16,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Adds test to check BatchSpanProcessor ignores `OnEnd` and `ForceFlush` post `Shutdown`. (#1772) - Option `ExportTimeout` was added to batch span processor. (#1755) - Adds semantic conventions for exceptions. (#1492) -- Added support for configuring OTLP/HTTP Endpoints, Headers, Compression and Timeout via the Environment Variables. (#1758) +- Added support for configuring OTLP/HTTP and OTLP/gRPC Endpoints, TLS Certificates, Headers, Compression and Timeout via Environment Variables. (#1758, #1769 and #1811) - `OTEL_EXPORTER_OTLP_ENDPOINT` - `OTEL_EXPORTER_OTLP_TRACES_ENDPOINT` - `OTEL_EXPORTER_OTLP_METRICS_ENDPOINT` @@ -29,7 +29,6 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - `OTEL_EXPORTER_OTLP_TIMEOUT` - `OTEL_EXPORTER_OTLP_TRACES_TIMEOUT` - `OTEL_EXPORTER_OTLP_METRICS_TIMEOUT` -- Added support for configuring OTLP/HTTP TLS Certificates via the Environment Variables. (#1769) - `OTEL_EXPORTER_OTLP_CERTIFICATE` - `OTEL_EXPORTER_OTLP_TRACES_CERTIFICATE` - `OTEL_EXPORTER_OTLP_METRICS_CERTIFICATE` diff --git a/exporters/otlp/otlphttp/envconfig.go b/exporters/otlp/internal/otlpconfig/envconfig.go similarity index 77% rename from exporters/otlp/otlphttp/envconfig.go rename to exporters/otlp/internal/otlpconfig/envconfig.go index 3308c6ef474..b52f3dc537a 100644 --- a/exporters/otlp/otlphttp/envconfig.go +++ b/exporters/otlp/internal/otlpconfig/envconfig.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package otlphttp +package otlpconfig import ( "crypto/tls" @@ -24,37 +24,50 @@ import ( "strings" "time" - "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/exporters/otlp" - "go.opentelemetry.io/otel/exporters/otlp/internal/otlpconfig" + "go.opentelemetry.io/otel" ) -func applyEnvConfigs(cfg *config) { - e := envOptionsReader{ - getEnv: os.Getenv, - readFile: ioutil.ReadFile, +func ApplyGRPCEnvConfigs(cfg *Config) { + e := EnvOptionsReader{ + GetEnv: os.Getenv, + ReadFile: ioutil.ReadFile, } - opts := e.getOptionsFromEnv() - for _, opt := range opts { - opt.Apply(cfg) + e.ApplyGRPCEnvConfigs(cfg) +} + +func ApplyHTTPEnvConfigs(cfg *Config) { + e := EnvOptionsReader{ + GetEnv: os.Getenv, + ReadFile: ioutil.ReadFile, } + + e.ApplyHTTPEnvConfigs(cfg) +} + +type EnvOptionsReader struct { + GetEnv func(string) string + ReadFile func(filename string) ([]byte, error) } -type envOptionsReader struct { - getEnv func(string) string - readFile func(filename string) ([]byte, error) +func (e *EnvOptionsReader) ApplyHTTPEnvConfigs(cfg *Config) { + opts := e.GetOptionsFromEnv() + for _, opt := range opts { + opt.ApplyHTTPOption(cfg) + } } -func (e *envOptionsReader) applyEnvConfigs(cfg *config) { - opts := e.getOptionsFromEnv() +func (e *EnvOptionsReader) ApplyGRPCEnvConfigs(cfg *Config) { + opts := e.GetOptionsFromEnv() for _, opt := range opts { - opt.Apply(cfg) + opt.ApplyGRPCOption(cfg) } } -func (e *envOptionsReader) getOptionsFromEnv() []Option { - var opts []Option +func (e *EnvOptionsReader) GetOptionsFromEnv() []GenericOption { + var opts []GenericOption // Endpoint if v, ok := e.getEnvValue("ENDPOINT"); ok { @@ -132,28 +145,28 @@ func (e *envOptionsReader) getOptionsFromEnv() []Option { return opts } -// getEnvValue gets an OTLP environment variable value of the specified key using the getEnv function. +// getEnvValue gets an OTLP environment variable value of the specified key using the GetEnv function. // This function already prepends the OTLP prefix to all key lookup. -func (e *envOptionsReader) getEnvValue(key string) (string, bool) { - v := strings.TrimSpace(e.getEnv(fmt.Sprintf("OTEL_EXPORTER_OTLP_%s", key))) +func (e *EnvOptionsReader) getEnvValue(key string) (string, bool) { + v := strings.TrimSpace(e.GetEnv(fmt.Sprintf("OTEL_EXPORTER_OTLP_%s", key))) return v, v != "" } -func (e *envOptionsReader) readTLSConfig(path string) (*tls.Config, error) { - b, err := e.readFile(path) +func (e *EnvOptionsReader) readTLSConfig(path string) (*tls.Config, error) { + b, err := e.ReadFile(path) if err != nil { return nil, err } - return otlpconfig.CreateTLSConfig(b) + return CreateTLSConfig(b) } -func stringToCompression(value string) Compression { +func stringToCompression(value string) otlp.Compression { switch value { case "gzip": - return GzipCompression + return otlp.GzipCompression } - return NoCompression + return otlp.NoCompression } func stringToHeader(value string) map[string]string { diff --git a/exporters/otlp/otlphttp/envconfig_test.go b/exporters/otlp/internal/otlpconfig/envconfig_test.go similarity index 98% rename from exporters/otlp/otlphttp/envconfig_test.go rename to exporters/otlp/internal/otlpconfig/envconfig_test.go index 77f00bde515..7a6316a2d10 100644 --- a/exporters/otlp/otlphttp/envconfig_test.go +++ b/exporters/otlp/internal/otlpconfig/envconfig_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package otlphttp +package otlpconfig import ( "reflect" diff --git a/exporters/otlp/internal/otlpconfig/options.go b/exporters/otlp/internal/otlpconfig/options.go new file mode 100644 index 00000000000..3fd4a30dd38 --- /dev/null +++ b/exporters/otlp/internal/otlpconfig/options.go @@ -0,0 +1,376 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package otlpconfig // import "go.opentelemetry.io/otel/exporters/otlp/internal/otlpconfig" + +import ( + "crypto/tls" + "fmt" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + + "go.opentelemetry.io/otel/exporters/otlp" +) + +const ( + // DefaultMaxAttempts describes how many times the driver + // should retry the sending of the payload in case of a + // retryable error. + DefaultMaxAttempts int = 5 + // DefaultTracesPath is a default URL path for endpoint that + // receives spans. + DefaultTracesPath string = "/v1/traces" + // DefaultMetricsPath is a default URL path for endpoint that + // receives metrics. + DefaultMetricsPath string = "/v1/metrics" + // DefaultBackoff is a default base backoff time used in the + // exponential backoff strategy. + DefaultBackoff time.Duration = 300 * time.Millisecond + // DefaultTimeout is a default max waiting time for the backend to process + // each span or metrics batch. + DefaultTimeout time.Duration = 10 * time.Second + // DefaultServiceConfig is the gRPC service config used if none is + // provided by the user. + DefaultServiceConfig = `{ + "methodConfig":[{ + "name":[ + { "service":"opentelemetry.proto.collector.metrics.v1.MetricsService" }, + { "service":"opentelemetry.proto.collector.trace.v1.TraceService" } + ], + "retryPolicy":{ + "MaxAttempts":5, + "InitialBackoff":"0.3s", + "MaxBackoff":"5s", + "BackoffMultiplier":2, + "RetryableStatusCodes":[ + "CANCELLED", + "DEADLINE_EXCEEDED", + "RESOURCE_EXHAUSTED", + "ABORTED", + "OUT_OF_RANGE", + "UNAVAILABLE", + "DATA_LOSS" + ] + } + }] +}` +) + +type ( + SignalConfig struct { + Endpoint string + Insecure bool + TLSCfg *tls.Config + Headers map[string]string + Compression otlp.Compression + Timeout time.Duration + URLPath string + + // gRPC configurations + GRPCCredentials credentials.TransportCredentials + } + + Config struct { + // Signal specific configurations + Metrics SignalConfig + Traces SignalConfig + + // General configurations + MaxAttempts int + Backoff time.Duration + + // HTTP configuration + Marshaler otlp.Marshaler + + // gRPC configurations + ReconnectionPeriod time.Duration + ServiceConfig string + DialOptions []grpc.DialOption + } +) + +func NewDefaultConfig() Config { + c := Config{ + Traces: SignalConfig{ + Endpoint: fmt.Sprintf("%s:%d", otlp.DefaultCollectorHost, otlp.DefaultCollectorPort), + URLPath: DefaultTracesPath, + Compression: otlp.NoCompression, + Timeout: DefaultTimeout, + }, + Metrics: SignalConfig{ + Endpoint: fmt.Sprintf("%s:%d", otlp.DefaultCollectorHost, otlp.DefaultCollectorPort), + URLPath: DefaultMetricsPath, + Compression: otlp.NoCompression, + Timeout: DefaultTimeout, + }, + MaxAttempts: DefaultMaxAttempts, + Backoff: DefaultBackoff, + ServiceConfig: DefaultServiceConfig, + } + + return c +} + +type ( + // GenericOption applies an option to the HTTP or gRPC driver. + GenericOption interface { + ApplyHTTPOption(*Config) + ApplyGRPCOption(*Config) + + // A private method to prevent users implementing the + // interface and so future additions to it will not + // violate compatibility. + private() + } + + // HTTPOption applies an option to the HTTP driver. + HTTPOption interface { + ApplyHTTPOption(*Config) + + // A private method to prevent users implementing the + // interface and so future additions to it will not + // violate compatibility. + private() + } + + // GRPCOption applies an option to the gRPC driver. + GRPCOption interface { + ApplyGRPCOption(*Config) + + // A private method to prevent users implementing the + // interface and so future additions to it will not + // violate compatibility. + private() + } +) + +// genericOption is an option that applies the same logic +// for both gRPC and HTTP. +type genericOption struct { + fn func(*Config) +} + +func (g *genericOption) ApplyGRPCOption(cfg *Config) { + g.fn(cfg) +} + +func (g *genericOption) ApplyHTTPOption(cfg *Config) { + g.fn(cfg) +} + +func (genericOption) private() {} + +func newGenericOption(fn func(cfg *Config)) GenericOption { + return &genericOption{fn: fn} +} + +// splitOption is an option that applies different logics +// for gRPC and HTTP. +type splitOption struct { + httpFn func(*Config) + grpcFn func(*Config) +} + +func (g *splitOption) ApplyGRPCOption(cfg *Config) { + g.grpcFn(cfg) +} + +func (g *splitOption) ApplyHTTPOption(cfg *Config) { + g.httpFn(cfg) +} + +func (splitOption) private() {} + +func newSplitOption(httpFn func(cfg *Config), grpcFn func(cfg *Config)) GenericOption { + return &splitOption{httpFn: httpFn, grpcFn: grpcFn} +} + +// httpOption is an option that is only applied to the HTTP driver. +type httpOption struct { + fn func(*Config) +} + +func (h *httpOption) ApplyHTTPOption(cfg *Config) { + h.fn(cfg) +} + +func (httpOption) private() {} + +func NewHTTPOption(fn func(cfg *Config)) HTTPOption { + return &httpOption{fn: fn} +} + +// grpcOption is an option that is only applied to the gRPC driver. +type grpcOption struct { + fn func(*Config) +} + +func (h *grpcOption) ApplyGRPCOption(cfg *Config) { + h.fn(cfg) +} + +func (grpcOption) private() {} + +func NewGRPCOption(fn func(cfg *Config)) GRPCOption { + return &grpcOption{fn: fn} +} + +// Generic Options + +func WithEndpoint(endpoint string) GenericOption { + return newGenericOption(func(cfg *Config) { + cfg.Traces.Endpoint = endpoint + cfg.Metrics.Endpoint = endpoint + }) +} + +func WithTracesEndpoint(endpoint string) GenericOption { + return newGenericOption(func(cfg *Config) { + cfg.Traces.Endpoint = endpoint + }) +} + +func WithMetricsEndpoint(endpoint string) GenericOption { + return newGenericOption(func(cfg *Config) { + cfg.Metrics.Endpoint = endpoint + }) +} + +func WithCompression(compression otlp.Compression) GenericOption { + return newGenericOption(func(cfg *Config) { + cfg.Traces.Compression = compression + cfg.Metrics.Compression = compression + }) +} + +func WithTracesCompression(compression otlp.Compression) GenericOption { + return newGenericOption(func(cfg *Config) { + cfg.Traces.Compression = compression + }) +} + +func WithMetricsCompression(compression otlp.Compression) GenericOption { + return newGenericOption(func(cfg *Config) { + cfg.Metrics.Compression = compression + }) +} + +func WithTracesURLPath(urlPath string) GenericOption { + return newGenericOption(func(cfg *Config) { + cfg.Traces.URLPath = urlPath + }) +} + +func WithMetricsURLPath(urlPath string) GenericOption { + return newGenericOption(func(cfg *Config) { + cfg.Metrics.URLPath = urlPath + }) +} + +func WithMaxAttempts(maxAttempts int) GenericOption { + return newGenericOption(func(cfg *Config) { + cfg.MaxAttempts = maxAttempts + }) +} + +func WithBackoff(duration time.Duration) GenericOption { + return newGenericOption(func(cfg *Config) { + cfg.Backoff = duration + }) +} + +func WithTLSClientConfig(tlsCfg *tls.Config) GenericOption { + return newSplitOption(func(cfg *Config) { + cfg.Traces.TLSCfg = tlsCfg.Clone() + cfg.Metrics.TLSCfg = tlsCfg.Clone() + }, func(cfg *Config) { + cfg.Traces.GRPCCredentials = credentials.NewTLS(tlsCfg) + cfg.Metrics.GRPCCredentials = credentials.NewTLS(tlsCfg) + }) +} + +func WithTracesTLSClientConfig(tlsCfg *tls.Config) GenericOption { + return newSplitOption(func(cfg *Config) { + cfg.Traces.TLSCfg = tlsCfg.Clone() + }, func(cfg *Config) { + cfg.Traces.GRPCCredentials = credentials.NewTLS(tlsCfg) + }) +} + +func WithMetricsTLSClientConfig(tlsCfg *tls.Config) GenericOption { + return newSplitOption(func(cfg *Config) { + cfg.Metrics.TLSCfg = tlsCfg.Clone() + }, func(cfg *Config) { + cfg.Metrics.GRPCCredentials = credentials.NewTLS(tlsCfg) + }) +} + +func WithInsecure() GenericOption { + return newGenericOption(func(cfg *Config) { + cfg.Traces.Insecure = true + cfg.Metrics.Insecure = true + }) +} + +func WithInsecureTraces() GenericOption { + return newGenericOption(func(cfg *Config) { + cfg.Traces.Insecure = true + }) +} + +func WithInsecureMetrics() GenericOption { + return newGenericOption(func(cfg *Config) { + cfg.Metrics.Insecure = true + }) +} + +func WithHeaders(headers map[string]string) GenericOption { + return newGenericOption(func(cfg *Config) { + cfg.Traces.Headers = headers + cfg.Metrics.Headers = headers + }) +} + +func WithTracesHeaders(headers map[string]string) GenericOption { + return newGenericOption(func(cfg *Config) { + cfg.Traces.Headers = headers + }) +} + +func WithMetricsHeaders(headers map[string]string) GenericOption { + return newGenericOption(func(cfg *Config) { + cfg.Metrics.Headers = headers + }) +} + +func WithTimeout(duration time.Duration) GenericOption { + return newGenericOption(func(cfg *Config) { + cfg.Traces.Timeout = duration + cfg.Metrics.Timeout = duration + }) +} + +func WithTracesTimeout(duration time.Duration) GenericOption { + return newGenericOption(func(cfg *Config) { + cfg.Traces.Timeout = duration + }) +} + +func WithMetricsTimeout(duration time.Duration) GenericOption { + return newGenericOption(func(cfg *Config) { + cfg.Metrics.Timeout = duration + }) +} diff --git a/exporters/otlp/internal/otlpconfig/options_test.go b/exporters/otlp/internal/otlpconfig/options_test.go new file mode 100644 index 00000000000..7a70ba53619 --- /dev/null +++ b/exporters/otlp/internal/otlpconfig/options_test.go @@ -0,0 +1,425 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package otlpconfig + +import ( + "crypto/tls" + "crypto/x509" + "errors" + "testing" + "time" + + "go.opentelemetry.io/otel/exporters/otlp" + + "github.com/stretchr/testify/assert" +) + +type env map[string]string + +func (e *env) getEnv(env string) string { + return (*e)[env] +} + +type fileReader map[string][]byte + +func (f *fileReader) readFile(filename string) ([]byte, error) { + if b, ok := (*f)[filename]; ok { + return b, nil + } + return nil, errors.New("File not found") +} + +func TestConfigs(t *testing.T) { + tlsCert, err := CreateTLSConfig([]byte(WeakCertificate)) + assert.NoError(t, err) + + tests := []struct { + name string + opts []GenericOption + env env + fileReader fileReader + asserts func(t *testing.T, c *Config, grpcOption bool) + }{ + { + name: "Test default configs", + asserts: func(t *testing.T, c *Config, grpcOption bool) { + assert.Equal(t, "localhost:4317", c.Traces.Endpoint) + assert.Equal(t, "localhost:4317", c.Metrics.Endpoint) + assert.Equal(t, otlp.NoCompression, c.Traces.Compression) + assert.Equal(t, otlp.NoCompression, c.Metrics.Compression) + assert.Equal(t, map[string]string(nil), c.Traces.Headers) + assert.Equal(t, map[string]string(nil), c.Metrics.Headers) + assert.Equal(t, 10*time.Second, c.Traces.Timeout) + assert.Equal(t, 10*time.Second, c.Metrics.Timeout) + }, + }, + + // Endpoint Tests + { + name: "Test With Endpoint", + opts: []GenericOption{ + WithEndpoint("someendpoint"), + }, + asserts: func(t *testing.T, c *Config, grpcOption bool) { + assert.Equal(t, "someendpoint", c.Traces.Endpoint) + assert.Equal(t, "someendpoint", c.Metrics.Endpoint) + }, + }, + { + name: "Test With Signal Specific Endpoint", + opts: []GenericOption{ + WithEndpoint("overrode_by_signal_specific"), + WithTracesEndpoint("traces_endpoint"), + WithMetricsEndpoint("metrics_endpoint"), + }, + asserts: func(t *testing.T, c *Config, grpcOption bool) { + assert.Equal(t, "traces_endpoint", c.Traces.Endpoint) + assert.Equal(t, "metrics_endpoint", c.Metrics.Endpoint) + }, + }, + { + name: "Test Environment Endpoint", + env: map[string]string{ + "OTEL_EXPORTER_OTLP_ENDPOINT": "env_endpoint", + }, + asserts: func(t *testing.T, c *Config, grpcOption bool) { + assert.Equal(t, "env_endpoint", c.Traces.Endpoint) + assert.Equal(t, "env_endpoint", c.Metrics.Endpoint) + }, + }, + { + name: "Test Environment Signal Specific Endpoint", + env: map[string]string{ + "OTEL_EXPORTER_OTLP_ENDPOINT": "overrode_by_signal_specific", + "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT": "env_traces_endpoint", + "OTEL_EXPORTER_OTLP_METRICS_ENDPOINT": "env_metrics_endpoint", + }, + asserts: func(t *testing.T, c *Config, grpcOption bool) { + assert.Equal(t, "env_traces_endpoint", c.Traces.Endpoint) + assert.Equal(t, "env_metrics_endpoint", c.Metrics.Endpoint) + }, + }, + { + name: "Test Mixed Environment and With Endpoint", + opts: []GenericOption{ + WithTracesEndpoint("traces_endpoint"), + }, + env: map[string]string{ + "OTEL_EXPORTER_OTLP_ENDPOINT": "env_endpoint", + }, + asserts: func(t *testing.T, c *Config, grpcOption bool) { + assert.Equal(t, "traces_endpoint", c.Traces.Endpoint) + assert.Equal(t, "env_endpoint", c.Metrics.Endpoint) + }, + }, + + // Certificate tests + { + name: "Test With Certificate", + opts: []GenericOption{ + WithTLSClientConfig(tlsCert), + }, + asserts: func(t *testing.T, c *Config, grpcOption bool) { + if grpcOption { + //TODO: make sure gRPC's credentials actually works + assert.NotNil(t, c.Traces.GRPCCredentials) + assert.NotNil(t, c.Metrics.GRPCCredentials) + } else { + assert.Equal(t, tlsCert.RootCAs.Subjects(), c.Traces.TLSCfg.RootCAs.Subjects()) + assert.Equal(t, tlsCert.RootCAs.Subjects(), c.Metrics.TLSCfg.RootCAs.Subjects()) + } + }, + }, + { + name: "Test With Signal Specific Certificate", + opts: []GenericOption{ + WithTLSClientConfig(&tls.Config{}), + WithTracesTLSClientConfig(tlsCert), + WithMetricsTLSClientConfig(&tls.Config{RootCAs: x509.NewCertPool()}), + }, + asserts: func(t *testing.T, c *Config, grpcOption bool) { + + if grpcOption { + assert.NotNil(t, c.Traces.GRPCCredentials) + assert.NotNil(t, c.Metrics.GRPCCredentials) + } else { + assert.Equal(t, tlsCert.RootCAs.Subjects(), c.Traces.TLSCfg.RootCAs.Subjects()) + assert.Equal(t, 0, len(c.Metrics.TLSCfg.RootCAs.Subjects())) + } + }, + }, + { + name: "Test Environment Certificate", + env: map[string]string{ + "OTEL_EXPORTER_OTLP_CERTIFICATE": "cert_path", + }, + fileReader: fileReader{ + "cert_path": []byte(WeakCertificate), + }, + asserts: func(t *testing.T, c *Config, grpcOption bool) { + if grpcOption { + assert.NotNil(t, c.Traces.GRPCCredentials) + assert.NotNil(t, c.Metrics.GRPCCredentials) + } else { + assert.Equal(t, tlsCert.RootCAs.Subjects(), c.Traces.TLSCfg.RootCAs.Subjects()) + assert.Equal(t, tlsCert.RootCAs.Subjects(), c.Metrics.TLSCfg.RootCAs.Subjects()) + } + }, + }, + { + name: "Test Environment Signal Specific Certificate", + env: map[string]string{ + "OTEL_EXPORTER_OTLP_CERTIFICATE": "overrode_by_signal_specific", + "OTEL_EXPORTER_OTLP_TRACES_CERTIFICATE": "cert_path", + "OTEL_EXPORTER_OTLP_METRICS_CERTIFICATE": "invalid_cert", + }, + fileReader: fileReader{ + "cert_path": []byte(WeakCertificate), + "invalid_cert": []byte("invalid certificate file."), + }, + asserts: func(t *testing.T, c *Config, grpcOption bool) { + if grpcOption { + assert.NotNil(t, c.Traces.GRPCCredentials) + assert.Nil(t, c.Metrics.GRPCCredentials) + } else { + assert.Equal(t, tlsCert.RootCAs.Subjects(), c.Traces.TLSCfg.RootCAs.Subjects()) + assert.Equal(t, (*tls.Config)(nil), c.Metrics.TLSCfg) + } + }, + }, + { + name: "Test Mixed Environment and With Certificate", + opts: []GenericOption{ + WithMetricsTLSClientConfig(&tls.Config{RootCAs: x509.NewCertPool()}), + }, + env: map[string]string{ + "OTEL_EXPORTER_OTLP_CERTIFICATE": "cert_path", + }, + fileReader: fileReader{ + "cert_path": []byte(WeakCertificate), + }, + asserts: func(t *testing.T, c *Config, grpcOption bool) { + if grpcOption { + assert.NotNil(t, c.Traces.GRPCCredentials) + assert.NotNil(t, c.Metrics.GRPCCredentials) + } else { + assert.Equal(t, tlsCert.RootCAs.Subjects(), c.Traces.TLSCfg.RootCAs.Subjects()) + assert.Equal(t, 0, len(c.Metrics.TLSCfg.RootCAs.Subjects())) + } + }, + }, + + // Headers tests + { + name: "Test With Headers", + opts: []GenericOption{ + WithHeaders(map[string]string{"h1": "v1"}), + }, + asserts: func(t *testing.T, c *Config, grpcOption bool) { + assert.Equal(t, map[string]string{"h1": "v1"}, c.Metrics.Headers) + assert.Equal(t, map[string]string{"h1": "v1"}, c.Traces.Headers) + }, + }, + { + name: "Test With Signal Specific Headers", + opts: []GenericOption{ + WithHeaders(map[string]string{"overrode": "by_signal_specific"}), + WithMetricsHeaders(map[string]string{"m1": "mv1"}), + WithTracesHeaders(map[string]string{"t1": "tv1"}), + }, + asserts: func(t *testing.T, c *Config, grpcOption bool) { + assert.Equal(t, map[string]string{"m1": "mv1"}, c.Metrics.Headers) + assert.Equal(t, map[string]string{"t1": "tv1"}, c.Traces.Headers) + }, + }, + { + name: "Test Environment Headers", + env: map[string]string{"OTEL_EXPORTER_OTLP_HEADERS": "h1=v1,h2=v2"}, + asserts: func(t *testing.T, c *Config, grpcOption bool) { + assert.Equal(t, map[string]string{"h1": "v1", "h2": "v2"}, c.Metrics.Headers) + assert.Equal(t, map[string]string{"h1": "v1", "h2": "v2"}, c.Traces.Headers) + }, + }, + { + name: "Test Environment Signal Specific Headers", + env: map[string]string{ + "OTEL_EXPORTER_OTLP_HEADERS": "overrode_by_signal_specific", + "OTEL_EXPORTER_OTLP_TRACES_HEADERS": "h1=v1,h2=v2", + "OTEL_EXPORTER_OTLP_METRICS_HEADERS": "h1=v1,h2=v2", + }, + asserts: func(t *testing.T, c *Config, grpcOption bool) { + assert.Equal(t, map[string]string{"h1": "v1", "h2": "v2"}, c.Metrics.Headers) + assert.Equal(t, map[string]string{"h1": "v1", "h2": "v2"}, c.Traces.Headers) + }, + }, + { + name: "Test Mixed Environment and With Headers", + env: map[string]string{"OTEL_EXPORTER_OTLP_HEADERS": "h1=v1,h2=v2"}, + opts: []GenericOption{ + WithMetricsHeaders(map[string]string{"m1": "mv1"}), + }, + asserts: func(t *testing.T, c *Config, grpcOption bool) { + assert.Equal(t, map[string]string{"m1": "mv1"}, c.Metrics.Headers) + assert.Equal(t, map[string]string{"h1": "v1", "h2": "v2"}, c.Traces.Headers) + }, + }, + + // Compression Tests + { + name: "Test With Compression", + opts: []GenericOption{ + WithCompression(otlp.GzipCompression), + }, + asserts: func(t *testing.T, c *Config, grpcOption bool) { + assert.Equal(t, otlp.GzipCompression, c.Traces.Compression) + assert.Equal(t, otlp.GzipCompression, c.Metrics.Compression) + }, + }, + { + name: "Test With Signal Specific Compression", + opts: []GenericOption{ + WithCompression(otlp.NoCompression), // overrode by signal specific configs + WithTracesCompression(otlp.GzipCompression), + WithMetricsCompression(otlp.GzipCompression), + }, + asserts: func(t *testing.T, c *Config, grpcOption bool) { + assert.Equal(t, otlp.GzipCompression, c.Traces.Compression) + assert.Equal(t, otlp.GzipCompression, c.Metrics.Compression) + }, + }, + { + name: "Test Environment Compression", + env: map[string]string{ + "OTEL_EXPORTER_OTLP_COMPRESSION": "gzip", + }, + asserts: func(t *testing.T, c *Config, grpcOption bool) { + assert.Equal(t, otlp.GzipCompression, c.Traces.Compression) + assert.Equal(t, otlp.GzipCompression, c.Metrics.Compression) + }, + }, + { + name: "Test Environment Signal Specific Compression", + env: map[string]string{ + "OTEL_EXPORTER_OTLP_TRACES_COMPRESSION": "gzip", + "OTEL_EXPORTER_OTLP_METRICS_COMPRESSION": "gzip", + }, + asserts: func(t *testing.T, c *Config, grpcOption bool) { + assert.Equal(t, otlp.GzipCompression, c.Traces.Compression) + assert.Equal(t, otlp.GzipCompression, c.Metrics.Compression) + }, + }, + { + name: "Test Mixed Environment and With Compression", + opts: []GenericOption{ + WithTracesCompression(otlp.NoCompression), + }, + env: map[string]string{ + "OTEL_EXPORTER_OTLP_TRACES_COMPRESSION": "gzip", + "OTEL_EXPORTER_OTLP_METRICS_COMPRESSION": "gzip", + }, + asserts: func(t *testing.T, c *Config, grpcOption bool) { + assert.Equal(t, otlp.NoCompression, c.Traces.Compression) + assert.Equal(t, otlp.GzipCompression, c.Metrics.Compression) + }, + }, + + // Timeout Tests + { + name: "Test With Timeout", + opts: []GenericOption{ + WithTimeout(time.Duration(5 * time.Second)), + }, + asserts: func(t *testing.T, c *Config, grpcOption bool) { + assert.Equal(t, 5*time.Second, c.Traces.Timeout) + assert.Equal(t, 5*time.Second, c.Metrics.Timeout) + }, + }, + { + name: "Test With Signal Specific Timeout", + opts: []GenericOption{ + WithTimeout(time.Duration(5 * time.Second)), + WithTracesTimeout(time.Duration(13 * time.Second)), + WithMetricsTimeout(time.Duration(14 * time.Second)), + }, + asserts: func(t *testing.T, c *Config, grpcOption bool) { + assert.Equal(t, 13*time.Second, c.Traces.Timeout) + assert.Equal(t, 14*time.Second, c.Metrics.Timeout) + }, + }, + { + name: "Test Environment Timeout", + env: map[string]string{ + "OTEL_EXPORTER_OTLP_TIMEOUT": "15000", + }, + asserts: func(t *testing.T, c *Config, grpcOption bool) { + assert.Equal(t, c.Metrics.Timeout, 15*time.Second) + assert.Equal(t, c.Traces.Timeout, 15*time.Second) + }, + }, + { + name: "Test Environment Signal Specific Timeout", + env: map[string]string{ + "OTEL_EXPORTER_OTLP_TIMEOUT": "15000", + "OTEL_EXPORTER_OTLP_TRACES_TIMEOUT": "27000", + "OTEL_EXPORTER_OTLP_METRICS_TIMEOUT": "28000", + }, + asserts: func(t *testing.T, c *Config, grpcOption bool) { + assert.Equal(t, c.Traces.Timeout, 27*time.Second) + assert.Equal(t, c.Metrics.Timeout, 28*time.Second) + }, + }, + { + name: "Test Mixed Environment and With Timeout", + env: map[string]string{ + "OTEL_EXPORTER_OTLP_TIMEOUT": "15000", + "OTEL_EXPORTER_OTLP_TRACES_TIMEOUT": "27000", + "OTEL_EXPORTER_OTLP_METRICS_TIMEOUT": "28000", + }, + opts: []GenericOption{ + WithTracesTimeout(5 * time.Second), + }, + asserts: func(t *testing.T, c *Config, grpcOption bool) { + assert.Equal(t, c.Traces.Timeout, 5*time.Second) + assert.Equal(t, c.Metrics.Timeout, 28*time.Second) + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + + e := EnvOptionsReader{ + GetEnv: tt.env.getEnv, + ReadFile: tt.fileReader.readFile, + } + + // Tests Generic options as HTTP Options + cfg := NewDefaultConfig() + e.ApplyHTTPEnvConfigs(&cfg) + for _, opt := range tt.opts { + opt.ApplyHTTPOption(&cfg) + } + tt.asserts(t, &cfg, false) + + // Tests Generic options as gRPC Options + cfg = NewDefaultConfig() + e.ApplyGRPCEnvConfigs(&cfg) + for _, opt := range tt.opts { + opt.ApplyGRPCOption(&cfg) + } + tt.asserts(t, &cfg, true) + }) + } +} diff --git a/exporters/otlp/optiontypes.go b/exporters/otlp/optiontypes.go new file mode 100644 index 00000000000..682625f4548 --- /dev/null +++ b/exporters/otlp/optiontypes.go @@ -0,0 +1,38 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package otlp // import "go.opentelemetry.io/otel/exporters/otlp" + +// Compression describes the compression used for payloads sent to the +// collector. +type Compression int + +const ( + // NoCompression tells the driver to send payloads without + // compression. + NoCompression Compression = iota + // GzipCompression tells the driver to send payloads after + // compressing them with gzip. + GzipCompression +) + +// Marshaler describes the kind of message format sent to the collector +type Marshaler int + +const ( + // MarshalProto tells the driver to send using the protobuf binary format. + MarshalProto Marshaler = iota + // MarshalJSON tells the driver to send using json format. + MarshalJSON +) diff --git a/exporters/otlp/otlpgrpc/connection.go b/exporters/otlp/otlpgrpc/connection.go index 5fbd68ffe42..d904b56e058 100644 --- a/exporters/otlp/otlpgrpc/connection.go +++ b/exporters/otlp/otlpgrpc/connection.go @@ -22,6 +22,11 @@ import ( "time" "unsafe" + "google.golang.org/grpc/encoding/gzip" + + "go.opentelemetry.io/otel/exporters/otlp" + "go.opentelemetry.io/otel/exporters/otlp/internal/otlpconfig" + "google.golang.org/grpc" "google.golang.org/grpc/metadata" ) @@ -36,7 +41,8 @@ type connection struct { cc *grpc.ClientConn // these fields are read-only after constructor is finished - cfg config + cfg otlpconfig.Config + sCfg otlpconfig.SignalConfig metadata metadata.MD newConnectionHandler func(cc *grpc.ClientConn) @@ -51,12 +57,13 @@ type connection struct { closeBackgroundConnectionDoneCh func(ch chan struct{}) } -func newConnection(cfg config, handler func(cc *grpc.ClientConn)) *connection { +func newConnection(cfg otlpconfig.Config, sCfg otlpconfig.SignalConfig, handler func(cc *grpc.ClientConn)) *connection { c := new(connection) c.newConnectionHandler = handler c.cfg = cfg - if len(c.cfg.headers) > 0 { - c.metadata = metadata.New(c.cfg.headers) + c.sCfg = sCfg + if len(c.sCfg.Headers) > 0 { + c.metadata = metadata.New(c.sCfg.Headers) } c.closeBackgroundConnectionDoneCh = func(ch chan struct{}) { close(ch) @@ -117,7 +124,7 @@ func (c *connection) indefiniteBackgroundConnection() { c.closeBackgroundConnectionDoneCh(c.backgroundConnectionDoneCh) }() - connReattemptPeriod := c.cfg.reconnectionPeriod + connReattemptPeriod := c.cfg.ReconnectionPeriod if connReattemptPeriod <= 0 { connReattemptPeriod = defaultConnReattemptPeriod } @@ -204,28 +211,26 @@ func (c *connection) setConnection(cc *grpc.ClientConn) bool { } func (c *connection) dialToCollector(ctx context.Context) (*grpc.ClientConn, error) { - endpoint := c.cfg.collectorEndpoint - dialOpts := []grpc.DialOption{} - if c.cfg.serviceConfig != "" { - dialOpts = append(dialOpts, grpc.WithDefaultServiceConfig(c.cfg.serviceConfig)) + if c.cfg.ServiceConfig != "" { + dialOpts = append(dialOpts, grpc.WithDefaultServiceConfig(c.cfg.ServiceConfig)) } - if c.cfg.clientCredentials != nil { - dialOpts = append(dialOpts, grpc.WithTransportCredentials(c.cfg.clientCredentials)) - } else if c.cfg.canDialInsecure { + if c.sCfg.GRPCCredentials != nil { + dialOpts = append(dialOpts, grpc.WithTransportCredentials(c.sCfg.GRPCCredentials)) + } else if c.sCfg.Insecure { dialOpts = append(dialOpts, grpc.WithInsecure()) } - if c.cfg.compressor != "" { - dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.UseCompressor(c.cfg.compressor))) + if c.sCfg.Compression == otlp.GzipCompression { + dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.UseCompressor(gzip.Name))) } - if len(c.cfg.dialOptions) != 0 { - dialOpts = append(dialOpts, c.cfg.dialOptions...) + if len(c.cfg.DialOptions) != 0 { + dialOpts = append(dialOpts, c.cfg.DialOptions...) } ctx, cancel := c.contextWithStop(ctx) defer cancel() ctx = c.contextWithMetadata(ctx) - return grpc.DialContext(ctx, endpoint, dialOpts...) + return grpc.DialContext(ctx, c.sCfg.Endpoint, dialOpts...) } func (c *connection) contextWithMetadata(ctx context.Context) context.Context { diff --git a/exporters/otlp/otlpgrpc/driver.go b/exporters/otlp/otlpgrpc/driver.go index 13f7ddb31c6..66979a360f7 100644 --- a/exporters/otlp/otlpgrpc/driver.go +++ b/exporters/otlp/otlpgrpc/driver.go @@ -20,6 +20,8 @@ import ( "fmt" "sync" + "go.opentelemetry.io/otel/exporters/otlp/internal/otlpconfig" + "google.golang.org/grpc" "go.opentelemetry.io/otel/exporters/otlp" @@ -33,11 +35,22 @@ import ( ) type driver struct { + metricsDriver metricsDriver + tracesDriver tracesDriver +} + +type metricsDriver struct { connection *connection lock sync.Mutex metricsClient colmetricpb.MetricsServiceClient - tracesClient coltracepb.TraceServiceClient +} + +type tracesDriver struct { + connection *connection + + lock sync.Mutex + tracesClient coltracepb.TraceServiceClient } var ( @@ -46,50 +59,69 @@ var ( // NewDriver creates a new gRPC protocol driver. func NewDriver(opts ...Option) otlp.ProtocolDriver { - cfg := config{ - collectorEndpoint: fmt.Sprintf("%s:%d", otlp.DefaultCollectorHost, otlp.DefaultCollectorPort), - serviceConfig: DefaultServiceConfig, - } + cfg := otlpconfig.NewDefaultConfig() + otlpconfig.ApplyGRPCEnvConfigs(&cfg) for _, opt := range opts { - opt(&cfg) + opt.ApplyGRPCOption(&cfg) } + d := &driver{} - d.connection = newConnection(cfg, d.handleNewConnection) + + d.tracesDriver = tracesDriver{ + connection: newConnection(cfg, cfg.Traces, d.tracesDriver.handleNewConnection), + } + + d.metricsDriver = metricsDriver{ + connection: newConnection(cfg, cfg.Metrics, d.metricsDriver.handleNewConnection), + } return d } -func (d *driver) handleNewConnection(cc *grpc.ClientConn) { - d.lock.Lock() - defer d.lock.Unlock() +func (md *metricsDriver) handleNewConnection(cc *grpc.ClientConn) { + md.lock.Lock() + defer md.lock.Unlock() + if cc != nil { + md.metricsClient = colmetricpb.NewMetricsServiceClient(cc) + } else { + md.metricsClient = nil + } +} + +func (td *tracesDriver) handleNewConnection(cc *grpc.ClientConn) { + td.lock.Lock() + defer td.lock.Unlock() if cc != nil { - d.metricsClient = colmetricpb.NewMetricsServiceClient(cc) - d.tracesClient = coltracepb.NewTraceServiceClient(cc) + td.tracesClient = coltracepb.NewTraceServiceClient(cc) } else { - d.metricsClient = nil - d.tracesClient = nil + td.tracesClient = nil } } // Start implements otlp.ProtocolDriver. It establishes a connection // to the collector. func (d *driver) Start(ctx context.Context) error { - d.connection.startConnection(ctx) + d.tracesDriver.connection.startConnection(ctx) + d.metricsDriver.connection.startConnection(ctx) return nil } // Stop implements otlp.ProtocolDriver. It shuts down the connection // to the collector. func (d *driver) Stop(ctx context.Context) error { - return d.connection.shutdown(ctx) + if err := d.tracesDriver.connection.shutdown(ctx); err != nil { + return err + } + + return d.metricsDriver.connection.shutdown(ctx) } // ExportMetrics implements otlp.ProtocolDriver. It transforms metrics // to protobuf binary format and sends the result to the collector. func (d *driver) ExportMetrics(ctx context.Context, cps metricsdk.CheckpointSet, selector metricsdk.ExportKindSelector) error { - if !d.connection.connected() { - return fmt.Errorf("exporter disconnected: %w", d.connection.lastConnectError()) + if !d.metricsDriver.connection.connected() { + return fmt.Errorf("metrics exporter is disconnected from the server %s: %w", d.metricsDriver.connection.sCfg.Endpoint, d.metricsDriver.connection.lastConnectError()) } - ctx, cancel := d.connection.contextWithStop(ctx) + ctx, cancel := d.metricsDriver.connection.contextWithStop(ctx) defer cancel() rms, err := transform.CheckpointSet(ctx, selector, cps, 1) @@ -100,24 +132,24 @@ func (d *driver) ExportMetrics(ctx context.Context, cps metricsdk.CheckpointSet, return nil } - return d.uploadMetrics(ctx, rms) + return d.metricsDriver.uploadMetrics(ctx, rms) } -func (d *driver) uploadMetrics(ctx context.Context, protoMetrics []*metricpb.ResourceMetrics) error { - ctx = d.connection.contextWithMetadata(ctx) +func (md *metricsDriver) uploadMetrics(ctx context.Context, protoMetrics []*metricpb.ResourceMetrics) error { + ctx = md.connection.contextWithMetadata(ctx) err := func() error { - d.lock.Lock() - defer d.lock.Unlock() - if d.metricsClient == nil { + md.lock.Lock() + defer md.lock.Unlock() + if md.metricsClient == nil { return errNoClient } - _, err := d.metricsClient.Export(ctx, &colmetricpb.ExportMetricsServiceRequest{ + _, err := md.metricsClient.Export(ctx, &colmetricpb.ExportMetricsServiceRequest{ ResourceMetrics: protoMetrics, }) return err }() if err != nil { - d.connection.setStateDisconnected(err) + md.connection.setStateDisconnected(err) } return err } @@ -125,10 +157,10 @@ func (d *driver) uploadMetrics(ctx context.Context, protoMetrics []*metricpb.Res // ExportTraces implements otlp.ProtocolDriver. It transforms spans to // protobuf binary format and sends the result to the collector. func (d *driver) ExportTraces(ctx context.Context, ss []*tracesdk.SpanSnapshot) error { - if !d.connection.connected() { - return fmt.Errorf("exporter disconnected: %w", d.connection.lastConnectError()) + if !d.tracesDriver.connection.connected() { + return fmt.Errorf("traces exporter is disconnected from the server %s: %w", d.tracesDriver.connection.sCfg.Endpoint, d.tracesDriver.connection.lastConnectError()) } - ctx, cancel := d.connection.contextWithStop(ctx) + ctx, cancel := d.tracesDriver.connection.contextWithStop(ctx) defer cancel() protoSpans := transform.SpanData(ss) @@ -136,24 +168,24 @@ func (d *driver) ExportTraces(ctx context.Context, ss []*tracesdk.SpanSnapshot) return nil } - return d.uploadTraces(ctx, protoSpans) + return d.tracesDriver.uploadTraces(ctx, protoSpans) } -func (d *driver) uploadTraces(ctx context.Context, protoSpans []*tracepb.ResourceSpans) error { - ctx = d.connection.contextWithMetadata(ctx) +func (td *tracesDriver) uploadTraces(ctx context.Context, protoSpans []*tracepb.ResourceSpans) error { + ctx = td.connection.contextWithMetadata(ctx) err := func() error { - d.lock.Lock() - defer d.lock.Unlock() - if d.tracesClient == nil { + td.lock.Lock() + defer td.lock.Unlock() + if td.tracesClient == nil { return errNoClient } - _, err := d.tracesClient.Export(ctx, &coltracepb.ExportTraceServiceRequest{ + _, err := td.tracesClient.Export(ctx, &coltracepb.ExportTraceServiceRequest{ ResourceSpans: protoSpans, }) return err }() if err != nil { - d.connection.setStateDisconnected(err) + td.connection.setStateDisconnected(err) } return err } diff --git a/exporters/otlp/otlpgrpc/options.go b/exporters/otlp/otlpgrpc/options.go index 1700b5b8ee5..37d877bf2ba 100644 --- a/exporters/otlp/otlpgrpc/options.go +++ b/exporters/otlp/otlpgrpc/options.go @@ -15,131 +15,170 @@ package otlpgrpc import ( + "fmt" "time" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/exporters/otlp" + "go.opentelemetry.io/otel/exporters/otlp/internal/otlpconfig" + "google.golang.org/grpc" "google.golang.org/grpc/credentials" ) -const ( - // DefaultServiceConfig is the gRPC service config used if none is - // provided by the user. - // - // For more info on gRPC service configs: - // https://github.com/grpc/proposal/blob/master/A6-client-retries.md - // - // For more info on the RetryableStatusCodes we allow here: - // https://github.com/open-telemetry/oteps/blob/be2a3fcbaa417ebbf5845cd485d34fdf0ab4a2a4/text/0035-opentelemetry-protocol.md#export-response - // - // Note: MaxAttempts > 5 are treated as 5. See - // https://github.com/grpc/proposal/blob/master/A6-client-retries.md#validation-of-retrypolicy - // for more details. - DefaultServiceConfig = `{ - "methodConfig":[{ - "name":[ - { "service":"opentelemetry.proto.collector.metrics.v1.MetricsService" }, - { "service":"opentelemetry.proto.collector.trace.v1.TraceService" } - ], - "retryPolicy":{ - "MaxAttempts":5, - "InitialBackoff":"0.3s", - "MaxBackoff":"5s", - "BackoffMultiplier":2, - "RetryableStatusCodes":[ - "CANCELLED", - "DEADLINE_EXCEEDED", - "RESOURCE_EXHAUSTED", - "ABORTED", - "OUT_OF_RANGE", - "UNAVAILABLE", - "DATA_LOSS" - ] - } - }] -}` -) - -type config struct { - canDialInsecure bool - collectorEndpoint string - compressor string - reconnectionPeriod time.Duration - serviceConfig string - dialOptions []grpc.DialOption - headers map[string]string - clientCredentials credentials.TransportCredentials -} - // Option applies an option to the gRPC driver. -type Option func(cfg *config) +type Option interface { + otlpconfig.GRPCOption +} // WithInsecure disables client transport security for the exporter's gRPC connection // just like grpc.WithInsecure() https://pkg.go.dev/google.golang.org/grpc#WithInsecure // does. Note, by default, client security is required unless WithInsecure is used. func WithInsecure() Option { - return func(cfg *config) { - cfg.canDialInsecure = true - } + return otlpconfig.WithInsecure() +} + +// WithTracesInsecure disables client transport security for the traces exporter's gRPC connection +// just like grpc.WithInsecure() https://pkg.go.dev/google.golang.org/grpc#WithInsecure +// does. Note, by default, client security is required unless WithInsecure is used. +func WithTracesInsecure() Option { + return otlpconfig.WithInsecureTraces() +} + +// WithInsecureMetrics disables client transport security for the metrics exporter's gRPC connection +// just like grpc.WithInsecure() https://pkg.go.dev/google.golang.org/grpc#WithInsecure +// does. Note, by default, client security is required unless WithInsecure is used. +func WithInsecureMetrics() Option { + return otlpconfig.WithInsecureMetrics() } // WithEndpoint allows one to set the endpoint that the exporter will // connect to the collector on. If unset, it will instead try to use // connect to DefaultCollectorHost:DefaultCollectorPort. func WithEndpoint(endpoint string) Option { - return func(cfg *config) { - cfg.collectorEndpoint = endpoint - } + return otlpconfig.WithEndpoint(endpoint) +} + +// WithTracesEndpoint allows one to set the traces endpoint that the exporter will +// connect to the collector on. If unset, it will instead try to use +// connect to DefaultCollectorHost:DefaultCollectorPort. +func WithTracesEndpoint(endpoint string) Option { + return otlpconfig.WithTracesEndpoint(endpoint) +} + +// WithMetricsEndpoint allows one to set the metrics endpoint that the exporter will +// connect to the collector on. If unset, it will instead try to use +// connect to DefaultCollectorHost:DefaultCollectorPort. +func WithMetricsEndpoint(endpoint string) Option { + return otlpconfig.WithMetricsEndpoint(endpoint) } // WithReconnectionPeriod allows one to set the delay between next connection attempt // after failing to connect with the collector. -func WithReconnectionPeriod(rp time.Duration) Option { - return func(cfg *config) { - cfg.reconnectionPeriod = rp +func WithReconnectionPeriod(rp time.Duration) otlpconfig.GRPCOption { + return otlpconfig.NewGRPCOption(func(cfg *otlpconfig.Config) { + cfg.ReconnectionPeriod = rp + }) +} + +func compressorToCompression(compressor string) otlp.Compression { + switch compressor { + case "gzip": + return otlp.GzipCompression } + + otel.Handle(fmt.Errorf("invalid compression type: '%s', using no compression as default", compressor)) + return otlp.NoCompression } // WithCompressor will set the compressor for the gRPC client to use when sending requests. // It is the responsibility of the caller to ensure that the compressor set has been registered // with google.golang.org/grpc/encoding. This can be done by encoding.RegisterCompressor. Some // compressors auto-register on import, such as gzip, which can be registered by calling -// `import _ "google.golang.org/grpc/encoding/gzip"` +// `import _ "google.golang.org/grpc/encoding/gzip"`. func WithCompressor(compressor string) Option { - return func(cfg *config) { - cfg.compressor = compressor - } + return otlpconfig.WithCompression(compressorToCompression(compressor)) +} + +// WithTracesCompression will set the compressor for the gRPC client to use when sending traces requests. +// It is the responsibility of the caller to ensure that the compressor set has been registered +// with google.golang.org/grpc/encoding. This can be done by encoding.RegisterCompressor. Some +// compressors auto-register on import, such as gzip, which can be registered by calling +// `import _ "google.golang.org/grpc/encoding/gzip"`. +func WithTracesCompression(compressor string) Option { + return otlpconfig.WithTracesCompression(compressorToCompression(compressor)) +} + +// WithMetricsCompression will set the compressor for the gRPC client to use when sending metrics requests. +// It is the responsibility of the caller to ensure that the compressor set has been registered +// with google.golang.org/grpc/encoding. This can be done by encoding.RegisterCompressor. Some +// compressors auto-register on import, such as gzip, which can be registered by calling +// `import _ "google.golang.org/grpc/encoding/gzip"`. +func WithMetricsCompression(compressor string) Option { + return otlpconfig.WithMetricsCompression(compressorToCompression(compressor)) } -// WithHeaders will send the provided headers with gRPC requests +// WithHeaders will send the provided headers with gRPC requests. func WithHeaders(headers map[string]string) Option { - return func(cfg *config) { - cfg.headers = headers - } + return otlpconfig.WithHeaders(headers) +} + +// WithTracesHeaders will send the provided headers with gRPC traces requests. +func WithTracesHeaders(headers map[string]string) Option { + return otlpconfig.WithTracesHeaders(headers) +} + +// WithMetricsHeaders will send the provided headers with gRPC metrics requests. +func WithMetricsHeaders(headers map[string]string) Option { + return otlpconfig.WithMetricsHeaders(headers) } // WithTLSCredentials allows the connection to use TLS credentials // when talking to the server. It takes in grpc.TransportCredentials instead -// of say a Certificate file or a tls.Certificate, because the retrieving +// of say a Certificate file or a tls.Certificate, because the retrieving of // these credentials can be done in many ways e.g. plain file, in code tls.Config // or by certificate rotation, so it is up to the caller to decide what to use. func WithTLSCredentials(creds credentials.TransportCredentials) Option { - return func(cfg *config) { - cfg.clientCredentials = creds - } + return otlpconfig.NewGRPCOption(func(cfg *otlpconfig.Config) { + cfg.Traces.GRPCCredentials = creds + cfg.Metrics.GRPCCredentials = creds + }) +} + +// WithTracesTLSCredentials allows the connection to use TLS credentials +// when talking to the traces server. It takes in grpc.TransportCredentials instead +// of say a Certificate file or a tls.Certificate, because the retrieving of +// these credentials can be done in many ways e.g. plain file, in code tls.Config +// or by certificate rotation, so it is up to the caller to decide what to use. +func WithTracesTLSCredentials(creds credentials.TransportCredentials) Option { + return otlpconfig.NewGRPCOption(func(cfg *otlpconfig.Config) { + cfg.Traces.GRPCCredentials = creds + }) +} + +// WithMetricsTLSCredentials allows the connection to use TLS credentials +// when talking to the metrics server. It takes in grpc.TransportCredentials instead +// of say a Certificate file or a tls.Certificate, because the retrieving of +// these credentials can be done in many ways e.g. plain file, in code tls.Config +// or by certificate rotation, so it is up to the caller to decide what to use. +func WithMetricsTLSCredentials(creds credentials.TransportCredentials) Option { + return otlpconfig.NewGRPCOption(func(cfg *otlpconfig.Config) { + cfg.Metrics.GRPCCredentials = creds + }) } // WithServiceConfig defines the default gRPC service config used. func WithServiceConfig(serviceConfig string) Option { - return func(cfg *config) { - cfg.serviceConfig = serviceConfig - } + return otlpconfig.NewGRPCOption(func(cfg *otlpconfig.Config) { + cfg.ServiceConfig = serviceConfig + }) } // WithDialOption opens support to any grpc.DialOption to be used. If it conflicts // with some other configuration the GRPC specified via the collector the ones here will // take preference since they are set last. func WithDialOption(opts ...grpc.DialOption) Option { - return func(cfg *config) { - cfg.dialOptions = opts - } + return otlpconfig.NewGRPCOption(func(cfg *otlpconfig.Config) { + cfg.DialOptions = opts + }) } diff --git a/exporters/otlp/otlpgrpc/otlp_integration_test.go b/exporters/otlp/otlpgrpc/otlp_integration_test.go index b89bbc0be61..0b3a722a615 100644 --- a/exporters/otlp/otlpgrpc/otlp_integration_test.go +++ b/exporters/otlp/otlpgrpc/otlp_integration_test.go @@ -360,7 +360,10 @@ func TestNewExporter_withInvalidSecurityConfiguration(t *testing.T) { } err = exp.ExportSpans(ctx, []*sdktrace.SpanSnapshot{{Name: "misconfiguration"}}) - require.Equal(t, err.Error(), "exporter disconnected: grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)") + + expectedErr := fmt.Sprintf("traces exporter is disconnected from the server %s: grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)", mc.endpoint) + + require.Equal(t, expectedErr, err.Error()) defer func() { _ = exp.Shutdown(ctx) diff --git a/exporters/otlp/otlphttp/driver.go b/exporters/otlp/otlphttp/driver.go index 887dee962af..eb47c25650f 100644 --- a/exporters/otlp/otlphttp/driver.go +++ b/exporters/otlp/otlphttp/driver.go @@ -28,6 +28,8 @@ import ( "strings" "time" + "go.opentelemetry.io/otel/exporters/otlp/internal/otlpconfig" + jsonpb "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/proto" @@ -64,14 +66,15 @@ var ourTransport *http.Transport = &http.Transport{ type driver struct { metricsDriver signalDriver tracesDriver signalDriver - cfg config + cfg otlpconfig.Config stopCh chan struct{} } type signalDriver struct { - cfg signalConfig - generalCfg config + name string + cfg otlpconfig.SignalConfig + generalCfg otlpconfig.Config client *http.Client stopCh chan struct{} } @@ -80,15 +83,15 @@ var _ otlp.ProtocolDriver = (*driver)(nil) // NewDriver creates a new HTTP driver. func NewDriver(opts ...Option) otlp.ProtocolDriver { - cfg := newDefaultConfig() - applyEnvConfigs(&cfg) - + cfg := otlpconfig.NewDefaultConfig() + otlpconfig.ApplyHTTPEnvConfigs(&cfg) for _, opt := range opts { - opt.Apply(&cfg) + opt.ApplyHTTPOption(&cfg) } + for pathPtr, defaultPath := range map[*string]string{ - &cfg.traces.urlPath: DefaultTracesPath, - &cfg.metrics.urlPath: DefaultMetricsPath, + &cfg.Traces.URLPath: DefaultTracesPath, + &cfg.Metrics.URLPath: DefaultMetricsPath, } { tmp := strings.TrimSpace(*pathPtr) if tmp == "" { @@ -101,46 +104,48 @@ func NewDriver(opts ...Option) otlp.ProtocolDriver { } *pathPtr = tmp } - if cfg.maxAttempts <= 0 { - cfg.maxAttempts = DefaultMaxAttempts + if cfg.MaxAttempts <= 0 { + cfg.MaxAttempts = DefaultMaxAttempts } - if cfg.maxAttempts > DefaultMaxAttempts { - cfg.maxAttempts = DefaultMaxAttempts + if cfg.MaxAttempts > DefaultMaxAttempts { + cfg.MaxAttempts = DefaultMaxAttempts } - if cfg.backoff <= 0 { - cfg.backoff = DefaultBackoff + if cfg.Backoff <= 0 { + cfg.Backoff = DefaultBackoff } metricsClient := &http.Client{ Transport: ourTransport, - Timeout: cfg.metrics.timeout, + Timeout: cfg.Metrics.Timeout, } - if cfg.metrics.tlsCfg != nil { + if cfg.Metrics.TLSCfg != nil { transport := ourTransport.Clone() - transport.TLSClientConfig = cfg.metrics.tlsCfg + transport.TLSClientConfig = cfg.Metrics.TLSCfg metricsClient.Transport = transport } tracesClient := &http.Client{ Transport: ourTransport, - Timeout: cfg.traces.timeout, + Timeout: cfg.Traces.Timeout, } - if cfg.traces.tlsCfg != nil { + if cfg.Traces.TLSCfg != nil { transport := ourTransport.Clone() - transport.TLSClientConfig = cfg.traces.tlsCfg + transport.TLSClientConfig = cfg.Traces.TLSCfg tracesClient.Transport = transport } stopCh := make(chan struct{}) return &driver{ tracesDriver: signalDriver{ - cfg: cfg.traces, + name: "traces", + cfg: cfg.Traces, generalCfg: cfg, stopCh: stopCh, client: tracesClient, }, metricsDriver: signalDriver{ - cfg: cfg.metrics, + name: "metrics", + cfg: cfg.Metrics, generalCfg: cfg, stopCh: stopCh, client: metricsClient, @@ -198,18 +203,18 @@ func (d *driver) ExportTraces(ctx context.Context, ss []*tracesdk.SpanSnapshot) } func (d *driver) marshal(msg proto.Message) ([]byte, error) { - if d.cfg.marshaler == MarshalJSON { + if d.cfg.Marshaler == otlp.MarshalJSON { return jsonpb.Marshal(msg) } return proto.Marshal(msg) } func (d *signalDriver) send(ctx context.Context, rawRequest []byte) error { - address := fmt.Sprintf("%s://%s%s", d.getScheme(), d.cfg.endpoint, d.cfg.urlPath) + address := fmt.Sprintf("%s://%s%s", d.getScheme(), d.cfg.Endpoint, d.cfg.URLPath) var cancel context.CancelFunc ctx, cancel = d.contextWithStop(ctx) defer cancel() - for i := 0; i < d.generalCfg.maxAttempts; i++ { + for i := 0; i < d.generalCfg.MaxAttempts; i++ { response, err := d.singleSend(ctx, rawRequest, address) if err != nil { return err @@ -226,20 +231,20 @@ func (d *signalDriver) send(ctx context.Context, rawRequest []byte) error { fallthrough case http.StatusServiceUnavailable: select { - case <-time.After(getWaitDuration(d.generalCfg.backoff, i)): + case <-time.After(getWaitDuration(d.generalCfg.Backoff, i)): continue case <-ctx.Done(): return ctx.Err() } default: - return fmt.Errorf("failed with HTTP status %s", response.Status) + return fmt.Errorf("failed to send %s to %s with HTTP status %s", d.name, address, response.Status) } } - return fmt.Errorf("failed to send data to %s after %d tries", address, d.generalCfg.maxAttempts) + return fmt.Errorf("failed to send data to %s after %d tries", address, d.generalCfg.MaxAttempts) } func (d *signalDriver) getScheme() string { - if d.cfg.insecure { + if d.cfg.Insecure { return "http" } return "https" @@ -302,20 +307,20 @@ func (d *signalDriver) singleSend(ctx context.Context, rawRequest []byte, addres func (d *signalDriver) prepareBody(rawRequest []byte) (io.ReadCloser, int64, http.Header) { var bodyReader io.ReadCloser headers := http.Header{} - for k, v := range d.cfg.headers { + for k, v := range d.cfg.Headers { headers.Set(k, v) } contentLength := (int64)(len(rawRequest)) - if d.generalCfg.marshaler == MarshalJSON { + if d.generalCfg.Marshaler == otlp.MarshalJSON { headers.Set("Content-Type", contentTypeJSON) } else { headers.Set("Content-Type", contentTypeProto) } requestReader := bytes.NewBuffer(rawRequest) - switch d.cfg.compression { - case NoCompression: + switch d.cfg.Compression { + case otlp.NoCompression: bodyReader = ioutil.NopCloser(requestReader) - case GzipCompression: + case otlp.GzipCompression: preader, pwriter := io.Pipe() go func() { defer pwriter.Close() diff --git a/exporters/otlp/otlphttp/driver_test.go b/exporters/otlp/otlphttp/driver_test.go index 157188df01e..10190154c37 100644 --- a/exporters/otlp/otlphttp/driver_test.go +++ b/exporters/otlp/otlphttp/driver_test.go @@ -16,6 +16,7 @@ package otlphttp_test import ( "context" + "fmt" "net/http" "os" "testing" @@ -57,7 +58,7 @@ func TestEndToEnd(t *testing.T) { { name: "with gzip compression", opts: []otlphttp.Option{ - otlphttp.WithCompression(otlphttp.GzipCompression), + otlphttp.WithCompression(otlp.GzipCompression), }, }, { @@ -109,7 +110,7 @@ func TestEndToEnd(t *testing.T) { { name: "with json encoding", opts: []otlphttp.Option{ - otlphttp.WithMarshal(otlphttp.MarshalJSON), + otlphttp.WithMarshal(otlp.MarshalJSON), }, }, } @@ -154,6 +155,7 @@ func TestRetry(t *testing.T) { defer mc.MustStop(t) driver := otlphttp.NewDriver( otlphttp.WithEndpoint(mc.Endpoint()), + otlphttp.WithTracesEndpoint(mc.Endpoint()), otlphttp.WithInsecure(), otlphttp.WithMaxAttempts(len(statuses)+1), ) @@ -237,6 +239,7 @@ func TestNoRetry(t *testing.T) { }() err = exporter.ExportSpans(ctx, otlptest.SingleSpanSnapshot()) assert.Error(t, err) + assert.Equal(t, fmt.Sprintf("failed to send traces to http://%s/v1/traces with HTTP status 400 Bad Request", mc.endpoint), err.Error()) assert.Empty(t, mc.GetSpans()) } diff --git a/exporters/otlp/otlphttp/options.go b/exporters/otlp/otlphttp/options.go index 78e57e1f375..e221ceb05b4 100644 --- a/exporters/otlp/otlphttp/options.go +++ b/exporters/otlp/otlphttp/options.go @@ -16,23 +16,10 @@ package otlphttp import ( "crypto/tls" - "fmt" "time" "go.opentelemetry.io/otel/exporters/otlp" -) - -// Compression describes the compression used for payloads sent to the -// collector. -type Compression int - -const ( - // NoCompression tells the driver to send payloads without - // compression. - NoCompression Compression = iota - // GzipCompression tells the driver to send payloads after - // compressing them with gzip. - GzipCompression + "go.opentelemetry.io/otel/exporters/otlp/internal/otlpconfig" ) const ( @@ -54,78 +41,9 @@ const ( DefaultTimeout time.Duration = 10 * time.Second ) -// Marshaler describes the kind of message format sent to the collector -type Marshaler int - -const ( - // MarshalProto tells the driver to send using the protobuf binary format. - MarshalProto Marshaler = iota - // MarshalJSON tells the driver to send using json format. - MarshalJSON -) - -type signalConfig struct { - endpoint string - insecure bool - tlsCfg *tls.Config - headers map[string]string - compression Compression - timeout time.Duration - urlPath string -} - -type config struct { - metrics signalConfig - traces signalConfig - - maxAttempts int - backoff time.Duration - marshaler Marshaler -} - -func newDefaultConfig() config { - c := config{ - traces: signalConfig{ - endpoint: fmt.Sprintf("%s:%d", otlp.DefaultCollectorHost, otlp.DefaultCollectorPort), - urlPath: DefaultTracesPath, - compression: NoCompression, - timeout: DefaultTimeout, - }, - metrics: signalConfig{ - endpoint: fmt.Sprintf("%s:%d", otlp.DefaultCollectorHost, otlp.DefaultCollectorPort), - urlPath: DefaultMetricsPath, - compression: NoCompression, - timeout: DefaultTimeout, - }, - maxAttempts: DefaultMaxAttempts, - backoff: DefaultBackoff, - } - - return c -} - // Option applies an option to the HTTP driver. type Option interface { - Apply(*config) - - // A private method to prevent users implementing the - // interface and so future additions to it will not - // violate compatibility. - private() -} - -type genericOption struct { - fn func(*config) -} - -func (g *genericOption) Apply(cfg *config) { - g.fn(cfg) -} - -func (genericOption) private() {} - -func newGenericOption(fn func(cfg *config)) Option { - return &genericOption{fn: fn} + otlpconfig.HTTPOption } // WithEndpoint allows one to set the address of the collector @@ -134,10 +52,7 @@ func newGenericOption(fn func(cfg *config)) Option { // DefaultCollectorHost:DefaultCollectorPort. Note that the endpoint // must not contain any URL path. func WithEndpoint(endpoint string) Option { - return newGenericOption(func(cfg *config) { - cfg.traces.endpoint = endpoint - cfg.metrics.endpoint = endpoint - }) + return otlpconfig.WithEndpoint(endpoint) } // WithTracesEndpoint allows one to set the address of the collector @@ -145,9 +60,7 @@ func WithEndpoint(endpoint string) Option { // unset, it will instead try to use the Endpoint configuration. // Note that the endpoint must not contain any URL path. func WithTracesEndpoint(endpoint string) Option { - return newGenericOption(func(cfg *config) { - cfg.traces.endpoint = endpoint - }) + return otlpconfig.WithTracesEndpoint(endpoint) } // WithMetricsEndpoint allows one to set the address of the collector @@ -155,177 +68,132 @@ func WithTracesEndpoint(endpoint string) Option { // unset, it will instead try to use the Endpoint configuration. // Note that the endpoint must not contain any URL path. func WithMetricsEndpoint(endpoint string) Option { - return newGenericOption(func(cfg *config) { - cfg.metrics.endpoint = endpoint - }) + return otlpconfig.WithMetricsEndpoint(endpoint) } // WithCompression tells the driver to compress the sent data. -func WithCompression(compression Compression) Option { - return newGenericOption(func(cfg *config) { - cfg.traces.compression = compression - cfg.metrics.compression = compression - }) +func WithCompression(compression otlp.Compression) Option { + return otlpconfig.WithCompression(compression) } // WithTracesCompression tells the driver to compress the sent traces data. -func WithTracesCompression(compression Compression) Option { - return newGenericOption(func(cfg *config) { - cfg.traces.compression = compression - }) +func WithTracesCompression(compression otlp.Compression) Option { + return otlpconfig.WithTracesCompression(compression) } // WithMetricsCompression tells the driver to compress the sent metrics data. -func WithMetricsCompression(compression Compression) Option { - return newGenericOption(func(cfg *config) { - cfg.metrics.compression = compression - }) +func WithMetricsCompression(compression otlp.Compression) Option { + return otlpconfig.WithMetricsCompression(compression) } // WithTracesURLPath allows one to override the default URL path used // for sending traces. If unset, DefaultTracesPath will be used. func WithTracesURLPath(urlPath string) Option { - return newGenericOption(func(cfg *config) { - cfg.traces.urlPath = urlPath - }) + return otlpconfig.WithTracesURLPath(urlPath) } // WithMetricsURLPath allows one to override the default URL path used // for sending metrics. If unset, DefaultMetricsPath will be used. func WithMetricsURLPath(urlPath string) Option { - return newGenericOption(func(cfg *config) { - cfg.metrics.urlPath = urlPath - }) + return otlpconfig.WithMetricsURLPath(urlPath) } // WithMaxAttempts allows one to override how many times the driver // will try to send the payload in case of retryable errors. If unset, // DefaultMaxAttempts will be used. func WithMaxAttempts(maxAttempts int) Option { - return newGenericOption(func(cfg *config) { - cfg.maxAttempts = maxAttempts - }) + return otlpconfig.WithMaxAttempts(maxAttempts) } // WithBackoff tells the driver to use the duration as a base of the // exponential backoff strategy. If unset, DefaultBackoff will be // used. func WithBackoff(duration time.Duration) Option { - return newGenericOption(func(cfg *config) { - cfg.backoff = duration - }) + return otlpconfig.WithBackoff(duration) } // WithTLSClientConfig can be used to set up a custom TLS // configuration for the client used to send payloads to the // collector. Use it if you want to use a custom certificate. func WithTLSClientConfig(tlsCfg *tls.Config) Option { - return newGenericOption(func(cfg *config) { - cfg.traces.tlsCfg = tlsCfg - cfg.metrics.tlsCfg = tlsCfg - }) + return otlpconfig.WithTLSClientConfig(tlsCfg) } // WithTracesTLSClientConfig can be used to set up a custom TLS // configuration for the client used to send traces. // Use it if you want to use a custom certificate. func WithTracesTLSClientConfig(tlsCfg *tls.Config) Option { - return newGenericOption(func(cfg *config) { - cfg.traces.tlsCfg = tlsCfg - }) + return otlpconfig.WithTracesTLSClientConfig(tlsCfg) } // WithMetricsTLSClientConfig can be used to set up a custom TLS // configuration for the client used to send metrics. // Use it if you want to use a custom certificate. func WithMetricsTLSClientConfig(tlsCfg *tls.Config) Option { - return newGenericOption(func(cfg *config) { - cfg.metrics.tlsCfg = tlsCfg - }) + return otlpconfig.WithMetricsTLSClientConfig(tlsCfg) } // WithInsecure tells the driver to connect to the collector using the // HTTP scheme, instead of HTTPS. func WithInsecure() Option { - return newGenericOption(func(cfg *config) { - cfg.traces.insecure = true - cfg.metrics.insecure = true - }) + return otlpconfig.WithInsecure() } // WithInsecureTraces tells the driver to connect to the traces collector using the // HTTP scheme, instead of HTTPS. func WithInsecureTraces() Option { - return newGenericOption(func(cfg *config) { - cfg.traces.insecure = true - }) + return otlpconfig.WithInsecureTraces() } // WithInsecure tells the driver to connect to the metrics collector using the // HTTP scheme, instead of HTTPS. func WithInsecureMetrics() Option { - return newGenericOption(func(cfg *config) { - cfg.metrics.insecure = true - }) + return otlpconfig.WithInsecureMetrics() } // WithHeaders allows one to tell the driver to send additional HTTP // headers with the payloads. Specifying headers like Content-Length, // Content-Encoding and Content-Type may result in a broken driver. func WithHeaders(headers map[string]string) Option { - return newGenericOption(func(cfg *config) { - cfg.traces.headers = headers - cfg.metrics.headers = headers - }) + return otlpconfig.WithHeaders(headers) } // WithTracesHeaders allows one to tell the driver to send additional HTTP // headers with the trace payloads. Specifying headers like Content-Length, // Content-Encoding and Content-Type may result in a broken driver. func WithTracesHeaders(headers map[string]string) Option { - return newGenericOption(func(cfg *config) { - cfg.traces.headers = headers - }) + return otlpconfig.WithTracesHeaders(headers) } // WithMetricsHeaders allows one to tell the driver to send additional HTTP // headers with the metrics payloads. Specifying headers like Content-Length, // Content-Encoding and Content-Type may result in a broken driver. func WithMetricsHeaders(headers map[string]string) Option { - return newGenericOption(func(cfg *config) { - cfg.metrics.headers = headers - }) + return otlpconfig.WithMetricsHeaders(headers) } // WithMarshal tells the driver which wire format to use when sending to the // collector. If unset, MarshalProto will be used -func WithMarshal(m Marshaler) Option { - return newGenericOption(func(cfg *config) { - cfg.marshaler = m +func WithMarshal(m otlp.Marshaler) Option { + return otlpconfig.NewHTTPOption(func(cfg *otlpconfig.Config) { + cfg.Marshaler = m }) } // WithTimeout tells the driver the max waiting time for the backend to process // each spans or metrics batch. If unset, the default will be 10 seconds. func WithTimeout(duration time.Duration) Option { - return newGenericOption(func(cfg *config) { - cfg.traces.timeout = duration - cfg.metrics.timeout = duration - }) + return otlpconfig.WithTimeout(duration) } // WithTracesTimeout tells the driver the max waiting time for the backend to process // each spans batch. If unset, the default will be 10 seconds. func WithTracesTimeout(duration time.Duration) Option { - return newGenericOption(func(cfg *config) { - cfg.traces.timeout = duration - }) + return otlpconfig.WithTracesTimeout(duration) } // WithMetricsTimeout tells the driver the max waiting time for the backend to process // each metrics batch. If unset, the default will be 10 seconds. func WithMetricsTimeout(duration time.Duration) Option { - return newGenericOption(func(cfg *config) { - cfg.metrics.timeout = duration - }) + return otlpconfig.WithMetricsTimeout(duration) } diff --git a/exporters/otlp/otlphttp/options_test.go b/exporters/otlp/otlphttp/options_test.go deleted file mode 100644 index 80355f5ee3d..00000000000 --- a/exporters/otlp/otlphttp/options_test.go +++ /dev/null @@ -1,389 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package otlphttp - -import ( - "crypto/tls" - "crypto/x509" - "errors" - "testing" - "time" - - "go.opentelemetry.io/otel/exporters/otlp/internal/otlpconfig" - - "github.com/stretchr/testify/assert" -) - -type env map[string]string - -func (e *env) getEnv(env string) string { - return (*e)[env] -} - -type fileReader map[string][]byte - -func (f *fileReader) readFile(filename string) ([]byte, error) { - if b, ok := (*f)[filename]; ok { - return b, nil - } - return nil, errors.New("File not found") -} - -func TestConfigs(t *testing.T) { - tlsCert, err := otlpconfig.CreateTLSConfig([]byte(otlpconfig.WeakCertificate)) - assert.NoError(t, err) - - tests := []struct { - name string - opts []Option - env env - fileReader fileReader - asserts func(t *testing.T, c *config) - }{ - { - name: "Test default configs", - asserts: func(t *testing.T, c *config) { - assert.Equal(t, "localhost:4317", c.traces.endpoint) - assert.Equal(t, "localhost:4317", c.metrics.endpoint) - assert.Equal(t, NoCompression, c.traces.compression) - assert.Equal(t, NoCompression, c.metrics.compression) - assert.Equal(t, map[string]string(nil), c.traces.headers) - assert.Equal(t, map[string]string(nil), c.metrics.headers) - assert.Equal(t, 10*time.Second, c.traces.timeout) - assert.Equal(t, 10*time.Second, c.metrics.timeout) - }, - }, - - // Endpoint Tests - { - name: "Test With Endpoint", - opts: []Option{ - WithEndpoint("someendpoint"), - }, - asserts: func(t *testing.T, c *config) { - assert.Equal(t, "someendpoint", c.traces.endpoint) - assert.Equal(t, "someendpoint", c.metrics.endpoint) - }, - }, - { - name: "Test With Signal Specific Endpoint", - opts: []Option{ - WithEndpoint("overrode_by_signal_specific"), - WithTracesEndpoint("traces_endpoint"), - WithMetricsEndpoint("metrics_endpoint"), - }, - asserts: func(t *testing.T, c *config) { - assert.Equal(t, "traces_endpoint", c.traces.endpoint) - assert.Equal(t, "metrics_endpoint", c.metrics.endpoint) - }, - }, - { - name: "Test Environment Endpoint", - env: map[string]string{ - "OTEL_EXPORTER_OTLP_ENDPOINT": "env_endpoint", - }, - asserts: func(t *testing.T, c *config) { - assert.Equal(t, "env_endpoint", c.traces.endpoint) - assert.Equal(t, "env_endpoint", c.metrics.endpoint) - }, - }, - { - name: "Test Environment Signal Specific Endpoint", - env: map[string]string{ - "OTEL_EXPORTER_OTLP_ENDPOINT": "overrode_by_signal_specific", - "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT": "env_traces_endpoint", - "OTEL_EXPORTER_OTLP_METRICS_ENDPOINT": "env_metrics_endpoint", - }, - asserts: func(t *testing.T, c *config) { - assert.Equal(t, "env_traces_endpoint", c.traces.endpoint) - assert.Equal(t, "env_metrics_endpoint", c.metrics.endpoint) - }, - }, - { - name: "Test Mixed Environment and With Endpoint", - opts: []Option{ - WithTracesEndpoint("traces_endpoint"), - }, - env: map[string]string{ - "OTEL_EXPORTER_OTLP_ENDPOINT": "env_endpoint", - }, - asserts: func(t *testing.T, c *config) { - assert.Equal(t, "traces_endpoint", c.traces.endpoint) - assert.Equal(t, "env_endpoint", c.metrics.endpoint) - }, - }, - - // Certificate tests - { - name: "Test With Certificate", - opts: []Option{ - WithTLSClientConfig(tlsCert), - }, - asserts: func(t *testing.T, c *config) { - assert.Equal(t, tlsCert.RootCAs.Subjects(), c.traces.tlsCfg.RootCAs.Subjects()) - assert.Equal(t, tlsCert.RootCAs.Subjects(), c.metrics.tlsCfg.RootCAs.Subjects()) - }, - }, - { - name: "Test With Signal Specific Endpoint", - opts: []Option{ - WithTLSClientConfig(&tls.Config{}), - WithTracesTLSClientConfig(tlsCert), - WithMetricsTLSClientConfig(&tls.Config{RootCAs: x509.NewCertPool()}), - }, - asserts: func(t *testing.T, c *config) { - assert.Equal(t, tlsCert.RootCAs.Subjects(), c.traces.tlsCfg.RootCAs.Subjects()) - assert.Equal(t, 0, len(c.metrics.tlsCfg.RootCAs.Subjects())) - }, - }, - { - name: "Test Environment Endpoint", - env: map[string]string{ - "OTEL_EXPORTER_OTLP_CERTIFICATE": "cert_path", - }, - fileReader: fileReader{ - "cert_path": []byte(otlpconfig.WeakCertificate), - }, - asserts: func(t *testing.T, c *config) { - assert.Equal(t, tlsCert.RootCAs.Subjects(), c.traces.tlsCfg.RootCAs.Subjects()) - assert.Equal(t, tlsCert.RootCAs.Subjects(), c.metrics.tlsCfg.RootCAs.Subjects()) - }, - }, - { - name: "Test Environment Signal Specific Endpoint", - env: map[string]string{ - "OTEL_EXPORTER_OTLP_CERTIFICATE": "overrode_by_signal_specific", - "OTEL_EXPORTER_OTLP_TRACES_CERTIFICATE": "cert_path", - "OTEL_EXPORTER_OTLP_METRICS_CERTIFICATE": "invalid_cert", - }, - fileReader: fileReader{ - "cert_path": []byte(otlpconfig.WeakCertificate), - "invalid_cert": []byte("invalid certificate file."), - }, - asserts: func(t *testing.T, c *config) { - assert.Equal(t, tlsCert.RootCAs.Subjects(), c.traces.tlsCfg.RootCAs.Subjects()) - assert.Equal(t, (*tls.Config)(nil), c.metrics.tlsCfg) - }, - }, - { - name: "Test Mixed Environment and With Endpoint", - opts: []Option{ - WithMetricsTLSClientConfig(&tls.Config{RootCAs: x509.NewCertPool()}), - }, - env: map[string]string{ - "OTEL_EXPORTER_OTLP_CERTIFICATE": "cert_path", - }, - fileReader: fileReader{ - "cert_path": []byte(otlpconfig.WeakCertificate), - }, - asserts: func(t *testing.T, c *config) { - assert.Equal(t, tlsCert.RootCAs.Subjects(), c.traces.tlsCfg.RootCAs.Subjects()) - assert.Equal(t, 0, len(c.metrics.tlsCfg.RootCAs.Subjects())) - }, - }, - - // Headers tests - { - name: "Test With Headers", - opts: []Option{ - WithHeaders(map[string]string{"h1": "v1"}), - }, - asserts: func(t *testing.T, c *config) { - assert.Equal(t, map[string]string{"h1": "v1"}, c.metrics.headers) - assert.Equal(t, map[string]string{"h1": "v1"}, c.traces.headers) - }, - }, - { - name: "Test With Signal Specific Headers", - opts: []Option{ - WithHeaders(map[string]string{"overrode": "by_signal_specific"}), - WithMetricsHeaders(map[string]string{"m1": "mv1"}), - WithTracesHeaders(map[string]string{"t1": "tv1"}), - }, - asserts: func(t *testing.T, c *config) { - assert.Equal(t, map[string]string{"m1": "mv1"}, c.metrics.headers) - assert.Equal(t, map[string]string{"t1": "tv1"}, c.traces.headers) - }, - }, - { - name: "Test Environment Headers", - env: map[string]string{"OTEL_EXPORTER_OTLP_HEADERS": "h1=v1,h2=v2"}, - asserts: func(t *testing.T, c *config) { - assert.Equal(t, map[string]string{"h1": "v1", "h2": "v2"}, c.metrics.headers) - assert.Equal(t, map[string]string{"h1": "v1", "h2": "v2"}, c.traces.headers) - }, - }, - { - name: "Test Environment Signal Specific Headers", - env: map[string]string{ - "OTEL_EXPORTER_OTLP_HEADERS": "overrode_by_signal_specific", - "OTEL_EXPORTER_OTLP_TRACES_HEADERS": "h1=v1,h2=v2", - "OTEL_EXPORTER_OTLP_METRICS_HEADERS": "h1=v1,h2=v2", - }, - asserts: func(t *testing.T, c *config) { - assert.Equal(t, map[string]string{"h1": "v1", "h2": "v2"}, c.metrics.headers) - assert.Equal(t, map[string]string{"h1": "v1", "h2": "v2"}, c.traces.headers) - }, - }, - { - name: "Test Mixed Environment and With Headers", - env: map[string]string{"OTEL_EXPORTER_OTLP_HEADERS": "h1=v1,h2=v2"}, - opts: []Option{ - WithMetricsHeaders(map[string]string{"m1": "mv1"}), - }, - asserts: func(t *testing.T, c *config) { - assert.Equal(t, map[string]string{"m1": "mv1"}, c.metrics.headers) - assert.Equal(t, map[string]string{"h1": "v1", "h2": "v2"}, c.traces.headers) - }, - }, - - // Compression Tests - { - name: "Test With Compression", - opts: []Option{ - WithCompression(GzipCompression), - }, - asserts: func(t *testing.T, c *config) { - assert.Equal(t, GzipCompression, c.traces.compression) - assert.Equal(t, GzipCompression, c.metrics.compression) - }, - }, - { - name: "Test With Signal Specific Compression", - opts: []Option{ - WithCompression(NoCompression), // overrode by signal specific configs - WithTracesCompression(GzipCompression), - WithMetricsCompression(GzipCompression), - }, - asserts: func(t *testing.T, c *config) { - assert.Equal(t, GzipCompression, c.traces.compression) - assert.Equal(t, GzipCompression, c.metrics.compression) - }, - }, - { - name: "Test Environment Compression", - env: map[string]string{ - "OTEL_EXPORTER_OTLP_COMPRESSION": "gzip", - }, - asserts: func(t *testing.T, c *config) { - assert.Equal(t, GzipCompression, c.traces.compression) - assert.Equal(t, GzipCompression, c.metrics.compression) - }, - }, - { - name: "Test Environment Signal Specific Compression", - env: map[string]string{ - "OTEL_EXPORTER_OTLP_TRACES_COMPRESSION": "gzip", - "OTEL_EXPORTER_OTLP_METRICS_COMPRESSION": "gzip", - }, - asserts: func(t *testing.T, c *config) { - assert.Equal(t, GzipCompression, c.traces.compression) - assert.Equal(t, GzipCompression, c.metrics.compression) - }, - }, - { - name: "Test Mixed Environment and With Compression", - opts: []Option{ - WithTracesCompression(NoCompression), - }, - env: map[string]string{ - "OTEL_EXPORTER_OTLP_TRACES_COMPRESSION": "gzip", - "OTEL_EXPORTER_OTLP_METRICS_COMPRESSION": "gzip", - }, - asserts: func(t *testing.T, c *config) { - assert.Equal(t, NoCompression, c.traces.compression) - assert.Equal(t, GzipCompression, c.metrics.compression) - }, - }, - - // Timeout Tests - { - name: "Test With Timeout", - opts: []Option{ - WithTimeout(time.Duration(5 * time.Second)), - }, - asserts: func(t *testing.T, c *config) { - assert.Equal(t, 5*time.Second, c.traces.timeout) - assert.Equal(t, 5*time.Second, c.metrics.timeout) - }, - }, - { - name: "Test With Signal Specific Timeout", - opts: []Option{ - WithTimeout(time.Duration(5 * time.Second)), - WithTracesTimeout(time.Duration(13 * time.Second)), - WithMetricsTimeout(time.Duration(14 * time.Second)), - }, - asserts: func(t *testing.T, c *config) { - assert.Equal(t, 13*time.Second, c.traces.timeout) - assert.Equal(t, 14*time.Second, c.metrics.timeout) - }, - }, - { - name: "Test Environment Timeout", - env: map[string]string{ - "OTEL_EXPORTER_OTLP_TIMEOUT": "15000", - }, - asserts: func(t *testing.T, c *config) { - assert.Equal(t, c.metrics.timeout, 15*time.Second) - assert.Equal(t, c.traces.timeout, 15*time.Second) - }, - }, - { - name: "Test Environment Signal Specific Timeout", - env: map[string]string{ - "OTEL_EXPORTER_OTLP_TIMEOUT": "15000", - "OTEL_EXPORTER_OTLP_TRACES_TIMEOUT": "27000", - "OTEL_EXPORTER_OTLP_METRICS_TIMEOUT": "28000", - }, - asserts: func(t *testing.T, c *config) { - assert.Equal(t, c.traces.timeout, 27*time.Second) - assert.Equal(t, c.metrics.timeout, 28*time.Second) - }, - }, - { - name: "Test Mixed Environment and With Timeout", - env: map[string]string{ - "OTEL_EXPORTER_OTLP_TIMEOUT": "15000", - "OTEL_EXPORTER_OTLP_TRACES_TIMEOUT": "27000", - "OTEL_EXPORTER_OTLP_METRICS_TIMEOUT": "28000", - }, - opts: []Option{ - WithTracesTimeout(5 * time.Second), - }, - asserts: func(t *testing.T, c *config) { - assert.Equal(t, c.traces.timeout, 5*time.Second) - assert.Equal(t, c.metrics.timeout, 28*time.Second) - }, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - cfg := newDefaultConfig() - - e := envOptionsReader{ - getEnv: tt.env.getEnv, - readFile: tt.fileReader.readFile, - } - e.applyEnvConfigs(&cfg) - - for _, opt := range tt.opts { - opt.Apply(&cfg) - } - tt.asserts(t, &cfg) - }) - } -}