Skip to content

Commit

Permalink
Don't use closed sessions in scheduler (#465)
Browse files Browse the repository at this point in the history
  • Loading branch information
Hedingber committed Oct 4, 2020
1 parent c05a458 commit bc4bd1c
Showing 1 changed file with 25 additions and 23 deletions.
48 changes: 25 additions & 23 deletions mlrun/api/utils/scheduler.py
Expand Up @@ -9,6 +9,7 @@
from sqlalchemy.orm import Session

from mlrun.api import schemas
from mlrun.api.db.session import create_session, close_session
from mlrun.api.utils.singletons.db import get_db
from mlrun.config import config
from mlrun.utils import logger
Expand Down Expand Up @@ -69,7 +70,7 @@ def create_schedule(
db_session, project, name, kind, scheduled_object, cron_trigger
)
self._create_schedule_in_scheduler(
db_session, project, name, kind, scheduled_object, cron_trigger
project, name, kind, scheduled_object, cron_trigger
)

def list_schedules(
Expand Down Expand Up @@ -153,7 +154,6 @@ def _validate_cron_trigger(

def _create_schedule_in_scheduler(
self,
db_session: Session,
project: str,
name: str,
kind: schemas.ScheduleKinds,
Expand All @@ -162,9 +162,7 @@ def _create_schedule_in_scheduler(
):
job_id = self._resolve_job_id(project, name)
logger.debug("Adding schedule to scheduler", job_id=job_id)
function, args, kwargs = self._resolve_job_function(
db_session, kind, scheduled_object
)
function, args, kwargs = self._resolve_job_function(kind, scheduled_object)
self._scheduler.add_job(
function,
self.transform_schemas_cron_trigger_to_apscheduler_cron_trigger(
Expand All @@ -182,7 +180,6 @@ def _reload_schedules(self, db_session: Session):
# don't let one failure fail the rest
try:
self._create_schedule_in_scheduler(
db_session,
db_schedule.project,
db_schedule.name,
db_schedule.kind,
Expand All @@ -206,29 +203,15 @@ def _transform_db_schedule_to_schedule(
return schedule

def _resolve_job_function(
self,
db_session: Session,
scheduled_kind: schemas.ScheduleKinds,
scheduled_object: Any,
self, scheduled_kind: schemas.ScheduleKinds, scheduled_object: Any,
) -> Tuple[Callable, Optional[Union[List, Tuple]], Optional[Dict]]:
"""
:return: a tuple (function, args, kwargs) to be used with the APScheduler.add_job
"""

if scheduled_kind == schemas.ScheduleKinds.job:
# import here to avoid circular imports
from mlrun.api.api.utils import submit

# removing the schedule from the body otherwise when the scheduler will submit this job it will go to an
# endless scheduling loop
edited_scheduled_object = copy.deepcopy(scheduled_object)
edited_scheduled_object.pop("schedule", None)

# removing the uid from the task metadata so that a new uid will be generated for every run
# otherwise all jobs will have the same uid
edited_scheduled_object.get("task", {}).get("metadata", {}).pop("uid", None)

return submit, [db_session, edited_scheduled_object], {}
scheduled_object_copy = copy.deepcopy(scheduled_object)
return Scheduler.submit_job_wrapper, [scheduled_object_copy], {}
if scheduled_kind == schemas.ScheduleKinds.local_function:
return scheduled_object, None, None

Expand All @@ -243,6 +226,25 @@ def _resolve_job_id(self, project, name) -> str:
"""
return self._job_id_separator.join([project, name])

@staticmethod
def submit_job_wrapper(scheduled_object):
# import here to avoid circular imports
from mlrun.api.api.utils import submit

# removing the schedule from the body otherwise when the scheduler will submit this job it will go to an
# endless scheduling loop
scheduled_object.pop("schedule", None)

# removing the uid from the task metadata so that a new uid will be generated for every run
# otherwise all jobs will have the same uid
scheduled_object.get("task", {}).get("metadata", {}).pop("uid", None)

db_session = create_session()

submit(db_session, scheduled_object)

close_session(db_session)

@staticmethod
def transform_schemas_cron_trigger_to_apscheduler_cron_trigger(
cron_trigger: schemas.ScheduleCronTrigger,
Expand Down

0 comments on commit bc4bd1c

Please sign in to comment.