Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OpenTelemetry Protocol with Apache Arrow Exporter component #31996

Merged
merged 43 commits into from
May 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
c8bfc04
OpenTelemetry Protocol with Apache Arrow Exporter
jmacd Mar 26, 2024
a33a749
linting
jmacd Mar 26, 2024
78b78ca
fix build
jmacd Mar 26, 2024
170910a
ArrowSettings->ArrowConfig
jmacd Mar 26, 2024
54511e6
tidy
jmacd Mar 26, 2024
72edc30
chlog
jmacd Mar 26, 2024
19b6344
chlog up
jmacd Mar 27, 2024
f56207b
Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec…
jmacd Mar 27, 2024
9db405a
lint++
jmacd Mar 28, 2024
52c4784
versions
jmacd Mar 28, 2024
49b867b
Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec…
jmacd Apr 17, 2024
640646a
re-tidy
jmacd Apr 17, 2024
2f94146
Copy from v0.22
jmacd Apr 17, 2024
41bd121
lint
jmacd Apr 17, 2024
ef9424d
lint++
jmacd Apr 17, 2024
27c980b
split exporter mods 2->3
jmacd Apr 17, 2024
05d7259
again
jmacd Apr 17, 2024
ed18731
oops
jmacd Apr 17, 2024
6c682c3
Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec…
jmacd Apr 17, 2024
2bbffb8
revert two files
jmacd Apr 17, 2024
12e3e69
Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec…
jmacd Apr 22, 2024
b9379e6
Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec…
jmacd Apr 22, 2024
ee96d91
revert two
jmacd Apr 22, 2024
2770c24
gencol
jmacd Apr 23, 2024
4cecc14
Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec…
jmacd Apr 23, 2024
2b95e7b
from 14c63d1eaac7c53585e6b9195d09f1f9703869ed
jmacd Apr 23, 2024
935a08c
re-lint
jmacd Apr 23, 2024
a38a63c
toolchain fix
jmacd Apr 24, 2024
3a1484b
Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec…
jmacd Apr 24, 2024
8af4b69
Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec…
jmacd May 1, 2024
3bf5738
harden test
jmacd May 2, 2024
6f40a7b
Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec…
jmacd May 2, 2024
cba162d
Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec…
jmacd May 6, 2024
e7a8bf2
Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec…
jmacd May 8, 2024
099d57c
update from v0.23.0
jmacd May 9, 2024
64ea382
generate
jmacd May 9, 2024
f2d2de2
fix test
jmacd May 9, 2024
af47d8d
lint the test
jmacd May 9, 2024
2172639
Merge branch 'main' into jmacd/arrow_exporter
jmacd May 10, 2024
0881db7
Merge branch 'main' into jmacd/arrow_exporter
jmacd May 10, 2024
7e6db4e
add gracefulstop to some tests
jmacd May 10, 2024
d300433
Merge branch 'jmacd/arrow_exporter' of github.com:jmacd/opentelemetry…
jmacd May 10, 2024
7e540e9
idk go generate changed
jmacd May 10, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/otelarrowexporter.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: new_component

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: OpenTelemetry Protocol with Apache Arrow Exporter
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
component: OpenTelemetry Protocol with Apache Arrow Exporter
component: exporter/otelarrow


# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Implementation copied from opentelemetry/otel-arrow repository @v0.20.0.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [26491]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
15 changes: 4 additions & 11 deletions exporter/otelarrowexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,8 @@ Apache Arrow.
OpenTelemetry Protocol with Apache Arrow supports column-oriented data
transport using the Apache Arrow data format. This component converts
OTLP data into an optimized representation and then sends batches of
data using Apache Arrow to encode the stream. The OpenTelemetry
Protocol with Apache Arrow receiver <!-- TODO add link when the
corresponding skeleton is introduced.
[](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/receiver/otelarrowreceiver)
--> component contains logic to reverse the process used in this
data using Apache Arrow to encode the stream. The [OpenTelemetry
Protocol with Apache Arrow receiver](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/receiver/otelarrowreceiver) component contains logic to reverse the process used in this
component.

The use of an OpenTelemetry Protocol with Apache Arrow
Expand All @@ -51,7 +48,7 @@ exporter component. This is as simple as replacing "otlp" with

To enable the OpenTelemetry Protocol with Apache Arrow exporter,
include it in the list of exporters for a pipeline. The `endpoint`
setting is required. The `tls` setting is requirede for insecure
setting is required. The `tls` setting is required for insecure
transport.

