Skip to content

Commit

Permalink
[Function] Fix deleting a function with schedule only (#3668)
Browse files Browse the repository at this point in the history
  • Loading branch information
yaelgen committed May 31, 2023
1 parent 7bf2601 commit 2efe330
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 41 deletions.
28 changes: 28 additions & 0 deletions mlrun/api/api/endpoints/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
from mlrun.api.api.utils import get_run_db_instance, log_and_raise, log_path
from mlrun.api.crud.secrets import Secrets, SecretsClientType
from mlrun.api.utils.builder import build_runtime
from mlrun.api.utils.singletons.scheduler import get_scheduler
from mlrun.config import config
from mlrun.errors import MLRunRuntimeError, err_to_str
from mlrun.run import new_function
Expand Down Expand Up @@ -161,6 +162,33 @@ async def delete_function(
mlrun.common.schemas.AuthorizationAction.delete,
auth_info,
)

# If the requested function has a schedule, we must delete it before deleting the function
schedule = await run_in_threadpool(
get_scheduler().get_schedule,
db_session,
project,
name,
)
if schedule:
# when deleting a function, we should also delete its schedules if exists
# schedules are only supposed to be run by the chief, therefore, if the function has a schedule,
# and we are running in worker, we send the request to the chief client
if (
mlrun.mlconf.httpdb.clusterization.role
!= mlrun.common.schemas.ClusterizationRole.chief
):
logger.info(
"Function has a schedule, deleting",
function=name,
project=project,
)
chief_client = mlrun.api.utils.clients.chief.Client()
await chief_client.delete_schedule(project=project, name=name)
else:
await run_in_threadpool(
get_scheduler().delete_schedule, db_session, project, name
)
await run_in_threadpool(
mlrun.api.crud.Functions().delete_function, db_session, project, name
)
Expand Down
11 changes: 0 additions & 11 deletions mlrun/api/db/sqldb/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -1056,7 +1056,6 @@ def delete_function(self, session: Session, project: str, name: str):

# deleting tags and labels, because in sqlite the relationships aren't necessarily cascading
self._delete_function_tags(session, project, name, commit=False)
self._delete_function_schedules(session, project, name)
self._delete_class_labels(
session, Function, project=project, name=name, commit=False
)
Expand Down Expand Up @@ -1141,16 +1140,6 @@ def _delete_function_tags(self, session, project, function_name, commit=True):
if commit:
session.commit()

def _delete_function_schedules(self, session, project, function_name, commit=True):
try:
self.delete_schedule(session=session, project=project, name=function_name)
except mlrun.errors.MLRunNotFoundError:
logger.info(
"No schedules were found for function",
project=project,
function=function_name,
)

def _list_function_tags(self, session, project, function_id):
query = (
session.query(Function.Tag.name)
Expand Down
67 changes: 67 additions & 0 deletions tests/api/api/test_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,73 @@ async def test_list_functions_with_hash_key_versioned(
assert list_functions_results[0]["metadata"]["hash"] == hash_key


def test_delete_function_with_schedule(
db: sqlalchemy.orm.Session,
client: fastapi.testclient.TestClient,
):
# create project and function
tests.api.api.utils.create_project(client, PROJECT)

function_tag = "function-tag"
function_name = "function-name"
project_name = "project-name"

function = {
"kind": "job",
"metadata": {
"name": function_name,
"project": project_name,
"tag": function_tag,
},
"spec": {"image": "mlrun/mlrun"},
}

function_endpoint = f"projects/{PROJECT}/functions/{function_name}"
function = client.post(function_endpoint, data=mlrun.utils.dict_to_json(function))
hash_key = function.json()["hash_key"]

# generate schedule object that matches to the function and create it
scheduled_object = {
"task": {
"spec": {
"function": f"{PROJECT}/{function_name}@{hash_key}",
"handler": "handler",
},
"metadata": {"name": "my-task", "project": f"{PROJECT}"},
}
}
schedule_cron_trigger = mlrun.common.schemas.ScheduleCronTrigger(minute=1)

schedule = mlrun.common.schemas.ScheduleInput(
name=function_name,
kind=mlrun.common.schemas.ScheduleKinds.job,
scheduled_object=scheduled_object,
cron_trigger=schedule_cron_trigger,
)

endpoint = f"projects/{PROJECT}/schedules"
response = client.post(endpoint, data=mlrun.utils.dict_to_json(schedule.dict()))
assert response.status_code == HTTPStatus.CREATED.value

response = client.get(endpoint)
assert (
response.status_code == HTTPStatus.OK.value
and response.json()["schedules"][0]["name"] == function_name
)

# delete the function and assert that it has been removed, as has its schedule
response = client.delete(function_endpoint)
assert response.status_code == HTTPStatus.NO_CONTENT.value

response = client.get(function_endpoint)
assert response.status_code == HTTPStatus.NOT_FOUND.value

response = client.get(endpoint)
assert (
response.status_code == HTTPStatus.OK.value and not response.json()["schedules"]
)


@pytest.mark.asyncio
async def test_multiple_store_function_race_condition(
db: sqlalchemy.orm.Session, async_client: httpx.AsyncClient
Expand Down
30 changes: 0 additions & 30 deletions tests/api/db/test_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,36 +100,6 @@ def test_store_function_not_versioned(db: DBInterface, db_session: Session):
assert len(functions) == 1


def test_delete_schedule_when_deleting_function(db: DBInterface, db_session: Session):
project_name, func_name = "project", "function"
func = _generate_function()

db.store_function(db_session, func.to_dict(), func.metadata.name, versioned=True)

# creating a schedule for the created function
db.create_schedule(
db_session,
project=project_name,
name=func_name,
kind=mlrun.common.schemas.ScheduleKinds.local_function,
scheduled_object="*/15 * * * *",
cron_trigger=mlrun.common.schemas.ScheduleCronTrigger(minute="*/15"),
concurrency_limit=15,
)

# get the schedule and make sure it was created
schedule = db.get_schedule(session=db_session, project=project_name, name=func_name)
assert schedule.name == func_name

db.delete_function(session=db_session, project=project_name, name=func_name)

# ensure that both the function and the schedule have been removed
with pytest.raises(mlrun.errors.MLRunNotFoundError):
db.get_function(session=db_session, project=project_name, name=func_name)
with pytest.raises(mlrun.errors.MLRunNotFoundError):
db.get_schedule(session=db_session, project=project_name, name=func_name)


def test_get_function_by_hash_key(db: DBInterface, db_session: Session):
function_1 = _generate_function()
function_hash_key = db.store_function(
Expand Down

0 comments on commit 2efe330

Please sign in to comment.