Skip to content

Commit

Permalink
[Notifications] Improve Remote Workflow Start Notification Functional…
Browse files Browse the repository at this point in the history
…ity (#5697)
  • Loading branch information
quaark committed Jun 3, 2024
1 parent 3f74769 commit c712802
Showing 1 changed file with 25 additions and 12 deletions.
37 changes: 25 additions & 12 deletions mlrun/projects/pipelines.py
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,7 @@ def run(
namespace=None,
source=None,
notifications: list[mlrun.model.Notification] = None,
send_start_notification: bool = True,
) -> _PipelineRunStatus:
pass

Expand Down Expand Up @@ -523,6 +524,7 @@ def run(
namespace=None,
source=None,
notifications: list[mlrun.model.Notification] = None,
send_start_notification: bool = True,
) -> _PipelineRunStatus:
pipeline_context.set(project, workflow_spec)
workflow_handler = _PipelineRunner._get_handler(
Expand Down Expand Up @@ -571,13 +573,13 @@ def run(
func_name=func.metadata.name,
exc_info=err_to_str(exc),
)

project.notifiers.push_pipeline_start_message(
project.metadata.name,
project.get_param("commit_id", None),
run_id,
True,
)
if send_start_notification:
project.notifiers.push_pipeline_start_message(
project.metadata.name,
project.get_param("commit_id", None),
run_id,
True,
)
pipeline_context.clear()
return _PipelineRunStatus(run_id, cls, project=project, workflow=workflow_spec)

Expand Down Expand Up @@ -668,6 +670,7 @@ def run(
namespace=None,
source=None,
notifications: list[mlrun.model.Notification] = None,
send_start_notification: bool = True,
) -> _PipelineRunStatus:
pipeline_context.set(project, workflow_spec)
workflow_handler = _PipelineRunner._get_handler(
Expand All @@ -688,9 +691,11 @@ def run(
original_source = project.spec.source
project.set_source(source=source)
pipeline_context.workflow_artifact_path = artifact_path
project.notifiers.push_pipeline_start_message(
project.metadata.name, pipeline_id=workflow_id
)

if send_start_notification:
project.notifiers.push_pipeline_start_message(
project.metadata.name, pipeline_id=workflow_id
)
err = None
try:
workflow_handler(**workflow_spec.args)
Expand Down Expand Up @@ -758,13 +763,21 @@ def run(
namespace: str = None,
source: str = None,
notifications: list[mlrun.model.Notification] = None,
send_start_notification: bool = True,
) -> typing.Optional[_PipelineRunStatus]:
workflow_name = normalize_workflow_name(name=name, project_name=project.name)
workflow_id = None

# for start message, fallback to old notification behavior
for notification in notifications or []:
project.notifiers.add_notification(notification.kind, notification.params)
if send_start_notification:
for notification in notifications or []:
project.notifiers.add_notification(
notification.kind, notification.params
)
# if a notification with `when=running` is provided, it will be used explicitly and others
# will be ignored
if "running" in notification.when:
break

# The returned engine for this runner is the engine of the workflow.
# In this way wait_for_completion/get_run_status would be executed by the correct pipeline runner.
Expand Down

0 comments on commit c712802

Please sign in to comment.