Skip to content

Commit

Permalink
Add parser for integration SDK errors
Browse files Browse the repository at this point in the history
  • Loading branch information
alvarocabanas committed Feb 1, 2024
1 parent a2743ff commit e81af00
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 8 deletions.
43 changes: 43 additions & 0 deletions internal/integrations/v4/runner/group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,3 +409,46 @@ func Test_parseLogrusFields(t *testing.T) {
})
}
}

//nolint:paralleltest
func Test_parseSDKFields(t *testing.T) {
tests := map[string]struct {
input string
want logrus.Fields
}{
"debug": {`[DEBUG] Temperature changes`, logrus.Fields{
"level": `debug`,
"msg": `Temperature changes`,
}},
"info": {`[INFO] A group of walrus emerges from the ocean`, logrus.Fields{
"level": `info`,
"msg": `A group of walrus emerges from the ocean`,
}},
"fatal": {`[FATAL] The ice breaks!`, logrus.Fields{
"level": `fatal`,
"msg": `The ice breaks!`,
}},
"error": {`[ERR] Error creating connection to SQL Server: lookup mssql on 192.168.65.5:53: no such host`, logrus.Fields{
"level": `error`,
"msg": `Error creating connection to SQL Server: lookup mssql on 192.168.65.5:53: no such host`,
}},
"with-escaped-quotes": {`[WARN] The group's number \"increased\" tremendously!`, logrus.Fields{
"level": `warn`,
"msg": `The group's number \"increased\" tremendously!`,
}},

"not matching": {`simple line`, nil},
}

for name, tc := range tests {
t.Run(name, func(t *testing.T) {
out := parseSDKFields(tc.input)

for k, v := range tc.want {
val, ok := out[k]
assert.True(t, ok)
assert.Equal(t, v, val)
}
})
}
}
40 changes: 32 additions & 8 deletions internal/integrations/v4/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ var (
//2- map: &{any character}
//3- word: any character except spaces
logrusRegexp = regexp.MustCompile(`([^\s]*?)=(".*?[^\\]"|&{.*?}|[^\s]*)`)
sdkLogRegexp = regexp.MustCompile(`\[([^\]]*)\]\s*(.+)`)
)

// generic types to handle the stderr log parsing
Expand All @@ -56,7 +57,7 @@ type runner struct {
log log.Entry
definition integration.Definition
handleErrors func(context.Context, <-chan error) // by default, runner.logErrors. Replaceable for testing purposes
stderrParser logParser
stderrParser []logParser
lastStderr stderrQueue
healthCheck sync.Once
heartBeatFunc func()
Expand Down Expand Up @@ -85,7 +86,7 @@ func NewRunner(
dSources: dSources,
definition: intDef,
heartBeatFunc: func() {},
stderrParser: parseLogrusFields,
stderrParser: []logParser{parseSDKFields, parseLogrusFields},
lastStderr: newStderrQueue(intDef.LogsQueueSize),
terminateQueue: terminateQ,
cache: cache.CreateCache(),
Expand Down Expand Up @@ -302,11 +303,15 @@ func (r *runner) handleStderr(stderr <-chan []byte) {
if r.log.IsDebugEnabled() {
r.log.WithField("line", obfuscatedLine).Debug("Integration stderr (not parsed).")
} else {
fields := r.stderrParser(obfuscatedLine)
if v, ok := fields["level"]; ok && (v == "error" || v == "fatal") {
// If a field already exists, like the time, logrus automatically adds the prefix "deps." to the
// Duplicated keys
r.log.WithFields(logrus.Fields(fields)).Info("received an integration log line")
for _, errParser := range r.stderrParser {
fields := errParser(obfuscatedLine)
if v, ok := fields["level"]; ok && (v == "error" || v == "fatal") {
// If a field already exists, like the time, logrus automatically adds the prefix "deps." to the
// Duplicated keys
r.log.WithFields(logrus.Fields(fields)).Info("received an integration log line")

break
}
}
}
}
Expand All @@ -327,7 +332,7 @@ func (r *runner) logErrors(ctx context.Context, errs <-chan error) {
flush := r.lastStderr.Flush()
// err contains the exit code number
r.log.WithError(err).WithField("stderr", helpers.ObfuscateSensitiveDataFromString(flush)).
Warn("integration exited with error state")
Error("integration exited with error state")
}
}
}
Expand Down Expand Up @@ -429,3 +434,22 @@ func parseLogrusFields(line string) (fields logFields) {

return
}

func parseSDKFields(line string) logFields {
matches := sdkLogRegexp.FindAllStringSubmatch(line, -1)
if len(matches) == 0 {
return nil
}

lvl, msg := matches[0][1], matches[0][2]

level := strings.ToLower(lvl)
if level == "err" {
level = "error"
}

return logFields{
"level": level,
"msg": msg,
}
}
86 changes: 86 additions & 0 deletions internal/integrations/v4/runner/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,3 +185,89 @@ func Test_runner_Run_handlesCfgProtocol(t *testing.T) {
return false
}, time.Second, 10*time.Millisecond)
}

