Skip to content

Commit

Permalink
Merge branch 'main' into fix-send_failed_requests-metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
bogdandrutu committed Jan 5, 2022
2 parents f2a3a4a + f13a011 commit 5e27ca4
Show file tree
Hide file tree
Showing 25 changed files with 241 additions and 131 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,15 @@
- `service.telemetry.metrics.level` and `service.telemetry.metrics.address`
should be used to configure collector self-metrics.
- `configauth`: add helpers to create new server authenticators. (#4558)
- Refactor `configgrpc` for compression methods (#4624)
- Add an option to allow `config.Map` conversion in the `service.ConfigProvider` (#4634)

## 🧰 Bug fixes 🧰

- Fix merge config map provider to close the watchers (#4570)
- Fix expand map provider to call close on the base provider (#4571)
- Fix correct the value of `otelcol_exporter_send_failed_requests` (#4629)
- `otlp` receiver: Fix legacy port cfg value override and HTTP server starting bug (#4631)

## v0.41.0 Beta

Expand Down
8 changes: 4 additions & 4 deletions cmd/builder/internal/builder/templates/main_windows.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ func checkUseInteractiveMode() (bool, error) {
return true, nil
}

if isInteractiveSession, err := svc.IsAnInteractiveSession(); err != nil {
return false, fmt.Errorf("failed to determine if we are running in an interactive session %w", err)
} else {
return isInteractiveSession, nil
isInteractiveSession, err := svc.IsAnInteractiveSession()
if err != nil {
return false, fmt.Errorf("failed to determine if we are running in an interactive session: %w", err)
}
return isInteractiveSession, nil
}

func runService(params service.CollectorSettings) error {
Expand Down
8 changes: 4 additions & 4 deletions cmd/otelcorecol/main_windows.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

49 changes: 18 additions & 31 deletions config/configgrpc/configgrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,24 +44,7 @@ import (
"go.opentelemetry.io/collector/internal/middleware"
)

// Compression gRPC keys for supported compression types within collector.
const (
CompressionUnsupported = ""
CompressionGzip = "gzip"
CompressionSnappy = "snappy"
CompressionZstd = "zstd"
)

var (
// Map of opentelemetry compression types to grpc registered compression types.
gRPCCompressionKeyMap = map[string]string{
CompressionGzip: gzip.Name,
CompressionSnappy: snappy.Name,
CompressionZstd: zstd.Name,
}

errMetadataNotFound = errors.New("no request metadata found")
)
var errMetadataNotFound = errors.New("no request metadata found")

// Allowed balancer names to be set in grpclb_policy to discover the servers.
var allowedBalancerNames = []string{roundrobin.Name, grpc.PickFirstBalancerName}
Expand All @@ -83,7 +66,7 @@ type GRPCClientSettings struct {
Endpoint string `mapstructure:"endpoint"`

// The compression key for supported compression types within collector.
Compression string `mapstructure:"compression"`
Compression middleware.CompressionType `mapstructure:"compression"`

// TLSSetting struct exposes TLS client configuration.
TLSSetting configtls.TLSClientSetting `mapstructure:"tls,omitempty"`
Expand Down Expand Up @@ -194,12 +177,12 @@ func (gcs *GRPCClientSettings) isSchemeHTTPS() bool {
// ToDialOptions maps configgrpc.GRPCClientSettings to a slice of dial options for gRPC.
func (gcs *GRPCClientSettings) ToDialOptions(host component.Host, settings component.TelemetrySettings) ([]grpc.DialOption, error) {
var opts []grpc.DialOption
if gcs.Compression != "" {
if compressionKey := GetGRPCCompressionKey(gcs.Compression); compressionKey != CompressionUnsupported {
opts = append(opts, grpc.WithDefaultCallOptions(grpc.UseCompressor(compressionKey)))
} else {
return nil, fmt.Errorf("unsupported compression type %q", gcs.Compression)
if gcs.Compression != middleware.CompressionEmpty && gcs.Compression != middleware.CompressionNone {
cp, err := getGRPCCompressionName(gcs.Compression)
if err != nil {
return nil, err
}
opts = append(opts, grpc.WithDefaultCallOptions(grpc.UseCompressor(cp)))
}

tlsCfg, err := gcs.TLSSetting.LoadTLSConfig()
Expand Down Expand Up @@ -373,14 +356,18 @@ func (gss *GRPCServerSettings) ToServerOption(host component.Host, settings comp
return opts, nil
}

// GetGRPCCompressionKey returns the grpc registered compression key if the
// passed in compression key is supported, and CompressionUnsupported otherwise.
func GetGRPCCompressionKey(compressionType string) string {
compressionKey := strings.ToLower(compressionType)
if encodingKey, ok := gRPCCompressionKeyMap[compressionKey]; ok {
return encodingKey
// getGRPCCompressionName returns compression name registered in grpc.
func getGRPCCompressionName(compressionType middleware.CompressionType) (string, error) {
switch compressionType {
case middleware.CompressionGzip:
return gzip.Name, nil
case middleware.CompressionSnappy:
return snappy.Name, nil
case middleware.CompressionZstd:
return zstd.Name, nil
default:
return "", fmt.Errorf("unsupported compression type %q", compressionType)
}
return CompressionUnsupported
}

// enhanceWithClientInformation intercepts the incoming RPC, replacing the incoming context with one that includes
Expand Down
181 changes: 126 additions & 55 deletions config/configgrpc/configgrpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"go.opentelemetry.io/collector/config/configauth"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/internal/middleware"
"go.opentelemetry.io/collector/model/otlpgrpc"
"go.opentelemetry.io/collector/obsreport/obsreporttest"
)
Expand All @@ -62,36 +63,103 @@ func TestAllGrpcClientSettings(t *testing.T) {
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

gcs := &GRPCClientSettings{
Headers: map[string]string{
"test": "test",
tests := []struct {
settings GRPCClientSettings
name string
host component.Host
}{
{
name: "test all with gzip compression",
settings: GRPCClientSettings{
Headers: map[string]string{
"test": "test",
},
Endpoint: "localhost:1234",
Compression: middleware.CompressionGzip,
TLSSetting: configtls.TLSClientSetting{
Insecure: false,
},
Keepalive: &KeepaliveClientConfig{
Time: time.Second,
Timeout: time.Second,
PermitWithoutStream: true,
},
ReadBufferSize: 1024,
WriteBufferSize: 1024,
WaitForReady: true,
BalancerName: "round_robin",
Auth: &configauth.Authentication{AuthenticatorID: config.NewComponentID("testauth")},
},
host: &mockHost{
ext: map[config.ComponentID]component.Extension{
config.NewComponentID("testauth"): &configauth.MockClientAuthenticator{},
},
},
},
Endpoint: "localhost:1234",
Compression: "gzip",
TLSSetting: configtls.TLSClientSetting{
Insecure: false,
{
name: "test all with snappy compression",
settings: GRPCClientSettings{
Headers: map[string]string{
"test": "test",
},
Endpoint: "localhost:1234",
Compression: middleware.CompressionSnappy,
TLSSetting: configtls.TLSClientSetting{
Insecure: false,
},
Keepalive: &KeepaliveClientConfig{
Time: time.Second,
Timeout: time.Second,
PermitWithoutStream: true,
},
ReadBufferSize: 1024,
WriteBufferSize: 1024,
WaitForReady: true,
BalancerName: "round_robin",
Auth: &configauth.Authentication{AuthenticatorID: config.NewComponentID("testauth")},
},
host: &mockHost{
ext: map[config.ComponentID]component.Extension{
config.NewComponentID("testauth"): &configauth.MockClientAuthenticator{},
},
},
},
Keepalive: &KeepaliveClientConfig{
Time: time.Second,
Timeout: time.Second,
PermitWithoutStream: true,
{
name: "test all with zstd compression",
settings: GRPCClientSettings{
Headers: map[string]string{
"test": "test",
},
Endpoint: "localhost:1234",
Compression: middleware.CompressionZstd,
TLSSetting: configtls.TLSClientSetting{
Insecure: false,
},
Keepalive: &KeepaliveClientConfig{
Time: time.Second,
Timeout: time.Second,
PermitWithoutStream: true,
},
ReadBufferSize: 1024,
WriteBufferSize: 1024,
WaitForReady: true,
BalancerName: "round_robin",
Auth: &configauth.Authentication{AuthenticatorID: config.NewComponentID("testauth")},
},
host: &mockHost{
ext: map[config.ComponentID]component.Extension{
config.NewComponentID("testauth"): &configauth.MockClientAuthenticator{},
},
},
},
ReadBufferSize: 1024,
WriteBufferSize: 1024,
WaitForReady: true,
BalancerName: "round_robin",
Auth: &configauth.Authentication{AuthenticatorID: config.NewComponentID("testauth")},
}

host := &mockHost{
ext: map[config.ComponentID]component.Extension{
config.NewComponentID("testauth"): &configauth.MockClientAuthenticator{},
},
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
opts, err := test.settings.ToDialOptions(test.host, tt.TelemetrySettings)
assert.NoError(t, err)
assert.Len(t, opts, 9)
})
}

opts, err := gcs.ToDialOptions(host, tt.TelemetrySettings)
assert.NoError(t, err)
assert.Len(t, opts, 9)
}

func TestDefaultGrpcServerSettings(t *testing.T) {
Expand Down Expand Up @@ -242,6 +310,39 @@ func TestGRPCClientSettingsError(t *testing.T) {
},
host: &mockHost{},
},
{
err: "unsupported compression type \"zlib\"",
settings: GRPCClientSettings{
Endpoint: "localhost:1234",
TLSSetting: configtls.TLSClientSetting{
Insecure: true,
},
Compression: "zlib",
},
host: &mockHost{},
},
{
err: "unsupported compression type \"deflate\"",
settings: GRPCClientSettings{
Endpoint: "localhost:1234",
TLSSetting: configtls.TLSClientSetting{
Insecure: true,
},
Compression: "deflate",
},
host: &mockHost{},
},
{
err: "unsupported compression type \"bad\"",
settings: GRPCClientSettings{
Endpoint: "localhost:1234",
TLSSetting: configtls.TLSClientSetting{
Insecure: true,
},
Compression: "bad",
},
host: &mockHost{},
},
}
for _, test := range tests {
t.Run(test.err, func(t *testing.T) {
Expand Down Expand Up @@ -343,36 +444,6 @@ func TestGRPCServerSettings_ToListener_Error(t *testing.T) {
assert.Error(t, err)
}

func TestGetGRPCCompressionKey(t *testing.T) {
if GetGRPCCompressionKey("gzip") != CompressionGzip {
t.Error("gzip is marked as supported but returned unsupported")
}

if GetGRPCCompressionKey("Gzip") != CompressionGzip {
t.Error("Capitalization of CompressionGzip should not matter")
}

if GetGRPCCompressionKey("snappy") != CompressionSnappy {
t.Error("snappy is marked as supported but returned unsupported")
}

if GetGRPCCompressionKey("Snappy") != CompressionSnappy {
t.Error("Capitalization of CompressionSnappy should not matter")
}

if GetGRPCCompressionKey("zstd") != CompressionZstd {
t.Error("zstd is marked as supported but returned unsupported")
}

if GetGRPCCompressionKey("Zstd") != CompressionZstd {
t.Error("Capitalization of CompressionZstd should not matter")
}

if GetGRPCCompressionKey("badType") != CompressionUnsupported {
t.Error("badType is not supported but was returned as supported")
}
}

func TestHttpReception(t *testing.T) {
tt, err := obsreporttest.SetupTelemetry()
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion exporter/otlpexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestLoadConfig(t *testing.T) {
"another": "somevalue",
},
Endpoint: "1.2.3.4:1234",
Compression: "on",
Compression: "gzip",
TLSSetting: configtls.TLSClientSetting{
TLSSetting: configtls.TLSSetting{
CAFile: "/var/lib/mycert.pem",
Expand Down

0 comments on commit 5e27ca4

Please sign in to comment.