Skip to content

Commit

Permalink
Merge branch 'main' into add_shutdown_podman
Browse files Browse the repository at this point in the history
  • Loading branch information
rogercoll committed May 13, 2024
2 parents 4e6da95 + a133a8e commit e86e8c6
Show file tree
Hide file tree
Showing 36 changed files with 5,685 additions and 992 deletions.
31 changes: 31 additions & 0 deletions .chloggen/drosiek-exporter-logs.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: sumologicexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: change logs behavior

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [31479]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
* set OTLP as default format
* add support for OTLP format
* do not support metadata attributes
* do not support source headers
# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
27 changes: 27 additions & 0 deletions .chloggen/otelarrowexporter.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: new_component

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: OpenTelemetry Protocol with Apache Arrow Exporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Implementation copied from opentelemetry/otel-arrow repository @v0.20.0.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [26491]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
15 changes: 4 additions & 11 deletions exporter/otelarrowexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,8 @@ Apache Arrow.
OpenTelemetry Protocol with Apache Arrow supports column-oriented data
transport using the Apache Arrow data format. This component converts
OTLP data into an optimized representation and then sends batches of
data using Apache Arrow to encode the stream. The OpenTelemetry
Protocol with Apache Arrow receiver <!-- TODO add link when the
corresponding skeleton is introduced.
[](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/receiver/otelarrowreceiver)
--> component contains logic to reverse the process used in this
data using Apache Arrow to encode the stream. The [OpenTelemetry
Protocol with Apache Arrow receiver](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/receiver/otelarrowreceiver) component contains logic to reverse the process used in this
component.

The use of an OpenTelemetry Protocol with Apache Arrow
Expand All @@ -51,7 +48,7 @@ exporter component. This is as simple as replacing "otlp" with

To enable the OpenTelemetry Protocol with Apache Arrow exporter,
include it in the list of exporters for a pipeline. The `endpoint`
setting is required. The `tls` setting is requirede for insecure
setting is required. The `tls` setting is required for insecure
transport.

- `endpoint` (no default): host:port to which the exporter is going to send OTLP trace data,
Expand Down Expand Up @@ -143,13 +140,9 @@ exporters:
When this is configured, the stream will terminate cleanly without
causing retries, with `OK` gRPC status.

