Skip to content

Commit

Permalink
[Nuclio] Delete Nuclio function when removing function of Nuclio runt…
Browse files Browse the repository at this point in the history
…ime (#5462)
  • Loading branch information
rokatyy committed May 1, 2024
1 parent b0c8065 commit a0e7738
Show file tree
Hide file tree
Showing 9 changed files with 311 additions and 16 deletions.
2 changes: 2 additions & 0 deletions mlrun/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@
"background_tasks": {
# enabled / disabled
"timeout_mode": "enabled",
"function_deletion_batch_size": 10,
# timeout in seconds to wait for background task to be updated / finished by the worker responsible for the task
"default_timeouts": {
"operations": {
Expand All @@ -196,6 +197,7 @@
"run_abortion": "600",
"abort_grace_period": "10",
"delete_project": "900",
"delete_function": "900",
},
"runtimes": {"dask": "600"},
},
Expand Down
24 changes: 23 additions & 1 deletion mlrun/db/httpdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -1155,7 +1155,29 @@ def delete_function(self, name: str, project: str = ""):
project = project or config.default_project
path = f"projects/{project}/functions/{name}"
error_message = f"Failed deleting function {project}/{name}"
self.api_call("DELETE", path, error_message)
response = self.api_call("DELETE", path, error_message, version="v2")
if response.status_code == http.HTTPStatus.ACCEPTED:
logger.info(
"Function is being deleted", project_name=project, function_name=name
)
background_task = mlrun.common.schemas.BackgroundTask(**response.json())
background_task = self._wait_for_background_task_to_reach_terminal_state(
background_task.metadata.name, project=project
)
if (
background_task.status.state
== mlrun.common.schemas.BackgroundTaskState.succeeded
):
logger.info(
"Function deleted", project_name=project, function_name=name
)
elif (
background_task.status.state
== mlrun.common.schemas.BackgroundTaskState.failed
):
logger.info(
"Function deletion failed", project_name=project, function_name=name
)

def list_functions(self, name=None, project=None, tag=None, labels=None):
"""Retrieve a list of functions, filtered by specific criteria.
Expand Down
6 changes: 6 additions & 0 deletions server/api/api/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
files,
frontend_spec,
functions,
functions_v2,
grafana_proxy,
healthz,
hub,
Expand Down Expand Up @@ -196,3 +197,8 @@
tags=["projects"],
dependencies=[Depends(deps.authenticate_request)],
)
# Mount the v2 functions endpoints on the v2 API router, grouped under the "functions" tag;
# every route included here depends on `deps.authenticate_request`.
api_v2_router.include_router(
    functions_v2.router,
    tags=["functions"],
    dependencies=[Depends(deps.authenticate_request)],
)
7 changes: 6 additions & 1 deletion server/api/api/endpoints/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,13 @@ async def get_function(
}


# TODO: Remove in 1.9.0
@router.delete(
"/projects/{project}/functions/{name}", status_code=HTTPStatus.NO_CONTENT.value
"/projects/{project}/functions/{name}",
status_code=HTTPStatus.NO_CONTENT.value,
deprecated=True,
description="'/v1/projects/{project}/functions/{name}' will be removed in 1.9.0, "
"use '/v2/projects/{project}/functions/{name}' instead.",
)
async def delete_function(
request: Request,
Expand Down
113 changes: 113 additions & 0 deletions server/api/api/endpoints/functions_v2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
# Copyright 2024 Iguazio
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import http

import fastapi
from fastapi import (
APIRouter,
Depends,
Request,
)
from fastapi.concurrency import run_in_threadpool
from sqlalchemy.orm import Session

import mlrun.common.model_monitoring
import mlrun.common.model_monitoring.helpers
import mlrun.common.schemas
import server.api.api.utils
import server.api.crud.model_monitoring.deployment
import server.api.crud.runtimes.nuclio.function
import server.api.db.session
import server.api.launcher
import server.api.utils.auth.verifier
import server.api.utils.background_tasks
import server.api.utils.clients.chief
import server.api.utils.functions
import server.api.utils.pagination
import server.api.utils.singletons.k8s
import server.api.utils.singletons.project_member
from mlrun.utils import logger
from server.api.api import deps
from server.api.utils.singletons.scheduler import get_scheduler

router = APIRouter()


@router.delete(
    "/projects/{project}/functions/{name}",
    responses={
        http.HTTPStatus.ACCEPTED.value: {"model": mlrun.common.schemas.BackgroundTask},
    },
)
async def delete_function(
    background_tasks: fastapi.BackgroundTasks,
    response: fastapi.Response,
    request: Request,
    project: str,
    name: str,
    auth_info: mlrun.common.schemas.AuthInfo = Depends(deps.authenticate_request),
    db_session: Session = Depends(deps.get_db_session),
):
    # v2 function deletion endpoint. Flow:
    #   1. verify the caller is allowed to delete the function
    #   2. remove the schedule that carries the function's name, if one exists
    #   3. create the background task that performs the deletion itself
    #   4. answer 202 (ACCEPTED) with the created background task
    # NOTE: documented with comments rather than a docstring on purpose - FastAPI publishes an endpoint's
    # docstring as its OpenAPI description.
    await server.api.utils.auth.verifier.AuthVerifier().query_project_resource_permissions(
        mlrun.common.schemas.AuthorizationResourceTypes.function,
        project,
        name,
        mlrun.common.schemas.AuthorizationAction.delete,
        auth_info,
    )

    # A schedule of the function must be removed before the function itself;
    # a missing schedule is the common case and is not an error
    try:
        existing_schedule = await run_in_threadpool(
            get_scheduler().get_schedule,
            db_session,
            project,
            name,
        )
    except mlrun.errors.MLRunNotFoundError:
        existing_schedule = None

    if existing_schedule:
        # schedules are only supposed to be run by the chief, so only the chief deletes them locally;
        # any other role sends the schedule deletion request to the chief
        running_on_chief = (
            mlrun.mlconf.httpdb.clusterization.role
            == mlrun.common.schemas.ClusterizationRole.chief
        )
        if running_on_chief:
            await run_in_threadpool(
                get_scheduler().delete_schedule, db_session, project, name
            )
        else:
            logger.info(
                "Function has a schedule, deleting",
                function=name,
                project=project,
            )
            await server.api.utils.clients.chief.Client().delete_schedule(
                project=project, name=name, request=request
            )

    deletion_task = await run_in_threadpool(
        server.api.api.utils.create_function_deletion_background_task,
        background_tasks,
        db_session,
        project,
        name,
        auth_info,
    )

    # the deletion continues in the background, hence ACCEPTED rather than NO_CONTENT
    response.status_code = http.HTTPStatus.ACCEPTED.value
    return deletion_task
113 changes: 112 additions & 1 deletion server/api/api/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import asyncio
import collections
import copy
import functools
Expand All @@ -29,7 +30,7 @@
import kubernetes.client
import semver
import sqlalchemy.orm
from fastapi import HTTPException
from fastapi import BackgroundTasks, HTTPException
from fastapi.concurrency import run_in_threadpool
from sqlalchemy.orm import Session

Expand Down Expand Up @@ -1276,3 +1277,113 @@ def _verify_project_is_deleted():
True,
_verify_project_is_deleted,
)


def create_function_deletion_background_task(
    background_tasks: BackgroundTasks,
    db_session: sqlalchemy.orm.Session,
    project_name: str,
    function_name: str,
    auth_info: mlrun.common.schemas.AuthInfo,
):
    """
    Register `_delete_function` as a project background task and return the handler's result.

    :param background_tasks: FastAPI background tasks object, forwarded to the background tasks handler
    :param db_session:       DB session, used both for creating the task and by `_delete_function` itself
    :param project_name:     project that owns the function
    :param function_name:    name of the function to delete
    :param auth_info:        auth info forwarded to `_delete_function`

    :returns: whatever `ProjectBackgroundTasksHandler.create_background_task` returns
    """
    tasks_handler = server.api.utils.background_tasks.ProjectBackgroundTasksHandler()
    deletion_timeout = (
        mlrun.mlconf.background_tasks.default_timeouts.operations.delete_function
    )
    # NOTE(review): the `None` positional argument is presumably the background task name
    # (left to the handler to choose) - confirm against `create_background_task`'s signature.
    # The arguments after it are the positional arguments passed on to `_delete_function`.
    return tasks_handler.create_background_task(
        db_session,
        project_name,
        background_tasks,
        _delete_function,
        deletion_timeout,
        None,
        db_session,
        project_name,
        function_name,
        auth_info,
    )


async def _delete_function(
    db_session: sqlalchemy.orm.Session,
    project: str,
    function_name: str,
    auth_info: mlrun.common.schemas.AuthInfo,
):
    """
    Delete a function from the DB, first removing its Nuclio functions when it is of a Nuclio runtime kind.

    :param db_session:    DB session used for listing and deleting the function
    :param project:       project that owns the function
    :param function_name: name of the function to delete (all of its tags are covered)
    :param auth_info:     auth info handed to the Nuclio deletion helper

    :raises mlrun.errors.MLRunInternalServerError: when at least one Nuclio function failed to be deleted;
        in that case the function is not deleted from the DB
    """
    # listing by name + project yields one entry per tag of the function
    function_versions = await run_in_threadpool(
        server.api.crud.Functions().list_functions,
        db_session,
        project,
        function_name,
    )

    # All entries are versions of the same MLRun function, so the kind of the first entry
    # is representative of all of them
    if (
        len(function_versions) > 0
        and function_versions[0].get("kind")
        in mlrun.runtimes.RuntimeKinds.nuclio_runtimes()
    ):
        # each tag maps to its own Nuclio function name
        nuclio_function_names = []
        for function_version in function_versions:
            tag = function_version.get("metadata", {}).get("tag")
            nuclio_function_names.append(
                mlrun.runtimes.nuclio.function.get_fullname(
                    function_name, project, tag
                )
            )

        deletion_errors = await _delete_nuclio_functions_in_batches(
            auth_info, project, nuclio_function_names
        )
        if deletion_errors:
            raise mlrun.errors.MLRunInternalServerError(
                f"Failed to delete function {function_name}. Errors: {' '.join(deletion_errors)}"
            )

    # reached also when nothing was listed or the function is not a Nuclio one
    await run_in_threadpool(
        server.api.crud.Functions().delete_function,
        db_session,
        project,
        function_name,
    )


async def _delete_nuclio_functions_in_batches(
    auth_info: mlrun.common.schemas.AuthInfo,
    project_name: str,
    function_names: list[str],
):
    """
    Delete the given Nuclio functions concurrently, capping the number of in-flight delete requests.

    The cap is taken from `mlrun.mlconf.background_tasks.function_deletion_batch_size`.
    A failure to delete one function does not stop the deletion of the others.

    :param auth_info:      auth info used to create the async Nuclio client
    :param project_name:   project the Nuclio functions belong to
    :param function_names: Nuclio function names to delete

    :returns: a list with one failure message per function that could not be deleted
              (empty when every deletion succeeded)
    """

    async def delete_function(
        nuclio_client: server.api.utils.clients.async_nuclio.Client,
        project: str,
        function: str,
        _semaphore: asyncio.Semaphore,
    ) -> None:
        # The semaphore bounds how many delete requests run at the same time.
        # Errors are deliberately not caught here - they are collected by `asyncio.gather` below,
        # so a failure can never be mistaken for a success.
        async with _semaphore:
            await nuclio_client.delete_function(name=function, project_name=project)

    # Configure maximum concurrent deletions
    max_concurrent_deletions = (
        mlrun.mlconf.background_tasks.function_deletion_batch_size
    )
    semaphore = asyncio.Semaphore(max_concurrent_deletions)

    async with server.api.utils.clients.async_nuclio.Client(auth_info) as client:
        tasks = [
            delete_function(client, project_name, function_name, semaphore)
            for function_name in function_names
        ]
        # return_exceptions=True turns every failure into a result instead of aborting the other deletions;
        # results are returned in the same order as the awaitables were passed
        results = await asyncio.gather(*tasks, return_exceptions=True)

    failed_requests = []
    for nuclio_name, result in zip(function_names, results):
        # BaseException (rather than Exception) so that a cancelled deletion is reported as a failure too
        if isinstance(result, BaseException):
            # some exceptions (e.g. asyncio.TimeoutError) have an empty message - fall back to their repr
            # so the failure is always reported
            error_message = str(result) or repr(result)
            failed_requests.append(
                f"Failed to delete nuclio function {nuclio_name}: {error_message}"
            )

    return failed_requests
1 change: 1 addition & 0 deletions server/api/utils/background_tasks/kinds.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ class BackgroundTaskKinds:
db_migrations = "db.migrations"
project_deletion = "project.deletion.{0}"
project_deletion_wrapper = "project.deletion.wrapper.{0}"
function_deletion = "function.deletion.{0}"

0 comments on commit a0e7738

Please sign in to comment.