forked from magnusbaeck/logstash-filter-verifier
/
process.go
147 lines (129 loc) · 4.27 KB
/
process.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
// Copyright (c) 2015-2016 Magnus Bäck <magnus@noun.se>
package logstash
import (
"bytes"
"errors"
"fmt"
"io"
"io/ioutil"
"os"
"os/exec"
)
// Process represents the invocation and execution of a Logstash child
// process that emits JSON events from the input and filter
// configuration files supplied by the caller.
type Process struct {
// Input will be connected to the stdin stream of the started
// Logstash process. Make sure to close it when all data has
// been written so that the process will terminate.
Input io.WriteCloser
child *exec.Cmd
inv *Invocation
output io.ReadCloser
stdio io.Reader
}
// NewProcess prepares for the execution of a new Logstash process but
// doesn't actually start it. logstashPath is the path to the Logstash
// executable (typically /opt/logstash/bin/logstash), inputCodec is
// the desired codec for the stdin input and inputType the value of
// the "type" field for ingested events. The configs parameter is
// one or more configuration files containing Logstash filters.
func NewProcess(inv *Invocation, inputCodec string, fields FieldSet, keptEnvVars []string) (*Process, error) {
// Unfortunately Logstash doesn't make it easy to just read
// events from a stdout-connected pipe and the log from a
// stderr-connected pipe. Stdout can contain other garbage (at
// the very least "future logs will be sent to ...") and error
// messages could very well be sent there too. Mitigate by
// having Logstash write output logs to a temporary file and
// its own logs to a different temporary file.
outputFile, err := newDeletedTempFile("", "")
if err != nil {
return nil, err
}
fieldHash, err := fields.LogstashHash()
if err != nil {
_ = outputFile.Close()
return nil, err
}
inputs := fmt.Sprintf("input { stdin { codec => %q add_field => %s } }", inputCodec, fieldHash)
outputs := fmt.Sprintf("output { file { path => %q codec => \"json_lines\" } }", outputFile.Name())
env := getLimitedEnvironment(os.Environ(), keptEnvVars)
args, err := inv.Args(inputs, outputs)
if err != nil {
_ = outputFile.Close()
return nil, err
}
p, err := newProcessWithArgs(inv.LogstashPath, args, env)
if err != nil {
_ = outputFile.Close()
return nil, err
}
p.output = outputFile
p.inv = inv
return p, nil
}
// newProcessWithArgs performs the non-Logstash specific low-level
// actions of preparing to spawn a child process, making it easier to
// test the code in this package.
func newProcessWithArgs(command string, args []string, env []string) (*Process, error) {
c := exec.Command(command, args...)
c.Env = env
// Save the process's stdout and stderr since an early startup
// failure (e.g. JVM issues) will get dumped there and not in
// the log file.
var b bytes.Buffer
c.Stdout = &b
c.Stderr = &b
inputPipe, err := c.StdinPipe()
if err != nil {
return nil, err
}
return &Process{
Input: inputPipe,
child: c,
stdio: &b,
}, nil
}
// Start starts a Logstash child process with the previously supplied
// configuration.
func (p *Process) Start() error {
log.Info("Starting %q with args %q.", p.child.Path, p.child.Args[1:])
return p.child.Start()
}
// Wait blocks until the started Logstash process terminates and
// returns the result of the execution.
func (p *Process) Wait() (*Result, error) {
if p.child.Process == nil {
return nil, errors.New("can't wait on an unborn process")
}
log.Debug("Waiting for child with pid %d to terminate.", p.child.Process.Pid)
waiterr := p.child.Wait()
// Save the log output regardless of whether the child process
// succeeded or not.
logbuf, logerr := ioutil.ReadAll(p.inv.logFile)
if logerr != nil {
// Log this weird error condition but don't let it
// fail the function. We don't care about the log
// contents unless Logstash fails, in which we'll
// report that problem anyway.
log.Error("Error reading the Logstash logfile: %s", logerr)
}
outbuf, _ := ioutil.ReadAll(p.stdio)
result := Result{
Events: []Event{},
Log: string(logbuf),
Output: string(outbuf),
Success: waiterr == nil,
}
if waiterr != nil {
return &result, waiterr
}
var err error
result.Events, err = readEvents(p.output)
result.Success = err == nil
return &result, err
}
// Release frees all allocated resources connected to this process.
func (p *Process) Release() {
_ = p.output.Close()
}