# Creating an HTCondor DAG file via a Python script for a multi-task workflow

- Preparing the Python scripts to run: `./data/scripts/task1.py` through `./data/scripts/task4.py`

In [1]:
import os

# Ensure the folder that will hold the task scripts exists (no-op if it is already there)
scripts_dir = "./data/scripts"
os.makedirs(scripts_dir, exist_ok=True)

In [2]:
%%writefile ./data/scripts/task1.py
#!/usr/bin/env python3
# task1.py
"""Demo HTCondor job: announce the start, wait a few seconds, announce completion."""
import time

# Duration of the simulated workload, in seconds.
SLEEP_SECONDS = 3


def main():
    """Print a start message, sleep for SLEEP_SECONDS, then print a completion message."""
    print("Task 1 started")
    time.sleep(SLEEP_SECONDS)  # stand-in for real work
    print("Task 1 completed")


# Run only when executed as a script, so importing the file has no side effects.
if __name__ == "__main__":
    main()

Overwriting ./data/scripts/task1.py


In [3]:
%%writefile ./data/scripts/task2.py
#!/usr/bin/env python3
# task2.py
"""Demo HTCondor job: announce the start, wait a few seconds, announce completion."""
import time

# Duration of the simulated workload, in seconds.
SLEEP_SECONDS = 5


def main():
    """Print a start message, sleep for SLEEP_SECONDS, then print a completion message."""
    print("Task 2 started")
    time.sleep(SLEEP_SECONDS)  # stand-in for real work
    print("Task 2 completed")


# Run only when executed as a script, so importing the file has no side effects.
if __name__ == "__main__":
    main()

Overwriting ./data/scripts/task2.py


In [4]:
%%writefile ./data/scripts/task3.py
#!/usr/bin/env python3
# task3.py
"""Demo HTCondor job: announce the start, wait a few seconds, announce completion."""
import time

# Duration of the simulated workload, in seconds.
SLEEP_SECONDS = 5


def main():
    """Print a start message, sleep for SLEEP_SECONDS, then print a completion message."""
    print("Task 3 started")
    time.sleep(SLEEP_SECONDS)  # stand-in for real work
    print("Task 3 completed")


# Run only when executed as a script, so importing the file has no side effects.
if __name__ == "__main__":
    main()

Overwriting ./data/scripts/task3.py


In [5]:
%%writefile ./data/scripts/task4.py
#!/usr/bin/env python3
# task4.py
"""Demo HTCondor job: announce the start, wait a few seconds, announce completion."""
import time

# Duration of the simulated workload, in seconds.
SLEEP_SECONDS = 3


def main():
    """Print a start message, sleep for SLEEP_SECONDS, then print a completion message."""
    print("Task 4 started")
    time.sleep(SLEEP_SECONDS)  # stand-in for real work
    print("Task 4 completed")


# Run only when executed as a script, so importing the file has no side effects.
if __name__ == "__main__":
    main()

Overwriting ./data/scripts/task4.py


In [6]:
# make sure the python script is executable
!chmod 764 ./data/scripts/task1.py
!chmod 764 ./data/scripts/task2.py
!chmod 764 ./data/scripts/task3.py
!chmod 764 ./data/scripts/task4.py

- Creating DAG file with Python Script

In [7]:
import htcondor
import os
from htcondor import dags
from pathlib import Path
import shutil

# Root of the demo's data tree, resolved against the notebook's working
# directory so it matches where the task scripts were written (./data/scripts)
# rather than being tied to one user's home directory. The path is made
# absolute so the generated submit files do not depend on the directory they
# are submitted from.
base_dir = os.path.abspath("./data")

# Create the directories that receive each job's stdout, stderr, and event log.
# NOTE: HTCondor writes these files but does not create missing parent
# directories, so they must exist before the DAG is submitted.
for _subdir in ("output", "error", "log"):
    os.makedirs(os.path.join(base_dir, _subdir), exist_ok=True)


def make_task_submit(task_name):
    """Build the submit description for one demo task.

    Parameters
    ----------
    task_name : str
        Base name of the task (e.g. ``"task1"``). The executable is
        ``scripts/<task_name>.py`` and the job's stdout, stderr, and log go to
        ``output/<task_name>.out``, ``error/<task_name>.err``, and
        ``log/<task_name>.log``, all under ``base_dir``.

    Returns
    -------
    htcondor.Submit
        Submit description requesting 1 CPU, 128MB of memory, and 128MB of disk.
    """
    return htcondor.Submit({
        "executable": os.path.join(base_dir, f"scripts/{task_name}.py"),
        "request_cpus": "1",
        "request_memory": "128MB",
        "request_disk": "128MB",
        "output": os.path.join(base_dir, f"output/{task_name}.out"),
        "error": os.path.join(base_dir, f"error/{task_name}.err"),
        "log": os.path.join(base_dir, f"log/{task_name}.log"),
    })


# One submit description per task; all four share the same resource requests.
sub1 = make_task_submit("task1")
# Task 2 and Task 3 will be running concurrently
sub2 = make_task_submit("task2")
sub3 = make_task_submit("task3")
sub4 = make_task_submit("task4")


dag = dags.DAG()

# Task 1 is the root of the DAG.
task1_layer = dag.layer(name="Task1", submit_description=sub1)

# Task 2 and Task 3 are both children of Task 1, so they become runnable
# together once Task 1 completes.
task2_layer = task1_layer.child_layer(name="Task2", submit_description=sub2)
task3_layer = task1_layer.child_layer(name="Task3", submit_description=sub3)

# Task 4 has two parents, so it is created as a plain layer and then attached
# to both Task 2 and Task 3; it runs only after both have completed.
task4_layer = dag.layer(name="Task4", submit_description=sub4)
task4_layer.add_parents([task2_layer, task3_layer])

# Write the DAG description (and the per-layer submit files) to disk.
# write_dag returns the path of the generated DAG file.
dag_dir = os.path.join(base_dir, "demo_dags")
os.makedirs(dag_dir, exist_ok=True)
dag_file = dags.write_dag(dag, dag_dir)


- Check the DAG file created

Based on these dependencies, the following execution order is implied:

- Task1 executes
- Task2 and Task3 then become eligible to run concurrently (both depend only on Task1)
- Task4 executes only after both Task3 and Task2 have completed successfully

In [8]:
# Load the generated DAG description, then display it once the file is closed
with open(dag_file) as dag_handle:
    dagfile_content = dag_handle.read()
print(dagfile_content)

# BEGIN META
# END META
# BEGIN NODES AND EDGES
JOB Task1:0 Task1.sub
PARENT Task1:0 CHILD Task3:0
PARENT Task1:0 CHILD Task2:0
JOB Task3:0 Task3.sub
PARENT Task3:0 CHILD Task4:0
JOB Task2:0 Task2.sub
PARENT Task2:0 CHILD Task4:0
JOB Task4:0 Task4.sub
# END NODES AND EDGES



- Go to terminal, then type

> cd ~/data/demo_dags

> condor_submit_dag dagfile.dag