Skip to content

Commit

Permalink
[KFP] Fix Pipeline Formatting and Project Deletion not Deleting KFP E…
Browse files Browse the repository at this point in the history
…xperiments (#5602)

* fix pipeline formatting

* nicer

* nicer

* fix for no resource references

---------

Co-authored-by: quaark <a.melnick@icloud.com>
  • Loading branch information
quaark and quaark committed May 21, 2024
1 parent a73ad7d commit 3de848d
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 22 deletions.
2 changes: 1 addition & 1 deletion pipeline-adapters/mlrun-pipelines-kfp-common/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

setup(
name="mlrun-pipelines-kfp-common-experiment",
version="0.2.0",
version="0.2.1",
description="MLRun Pipelines package for providing KFP 1.8 compatibility",
author="Yaron Haviv",
author_email="yaronh@iguazio.com",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,11 @@ def __bool__(self) -> bool:

def to_dict(self) -> dict:
"""
Converts the mapping to a dict. This method follows the attribute rules defined on __iter__
Converts the mapping to a dict. The dict is the result of merging the external data dict with
the class attributes, where the class attributes take precedence.
:returns: a dict representation of the mapping.
"""
return {a: getattr(self, a, None) for a in self}
data = self._external_data.copy()
data.update({a: getattr(self, a, None) for a in self})
return data
2 changes: 1 addition & 1 deletion pipeline-adapters/mlrun-pipelines-kfp-v1-8/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

setup(
name="mlrun-pipelines-kfp-v1-8-experiment",
version="0.2.3",
version="0.2.4",
description="MLRun Pipelines package for providing KFP 1.8 compatibility",
author="Yaron Haviv",
author_email="yaronh@iguazio.com",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,18 @@ def finished_at(self):
def finished_at(self, finished_at):
self._external_data["finished_at"] = finished_at

@property
def experiment_id(self) -> str:
for reference in self._external_data.get("resource_references") or []:
data = reference.get("key", {})
if (
data.get("type", "") == "EXPERIMENT"
and reference.get("relationship", "") == "OWNER"
and reference.get("name", "") != "Default"
):
return data.get("id", "")
return ""

def workflow_manifest(self) -> PipelineManifest:
return self._workflow_manifest

Expand Down
2 changes: 1 addition & 1 deletion pipeline-adapters/mlrun-pipelines-kfp-v2/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

setup(
name="mlrun-pipelines-kfp-v2-experiment",
version="0.2.0",
version="0.2.1",
description="MLRun Pipelines package for providing KFP 2.* compatibility",
author="Yaron Haviv",
author_email="yaronh@iguazio.com",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import typing

from kfp.dsl import PipelineTask
from mlrun_pipelines.common.helpers import FlexibleMapper
Expand All @@ -20,6 +21,31 @@
PipelineNodeWrapper = PipelineTask


class PipelineStep(FlexibleMapper):
@property
def step_type(self):
raise NotImplementedError

@property
def node_name(self):
raise NotImplementedError

@property
def phase(self):
raise NotImplementedError

@property
def skipped(self):
raise NotImplementedError

@property
def display_name(self):
raise NotImplementedError

def get_annotation(self, annotation_name: str):
raise NotImplementedError


class PipelineManifest(FlexibleMapper):
"""
A Pipeline Manifest might have been created by an 1.8 SDK regardless of coming from a 2.0 API,
Expand All @@ -45,6 +71,9 @@ def get_executors(self):
else:
yield from self._external_data["deploymentSpec"]["executors"].items()

def get_steps(self) -> typing.Generator[PipelineStep, None, None]:
raise NotImplementedError


class PipelineRun(FlexibleMapper):
@property
Expand Down Expand Up @@ -99,6 +128,9 @@ def finished_at(self):
def finished_at(self, finished_at):
self._external_data["finished_at"] = finished_at

def experiment_id(self) -> str:
raise NotImplementedError

def workflow_manifest(self) -> PipelineManifest:
return PipelineManifest(
self._external_data["pipeline_spec"],
Expand Down
22 changes: 5 additions & 17 deletions server/api/crud/pipelines.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,19 +141,19 @@ def delete_pipelines_runs(
experiment_ids = set()
for pipeline_run in project_pipeline_runs:
try:
pipeline_run = PipelineRun(pipeline_run)
# delete pipeline run also terminates it if it is in progress
kfp_client._run_api.delete_run(pipeline_run["id"])
experiment_id = self._get_experiment_id_from_run(pipeline_run)
if experiment_id:
experiment_ids.add(self._get_experiment_id_from_run(pipeline_run))
kfp_client._run_api.delete_run(pipeline_run.id)
if pipeline_run.experiment_id:
experiment_ids.add(pipeline_run.experiment_id)
succeeded += 1
except Exception as exc:
# we don't want to fail the entire delete operation if we failed to delete a single pipeline run
# so it won't fail the delete project operation. we will log the error and continue
logger.warning(
"Failed to delete pipeline run",
project_name=project_name,
pipeline_run_id=pipeline_run["id"],
pipeline_run_id=pipeline_run.id,
exc_info=exc,
)
failed += 1
Expand Down Expand Up @@ -389,18 +389,6 @@ def _resolve_project_from_command(
def resolve_project_from_pipeline(self, pipeline: PipelineRun):
return self.resolve_project_from_workflow_manifest(pipeline.workflow_manifest())

@staticmethod
def _get_experiment_id_from_run(run: dict) -> str:
for reference in run.get("resource_references", []):
data = reference.get("key", {})
if (
data.get("type", "") == "EXPERIMENT"
and reference.get("relationship", "") == "OWNER"
and reference.get("name", "") != "Default"
):
return data.get("id", "")
return ""

def _filter_runs_by_name(self, runs: list, target_name: str) -> list:
"""Filter runs by their name while ignoring the project string on them
Expand Down

0 comments on commit 3de848d

Please sign in to comment.