Skip to content

Commit

Permalink
[Log Collector] Change log file names to old format [1.3.x] (#3649)
Browse files Browse the repository at this point in the history
  • Loading branch information
TomerShor committed May 25, 2023
1 parent b15239d commit abc022f
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 67 deletions.
17 changes: 7 additions & 10 deletions go/pkg/services/logcollector/logcollector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func (suite *LogCollectorTestSuite) TestStreamPodLogs() {
suite.Require().True(started, "Log streaming didn't start")

// resolve log file path
logFilePath := suite.logCollectorServer.resolvePodLogFilePath(suite.projectName, runId, pod.Name)
logFilePath := suite.logCollectorServer.resolveRunLogFilePath(suite.projectName, runId)

// read log file until it has content, or timeout
timeout := time.After(30 * time.Second)
Expand Down Expand Up @@ -259,10 +259,9 @@ func (suite *LogCollectorTestSuite) TestStartLogBestEffort() {
func (suite *LogCollectorTestSuite) TestGetLogsSuccessful() {

runUID := uuid.New().String()
podName := "my-pod"

// create log file for runUID and pod
logFilePath := suite.logCollectorServer.resolvePodLogFilePath(suite.projectName, runUID, podName)
logFilePath := suite.logCollectorServer.resolveRunLogFilePath(suite.projectName, runUID)

// write log file
logText := "Some fake pod logs\n"
Expand Down Expand Up @@ -397,8 +396,6 @@ func (suite *LogCollectorTestSuite) TestReadLogsFromFileWhileWriting() {

func (suite *LogCollectorTestSuite) TestHasLogs() {
runUID := uuid.New().String()
podName := "my-pod"

request := &log_collector.HasLogsRequest{
RunUID: runUID,
ProjectName: suite.projectName,
Expand All @@ -411,7 +408,7 @@ func (suite *LogCollectorTestSuite) TestHasLogs() {
suite.Require().False(hasLogsResponse.HasLogs, "Expected run to not have logs")

// create log file for runUID and pod
logFilePath := suite.logCollectorServer.resolvePodLogFilePath(suite.projectName, runUID, podName)
logFilePath := suite.logCollectorServer.resolveRunLogFilePath(suite.projectName, runUID)

// write log file
logText := "Some fake pod logs\n"
Expand Down Expand Up @@ -521,7 +518,7 @@ func (suite *LogCollectorTestSuite) TestDeleteLogs() {
for i := 0; i < testCase.logsNumToCreate; i++ {
runUID := uuid.New().String()
runUIDs = append(runUIDs, runUID)
logFilePath := suite.logCollectorServer.resolvePodLogFilePath(projectName, runUID, "pod")
logFilePath := suite.logCollectorServer.resolveRunLogFilePath(projectName, runUID)
err := common.WriteToFile(logFilePath, []byte("some log"), false)
suite.Require().NoError(err, "Failed to write to file")
}
Expand Down Expand Up @@ -558,7 +555,7 @@ func (suite *LogCollectorTestSuite) TestDeleteProjectLogs() {
for i := 0; i < logsNum; i++ {
runUID := uuid.New().String()
runUIDs = append(runUIDs, runUID)
logFilePath := suite.logCollectorServer.resolvePodLogFilePath(projectName, runUID, "pod")
logFilePath := suite.logCollectorServer.resolveRunLogFilePath(projectName, runUID)
err := common.WriteToFile(logFilePath, []byte("some log"), false)
suite.Require().NoError(err, "Failed to write to file")
}
Expand Down Expand Up @@ -596,7 +593,7 @@ func (suite *LogCollectorTestSuite) TestGetLogFilePath() {
suite.Require().NoError(err)

// make the run file
runFilePath := suite.logCollectorServer.resolvePodLogFilePath(projectName, runUID, "pod")
runFilePath := suite.logCollectorServer.resolveRunLogFilePath(projectName, runUID)
err = common.WriteToFile(runFilePath, []byte("some log"), false)
suite.Require().NoError(err, "Failed to write to file")

Expand Down Expand Up @@ -627,7 +624,7 @@ func (suite *LogCollectorTestSuite) TestGetLogFilePathConcurrently() {
suite.Require().NoError(err)

// make the run file
runFilePath := suite.logCollectorServer.resolvePodLogFilePath(projectName, runUID, "pod")
runFilePath := suite.logCollectorServer.resolveRunLogFilePath(projectName, runUID)
err = common.WriteToFile(runFilePath, []byte("some log"), false)
suite.Require().NoError(err, "Failed to write to file")

Expand Down
78 changes: 28 additions & 50 deletions go/pkg/services/logcollector/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"
"fmt"
"io"
"io/fs"
"math"
"os"
"path"
Expand Down Expand Up @@ -433,6 +432,10 @@ func (s *Server) GetLogs(request *protologcollector.GetLogsRequest, responseStre

// HasLogs returns true if the log file exists for a given run id
func (s *Server) HasLogs(ctx context.Context, request *protologcollector.HasLogsRequest) (*protologcollector.HasLogsResponse, error) {
s.Logger.DebugWithCtx(ctx,
"Received has log request",
"runUID", request.RunUID,
"project", request.ProjectName)

// get log file path
if _, err := s.getLogFilePath(ctx, request.RunUID, request.ProjectName); err != nil {
Expand Down Expand Up @@ -656,7 +659,7 @@ func (s *Server) startLogStreaming(ctx context.Context,
startedStreamingGoroutine <- true

// create a log file to the pod
logFilePath := s.resolvePodLogFilePath(projectName, runUID, podName)
logFilePath := s.resolveRunLogFilePath(projectName, runUID)
if err := common.EnsureFileExists(logFilePath); err != nil {
s.Logger.ErrorWithCtx(ctx,
"Failed to ensure log file",
Expand Down Expand Up @@ -785,9 +788,9 @@ func (s *Server) streamPodLogs(ctx context.Context,
return true, nil
}

// resolvePodLogFilePath returns the path to the pod log file
func (s *Server) resolvePodLogFilePath(projectName, runUID, podName string) string {
return path.Join(s.baseDir, projectName, fmt.Sprintf("%s_%s", runUID, podName))
// resolveRunLogFilePath builds the location of a run's log file. The file is
// named after the run UID and placed in the project's directory under the
// server's base directory: <baseDir>/<projectName>/<runUID>.
func (s *Server) resolveRunLogFilePath(projectName, runUID string) string {
	projectLogsDir := path.Join(s.baseDir, projectName)
	return path.Join(projectLogsDir, runUID)
}

// getLogFilePath returns the path to the run's latest log file
Expand All @@ -808,8 +811,6 @@ func (s *Server) getLogFilePath(ctx context.Context, runUID, projectName string)
defer projectMutex.(*sync.Mutex).Unlock()

var logFilePath string
var latestModTime time.Time

var retryCount int
if err := common.RetryUntilSuccessful(5*time.Second, 1*time.Second, func() (bool, error) {
defer func() {
Expand All @@ -836,52 +837,29 @@ func (s *Server) getLogFilePath(ctx context.Context, runUID, projectName string)
return false, errors.Wrap(err, "Failed to get project directory")
}

// list all files in project directory
if err := filepath.WalkDir(filepath.Join(s.baseDir, projectName),
func(path string, dirEntry fs.DirEntry, err error) error {
if err != nil {
s.Logger.WarnWithCtx(ctx,
"Failed to walk path",
"retryCount", retryCount,
"path", path,
"err", errors.GetErrorStackString(err, 10))
return errors.Wrapf(err, "Failed to walk path %s", path)
}

// skip directories
if dirEntry.IsDir() {
return nil
}

// if file name starts with run id, it's a log file
if strings.HasPrefix(dirEntry.Name(), runUID) {
info, err := dirEntry.Info()
if err != nil {
return errors.Wrapf(err, "Failed to get file info for %s", path)
}

// if it's the first file, set it as the log file path
// otherwise, check if it's the latest modified file
if logFilePath == "" || info.ModTime().After(latestModTime) {
logFilePath = path
latestModTime = info.ModTime()
}
}

return nil
}); err != nil {
// get run log file path
runLogFilePath := s.resolveRunLogFilePath(projectName, runUID)

// retry
return true, errors.Wrap(err, "Failed to list files in base directory")
}

if logFilePath == "" {
return true, errors.Errorf("Log file not found for run %s", runUID)
if exists, err := common.FileExists(runLogFilePath); err != nil {
s.Logger.WarnWithCtx(ctx,
"Failed to get run log file path",
"retryCount", retryCount,
"runUID", runUID,
"projectName", projectName,
"err", err.Error())
return false, errors.Wrap(err, "Failed to get project directory")
} else if !exists {
s.Logger.WarnWithCtx(ctx,
"Run log file not found",
"retryCount", retryCount,
"runUID", runUID,
"projectName", projectName)
return true, errors.New("Run log file not found")
}

// found log file
// found it
logFilePath = runLogFilePath
return false, nil

}); err != nil {
return "", errors.Wrap(err, "Exhausted getting log file path")
}
Expand Down Expand Up @@ -1108,7 +1086,7 @@ func (s *Server) successfulBaseResponse() *protologcollector.BaseResponse {
func (s *Server) deleteRunLogFiles(ctx context.Context, runUID, project string) error {

// get the run's log file, which is named exactly after the runUID
pattern := path.Join(s.baseDir, project, fmt.Sprintf("%s_*", runUID))
pattern := path.Join(s.baseDir, project, runUID)
files, err := filepath.Glob(pattern)
if err != nil {
return errors.Wrap(err, "Failed to get log files")
Expand Down
12 changes: 5 additions & 7 deletions mlrun/api/crud/logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,20 +242,18 @@ def get_log_mtime(self, project: str, uid: str) -> int:
def log_file_exists_for_run_uid(project: str, uid: str) -> (bool, pathlib.Path):
"""
Checks if the log file exists for the given project and uid
There could be two types of log files:
1. Log file which was created by the legacy logger with the following file format - project/<run-uid>)
2. Log file which was created by the new logger with the following file format- /project/<run-uid>-<pod-name>
Therefore, we check if the log file exists for both formats
A Run's log file path is: /mlrun/logs/{project}/{uid}
:param project: project name
:param uid: run uid
:return: True if the log file exists, False otherwise, and the log file path
"""
project_logs_dir = project_logs_path(project)
if not project_logs_dir.exists():
return False, None
for file in os.listdir(str(project_logs_dir)):
if file.startswith(uid):
return True, project_logs_dir / file

log_file = log_path(project, uid)
if log_file.exists():
return True, log_file

return False, None

Expand Down
5 changes: 5 additions & 0 deletions mlrun/api/utils/clients/log_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,11 @@ async def get_logs(
try:
has_logs = await self.has_logs(run_uid, project, verbose, raise_on_error)
if not has_logs:
logger.debug(
"Run has no logs to collect",
run_uid=run_uid,
project=project,
)

# run has no logs - return empty logs and exit so caller won't wait for logs or retry
yield b""
Expand Down

0 comments on commit abc022f

Please sign in to comment.