From 12ab6dfb33fe32f12f480fe80aa1b8fa3a6e00af Mon Sep 17 00:00:00 2001 From: Sven Rebhan <36194019+srebhan@users.noreply.github.com> Date: Thu, 23 May 2024 14:42:34 -0400 Subject: [PATCH] feat(inputs.execd): Add option to not restart program on error (#15271) --- internal/process/process.go | 29 +++++++--- plugins/inputs/execd/README.md | 10 +++- plugins/inputs/execd/execd.go | 8 ++- plugins/inputs/execd/execd_test.go | 88 ++++++++++++++++++++++++++---- plugins/inputs/execd/sample.conf | 10 +++- 5 files changed, 118 insertions(+), 27 deletions(-) diff --git a/internal/process/process.go b/internal/process/process.go index acd8a424709b6..979a53848d726 100644 --- a/internal/process/process.go +++ b/internal/process/process.go @@ -23,6 +23,7 @@ type Process struct { ReadStdoutFn func(io.Reader) ReadStderrFn func(io.Reader) RestartDelay time.Duration + StopOnError bool Log telegraf.Logger name string @@ -31,6 +32,8 @@ type Process struct { pid int32 cancel context.CancelFunc mainLoopWg sync.WaitGroup + + sync.Mutex } // New creates a new process wrapper @@ -65,10 +68,10 @@ func (p *Process) Start() error { p.mainLoopWg.Add(1) go func() { + defer p.mainLoopWg.Done() if err := p.cmdLoop(ctx); err != nil { p.Log.Errorf("Process quit with message: %v", err) } - p.mainLoopWg.Done() }() return nil @@ -81,12 +84,24 @@ func (p *Process) Stop() { p.cancel() } // close stdin so the app can shut down gracefully. - if err := p.Stdin.Close(); err != nil { + if err := p.Stdin.Close(); err != nil && !errors.Is(err, os.ErrClosed) { p.Log.Errorf("Stdin closed with message: %v", err) } p.mainLoopWg.Wait() } +func (p *Process) Pid() int { + pid := atomic.LoadInt32(&p.pid) + return int(pid) +} + +func (p *Process) State() (state *os.ProcessState, running bool) { + p.Lock() + defer p.Unlock() + + return p.Cmd.ProcessState, p.Cmd.ProcessState.ExitCode() == -1 +} + func (p *Process) cmdStart() error { p.Cmd = exec.Command(p.name, p.args...) @@ -119,15 +134,13 @@ func (p *Process) cmdStart() error { return nil } -func (p *Process) Pid() int { - pid := atomic.LoadInt32(&p.pid) - return int(pid) -} - // cmdLoop watches an already running process, restarting it when appropriate. func (p *Process) cmdLoop(ctx context.Context) error { for { err := p.cmdWait(ctx) + if err != nil && p.StopOnError { + return err + } if isQuitting(ctx) { p.Log.Infof("Process %s shut down", p.Cmd.Path) return nil @@ -184,7 +197,9 @@ func (p *Process) cmdWait(ctx context.Context) error { wg.Done() }() + p.Lock() err := p.Cmd.Wait() + p.Unlock() processCancel() wg.Wait() return err diff --git a/plugins/inputs/execd/README.md b/plugins/inputs/execd/README.md index b452a1a90d23e..5b5dc414d3b99 100644 --- a/plugins/inputs/execd/README.md +++ b/plugins/inputs/execd/README.md @@ -59,20 +59,24 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. ## "SIGHUP" : Send a HUP signal. Not available on Windows. (not recommended) ## "SIGUSR1" : Send a USR1 signal. Not available on Windows. ## "SIGUSR2" : Send a USR2 signal. Not available on Windows. - signal = "none" + # signal = "none" ## Delay before the process is restarted after an unexpected termination - restart_delay = "10s" + # restart_delay = "10s" ## Buffer size used to read from the command output stream ## Optional parameter. Default is 64 Kib, minimum is 16 bytes # buffer_size = "64Kib" + ## Disable automatic restart of the program and stop if the program exits + ## with an error (i.e. non-zero error code) + # stop_on_error = false + ## Data format to consume. ## Each data format has its own unique set of configuration options, read ## more about them here: ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md - data_format = "influx" + # data_format = "influx" ``` ## Example diff --git a/plugins/inputs/execd/execd.go b/plugins/inputs/execd/execd.go index c89693ae7847b..ea365790210d1 100644 --- a/plugins/inputs/execd/execd.go +++ b/plugins/inputs/execd/execd.go @@ -25,10 +25,11 @@ var sampleConfig string type Execd struct { Command []string `toml:"command"` Environment []string `toml:"environment"` + BufferSize config.Size `toml:"buffer_size"` Signal string `toml:"signal"` RestartDelay config.Duration `toml:"restart_delay"` + StopOnError bool `toml:"stop_on_error"` Log telegraf.Logger `toml:"-"` - BufferSize config.Size `toml:"buffer_size"` process *process.Process acc telegraf.Accumulator @@ -59,10 +60,11 @@ func (e *Execd) Start(acc telegraf.Accumulator) error { if err != nil { return fmt.Errorf("error creating new process: %w", err) } - e.process.Log = e.Log - e.process.RestartDelay = time.Duration(e.RestartDelay) e.process.ReadStdoutFn = e.outputReader e.process.ReadStderrFn = e.cmdReadErr + e.process.RestartDelay = time.Duration(e.RestartDelay) + e.process.StopOnError = e.StopOnError + e.process.Log = e.Log if err = e.process.Start(); err != nil { // if there was only one argument, and it contained spaces, warn the user diff --git a/plugins/inputs/execd/execd_test.go b/plugins/inputs/execd/execd_test.go index 4525b2dfdcc9a..3d4576358e31a 100644 --- a/plugins/inputs/execd/execd_test.go +++ b/plugins/inputs/execd/execd_test.go @@ -51,7 +51,7 @@ func TestExternalInputWorks(t *testing.T) { require.NoError(t, err) e := &Execd{ - Command: []string{exe, "-counter"}, + Command: []string{exe, "-mode", "counter"}, Environment: []string{"PLUGINS_INPUTS_EXECD_MODE=application", "METRIC_NAME=counter"}, RestartDelay: config.Duration(5 * time.Second), Signal: "STDIN", @@ -159,6 +159,62 @@ test{handler="execd",quantile="0.5"} 42.0 testutil.RequireMetricsEqual(t, expected, actual, testutil.IgnoreTime()) } +func TestStopOnError(t *testing.T) { + exe, err := os.Executable() + require.NoError(t, err) + + plugin := &Execd{ + Command: []string{exe, "-mode", "fail"}, + Environment: []string{"PLUGINS_INPUTS_EXECD_MODE=application"}, + StopOnError: true, + RestartDelay: config.Duration(5 * time.Second), + Log: testutil.Logger{}, + } + + parser := models.NewRunningParser(&influx.Parser{}, &models.ParserConfig{}) + require.NoError(t, parser.Init()) + plugin.SetParser(parser) + + var acc testutil.Accumulator + require.NoError(t, plugin.Start(&acc)) + defer plugin.Stop() + + require.Eventually(t, func() bool { + _, running := plugin.process.State() + return !running + }, 3*time.Second, 100*time.Millisecond) + + state, running := plugin.process.State() + require.False(t, running) + require.Equal(t, 42, state.ExitCode()) +} + +func TestStopOnErrorSuccess(t *testing.T) { + exe, err := os.Executable() + require.NoError(t, err) + + plugin := &Execd{ + Command: []string{exe, "-mode", "success"}, + Environment: []string{"PLUGINS_INPUTS_EXECD_MODE=application"}, + StopOnError: true, + RestartDelay: config.Duration(100 * time.Millisecond), + Log: testutil.Logger{}, + } + + parser := models.NewRunningParser(&influx.Parser{}, &models.ParserConfig{}) + require.NoError(t, parser.Init()) + plugin.SetParser(parser) + + var acc testutil.Accumulator + require.NoError(t, plugin.Start(&acc)) + defer plugin.Stop() + + // Wait for at least two metric as this indicates the process was restarted + require.Eventually(t, func() bool { + return acc.NMetrics() > 1 + }, 3*time.Second, 100*time.Millisecond) +} + func readChanWithTimeout(t *testing.T, metrics chan telegraf.Metric, timeout time.Duration) telegraf.Metric { to := time.NewTimer(timeout) defer to.Stop() @@ -189,20 +245,32 @@ func (tm *TestMetricMaker) Log() telegraf.Logger { return logger.NewLogger("TestPlugin", "test", "") } -var counter = flag.Bool("counter", false, - "if true, act like line input program instead of test") - func TestMain(m *testing.M) { + var mode string + + flag.StringVar(&mode, "mode", "counter", "determines the output when run as mockup program") flag.Parse() - runMode := os.Getenv("PLUGINS_INPUTS_EXECD_MODE") - if *counter && runMode == "application" { + + operationMode := os.Getenv("PLUGINS_INPUTS_EXECD_MODE") + if operationMode != "application" { + // Run the normal test mode + os.Exit(m.Run()) + } + + // Run as a mock program + switch mode { + case "counter": if err := runCounterProgram(); err != nil { os.Exit(1) } os.Exit(0) + case "fail": + os.Exit(42) + case "success": + fmt.Println("test value=42i") + os.Exit(0) } - code := m.Run() - os.Exit(code) + os.Exit(23) } func runCounterProgram() error { @@ -217,9 +285,7 @@ func runCounterProgram() error { for scanner.Scan() { m := metric.New(envMetricName, map[string]string{}, - map[string]interface{}{ - "count": i, - }, + map[string]interface{}{"count": i}, time.Now(), ) i++ diff --git a/plugins/inputs/execd/sample.conf b/plugins/inputs/execd/sample.conf index 61772c26932c1..52727daf8bcc7 100644 --- a/plugins/inputs/execd/sample.conf +++ b/plugins/inputs/execd/sample.conf @@ -18,17 +18,21 @@ ## "SIGHUP" : Send a HUP signal. Not available on Windows. (not recommended) ## "SIGUSR1" : Send a USR1 signal. Not available on Windows. ## "SIGUSR2" : Send a USR2 signal. Not available on Windows. - signal = "none" + # signal = "none" ## Delay before the process is restarted after an unexpected termination - restart_delay = "10s" + # restart_delay = "10s" ## Buffer size used to read from the command output stream ## Optional parameter. Default is 64 Kib, minimum is 16 bytes # buffer_size = "64Kib" + ## Disable automatic restart of the program and stop if the program exits + ## with an error (i.e. non-zero error code) + # stop_on_error = false + ## Data format to consume. ## Each data format has its own unique set of configuration options, read ## more about them here: ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md - data_format = "influx" + # data_format = "influx"