Skip to content

Commit

Permalink
feat(inputs.execd): Add option to not restart program on error (#15271)
Browse files Browse the repository at this point in the history
  • Loading branch information
srebhan committed May 23, 2024
1 parent 5e830fb commit 12ab6df
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 27 deletions.
29 changes: 22 additions & 7 deletions internal/process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type Process struct {
ReadStdoutFn func(io.Reader)
ReadStderrFn func(io.Reader)
RestartDelay time.Duration
StopOnError bool
Log telegraf.Logger

name string
Expand All @@ -31,6 +32,8 @@ type Process struct {
pid int32
cancel context.CancelFunc
mainLoopWg sync.WaitGroup

sync.Mutex
}

// New creates a new process wrapper
Expand Down Expand Up @@ -65,10 +68,10 @@ func (p *Process) Start() error {

p.mainLoopWg.Add(1)
go func() {
defer p.mainLoopWg.Done()
if err := p.cmdLoop(ctx); err != nil {
p.Log.Errorf("Process quit with message: %v", err)
}
p.mainLoopWg.Done()
}()

return nil
Expand All @@ -81,12 +84,24 @@ func (p *Process) Stop() {
p.cancel()
}
// close stdin so the app can shut down gracefully.
if err := p.Stdin.Close(); err != nil {
if err := p.Stdin.Close(); err != nil && !errors.Is(err, os.ErrClosed) {
p.Log.Errorf("Stdin closed with message: %v", err)
}
p.mainLoopWg.Wait()
}

// Pid returns the value of the pid field as an int. The field is read
// atomically so the call does not need to hold the process mutex.
func (p *Process) Pid() int {
	return int(atomic.LoadInt32(&p.pid))
}

// State returns the process state recorded for the wrapped command together
// with a flag telling whether the command is still considered running.
//
// The command counts as running as long as no process state has been recorded
// for it. The exit code is deliberately not used for this decision because
// os.ProcessState.ExitCode also reports -1 for a process that was terminated
// by a signal, which would make a killed process look alive forever. If no
// command has been created yet, State returns (nil, false) instead of
// dereferencing a nil command.
func (p *Process) State() (state *os.ProcessState, running bool) {
	p.Lock()
	defer p.Unlock()

	// Nothing was ever started, so there is neither a state nor a running process.
	if p.Cmd == nil {
		return nil, false
	}

	state = p.Cmd.ProcessState
	return state, state == nil
}

func (p *Process) cmdStart() error {
p.Cmd = exec.Command(p.name, p.args...)

Expand Down Expand Up @@ -119,15 +134,13 @@ func (p *Process) cmdStart() error {
return nil
}

// Pid atomically loads the pid field and returns it converted to int.
func (p *Process) Pid() int {
	return int(atomic.LoadInt32(&p.pid))
}

// cmdLoop watches an already running process, restarting it when appropriate.
func (p *Process) cmdLoop(ctx context.Context) error {
for {
err := p.cmdWait(ctx)
if err != nil && p.StopOnError {
return err
}
if isQuitting(ctx) {
p.Log.Infof("Process %s shut down", p.Cmd.Path)
return nil
Expand Down Expand Up @@ -184,7 +197,9 @@ func (p *Process) cmdWait(ctx context.Context) error {
wg.Done()
}()

p.Lock()
err := p.Cmd.Wait()
p.Unlock()
processCancel()
wg.Wait()
return err
Expand Down
10 changes: 7 additions & 3 deletions plugins/inputs/execd/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,20 +59,24 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## "SIGHUP" : Send a HUP signal. Not available on Windows. (not recommended)
## "SIGUSR1" : Send a USR1 signal. Not available on Windows.
## "SIGUSR2" : Send a USR2 signal. Not available on Windows.
signal = "none"
# signal = "none"

## Delay before the process is restarted after an unexpected termination
restart_delay = "10s"
# restart_delay = "10s"

## Buffer size used to read from the command output stream
## Optional parameter. Default is 64 KiB, minimum is 16 bytes
# buffer_size = "64Kib"

## Disable automatic restart of the program and stop if the program exits
## with an error (i.e. non-zero exit code)
# stop_on_error = false

## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
# data_format = "influx"
```

## Example
Expand Down
8 changes: 5 additions & 3 deletions plugins/inputs/execd/execd.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@ var sampleConfig string
type Execd struct {
Command []string `toml:"command"`
Environment []string `toml:"environment"`
BufferSize config.Size `toml:"buffer_size"`
Signal string `toml:"signal"`
RestartDelay config.Duration `toml:"restart_delay"`
StopOnError bool `toml:"stop_on_error"`
Log telegraf.Logger `toml:"-"`
BufferSize config.Size `toml:"buffer_size"`

process *process.Process
acc telegraf.Accumulator
Expand Down Expand Up @@ -59,10 +60,11 @@ func (e *Execd) Start(acc telegraf.Accumulator) error {
if err != nil {
return fmt.Errorf("error creating new process: %w", err)
}
e.process.Log = e.Log
e.process.RestartDelay = time.Duration(e.RestartDelay)
e.process.ReadStdoutFn = e.outputReader
e.process.ReadStderrFn = e.cmdReadErr
e.process.RestartDelay = time.Duration(e.RestartDelay)
e.process.StopOnError = e.StopOnError
e.process.Log = e.Log

if err = e.process.Start(); err != nil {
// if there was only one argument, and it contained spaces, warn the user
Expand Down
88 changes: 77 additions & 11 deletions plugins/inputs/execd/execd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestExternalInputWorks(t *testing.T) {
require.NoError(t, err)

e := &Execd{
Command: []string{exe, "-counter"},
Command: []string{exe, "-mode", "counter"},
Environment: []string{"PLUGINS_INPUTS_EXECD_MODE=application", "METRIC_NAME=counter"},
RestartDelay: config.Duration(5 * time.Second),
Signal: "STDIN",
Expand Down Expand Up @@ -159,6 +159,62 @@ test{handler="execd",quantile="0.5"} 42.0
testutil.RequireMetricsEqual(t, expected, actual, testutil.IgnoreTime())
}

// TestStopOnError runs the test binary as a mock program in "fail" mode with
// StopOnError enabled. It waits until the plugin's process is reported as not
// running and then expects the recorded exit code to be 42.
func TestStopOnError(t *testing.T) {
	binary, err := os.Executable()
	require.NoError(t, err)

	// Parser handed to the plugin for the program output.
	parser := models.NewRunningParser(&influx.Parser{}, &models.ParserConfig{})
	require.NoError(t, parser.Init())

	plugin := &Execd{
		Command:      []string{binary, "-mode", "fail"},
		Environment:  []string{"PLUGINS_INPUTS_EXECD_MODE=application"},
		StopOnError:  true,
		RestartDelay: config.Duration(5 * time.Second),
		Log:          testutil.Logger{},
	}
	plugin.SetParser(parser)

	var acc testutil.Accumulator
	require.NoError(t, plugin.Start(&acc))
	defer plugin.Stop()

	// Poll until the process is no longer reported as running.
	stopped := func() bool {
		_, running := plugin.process.State()
		return !running
	}
	require.Eventually(t, stopped, 3*time.Second, 100*time.Millisecond)

	// Check the final state once more and verify the exit code.
	state, running := plugin.process.State()
	require.False(t, running)
	require.Equal(t, 42, state.ExitCode())
}

// TestStopOnErrorSuccess runs the test binary as a mock program in "success"
// mode with StopOnError enabled and a short restart delay. It expects more
// than one metric to arrive in the accumulator within the timeout.
func TestStopOnErrorSuccess(t *testing.T) {
	binary, err := os.Executable()
	require.NoError(t, err)

	// Parser handed to the plugin for the program output.
	parser := models.NewRunningParser(&influx.Parser{}, &models.ParserConfig{})
	require.NoError(t, parser.Init())

	plugin := &Execd{
		Command:      []string{binary, "-mode", "success"},
		Environment:  []string{"PLUGINS_INPUTS_EXECD_MODE=application"},
		StopOnError:  true,
		RestartDelay: config.Duration(100 * time.Millisecond),
		Log:          testutil.Logger{},
	}
	plugin.SetParser(parser)

	var acc testutil.Accumulator
	require.NoError(t, plugin.Start(&acc))
	defer plugin.Stop()

	// Wait for at least two metrics, as this indicates the process was restarted.
	gotSeveralMetrics := func() bool {
		return acc.NMetrics() > 1
	}
	require.Eventually(t, gotSeveralMetrics, 3*time.Second, 100*time.Millisecond)
}

func readChanWithTimeout(t *testing.T, metrics chan telegraf.Metric, timeout time.Duration) telegraf.Metric {
to := time.NewTimer(timeout)
defer to.Stop()
Expand Down Expand Up @@ -189,20 +245,32 @@ func (tm *TestMetricMaker) Log() telegraf.Logger {
return logger.NewLogger("TestPlugin", "test", "")
}

var counter = flag.Bool("counter", false,
"if true, act like line input program instead of test")

func TestMain(m *testing.M) {
var mode string

flag.StringVar(&mode, "mode", "counter", "determines the output when run as mockup program")
flag.Parse()
runMode := os.Getenv("PLUGINS_INPUTS_EXECD_MODE")
if *counter && runMode == "application" {

operationMode := os.Getenv("PLUGINS_INPUTS_EXECD_MODE")
if operationMode != "application" {
// Run the normal test mode
os.Exit(m.Run())
}

// Run as a mock program
switch mode {
case "counter":
if err := runCounterProgram(); err != nil {
os.Exit(1)
}
os.Exit(0)
case "fail":
os.Exit(42)
case "success":
fmt.Println("test value=42i")
os.Exit(0)
}
code := m.Run()
os.Exit(code)
os.Exit(23)
}

func runCounterProgram() error {
Expand All @@ -217,9 +285,7 @@ func runCounterProgram() error {
for scanner.Scan() {
m := metric.New(envMetricName,
map[string]string{},
map[string]interface{}{
"count": i,
},
map[string]interface{}{"count": i},
time.Now(),
)
i++
Expand Down
10 changes: 7 additions & 3 deletions plugins/inputs/execd/sample.conf
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,21 @@
## "SIGHUP" : Send a HUP signal. Not available on Windows. (not recommended)
## "SIGUSR1" : Send a USR1 signal. Not available on Windows.
## "SIGUSR2" : Send a USR2 signal. Not available on Windows.
signal = "none"
# signal = "none"

## Delay before the process is restarted after an unexpected termination
restart_delay = "10s"
# restart_delay = "10s"

## Buffer size used to read from the command output stream
## Optional parameter. Default is 64 KiB, minimum is 16 bytes
# buffer_size = "64Kib"

## Disable automatic restart of the program and stop if the program exits
## with an error (i.e. non-zero exit code)
# stop_on_error = false

## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
# data_format = "influx"

0 comments on commit 12ab6df

Please sign in to comment.