Monitor runs from server (#467)
Hedingber committed Oct 15, 2020
1 parent 388a0dd commit b1697b6
Showing 31 changed files with 1,423 additions and 972 deletions.
4 changes: 2 additions & 2 deletions mlrun/__main__.py
@@ -780,14 +780,14 @@ def validate_kind(ctx, param, value):
@click.option("--api", help="api service url")
@click.option("--label-selector", "-ls", default="", help="label selector")
@click.option(
"--force", "-f", is_flag=True, help="clean resources in transient states as well"
"--force", "-f", is_flag=True, help="clean resources in non-terminal states as well"
)
@click.option(
"--grace-period",
"-gp",
type=int,
default=mlconf.runtime_resources_deletion_grace_period,
help="the grace period (in seconds) that will be given to runtime resources (after they're in stable state) "
help="the grace period (in seconds) that will be given to runtime resources (after they're in terminal state) "
"before cleaning them. Ignored when --force is given",
show_default=True,
)
4 changes: 2 additions & 2 deletions mlrun/api/api/endpoints/logs.py
@@ -25,7 +25,7 @@ def get_log(
offset: int = 0,
db_session: Session = Depends(deps.get_db_session),
):
out, status = crud.Logs.get_log(db_session, project, uid, size, offset)
run_state, log = crud.Logs.get_logs(db_session, project, uid, size, offset)
return Response(
content=out, media_type="text/plain", headers={"pod_status": status}
content=log, media_type="text/plain", headers={"x-mlrun-run-state": run_state}
)
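
For context, this change replaces the pod-phase header with the run state, so log watchers can decide when to stop polling without the server inspecting pods on every request. A minimal client-side sketch, assuming a /log/{project}/{uid} route and illustrative values for the base URL, project, and uid (none of which are confirmed by this diff):

```python
import time

import requests

# Illustrative values; base URL, project, and uid are assumptions
base_url = "http://mlrun-api:8080/api"
project, uid = "default", "some-run-uid"

offset = 0
while True:
    resp = requests.get(f"{base_url}/log/{project}/{uid}", params={"offset": offset})
    resp.raise_for_status()
    print(resp.content.decode(), end="")
    offset += len(resp.content)
    # the run state header replaces the old pod_status header
    if resp.headers.get("x-mlrun-run-state") not in ("pending", "running"):
        break  # treat anything else as terminal and stop polling
    time.sleep(3)
```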
11 changes: 6 additions & 5 deletions mlrun/api/api/endpoints/submit.py
@@ -2,11 +2,10 @@
from typing import Optional

from fastapi import APIRouter, Depends, Request, Header
from fastapi.concurrency import run_in_threadpool
from sqlalchemy.orm import Session

import mlrun.api.api.utils
from mlrun.api.api import deps
from mlrun.api.api.utils import log_and_raise, submit
from mlrun.utils import logger

router = APIRouter()
@@ -26,7 +25,9 @@ async def submit_job(
try:
data = await request.json()
except ValueError:
log_and_raise(HTTPStatus.BAD_REQUEST.value, reason="bad JSON body")
mlrun.api.api.utils.log_and_raise(
HTTPStatus.BAD_REQUEST.value, reason="bad JSON body"
)

# enrich job task with the username from the request header
if username:
@@ -37,6 +38,6 @@
labels.setdefault("v3io_user", username)
labels.setdefault("owner", username)

logger.info("submit_job: {}".format(data))
response = await run_in_threadpool(submit, db_session, data)
logger.info("Submit run", data=data)
response = await mlrun.api.api.utils.submit_run(db_session, data)
return response
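
Note the threadpool hop did not disappear: it moved from the endpoint into submit_run (see mlrun/api/api/utils.py below), so the async endpoint still never blocks the event loop on the synchronous submission. A minimal sketch of the pattern, with hypothetical names:

```python
from fastapi import FastAPI
from fastapi.concurrency import run_in_threadpool

app = FastAPI()


def _blocking_submit(data: dict) -> dict:
    # stand-in for a synchronous, DB-bound helper like _submit_run
    return {"data": data}


@app.post("/submit")
async def submit(data: dict):
    # run the blocking call on a worker thread; the event loop stays free
    return await run_in_threadpool(_blocking_submit, data)
```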
33 changes: 25 additions & 8 deletions mlrun/api/api/utils.py
@@ -1,9 +1,11 @@
import traceback
import typing
from http import HTTPStatus
from os import environ
from pathlib import Path

from fastapi import HTTPException, Request
from fastapi.concurrency import run_in_threadpool
from sqlalchemy.orm import Session

import mlrun.errors
@@ -57,7 +59,7 @@ def get_run_db_instance(db_session: Session):
return run_db


def _parse_submit_job_body(db_session: Session, data):
def _parse_submit_run_body(db_session: Session, data):
task = data.get("task")
function_dict = data.get("function")
function_url = data.get("functionUrl")
@@ -123,12 +125,26 @@ def _parse_submit_job_body(db_session: Session, data):
return function, task


def submit(db_session: Session, data):
async def submit_run(db_session: Session, data):
_, _, _, response = await run_in_threadpool(_submit_run, db_session, data)
return response


def _submit_run(db_session: Session, data) -> typing.Tuple[str, str, str, typing.Dict]:
"""
:return: Tuple with:
1. str of the project of the run
2. str of the kind of the function of the run
3. str of the uid of the run that started execution (None when it was scheduled)
4. dict of the response info
"""
run_uid = None
project = None
try:
fn, task = _parse_submit_job_body(db_session, data)
fn, task = _parse_submit_run_body(db_session, data)
run_db = get_run_db_instance(db_session)
fn.set_db_connection(run_db, True)
logger.info("func:\n{}".format(fn.to_yaml()))
logger.info("Submitting run", function=fn.to_dict(), task=task)
# fn.spec.rundb = "http://mlrun-api:8080"
schedule = data.get("schedule")
if schedule:
@@ -143,6 +159,7 @@ def submit(db_session: Session, data):
data,
cron_trigger,
)
project = task["metadata"]["project"]

response = {
"schedule": schedule,
@@ -151,6 +168,8 @@
}
else:
run = fn.run(task, watch=False)
run_uid = run.metadata.uid
project = run.metadata.project
if run:
response = run.to_dict()

@@ -165,7 +184,5 @@ def submit(db_session: Session, data):
HTTPStatus.BAD_REQUEST.value, reason="runtime error: {}".format(err)
)

logger.info("response: %s", response)
return {
"data": response,
}
logger.info("Run submission succeeded", response=response)
return project, fn.kind, run_uid, {"data": response}
50 changes: 17 additions & 33 deletions mlrun/api/crud/logs.py
@@ -1,3 +1,4 @@
import typing
from http import HTTPStatus

from sqlalchemy.orm import Session
@@ -6,7 +7,7 @@
from mlrun.api.constants import LogSources
from mlrun.api.utils.singletons.db import get_db
from mlrun.api.utils.singletons.k8s import get_k8s
from mlrun.utils import get_in, now_date, update_in
from mlrun.runtimes.constants import PodPhases


class Logs:
@@ -19,56 +20,39 @@ def store_log(body: bytes, project: str, uid: str, append: bool = True):
fp.write(body)

@staticmethod
def get_log(
def get_logs(
db_session: Session,
project: str,
uid: str,
size: int = -1,
offset: int = 0,
source: LogSources = LogSources.AUTO,
):
) -> typing.Tuple[str, bytes]:
"""
:return: Tuple with:
1. str of the run state (so watchers will know whether to continue polling for logs)
2. bytes of the logs themselves
"""
out = b""
log_file = log_path(project, uid)
pod_status = None
data = get_db().read_run(db_session, uid, project)
if not data:
log_and_raise(HTTPStatus.NOT_FOUND.value, project=project, uid=uid)
run_state = data.get("status", {}).get("state", "")
if log_file.exists() and source in [LogSources.AUTO, LogSources.PERSISTENCY]:
with log_file.open("rb") as fp:
fp.seek(offset)
out = fp.read(size)
pod_status = ""
elif source in [LogSources.AUTO, LogSources.K8S]:
data = get_db().read_run(db_session, uid, project)
if not data:
log_and_raise(HTTPStatus.NOT_FOUND.value, project=project, uid=uid)

pod_status = get_in(data, "status.state", "")
if get_k8s():
pods = get_k8s().get_logger_pods(uid)
pods = get_k8s().get_logger_pods(project, uid)
if pods:
pod, new_status = list(pods.items())[0]
new_status = new_status.lower()

# TODO: handle in cron/tracking
if new_status != "pending":
pod, pod_phase = list(pods.items())[0]
if pod_phase != PodPhases.pending:
resp = get_k8s().logs(pod)
if resp:
out = resp.encode()[offset:]
if pod_status == "running":
now = now_date().isoformat()
update_in(data, "status.last_update", now)
if new_status == "failed":
update_in(data, "status.state", "error")
update_in(data, "status.error", "error, check logs")
get_db().store_run(db_session, data, uid, project)
if new_status == "succeeded":
update_in(data, "status.state", "completed")
get_db().store_run(db_session, data, uid, project)
pod_status = new_status
elif pod_status == "running":
update_in(data, "status.state", "error")
update_in(data, "status.error", "pod not found, maybe terminated")
get_db().store_run(db_session, data, uid, project)
pod_status = "failed"
return out, pod_status
return run_state, out

@staticmethod
def get_log_mtime(project: str, uid: str) -> int:
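
The renamed get_logs now leaves run-state reconciliation to the new monitoring flow and simply reports the current state alongside the log bytes. A hedged server-side usage sketch, assuming the session helpers used elsewhere in this commit live in mlrun.api.db.session:

```python
from mlrun.api.crud.logs import Logs
from mlrun.api.db.session import close_session, create_session

db_session = create_session()
try:
    # the run state tells watchers whether to keep polling for more logs
    run_state, log_bytes = Logs.get_logs(db_session, "my-project", "some-run-uid")
    print(run_state, log_bytes.decode())
finally:
    close_session(db_session)
```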
17 changes: 11 additions & 6 deletions mlrun/api/db/sqldb/db.py
@@ -92,24 +92,27 @@ def _delete_logs(self, session: Session, project: str):
for log in self._query(session, Log, project=project):
self.delete_log(session, project, log.uid)

def store_run(self, session, struct, uid, project="", iter=0):
def store_run(self, session, run_data, uid, project="", iter=0):
project = project or config.default_project
logger.debug(
"Storing run to db", project=project, uid=uid, iter=iter, run=run_data
)
self._ensure_project(session, project)
run = self._get_run(session, uid, project, iter)
if not run:
run = Run(
uid=uid,
project=project,
iteration=iter,
state=run_state(struct),
start_time=run_start_time(struct) or datetime.now(timezone.utc),
state=run_state(run_data),
start_time=run_start_time(run_data) or datetime.now(timezone.utc),
)
labels = run_labels(struct)
new_state = run_state(struct)
labels = run_labels(run_data)
new_state = run_state(run_data)
if new_state:
run.state = new_state
update_labels(run, labels)
run.struct = struct
run.struct = run_data
self._upsert(session, run, ignore=True)

def update_run(self, session, updates: dict, uid, project="", iter=0):
@@ -746,6 +749,8 @@ def _upsert(self, session, obj, ignore=False):

def _find_runs(self, session, uid, project, labels):
labels = label_set(labels)
if project == "*":
project = None
query = self._query(session, Run, uid=uid, project=project)
return self._add_labels_filter(session, query, Run, labels)

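
The new "*" check gives callers a way to query runs across every project, presumably so server-side flows like the runs monitoring added in this commit can scan them all. A minimal sketch of the sentinel's semantics:

```python
from typing import Optional


def normalize_project_filter(project: Optional[str]) -> Optional[str]:
    # mirrors the check added to _find_runs: "*" means "do not filter by project"
    return None if project == "*" else project


# usage sketch
assert normalize_project_filter("*") is None
assert normalize_project_filter("my-project") == "my-project"
```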
44 changes: 36 additions & 8 deletions mlrun/api/main.py
@@ -1,6 +1,7 @@
import uuid

import fastapi
import fastapi.concurrency
import uvicorn
import uvicorn.protocols.utils
from fastapi.exception_handlers import http_exception_handler
@@ -14,13 +15,11 @@
cancel_periodic_functions,
)
from mlrun.api.utils.singletons.db import get_db, initialize_db
from mlrun.api.utils.singletons.k8s import initialize_k8s
from mlrun.api.utils.singletons.logs_dir import initialize_logs_dir
from mlrun.api.utils.singletons.scheduler import initialize_scheduler, get_scheduler
from mlrun.config import config
from mlrun.k8s_utils import get_k8s_helper
from mlrun.runtimes import RuntimeKinds
from mlrun.runtimes import get_runtime_handler
from mlrun.runtimes import RuntimeKinds, get_runtime_handler
from mlrun.utils import logger

app = fastapi.FastAPI(
@@ -120,6 +119,7 @@ async def startup_event():
# periodic cleanup is not needed if we're not inside kubernetes cluster
if get_k8s_helper(silent=True).is_running_inside_kubernetes_cluster():
_start_periodic_cleanup()
_start_periodic_runs_monitoring()


@app.on_event("shutdown")
@@ -131,22 +131,50 @@ async def shutdown_event():
async def _initialize_singletons():
initialize_db()
await initialize_scheduler()
initialize_k8s()
initialize_logs_dir()


def _start_periodic_cleanup():
logger.info("Starting periodic runtimes cleanup")
run_function_periodically(int(config.runtimes_cleanup_interval), _cleanup_runtimes)
interval = int(config.runtimes_cleanup_interval)
if interval > 0:
logger.info("Starting periodic runtimes cleanup", interval=interval)
run_function_periodically(interval, _cleanup_runtimes)


def _start_periodic_runs_monitoring():
interval = int(config.runs_monitoring_interval)
if interval > 0:
logger.info("Starting periodic runs monitoring", interval=interval)
run_function_periodically(interval, _monitor_runs)


def _monitor_runs():
db_session = create_session()
try:
for kind in RuntimeKinds.runtime_with_handlers():
try:
runtime_handler = get_runtime_handler(kind)
runtime_handler.monitor_runs(get_db(), db_session)
except Exception as exc:
logger.warning(
"Failed monitoring runs. Ignoring", exc=str(exc), kind=kind
)
finally:
close_session(db_session)


def _cleanup_runtimes():
logger.debug("Cleaning runtimes")
db_session = create_session()
try:
for kind in RuntimeKinds.runtime_with_handlers():
runtime_handler = get_runtime_handler(kind)
runtime_handler.delete_resources(get_db(), db_session)
try:
runtime_handler = get_runtime_handler(kind)
runtime_handler.delete_resources(get_db(), db_session)
except Exception as exc:
logger.warning(
"Failed deleting resources. Ignoring", exc=str(exc), kind=kind
)
finally:
close_session(db_session)

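
Both periodic loops now share the same shape: a positive interval arms the loop, zero disables it, and each iteration swallows per-kind failures so one bad runtime handler cannot kill the loop. The helper's implementation is not part of this diff; the sketch below is only an assumption of what a run_function_periodically-style helper might look like on asyncio:

```python
import asyncio
from typing import Callable

_periodic_tasks = []


async def _periodic(interval: int, function: Callable) -> None:
    while True:
        try:
            function()
        except Exception as exc:
            # a periodic task must survive individual failures
            print(f"periodic function raised, ignoring: {exc}")
        await asyncio.sleep(interval)


def run_function_periodically(interval: int, function: Callable) -> None:
    # called from FastAPI's async startup event, so an event loop is running
    _periodic_tasks.append(asyncio.create_task(_periodic(interval, function)))
```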
12 changes: 6 additions & 6 deletions mlrun/api/utils/scheduler.py
@@ -211,7 +211,7 @@ def _resolve_job_function(

if scheduled_kind == schemas.ScheduleKinds.job:
scheduled_object_copy = copy.deepcopy(scheduled_object)
return Scheduler.submit_job_wrapper, [scheduled_object_copy], {}
return Scheduler.submit_task_wrapper, [scheduled_object_copy], {}
if scheduled_kind == schemas.ScheduleKinds.local_function:
return scheduled_object, None, None

@@ -227,21 +227,21 @@ def _resolve_job_id(self, project, name) -> str:
return self._job_id_separator.join([project, name])

@staticmethod
def submit_job_wrapper(scheduled_object):
def submit_task_wrapper(scheduled_object):
# import here to avoid circular imports
from mlrun.api.api.utils import submit
from mlrun.api.api.utils import submit_run

# removing the schedule from the body otherwise when the scheduler will submit this job it will go to an
# removing the schedule from the body otherwise when the scheduler will submit this task it will go to an
# endless scheduling loop
scheduled_object.pop("schedule", None)

# removing the uid from the task metadata so that a new uid will be generated for every run
# otherwise all jobs will have the same uid
# otherwise all runs will have the same uid
scheduled_object.get("task", {}).get("metadata", {}).pop("uid", None)

db_session = create_session()

submit(db_session, scheduled_object)
submit_run(db_session, scheduled_object)

close_session(db_session)

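
Concretely, the sanitization submit_task_wrapper performs before resubmitting can be seen with illustrative values (a worked example, not code from this commit):

```python
scheduled_object = {
    "schedule": "0 * * * *",
    "task": {"metadata": {"uid": "abc123", "project": "default"}},
}

# drop the schedule so the resubmission does not schedule itself again
scheduled_object.pop("schedule", None)
# drop the uid so every scheduled invocation gets a fresh run uid
scheduled_object.get("task", {}).get("metadata", {}).pop("uid", None)

assert scheduled_object == {"task": {"metadata": {"project": "default"}}}
```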