The corresponding `otelarrowreceiver` keepalive setting, that is
The [corresponding `otelarrowreceiver` keepalive setting](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/receiver/otelarrowreceiver#keepalive-configuration), that is
compatible with the one above, reads:

<!-- TODO add a link to the (../../receiver/otelarrowreceiver/README.md) section
discussing this topic from the receiver perspective after both READMEs are present
in collector-contrib -->

```
receivers:
otelarrow:
Expand Down
36 changes: 18 additions & 18 deletions exporter/otelarrowexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"google.golang.org/grpc"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/otelarrowexporter/internal/arrow"
)

// Config defines configuration for OTLP exporter.
Expand All @@ -26,12 +28,12 @@ type Config struct {
exporterhelper.TimeoutSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
exporterhelper.QueueSettings `mapstructure:"sending_queue"`

RetrySettings configretry.BackOffConfig `mapstructure:"retry_on_failure"`
RetryConfig configretry.BackOffConfig `mapstructure:"retry_on_failure"`

configgrpc.ClientConfig `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.

// Arrow includes settings specific to OTel Arrow.
Arrow ArrowSettings `mapstructure:"arrow"`
Arrow ArrowConfig `mapstructure:"arrow"`

// UserDialOptions cannot be configured via `mapstructure`
// schemes. This is useful for custom purposes where the
Expand All @@ -40,9 +42,9 @@ type Config struct {
UserDialOptions []grpc.DialOption `mapstructure:"-"`
}

// ArrowSettings includes whether Arrow is enabled and the number of
// ArrowConfig includes whether Arrow is enabled and the number of
// concurrent Arrow streams.
type ArrowSettings struct {
type ArrowConfig struct {
// NumStreams determines the number of OTel Arrow streams.
NumStreams int `mapstructure:"num_streams"`

Expand All @@ -65,32 +67,26 @@ type ArrowSettings struct {
// Note that `Zstd` applies to gRPC, not Arrow compression.
PayloadCompression configcompression.Type `mapstructure:"payload_compression"`

// Disabled prevents using OTel Arrow streams. The exporter
// Disabled prevents using OTel-Arrow streams. The exporter
// falls back to standard OTLP.
Disabled bool `mapstructure:"disabled"`

// DisableDowngrade prevents this exporter from fallback back
// to standard OTLP. If the Arrow service is unavailable, it
// will retry and/or fail.
DisableDowngrade bool `mapstructure:"disable_downgrade"`

// Prioritizer is a policy name for how load is distributed
// across streams.
Prioritizer arrow.PrioritizerName `mapstructure:"prioritizer"`
}

var _ component.Config = (*Config)(nil)

// Validate checks if the exporter configuration is valid
func (cfg *Config) Validate() error {
if err := cfg.QueueSettings.Validate(); err != nil {
return fmt.Errorf("queue settings has invalid configuration: %w", err)
}
if err := cfg.Arrow.Validate(); err != nil {
return fmt.Errorf("arrow settings has invalid configuration: %w", err)
}

return nil
}
var _ component.ConfigValidator = (*ArrowConfig)(nil)

// Validate returns an error when the number of streams is less than 1.
func (cfg *ArrowSettings) Validate() error {
func (cfg *ArrowConfig) Validate() error {
if cfg.NumStreams < 1 {
return fmt.Errorf("stream count must be > 0: %d", cfg.NumStreams)
}
Expand All @@ -103,6 +99,10 @@ func (cfg *ArrowSettings) Validate() error {
return fmt.Errorf("zstd encoder: invalid configuration: %w", err)
}

if err := cfg.Prioritizer.Validate(); err != nil {
return fmt.Errorf("invalid prioritizer: %w", err)
}

// The cfg.PayloadCompression field is validated by the underlying library,
// but we only support Zstd or none.
switch cfg.PayloadCompression {
Expand All @@ -113,7 +113,7 @@ func (cfg *ArrowSettings) Validate() error {
return nil
}

func (cfg *ArrowSettings) toArrowProducerOptions() (arrowOpts []config.Option) {
func (cfg *ArrowConfig) toArrowProducerOptions() (arrowOpts []config.Option) {
switch cfg.PayloadCompression {
case configcompression.TypeZstd:
arrowOpts = append(arrowOpts, config.WithZstd())
Expand Down
28 changes: 16 additions & 12 deletions exporter/otelarrowexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/confmap/confmaptest"
"go.opentelemetry.io/collector/exporter/exporterhelper"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/otelarrowexporter/internal/arrow"
)

func TestUnmarshalDefaultConfig(t *testing.T) {
Expand All @@ -32,6 +34,7 @@ func TestUnmarshalDefaultConfig(t *testing.T) {
assert.NoError(t, component.UnmarshalConfig(cm, cfg))
assert.Equal(t, factory.CreateDefaultConfig(), cfg)
assert.Equal(t, "round_robin", cfg.(*Config).ClientConfig.BalancerName)
assert.Equal(t, arrow.DefaultPrioritizer, cfg.(*Config).Arrow.Prioritizer)
}

func TestUnmarshalConfig(t *testing.T) {
Expand All @@ -45,7 +48,7 @@ func TestUnmarshalConfig(t *testing.T) {
TimeoutSettings: exporterhelper.TimeoutSettings{
Timeout: 10 * time.Second,
},
RetrySettings: configretry.BackOffConfig{
RetryConfig: configretry.BackOffConfig{
Enabled: true,
InitialInterval: 10 * time.Second,
RandomizationFactor: 0.7,
Expand Down Expand Up @@ -79,20 +82,21 @@ func TestUnmarshalConfig(t *testing.T) {
},
WriteBufferSize: 512 * 1024,
BalancerName: "experimental",
Auth: &configauth.Authentication{AuthenticatorID: component.MustNewID("nop")},
Auth: &configauth.Authentication{AuthenticatorID: component.NewID(component.MustNewType("nop"))},
},
Arrow: ArrowSettings{
Arrow: ArrowConfig{
NumStreams: 2,
MaxStreamLifetime: 2 * time.Hour,
PayloadCompression: configcompression.TypeZstd,
Zstd: zstd.DefaultEncoderConfig(),
Prioritizer: "leastloaded8",
},
}, cfg)
}

func TestArrowSettingsValidate(t *testing.T) {
settings := func(enabled bool, numStreams int, maxStreamLifetime time.Duration, level zstd.Level) *ArrowSettings {
return &ArrowSettings{
func TestArrowConfigValidate(t *testing.T) {
settings := func(enabled bool, numStreams int, maxStreamLifetime time.Duration, level zstd.Level) *ArrowConfig {
return &ArrowConfig{
Disabled: !enabled,
NumStreams: numStreams,
MaxStreamLifetime: maxStreamLifetime,
Expand All @@ -118,16 +122,16 @@ func TestArrowSettingsValidate(t *testing.T) {
require.Error(t, settings(true, math.MaxInt, 10*time.Second, zstd.MaxLevel+1).Validate())
}

func TestDefaultSettingsValid(t *testing.T) {
func TestDefaultConfigValid(t *testing.T) {
cfg := createDefaultConfig()
// this must be set by the user and config
// validation always checks that a value is set.
cfg.(*Config).Arrow.MaxStreamLifetime = 2 * time.Second
require.NoError(t, cfg.(*Config).Validate())
require.NoError(t, component.ValidateConfig(cfg))
}

func TestArrowSettingsPayloadCompressionZstd(t *testing.T) {
settings := ArrowSettings{
func TestArrowConfigPayloadCompressionZstd(t *testing.T) {
settings := ArrowConfig{
PayloadCompression: configcompression.TypeZstd,
}
var config config.Config
Expand All @@ -137,9 +141,9 @@ func TestArrowSettingsPayloadCompressionZstd(t *testing.T) {
require.True(t, config.Zstd)
}

func TestArrowSettingsPayloadCompressionNone(t *testing.T) {
func TestArrowConfigPayloadCompressionNone(t *testing.T) {
for _, value := range []string{"", "none"} {
settings := ArrowSettings{
settings := ArrowConfig{
PayloadCompression: configcompression.Type(value),
}
var config config.Config
Expand Down
4 changes: 0 additions & 4 deletions exporter/otelarrowexporter/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,4 @@

//go:generate mdatagen metadata.yaml

// Package otelarrowexporter exports telemetry using OpenTelemetry
// Protocol with Apache Arrow and/or standard OpenTelemetry Protocol
// data using configuration structures similar to the core OTLP
// exporter.
package otelarrowexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/otelarrowexporter"
46 changes: 23 additions & 23 deletions exporter/otelarrowexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/otelarrowexporter/internal/metadata"
)

// NewFactory creates a factory for OTel-Arrow exporter.
// NewFactory creates a factory for OTLP exporter.
func NewFactory() exporter.Factory {
return exporter.NewFactory(
metadata.Type,
Expand All @@ -39,9 +39,8 @@ func NewFactory() exporter.Factory {
func createDefaultConfig() component.Config {
return &Config{
TimeoutSettings: exporterhelper.NewDefaultTimeoutSettings(),
RetrySettings: configretry.NewDefaultBackOffConfig(),
RetryConfig: configretry.NewDefaultBackOffConfig(),
QueueSettings: exporterhelper.NewDefaultQueueSettings(),

ClientConfig: configgrpc.ClientConfig{
Headers: map[string]configopaque.String{},
// Default to zstd compression
Expand All @@ -54,11 +53,12 @@ func createDefaultConfig() component.Config {
// destination.
BalancerName: "round_robin",
},
Arrow: ArrowSettings{
Arrow: ArrowConfig{
NumStreams: runtime.NumCPU(),
MaxStreamLifetime: time.Hour,

Zstd: zstd.DefaultEncoderConfig(),
Zstd: zstd.DefaultEncoderConfig(),
Prioritizer: arrow.DefaultPrioritizer,

// PayloadCompression is off by default because gRPC
// compression is on by default, above.
Expand All @@ -67,14 +67,14 @@ func createDefaultConfig() component.Config {
}
}

func (e *baseExporter) helperOptions() []exporterhelper.Option {
func (oce *baseExporter) helperOptions() []exporterhelper.Option {
return []exporterhelper.Option{
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
exporterhelper.WithTimeout(e.config.TimeoutSettings),
exporterhelper.WithRetry(e.config.RetrySettings),
exporterhelper.WithQueue(e.config.QueueSettings),
exporterhelper.WithStart(e.start),
exporterhelper.WithShutdown(e.shutdown),
exporterhelper.WithTimeout(oce.config.TimeoutSettings),
exporterhelper.WithRetry(oce.config.RetryConfig),
exporterhelper.WithQueue(oce.config.QueueSettings),
exporterhelper.WithStart(oce.start),
exporterhelper.WithShutdown(oce.shutdown),
}
}

Expand All @@ -97,13 +97,13 @@ func createTracesExporter(
set exporter.CreateSettings,
cfg component.Config,
) (exporter.Traces, error) {
exp, err := newExporter(cfg, set, createArrowTracesStream)
oce, err := newExporter(cfg, set, createArrowTracesStream)
if err != nil {
return nil, err
}
return exporterhelper.NewTracesExporter(ctx, exp.settings, exp.config,
exp.pushTraces,
exp.helperOptions()...,
return exporterhelper.NewTracesExporter(ctx, oce.settings, oce.config,
oce.pushTraces,
oce.helperOptions()...,
)
}

Expand All @@ -116,13 +116,13 @@ func createMetricsExporter(
set exporter.CreateSettings,
cfg component.Config,
) (exporter.Metrics, error) {
exp, err := newExporter(cfg, set, createArrowMetricsStream)
oce, err := newExporter(cfg, set, createArrowMetricsStream)
if err != nil {
return nil, err
}
return exporterhelper.NewMetricsExporter(ctx, exp.settings, exp.config,
exp.pushMetrics,
exp.helperOptions()...,
return exporterhelper.NewMetricsExporter(ctx, oce.settings, oce.config,
oce.pushMetrics,
oce.helperOptions()...,
)
}

Expand All @@ -135,12 +135,12 @@ func createLogsExporter(
set exporter.CreateSettings,
cfg component.Config,
) (exporter.Logs, error) {
exp, err := newExporter(cfg, set, createArrowLogsStream)
oce, err := newExporter(cfg, set, createArrowLogsStream)
if err != nil {
return nil, err
}
return exporterhelper.NewLogsExporter(ctx, exp.settings, exp.config,
exp.pushLogs,
exp.helperOptions()...,
return exporterhelper.NewLogsExporter(ctx, oce.settings, oce.config,
oce.pushLogs,
oce.helperOptions()...,
)
}

0 comments on commit e86e8c6

Please sign in to comment.