In [1]:
from hera.workflows import DAG, Workflow, script
from hera.shared import global_config as hera_config

hera_config.host = "http://localhost:30000/argo/"
hera_config.namespace = 'argo'
hera_config.verify_ssl = False

@script(image='python:3.12-slim')
def echo(message: str):
    print(message)


with Workflow(
    generate_name="dag-diamond-",
    entrypoint="diamond",
) as w:
    with DAG(name="diamond"):
        A = echo(name="A", arguments={"message": "A"})
        B = [
            echo(name="B0", arguments={"message": "B0"}),
            echo(name="B1", arguments={"message": "B1"})]
        C = echo(name="C", arguments={"message": "C"})

        A >> [B[0], B[1]] >> C
ret = w.create()

In [2]:
name = ret.metadata.name
name

'dag-diamond-hncsq'

In [3]:
ret = w.wait()
ret.status.phase

'Succeeded'

In [4]:
from hera.workflows.service import WorkflowsService

service= WorkflowsService()

In [5]:
# For some reason, `service.workflow_logs()` doesn't work.  Here is an alternative:

import requests
import json

def ensure_trailing_slash(s: str):
    return f"{s}{'' if s.endswith('/') else '/'}"

def logs(_name):
    url = f"{ensure_trailing_slash(hera_config.host)}api/v1/workflows/{hera_config.namespace}/"\
          f"{_name}/log?logOptions.container=main"
    print(url)
    response = requests.get(url)
    
    for s in response.text.splitlines():
        obj = json.loads(s)['result']
        print(f"{obj['podName']}: {obj['content']}")

logs(name)

http://localhost:30000/argo/api/v1/workflows/argo/dag-diamond-hncsq/log?logOptions.container=main
dag-diamond-hncsq-echo-3222086449: A
dag-diamond-hncsq-echo-3222086449: time="2024-10-13T14:55:40.490Z" level=info msg="sub-process exited" argo=true error="<nil>"
dag-diamond-hncsq-echo-3819350872: B0
dag-diamond-hncsq-echo-3836128491: B1
dag-diamond-hncsq-echo-3819350872: time="2024-10-13T14:55:50.546Z" level=info msg="sub-process exited" argo=true error="<nil>"
dag-diamond-hncsq-echo-3836128491: time="2024-10-13T14:55:50.547Z" level=info msg="sub-process exited" argo=true error="<nil>"
dag-diamond-hncsq-echo-3188531211: C
dag-diamond-hncsq-echo-3188531211: time="2024-10-13T14:56:00.560Z" level=info msg="sub-process exited" argo=true error="<nil>"


In [6]:
# service.delete_workflow(namespace=hera_config.namespace, name=name)