Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add parser for integration SDK errors #1800

Merged
merged 1 commit into from
Feb 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions internal/integrations/v4/runner/group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,3 +409,46 @@ func Test_parseLogrusFields(t *testing.T) {
})
}
}

//nolint:paralleltest
func Test_parseSDKFields(t *testing.T) {
tests := map[string]struct {
input string
want logrus.Fields
}{
"debug": {`[DEBUG] Temperature changes`, logrus.Fields{
"level": `debug`,
"msg": `Temperature changes`,
}},
"info": {`[INFO] A group of walrus emerges from the ocean`, logrus.Fields{
"level": `info`,
"msg": `A group of walrus emerges from the ocean`,
}},
"fatal": {`[FATAL] The ice breaks!`, logrus.Fields{
"level": `fatal`,
"msg": `The ice breaks!`,
}},
"error": {`[ERR] Error creating connection to SQL Server: lookup mssql on 192.168.65.5:53: no such host`, logrus.Fields{
"level": `error`,
"msg": `Error creating connection to SQL Server: lookup mssql on 192.168.65.5:53: no such host`,
}},
"with-escaped-quotes": {`[WARN] The group's number \"increased\" tremendously!`, logrus.Fields{
"level": `warn`,
"msg": `The group's number \"increased\" tremendously!`,
}},

"not matching": {`simple line`, nil},
}

for name, tc := range tests {
t.Run(name, func(t *testing.T) {
out := parseSDKFields(tc.input)

for k, v := range tc.want {
val, ok := out[k]
assert.True(t, ok)
assert.Equal(t, v, val)
}
})
}
}
58 changes: 50 additions & 8 deletions internal/integrations/v4/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ var (
//2- map: &{any character}
//3- word: any character except spaces
logrusRegexp = regexp.MustCompile(`([^\s]*?)=(".*?[^\\]"|&{.*?}|[^\s]*)`)
sdkLogRegexp = regexp.MustCompile(`\[([^\]]*)\]\s*(.+)`)
)

