Skip to content

Commit

Permalink
fix: follow logs (#1298)
Browse files Browse the repository at this point in the history
## Description:
Fixes `.. service logs <enclave> <service> -f`

ex.



https://github.com/kurtosis-tech/kurtosis/assets/46531991/2d8edf68-7dde-4343-8e2e-6ff1f9778d7c







## Is this change user facing?
YES

## References:
#1293
  • Loading branch information
tedim52 committed Sep 14, 2023
1 parent 13e78c5 commit 9b0bcb7
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 65 deletions.
Expand Up @@ -510,7 +510,7 @@ func TestStreamUserServiceLogsPerWeek_WithLogLineAcrossWeeks(t *testing.T) {
logLine3a}

week3logLinesStr := strings.Join(week3logLines, "\n") + "\n"
week4logLinesStr := strings.Join(week4logLines, "\n")
week4logLinesStr := strings.Join(week4logLines, "\n") + "\n"

week4filepath := fmt.Sprintf(volume_consts.PerWeekFilePathFmtStr, volume_consts.LogsStorageDirpathForTests, strconv.Itoa(defaultYear), strconv.Itoa(4), testEnclaveUuid, testUserService1Uuid, volume_consts.Filetype)
week3filepath := fmt.Sprintf(volume_consts.PerWeekFilePathFmtStr, volume_consts.LogsStorageDirpathForTests, strconv.Itoa(defaultYear), strconv.Itoa(3), testEnclaveUuid, testUserService1Uuid, volume_consts.Filetype)
Expand Down
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/kurtosis-tech/kurtosis/engine/server/engine/centralized_logs/client_implementations/persistent_volume/volume_filesystem"
"github.com/kurtosis-tech/kurtosis/engine/server/engine/centralized_logs/logline"
"github.com/kurtosis-tech/stacktrace"
"github.com/nxadm/tail"
"github.com/sirupsen/logrus"
"golang.org/x/exp/slices"
"io"
Expand All @@ -25,9 +26,9 @@ const (
oneWeek = 7 * 24 * time.Hour
)

// This strategy pulls logs from filesytsem where there is a log file per year, per week, per enclave, per service
// PerWeekStreamLogsStrategy pulls logs from filesystem where there is a log file per year, per week, per enclave, per service
// Weeks are denoted 01-52
// eg.
// e.g.
// [.../28/d3e8832d671f/61830789f03a.json] is the file containing logs from service with uuid 61830789f03a, in enclave with uuid d3e8832d671f,
// in the 28th week of the current year
type PerWeekStreamLogsStrategy struct {
Expand Down Expand Up @@ -65,8 +66,9 @@ func (strategy *PerWeekStreamLogsStrategy) StreamLogs(
This means logs past the retention period are being returned, likely a bug in Kurtosis.`,
volume_consts.LogRetentionPeriodInWeeks+1, len(paths))
}
latestLogFile := paths[len(paths)-1]

fileReaders := []io.Reader{}
var fileReaders []io.Reader
for _, pathStr := range paths {
logsFile, err := fs.Open(pathStr)
if err != nil {
Expand All @@ -87,11 +89,12 @@ func (strategy *PerWeekStreamLogsStrategy) StreamLogs(
return
default:
var jsonLogStr string
var err error
var readErr error
var jsonLogNewStr string
var shouldReturnAfterStreamingLastLine = false
var readErr error
var err error
var isLastLogLine = false

// get a complete log line
for {
jsonLogNewStr, readErr = logsReader.ReadString(volume_consts.NewLineRune)
jsonLogStr = jsonLogStr + jsonLogNewStr
Expand All @@ -105,15 +108,19 @@ func (strategy *PerWeekStreamLogsStrategy) StreamLogs(
}
}
if readErr != nil && errors.Is(readErr, io.EOF) {
if shouldFollowLogs {
continue
}
// exiting stream
logrus.Debugf("EOF error returned when reading logs for service '%v' in enclave '%v'", serviceUuid, enclaveUuid)
if jsonLogStr != "" {
shouldReturnAfterStreamingLastLine = true
isLastLogLine = true
} else {
return
if shouldFollowLogs {
if err = strategy.tailLogs(latestLogFile, logsByKurtosisUserServiceUuidChan, serviceUuid, conjunctiveLogLinesFiltersWithRegex); err != nil {
streamErrChan <- stacktrace.Propagate(err, "An error occurred following logs for service '%v' in enclave '%v'.", serviceUuid, enclaveUuid)
return
}
} else {
return
}
}
}
break
Expand All @@ -123,52 +130,18 @@ func (strategy *PerWeekStreamLogsStrategy) StreamLogs(
return
}

// each logLineStr is of the following structure: {"enclave_uuid": "...", "service_uuid":"...", "log": "...",.. "timestamp":"..."}
// eg. {"container_type":"api-container", "container_id":"8f8558ba", "container_name":"/kurtosis-api--ffd",
// "log":"hi","timestamp":"2023-08-14T14:57:49Z"}

// First, we decode the line
var jsonLog JsonLog
err = json.Unmarshal([]byte(jsonLogStr), &jsonLog)
if err != nil {
streamErrChan <- stacktrace.Propagate(err, "An error occurred parsing the json logs file for service '%v' in enclave '%v'.", serviceUuid, enclaveUuid)
return
}

// Then we extract the actual log message using the "log" field
logLineStr, found := jsonLog[volume_consts.LogLabel]
if !found {
streamErrChan <- stacktrace.NewError("An error retrieving the log field from logs json file. This is a bug in Kurtosis.")
return
if err = strategy.sendJsonLogLine(jsonLogStr, logsByKurtosisUserServiceUuidChan, serviceUuid, conjunctiveLogLinesFiltersWithRegex); err != nil {
streamErrChan <- stacktrace.Propagate(err, "An error occurred sending log line for service '%v' in enclave '%v'.", serviceUuid, enclaveUuid)
}
logLine := logline.NewLogLine(logLineStr)

// Then we filter by checking if the log message is valid based on requested filtersr
validLogLine, err := logLine.IsValidLogLineBaseOnFilters(conjunctiveLogLinesFiltersWithRegex)
if err != nil {
streamErrChan <- stacktrace.Propagate(err, "An error occurred filtering log line '%+v' using filters '%+v'", logLine, conjunctiveLogLinesFiltersWithRegex)
break
}
// ensure this log line is within the retention period if it has a timestamp
withinRetention, err := strategy.isWithinRetentionPeriod(jsonLog)
if err != nil {
streamErrChan <- stacktrace.Propagate(err, "An error occurred filtering log line '%+v' using filters '%+v'", logLine, conjunctiveLogLinesFiltersWithRegex)
break
}

shouldReturnLogLine := validLogLine && withinRetention
if !shouldReturnLogLine {
break
}

// send the log line
logLines := []logline.LogLine{*logLine}
userServicesLogLinesMap := map[service.ServiceUUID][]logline.LogLine{
serviceUuid: logLines,
}
logsByKurtosisUserServiceUuidChan <- userServicesLogLinesMap
if shouldReturnAfterStreamingLastLine {
return
if isLastLogLine {
if shouldFollowLogs {
if err = strategy.tailLogs(latestLogFile, logsByKurtosisUserServiceUuidChan, serviceUuid, conjunctiveLogLinesFiltersWithRegex); err != nil {
streamErrChan <- stacktrace.Propagate(err, "An error occurred following logs for service '%v' in enclave '%v'.", serviceUuid, enclaveUuid)
}
} else {
return
}
}
}
}
Expand All @@ -185,7 +158,7 @@ func (strategy *PerWeekStreamLogsStrategy) getRetainedLogsFilePaths(
filesystem volume_filesystem.VolumeFilesystem,
retentionPeriodInWeeks int,
enclaveUuid, serviceUuid string) []string {
paths := []string{}
var paths []string

// get log file paths as far back as they exist
for i := 0; i < (retentionPeriodInWeeks + 1); i++ {
Expand All @@ -203,16 +176,97 @@ func (strategy *PerWeekStreamLogsStrategy) getRetainedLogsFilePaths(
return paths
}

// Returns true if no [logLine] has no timestamp
// tail -f [filepath]
func (strategy *PerWeekStreamLogsStrategy) tailLogs(
filepath string,
logsByKurtosisUserServiceUuidChan chan map[service.ServiceUUID][]logline.LogLine,
serviceUuid service.ServiceUUID,
conjunctiveLogLinesFiltersWithRegex []logline.LogLineFilterWithRegex,
) error {
logTail, err := tail.TailFile(filepath, tail.Config{
Location: nil,
ReOpen: false,
MustExist: true,
Poll: false,
Pipe: false,
Follow: true,
MaxLineSize: 0,
RateLimiter: nil,
Logger: logrus.StandardLogger()})
if err != nil {
return stacktrace.Propagate(err, "An error occurred while attempting to tail the log file.")
}

for logLine := range logTail.Lines {
err = strategy.sendJsonLogLine(logLine.Text, logsByKurtosisUserServiceUuidChan, serviceUuid, conjunctiveLogLinesFiltersWithRegex)
if err != nil {
return stacktrace.Propagate(err, "An error occurred sending json log line '%v'.", logLine.Text)
}
}
return nil
}

// Returns error if [jsonLogLineStr] is not a valid log line
func (strategy *PerWeekStreamLogsStrategy) sendJsonLogLine(
jsonLogLineStr string,
logsByKurtosisUserServiceUuidChan chan map[service.ServiceUUID][]logline.LogLine,
serviceUuid service.ServiceUUID,
conjunctiveLogLinesFiltersWithRegex []logline.LogLineFilterWithRegex) error {
// each logLineStr is of the following structure: {"enclave_uuid": "...", "service_uuid":"...", "log": "...",.. "timestamp":"..."}
// eg. {"container_type":"api-container", "container_id":"8f8558ba", "container_name":"/kurtosis-api--ffd",
// "log":"hi","timestamp":"2023-08-14T14:57:49Z"}

// First decode the line
var jsonLog JsonLog
if err := json.Unmarshal([]byte(jsonLogLineStr), &jsonLog); err != nil {
return stacktrace.Propagate(err, "An error occurred parsing the json log string: %v\n", jsonLogLineStr)
}

// Then extract the actual log message using the "log" field
logLineStr, found := jsonLog[volume_consts.LogLabel]
if !found {
return stacktrace.NewError("An error retrieving the log field from json log string: %v\n", jsonLogLineStr)
}
logLine := logline.NewLogLine(logLineStr)

// Then filter by checking if the log message is valid based on requested filters
validLogLine, err := logLine.IsValidLogLineBaseOnFilters(conjunctiveLogLinesFiltersWithRegex)
if err != nil {
return stacktrace.Propagate(err, "An error occurred filtering log line '%+v' using filters '%+v'", logLine, conjunctiveLogLinesFiltersWithRegex)
}
if !validLogLine {
return nil
}

// ensure this log line is within the retention period if it has a timestamp
withinRetentionPeriod, err := strategy.isWithinRetentionPeriod(jsonLog)
if err != nil {
return stacktrace.Propagate(err, "An error occurred filtering log line '%+v' using filters '%+v'", logLine, conjunctiveLogLinesFiltersWithRegex)
}
if !withinRetentionPeriod {
return nil
}

// send the log line
logLines := []logline.LogLine{*logLine}
userServicesLogLinesMap := map[service.ServiceUUID][]logline.LogLine{
serviceUuid: logLines,
}
logsByKurtosisUserServiceUuidChan <- userServicesLogLinesMap
return nil
}

// Returns true if [logLine] has no timestamp
func (strategy *PerWeekStreamLogsStrategy) isWithinRetentionPeriod(logLine JsonLog) (bool, error) {
retentionPeriod := strategy.time.Now().Add(time.Duration(-volume_consts.LogRetentionPeriodInWeeks-1) * oneWeek)
timestampStr, found := logLine[volume_consts.TimestampLabel]
if found {
timestamp, err := time.Parse(time.RFC3339, timestampStr)
if err != nil {
return false, stacktrace.Propagate(err, "An error occurred retrieving the timestamp field from logs json log line. This is a bug in Kurtosis.")
}
return timestamp.After(retentionPeriod), nil
if !found {
return true, nil
}

timestamp, err := time.Parse(time.RFC3339, timestampStr)
if err != nil {
return false, stacktrace.Propagate(err, "An error occurred retrieving the timestamp field from logs json log line. This is a bug in Kurtosis.")
}
return true, nil
return timestamp.After(retentionPeriod), nil
}
3 changes: 3 additions & 0 deletions engine/server/go.mod
Expand Up @@ -62,6 +62,7 @@ require (
require (
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 // indirect
github.com/emicklei/go-restful/v3 v3.9.0 // indirect
github.com/fsnotify/fsnotify v1.4.9 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-openapi/jsonpointer v0.19.6 // indirect
github.com/go-openapi/jsonreference v0.20.1 // indirect
Expand All @@ -83,6 +84,7 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/morikuni/aec v1.0.0 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/nxadm/tail v1.4.8 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/stretchr/objx v0.5.0 // indirect
golang.org/x/mod v0.12.0 // indirect
Expand All @@ -93,6 +95,7 @@ require (
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230706204954-ccb25ca9f130 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
k8s.io/api v0.27.2 // indirect
k8s.io/apimachinery v0.27.2 // indirect
Expand Down
7 changes: 7 additions & 0 deletions engine/server/go.sum
Expand Up @@ -29,6 +29,8 @@ github.com/emicklei/go-restful/v3 v3.9.0 h1:XwGDlfxEnQZzuopoqxwSEllNcCOM9DhhFyhF
github.com/emicklei/go-restful/v3 v3.9.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/gammazero/deque v0.1.0 h1:f9LnNmq66VDeuAlSAapemq/U7hJ2jpIWa4c09q8Dlik=
github.com/gammazero/deque v0.1.0/go.mod h1:KQw7vFau1hHuM8xmI9RbgKFbAsQFWmBpqQ2KenFLk6M=
github.com/gammazero/workerpool v1.1.2 h1:vuioDQbgrz4HoaCi2q1HLlOXdpbap5AET7xu5/qj87g=
Expand Down Expand Up @@ -112,6 +114,8 @@ github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A=
github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
github.com/onsi/ginkgo/v2 v2.9.1 h1:zie5Ly042PD3bsCvsSOPvRnFwyo3rKe64TJlD6nu0mk=
github.com/onsi/gomega v1.27.4 h1:Z2AnStgsdSayCMDiCU42qIz+HLqEPcgiOCXjAU/w+8E=
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
Expand Down Expand Up @@ -185,6 +189,7 @@ golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200923182605-d9f96fdee20d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
Expand Down Expand Up @@ -248,6 +253,8 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntN
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc=
gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
Expand Down

0 comments on commit 9b0bcb7

Please sign in to comment.