Skip to content

Commit

Permalink
[Notifications & KFP] Move Workflow Step Analysis to Pipelines-Adapters (#5583)
Browse files Browse the repository at this point in the history

* Moved step analysis to mlrun-pipelines

* show skipped

* CR

* CR2

---------

Co-authored-by: quaark <a.melnick@icloud.com>
  • Loading branch information
quaark and quaark committed May 20, 2024
1 parent 3f1f138 commit 49d0656
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 44 deletions.
3 changes: 3 additions & 0 deletions mlrun/common/runtimes/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ class RunStates:
unknown = "unknown"
aborted = "aborted"
aborting = "aborting"
skipped = "skipped"

@staticmethod
def all():
Expand All @@ -148,6 +149,7 @@ def all():
RunStates.unknown,
RunStates.aborted,
RunStates.aborting,
RunStates.skipped,
]

@staticmethod
Expand All @@ -156,6 +158,7 @@ def terminal_states():
RunStates.completed,
RunStates.error,
RunStates.aborted,
RunStates.skipped,
]

@staticmethod
Expand Down
4 changes: 4 additions & 0 deletions mlrun/common/schemas/function.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ class FunctionState:
# same goes for the build which is not coming from the pod, but is used and we can't just omit it for BC reasons
build = "build"

# for pipeline steps
skipped = "skipped"

@classmethod
def get_function_state_from_pod_state(cls, pod_state: str):
if pod_state == "succeeded":
Expand All @@ -60,6 +63,7 @@ def terminal_states(cls):
return [
cls.ready,
cls.error,
cls.skipped,
]


Expand Down
5 changes: 3 additions & 2 deletions mlrun/utils/notifications/notification/slack.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class SlackNotification(NotificationBase):
"completed": ":smiley:",
"running": ":man-running:",
"error": ":x:",
"skipped": ":zzz:",
}

async def push(
Expand Down Expand Up @@ -157,11 +158,11 @@ def _get_run_line(self, run: dict) -> dict:

# Only show the URL if the run is not a function (serving or mlrun function)
kind = run.get("step_kind")
if url and not kind or kind == "run":
state = run["status"].get("state", "")
if state != "skipped" and (url and not kind or kind == "run"):
line = f'<{url}|*{meta.get("name")}*>'
else:
line = meta.get("name")
state = run["status"].get("state", "")
if kind:
line = f'{line} *({run.get("step_kind", run.get("kind", ""))})*'
line = f'{self.emojis.get(state, ":question:")} {line}'
Expand Down
78 changes: 38 additions & 40 deletions mlrun/utils/notifications/notification_pusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

import asyncio
import datetime
import json
import os
import re
import traceback
Expand All @@ -23,6 +22,7 @@

import kfp
import mlrun_pipelines.common.ops
import mlrun_pipelines.models

import mlrun.common.runtimes.constants
import mlrun.common.schemas
Expand Down Expand Up @@ -392,17 +392,29 @@ def get_workflow_steps(self, run: mlrun.model.RunObject) -> list:
steps = []
db = mlrun.get_run_db()

def _add_run_step(_node_name, _node_template, _step_kind):
_run = db.list_runs(
project=run.metadata.project,
labels=f"mlrun/runner-pod={_node_name}",
)[0]
_run["step_kind"] = _step_kind
def _add_run_step(_step: mlrun_pipelines.models.PipelineStep):
    """Append the MLRun run that backs *_step* to ``steps``.

    Looks the run up by its runner-pod label; when no run exists (for
    example, the step never executed), a minimal stub record is appended
    instead so the step still shows up in the notification.
    """
    try:
        run_record = db.list_runs(
            project=run.metadata.project,
            labels=f"mlrun/runner-pod={_step.node_name}",
        )[0]
    except IndexError:
        # No run was created for this pod — synthesize a stub entry.
        run_record = {
            "metadata": {
                "name": _step.display_name,
                "project": run.metadata.project,
            },
        }
    run_record["step_kind"] = _step.step_type
    if _step.skipped:
        # Mark skipped pipeline steps explicitly so they render as such.
        status = run_record.setdefault("status", {})
        status["state"] = mlrun.common.runtimes.constants.RunStates.skipped
    steps.append(run_record)

def _add_deploy_function_step(_, _node_template, _step_kind):
def _add_deploy_function_step(_step: mlrun_pipelines.models.PipelineStep):
project, name, hash_key = self._extract_function_uri(
_node_template["metadata"]["annotations"]["mlrun/function-uri"]
_step.get_annotation("mlrun/function-uri")
)
if name:
try:
Expand All @@ -419,16 +431,19 @@ def _add_deploy_function_step(_, _node_template, _step_kind):
"hash_key": hash_key,
},
}
function["status"] = {
"state": mlrun.common.runtimes.constants.PodPhases.pod_phase_to_run_state(
node["phase"]
),
}
pod_phase = _step.phase
if _step.skipped:
state = mlrun.common.schemas.FunctionState.skipped
else:
state = mlrun.common.runtimes.constants.PodPhases.pod_phase_to_run_state(
pod_phase
)
function["status"] = {"state": state}
if isinstance(function["metadata"].get("updated"), datetime.datetime):
function["metadata"]["updated"] = function["metadata"][
"updated"
].isoformat()
function["step_kind"] = _step_kind
function["step_kind"] = _step.step_type
steps.append(function)

