[Pipelines] Enable listing pipelines by project (#626)

Hedingber committed Dec 30, 2020
1 parent de5ba23 commit b53f5e5
Showing 20 changed files with 799 additions and 94 deletions.
2 changes: 1 addition & 1 deletion dockerfiles/base/requirements.txt
@@ -9,7 +9,7 @@ lz4~=3.0
msgpack~=1.0
numpy~=1.18
scikit-learn~=0.23.0
scikit-plot~=0.3.0
scikit-plot~=0.3.7
tornado~=6.0
urllib3~=1.25
vaex~=2.6
2 changes: 1 addition & 1 deletion dockerfiles/jupyter/requirements.txt
@@ -2,6 +2,6 @@ matplotlib~=3.0
scipy~=1.0
scikit-learn~=0.23.0
seaborn~=0.11.0
scikit-plot~=0.3.0
scikit-plot~=0.3.7
xgboost~=1.1

2 changes: 1 addition & 1 deletion dockerfiles/mlrun/requirements.txt
@@ -2,4 +2,4 @@ matplotlib~=3.0
scipy~=1.0
scikit-learn~=0.23.0
seaborn~=0.11.0
scikit-plot~=0.3.0
scikit-plot~=0.3.7
4 changes: 0 additions & 4 deletions mlrun/api/api/api.py
@@ -15,7 +15,6 @@
schedules,
submit,
tags,
workflows,
feature_sets,
)

@@ -59,9 +58,6 @@
api_router.include_router(
tags.router, tags=["tags"], dependencies=[Depends(deps.AuthVerifier)]
)
api_router.include_router(
workflows.router, tags=["workflows"], dependencies=[Depends(deps.AuthVerifier)]
)
api_router.include_router(
feature_sets.router,
tags=["feature-sets"],
29 changes: 29 additions & 0 deletions mlrun/api/api/endpoints/pipelines.py
@@ -8,13 +8,42 @@
from fastapi.concurrency import run_in_threadpool
from kfp import Client as kfclient

import mlrun.api.crud
import mlrun.api.schemas
from mlrun.api.api.utils import log_and_raise
from mlrun.config import config
from mlrun.k8s_utils import get_k8s_helper
from mlrun.utils import logger

router = APIRouter()


@router.get(
"/projects/{project}/pipelines", response_model=mlrun.api.schemas.PipelinesOutput
)
def list_pipelines(
project: str,
namespace: str = None,
sort_by: str = "",
page_token: str = "",
filter_: str = Query("", alias="filter"),
format_: mlrun.api.schemas.Format = Query(
mlrun.api.schemas.Format.metadata_only, alias="format"
),
page_size: int = Query(None, gt=0, le=200),
):
total_size, next_page_token, runs = None, None, None
if get_k8s_helper(silent=True).is_running_inside_kubernetes_cluster():
total_size, next_page_token, runs = mlrun.api.crud.list_pipelines(
project, namespace, sort_by, page_token, filter_, format_, page_size,
)
return mlrun.api.schemas.PipelinesOutput(
runs=runs or [],
total_size=total_size or 0,
next_page_token=next_page_token or None,
)


# curl -d@/path/to/pipe.yaml http://localhost:8080/submit_pipeline
@router.post("/submit_pipeline")
@router.post("/submit_pipeline/")
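The new route above lists KFP runs scoped to a single project. A minimal sketch of calling it from Python, in the spirit of the curl comment above (the localhost:8080 address and the /api prefix are assumptions about the deployment, not taken from this diff):

import requests

# hypothetical base URL -- adjust host, port, and prefix to your deployment
base_url = "http://localhost:8080/api"

# names only, for a single project
resp = requests.get(
    f"{base_url}/projects/my-project/pipelines",
    params={"format": "name_only"},
)
resp.raise_for_status()
print(resp.json())  # {"runs": [...], "total_size": ..., "next_page_token": ...}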
33 changes: 0 additions & 33 deletions mlrun/api/api/endpoints/workflows.py

This file was deleted.

1 change: 1 addition & 0 deletions mlrun/api/crud/__init__.py
@@ -1 +1,2 @@
from .logs import Logs # noqa: F401
from .pipelines import list_pipelines # noqa: F401
186 changes: 186 additions & 0 deletions mlrun/api/crud/pipelines.py
@@ -0,0 +1,186 @@
import ast
import json
import typing

import kfp

import mlrun
import mlrun.api.schemas
import mlrun.errors
import mlrun.utils.helpers
from mlrun.utils import logger


def list_pipelines(
project: str,
namespace: str = "",
sort_by: str = "",
page_token: str = "",
filter_: str = "",
format_: mlrun.api.schemas.Format = mlrun.api.schemas.Format.metadata_only,
page_size: typing.Optional[int] = None,
) -> typing.Tuple[int, typing.Optional[str], typing.List[dict]]:
if project != "*" and (page_token or page_size or sort_by or filter_):
raise mlrun.errors.MLRunInvalidArgumentError(
"Filtering by project can not be used together with pagination, sorting, or custom filter"
)
namespace = namespace or mlrun.mlconf.namespace
kfp_client = kfp.Client(namespace=namespace)
if project != "*":
run_dicts = []
while page_token is not None:
response = kfp_client._run_api.list_runs(
page_token=page_token,
page_size=mlrun.api.schemas.PipelinesPagination.max_page_size,
)
run_dicts.extend([run.to_dict() for run in response.runs or []])
page_token = response.next_page_token
project_runs = []
for run_dict in run_dicts:
run_project = _resolve_pipeline_project(run_dict)
if run_project == project:
project_runs.append(run_dict)
runs = project_runs
total_size = len(project_runs)
next_page_token = None
else:
response = kfp_client._run_api.list_runs(
page_token=page_token,
page_size=page_size
or mlrun.api.schemas.PipelinesPagination.default_page_size,
sort_by=sort_by,
filter=filter_,
)
runs = [run.to_dict() for run in response.runs or []]
total_size = response.total_size
next_page_token = response.next_page_token
runs = _format_runs(runs, format_)

return total_size, next_page_token, runs


def _format_runs(
runs: typing.List[dict],
format_: mlrun.api.schemas.Format = mlrun.api.schemas.Format.metadata_only,
) -> typing.List[dict]:
if format_ == mlrun.api.schemas.Format.full:
return runs
elif format_ == mlrun.api.schemas.Format.metadata_only:
formatted_runs = []
for run in runs:
formatted_runs.append(
{
k: str(v)
for k, v in run.items()
if k
in [
"id",
"name",
"status",
"error",
"created_at",
"scheduled_at",
"finished_at",
"description",
]
}
)
return formatted_runs
elif format_ == mlrun.api.schemas.Format.name_only:
formatted_runs = []
for run in runs:
formatted_runs.append(run.get("name"))
return formatted_runs
else:
raise NotImplementedError(f"Provided format is not supported. format={format_}")


def _resolve_project_from_command(
command: typing.List[str],
hyphen_p_is_also_project: bool,
has_func_url_flags: bool,
has_runtime_flags: bool,
):
# project has precedence over function url so search for it first
for index, argument in enumerate(command):
if (
(argument == "-p" and hyphen_p_is_also_project) or argument == "--project"
) and index + 1 < len(command):
return command[index + 1]
if has_func_url_flags:
for index, argument in enumerate(command):
if (argument == "-f" or argument == "--func-url") and index + 1 < len(
command
):
function_url = command[index + 1]
if function_url.startswith("db://"):
project, _, _, _ = mlrun.utils.helpers.parse_function_uri(
function_url[len("db://") :]
)
if project:
return project
if has_runtime_flags:
for index, argument in enumerate(command):
if (argument == "-r" or argument == "--runtime") and index + 1 < len(
command
):
runtime = command[index + 1]
try:
parsed_runtime = ast.literal_eval(runtime)
except Exception as exc:
logger.warning(
"Failed parsing runtime. Skipping", runtime=runtime, exc=exc
)
else:
if isinstance(parsed_runtime, dict):
project = parsed_runtime.get("metadata", {}).get("project")
if project:
return project

return None


def _resolve_pipeline_project(pipeline):
workflow_manifest = json.loads(
pipeline.get("pipeline_spec", {}).get("workflow_manifest", "{}")
)
templates = workflow_manifest.get("spec", {}).get("templates", [])
for template in templates:
command = template.get("container", {}).get("command", [])
action = None
for index, argument in enumerate(command):
if argument == "mlrun" and index + 1 < len(command):
action = command[index + 1]
break
if action:
if action == "deploy":
project = _resolve_project_from_command(
command,
hyphen_p_is_also_project=True,
has_func_url_flags=True,
has_runtime_flags=False,
)
if project:
return project
elif action == "run":
project = _resolve_project_from_command(
command,
hyphen_p_is_also_project=False,
has_func_url_flags=True,
has_runtime_flags=True,
)
if project:
return project
elif action == "build":
project = _resolve_project_from_command(
command,
hyphen_p_is_also_project=False,
has_func_url_flags=False,
has_runtime_flags=True,
)
if project:
return project
else:
raise NotImplementedError(f"Unknown action: {action}")

return mlrun.mlconf.default_project
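
To make the resolution logic above concrete, here is a minimal sketch using a hypothetical run dict (the command line is invented and the module path is assumed importable; only the structure matters):

import json

import mlrun.api.crud.pipelines as pipelines_crud

# hypothetical KFP run dict: an Argo workflow with one "mlrun run" step
run_dict = {
    "pipeline_spec": {
        # workflow_manifest is stored as a JSON string
        "workflow_manifest": json.dumps(
            {
                "spec": {
                    "templates": [
                        {
                            "container": {
                                "command": [
                                    "python",
                                    "-m",
                                    "mlrun",
                                    "run",
                                    "--project",
                                    "my-project",
                                    "main.py",
                                ]
                            }
                        }
                    ]
                }
            }
        )
    }
}

# the "run" action routes to _resolve_project_from_command,
# which picks the value following the --project flag
assert pipelines_crud._resolve_pipeline_project(run_dict) == "my-project"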
1 change: 1 addition & 0 deletions mlrun/api/schemas/__init__.py
@@ -29,6 +29,7 @@
FeatureVectorsOutput,
)
from .object import ObjectMetadata, ObjectSpec, ObjectStatus, ObjectKind
from .pipeline import PipelinesPagination, PipelinesOutput
from .project import (
Project,
ProjectMetadata,
1 change: 1 addition & 0 deletions mlrun/api/schemas/constants.py
@@ -8,6 +8,7 @@
class Format(str, Enum):
full = "full"
name_only = "name_only"
metadata_only = "metadata_only"


class PatchMode(str, Enum):
16 changes: 16 additions & 0 deletions mlrun/api/schemas/pipeline.py
@@ -0,0 +1,16 @@
import typing

import pydantic


class PipelinesPagination(str):
default_page_size = 20
# https://github.com/kubeflow/pipelines/blob/master/backend/src/apiserver/list/list.go#L363
max_page_size = 200


class PipelinesOutput(pydantic.BaseModel):
# use the format query param to control what is returned
runs: typing.List[typing.Union[dict, str]]
total_size: int
next_page_token: typing.Optional[str]
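
For illustration, constructing the response model directly (the field values here are made up):

import mlrun.api.schemas

# with format=metadata_only each run is a dict of selected, stringified fields;
# with format=name_only, runs would instead be a plain list of names
output = mlrun.api.schemas.PipelinesOutput(
    runs=[{"id": "abc123", "name": "my-pipeline", "status": "Succeeded"}],
    total_size=1,
    next_page_token=None,
)
print(output.json())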
13 changes: 13 additions & 0 deletions mlrun/db/base.py
@@ -282,3 +282,16 @@ def patch_feature_vector(
@abstractmethod
def delete_feature_vector(self, name, project=""):
pass

@abstractmethod
def list_pipelines(
self,
project: str,
namespace: str = None,
sort_by: str = "",
page_token: str = "",
filter_: str = "",
format_: Union[str, schemas.Format] = schemas.Format.metadata_only,
page_size: int = None,
) -> schemas.PipelinesOutput:
pass
16 changes: 15 additions & 1 deletion mlrun/db/filedb.py
@@ -16,7 +16,7 @@
import pathlib
from datetime import datetime, timedelta, timezone
from os import makedirs, path, remove, scandir, listdir
from typing import List
from typing import List, Union

import yaml
from dateutil.parser import parse as parse_time
@@ -597,6 +597,20 @@ def patch_feature_vector(
def delete_feature_vector(self, name, project=""):
raise NotImplementedError()

def list_pipelines(
self,
project: str,
namespace: str = None,
sort_by: str = "",
page_token: str = "",
filter_: str = "",
format_: Union[
str, mlrun.api.schemas.Format
] = mlrun.api.schemas.Format.metadata_only,
page_size: int = None,
) -> mlrun.api.schemas.PipelinesOutput:
raise NotImplementedError()


def make_time_pred(since, until):
if not (since or until):