Skip to content

Commit

Permalink
Add parser for integration SDK errors
Browse files Browse the repository at this point in the history
  • Loading branch information
alvarocabanas committed Feb 5, 2024
1 parent a2743ff commit 11b2fc6
Show file tree
Hide file tree
Showing 3 changed files with 235 additions and 8 deletions.
43 changes: 43 additions & 0 deletions internal/integrations/v4/runner/group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,3 +409,46 @@ func Test_parseLogrusFields(t *testing.T) {
})
}
}

//nolint:paralleltest
func Test_parseSDKFields(t *testing.T) {
tests := map[string]struct {
input string
want logrus.Fields
}{
"debug": {`[DEBUG] Temperature changes`, logrus.Fields{
"level": `debug`,
"msg": `Temperature changes`,
}},
"info": {`[INFO] A group of walrus emerges from the ocean`, logrus.Fields{
"level": `info`,
"msg": `A group of walrus emerges from the ocean`,
}},
"fatal": {`[FATAL] The ice breaks!`, logrus.Fields{
"level": `fatal`,
"msg": `The ice breaks!`,
}},
"error": {`[ERR] Error creating connection to SQL Server: lookup mssql on 192.168.65.5:53: no such host`, logrus.Fields{
"level": `error`,
"msg": `Error creating connection to SQL Server: lookup mssql on 192.168.65.5:53: no such host`,
}},
"with-escaped-quotes": {`[WARN] The group's number \"increased\" tremendously!`, logrus.Fields{
"level": `warn`,
"msg": `The group's number \"increased\" tremendously!`,
}},

"not matching": {`simple line`, nil},
}

for name, tc := range tests {
t.Run(name, func(t *testing.T) {
out := parseSDKFields(tc.input)

for k, v := range tc.want {
val, ok := out[k]
assert.True(t, ok)
assert.Equal(t, v, val)
}
})
}
}
59 changes: 51 additions & 8 deletions internal/integrations/v4/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ var (
//2- map: &{any character}
//3- word: any character except spaces
logrusRegexp = regexp.MustCompile(`([^\s]*?)=(".*?[^\\]"|&{.*?}|[^\s]*)`)
sdkLogRegexp = regexp.MustCompile(`\[([^\]]*)\]\s*(.+)`)
)