// generic types to handle the stderr log parsing
Expand All @@ -56,7 +57,7 @@ type runner struct {
log log.Entry
definition integration.Definition
handleErrors func(context.Context, <-chan error) // by default, runner.logErrors. Replaceable for testing purposes
stderrParser logParser
stderrParser []logParser
lastStderr stderrQueue
healthCheck sync.Once
heartBeatFunc func()
Expand Down Expand Up @@ -85,7 +86,7 @@ func NewRunner(
dSources: dSources,
definition: intDef,
heartBeatFunc: func() {},
stderrParser: parseLogrusFields,
stderrParser: []logParser{parseSDKFields, parseLogrusFields},
lastStderr: newStderrQueue(intDef.LogsQueueSize),
terminateQueue: terminateQ,
cache: cache.CreateCache(),
Expand Down Expand Up @@ -298,17 +299,39 @@ func (r *runner) handleStderr(stderr <-chan []byte) {

// obfuscated stderr
obfuscatedLine := helpers.ObfuscateSensitiveDataFromString(string(line))
var lineWasParsed bool

if r.log.IsDebugEnabled() {
r.log.WithField("line", obfuscatedLine).Debug("Integration stderr (not parsed).")
} else {
fields := r.stderrParser(obfuscatedLine)
if v, ok := fields["level"]; ok && (v == "error" || v == "fatal") {
for _, errParser := range r.stderrParser {
fields := errParser(obfuscatedLine)

if lvl, ok := fields["level"]; ok {
// If a field already exists, like the time, logrus automatically adds the prefix "deps." to the
// Duplicated keys
r.log.WithFields(logrus.Fields(fields)).Info("received an integration log line")
logLine := r.log.WithFields(logrus.Fields(fields))
logMessage := "integration log"

lvl, ok := lvl.(string)
if ok {
switch strings.ToLower(lvl) {
// We will set all this levels as debug to not be shown by default
case "info", "debug", "warn", "warning":
logLine.Debug(logMessage)
case "trace":
logLine.Trace(logMessage)
default:
logLine.Error(logMessage)
}

lineWasParsed = true

break
}
}
}

if !lineWasParsed {
r.log.WithField("line", obfuscatedLine).Debug("Integration stderr (not parsed).")
}
}
}

Expand Down Expand Up @@ -429,3 +452,22 @@ func parseLogrusFields(line string) (fields logFields) {

return
}

func parseSDKFields(line string) logFields {
matches := sdkLogRegexp.FindAllStringSubmatch(line, -1)
if len(matches) == 0 {
return nil
}

lvl, msg := matches[0][1], matches[0][2]

level := strings.ToLower(lvl)
if level == "err" {
level = "error"
}

return logFields{
"level": level,
"msg": msg,
}
}
141 changes: 141 additions & 0 deletions internal/integrations/v4/runner/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,3 +185,144 @@ func Test_runner_Run_handlesCfgProtocol(t *testing.T) {
return false
}, time.Second, 10*time.Millisecond)
}

//nolint:exhaustruct,funlen
func Test_runner_Run_Integration_Log(t *testing.T) {
t.Parallel()

if runtime.GOOS == "windows" {
t.Skip()
}

log.SetOutput(ioutil.Discard) // discard logs so not to break race tests
t.Cleanup(func() { log.SetOutput(os.Stderr) }) // return back to default

hook := new(test.Hook)
log.AddHook(hook)

log.SetLevel(logrus.TraceLevel)

testCases := []struct {
name string
logLine string
expectedLogMsg string
expectedLevel logrus.Level
}{
{
name: "SDK_Info_log",
logLine: "[INFO] This is an info message",
expectedLogMsg: "This is an info message",
expectedLevel: logrus.DebugLevel,
},
{
name: "SDK_Debug_log",
logLine: "[DEBUG] This is a debug message",
expectedLogMsg: "This is a debug message",
expectedLevel: logrus.DebugLevel,
},
{
name: "SDK_Trace_log",
logLine: "[TRACE] This is a trace message",
expectedLogMsg: "This is a trace message",
expectedLevel: logrus.TraceLevel,
},
{
name: "SDK_Warning_log",
logLine: "[WARN] This is a warning message",
expectedLogMsg: "This is a warning message",
expectedLevel: logrus.DebugLevel,
},
{
name: "SDK_Error_log",
logLine: "[ERR] This is an error message",
expectedLogMsg: "This is an error message",
expectedLevel: logrus.ErrorLevel,
},
{
name: "SDK_Fatal_log",
logLine: "[FATAL] This is a fatal message",
expectedLogMsg: "This is a fatal message",
expectedLevel: logrus.ErrorLevel,
},
{
name: "Logrus_Info_log",
logLine: "level=info msg=\"This is an info message\"",
expectedLogMsg: "This is an info message",
expectedLevel: logrus.DebugLevel,
},
{
name: "Logrus_Debug_log",
logLine: "level=debug msg=\"This is a debug message\"",
expectedLogMsg: "This is a debug message",
expectedLevel: logrus.DebugLevel,
},
{
name: "Logrus_Trace_log",
logLine: "level=trace msg=\"This is a trace message\"",
expectedLogMsg: "This is a trace message",
expectedLevel: logrus.TraceLevel,
},
{
name: "Logrus_Warning_log",
logLine: "level=warning msg=\"This is a warning message\"",
expectedLogMsg: "This is a warning message",
expectedLevel: logrus.DebugLevel,
},
{
name: "Logrus_Error_log",
logLine: "level=error msg=\"This is an error message\"",
expectedLogMsg: "This is an error message",
expectedLevel: logrus.ErrorLevel,
},
{
name: "Logrus_Fatal_log",
logLine: "level=fatal msg=\"This is a fatal message\"",
expectedLogMsg: "This is a fatal message",
expectedLevel: logrus.ErrorLevel,
},
{
name: "Obfuscated_log",
logLine: "This is a parser-orphan log",
expectedLogMsg: "Integration stderr (not parsed).",
expectedLevel: logrus.DebugLevel,
},
}

for _, tt := range testCases {
testCase := tt
t.Run(testCase.name, func(t *testing.T) {
t.Parallel()
// GIVEN a runner that receives a cfg request without a handle function.
def, err := integration.NewDefinition(config.ConfigEntry{
InstanceName: testCase.name,
Exec: testhelp.Command(fixtures.EchoFromEnv),
Env: map[string]string{"STDERR_STRING": testCase.logLine},
}, integration.ErrLookup, nil, nil)
require.NoError(t, err)

e := &testemit.RecordEmitter{}
r := NewRunner(def, e, nil, nil, cmdrequest.NoopHandleFn, nil, nil, host.IDLookup{})

// WHEN the runner executes the binary and handle the payload.
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
t.Cleanup(func() { cancel() })
r.Run(ctx, nil, nil)

var lastEntry *logrus.Entry
// THEN log entry found.
assert.Eventually(t, func() bool {
entries := hook.AllEntries()
for _, e := range entries {
if e.Data["msg"] == testCase.expectedLogMsg || e.Message == testCase.expectedLogMsg {
lastEntry = e

return true
}
}

return false
}, time.Second, 100*time.Millisecond)
assert.Equal(t, testCase.expectedLevel, lastEntry.Level)
})
}
}
Loading