//nolint:paralleltest,exhaustruct,dupl
func Test_runner_Run_SDK_error(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skip()
}

log.SetOutput(ioutil.Discard) // discard logs so not to break race tests
defer log.SetOutput(os.Stderr) // return back to default

hook := new(test.Hook)
log.AddHook(hook)

// GIVEN a runner that receives a cfg request without a handle function.
def, err := integration.NewDefinition(config.ConfigEntry{
InstanceName: "Parent",
Exec: testhelp.Command(fixtures.EchoFromEnv),
Env: map[string]string{"STDERR_STRING": "[ERR] This is an error"},
}, integration.ErrLookup, nil, nil)
require.NoError(t, err)

e := &testemit.RecordEmitter{}
r := NewRunner(def, e, nil, nil, cmdrequest.NoopHandleFn, nil, nil, host.IDLookup{})

// WHEN the runner executes the binary and handle the payload.
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
r.Run(ctx, nil, nil)

// THEN log entry found.
assert.Eventually(t, func() bool {
entries := hook.AllEntries()
for _, e := range entries {
if e.Data["msg"] == "This is an error" {
return true
}
}

return false
}, time.Second, 100*time.Millisecond)
le := hook.LastEntry()
assert.Equal(t, logrus.ErrorLevel.String(), le.Data["level"])
}

//nolint:paralleltest,exhaustruct,dupl
func Test_runner_Run_Logrus_error(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skip()
}

log.SetOutput(ioutil.Discard) // discard logs so not to break race tests
defer log.SetOutput(os.Stderr) // return back to default

hook := new(test.Hook)
log.AddHook(hook)

// GIVEN a runner that receives a cfg request without a handle function.
def, err := integration.NewDefinition(config.ConfigEntry{
InstanceName: "Parent",
Exec: testhelp.Command(fixtures.EchoFromEnv),
Env: map[string]string{"STDERR_STRING": "time=\"2020-02-11T17:28:50+01:00\" level=error msg=\"config: failed to sub lookup file data\" component=integrations.runner.Group error=\"config name: /var/db/something: p file, error: open /var/db/something: no such file or directory\" integration_name=nri-flex"},
}, integration.ErrLookup, nil, nil)
require.NoError(t, err)

e := &testemit.RecordEmitter{}
r := NewRunner(def, e, nil, nil, cmdrequest.NoopHandleFn, nil, nil, host.IDLookup{})

// WHEN the runner executes the binary and handle the payload.
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
r.Run(ctx, nil, nil)

// THEN log entry found.
assert.Eventually(t, func() bool {
entries := hook.AllEntries()
for _, e := range entries {
if e.Data["msg"] == "config: failed to sub lookup file data" {
return true
}
}

return false
}, time.Second, 100*time.Millisecond)
le := hook.LastEntry()
assert.Equal(t, logrus.ErrorLevel.String(), le.Data["level"])
}

0 comments on commit e81af00

Please sign in to comment.