- `endpoint` (no default): host:port to which the exporter is going to send OTLP trace data,
Expand Down Expand Up @@ -143,13 +140,9 @@ exporters:
When this is configured, the stream will terminate cleanly without
causing retries, with `OK` gRPC status.

The corresponding `otelarrowreceiver` keepalive setting, that is
The [corresponding `otelarrowreceiver` keepalive setting](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/receiver/otelarrowreceiver#keepalive-configuration), that is
compatible with the one above, reads:

<!-- TODO add a link to the (../../receiver/otelarrowreceiver/README.md) section
discussing this topic from the receiver perspective after both READMEs are present
in collector-contrib -->

```
receivers:
otelarrow:
Expand Down
36 changes: 18 additions & 18 deletions exporter/otelarrowexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"google.golang.org/grpc"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/otelarrowexporter/internal/arrow"
)

// Config defines configuration for OTLP exporter.
Expand All @@ -26,12 +28,12 @@ type Config struct {
exporterhelper.TimeoutSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
exporterhelper.QueueSettings `mapstructure:"sending_queue"`

RetrySettings configretry.BackOffConfig `mapstructure:"retry_on_failure"`
RetryConfig configretry.BackOffConfig `mapstructure:"retry_on_failure"`

configgrpc.ClientConfig `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.

// Arrow includes settings specific to OTel Arrow.
Arrow ArrowSettings `mapstructure:"arrow"`
Arrow ArrowConfig `mapstructure:"arrow"`

// UserDialOptions cannot be configured via `mapstructure`
// schemes. This is useful for custom purposes where the
Expand All @@ -40,9 +42,9 @@ type Config struct {
UserDialOptions []grpc.DialOption `mapstructure:"-"`
}

// ArrowSettings includes whether Arrow is enabled and the number of
// ArrowConfig includes whether Arrow is enabled and the number of
// concurrent Arrow streams.
type ArrowSettings struct {
type ArrowConfig struct {
// NumStreams determines the number of OTel Arrow streams.
NumStreams int `mapstructure:"num_streams"`

Expand All @@ -65,32 +67,26 @@ type ArrowSettings struct {
// Note that `Zstd` applies to gRPC, not Arrow compression.
PayloadCompression configcompression.Type `mapstructure:"payload_compression"`

// Disabled prevents using OTel Arrow streams. The exporter
// Disabled prevents using OTel-Arrow streams. The exporter
// falls back to standard OTLP.
Disabled bool `mapstructure:"disabled"`

// DisableDowngrade prevents this exporter from fallback back
// to standard OTLP. If the Arrow service is unavailable, it
// will retry and/or fail.
DisableDowngrade bool `mapstructure:"disable_downgrade"`

// Prioritizer is a policy name for how load is distributed
// across streams.
Prioritizer arrow.PrioritizerName `mapstructure:"prioritizer"`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This config option is not described in the README. It's not a big issue since the component is in development stability.

}

var _ component.Config = (*Config)(nil)

// Validate checks if the exporter configuration is valid
func (cfg *Config) Validate() error {
if err := cfg.QueueSettings.Validate(); err != nil {
return fmt.Errorf("queue settings has invalid configuration: %w", err)
}
if err := cfg.Arrow.Validate(); err != nil {
return fmt.Errorf("arrow settings has invalid configuration: %w", err)
}

return nil
}
var _ component.ConfigValidator = (*ArrowConfig)(nil)

// Validate returns an error when the number of streams is less than 1.
func (cfg *ArrowSettings) Validate() error {
func (cfg *ArrowConfig) Validate() error {
if cfg.NumStreams < 1 {
return fmt.Errorf("stream count must be > 0: %d", cfg.NumStreams)
}
Expand All @@ -103,6 +99,10 @@ func (cfg *ArrowSettings) Validate() error {
return fmt.Errorf("zstd encoder: invalid configuration: %w", err)
}

if err := cfg.Prioritizer.Validate(); err != nil {
return fmt.Errorf("invalid prioritizer: %w", err)
}

// The cfg.PayloadCompression field is validated by the underlying library,
// but we only support Zstd or none.
switch cfg.PayloadCompression {
Expand All @@ -113,7 +113,7 @@ func (cfg *ArrowSettings) Validate() error {
return nil
}

func (cfg *ArrowSettings) toArrowProducerOptions() (arrowOpts []config.Option) {
func (cfg *ArrowConfig) toArrowProducerOptions() (arrowOpts []config.Option) {
switch cfg.PayloadCompression {
case configcompression.TypeZstd:
arrowOpts = append(arrowOpts, config.WithZstd())
Expand Down
28 changes: 16 additions & 12 deletions exporter/otelarrowexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/confmap/confmaptest"
"go.opentelemetry.io/collector/exporter/exporterhelper"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/otelarrowexporter/internal/arrow"
)

func TestUnmarshalDefaultConfig(t *testing.T) {
Expand All @@ -32,6 +34,7 @@ func TestUnmarshalDefaultConfig(t *testing.T) {
assert.NoError(t, component.UnmarshalConfig(cm, cfg))
assert.Equal(t, factory.CreateDefaultConfig(), cfg)
assert.Equal(t, "round_robin", cfg.(*Config).ClientConfig.BalancerName)
assert.Equal(t, arrow.DefaultPrioritizer, cfg.(*Config).Arrow.Prioritizer)
}

func TestUnmarshalConfig(t *testing.T) {
Expand All @@ -45,7 +48,7 @@ func TestUnmarshalConfig(t *testing.T) {
TimeoutSettings: exporterhelper.TimeoutSettings{
Timeout: 10 * time.Second,
},
RetrySettings: configretry.BackOffConfig{
RetryConfig: configretry.BackOffConfig{
Enabled: true,
InitialInterval: 10 * time.Second,
RandomizationFactor: 0.7,
Expand Down Expand Up @@ -79,20 +82,21 @@ func TestUnmarshalConfig(t *testing.T) {
},
WriteBufferSize: 512 * 1024,
BalancerName: "experimental",
Auth: &configauth.Authentication{AuthenticatorID: component.MustNewID("nop")},
Auth: &configauth.Authentication{AuthenticatorID: component.NewID(component.MustNewType("nop"))},
},
Arrow: ArrowSettings{
Arrow: ArrowConfig{
NumStreams: 2,
MaxStreamLifetime: 2 * time.Hour,
PayloadCompression: configcompression.TypeZstd,
Zstd: zstd.DefaultEncoderConfig(),
Prioritizer: "leastloaded8",
},
}, cfg)
}

func TestArrowSettingsValidate(t *testing.T) {
settings := func(enabled bool, numStreams int, maxStreamLifetime time.Duration, level zstd.Level) *ArrowSettings {
return &ArrowSettings{
func TestArrowConfigValidate(t *testing.T) {
settings := func(enabled bool, numStreams int, maxStreamLifetime time.Duration, level zstd.Level) *ArrowConfig {
return &ArrowConfig{
Disabled: !enabled,
NumStreams: numStreams,
MaxStreamLifetime: maxStreamLifetime,
Expand All @@ -118,16 +122,16 @@ func TestArrowSettingsValidate(t *testing.T) {
require.Error(t, settings(true, math.MaxInt, 10*time.Second, zstd.MaxLevel+1).Validate())
}

func TestDefaultSettingsValid(t *testing.T) {
func TestDefaultConfigValid(t *testing.T) {
cfg := createDefaultConfig()
// this must be set by the user and config
// validation always checks that a value is set.
cfg.(*Config).Arrow.MaxStreamLifetime = 2 * time.Second
require.NoError(t, cfg.(*Config).Validate())
require.NoError(t, component.ValidateConfig(cfg))
}

func TestArrowSettingsPayloadCompressionZstd(t *testing.T) {
settings := ArrowSettings{
func TestArrowConfigPayloadCompressionZstd(t *testing.T) {
settings := ArrowConfig{
PayloadCompression: configcompression.TypeZstd,
}
var config config.Config
Expand All @@ -137,9 +141,9 @@ func TestArrowSettingsPayloadCompressionZstd(t *testing.T) {
require.True(t, config.Zstd)
}

func TestArrowSettingsPayloadCompressionNone(t *testing.T) {
func TestArrowConfigPayloadCompressionNone(t *testing.T) {
for _, value := range []string{"", "none"} {
settings := ArrowSettings{
settings := ArrowConfig{
PayloadCompression: configcompression.Type(value),
}
var config config.Config
Expand Down
4 changes: 0 additions & 4 deletions exporter/otelarrowexporter/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,4 @@

//go:generate mdatagen metadata.yaml

// Package otelarrowexporter exports telemetry using OpenTelemetry
// Protocol with Apache Arrow and/or standard OpenTelemetry Protocol
// data using configuration structures similar to the core OTLP
// exporter.
package otelarrowexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/otelarrowexporter"
46 changes: 23 additions & 23 deletions exporter/otelarrowexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/otelarrowexporter/internal/metadata"
)

// NewFactory creates a factory for OTel-Arrow exporter.
// NewFactory creates a factory for OTLP exporter.
func NewFactory() exporter.Factory {
return exporter.NewFactory(
metadata.Type,
Expand All @@ -39,9 +39,8 @@ func NewFactory() exporter.Factory {
func createDefaultConfig() component.Config {
return &Config{
TimeoutSettings: exporterhelper.NewDefaultTimeoutSettings(),
RetrySettings: configretry.NewDefaultBackOffConfig(),
RetryConfig: configretry.NewDefaultBackOffConfig(),
QueueSettings: exporterhelper.NewDefaultQueueSettings(),

ClientConfig: configgrpc.ClientConfig{
Headers: map[string]configopaque.String{},
// Default to zstd compression
Expand All @@ -54,11 +53,12 @@ func createDefaultConfig() component.Config {
// destination.
BalancerName: "round_robin",
},
Arrow: ArrowSettings{
Arrow: ArrowConfig{
NumStreams: runtime.NumCPU(),
MaxStreamLifetime: time.Hour,

Zstd: zstd.DefaultEncoderConfig(),
Zstd: zstd.DefaultEncoderConfig(),
Prioritizer: arrow.DefaultPrioritizer,

// PayloadCompression is off by default because gRPC
// compression is on by default, above.
Expand All @@ -67,14 +67,14 @@ func createDefaultConfig() component.Config {
}
}

func (e *baseExporter) helperOptions() []exporterhelper.Option {
func (oce *baseExporter) helperOptions() []exporterhelper.Option {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not clear why this variable was renamed - what is oce stand for? 🤔

return []exporterhelper.Option{
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
exporterhelper.WithTimeout(e.config.TimeoutSettings),
exporterhelper.WithRetry(e.config.RetrySettings),
exporterhelper.WithQueue(e.config.QueueSettings),
exporterhelper.WithStart(e.start),
exporterhelper.WithShutdown(e.shutdown),
exporterhelper.WithTimeout(oce.config.TimeoutSettings),
exporterhelper.WithRetry(oce.config.RetryConfig),
exporterhelper.WithQueue(oce.config.QueueSettings),
exporterhelper.WithStart(oce.start),
exporterhelper.WithShutdown(oce.shutdown),
}
}

Expand All @@ -97,13 +97,13 @@ func createTracesExporter(
set exporter.CreateSettings,
cfg component.Config,
) (exporter.Traces, error) {
exp, err := newExporter(cfg, set, createArrowTracesStream)
oce, err := newExporter(cfg, set, createArrowTracesStream)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not clear why this variable was renamed - what is oce stand for? 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The OTLP-general stuff up two levels was derived from the core otlpexporter and still has some irregular names as a result. I've tried to fix them as a I notice them. I fixed them in the skeleton for this component, but copied the old names by mistake when adding code from the otel-arrow repo. Will fix!

if err != nil {
return nil, err
}
return exporterhelper.NewTracesExporter(ctx, exp.settings, exp.config,
exp.pushTraces,
exp.helperOptions()...,
return exporterhelper.NewTracesExporter(ctx, oce.settings, oce.config,
oce.pushTraces,
oce.helperOptions()...,
)
}

Expand All @@ -116,13 +116,13 @@ func createMetricsExporter(
set exporter.CreateSettings,
cfg component.Config,
) (exporter.Metrics, error) {
exp, err := newExporter(cfg, set, createArrowMetricsStream)
oce, err := newExporter(cfg, set, createArrowMetricsStream)
if err != nil {
return nil, err
}
return exporterhelper.NewMetricsExporter(ctx, exp.settings, exp.config,
exp.pushMetrics,
exp.helperOptions()...,
return exporterhelper.NewMetricsExporter(ctx, oce.settings, oce.config,
oce.pushMetrics,
oce.helperOptions()...,
)
}

Expand All @@ -135,12 +135,12 @@ func createLogsExporter(
set exporter.CreateSettings,
cfg component.Config,
) (exporter.Logs, error) {
exp, err := newExporter(cfg, set, createArrowLogsStream)
oce, err := newExporter(cfg, set, createArrowLogsStream)
if err != nil {
return nil, err
}
return exporterhelper.NewLogsExporter(ctx, exp.settings, exp.config,
exp.pushLogs,
exp.helperOptions()...,
return exporterhelper.NewLogsExporter(ctx, oce.settings, oce.config,
oce.pushLogs,
oce.helperOptions()...,
)
}
Loading