diff --git a/engine/server/engine/centralized_logs/client_implementations/persistent_volume/persistent_volume_logs_database_client_test.go b/engine/server/engine/centralized_logs/client_implementations/persistent_volume/persistent_volume_logs_database_client_test.go index 9d983c3da0..e589b1db66 100644 --- a/engine/server/engine/centralized_logs/client_implementations/persistent_volume/persistent_volume_logs_database_client_test.go +++ b/engine/server/engine/centralized_logs/client_implementations/persistent_volume/persistent_volume_logs_database_client_test.go @@ -510,7 +510,7 @@ func TestStreamUserServiceLogsPerWeek_WithLogLineAcrossWeeks(t *testing.T) { logLine3a} week3logLinesStr := strings.Join(week3logLines, "\n") + "\n" - week4logLinesStr := strings.Join(week4logLines, "\n") + week4logLinesStr := strings.Join(week4logLines, "\n") + "\n" week4filepath := fmt.Sprintf(volume_consts.PerWeekFilePathFmtStr, volume_consts.LogsStorageDirpathForTests, strconv.Itoa(defaultYear), strconv.Itoa(4), testEnclaveUuid, testUserService1Uuid, volume_consts.Filetype) week3filepath := fmt.Sprintf(volume_consts.PerWeekFilePathFmtStr, volume_consts.LogsStorageDirpathForTests, strconv.Itoa(defaultYear), strconv.Itoa(3), testEnclaveUuid, testUserService1Uuid, volume_consts.Filetype) diff --git a/engine/server/engine/centralized_logs/client_implementations/persistent_volume/stream_logs_strategy/per_week_stream_logs_strategy.go b/engine/server/engine/centralized_logs/client_implementations/persistent_volume/stream_logs_strategy/per_week_stream_logs_strategy.go index 03ca6f7988..75fe00b77c 100644 --- a/engine/server/engine/centralized_logs/client_implementations/persistent_volume/stream_logs_strategy/per_week_stream_logs_strategy.go +++ b/engine/server/engine/centralized_logs/client_implementations/persistent_volume/stream_logs_strategy/per_week_stream_logs_strategy.go @@ -13,6 +13,7 @@ import ( "github.com/kurtosis-tech/kurtosis/engine/server/engine/centralized_logs/client_implementations/persistent_volume/volume_filesystem" "github.com/kurtosis-tech/kurtosis/engine/server/engine/centralized_logs/logline" "github.com/kurtosis-tech/stacktrace" + "github.com/nxadm/tail" "github.com/sirupsen/logrus" "golang.org/x/exp/slices" "io" @@ -25,9 +26,9 @@ const ( oneWeek = 7 * 24 * time.Hour ) -// This strategy pulls logs from filesytsem where there is a log file per year, per week, per enclave, per service +// PerWeekStreamLogsStrategy pulls logs from filesystem where there is a log file per year, per week, per enclave, per service // Weeks are denoted 01-52 -// eg. +// e.g. // [.../28/d3e8832d671f/61830789f03a.json] is the file containing logs from service with uuid 61830789f03a, in enclave with uuid d3e8832d671f, // in the 28th week of the current year type PerWeekStreamLogsStrategy struct { @@ -65,8 +66,9 @@ func (strategy *PerWeekStreamLogsStrategy) StreamLogs( This means logs past the retention period are being returned, likely a bug in Kurtosis.`, volume_consts.LogRetentionPeriodInWeeks+1, len(paths)) } + latestLogFile := paths[len(paths)-1] - fileReaders := []io.Reader{} + var fileReaders []io.Reader for _, pathStr := range paths { logsFile, err := fs.Open(pathStr) if err != nil { @@ -87,11 +89,12 @@ func (strategy *PerWeekStreamLogsStrategy) StreamLogs( return default: var jsonLogStr string - var err error - var readErr error var jsonLogNewStr string - var shouldReturnAfterStreamingLastLine = false + var readErr error + var err error + var isLastLogLine = false + // get a complete log line for { jsonLogNewStr, readErr = logsReader.ReadString(volume_consts.NewLineRune) jsonLogStr = jsonLogStr + jsonLogNewStr @@ -105,15 +108,19 @@ func (strategy *PerWeekStreamLogsStrategy) StreamLogs( } } if readErr != nil && errors.Is(readErr, io.EOF) { - if shouldFollowLogs { - continue - } // exiting stream logrus.Debugf("EOF error returned when reading logs for service '%v' in enclave '%v'", serviceUuid, enclaveUuid) if jsonLogStr != "" { - shouldReturnAfterStreamingLastLine = true + isLastLogLine = true } else { - return + if shouldFollowLogs { + if err = strategy.tailLogs(latestLogFile, logsByKurtosisUserServiceUuidChan, serviceUuid, conjunctiveLogLinesFiltersWithRegex); err != nil { + streamErrChan <- stacktrace.Propagate(err, "An error occurred following logs for service '%v' in enclave '%v'.", serviceUuid, enclaveUuid) + return + } + } else { + return + } } } break @@ -123,52 +130,18 @@ func (strategy *PerWeekStreamLogsStrategy) StreamLogs( return } - // each logLineStr is of the following structure: {"enclave_uuid": "...", "service_uuid":"...", "log": "...",.. "timestamp":"..."} - // eg. {"container_type":"api-container", "container_id":"8f8558ba", "container_name":"/kurtosis-api--ffd", - // "log":"hi","timestamp":"2023-08-14T14:57:49Z"} - - // First, we decode the line - var jsonLog JsonLog - err = json.Unmarshal([]byte(jsonLogStr), &jsonLog) - if err != nil { - streamErrChan <- stacktrace.Propagate(err, "An error occurred parsing the json logs file for service '%v' in enclave '%v'.", serviceUuid, enclaveUuid) - return - } - - // Then we extract the actual log message using the "log" field - logLineStr, found := jsonLog[volume_consts.LogLabel] - if !found { - streamErrChan <- stacktrace.NewError("An error retrieving the log field from logs json file. This is a bug in Kurtosis.") - return + if err = strategy.sendJsonLogLine(jsonLogStr, logsByKurtosisUserServiceUuidChan, serviceUuid, conjunctiveLogLinesFiltersWithRegex); err != nil { + streamErrChan <- stacktrace.Propagate(err, "An error occurred sending log line for service '%v' in enclave '%v'.", serviceUuid, enclaveUuid) } - logLine := logline.NewLogLine(logLineStr) - // Then we filter by checking if the log message is valid based on requested filtersr - validLogLine, err := logLine.IsValidLogLineBaseOnFilters(conjunctiveLogLinesFiltersWithRegex) - if err != nil { - streamErrChan <- stacktrace.Propagate(err, "An error occurred filtering log line '%+v' using filters '%+v'", logLine, conjunctiveLogLinesFiltersWithRegex) - break - } - // ensure this log line is within the retention period if it has a timestamp - withinRetention, err := strategy.isWithinRetentionPeriod(jsonLog) - if err != nil { - streamErrChan <- stacktrace.Propagate(err, "An error occurred filtering log line '%+v' using filters '%+v'", logLine, conjunctiveLogLinesFiltersWithRegex) - break - } - - shouldReturnLogLine := validLogLine && withinRetention - if !shouldReturnLogLine { - break - } - - // send the log line - logLines := []logline.LogLine{*logLine} - userServicesLogLinesMap := map[service.ServiceUUID][]logline.LogLine{ - serviceUuid: logLines, - } - logsByKurtosisUserServiceUuidChan <- userServicesLogLinesMap - if shouldReturnAfterStreamingLastLine { - return + if isLastLogLine { + if shouldFollowLogs { + if err = strategy.tailLogs(latestLogFile, logsByKurtosisUserServiceUuidChan, serviceUuid, conjunctiveLogLinesFiltersWithRegex); err != nil { + streamErrChan <- stacktrace.Propagate(err, "An error occurred following logs for service '%v' in enclave '%v'.", serviceUuid, enclaveUuid) + } + } else { + return + } } } } @@ -185,7 +158,7 @@ func (strategy *PerWeekStreamLogsStrategy) getRetainedLogsFilePaths( filesystem volume_filesystem.VolumeFilesystem, retentionPeriodInWeeks int, enclaveUuid, serviceUuid string) []string { - paths := []string{} + var paths []string // get log file paths as far back as they exist for i := 0; i < (retentionPeriodInWeeks + 1); i++ { @@ -203,16 +176,97 @@ func (strategy *PerWeekStreamLogsStrategy) getRetainedLogsFilePaths( return paths } -// Returns true if no [logLine] has no timestamp +// tail -f [filepath] +func (strategy *PerWeekStreamLogsStrategy) tailLogs( + filepath string, + logsByKurtosisUserServiceUuidChan chan map[service.ServiceUUID][]logline.LogLine, + serviceUuid service.ServiceUUID, + conjunctiveLogLinesFiltersWithRegex []logline.LogLineFilterWithRegex, +) error { + logTail, err := tail.TailFile(filepath, tail.Config{ + Location: nil, + ReOpen: false, + MustExist: true, + Poll: false, + Pipe: false, + Follow: true, + MaxLineSize: 0, + RateLimiter: nil, + Logger: logrus.StandardLogger()}) + if err != nil { + return stacktrace.Propagate(err, "An error occurred while attempting to tail the log file.") + } + + for logLine := range logTail.Lines { + err = strategy.sendJsonLogLine(logLine.Text, logsByKurtosisUserServiceUuidChan, serviceUuid, conjunctiveLogLinesFiltersWithRegex) + if err != nil { + return stacktrace.Propagate(err, "An error occurred sending json log line '%v'.", logLine.Text) + } + } + return nil +} + +// Returns error if [jsonLogLineStr] is not a valid log line +func (strategy *PerWeekStreamLogsStrategy) sendJsonLogLine( + jsonLogLineStr string, + logsByKurtosisUserServiceUuidChan chan map[service.ServiceUUID][]logline.LogLine, + serviceUuid service.ServiceUUID, + conjunctiveLogLinesFiltersWithRegex []logline.LogLineFilterWithRegex) error { + // each logLineStr is of the following structure: {"enclave_uuid": "...", "service_uuid":"...", "log": "...",.. "timestamp":"..."} + // eg. {"container_type":"api-container", "container_id":"8f8558ba", "container_name":"/kurtosis-api--ffd", + // "log":"hi","timestamp":"2023-08-14T14:57:49Z"} + + // First decode the line + var jsonLog JsonLog + if err := json.Unmarshal([]byte(jsonLogLineStr), &jsonLog); err != nil { + return stacktrace.Propagate(err, "An error occurred parsing the json log string: %v\n", jsonLogLineStr) + } + + // Then extract the actual log message using the "log" field + logLineStr, found := jsonLog[volume_consts.LogLabel] + if !found { + return stacktrace.NewError("An error retrieving the log field from json log string: %v\n", jsonLogLineStr) + } + logLine := logline.NewLogLine(logLineStr) + + // Then filter by checking if the log message is valid based on requested filters + validLogLine, err := logLine.IsValidLogLineBaseOnFilters(conjunctiveLogLinesFiltersWithRegex) + if err != nil { + return stacktrace.Propagate(err, "An error occurred filtering log line '%+v' using filters '%+v'", logLine, conjunctiveLogLinesFiltersWithRegex) + } + if !validLogLine { + return nil + } + + // ensure this log line is within the retention period if it has a timestamp + withinRetentionPeriod, err := strategy.isWithinRetentionPeriod(jsonLog) + if err != nil { + return stacktrace.Propagate(err, "An error occurred filtering log line '%+v' using filters '%+v'", logLine, conjunctiveLogLinesFiltersWithRegex) + } + if !withinRetentionPeriod { + return nil + } + + // send the log line + logLines := []logline.LogLine{*logLine} + userServicesLogLinesMap := map[service.ServiceUUID][]logline.LogLine{ + serviceUuid: logLines, + } + logsByKurtosisUserServiceUuidChan <- userServicesLogLinesMap + return nil +} + +// Returns true if [logLine] has no timestamp func (strategy *PerWeekStreamLogsStrategy) isWithinRetentionPeriod(logLine JsonLog) (bool, error) { retentionPeriod := strategy.time.Now().Add(time.Duration(-volume_consts.LogRetentionPeriodInWeeks-1) * oneWeek) timestampStr, found := logLine[volume_consts.TimestampLabel] - if found { - timestamp, err := time.Parse(time.RFC3339, timestampStr) - if err != nil { - return false, stacktrace.Propagate(err, "An error occurred retrieving the timestamp field from logs json log line. This is a bug in Kurtosis.") - } - return timestamp.After(retentionPeriod), nil + if !found { + return true, nil + } + + timestamp, err := time.Parse(time.RFC3339, timestampStr) + if err != nil { + return false, stacktrace.Propagate(err, "An error occurred retrieving the timestamp field from logs json log line. This is a bug in Kurtosis.") } - return true, nil + return timestamp.After(retentionPeriod), nil } diff --git a/engine/server/go.mod b/engine/server/go.mod index f1215e6ef8..86021466b0 100644 --- a/engine/server/go.mod +++ b/engine/server/go.mod @@ -62,6 +62,7 @@ require ( require ( github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 // indirect github.com/emicklei/go-restful/v3 v3.9.0 // indirect + github.com/fsnotify/fsnotify v1.4.9 // indirect github.com/go-logr/logr v1.2.3 // indirect github.com/go-openapi/jsonpointer v0.19.6 // indirect github.com/go-openapi/jsonreference v0.20.1 // indirect @@ -83,6 +84,7 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/morikuni/aec v1.0.0 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/nxadm/tail v1.4.8 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/stretchr/objx v0.5.0 // indirect golang.org/x/mod v0.12.0 // indirect @@ -93,6 +95,7 @@ require ( google.golang.org/appengine v1.6.7 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20230706204954-ccb25ca9f130 // indirect gopkg.in/inf.v0 v0.9.1 // indirect + gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect k8s.io/api v0.27.2 // indirect k8s.io/apimachinery v0.27.2 // indirect diff --git a/engine/server/go.sum b/engine/server/go.sum index 25d7e5f340..f8fcfe2e16 100644 --- a/engine/server/go.sum +++ b/engine/server/go.sum @@ -29,6 +29,8 @@ github.com/emicklei/go-restful/v3 v3.9.0 h1:XwGDlfxEnQZzuopoqxwSEllNcCOM9DhhFyhF github.com/emicklei/go-restful/v3 v3.9.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= +github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/gammazero/deque v0.1.0 h1:f9LnNmq66VDeuAlSAapemq/U7hJ2jpIWa4c09q8Dlik= github.com/gammazero/deque v0.1.0/go.mod h1:KQw7vFau1hHuM8xmI9RbgKFbAsQFWmBpqQ2KenFLk6M= github.com/gammazero/workerpool v1.1.2 h1:vuioDQbgrz4HoaCi2q1HLlOXdpbap5AET7xu5/qj87g= @@ -112,6 +114,8 @@ github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A= github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= +github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= github.com/onsi/ginkgo/v2 v2.9.1 h1:zie5Ly042PD3bsCvsSOPvRnFwyo3rKe64TJlD6nu0mk= github.com/onsi/gomega v1.27.4 h1:Z2AnStgsdSayCMDiCU42qIz+HLqEPcgiOCXjAU/w+8E= github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= @@ -185,6 +189,7 @@ golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200923182605-d9f96fdee20d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -248,6 +253,8 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntN gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=