Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't use closed sessions in scheduler #465

Merged
merged 5 commits into from
Oct 4, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
48 changes: 25 additions & 23 deletions mlrun/api/utils/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from sqlalchemy.orm import Session

from mlrun.api import schemas
from mlrun.api.db.session import create_session, close_session
from mlrun.api.utils.singletons.db import get_db
from mlrun.config import config
from mlrun.utils import logger
Expand Down Expand Up @@ -69,7 +70,7 @@ def create_schedule(
db_session, project, name, kind, scheduled_object, cron_trigger
)
self._create_schedule_in_scheduler(
db_session, project, name, kind, scheduled_object, cron_trigger
project, name, kind, scheduled_object, cron_trigger
)

def list_schedules(
Expand Down Expand Up @@ -153,7 +154,6 @@ def _validate_cron_trigger(

def _create_schedule_in_scheduler(
self,
db_session: Session,
project: str,
name: str,
kind: schemas.ScheduleKinds,
Expand All @@ -162,9 +162,7 @@ def _create_schedule_in_scheduler(
):
job_id = self._resolve_job_id(project, name)
logger.debug("Adding schedule to scheduler", job_id=job_id)
function, args, kwargs = self._resolve_job_function(
db_session, kind, scheduled_object
)
function, args, kwargs = self._resolve_job_function(kind, scheduled_object)
self._scheduler.add_job(
function,
self.transform_schemas_cron_trigger_to_apscheduler_cron_trigger(
Expand All @@ -182,7 +180,6 @@ def _reload_schedules(self, db_session: Session):
# don't let one failure fail the rest
try:
self._create_schedule_in_scheduler(
db_session,
db_schedule.project,
db_schedule.name,
db_schedule.kind,
Expand All @@ -206,29 +203,15 @@ def _transform_db_schedule_to_schedule(
return schedule

def _resolve_job_function(
self,
db_session: Session,
scheduled_kind: schemas.ScheduleKinds,
scheduled_object: Any,
self, scheduled_kind: schemas.ScheduleKinds, scheduled_object: Any,
) -> Tuple[Callable, Optional[Union[List, Tuple]], Optional[Dict]]:
"""
:return: a tuple (function, args, kwargs) to be used with the APScheduler.add_job
"""

if scheduled_kind == schemas.ScheduleKinds.job:
# import here to avoid circular imports
from mlrun.api.api.utils import submit

# removing the schedule from the body otherwise when the scheduler will submit this job it will go to an
# endless scheduling loop
edited_scheduled_object = copy.deepcopy(scheduled_object)
edited_scheduled_object.pop("schedule", None)

# removing the uid from the task metadata so that a new uid will be generated for every run
# otherwise all jobs will have the same uid
edited_scheduled_object.get("task", {}).get("metadata", {}).pop("uid", None)

return submit, [db_session, edited_scheduled_object], {}
scheduled_object_copy = copy.deepcopy(scheduled_object)
return Scheduler.submit_job_wrapper, [scheduled_object_copy], {}
if scheduled_kind == schemas.ScheduleKinds.local_function:
return scheduled_object, None, None

Expand All @@ -243,6 +226,25 @@ def _resolve_job_id(self, project, name) -> str:
"""
return self._job_id_separator.join([project, name])

@staticmethod
def submit_job_wrapper(scheduled_object):
# import here to avoid circular imports
from mlrun.api.api.utils import submit

# removing the schedule from the body otherwise when the scheduler will submit this job it will go to an
# endless scheduling loop
scheduled_object.pop("schedule", None)

# removing the uid from the task metadata so that a new uid will be generated for every run
# otherwise all jobs will have the same uid
scheduled_object.get("task", {}).get("metadata", {}).pop("uid", None)

db_session = create_session()

submit(db_session, scheduled_object)

close_session(db_session)

@staticmethod
def transform_schemas_cron_trigger_to_apscheduler_cron_trigger(
cron_trigger: schemas.ScheduleCronTrigger,
Expand Down