Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(exec, execd): add an option to pass a custom environment to their child process #11049

Merged
merged 3 commits into from
May 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion internal/process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"io"
"os"
"os/exec"
"sync"
"sync/atomic"
Expand All @@ -26,13 +27,14 @@ type Process struct {

name string
args []string
envs []string
pid int32
cancel context.CancelFunc
mainLoopWg sync.WaitGroup
}

// New creates a new process wrapper
func New(command []string) (*Process, error) {
func New(command []string, envs []string) (*Process, error) {
if len(command) == 0 {
return nil, errors.New("no command")
}
Expand All @@ -41,6 +43,7 @@ func New(command []string) (*Process, error) {
RestartDelay: 5 * time.Second,
name: command[0],
args: []string{},
envs: envs,
}

if len(command) > 1 {
Expand Down Expand Up @@ -85,6 +88,10 @@ func (p *Process) Stop() {
func (p *Process) cmdStart() error {
p.Cmd = exec.Command(p.name, p.args...)

if len(p.envs) > 0 {
p.Cmd.Env = append(os.Environ(), p.envs...)
}

var err error
p.Stdin, err = p.Cmd.StdinPipe()
if err != nil {
Expand Down
5 changes: 3 additions & 2 deletions internal/process/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestRestartingRebindsPipes(t *testing.T) {
exe, err := os.Executable()
require.NoError(t, err)

p, err := New([]string{exe, "-external"})
p, err := New([]string{exe, "-external"}, []string{"INTERNAL_PROCESS_MODE=application"})
p.RestartDelay = 100 * time.Nanosecond
p.Log = testutil.Logger{}
require.NoError(t, err)
Expand Down Expand Up @@ -62,7 +62,8 @@ var external = flag.Bool("external", false,

func TestMain(m *testing.M) {
flag.Parse()
if *external {
runMode := os.Getenv("INTERNAL_PROCESS_MODE")
if *external && runMode == "application" {
externalProcess()
os.Exit(0)
}
Expand Down
8 changes: 7 additions & 1 deletion plugins/inputs/exec/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ This plugin can be used to poll for custom metrics from any source.
"/tmp/collect_*.sh"
]

## Environment variables
## Array of "key=value" pairs to pass as environment variables
## e.g. "KEY=value", "USERNAME=John Doe",
## "LD_LIBRARY_PATH=/opt/custom/lib64:/usr/local/libs"
# environment = []

## Timeout for each command to complete.
timeout = "5s"

Expand Down Expand Up @@ -55,7 +61,7 @@ It can be paired with the following configuration and will be run at the `interv

### My script works when I run it by hand, but not when Telegraf is running as a service

This may be related to the Telegraf service running as a different user. The
This may be related to the Telegraf service running as a different user. The
official packages run Telegraf as the `telegraf` user and group on Linux
systems.

Expand Down
17 changes: 12 additions & 5 deletions plugins/inputs/exec/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"fmt"
"io"
"os"
osExec "os/exec"
"path/filepath"
"runtime"
Expand All @@ -24,9 +25,10 @@ import (
const MaxStderrBytes int = 512

type Exec struct {
Commands []string `toml:"commands"`
Command string `toml:"command"`
Timeout config.Duration `toml:"timeout"`
Commands []string `toml:"commands"`
Command string `toml:"command"`
Environment []string `toml:"environment"`
Timeout config.Duration `toml:"timeout"`

parser parsers.Parser

Expand All @@ -42,13 +44,14 @@ func NewExec() *Exec {
}

type Runner interface {
Run(string, time.Duration) ([]byte, []byte, error)
Run(string, []string, time.Duration) ([]byte, []byte, error)
}

type CommandRunner struct{}

func (c CommandRunner) Run(
command string,
environments []string,
timeout time.Duration,
) ([]byte, []byte, error) {
splitCmd, err := shellquote.Split(command)
Expand All @@ -58,6 +61,10 @@ func (c CommandRunner) Run(

cmd := osExec.Command(splitCmd[0], splitCmd[1:]...)

if len(environments) > 0 {
cmd.Env = append(os.Environ(), environments...)
}

var (
out bytes.Buffer
stderr bytes.Buffer
Expand Down Expand Up @@ -120,7 +127,7 @@ func (e *Exec) ProcessCommand(command string, acc telegraf.Accumulator, wg *sync
defer wg.Done()
_, isNagios := e.parser.(*nagios.NagiosParser)

out, errbuf, runErr := e.runner.Run(command, time.Duration(e.Timeout))
out, errbuf, runErr := e.runner.Run(command, e.Environment, time.Duration(e.Timeout))
if !isNagios && runErr != nil {
err := fmt.Errorf("exec: %s for command '%s': %s", runErr, command, string(errbuf))
acc.AddError(err)
Expand Down
19 changes: 18 additions & 1 deletion plugins/inputs/exec/exec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func newRunnerMock(out []byte, errout []byte, err error) Runner {
}
}

func (r runnerMock) Run(_ string, _ time.Duration) ([]byte, []byte, error) {
func (r runnerMock) Run(_ string, _ []string, _ time.Duration) ([]byte, []byte, error) {
return r.out, r.errout, r.err
}

Expand Down Expand Up @@ -191,6 +191,23 @@ func TestExecCommandWithoutGlobAndPath(t *testing.T) {
acc.AssertContainsFields(t, "metric", fields)
}

func TestExecCommandWithEnv(t *testing.T) {
parser, _ := parsers.NewValueParser("metric", "string", "", nil)
e := NewExec()
e.Commands = []string{"/bin/sh -c 'echo ${METRIC_NAME}'"}
e.Environment = []string{"METRIC_NAME=metric_value"}
e.SetParser(parser)

var acc testutil.Accumulator
err := acc.GatherError(e.Gather)
require.NoError(t, err)

fields := map[string]interface{}{
"value": "metric_value",
}
acc.AssertContainsFields(t, "metric", fields)
}

func TestTruncate(t *testing.T) {
tests := []struct {
name string
Expand Down
6 changes: 6 additions & 0 deletions plugins/inputs/execd/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ STDERR from the process will be relayed to Telegraf as errors in the logs.
## NOTE: process and each argument should each be their own string
command = ["telegraf-smartctl", "-d", "/dev/sda"]

## Environment variables
## Array of "key=value" pairs to pass as environment variables
## e.g. "KEY=value", "USERNAME=John Doe",
## "LD_LIBRARY_PATH=/opt/custom/lib64:/usr/local/libs"
# environment = []

## Define how the process is signaled on each collection interval.
## Valid values are:
## "none" : Do not signal anything. (Recommended for service inputs)
Expand Down
3 changes: 2 additions & 1 deletion plugins/inputs/execd/execd.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

type Execd struct {
Command []string `toml:"command"`
Environment []string `toml:"environment"`
Signal string `toml:"signal"`
RestartDelay config.Duration `toml:"restart_delay"`
Log telegraf.Logger `toml:"-"`
Expand All @@ -35,7 +36,7 @@ func (e *Execd) SetParser(parser parsers.Parser) {
func (e *Execd) Start(acc telegraf.Accumulator) error {
e.acc = acc
var err error
e.process, err = process.New(e.Command)
e.process, err = process.New(e.Command, e.Environment)
if err != nil {
return fmt.Errorf("error creating new process: %w", err)
}
Expand Down
9 changes: 7 additions & 2 deletions plugins/inputs/execd/execd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ func TestSettingConfigWorks(t *testing.T) {
cfg := `
[[inputs.execd]]
command = ["a", "b", "c"]
environment = ["d=e", "f=1"]
restart_delay = "1m"
signal = "SIGHUP"
`
Expand All @@ -35,6 +36,7 @@ func TestSettingConfigWorks(t *testing.T) {
inp, ok := conf.Inputs[0].Input.(*Execd)
require.True(t, ok)
require.EqualValues(t, []string{"a", "b", "c"}, inp.Command)
require.EqualValues(t, []string{"d=e", "f=1"}, inp.Environment)
require.EqualValues(t, 1*time.Minute, inp.RestartDelay)
require.EqualValues(t, "SIGHUP", inp.Signal)
}
Expand All @@ -48,6 +50,7 @@ func TestExternalInputWorks(t *testing.T) {

e := &Execd{
Command: []string{exe, "-counter"},
Environment: []string{"PLUGINS_INPUTS_EXECD_MODE=application", "METRIC_NAME=counter"},
RestartDelay: config.Duration(5 * time.Second),
parser: influxParser,
Signal: "STDIN",
Expand Down Expand Up @@ -152,7 +155,8 @@ var counter = flag.Bool("counter", false,

func TestMain(m *testing.M) {
flag.Parse()
if *counter {
runMode := os.Getenv("PLUGINS_INPUTS_EXECD_MODE")
if *counter && runMode == "application" {
if err := runCounterProgram(); err != nil {
os.Exit(1)
}
Expand All @@ -163,6 +167,7 @@ func TestMain(m *testing.M) {
}

func runCounterProgram() error {
envMetricName := os.Getenv("METRIC_NAME")
i := 0
serializer, err := serializers.NewInfluxSerializer()
if err != nil {
Expand All @@ -173,7 +178,7 @@ func runCounterProgram() error {

scanner := bufio.NewScanner(os.Stdin)
for scanner.Scan() {
m := metric.New("counter",
m := metric.New(envMetricName,
map[string]string{},
map[string]interface{}{
"count": i,
Expand Down
6 changes: 6 additions & 0 deletions plugins/outputs/exec/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ For better performance, consider execd, which runs continuously.
## Command to ingest metrics via stdin.
command = ["tee", "-a", "/dev/null"]

## Environment variables
## Array of "key=value" pairs to pass as environment variables
## e.g. "KEY=value", "USERNAME=John Doe",
## "LD_LIBRARY_PATH=/opt/custom/lib64:/usr/local/libs"
# environment = []

## Timeout for command to complete.
# timeout = "5s"

Expand Down
17 changes: 11 additions & 6 deletions plugins/outputs/exec/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"fmt"
"io"
"os"
"os/exec"
"runtime"
"time"
Expand All @@ -19,9 +20,10 @@ const maxStderrBytes = 512

// Exec defines the exec output plugin.
type Exec struct {
Command []string `toml:"command"`
Timeout config.Duration `toml:"timeout"`
Log telegraf.Logger `toml:"-"`
Command []string `toml:"command"`
Environment []string `toml:"environment"`
Timeout config.Duration `toml:"timeout"`
Log telegraf.Logger `toml:"-"`

runner Runner
serializer serializers.Serializer
Expand Down Expand Up @@ -61,12 +63,12 @@ func (e *Exec) Write(metrics []telegraf.Metric) error {
return nil
}

return e.runner.Run(time.Duration(e.Timeout), e.Command, &buffer)
return e.runner.Run(time.Duration(e.Timeout), e.Command, e.Environment, &buffer)
}

// Runner provides an interface for running exec.Cmd.
type Runner interface {
Run(time.Duration, []string, io.Reader) error
Run(time.Duration, []string, []string, io.Reader) error
}

// CommandRunner runs a command with the ability to kill the process before the timeout.
Expand All @@ -76,8 +78,11 @@ type CommandRunner struct {
}

// Run runs the command.
func (c *CommandRunner) Run(timeout time.Duration, command []string, buffer io.Reader) error {
func (c *CommandRunner) Run(timeout time.Duration, command []string, environments []string, buffer io.Reader) error {
cmd := exec.Command(command[0], command[1:]...)
if len(environments) > 0 {
cmd.Env = append(os.Environ(), environments...)
}
cmd.Stdin = buffer
var stderr bytes.Buffer
cmd.Stderr = &stderr
Expand Down
6 changes: 6 additions & 0 deletions plugins/outputs/execd/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ Telegraf minimum version: Telegraf 1.15.0
## NOTE: process and each argument should each be their own string
command = ["my-telegraf-output", "--some-flag", "value"]

## Environment variables
## Array of "key=value" pairs to pass as environment variables
## e.g. "KEY=value", "USERNAME=John Doe",
## "LD_LIBRARY_PATH=/opt/custom/lib64:/usr/local/libs"
# environment = []

## Delay before the process is restarted after an unexpected termination
restart_delay = "10s"

Expand Down
3 changes: 2 additions & 1 deletion plugins/outputs/execd/execd.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

type Execd struct {
Command []string `toml:"command"`
Environment []string `toml:"environment"`
RestartDelay config.Duration `toml:"restart_delay"`
Log telegraf.Logger

Expand All @@ -34,7 +35,7 @@ func (e *Execd) Init() error {

var err error

e.process, err = process.New(e.Command)
e.process, err = process.New(e.Command, e.Environment)
if err != nil {
return fmt.Errorf("error creating process %s: %w", e.Command, err)
}
Expand Down
7 changes: 5 additions & 2 deletions plugins/outputs/execd/execd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func TestExternalOutputWorks(t *testing.T) {

e := &Execd{
Command: []string{exe, "-testoutput"},
Environment: []string{"PLUGINS_OUTPUTS_EXECD_MODE=application", "METRIC_NAME=cpu"},
RestartDelay: config.Duration(5 * time.Second),
serializer: influxSerializer,
Log: testutil.Logger{},
Expand Down Expand Up @@ -74,7 +75,8 @@ var testoutput = flag.Bool("testoutput", false,

func TestMain(m *testing.M) {
flag.Parse()
if *testoutput {
runMode := os.Getenv("PLUGINS_OUTPUTS_EXECD_MODE")
if *testoutput && runMode == "application" {
runOutputConsumerProgram()
os.Exit(0)
}
Expand All @@ -83,6 +85,7 @@ func TestMain(m *testing.M) {
}

func runOutputConsumerProgram() {
metricName := os.Getenv("METRIC_NAME")
parser := influx.NewStreamParser(os.Stdin)

for {
Expand All @@ -103,7 +106,7 @@ func runOutputConsumerProgram() {
os.Exit(1)
}

expected := testutil.MustMetric("cpu",
expected := testutil.MustMetric(metricName,
map[string]string{"name": "cpu1"},
map[string]interface{}{"idle": 50, "sys": 30},
now,
Expand Down
6 changes: 6 additions & 0 deletions plugins/processors/execd/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ Telegraf minimum version: Telegraf 1.15.0
## eg: command = ["/path/to/your_program", "arg1", "arg2"]
command = ["cat"]

## Environment variables
## Array of "key=value" pairs to pass as environment variables
## e.g. "KEY=value", "USERNAME=John Doe",
## "LD_LIBRARY_PATH=/opt/custom/lib64:/usr/local/libs"
# environment = []

## Delay before the process is restarted after an unexpected termination
# restart_delay = "10s"
```
Expand Down
Loading