diff --git a/dockerfiles/base/requirements.txt b/dockerfiles/base/requirements.txt index f69519ff282..b273bb0465d 100644 --- a/dockerfiles/base/requirements.txt +++ b/dockerfiles/base/requirements.txt @@ -9,7 +9,7 @@ lz4~=3.0 msgpack~=1.0 numpy~=1.18 scikit-learn~=0.23.0 -scikit-plot~=0.3.0 +scikit-plot~=0.3.7 tornado~=6.0 urllib3~=1.25 vaex~=2.6 diff --git a/dockerfiles/jupyter/requirements.txt b/dockerfiles/jupyter/requirements.txt index 05d48293c72..199862afa11 100644 --- a/dockerfiles/jupyter/requirements.txt +++ b/dockerfiles/jupyter/requirements.txt @@ -2,6 +2,6 @@ matplotlib~=3.0 scipy~=1.0 scikit-learn~=0.23.0 seaborn~=0.11.0 -scikit-plot~=0.3.0 +scikit-plot~=0.3.7 xgboost~=1.1 diff --git a/dockerfiles/mlrun/requirements.txt b/dockerfiles/mlrun/requirements.txt index 009200b8e08..6848d689a8c 100644 --- a/dockerfiles/mlrun/requirements.txt +++ b/dockerfiles/mlrun/requirements.txt @@ -2,4 +2,4 @@ matplotlib~=3.0 scipy~=1.0 scikit-learn~=0.23.0 seaborn~=0.11.0 -scikit-plot~=0.3.0 +scikit-plot~=0.3.7 diff --git a/mlrun/api/api/api.py b/mlrun/api/api/api.py index 3091dc7658b..b8cfe15737f 100644 --- a/mlrun/api/api/api.py +++ b/mlrun/api/api/api.py @@ -15,7 +15,6 @@ schedules, submit, tags, - workflows, feature_sets, ) @@ -59,9 +58,6 @@ api_router.include_router( tags.router, tags=["tags"], dependencies=[Depends(deps.AuthVerifier)] ) -api_router.include_router( - workflows.router, tags=["workflows"], dependencies=[Depends(deps.AuthVerifier)] -) api_router.include_router( feature_sets.router, tags=["feature-sets"], diff --git a/mlrun/api/api/endpoints/pipelines.py b/mlrun/api/api/endpoints/pipelines.py index 9bb4d9d448c..da12c33c2ea 100644 --- a/mlrun/api/api/endpoints/pipelines.py +++ b/mlrun/api/api/endpoints/pipelines.py @@ -8,13 +8,42 @@ from fastapi.concurrency import run_in_threadpool from kfp import Client as kfclient +import mlrun.api.crud +import mlrun.api.schemas from mlrun.api.api.utils import log_and_raise from mlrun.config import config +from 
mlrun.k8s_utils import get_k8s_helper from mlrun.utils import logger router = APIRouter() +@router.get( + "/projects/{project}/pipelines", response_model=mlrun.api.schemas.PipelinesOutput +) +def list_pipelines( + project: str, + namespace: str = None, + sort_by: str = "", + page_token: str = "", + filter_: str = Query("", alias="filter"), + format_: mlrun.api.schemas.Format = Query( + mlrun.api.schemas.Format.metadata_only, alias="format" + ), + page_size: int = Query(None, gt=0, le=200), +): + total_size, next_page_token, runs = None, None, None + if get_k8s_helper(silent=True).is_running_inside_kubernetes_cluster(): + total_size, next_page_token, runs = mlrun.api.crud.list_pipelines( + project, namespace, sort_by, page_token, filter_, format_, page_size, + ) + return mlrun.api.schemas.PipelinesOutput( + runs=runs or [], + total_size=total_size or 0, + next_page_token=next_page_token or None, + ) + + # curl -d@/path/to/pipe.yaml http://localhost:8080/submit_pipeline @router.post("/submit_pipeline") @router.post("/submit_pipeline/") diff --git a/mlrun/api/api/endpoints/workflows.py b/mlrun/api/api/endpoints/workflows.py deleted file mode 100644 index d2dfe846255..00000000000 --- a/mlrun/api/api/endpoints/workflows.py +++ /dev/null @@ -1,33 +0,0 @@ -from fastapi import APIRouter - -from mlrun.k8s_utils import get_k8s_helper -from mlrun.run import list_pipelines - -router = APIRouter() - - -# curl http://localhost:8080/workflows?full=no -@router.get("/workflows") -def list_workflows( - experiment_id: str = None, - namespace: str = None, - sort_by: str = "", - page_token: str = "", - full: bool = False, - page_size: int = 10, -): - total_size, next_page_token, runs = None, None, None - if get_k8s_helper(silent=True).is_running_inside_kubernetes_cluster(): - total_size, next_page_token, runs = list_pipelines( - full=full, - page_token=page_token, - page_size=page_size, - sort_by=sort_by, - experiment_id=experiment_id, - namespace=namespace, - ) - return { - "runs": 
runs or [], - "total_size": total_size or 0, - "next_page_token": next_page_token or None, - } diff --git a/mlrun/api/crud/__init__.py b/mlrun/api/crud/__init__.py index d8b9192896b..30f0eb13b73 100644 --- a/mlrun/api/crud/__init__.py +++ b/mlrun/api/crud/__init__.py @@ -1 +1,2 @@ from .logs import Logs # noqa: F401 +from .pipelines import list_pipelines # noqa: F401 diff --git a/mlrun/api/crud/pipelines.py b/mlrun/api/crud/pipelines.py new file mode 100644 index 00000000000..fb5e9e695e2 --- /dev/null +++ b/mlrun/api/crud/pipelines.py @@ -0,0 +1,186 @@ +import ast +import json +import typing + +import kfp + +import mlrun +import mlrun.api.schemas +import mlrun.errors +import mlrun.utils.helpers +from mlrun.utils import logger + + +def list_pipelines( + project: str, + namespace: str = "", + sort_by: str = "", + page_token: str = "", + filter_: str = "", + format_: mlrun.api.schemas.Format = mlrun.api.schemas.Format.metadata_only, + page_size: typing.Optional[int] = None, +) -> typing.Tuple[int, typing.Optional[int], typing.List[dict]]: + if project != "*" and (page_token or page_size or sort_by or filter_): + raise mlrun.errors.MLRunInvalidArgumentError( + "Filtering by project can not be used together with pagination, sorting, or custom filter" + ) + namespace = namespace or mlrun.mlconf.namespace + kfp_client = kfp.Client(namespace=namespace) + if project != "*": + run_dicts = [] + while page_token is not None: + response = kfp_client._run_api.list_runs( + page_token=page_token, + page_size=mlrun.api.schemas.PipelinesPagination.max_page_size, + ) + run_dicts.extend([run.to_dict() for run in response.runs or []]) + page_token = response.next_page_token + project_runs = [] + for run_dict in run_dicts: + run_project = _resolve_pipeline_project(run_dict) + if run_project == project: + project_runs.append(run_dict) + runs = project_runs + total_size = len(project_runs) + next_page_token = None + else: + response = kfp_client._run_api.list_runs( + 
page_token=page_token, + page_size=page_size + or mlrun.api.schemas.PipelinesPagination.default_page_size, + sort_by=sort_by, + filter=filter_, + ) + runs = [run.to_dict() for run in response.runs or []] + total_size = response.total_size + next_page_token = response.next_page_token + runs = _format_runs(runs, format_) + + return total_size, next_page_token, runs + + +def _format_runs( + runs: typing.List[dict], + format_: mlrun.api.schemas.Format = mlrun.api.schemas.Format.metadata_only, +) -> typing.List[dict]: + if format_ == mlrun.api.schemas.Format.full: + return runs + elif format_ == mlrun.api.schemas.Format.metadata_only: + formatted_runs = [] + for run in runs: + formatted_runs.append( + { + k: str(v) + for k, v in run.items() + if k + in [ + "id", + "name", + "status", + "error", + "created_at", + "scheduled_at", + "finished_at", + "description", + ] + } + ) + return formatted_runs + elif format_ == mlrun.api.schemas.Format.name_only: + formatted_runs = [] + for run in runs: + formatted_runs.append(run.get("name")) + return formatted_runs + else: + raise NotImplementedError(f"Provided format is not supported. 
format={format_}") + + +def _resolve_project_from_command( + command: typing.List[str], + hyphen_p_is_also_project: bool, + has_func_url_flags: bool, + has_runtime_flags: bool, +): + # project has precedence over function url so search for it first + for index, argument in enumerate(command): + if ( + (argument == "-p" and hyphen_p_is_also_project) or argument == "--project" + ) and index + 1 < len(command): + return command[index + 1] + if has_func_url_flags: + for index, argument in enumerate(command): + if (argument == "-f" or argument == "--func-url") and index + 1 < len( + command + ): + function_url = command[index + 1] + if function_url.startswith("db://"): + project, _, _, _ = mlrun.utils.helpers.parse_function_uri( + function_url[len("db://") :] + ) + if project: + return project + if has_runtime_flags: + for index, argument in enumerate(command): + if (argument == "-r" or argument == "--runtime") and index + 1 < len( + command + ): + runtime = command[index + 1] + try: + parsed_runtime = ast.literal_eval(runtime) + except Exception as exc: + logger.warning( + "Failed parsing runtime. 
Skipping", runtime=runtime, exc=exc + ) + else: + if isinstance(parsed_runtime, dict): + project = parsed_runtime.get("metadata", {}).get("project") + if project: + return project + + return None + + +def _resolve_pipeline_project(pipeline): + workflow_manifest = json.loads( + pipeline.get("pipeline_spec", {}).get("workflow_manifest", "{}") + ) + templates = workflow_manifest.get("spec", {}).get("templates", []) + for template in templates: + command = template.get("container", {}).get("command", []) + action = None + for index, argument in enumerate(command): + if argument == "mlrun" and index + 1 < len(command): + action = command[index + 1] + break + if action: + if action == "deploy": + project = _resolve_project_from_command( + command, + hyphen_p_is_also_project=True, + has_func_url_flags=True, + has_runtime_flags=False, + ) + if project: + return project + elif action == "run": + project = _resolve_project_from_command( + command, + hyphen_p_is_also_project=False, + has_func_url_flags=True, + has_runtime_flags=True, + ) + if project: + return project + elif action == "build": + project = _resolve_project_from_command( + command, + hyphen_p_is_also_project=False, + has_func_url_flags=False, + has_runtime_flags=True, + ) + if project: + return project + else: + raise NotImplementedError(f"Unknown action: {action}") + + return mlrun.mlconf.default_project diff --git a/mlrun/api/schemas/__init__.py b/mlrun/api/schemas/__init__.py index 95810942808..b70a3b31504 100644 --- a/mlrun/api/schemas/__init__.py +++ b/mlrun/api/schemas/__init__.py @@ -29,6 +29,7 @@ FeatureVectorsOutput, ) from .object import ObjectMetadata, ObjectSpec, ObjectStatus, ObjectKind +from .pipeline import PipelinesPagination, PipelinesOutput from .project import ( Project, ProjectMetadata, diff --git a/mlrun/api/schemas/constants.py b/mlrun/api/schemas/constants.py index 32aa301ef2b..b24c5dde117 100644 --- a/mlrun/api/schemas/constants.py +++ b/mlrun/api/schemas/constants.py @@ -8,6 +8,7 @@ 
class Format(str, Enum): full = "full" name_only = "name_only" + metadata_only = "metadata_only" class PatchMode(str, Enum): diff --git a/mlrun/api/schemas/pipeline.py b/mlrun/api/schemas/pipeline.py new file mode 100644 index 00000000000..67d54649593 --- /dev/null +++ b/mlrun/api/schemas/pipeline.py @@ -0,0 +1,16 @@ +import typing + +import pydantic + + +class PipelinesPagination(str): + default_page_size = 20 + # https://github.com/kubeflow/pipelines/blob/master/backend/src/apiserver/list/list.go#L363 + max_page_size = 200 + + +class PipelinesOutput(pydantic.BaseModel): + # use the format query param to control what is returned + runs: typing.List[typing.Union[dict, str]] + total_size: int + next_page_token: typing.Optional[str] diff --git a/mlrun/db/base.py b/mlrun/db/base.py index d1834cc6668..d99c278cade 100644 --- a/mlrun/db/base.py +++ b/mlrun/db/base.py @@ -282,3 +282,16 @@ def patch_feature_vector( @abstractmethod def delete_feature_vector(self, name, project=""): pass + + @abstractmethod + def list_pipelines( + self, + project: str, + namespace: str = None, + sort_by: str = "", + page_token: str = "", + filter_: str = "", + format_: Union[str, schemas.Format] = schemas.Format.metadata_only, + page_size: int = None, + ) -> schemas.PipelinesOutput: + pass diff --git a/mlrun/db/filedb.py b/mlrun/db/filedb.py index e0e9dd189d7..8e410f9bd7d 100644 --- a/mlrun/db/filedb.py +++ b/mlrun/db/filedb.py @@ -16,7 +16,7 @@ import pathlib from datetime import datetime, timedelta, timezone from os import makedirs, path, remove, scandir, listdir -from typing import List +from typing import List, Union import yaml from dateutil.parser import parse as parse_time @@ -597,6 +597,20 @@ def patch_feature_vector( def delete_feature_vector(self, name, project=""): raise NotImplementedError() + def list_pipelines( + self, + project: str, + namespace: str = None, + sort_by: str = "", + page_token: str = "", + filter_: str = "", + format_: Union[ + str, mlrun.api.schemas.Format + ] 
= mlrun.api.schemas.Format.metadata_only, + page_size: int = None, + ) -> mlrun.api.schemas.PipelinesOutput: + raise NotImplementedError() + def make_time_pred(since, until): if not (since or until): diff --git a/mlrun/db/httpdb.py b/mlrun/db/httpdb.py index 7abcdb10a3a..34967e36730 100644 --- a/mlrun/db/httpdb.py +++ b/mlrun/db/httpdb.py @@ -678,6 +678,39 @@ def submit_pipeline( logger.info("submitted pipeline {} id={}".format(resp["name"], resp["id"])) return resp["id"] + def list_pipelines( + self, + project: str, + namespace: str = None, + sort_by: str = "", + page_token: str = "", + filter_: str = "", + format_: Union[ + str, mlrun.api.schemas.Format + ] = mlrun.api.schemas.Format.metadata_only, + page_size: int = None, + ) -> mlrun.api.schemas.PipelinesOutput: + if project != "*" and (page_token or page_size or sort_by or filter_): + raise mlrun.errors.MLRunInvalidArgumentError( + "Filtering by project can not be used together with pagination, sorting, or custom filter" + ) + if isinstance(format_, mlrun.api.schemas.Format): + format_ = format_.value + params = { + "namespace": namespace, + "sort_by": sort_by, + "page_token": page_token, + "filter": filter_, + "format": format_, + "page_size": page_size, + } + + error_message = f"Failed listing pipelines, query: {params}" + response = self.api_call( + "GET", f"projects/{project}/pipelines", error_message, params=params + ) + return mlrun.api.schemas.PipelinesOutput(**response.json()) + def get_pipeline(self, run_id: str, namespace: str = None, timeout: int = 10): try: @@ -969,12 +1002,14 @@ def delete_feature_vector(self, name, project=""): def list_projects( self, owner: str = None, - format_: mlrun.api.schemas.Format = mlrun.api.schemas.Format.full, + format_: Union[str, mlrun.api.schemas.Format] = mlrun.api.schemas.Format.full, labels: List[str] = None, - state: mlrun.api.schemas.ProjectState = None, + state: Union[str, mlrun.api.schemas.ProjectState] = None, ) -> List[Union[mlrun.projects.MlrunProject, 
str]]: if isinstance(state, mlrun.api.schemas.ProjectState): state = state.value + if isinstance(format_, mlrun.api.schemas.Format): + format_ = format_.value params = { "owner": owner, "state": state, diff --git a/mlrun/db/sqldb.py b/mlrun/db/sqldb.py index 0911162f844..c123d2b3deb 100644 --- a/mlrun/db/sqldb.py +++ b/mlrun/db/sqldb.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import List +from typing import List, Union import mlrun.api.schemas from mlrun.api.db.base import DBError @@ -392,3 +392,17 @@ def delete_feature_vector(self, name, project=""): return self._transform_db_error( self.db.delete_feature_vector, self.session, project, name, ) + + def list_pipelines( + self, + project: str, + namespace: str = None, + sort_by: str = "", + page_token: str = "", + filter_: str = "", + format_: Union[ + str, mlrun.api.schemas.Format + ] = mlrun.api.schemas.Format.metadata_only, + page_size: int = None, + ) -> mlrun.api.schemas.PipelinesOutput: + raise NotImplementedError() diff --git a/mlrun/run.py b/mlrun/run.py index 5a6a528d48e..16cc2b3e6f6 100644 --- a/mlrun/run.py +++ b/mlrun/run.py @@ -15,7 +15,7 @@ import importlib.util as imputil import json import socket -from typing import Union, List +from typing import Union, List, Tuple, Optional import uuid from base64 import b64decode from copy import deepcopy @@ -27,6 +27,8 @@ from kfp import Client from nuclio import build_file +import mlrun.errors +import mlrun.api.schemas from .config import config as mlconf from .datastore import store_manager from .db import get_or_set_dburl, get_run_db @@ -910,40 +912,36 @@ def get_pipeline(run_id, namespace=None): def list_pipelines( full=False, page_token="", - page_size=10, + page_size=None, sort_by="", - experiment_id=None, + filter_="", namespace=None, -): - """List pipelines""" - namespace = namespace or mlconf.namespace - client = Client(namespace=namespace) - resp = 
client._run_api.list_runs( - page_token=page_token, page_size=page_size, sort_by=sort_by + project="*", + format_: mlrun.api.schemas.Format = mlrun.api.schemas.Format.metadata_only, +) -> Tuple[int, Optional[int], List[dict]]: + """List pipelines + + :param full: Deprecated, use format_ instead. If True will set format_ to full, otherwise format_ will be used + :param page_token: A page token to request the next page of results. The token is acquired from the nextPageToken + field of the response from the previous call or can be omitted when fetching the first page. + :param page_size: The number of pipelines to be listed per page. If there are more pipelines than this number, the + response message will contain a nextPageToken field you can use to fetch the next page. + :param sort_by: Can be format of \"field_name\", \"field_name asc\" or \"field_name desc\" (Example, \"name asc\" + or \"id desc\"). Ascending by default. + :param filter_: A url-encoded, JSON-serialized Filter protocol buffer + (see [filter.proto](https://github.com/kubeflow/pipelines/blob/master/backend/api/filter.proto)). + :param namespace: Kubernetes namespace if other than default + :param project: Can be used to retrieve only specific project pipelines. "*" for all projects. Note that filtering + by project can't be used together with pagination, sorting, or custom filter. 
+ :param format_: Control what will be returned (full/metadata_only/name_only) + """ + if full: + format_ = mlrun.api.schemas.Format.full + run_db = get_run_db() + pipelines = run_db.list_pipelines( + project, namespace, sort_by, page_token, filter_, format_, page_size ) - runs = resp.runs - if not full and runs: - runs = [] - for run in resp.runs: - runs.append( - { - k: str(v) - for k, v in run.to_dict().items() - if k - in [ - "id", - "name", - "status", - "error", - "created_at", - "scheduled_at", - "finished_at", - "description", - ] - } - ) - - return resp.total_size, resp.next_page_token, runs + return pipelines.total_size, pipelines.next_page_token, pipelines.runs def get_object(url, secrets=None, size=None, offset=0, db=None): diff --git a/tests/api/api/test_pipelines.py b/tests/api/api/test_pipelines.py new file mode 100644 index 00000000000..fd398898b31 --- /dev/null +++ b/tests/api/api/test_pipelines.py @@ -0,0 +1,214 @@ +import http +import importlib +import unittest.mock + +import deepdiff +import fastapi.testclient +import kfp +import kfp_server_api.models +import pytest +import sqlalchemy.orm + +import mlrun.api.crud +import mlrun.api.schemas +import mlrun.api.utils.singletons.k8s + + +@pytest.fixture +def kfp_client_mock(monkeypatch) -> kfp.Client: + mlrun.api.utils.singletons.k8s.get_k8s().is_running_inside_kubernetes_cluster = unittest.mock.Mock( + return_value=True + ) + kfp_client_mock = unittest.mock.Mock() + monkeypatch.setattr(kfp, "Client", lambda *args, **kwargs: kfp_client_mock) + return kfp_client_mock + + +def test_list_pipelines_not_exploding_on_no_k8s( + db: sqlalchemy.orm.Session, client: fastapi.testclient.TestClient +) -> None: + response = client.get("/api/projects/*/pipelines") + expected_response = mlrun.api.schemas.PipelinesOutput( + runs=[], total_size=0, next_page_token=None + ) + _assert_list_pipelines_response(expected_response, response) + + +def test_list_pipelines_empty_list( + db: sqlalchemy.orm.Session, + client: 
fastapi.testclient.TestClient, + kfp_client_mock: kfp.Client, +) -> None: + runs = [] + _mock_list_runs(kfp_client_mock, runs) + response = client.get("/api/projects/*/pipelines") + expected_response = mlrun.api.schemas.PipelinesOutput( + runs=runs, total_size=len(runs), next_page_token=None + ) + _assert_list_pipelines_response(expected_response, response) + + +def test_list_pipelines_names_only( + db: sqlalchemy.orm.Session, + client: fastapi.testclient.TestClient, + kfp_client_mock: kfp.Client, +) -> None: + runs = _generate_run_mocks() + expected_runs = [run.name for run in runs] + _mock_list_runs(kfp_client_mock, runs) + response = client.get( + "/api/projects/*/pipelines", + params={"format": mlrun.api.schemas.Format.name_only}, + ) + expected_response = mlrun.api.schemas.PipelinesOutput( + runs=expected_runs, total_size=len(runs), next_page_token=None + ) + _assert_list_pipelines_response(expected_response, response) + + +def test_list_pipelines_metadata_only( + db: sqlalchemy.orm.Session, + client: fastapi.testclient.TestClient, + kfp_client_mock: kfp.Client, +) -> None: + runs = _generate_run_mocks() + expected_runs = [run.to_dict() for run in runs] + expected_runs = mlrun.api.crud.pipelines._format_runs( + expected_runs, mlrun.api.schemas.Format.metadata_only + ) + _mock_list_runs(kfp_client_mock, runs) + response = client.get( + "/api/projects/*/pipelines", + params={"format": mlrun.api.schemas.Format.metadata_only}, + ) + expected_response = mlrun.api.schemas.PipelinesOutput( + runs=expected_runs, total_size=len(runs), next_page_token=None + ) + _assert_list_pipelines_response(expected_response, response) + + +def test_list_pipelines_full( + db: sqlalchemy.orm.Session, + client: fastapi.testclient.TestClient, + kfp_client_mock: kfp.Client, +) -> None: + runs = _generate_run_mocks() + expected_runs = [run.to_dict() for run in runs] + _mock_list_runs(kfp_client_mock, runs) + response = client.get( + "/api/projects/*/pipelines", params={"format": 
mlrun.api.schemas.Format.full} + ) + expected_response = mlrun.api.schemas.PipelinesOutput( + runs=expected_runs, total_size=len(runs), next_page_token=None + ) + _assert_list_pipelines_response(expected_response, response) + + +def test_list_pipelines_specific_project( + db: sqlalchemy.orm.Session, + client: fastapi.testclient.TestClient, + kfp_client_mock: kfp.Client, +) -> None: + project = "project-name" + runs = _generate_run_mocks() + expected_runs = [run.name for run in runs] + _mock_list_runs_with_one_run_per_page(kfp_client_mock, runs) + mlrun.api.crud.pipelines._resolve_pipeline_project = unittest.mock.Mock( + return_value=project + ) + response = client.get( + f"/api/projects/{project}/pipelines", + params={"format": mlrun.api.schemas.Format.name_only}, + ) + expected_response = mlrun.api.schemas.PipelinesOutput( + runs=expected_runs, total_size=len(expected_runs), next_page_token=None + ) + _assert_list_pipelines_response(expected_response, response) + + # revert mock setting (it's global function, without reloading it the mock will persist to following tests) + importlib.reload(mlrun.api.crud.pipelines) + + +def _generate_run_mocks(): + return [ + kfp_server_api.models.api_run.ApiRun( + id="id1", + name="run1", + description="desc1", + pipeline_spec=kfp_server_api.models.api_pipeline_spec.ApiPipelineSpec( + pipeline_id="pipe_id1" + ), + ), + kfp_server_api.models.api_run.ApiRun( + id="id2", + name="run2", + description="desc2", + pipeline_spec=kfp_server_api.models.api_pipeline_spec.ApiPipelineSpec( + pipeline_id="pipe_id2" + ), + ), + kfp_server_api.models.api_run.ApiRun( + id="id3", + name="run3", + description="desc3", + pipeline_spec=kfp_server_api.models.api_pipeline_spec.ApiPipelineSpec( + pipeline_id="pipe_id3" + ), + ), + kfp_server_api.models.api_run.ApiRun( + id="id4", + name="run4", + description="desc4", + pipeline_spec=kfp_server_api.models.api_pipeline_spec.ApiPipelineSpec( + pipeline_id="pipe_id4" + ), + ), + ] + + +def 
_mock_list_runs_with_one_run_per_page(kfp_client_mock: kfp.Client, runs): + expected_page_tokens = [""] + for i in range(2, len(runs) + 1): + expected_page_tokens.append(i) + expected_page_tokens.append(None) + + def list_runs_mock(*args, page_token=None, page_size=None, **kwargs): + assert expected_page_tokens.pop(0) == page_token + assert mlrun.api.schemas.PipelinesPagination.max_page_size == page_size + return kfp_server_api.models.api_list_runs_response.ApiListRunsResponse( + [runs.pop(0)], 1, next_page_token=expected_page_tokens[0] + ) + + kfp_client_mock._run_api.list_runs = list_runs_mock + + +def _mock_list_runs( + kfp_client_mock: kfp.Client, + runs, + expected_page_token="", + expected_page_size=mlrun.api.schemas.PipelinesPagination.default_page_size, + expected_sort_by="", + expected_filter="", +): + def list_runs_mock( + *args, page_token=None, page_size=None, sort_by=None, filter=None, **kwargs + ): + assert expected_page_token == page_token + assert expected_page_size == page_size + assert expected_sort_by == sort_by + assert expected_filter == filter + return kfp_server_api.models.api_list_runs_response.ApiListRunsResponse( + runs, len(runs) + ) + + kfp_client_mock._run_api.list_runs = list_runs_mock + + +def _assert_list_pipelines_response( + expected_response: mlrun.api.schemas.PipelinesOutput, response +): + assert response.status_code == http.HTTPStatus.OK.value + assert ( + deepdiff.DeepDiff(expected_response.dict(), response.json(), ignore_order=True,) + == {} + ) diff --git a/tests/api/api/test_workflows.py b/tests/api/api/test_workflows.py deleted file mode 100644 index c87ab0147d5..00000000000 --- a/tests/api/api/test_workflows.py +++ /dev/null @@ -1,18 +0,0 @@ -import deepdiff -from http import HTTPStatus - -from fastapi.testclient import TestClient -from sqlalchemy.orm import Session - - -def test_list_workflows(db: Session, client: TestClient) -> None: - response = client.get("/api/workflows") - assert response.status_code == 
HTTPStatus.OK.value - expected_response = { - "runs": [], - "total_size": 0, - "next_page_token": None, - } - assert ( - deepdiff.DeepDiff(expected_response, response.json(), ignore_order=True,) == {} - ) diff --git a/tests/api/crud/__init__.py b/tests/api/crud/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/api/crud/test_pipelines.py b/tests/api/crud/test_pipelines.py new file mode 100644 index 00000000000..6c64612bcb0 --- /dev/null +++ b/tests/api/crud/test_pipelines.py @@ -0,0 +1,238 @@ +import json + +import mlrun.api.crud.pipelines +import mlrun.errors +import mlrun.run +import mlrun.utils.helpers + + +def test_resolve_pipeline_project(): + cases = [ + { + "expected_project": "project-from-deploy-p", + "template": { + "container": { + "command": [ + "python", + "-m", + "mlrun", + "deploy", + "-p", + "project-from-deploy-p", + ] + } + }, + }, + { + "expected_project": "project-from-deploy--project", + "template": { + "container": { + "command": [ + "python", + "-m", + "mlrun", + "deploy", + "--project", + "project-from-deploy--project", + ] + } + }, + }, + { + "expected_project": "project-from-deploy-f", + "template": { + "container": { + "command": [ + "python", + "-m", + "mlrun", + "deploy", + "-f", + "db://project-from-deploy-f/tf2-serving@2db2ec7d89c0c8c9d1b9a86279d8440ebc230597", + ] + } + }, + }, + { + "expected_project": "project-from-deploy--func-url", + "template": { + "container": { + "command": [ + "python", + "-m", + "mlrun", + "deploy", + "--func-url", + "db://project-from-deploy--func-url/tf2-serving@2db2ec7d89c0c8c9d1b9a86279d8440ebc230597", + ] + } + }, + }, + { + "expected_project": "project-from-deploy-precedence-p", + "template": { + "container": { + "command": [ + "python", + "-m", + "mlrun", + "deploy", + "--func-url", + "db://project-from-deploy--func-url/tf2-serving@2db2ec7d89c0c8c9d1b9a86279d8440ebc230597", + "-p", + "project-from-deploy-precedence-p", + ] + } + }, + }, + { + "expected_project": 
"project-from-run--project", + "template": { + "container": { + "command": [ + "python", + "-m", + "mlrun", + "run", + "--project", + "project-from-run--project", + ] + } + }, + }, + { + "expected_project": "project-from-run-f", + "template": { + "container": { + "command": [ + "python", + "-m", + "mlrun", + "run", + "-f", + "db://project-from-run-f/tf2-serving@2db2ec7d89c0c8c9d1b9a86279d8440ebc230597", + ] + } + }, + }, + { + "expected_project": "project-from-run--func-url", + "template": { + "container": { + "command": [ + "python", + "-m", + "mlrun", + "run", + "--func-url", + "db://project-from-run--func-url/tf2-serving@2db2ec7d89c0c8c9d1b9a86279d8440ebc230597", + ] + } + }, + }, + { + "expected_project": "project-from-run-r", + "template": { + "container": { + "command": [ + "python", + "-m", + "mlrun", + "run", + "-r", + "{'kind': 'job', 'metadata': {'project': 'project-from-run-r'}}", + ] + } + }, + }, + { + "expected_project": "project-from-run--runtime", + "template": { + "container": { + "command": [ + "python", + "-m", + "mlrun", + "run", + "--runtime", + "{'kind': 'job', 'metadata': {'project': 'project-from-run--runtime'}}", + ] + } + }, + }, + { + "expected_project": "project-from-run-precedence--project", + "template": { + "container": { + "command": [ + "python", + "-m", + "mlrun", + "run", + "--func-url", + "db://project-from-deploy--func-url/tf2-serving@2db2ec7d89c0c8c9d1b9a86279d8440ebc230597", + "--project", + "project-from-run-precedence--project", + ] + } + }, + }, + { + "expected_project": "project-from-build-r", + "template": { + "container": { + "command": [ + "python", + "-m", + "mlrun", + "build", + "-r", + "{'kind': 'job', 'metadata': {'project': 'project-from-build-r'}}", + ] + } + }, + }, + { + "expected_project": "project-from-build--runtime", + "template": { + "container": { + "command": [ + "python", + "-m", + "mlrun", + "build", + "--runtime", + "{'kind': 'job', 'metadata': {'project': 'project-from-build--runtime'}}", + ] + } + }, 
+ }, + { + "expected_project": "project-from-build-precedence--runtime", + "template": { + "container": { + "command": [ + "python", + "-m", + "mlrun", + "build", + "--runtime", + "{'kind': 'job', 'metadata': {'project': 'project-from-build--runtime'}}", + "--project", + "project-from-build-precedence--runtime", + ] + } + }, + }, + { + "expected_project": mlrun.mlconf.default_project, + "template": {"dag": {"asdasd": "asdasd"}}, + }, + ] + for case in cases: + workflow_manifest = {"spec": {"templates": [case["template"]]}} + pipeline = { + "pipeline_spec": {"workflow_manifest": json.dumps(workflow_manifest)} + } + project = mlrun.api.crud.pipelines._resolve_pipeline_project(pipeline) + assert project == case["expected_project"]