Skip to content

Commit

Permalink
[Log Collector] Remove project mutex (#3675)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tankilevitch committed May 31, 2023
1 parent c852c17 commit 97ec295
Show file tree
Hide file tree
Showing 2 changed files with 0 additions and 83 deletions.
37 changes: 0 additions & 37 deletions go/pkg/services/logcollector/logcollector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -603,43 +603,6 @@ func (suite *LogCollectorTestSuite) TestGetLogFilePath() {
suite.Require().Equal(runFilePath, logFilePath, "Expected log file path to be the same as the run file path")
}

// TestGetLogFilePathConcurrently checks how getLogFilePath interacts with the per-project mutex:
// the test registers a mutex for the project, holds it, releases it from a timer, and then expects
// the path lookup to take at least a second and to leave the mutex unlocked afterwards.
func (suite *LogCollectorTestSuite) TestGetLogFilePathConcurrently() {
	const (
		runUID      = "1234"
		projectName = "someProjectB"
	)

	// register a mutex for the project and hold it before starting the clock
	heldMutex := &sync.Mutex{}
	suite.logCollectorServer.readDirentProjectNameSyncMap = &sync.Map{}
	suite.logCollectorServer.readDirentProjectNameSyncMap.Store(projectName, heldMutex)
	heldMutex.Lock()
	began := time.Now()

	// release the mutex after 1.5 seconds
	time.AfterFunc(1500*time.Millisecond, heldMutex.Unlock)

	// create the project directory
	projectDir := path.Join(suite.baseDir, projectName)
	suite.Require().NoError(os.MkdirAll(projectDir, 0755))

	// create the run's log file
	expectedPath := suite.logCollectorServer.resolveRunLogFilePath(projectName, runUID)
	writeErr := common.WriteToFile(expectedPath, []byte("some log"), false)
	suite.Require().NoError(writeErr, "Failed to write to file")

	// resolve the log file path while the mutex is still held by the test
	resolvedPath, err := suite.logCollectorServer.getLogFilePath(suite.ctx, runUID, projectName)
	suite.Require().NoError(err, "Failed to get log file path")
	suite.Require().Equal(expectedPath, resolvedPath, "Expected log file path to be the same as the run file path")

	// the lookup should not have finished before the timer released the mutex
	elapsed := time.Since(began)
	suite.Require().Truef(elapsed >= 1*time.Second, "Expected getLogFilePath to take more than a second (took %v)", elapsed)

	// the mutex must be free again once the lookup has returned
	suite.Require().True(heldMutex.TryLock(), "Expected project mutex to be unlocked")
}

func TestLogCollectorTestSuite(t *testing.T) {
suite.Run(t, new(LogCollectorTestSuite))
}
46 changes: 0 additions & 46 deletions go/pkg/services/logcollector/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ import (
"golang.org/x/sync/errgroup"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/cache"
"k8s.io/client-go/kubernetes"
)

Expand Down Expand Up @@ -69,14 +68,6 @@ type Server struct {
// interval durations
readLogWaitTime time.Duration
monitoringInterval time.Duration

// log file cache to reduce sys calls finding the log file paths.
logFilesCache *cache.Expiring
logFilesCacheTTL time.Duration

// map of project name to its mutex lock
// using project mutex to prevent listing project dir concurrently
readDirentProjectNameSyncMap *sync.Map
}

// NewLogCollectorServer creates a new log collector server
Expand Down Expand Up @@ -148,8 +139,6 @@ func NewLogCollectorServer(logger logger.Logger,
logCollectionBufferPool := bufferpool.NewSizedBytePool(logCollectionBufferPoolSize, logCollectionBufferSizeBytes)
getLogsBufferPool := bufferpool.NewSizedBytePool(getLogsBufferPoolSize, getLogsBufferSizeBytes)

logFilesCache := cache.NewExpiring()

return &Server{
AbstractMlrunGRPCServer: abstractServer,
namespace: namespace,
Expand All @@ -164,18 +153,8 @@ func NewLogCollectorServer(logger logger.Logger,
logCollectionBufferSizeBytes: logCollectionBufferSizeBytes,
getLogsBufferSizeBytes: getLogsBufferSizeBytes,
isChief: isChief,
logFilesCache: logFilesCache,
startLogsFindingPodsInterval: 3 * time.Second,
startLogsFindingPodsTimeout: 15 * time.Second,
readDirentProjectNameSyncMap: &sync.Map{},

// log files are deleted only when their project is deleted,
// so if the project is gone, its log files are gone too.
// hasLogFiles is called during get_logs on project runs,
// so with no project there are no runs and no get_logs calls, which makes this fairly safe to cache.
// that said, the TTL is limited to a few minutes (hard-coded for now).
// this cache exists to reduce IO operations.
logFilesCacheTTL: 5 * time.Minute,
}, nil
}

Expand Down Expand Up @@ -467,7 +446,6 @@ func (s *Server) HasLogs(ctx context.Context, request *protologcollector.HasLogs
ErrorMessage: common.GetErrorStack(err, common.DefaultErrorStackDepth),
}, nil
}

return &protologcollector.HasLogsResponse{
Success: true,
HasLogs: true,
Expand Down Expand Up @@ -668,9 +646,6 @@ func (s *Server) startLogStreaming(ctx context.Context,
return
}

// add log file path to cache
s.logFilesCache.Set(s.getLogFileCacheKey(runUID, projectName), logFilePath, s.logFilesCacheTTL)

// open log file in read/write and append, to allow reading the logs while we write more logs to it
openFlags := os.O_RDWR | os.O_APPEND
file, err := os.OpenFile(logFilePath, openFlags, 0644)
Expand Down Expand Up @@ -805,21 +780,6 @@ func (s *Server) resolveRunLogFilePath(projectName, runUID string) string {

// getLogFilePath returns the path to the run's latest log file
func (s *Server) getLogFilePath(ctx context.Context, runUID, projectName string) (string, error) {

// first try load from cache
if filePath, found := s.logFilesCache.Get(s.getLogFileCacheKey(runUID, projectName)); found {
return filePath.(string), nil
}

// get project mutex or create one
projectMutex, _ := s.readDirentProjectNameSyncMap.LoadOrStore(projectName, &sync.Mutex{})

// lock the project mutex so that this project's dir is read by only one caller at a time
projectMutex.(*sync.Mutex).Lock()

// unlock project mutex when done
defer projectMutex.(*sync.Mutex).Unlock()

var logFilePath string
var retryCount int
if err := common.RetryUntilSuccessful(5*time.Second, 1*time.Second, func() (bool, error) {
Expand Down Expand Up @@ -874,8 +834,6 @@ func (s *Server) getLogFilePath(ctx context.Context, runUID, projectName string)
return "", errors.Wrap(err, "Exhausted getting log file path")
}

// store in cache
s.logFilesCache.Set(s.getLogFileCacheKey(runUID, projectName), logFilePath, s.logFilesCacheTTL)
return logFilePath, nil
}

Expand Down Expand Up @@ -1135,7 +1093,3 @@ func (s *Server) deleteProjectLogs(project string) error {

return nil
}

// getLogFileCacheKey builds the log-file cache key for a run.
// The key is the run UID followed by the project name, separated by a slash.
func (s *Server) getLogFileCacheKey(runUID, project string) string {
	const cacheKeyFormat = "%s/%s"

	cacheKey := fmt.Sprintf(cacheKeyFormat, runUID, project)
	return cacheKey
}

0 comments on commit 97ec295

Please sign in to comment.