-
Notifications
You must be signed in to change notification settings - Fork 239
/
logs.py
67 lines (60 loc) · 2.37 KB
/
logs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import typing
from http import HTTPStatus
from sqlalchemy.orm import Session
from mlrun.api.api.utils import log_and_raise, log_path
from mlrun.api.constants import LogSources
from mlrun.api.utils.singletons.db import get_db
from mlrun.api.utils.singletons.k8s import get_k8s
from mlrun.runtimes.constants import PodPhases
class Logs:
    """Store and retrieve the logs of runs.

    Logs are kept in a per-run file whose location is resolved by ``log_path``;
    when reading, the logs may alternatively be fetched from the run's pod.
    """

    @staticmethod
    def store_log(body: bytes, project: str, uid: str, append: bool = True):
        """
        Write log content to the log file of a run, creating parent directories as needed.

        :param body: raw log bytes to write
        :param project: project name, used to resolve the log file path
        :param uid: run uid, used to resolve the log file path
        :param append: when True the bytes are appended to the file, otherwise the file is overwritten
        """
        log_file = log_path(project, uid)
        log_file.parent.mkdir(parents=True, exist_ok=True)
        mode = "ab" if append else "wb"
        with log_file.open(mode) as fp:
            fp.write(body)

    @staticmethod
    def get_logs(
        db_session: Session,
        project: str,
        uid: str,
        size: int = -1,
        offset: int = 0,
        source: LogSources = LogSources.AUTO,
    ) -> typing.Tuple[str, bytes]:
        """
        Get the state of a run together with (a window of) its logs.

        The log file is used when it exists and ``source`` is AUTO or PERSISTENCY; otherwise, when
        ``source`` is AUTO or K8S, the logs are read from the run's pod. If neither applies the
        returned logs are empty.

        :param db_session: DB session used to read the run
        :param project: project name
        :param uid: run uid
        :param size: maximal number of bytes to return, -1 (default) returns everything from ``offset``
        :param offset: byte offset in the logs to start reading from
        :param source: where to take the logs from
        :return: Tuple with:
            1. str of the run state (so watchers will know whether to continue polling for logs)
            2. bytes of the logs themselves
        """
        out = b""
        log_file = log_path(project, uid)
        data = get_db().read_run(db_session, uid, project)
        if not data:
            # run is unknown to the DB - reported as NOT_FOUND (log_and_raise is expected to raise)
            log_and_raise(HTTPStatus.NOT_FOUND.value, project=project, uid=uid)
        run_state = data.get("status", {}).get("state", "")
        if log_file.exists() and source in [LogSources.AUTO, LogSources.PERSISTENCY]:
            with log_file.open("rb") as fp:
                fp.seek(offset)
                out = fp.read(size)
        elif source in [LogSources.AUTO, LogSources.K8S]:
            out = Logs._get_pod_logs(project, uid, size, offset)
        return run_state, out

    @staticmethod
    def _get_pod_logs(project: str, uid: str, size: int, offset: int) -> bytes:
        """
        Read the requested window of logs from the run's pod.

        :return: the encoded pod logs limited to ``size`` bytes starting at ``offset``, or empty bytes
            when there is no k8s helper, no pod for the run, the pod is still pending or it has no logs
        """
        k8s = get_k8s()
        if not k8s:
            return b""
        pods = k8s.get_logger_pods(project, uid)
        if not pods:
            return b""
        # only the first pod reported for the run is used
        pod, pod_phase = next(iter(pods.items()))
        if pod_phase == PodPhases.pending:
            # logs are not requested from a pod that is still pending
            return b""
        resp = k8s.logs(pod)
        if not resp:
            return b""
        encoded = resp.encode()
        # honor size the same way the log file branch does; a negative (or missing) size means
        # "everything from offset"
        if size is None or size < 0:
            return encoded[offset:]
        return encoded[offset : offset + size]

    @staticmethod
    def get_log_mtime(project: str, uid: str) -> float:
        """
        Get the last modification time of the log file of a run.

        :param project: project name
        :param uid: run uid
        :return: the ``st_mtime`` value of the log file's stat result
        :raises FileNotFoundError: if the log file does not exist
        """
        log_file = log_path(project, uid)
        if not log_file.exists():
            raise FileNotFoundError(f"Log file does not exist: {log_file}")
        return log_file.stat().st_mtime

    @staticmethod
    def log_file_exists(project: str, uid: str) -> bool:
        """
        :param project: project name
        :param uid: run uid
        :return: True if the log file of the run exists, False otherwise
        """
        return log_path(project, uid).exists()