Skip to content

Commit

Permalink
[Projects] Fix get pipeline when not remote and default to remote (#1293)
Browse files Browse the repository at this point in the history
  • Loading branch information
yaronha committed Sep 6, 2021
1 parent da6d288 commit 82e1e55
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 31 deletions.
2 changes: 1 addition & 1 deletion mlrun/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -829,7 +829,7 @@ def project(
engine=engine,
local=local,
)
print(f"run id: {run_result.run_id}, state={run_result.state}")
print(f"run id: {run_result.run_id}")
except Exception as exc:
print(traceback.format_exc())
message = f"failed to run pipeline, {exc}"
Expand Down
43 changes: 21 additions & 22 deletions mlrun/projects/pipelines.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,21 @@ def wait_for_completion(run_id, project=None, timeout=None, expected_statuses=No
def get_state(run_id, project=None):
return ""

@staticmethod
def _get_handler(workflow_handler, workflow_spec, project, secrets):
    """Resolve the callable that builds the workflow pipeline.

    If ``workflow_handler`` is already a callable, register the current
    pipeline functions on :mod:`builtins` (so the handler's module can see
    them as ``funcs``) and return it unchanged.  Otherwise, load the
    workflow source file from the project context and compile it into a
    pipeline, using ``workflow_handler`` (a handler *name*, if given) or
    the handler declared on the workflow spec.

    :param workflow_handler: callable handler, handler name string, or None
    :param workflow_spec:    workflow spec holding the source file/handler
    :param project:          project object providing the source context
    :param secrets:          secrets passed through to pipeline creation
    :return: a callable workflow handler
    """
    if workflow_handler and callable(workflow_handler):
        # Handler supplied directly by the caller; expose the project
        # functions globally for the handler's module and use it as-is.
        builtins.funcs = pipeline_context.functions
        return workflow_handler

    # No callable given: build the pipeline from the workflow source file,
    # falling back to the handler name declared on the workflow spec.
    workflow_file = workflow_spec.get_source_file(project.spec.context)
    return create_pipeline(
        project,
        workflow_file,
        pipeline_context.functions,
        secrets,
        handler=workflow_handler or workflow_spec.handler,
    )


class _KFPRunner(_PipelineRunner):
"""Kubeflow pipelines runner"""
Expand Down Expand Up @@ -334,17 +349,9 @@ def run(
namespace=None,
) -> _PipelineRunStatus:
pipeline_context.set(project, workflow_spec)
if not workflow_handler or not callable(workflow_handler):
workflow_file = workflow_spec.get_source_file(project.spec.context)
workflow_handler = create_pipeline(
project,
workflow_file,
pipeline_context.functions,
secrets,
handler=workflow_handler,
)
else:
builtins.funcs = pipeline_context.functions
workflow_handler = _PipelineRunner._get_handler(
workflow_handler, workflow_spec, project, secrets
)

namespace = namespace or config.namespace
id = run_pipeline(
Expand Down Expand Up @@ -402,17 +409,9 @@ def run(
namespace=None,
) -> _PipelineRunStatus:
pipeline_context.set(project, workflow_spec)
if not workflow_handler or not callable(workflow_handler):
workflow_file = workflow_spec.get_source_file(project.spec.context)
workflow_handler = create_pipeline(
project,
workflow_file,
pipeline_context.functions,
secrets,
handler=workflow_handler,
)
else:
builtins.funcs = pipeline_context.functions
workflow_handler = _PipelineRunner._get_handler(
workflow_handler, workflow_spec, project, secrets
)

workflow_id = uuid.uuid4().hex
pipeline_context.workflow_id = workflow_id
Expand Down
4 changes: 4 additions & 0 deletions mlrun/projects/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -937,6 +937,7 @@ def set_workflow(
embed=False,
engine=None,
args_schema: typing.List[EntrypointParam] = None,
handler=None,
**args,
):
"""add or update a workflow, specify a name and the code path
Expand All @@ -946,6 +947,7 @@ def set_workflow(
:param embed: add the workflow code into the project.yaml
:param engine: workflow processing engine ("kfp" or "local")
:param args_schema: list of arg schema definitions (:py:class`~mlrun.model.EntrypointParam`)
:param handler: workflow function handler
:param args: argument values (key=value, ..)
"""
if not workflow_path:
Expand All @@ -960,6 +962,8 @@ def set_workflow(
workflow = {"name": name, "path": workflow_path}
if args:
workflow["args"] = args
if handler:
workflow["handler"] = handler
if args_schema:
args_schema = [
schema.to_dict() if hasattr(schema, "to_dict") else schema
Expand Down
8 changes: 4 additions & 4 deletions mlrun/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
from .datastore import store_manager
from .db import get_or_set_dburl, get_run_db
from .execution import MLClientCtx
from .k8s_utils import get_k8s_helper
from .model import BaseMetadata, RunObject, RunTemplate
from .runtimes import (
DaskCluster,
Expand Down Expand Up @@ -1008,7 +1007,7 @@ def get_pipeline_if_completed(run_id, namespace=namespace):
resp = client.wait_for_run_completion(run_id, timeout)
if resp:
resp = resp.to_dict()
resp = format_summary_from_kfp_run(resp["run"])
resp = format_summary_from_kfp_run(resp)
show_kfp_run(resp)

status = resp["run"]["status"] if resp else "unknown"
Expand Down Expand Up @@ -1037,6 +1036,7 @@ def get_pipeline(
str, mlrun.api.schemas.PipelinesFormat
] = mlrun.api.schemas.PipelinesFormat.summary,
project: str = None,
remote: bool = True,
):
"""Get Pipeline status
Expand All @@ -1046,11 +1046,11 @@ def get_pipeline(
- ``summary`` (default value) - Return summary of the object data.
- ``full`` - Return full pipeline object.
:param project: the project of the pipeline run
:param remote: read kfp data from mlrun service (default=True)
:return: kfp run dict
"""
namespace = namespace or mlconf.namespace
remote = not get_k8s_helper(silent=True).is_running_inside_kubernetes_cluster()
if remote:
mldb = get_run_db()
if mldb.kind != "http":
Expand All @@ -1072,7 +1072,7 @@ def get_pipeline(
not format_
or format_ == mlrun.api.schemas.PipelinesFormat.summary.value
):
resp = format_summary_from_kfp_run(resp["run"])
resp = format_summary_from_kfp_run(resp)

show_kfp_run(resp)
return resp
Expand Down
5 changes: 1 addition & 4 deletions tests/system/projects/test_project.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def _create_project(self, project_name, with_repo=False):
doc="model package/algorithm",
)
proj.set_workflow("main", "./kflow.py", args_schema=[arg])
proj.set_workflow("newflow", "./newflow.py")
proj.set_workflow("newflow", "./newflow.py", handler="newpipe")
proj.save()
return proj

Expand Down Expand Up @@ -218,7 +218,6 @@ def _test_new_pipeline(self, name, engine):
print(project.to_yaml())
run = project.run(
"newflow",
workflow_handler="newpipe",
engine=engine,
artifact_path=f"v3io:///projects/{name}",
watch=True,
Expand Down Expand Up @@ -248,8 +247,6 @@ def test_local_cli(self):
name,
"-r",
"newflow",
"--handler",
"newpipe",
"--engine",
"local",
"-w",
Expand Down

0 comments on commit 82e1e55

Please sign in to comment.