// generic types to handle the stderr log parsing
Expand All @@ -56,7 +57,7 @@ type runner struct {
log log.Entry
definition integration.Definition
handleErrors func(context.Context, <-chan error) // by default, runner.logErrors. Replaceable for testing purposes
stderrParser logParser
stderrParser []logParser
lastStderr stderrQueue
healthCheck sync.Once
heartBeatFunc func()
Expand Down Expand Up @@ -85,7 +86,7 @@ func NewRunner(
dSources: dSources,
definition: intDef,
heartBeatFunc: func() {},
stderrParser: parseLogrusFields,
stderrParser: []logParser{parseSDKFields, parseLogrusFields},
lastStderr: newStderrQueue(intDef.LogsQueueSize),
terminateQueue: terminateQ,
cache: cache.CreateCache(),
Expand Down Expand Up @@ -298,17 +299,40 @@ func (r *runner) handleStderr(stderr <-chan []byte) {

// obfuscated stderr
obfuscatedLine := helpers.ObfuscateSensitiveDataFromString(string(line))
var lineWasParsed bool

if r.log.IsDebugEnabled() {
r.log.WithField("line", obfuscatedLine).Debug("Integration stderr (not parsed).")
} else {
fields := r.stderrParser(obfuscatedLine)
if v, ok := fields["level"]; ok && (v == "error" || v == "fatal") {
for _, errParser := range r.stderrParser {
fields := errParser(obfuscatedLine)

if lvl, ok := fields["level"]; ok {
// If a field already exists, like the time, logrus automatically adds the prefix "deps." to the
// Duplicated keys
r.log.WithFields(logrus.Fields(fields)).Info("received an integration log line")
logLine := r.log.WithFields(logrus.Fields(fields))
logMessage := "integration log"

lvl, ok := lvl.(string)
if ok {
switch strings.ToLower(lvl) {
case "info", "debug":
logLine.Debug(logMessage)
case "trace":
logLine.Trace(logMessage)
case "warn", "warning":
logLine.Warn(logMessage)
default:
logLine.Error(logMessage)
}

lineWasParsed = true

break
}
}
}

if !lineWasParsed {
r.log.WithField("line", obfuscatedLine).Debug("Integration stderr (not parsed).")
}
}
}

Expand Down Expand Up @@ -429,3 +453,22 @@ func parseLogrusFields(line string) (fields logFields) {

return
}

func parseSDKFields(line string) logFields {
matches := sdkLogRegexp.FindAllStringSubmatch(line, -1)
if len(matches) == 0 {
return nil
}

lvl, msg := matches[0][1], matches[0][2]

level := strings.ToLower(lvl)
if level == "err" {
level = "error"
}

return logFields{
"level": level,
"msg": msg,
}
}
141 changes: 141 additions & 0 deletions internal/integrations/v4/runner/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,3 +185,144 @@ func Test_runner_Run_handlesCfgProtocol(t *testing.T) {
return false
}, time.Second, 10*time.Millisecond)
}

//nolint:exhaustruct,funlen
func Test_runner_Run_Integration_Log(t *testing.T) {
t.Parallel()

if runtime.GOOS == "windows" {
t.Skip()
}

log.SetOutput(ioutil.Discard) // discard logs so not to break race tests
t.Cleanup(func() { log.SetOutput(os.Stderr) }) // return back to default

hook := new(test.Hook)
log.AddHook(hook)

log.SetLevel(logrus.TraceLevel)

testCases := []struct {
name string
logLine string
expectedLogMsg string
expectedLevel logrus.Level
}{
{
name: "SDK_Info_log",
logLine: "[INFO] This is an info message",
expectedLogMsg: "This is an info message",
expectedLevel: logrus.DebugLevel,
},
{
name: "SDK_Debug_log",
logLine: "[DEBUG] This is a debug message",
expectedLogMsg: "This is a debug message",
expectedLevel: logrus.DebugLevel,
},
{
name: "SDK_Trace_log",
logLine: "[TRACE] This is a trace message",
expectedLogMsg: "This is a trace message",
expectedLevel: logrus.TraceLevel,
},
{
name: "SDK_Warning_log",
logLine: "[WARN] This is a warning message",
expectedLogMsg: "This is a warning message",
expectedLevel: logrus.WarnLevel,
},
{
name: "SDK_Error_log",
logLine: "[ERR] This is an error message",
expectedLogMsg: "This is an error message",
expectedLevel: logrus.ErrorLevel,
},
{
name: "SDK_Fatal_log",
logLine: "[FATAL] This is a fatal message",
expectedLogMsg: "This is a fatal message",
expectedLevel: logrus.ErrorLevel,
},
{
name: "Logrus_Info_log",
logLine: "level=info msg=\"This is an info message\"",
expectedLogMsg: "This is an info message",
expectedLevel: logrus.DebugLevel,
},
{
name: "Logrus_Debug_log",
logLine: "level=debug msg=\"This is a debug message\"",
expectedLogMsg: "This is a debug message",
expectedLevel: logrus.DebugLevel,
},
{
name: "Logrus_Trace_log",
logLine: "level=trace msg=\"This is a trace message\"",
expectedLogMsg: "This is a trace message",
expectedLevel: logrus.TraceLevel,
},
{
name: "Logrus_Warning_log",
logLine: "level=warning msg=\"This is a warning message\"",
expectedLogMsg: "This is a warning message",
expectedLevel: logrus.WarnLevel,
},
{
name: "Logrus_Error_log",
logLine: "level=error msg=\"This is an error message\"",
expectedLogMsg: "This is an error message",
expectedLevel: logrus.ErrorLevel,
},
{
name: "Logrus_Fatal_log",
logLine: "level=fatal msg=\"This is a fatal message\"",
expectedLogMsg: "This is a fatal message",
expectedLevel: logrus.ErrorLevel,
},
{
name: "Obfuscated_log",
logLine: "This is a parser-orphan log",
expectedLogMsg: "Integration stderr (not parsed).",
expectedLevel: logrus.DebugLevel,
},
}

for _, tt := range testCases {
testCase := tt
t.Run(testCase.name, func(t *testing.T) {
t.Parallel()
// GIVEN a runner that receives a cfg request without a handle function.
def, err := integration.NewDefinition(config.ConfigEntry{
InstanceName: testCase.name,
Exec: testhelp.Command(fixtures.EchoFromEnv),
Env: map[string]string{"STDERR_STRING": testCase.logLine},
}, integration.ErrLookup, nil, nil)
require.NoError(t, err)

e := &testemit.RecordEmitter{}
r := NewRunner(def, e, nil, nil, cmdrequest.NoopHandleFn, nil, nil, host.IDLookup{})

// WHEN the runner executes the binary and handle the payload.
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
t.Cleanup(func() { cancel() })
r.Run(ctx, nil, nil)

var lastEntry *logrus.Entry
// THEN log entry found.
assert.Eventually(t, func() bool {
entries := hook.AllEntries()
for _, e := range entries {
if e.Data["msg"] == testCase.expectedLogMsg || e.Message == testCase.expectedLogMsg {
lastEntry = e

return true
}
}

return false
}, time.Second, 100*time.Millisecond)
assert.Equal(t, testCase.expectedLevel, lastEntry.Level)
})
}
}

0 comments on commit 11b2fc6

Please sign in to comment.