# Airflow

For pipelines that support Python-based execution, you can use the TorchX API
directly. TorchX is designed to be easily integrated into other applications
via its programmatic API, so no special Airflow integration is needed.

With TorchX, you can use Airflow for pipeline orchestration and run your
PyTorch application (e.g. distributed training) on a remote GPU cluster.

In [1]:
import datetime
import pendulum

from airflow.utils.state import DagRunState, TaskInstanceState
from airflow.utils.types import DagRunType
from airflow.models.dag import DAG
from airflow.decorators import task


DATA_INTERVAL_START = pendulum.datetime(2021, 9, 13, tz="UTC")
DATA_INTERVAL_END = DATA_INTERVAL_START + datetime.timedelta(days=1)

To launch a TorchX job from Airflow, you can create an Airflow Python task that
imports the runner, launches the job, and waits for it to complete. If you're
running on a remote cluster, you may need to use the virtualenv task to install
the `torchx` package.

In [2]:
@task(task_id='hello_torchx')
def run_torchx(message):
    """This is a function that will run within the DAG execution"""
    from torchx.runner import get_runner
    with get_runner() as runner:
        # Run the utils.sh component on the local_cwd scheduler.
        app_id = runner.run_component(
            "utils.sh",
            ["echo", message],
            scheduler="local_cwd",
        )

        # Wait for the job to complete
        status = runner.wait(app_id, wait_interval=1)

        # raise_for_status() raises an exception if the job didn't succeed
        status.raise_for_status()

        # Finally, print all of the log lines from the TorchX job so they
        # show up in the workflow logs.
        for line in runner.log_lines(app_id, "sh", k=0):
            print(line, end="")
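
For the remote-cluster case mentioned above, the following is a minimal sketch of the same task wrapped with Airflow's `@task.virtualenv` decorator so that `torchx` is installed into an isolated environment before the task body runs. The names `run_torchx_isolated`, `as_virtualenv_task`, and the task id `hello_torchx_venv` are hypothetical and chosen only for illustration. The sketch assumes Airflow 2.x with the `virtualenv` package available on the worker, and leaves `torchx` unpinned, which you would likely want to change.

```python
def run_torchx_isolated(message):
    """Same job as above, written to run inside a separate virtualenv."""
    # Airflow runs a virtualenv task in a separate interpreter, so every
    # import the function needs has to happen inside the function body.
    from torchx.runner import get_runner

    with get_runner() as runner:
        app_id = runner.run_component(
            "utils.sh",
            ["echo", message],
            scheduler="local_cwd",
        )
        status = runner.wait(app_id, wait_interval=1)
        status.raise_for_status()
        for line in runner.log_lines(app_id, "sh", k=0):
            print(line, end="")


def as_virtualenv_task(fn, task_id="hello_torchx_venv"):
    """Wrap fn as an Airflow task that installs torchx in its own virtualenv."""
    # Imported lazily so this helper can be defined without Airflow installed.
    from airflow.decorators import task

    return task.virtualenv(
        task_id=task_id,
        requirements=["torchx"],  # assumption: unpinned; pin a version for reproducibility
        system_site_packages=False,
    )(fn)
```

Inside a `with DAG(...)` block, `as_virtualenv_task(run_torchx_isolated)("Hello, TorchX!")` would take the place of a direct call to `run_torchx("Hello, TorchX!")`.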

Once the task is defined, we can put it into an Airflow DAG and run it as
usual.

In [3]:
from torchx.schedulers.ids import make_unique

with DAG(
    dag_id=make_unique('example_python_operator'),
    schedule_interval=None,
    start_date=DATA_INTERVAL_START,
    catchup=False,
    tags=['example'],
) as dag:
    run_job = run_torchx("Hello, TorchX!")


dagrun = dag.create_dagrun(
    state=DagRunState.RUNNING,
    execution_date=DATA_INTERVAL_START,
    data_interval=(DATA_INTERVAL_START, DATA_INTERVAL_END),
    start_date=DATA_INTERVAL_END,
    run_type=DagRunType.MANUAL,
)
ti = dagrun.get_task_instance(task_id="hello_torchx")
ti.task = dag.get_task(task_id="hello_torchx")
ti.run(ignore_ti_state=True)
assert ti.state == TaskInstanceState.SUCCESS

[2025-08-21T21:19:30.108+0000] {taskinstance.py:2614} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: example_python_operator-w60r1dld2vqh9.hello_torchx manual__2021-09-13T00:00:00+00:00 [None]>

[2025-08-21T21:19:30.114+0000] {taskinstance.py:2614} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: example_python_operator-w60r1dld2vqh9.hello_torchx manual__2021-09-13T00:00:00+00:00 [None]>

[2025-08-21T21:19:30.115+0000] {taskinstance.py:2867} INFO - Starting attempt 0 of 1

[2025-08-21T21:19:30.128+0000] {taskinstance.py:2890} INFO - Executing <Task(_PythonDecoratedOperator): hello_torchx> on 2021-09-13 00:00:00+00:00

[2025-08-21T21:19:30.668+0000] {taskinstance.py:3134} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='example_python_operator-w60r1dld2vqh9' AIRFLOW_CTX_TASK_ID='hello_torchx' AIRFLOW_CTX_EXECUTION_DATE='2021-09-13T00:00:00+00:00' AIRFLOW_CTX_DAG_RUN_ID='manual__2021-09-13T00:00:00+00:00'

Task instance is in running state
 Previous state of the Task instance: queued
Current task name:hello_torchx state:running start_date:2025-08-21 21:19:30.110069+00:00
Dag name:example_python_operator-w60r1dld2vqh9 and current dag run status:running
[2025-08-21T21:19:30.672+0000] {taskinstance.py:732} INFO - ::endgroup::

[2025-08-21T21:19:31.866+0000] {api.py:89} INFO - Tracker configurations: {}

[2025-08-21T21:19:31.871+0000] {local_scheduler.py:787} INFO - Log directory not set in scheduler cfg. Creating a temporary log dir that will be deleted on exit. To preserve log directory set the `log_dir` cfg option

[2025-08-21T21:19:31.872+0000] {local_scheduler.py:793} INFO - Log directory is: /tmp/torchx_kk9f6zg5

Hello, TorchX!
[2025-08-21T21:19:32.121+0000] {python.py:240} INFO - Done. Returned value was: None

[2025-08-21T21:19:32.125+0000] {taskinstance.py:341} INFO - ::group::Post task execution logs

[2025-08-21T21:19:32.125+0000] {taskinstance.py:353} INFO - Marking task as SUCCESS. dag_id=example_python_operator-w60r1dld2vqh9, task_id=hello_torchx, run_id=manual__2021-09-13T00:00:00+00:00, execution_date=20210913T000000, start_date=20250821T211930, end_date=20250821T211932

Task instance in success state
 Previous state of the Task instance: running
Dag name:example_python_operator-w60r1dld2vqh9 queued_at:None
Task hostname:pkrvmqc4gcfdwos.431ufn41rrtebp1cnt0qymc3pa.ex.internal.cloudapp.net operator:_PythonDecoratedOperator


If all goes well, you should see `Hello, TorchX!` printed above.

## Next Steps

* Check out the [runner API documentation](../runner.rst) to learn more about
  programmatic usage of TorchX
* Browse through the collection of [built-in components](../components/overview.rst)
  which can be used in your Airflow pipeline