step_methods = {
Expand All @@ -446,26 +461,10 @@ def _add_deploy_function_step(_, _node_template, _step_kind):
return steps

try:
workflow_nodes = sorted(
workflow_manifest["status"]["nodes"].items(),
key=lambda _node: _node[1]["finishedAt"],
)
for node_name, node in workflow_nodes:
if node["type"] != "Pod":
# Skip the parent DAG node
continue

node_template = next(
template
for template in workflow_manifest["spec"]["templates"]
if template["name"] == node["templateName"]
)
step_type = node_template["metadata"]["annotations"].get(
"mlrun/pipeline-step-type"
)
step_method = step_methods.get(step_type)
for step in workflow_manifest.get_steps():
step_method = step_methods.get(step.step_type)
if step_method:
step_method(node_name, node_template, step_type)
step_method(step)
return steps
except Exception:
# If we fail to read the pipeline steps, we will return the list of runs that have the same workflow id
Expand All @@ -481,7 +480,9 @@ def _add_deploy_function_step(_, _node_template, _step_kind):
)

@staticmethod
def _get_workflow_manifest(workflow_id: str) -> typing.Optional[dict]:
def _get_workflow_manifest(
workflow_id: str,
) -> typing.Optional[mlrun_pipelines.models.PipelineManifest]:
kfp_url = mlrun.mlconf.resolve_kfp_url(mlrun.mlconf.namespace)
if not kfp_url:
raise mlrun.errors.MLRunNotFoundError(
Expand All @@ -495,11 +496,8 @@ def _get_workflow_manifest(workflow_id: str) -> typing.Optional[dict]:
if not kfp_run:
return None

kfp_run = kfp_run.to_dict()
try:
return json.loads(kfp_run["pipeline_runtime"]["workflow_manifest"])
except Exception:
return None
kfp_run = mlrun_pipelines.models.PipelineRun(kfp_run)
return kfp_run.workflow_manifest()

def _extract_function_uri(self, function_uri: str) -> tuple[str, str, str]:
"""
Expand Down
2 changes: 1 addition & 1 deletion pipeline-adapters/mlrun-pipelines-kfp-v1-8/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

setup(
name="mlrun-pipelines-kfp-v1-8-experiment",
version="0.2.2",
version="0.2.3",
description="MLRun Pipelines package for providing KFP 1.8 compatibility",
author="Yaron Haviv",
author_email="yaronh@iguazio.com",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#

import json
import typing
from typing import Any, Union

from kfp.dsl import ContainerOp
Expand All @@ -28,6 +29,42 @@
PipelineNodeWrapper = ContainerOp


class PipelineStep(FlexibleMapper):
    """A single step of a KFP workflow.

    Bundles the step's node status, the template it was instantiated from,
    and the resolved step type, exposing the fields callers need without
    touching the raw workflow manifest.
    """

    def __init__(self, step_type, node_name, node, node_template):
        super().__init__(
            {
                "step_type": step_type,
                "node_name": node_name,
                "node": node,
                "node_template": node_template,
            }
        )

    @property
    def step_type(self):
        # Step type as resolved from the template annotations by the caller.
        return self._external_data["step_type"]

    @property
    def node_name(self):
        return self._external_data["node_name"]

    @property
    def phase(self):
        # Raw phase string reported on the workflow node.
        return self._external_data["node"]["phase"]

    @property
    def skipped(self):
        # Skipped steps are reported with node type "Skipped".
        return self._external_data["node"]["type"] == "Skipped"

    @property
    def display_name(self):
        return self._external_data["node"]["displayName"]

    def get_annotation(self, annotation_name: str):
        """Return the template annotation's value, or None when absent."""
        annotations = self._external_data["node_template"]["metadata"]["annotations"]
        return annotations.get(annotation_name)


class PipelineManifest(FlexibleMapper):
def __init__(
self, workflow_manifest: Union[str, dict] = "{}", pipeline_manifest: str = "{}"
Expand All @@ -36,11 +73,31 @@ def __init__(
main_manifest = json.loads(workflow_manifest)
except TypeError:
main_manifest = workflow_manifest
if pipeline_manifest:
if pipeline_manifest != "{}":
pipeline_manifest = json.loads(pipeline_manifest)
main_manifest["status"] = pipeline_manifest.get("status", {})
super().__init__(main_manifest)

def get_steps(self) -> typing.Generator[PipelineStep, None, None]:
    """Yield the workflow's steps as ``PipelineStep`` objects.

    Steps are yielded in order of their ``finishedAt`` timestamp; the
    parent DAG node (which only groups the real steps) is excluded.
    """
    status_nodes = self._external_data["status"]["nodes"]
    templates = self._external_data["spec"]["templates"]
    # NOTE(review): nodes missing "finishedAt" (e.g. still running) would
    # break this sort — confirm only finished workflows reach here.
    ordered = sorted(status_nodes.items(), key=lambda item: item[1]["finishedAt"])
    for node_name, node in ordered:
        if node["type"] == "DAG":
            # Skip the parent DAG node
            continue

        # Resolve the template this node was instantiated from; raises
        # StopIteration if the manifest references an unknown template.
        node_template = next(
            tpl for tpl in templates if tpl["name"] == node["templateName"]
        )
        step_type = node_template["metadata"]["annotations"].get(
            "mlrun/pipeline-step-type"
        )
        yield PipelineStep(step_type, node_name, node, node_template)


class PipelineRun(FlexibleMapper):
_workflow_manifest: PipelineManifest
Expand Down

0 comments on commit 49d0656

Please sign in to comment.