From bc4bd1cef070ac26e3464d6a4341cb917cc4fcd8 Mon Sep 17 00:00:00 2001 From: Hedingber Date: Sun, 4 Oct 2020 23:50:13 +0300 Subject: [PATCH] Don't use closed sessions in scheduler (#465) --- mlrun/api/utils/scheduler.py | 48 +++++++++++++++++++----------------- 1 file changed, 25 insertions(+), 23 deletions(-) diff --git a/mlrun/api/utils/scheduler.py b/mlrun/api/utils/scheduler.py index 3497706a308..6397cbf6a92 100644 --- a/mlrun/api/utils/scheduler.py +++ b/mlrun/api/utils/scheduler.py @@ -9,6 +9,7 @@ from sqlalchemy.orm import Session from mlrun.api import schemas +from mlrun.api.db.session import create_session, close_session from mlrun.api.utils.singletons.db import get_db from mlrun.config import config from mlrun.utils import logger @@ -69,7 +70,7 @@ def create_schedule( db_session, project, name, kind, scheduled_object, cron_trigger ) self._create_schedule_in_scheduler( - db_session, project, name, kind, scheduled_object, cron_trigger + project, name, kind, scheduled_object, cron_trigger ) def list_schedules( @@ -153,7 +154,6 @@ def _validate_cron_trigger( def _create_schedule_in_scheduler( self, - db_session: Session, project: str, name: str, kind: schemas.ScheduleKinds, @@ -162,9 +162,7 @@ def _create_schedule_in_scheduler( ): job_id = self._resolve_job_id(project, name) logger.debug("Adding schedule to scheduler", job_id=job_id) - function, args, kwargs = self._resolve_job_function( - db_session, kind, scheduled_object - ) + function, args, kwargs = self._resolve_job_function(kind, scheduled_object) self._scheduler.add_job( function, self.transform_schemas_cron_trigger_to_apscheduler_cron_trigger( @@ -182,7 +180,6 @@ def _reload_schedules(self, db_session: Session): # don't let one failure fail the rest try: self._create_schedule_in_scheduler( - db_session, db_schedule.project, db_schedule.name, db_schedule.kind, @@ -206,29 +203,15 @@ def _transform_db_schedule_to_schedule( return schedule def _resolve_job_function( - self, - db_session: Session, - scheduled_kind: schemas.ScheduleKinds, - scheduled_object: Any, + self, scheduled_kind: schemas.ScheduleKinds, scheduled_object: Any, ) -> Tuple[Callable, Optional[Union[List, Tuple]], Optional[Dict]]: """ :return: a tuple (function, args, kwargs) to be used with the APScheduler.add_job """ if scheduled_kind == schemas.ScheduleKinds.job: - # import here to avoid circular imports - from mlrun.api.api.utils import submit - - # removing the schedule from the body otherwise when the scheduler will submit this job it will go to an - # endless scheduling loop - edited_scheduled_object = copy.deepcopy(scheduled_object) - edited_scheduled_object.pop("schedule", None) - - # removing the uid from the task metadata so that a new uid will be generated for every run - # otherwise all jobs will have the same uid - edited_scheduled_object.get("task", {}).get("metadata", {}).pop("uid", None) - - return submit, [db_session, edited_scheduled_object], {} + scheduled_object_copy = copy.deepcopy(scheduled_object) + return Scheduler.submit_job_wrapper, [scheduled_object_copy], {} if scheduled_kind == schemas.ScheduleKinds.local_function: return scheduled_object, None, None @@ -243,6 +226,25 @@ def _resolve_job_id(self, project, name) -> str: """ return self._job_id_separator.join([project, name]) + @staticmethod + def submit_job_wrapper(scheduled_object): + # import here to avoid circular imports + from mlrun.api.api.utils import submit + + # removing the schedule from the body otherwise when the scheduler will submit this job it will go to an + # endless scheduling loop + scheduled_object.pop("schedule", None) + + # removing the uid from the task metadata so that a new uid will be generated for every run + # otherwise all jobs will have the same uid + scheduled_object.get("task", {}).get("metadata", {}).pop("uid", None) + + db_session = create_session() + + submit(db_session, scheduled_object) + + close_session(db_session) + @staticmethod def transform_schemas_cron_trigger_to_apscheduler_cron_trigger( cron_trigger: schemas.ScheduleCronTrigger,