Skip to content

Commit

Permalink
[chore][pkg/stanza] Cleanup debug input operator files (open-telemetr…
Browse files Browse the repository at this point in the history
  • Loading branch information
djaglowski authored and rimitchell committed May 8, 2024
1 parent 0e370d8 commit 30e26bf
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 93 deletions.
50 changes: 50 additions & 0 deletions pkg/stanza/operator/input/generate/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package generate // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/generate"

import (
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
)

const operatorType = "generate_input"

func init() {
operator.Register(operatorType, func() operator.Builder { return NewConfig("") })
}

// NewConfig creates a new generate input config with default values
func NewConfig(operatorID string) *Config {
return &Config{
InputConfig: helper.NewInputConfig(operatorID, operatorType),
}
}

// Config is the configuration of a generate input operator.
type Config struct {
helper.InputConfig `mapstructure:",squash"`
Entry entry.Entry `mapstructure:"entry"`
Count int `mapstructure:"count"`
Static bool `mapstructure:"static"`
}

// Build will build a generate input operator.
func (c *Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
inputOperator, err := c.InputConfig.Build(logger)
if err != nil {
return nil, err
}

c.Entry.Body = recursiveMapInterfaceToMapString(c.Entry.Body)

return &Input{
InputOperator: inputOperator,
entry: c.Entry,
count: c.Count,
static: c.Static,
}, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,49 +9,11 @@ import (
"sync"
"time"

"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
)

func init() {
operator.Register("generate_input", func() operator.Builder { return NewConfig("") })
}

// NewConfig creates a new generate input config with default values
func NewConfig(operatorID string) *Config {
return &Config{
InputConfig: helper.NewInputConfig(operatorID, "generate_input"),
}
}

// Config is the configuration of a generate input operator.
type Config struct {
helper.InputConfig `mapstructure:",squash"`
Entry entry.Entry `mapstructure:"entry"`
Count int `mapstructure:"count"`
Static bool `mapstructure:"static"`
}

// Build will build a generate input operator.
func (c *Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
inputOperator, err := c.InputConfig.Build(logger)
if err != nil {
return nil, err
}

c.Entry.Body = recursiveMapInterfaceToMapString(c.Entry.Body)

return &Input{
InputOperator: inputOperator,
entry: c.Entry,
count: c.Count,
static: c.Static,
}, nil
}

// Input is an operator that generates log entries.
type Input struct {
helper.InputOperator
Expand All @@ -63,29 +25,29 @@ type Input struct {
}

// Start will start generating log entries.
func (g *Input) Start(_ operator.Persister) error {
func (i *Input) Start(_ operator.Persister) error {
ctx, cancel := context.WithCancel(context.Background())
g.cancel = cancel
i.cancel = cancel

g.wg.Add(1)
i.wg.Add(1)
go func() {
defer g.wg.Done()
i := 0
defer i.wg.Done()
n := 0
for {
select {
case <-ctx.Done():
return
default:
}

entry := g.entry.Copy()
if !g.static {
entry := i.entry.Copy()
if !i.static {
entry.Timestamp = time.Now()
}
g.Write(ctx, entry)
i.Write(ctx, entry)

i++
if i == g.count {
n++
if n == i.count {
return
}
}
Expand All @@ -95,9 +57,9 @@ func (g *Input) Start(_ operator.Persister) error {
}

// Stop will stop generating logs.
func (g *Input) Stop() error {
g.cancel()
g.wg.Wait()
func (i *Input) Stop() error {
i.cancel()
i.wg.Wait()
return nil
}

Expand Down
44 changes: 44 additions & 0 deletions pkg/stanza/operator/input/stdin/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package stdin // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/stdin"

import (
"os"

"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
)

const operatorType = "stdin"

func init() {
operator.Register(operatorType, func() operator.Builder { return NewConfig("") })
}

// NewConfig creates a new stdin input config with default values
func NewConfig(operatorID string) *Config {
return &Config{
InputConfig: helper.NewInputConfig(operatorID, operatorType),
}
}

// Config is the configuration of a stdin input operator.
type Config struct {
helper.InputConfig `mapstructure:",squash"`
}

// Build will build a stdin input operator.
func (c *Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
inputOperator, err := c.InputConfig.Build(logger)
if err != nil {
return nil, err
}

return &Input{
InputOperator: inputOperator,
stdin: os.Stdin,
}, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,35 +17,6 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
)

func init() {
operator.Register("stdin", func() operator.Builder { return NewConfig("") })
}

// NewConfig creates a new stdin input config with default values
func NewConfig(operatorID string) *Config {
return &Config{
InputConfig: helper.NewInputConfig(operatorID, "stdin"),
}
}

// Config is the configuration of a stdin input operator.
type Config struct {
helper.InputConfig `mapstructure:",squash"`
}

// Build will build a stdin input operator.
func (c *Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
inputOperator, err := c.InputConfig.Build(logger)
if err != nil {
return nil, err
}

return &Input{
InputOperator: inputOperator,
stdin: os.Stdin,
}, nil
}

// Input is an operator that reads input from stdin
type Input struct {
helper.InputOperator
Expand All @@ -55,25 +26,25 @@ type Input struct {
}

// Start will start generating log entries.
func (g *Input) Start(_ operator.Persister) error {
func (i *Input) Start(_ operator.Persister) error {
ctx, cancel := context.WithCancel(context.Background())
g.cancel = cancel
i.cancel = cancel

stat, err := g.stdin.Stat()
stat, err := i.stdin.Stat()
if err != nil {
return fmt.Errorf("failed to stat stdin: %w", err)
}

if stat.Mode()&os.ModeNamedPipe == 0 {
g.Warn("No data is being written to stdin")
i.Warn("No data is being written to stdin")
return nil
}

scanner := bufio.NewScanner(g.stdin)
scanner := bufio.NewScanner(i.stdin)

g.wg.Add(1)
i.wg.Add(1)
go func() {
defer g.wg.Done()
defer i.wg.Done()
for {
select {
case <-ctx.Done():
Expand All @@ -83,24 +54,24 @@ func (g *Input) Start(_ operator.Persister) error {

if ok := scanner.Scan(); !ok {
if err := scanner.Err(); err != nil {
g.Errorf("Scanning failed", zap.Error(err))
i.Errorf("Scanning failed", zap.Error(err))
}
g.Infow("Stdin has been closed")
i.Infow("Stdin has been closed")
return
}

e := entry.New()
e.Body = scanner.Text()
g.Write(ctx, e)
i.Write(ctx, e)
}
}()

return nil
}

// Stop will stop generating logs.
func (g *Input) Stop() error {
g.cancel()
g.wg.Wait()
func (i *Input) Stop() error {
i.cancel()
i.wg.Wait()
return nil
}

0 comments on commit 30e26bf

Please sign in to comment.