Skip to content

Commit

Permalink
[API] Enhance log collector [1.6.x] (#5321)
Browse files Browse the repository at this point in the history
  • Loading branch information
liranbg committed Mar 25, 2024
1 parent e5107cf commit 467e492
Show file tree
Hide file tree
Showing 15 changed files with 360 additions and 52 deletions.
1 change: 1 addition & 0 deletions server/api/db/base.py
Expand Up @@ -71,6 +71,7 @@ def list_distinct_runs_uids(
only_uids: bool = False,
last_update_time_from: datetime.datetime = None,
states: List[str] = None,
specific_uids: List[str] = None,
):
pass

Expand Down
4 changes: 4 additions & 0 deletions server/api/db/sqldb/db.py
Expand Up @@ -247,6 +247,7 @@ def list_distinct_runs_uids(
only_uids=True,
last_update_time_from: datetime = None,
states: typing.List[str] = None,
specific_uids: List[str] = None,
) -> typing.Union[typing.List[str], RunList]:
"""
List all runs uids in the DB
Expand Down Expand Up @@ -277,6 +278,9 @@ def list_distinct_runs_uids(
if requested_logs_modes is not None:
query = query.filter(Run.requested_logs.in_(requested_logs_modes))

if specific_uids:
query = query.filter(Run.uid.in_(specific_uids))

if not only_uids:
# group_by allows us to have a row per uid with the whole record rather than just the uid (as distinct does)
# note we cannot promise that the same row will be returned each time per uid as the order is not guaranteed
Expand Down
29 changes: 27 additions & 2 deletions server/api/main.py
Expand Up @@ -511,12 +511,36 @@ async def _start_periodic_stop_logs():

async def _verify_log_collection_stopped_on_startup():
"""
Pulls runs from DB that are in terminal state and have logs requested, and call stop logs for them.
First, list runs that are currently being collected in the log collector.
Second, query the DB for those runs that are also in terminal state and have logs requested.
Lastly, call stop logs for the runs that met all of the above conditions.
This is done so that the log collector won't keep trying to collect logs for runs that are already
in terminal state.
"""
logger.debug("Listing runs currently being log collected")
log_collector_client = server.api.utils.clients.log_collector.LogCollectorClient()
run_uids_in_progress = []
failed_listing = False
try:
runs_in_progress_response_stream = log_collector_client.list_runs_in_progress()
# collate the run uids from the response stream to a list
async for run_uids in runs_in_progress_response_stream:
run_uids_in_progress.extend(run_uids)
except Exception as exc:
failed_listing = True
logger.warning(
"Failed listing runs currently being log collected",
exc=err_to_str(exc),
traceback=traceback.format_exc(),
)

if len(run_uids_in_progress) == 0 and not failed_listing:
logger.debug("No runs currently being log collected")
return

logger.debug(
"Getting all runs which have reached terminal state and already have logs requested",
"Getting current log collected runs which have reached terminal state and already have logs requested",
run_uids_in_progress_count=len(run_uids_in_progress),
)
db_session = await fastapi.concurrency.run_in_threadpool(create_session)
try:
Expand All @@ -531,6 +555,7 @@ async def _verify_log_collection_stopped_on_startup():
# usually it happens when run pods get preempted
mlrun.runtimes.constants.RunStates.unknown,
],
specific_uids=run_uids_in_progress,
)

if len(runs) > 0:
Expand Down
35 changes: 34 additions & 1 deletion server/api/utils/clients/log_collector.py
Expand Up @@ -313,7 +313,40 @@ async def delete_logs(
if verbose:
logger.warning(msg, error=response.errorMessage)

def _retryable_error(self, error_message, retryable_error_patterns) -> bool:
async def list_runs_in_progress(
    self,
    project: str = None,
    verbose: bool = True,
    raise_on_error: bool = True,
) -> typing.AsyncIterable[typing.List[str]]:
    """
    List runs in progress from the log collector service.

    This is an async generator: each response streamed back from the log collector
    is yielded as one chunk of run uids, and the caller is expected to collate the
    chunks (it does not return a single flat list).

    :param project: A project name to filter the runs by. If not provided, all runs in progress will be listed
    :param verbose: Whether to log errors
    :param raise_on_error: Whether to raise an exception on error
    :return: Yields chunks (lists) of run uids, one chunk per streamed response
    """
    request = self._log_collector_pb2.ListRunsRequest(
        project=project,
    )

    # server-side streaming RPC — responses arrive incrementally
    response_stream = self._call_stream("ListRunsInProgress", request)
    try:
        async for chunk in response_stream:
            # runUIDs is the repeated (list-like) field of each streamed response message
            yield chunk.runUIDs
    except Exception as exc:
        msg = "Failed to list runs in progress"
        if raise_on_error:
            # translate the gRPC failure into the corresponding mlrun error type
            raise LogCollectorErrorCode.map_error_code_to_mlrun_error(
                LogCollectorErrorCode.ErrCodeInternal.value,
                mlrun.errors.err_to_str(exc),
                msg,
            )
        if verbose:
            # best-effort mode: log and end the stream without raising
            logger.warning(msg, error=mlrun.errors.err_to_str(exc))

@staticmethod
def _retryable_error(error_message, retryable_error_patterns) -> bool:
"""
Check if the error is retryable
:param error_message: The error message
Expand Down
4 changes: 3 additions & 1 deletion server/log-collector/cmd/logcollector/main.go
Expand Up @@ -45,6 +45,7 @@ func StartServer() error {
getLogsBufferSizeBytes := flag.Int("get-logs-buffer-buffer-size-bytes", common.GetEnvOrDefaultInt("MLRUN_LOG_COLLECTOR__GET_LOGS_BUFFER_SIZE_BYTES", common.DefaultGetLogsBufferSize), "Size of buffers in the buffer pool for getting logs, in bytes (default: 3.75MB)")
logTimeUpdateBytesInterval := flag.Int("log-time-update-bytes-interval", common.GetEnvOrDefaultInt("MLRUN_LOG_COLLECTOR__LOG_TIME_UPDATE_BYTES_INTERVAL", common.LogTimeUpdateBytesInterval), "Amount of logs to read between updates of the last log time in the 'in memory' state, in bytes (default: 4KB)")
clusterizationRole := flag.String("clusterization-role", common.GetEnvOrDefaultString("MLRUN_HTTPDB__CLUSTERIZATION__ROLE", "chief"), "The role of the log collector in the cluster (chief, worker)")
listRunsChunkSize := flag.Int("list-runs-chunk-size", common.GetEnvOrDefaultInt("MLRUN_LOG_COLLECTOR__LIST_RUNS_CHUNK_SIZE", common.DefaultListRunsChunkSize), "The chunk size for listing runs in progress")

// if namespace is not passed, it will be taken from env
namespace := flag.String("namespace", "", "The namespace to collect logs from")
Expand Down Expand Up @@ -80,7 +81,8 @@ func StartServer() error {
*logCollectionBufferSizeBytes,
*getLogsBufferSizeBytes,
*logTimeUpdateBytesInterval,
*advancedLogLevel)
*advancedLogLevel,
*listRunsChunkSize)
if err != nil {
return errors.Wrap(err, "Failed to create log collector server")
}
Expand Down
2 changes: 2 additions & 0 deletions server/log-collector/go.mod
Expand Up @@ -8,6 +8,7 @@ require (
github.com/nuclio/errors v0.0.4
github.com/nuclio/logger v0.0.1
github.com/nuclio/loggerus v0.0.6
github.com/samber/lo v1.39.0
github.com/sirupsen/logrus v1.9.3
github.com/stretchr/testify v1.9.0
golang.org/x/sync v0.6.0
Expand Down Expand Up @@ -41,6 +42,7 @@ require (
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 // indirect
golang.org/x/net v0.22.0 // indirect
golang.org/x/oauth2 v0.18.0 // indirect
golang.org/x/sys v0.18.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions server/log-collector/go.sum
Expand Up @@ -102,6 +102,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M=
github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA=
github.com/samber/lo v1.39.0 h1:4gTz1wUhNYLhFSKl6O+8peW0v2F4BCY034GRpU9WnuA=
github.com/samber/lo v1.39.0/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.8.0/go.mod h1:4GuYW9TZmE769R5STWrRakJc4UqQ3+QQ95fyz7ENv1A=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
Expand All @@ -128,6 +130,8 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 h1:3MTrJm4PyNL9NBqvYDSj3DHl46qQakyfqfWo4jgfaEM=
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17/go.mod h1:lgLbSvA5ygNOMpwM/9anMpWVlVJ7Z+cHWq/eFuinpGE=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
Expand Down
9 changes: 6 additions & 3 deletions server/log-collector/pkg/common/consts.go
Expand Up @@ -23,9 +23,6 @@ const (
ErrCodeBadRequest
)

const DefaultErrorStackDepth = 3

// Buffer sizes
const (
// DefaultLogCollectionBufferSize is the default buffer size for collecting logs from pods
DefaultLogCollectionBufferSize int = 10 * 1024 * 1024 // 10MB
Expand All @@ -37,6 +34,12 @@ const (
// LogTimeUpdateBytesInterval is the bytes amount to read between updates of the
// last log time in the in memory state
LogTimeUpdateBytesInterval int = 4 * 1024 // 4KB

// DefaultListRunsChunkSize is the default chunk size for listing runs
DefaultListRunsChunkSize int = 10

// DefaultErrorStackDepth is the default stack depth for errors
DefaultErrorStackDepth = 3
)

// Custom errors
Expand Down
98 changes: 79 additions & 19 deletions server/log-collector/pkg/services/logcollector/logcollector_test.go
Expand Up @@ -68,6 +68,7 @@ func (suite *LogCollectorTestSuite) SetupSuite() {
bufferSizeBytes := 100
clusterizationRole := "chief"
advancedLogLevel := 0
listRunsChunkSize := 10

// create base dir
suite.baseDir = path.Join(os.TempDir(), "/log_collector_test")
Expand All @@ -88,7 +89,8 @@ func (suite *LogCollectorTestSuite) SetupSuite() {
bufferSizeBytes,
bufferSizeBytes,
common.LogTimeUpdateBytesInterval,
advancedLogLevel)
advancedLogLevel,
listRunsChunkSize)
suite.Require().NoError(err, "Failed to create log collector server")

suite.logger.InfoWith("Setup complete")
Expand Down Expand Up @@ -628,24 +630,7 @@ func (suite *LogCollectorTestSuite) TestStopLog() {
projectNum := 2
var projectToRuns = map[string][]string{}

for i := 0; i < projectNum; i++ {
projectName := fmt.Sprintf("project-%d", i)

// add log item to the server's states, so no error will be returned
for j := 0; j < logItemsNum; j++ {
runUID := uuid.New().String()
projectToRuns[projectName] = append(projectToRuns[projectName], runUID)
selector := fmt.Sprintf("run=%s", runUID)

// Add state to the log collector's state manifest
err = suite.logCollectorServer.stateManifest.AddLogItem(suite.ctx, runUID, selector, projectName)
suite.Require().NoError(err, "Failed to add log item to the state manifest")

// Add state to the log collector's current state
err = suite.logCollectorServer.currentState.AddLogItem(suite.ctx, runUID, selector, projectName)
suite.Require().NoError(err, "Failed to add log item to the current state")
}
}
suite.createLogItems(projectNum, logItemsNum, projectToRuns)

// write state
err = suite.logCollectorServer.stateManifest.WriteState(suite.logCollectorServer.stateManifest.GetState())
Expand Down Expand Up @@ -802,6 +787,81 @@ func (suite *LogCollectorTestSuite) TestGetLogFilePath() {
suite.Require().Equal(runFilePath, logFilePath, "Expected log file path to be the same as the run file path")
}

func (suite *LogCollectorTestSuite) TestListRunsInProgress() {
	// fetchRuns invokes the server's ListRunsInProgress with a no-op stream and
	// returns whatever run UIDs were written to it.
	fetchRuns := func(request *log_collector.ListRunsRequest) []string {
		stream := &nop.ListRunsResponseStreamNop{}
		suite.Require().NoError(
			suite.logCollectorServer.ListRunsInProgress(request, stream),
			"Failed to list runs in progress",
		)
		return stream.RunUIDs
	}

	// assertSameRuns verifies the response holds exactly the expected run UIDs:
	// same cardinality, and every returned UID appears in the expected set.
	assertSameRuns := func(expected []string, actual []string) {
		suite.Require().Equal(len(expected), len(actual))
		for _, uid := range actual {
			suite.Require().Contains(expected, uid, "Expected runUID to be in the expected list")
		}
	}

	// with no state populated, nothing should be reported as in progress
	suite.Require().Empty(fetchRuns(&log_collector.ListRunsRequest{}), "Expected no runs in progress")

	// populate the server's states with in-progress log items
	projectCount := 5
	itemsPerProject := 5
	projectToRuns := map[string][]string{}
	suite.createLogItems(projectCount, itemsPerProject, projectToRuns)
	defer func() {
		// remove projects from state manifest and current state when test is done to avoid conflicts with other tests
		for project := range projectToRuns {
			err := suite.logCollectorServer.stateManifest.RemoveProject(project)
			suite.Assert().NoError(err, "Failed to remove project from state manifest")

			err = suite.logCollectorServer.currentState.RemoveProject(project)
			suite.Assert().NoError(err, "Failed to remove project from current state")
		}
	}()

	// flatten the per-project UIDs into one expected list
	var allRunUIDs []string
	for _, uids := range projectToRuns {
		allRunUIDs = append(allRunUIDs, uids...)
	}

	// an unfiltered request returns the runs of every project
	assertSameRuns(allRunUIDs, fetchRuns(&log_collector.ListRunsRequest{}))

	// a project-scoped request returns only that project's runs
	projectName := "project-1"
	assertSameRuns(projectToRuns[projectName], fetchRuns(&log_collector.ListRunsRequest{Project: projectName}))
}

// createLogItems creates `logItemsNum` log items for `projectNum` projects, and adds them to the server's states
func (suite *LogCollectorTestSuite) createLogItems(projectNum int, logItemsNum int, projectToRuns map[string][]string) {
var err error
for i := 0; i < projectNum; i++ {
projectName := fmt.Sprintf("project-%d", i)

// add log item to the server's states, so no error will be returned
for j := 0; j < logItemsNum; j++ {
runUID := uuid.New().String()
projectToRuns[projectName] = append(projectToRuns[projectName], runUID)
selector := fmt.Sprintf("run=%s", runUID)

// Add state to the log collector's state manifest
err = suite.logCollectorServer.stateManifest.AddLogItem(suite.ctx, runUID, selector, projectName)
suite.Require().NoError(err, "Failed to add log item to the state manifest")

// Add state to the log collector's current state
err = suite.logCollectorServer.currentState.AddLogItem(suite.ctx, runUID, selector, projectName)
suite.Require().NoError(err, "Failed to add log item to the current state")
}
}
}

// TestLogCollectorTestSuite is the standard go-test entry point that runs the
// whole LogCollectorTestSuite via testify's suite runner.
func TestLogCollectorTestSuite(t *testing.T) {
	suite.Run(t, new(LogCollectorTestSuite))
}

0 comments on commit 467e492

Please sign in to comment.