Skip to content

Commit

Permalink
restructured fileexporter to prepare for group_by (#31068)
Browse files Browse the repository at this point in the history
**Description:** 

In fileexporter: restructured code by splitting out two functionality
from `fileExporter`: marshalling and compression logic was moved to
`marshaller`, and logic related to writing to file and buffering writes
was moved to, `fileWriter`.

This pr introduces no changes to the behavior. The restructure was made
in preparation for adding the new group_by functionality (see linked
ticket for more detail).

**Link to tracking Issue:** #24654

**Testing:** Tests have been updated for the new structure. No tests
were added or modified beyond the structural changes.

**Documentation:** This pr introduces no user-facing changes.
  • Loading branch information
adam-kiss-sg committed Feb 8, 2024
1 parent 3185573 commit 9365b33
Show file tree
Hide file tree
Showing 7 changed files with 391 additions and 244 deletions.
26 changes: 26 additions & 0 deletions exporter/fileexporter/error_component.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package fileexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/fileexporter"

import (
"context"

"go.opentelemetry.io/collector/component"
)

// errorComponent is used to return error from a factory method. SharedComponent does
// not handle errors, so wrapping the error into a component is necessary.
type errorComponent struct {
err error
}

// Start will return the cached error.
func (e *errorComponent) Start(context.Context, component.Host) error {
return e.err
}

// Shutdown will return the cached error.
func (e *errorComponent) Shutdown(context.Context) error {
return e.err
}
108 changes: 73 additions & 35 deletions exporter/fileexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,15 @@ import (
"context"
"io"
"os"
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
"gopkg.in/natefinch/lumberjack.v2"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/fileexporter/internal/metadata"
Expand All @@ -30,6 +34,13 @@ const (
compressionZSTD = "zstd"
)

type FileExporter interface {
component.Component
consumeTraces(_ context.Context, td ptrace.Traces) error
consumeMetrics(_ context.Context, md pmetric.Metrics) error
consumeLogs(_ context.Context, ld plog.Logs) error
}

// NewFactory creates a factory for OTLP exporter.
func NewFactory() exporter.Factory {
return exporter.NewFactory(
Expand All @@ -52,19 +63,15 @@ func createTracesExporter(
set exporter.CreateSettings,
cfg component.Config,
) (exporter.Traces, error) {
conf := cfg.(*Config)
writer, err := buildFileWriter(conf)
fe, err := getOrCreateFileExporter(cfg)
if err != nil {
return nil, err
}
fe := exporters.GetOrAdd(cfg, func() component.Component {
return newFileExporter(conf, writer)
})
return exporterhelper.NewTracesExporter(
ctx,
set,
cfg,
fe.Unwrap().(*fileExporter).consumeTraces,
fe.consumeTraces,
exporterhelper.WithStart(fe.Start),
exporterhelper.WithShutdown(fe.Shutdown),
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
Expand All @@ -76,19 +83,15 @@ func createMetricsExporter(
set exporter.CreateSettings,
cfg component.Config,
) (exporter.Metrics, error) {
conf := cfg.(*Config)
writer, err := buildFileWriter(conf)
fe, err := getOrCreateFileExporter(cfg)
if err != nil {
return nil, err
}
fe := exporters.GetOrAdd(cfg, func() component.Component {
return newFileExporter(conf, writer)
})
return exporterhelper.NewMetricsExporter(
ctx,
set,
cfg,
fe.Unwrap().(*fileExporter).consumeMetrics,
fe.consumeMetrics,
exporterhelper.WithStart(fe.Start),
exporterhelper.WithShutdown(fe.Shutdown),
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
Expand All @@ -100,59 +103,94 @@ func createLogsExporter(
set exporter.CreateSettings,
cfg component.Config,
) (exporter.Logs, error) {
conf := cfg.(*Config)
writer, err := buildFileWriter(conf)
fe, err := getOrCreateFileExporter(cfg)
if err != nil {
return nil, err
}
fe := exporters.GetOrAdd(cfg, func() component.Component {
return newFileExporter(conf, writer)
})
return exporterhelper.NewLogsExporter(
ctx,
set,
cfg,
fe.Unwrap().(*fileExporter).consumeLogs,
fe.consumeLogs,
exporterhelper.WithStart(fe.Start),
exporterhelper.WithShutdown(fe.Shutdown),
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
)
}

func newFileExporter(conf *Config, writer io.WriteCloser) *fileExporter {
return &fileExporter{
path: conf.Path,
// getOrCreateFileExporter creates a FileExporter and caches it for a particular configuration,
// or returns the already cached one. Caching is required because the factory is asked trace and
// metric receivers separately when it gets CreateTracesReceiver() and CreateMetricsReceiver()
// but they must not create separate objects, they must use one Exporter object per configuration.
func getOrCreateFileExporter(cfg component.Config) (FileExporter, error) {
conf := cfg.(*Config)
fe := exporters.GetOrAdd(cfg, func() component.Component {
e, err := newFileExporter(conf)
if err != nil {
return &errorComponent{err: err}
}

return e
})

component := fe.Unwrap()
if errComponent, ok := component.(*errorComponent); ok {
return nil, errComponent.err
}

return component.(FileExporter), nil
}

func newFileExporter(conf *Config) (FileExporter, error) {
marshaller := &marshaller{
formatType: conf.FormatType,
file: writer,
tracesMarshaler: tracesMarshalers[conf.FormatType],
metricsMarshaler: metricsMarshalers[conf.FormatType],
logsMarshaler: logsMarshalers[conf.FormatType],
exporter: buildExportFunc(conf),
compression: conf.Compression,
compressor: buildCompressor(conf.Compression),
flushInterval: conf.FlushInterval,
}
export := buildExportFunc(conf)

writer, err := newFileWriter(conf.Path, conf.Rotation, conf.FlushInterval, export)
if err != nil {
return nil, err
}

return &fileExporter{
marshaller: marshaller,
writer: writer,
}, nil
}

func buildFileWriter(cfg *Config) (io.WriteCloser, error) {
if cfg.Rotation == nil {
f, err := os.OpenFile(cfg.Path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600)
func newFileWriter(path string, rotation *Rotation, flushInterval time.Duration, export exportFunc) (*fileWriter, error) {
var wc io.WriteCloser
if rotation == nil {
f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
if err != nil {
return nil, err
}
return newBufferedWriteCloser(f), nil
wc = newBufferedWriteCloser(f)
} else {
wc = &lumberjack.Logger{
Filename: path,
MaxSize: rotation.MaxMegabytes,
MaxAge: rotation.MaxDays,
MaxBackups: rotation.MaxBackups,
LocalTime: rotation.LocalTime,
}
}
return &lumberjack.Logger{
Filename: cfg.Path,
MaxSize: cfg.Rotation.MaxMegabytes,
MaxAge: cfg.Rotation.MaxDays,
MaxBackups: cfg.Rotation.MaxBackups,
LocalTime: cfg.Rotation.LocalTime,

return &fileWriter{
path: path,
file: wc,
exporter: export,
flushInterval: flushInterval,
}, nil
}

// This is the map of already created File exporters for particular configurations.
// We maintain this map because the Factory is asked trace and metric receivers separately
// when it gets CreateTracesReceiver() and CreateMetricsReceiver() but they must not
// create separate objects, they must use one Receiver object per configuration.
// create separate objects, they must use one Exporter object per configuration.
var exporters = sharedcomponent.NewSharedComponents()
35 changes: 19 additions & 16 deletions exporter/fileexporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"io"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -96,25 +97,27 @@ func TestCreateLogsExporterError(t *testing.T) {
assert.Error(t, err)
}

func TestBuildFileWriter(t *testing.T) {
func TestNewFileWriter(t *testing.T) {
type args struct {
cfg *Config
}
tests := []struct {
name string
args args
want io.WriteCloser
validate func(*testing.T, io.WriteCloser)
validate func(*testing.T, *fileWriter)
}{
{
name: "single file",
args: args{
cfg: &Config{
Path: tempFileName(t),
Path: tempFileName(t),
FlushInterval: 5 * time.Second,
},
},
validate: func(t *testing.T, closer io.WriteCloser) {
_, ok := closer.(*bufferedWriteCloser)
validate: func(t *testing.T, writer *fileWriter) {
assert.Equal(t, 5*time.Second, writer.flushInterval)
_, ok := writer.file.(*bufferedWriteCloser)
assert.Equal(t, true, ok)
},
},
Expand All @@ -128,10 +131,10 @@ func TestBuildFileWriter(t *testing.T) {
},
},
},
validate: func(t *testing.T, closer io.WriteCloser) {
writer, ok := closer.(*lumberjack.Logger)
validate: func(t *testing.T, writer *fileWriter) {
logger, ok := writer.file.(*lumberjack.Logger)
assert.Equal(t, true, ok)
assert.Equal(t, defaultMaxBackups, writer.MaxBackups)
assert.Equal(t, defaultMaxBackups, logger.MaxBackups)
},
},
{
Expand All @@ -147,21 +150,21 @@ func TestBuildFileWriter(t *testing.T) {
},
},
},
validate: func(t *testing.T, closer io.WriteCloser) {
writer, ok := closer.(*lumberjack.Logger)
validate: func(t *testing.T, writer *fileWriter) {
logger, ok := writer.file.(*lumberjack.Logger)
assert.Equal(t, true, ok)
assert.Equal(t, 3, writer.MaxBackups)
assert.Equal(t, 30, writer.MaxSize)
assert.Equal(t, 100, writer.MaxAge)
assert.Equal(t, true, writer.LocalTime)
assert.Equal(t, 3, logger.MaxBackups)
assert.Equal(t, 30, logger.MaxSize)
assert.Equal(t, 100, logger.MaxAge)
assert.Equal(t, true, logger.LocalTime)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := buildFileWriter(tt.args.cfg)
got, err := newFileWriter(tt.args.cfg.Path, tt.args.cfg.Rotation, tt.args.cfg.FlushInterval, nil)
defer func() {
assert.NoError(t, got.Close())
assert.NoError(t, got.file.Close())
}()
assert.NoError(t, err)
tt.validate(t, got)
Expand Down
Loading

0 comments on commit 9365b33

Please sign in to comment.