Skip to content

Commit

Permalink
[chore][pkg/stanza] Cleanup output operator files (open-telemetry#32071)
Browse files Browse the repository at this point in the history
Contributes to open-telemetry#32058
  • Loading branch information
djaglowski authored and rimitchell committed May 8, 2024
1 parent 36215be commit 054f837
Show file tree
Hide file tree
Showing 8 changed files with 137 additions and 102 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,22 @@
package drop // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/output/drop"

import (
"context"

"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
)

const operatorType = "drop_output"

func init() {
operator.Register("drop_output", func() operator.Builder { return NewConfig("") })
operator.Register(operatorType, func() operator.Builder { return NewConfig("") })
}

// NewConfig creates a new drop output config with default values
func NewConfig(operatorID string) *Config {
return &Config{
OutputConfig: helper.NewOutputConfig(operatorID, "drop_output"),
OutputConfig: helper.NewOutputConfig(operatorID, operatorType),
}
}

Expand All @@ -40,13 +39,3 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
OutputOperator: outputOperator,
}, nil
}

// Output is an operator that consumes and ignores incoming entries.
type Output struct {
helper.OutputOperator
}

// Process will drop the incoming entry.
func (p *Output) Process(_ context.Context, _ *entry.Entry) error {
return nil
}
21 changes: 21 additions & 0 deletions pkg/stanza/operator/output/drop/output.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package drop // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/output/drop"

import (
"context"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
)

// Output is an operator that consumes and ignores incoming entries.
type Output struct {
helper.OutputOperator
}

// Process will drop the incoming entry.
func (o *Output) Process(_ context.Context, _ *entry.Entry) error {
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,25 @@
package file // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/output/file"

import (
"context"
"encoding/json"
"fmt"
"html/template"
"os"
"sync"

"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
)

const operatorType = "file_output"

func init() {
operator.Register("file_output", func() operator.Builder { return NewConfig("") })
operator.Register(operatorType, func() operator.Builder { return NewConfig("") })
}

// NewConfig creates a new file output config with default values
func NewConfig(operatorID string) *Config {
return &Config{
OutputConfig: helper.NewOutputConfig(operatorID, "file_output"),
OutputConfig: helper.NewOutputConfig(operatorID, operatorType),
}
}

Expand Down Expand Up @@ -62,58 +59,3 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
tmpl: tmpl,
}, nil
}

// Output is an operator that writes logs to a file.
type Output struct {
helper.OutputOperator

path string
tmpl *template.Template
encoder *json.Encoder
file *os.File
mux sync.Mutex
}

// Start will open the output file.
func (fo *Output) Start(_ operator.Persister) error {
var err error
fo.file, err = os.OpenFile(fo.path, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0600)
if err != nil {
return err
}

fo.encoder = json.NewEncoder(fo.file)
fo.encoder.SetEscapeHTML(false)

return nil
}

// Stop will close the output file.
func (fo *Output) Stop() error {
if fo.file != nil {
if err := fo.file.Close(); err != nil {
fo.Errorf(err.Error())
}
}
return nil
}

// Process will write an entry to the output file.
func (fo *Output) Process(_ context.Context, entry *entry.Entry) error {
fo.mux.Lock()
defer fo.mux.Unlock()

if fo.tmpl != nil {
err := fo.tmpl.Execute(fo.file, entry)
if err != nil {
return err
}
} else {
err := fo.encoder.Encode(entry)
if err != nil {
return err
}
}

return nil
}
71 changes: 71 additions & 0 deletions pkg/stanza/operator/output/file/output.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package file // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/output/file"

import (
"context"
"encoding/json"
"html/template"
"os"
"sync"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
)

// Output is an operator that writes logs to a file.
type Output struct {
helper.OutputOperator

path string
tmpl *template.Template
encoder *json.Encoder
file *os.File
mux sync.Mutex
}

// Start will open the output file.
func (o *Output) Start(_ operator.Persister) error {
var err error
o.file, err = os.OpenFile(o.path, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0600)
if err != nil {
return err
}

o.encoder = json.NewEncoder(o.file)
o.encoder.SetEscapeHTML(false)

return nil
}

// Stop will close the output file.
func (o *Output) Stop() error {
if o.file != nil {
if err := o.file.Close(); err != nil {
o.Errorf(err.Error())
}
}
return nil
}

// Process will write an entry to the output file.
func (o *Output) Process(_ context.Context, entry *entry.Entry) error {
o.mux.Lock()
defer o.mux.Unlock()

if o.tmpl != nil {
err := o.tmpl.Execute(o.file, entry)
if err != nil {
return err
}
} else {
err := o.encoder.Encode(entry)
if err != nil {
return err
}
}

return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,29 @@
package stdout // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/output/stdout"

import (
"context"
"encoding/json"
"io"
"os"
"sync"

"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
)

const operatorType = "stdout"

// Stdout is a global handle to standard output
var Stdout io.Writer = os.Stdout

func init() {
operator.Register("stdout", func() operator.Builder { return NewConfig("") })
operator.Register(operatorType, func() operator.Builder { return NewConfig("") })
}

// NewConfig creates a new stdout config with default values
func NewConfig(operatorID string) *Config {
return &Config{
OutputConfig: helper.NewOutputConfig(operatorID, "stdout"),
OutputConfig: helper.NewOutputConfig(operatorID, operatorType),
}
}

Expand All @@ -48,23 +47,3 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
encoder: json.NewEncoder(Stdout),
}, nil
}

// Output is an operator that logs entries using stdout.
type Output struct {
helper.OutputOperator
encoder *json.Encoder
mux sync.Mutex
}

// Process will log entries received.
func (o *Output) Process(_ context.Context, entry *entry.Entry) error {
o.mux.Lock()
err := o.encoder.Encode(entry)
if err != nil {
o.mux.Unlock()
o.Errorf("Failed to process entry: %s", err)
return err
}
o.mux.Unlock()
return nil
}
33 changes: 33 additions & 0 deletions pkg/stanza/operator/output/stdout/output.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package stdout // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/output/stdout"

import (
"context"
"encoding/json"
"sync"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
)

// Output is an operator that logs entries using stdout.
type Output struct {
helper.OutputOperator
encoder *json.Encoder
mux sync.Mutex
}

// Process will log entries received.
func (o *Output) Process(_ context.Context, entry *entry.Entry) error {
o.mux.Lock()
err := o.encoder.Encode(entry)
if err != nil {
o.mux.Unlock()
o.Errorf("Failed to process entry: %s", err)
return err
}
o.mux.Unlock()
return nil
}

0 comments on commit 054f837

Please sign in to comment.