# Ways to Structure Task Dependencies in an Airflow DAG

## Basic Sequential Workflow
The simplest form of a workflow is a sequential flow, where tasks are executed one after another.



In [None]:
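"""
Example: Task A >> Task B >> Task C
"""
from datetime import datetime

from airflow import DAG
# Airflow 2.x import path; newer releases provide EmptyOperator as the replacement
from airflow.operators.dummy import DummyOperator

# Assumed placeholder defaults reused by every DAG in this notebook; adjust as needed
default_args = {'owner': 'airflow', 'start_date': datetime(2024, 1, 1)}
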
with DAG(
    dag_id='sequential_workflow',
    default_args=default_args,
    schedule_interval=None,
    description="A simple sequential workflow"
) as dag:

    task_a = DummyOperator(task_id='task_a')
    task_b = DummyOperator(task_id='task_b')
    task_c = DummyOperator(task_id='task_c')

    # Define the sequential flow
    task_a >> task_b >> task_c
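
To see why a chain such as `task_a >> task_b >> task_c` wires up every link, it can help to model the operator in plain Python. The sketch below is a toy stand-in, not Airflow's implementation: `ToyTask` and its `upstream` attribute are hypothetical names. It assumes only that `>>` registers the right-hand side as downstream and then returns it, so the next `>>` continues from there. The list handling is included because the patterns that follow put lists on either side of `>>`.

```python
class ToyTask:
    """Hypothetical stand-in for an operator; it only tracks upstream task ids."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()

    def __rshift__(self, other):
        # Handles `task >> other`, where other is a task or a list of tasks.
        targets = other if isinstance(other, list) else [other]
        for target in targets:
            target.upstream.add(self.task_id)
        return other  # returning the right-hand side lets the chain continue

    def __rrshift__(self, others):
        # Handles `[task, task] >> self`; Python falls back to this method
        # because a plain list does not define `>>` itself.
        for source in others:
            self.upstream.add(source.task_id)
        return self


a, b, c = ToyTask("task_a"), ToyTask("task_b"), ToyTask("task_c")
a >> b >> c

print(b.upstream, c.upstream)  # → {'task_a'} {'task_b'}
```

Each task ends up with exactly one upstream neighbour, which is the strict one-after-another order the sequential pattern describes.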


## Parallel Workflow
In this workflow, one task runs first and then fans out into several tasks that do not depend on one another. This is useful when multiple independent tasks must follow a common task.

In this example, Task B and Task C both become eligible to run as soon as Task A succeeds. Whether they actually run at the same time depends on the executor and the available worker slots.


In [None]:
"""
Example: Task A >> [Task B, Task C] (Parallel Execution)
"""
with DAG(
    dag_id='parallel_workflow',
    default_args=default_args,
    schedule_interval=None,
    description="A parallel workflow"
) as dag:

    task_a = DummyOperator(task_id='task_a')
    task_b = DummyOperator(task_id='task_b')
    task_c = DummyOperator(task_id='task_c')

    # Define parallel flow: Task A splits into Task B and Task C
    task_a >> [task_b, task_c]


## Converging Parallel Workflow
After parallel tasks complete, you might want them to converge and trigger a common downstream task. This pattern is useful when tasks need to merge back together.

In this case, with the default trigger rule (`all_success`), Task D will not run until both Task B and Task C have completed successfully.



In [None]:
"""
Example: Task A >> [Task B, Task C] >> Task D (Converging Workflow)
"""
with DAG(
    dag_id='converging_workflow',
    default_args=default_args,
    schedule_interval=None,
    description="A converging workflow"
) as dag:

    task_a = DummyOperator(task_id='task_a')
    task_b = DummyOperator(task_id='task_b')
    task_c = DummyOperator(task_id='task_c')
    task_d = DummyOperator(task_id='task_d')

    # Define parallel and converging flow
    task_a >> [task_b, task_c] >> task_d
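
The order implied by this graph can be checked without a scheduler. The following is a minimal sketch using the standard-library `graphlib` module (Python 3.9+), not anything Airflow runs internally; the `upstream` mapping and the `execution_waves` helper are hypothetical names introduced for illustration. It groups tasks into "waves" of tasks whose upstream tasks have all finished.

```python
from graphlib import TopologicalSorter

# Map each task to the set of tasks it depends on (its upstream tasks).
upstream = {
    "task_a": set(),
    "task_b": {"task_a"},
    "task_c": {"task_a"},
    "task_d": {"task_b", "task_c"},
}


def execution_waves(upstream):
    """Return lists of tasks that become ready together, in dependency order."""
    sorter = TopologicalSorter(upstream)
    sorter.prepare()
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # sorted only for stable output
        waves.append(ready)
        sorter.done(*ready)  # mark the whole wave as finished
    return waves


waves = execution_waves(upstream)
print(waves)  # → [['task_a'], ['task_b', 'task_c'], ['task_d']]
```

Task B and Task C land in the same wave, and Task D appears only after both are marked done, which matches the converging behaviour described above.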


## Branching Workflow (Diverging Workflow)
Branching means choosing one path among several based on a condition. In Airflow this is done with the `BranchPythonOperator`, whose callable returns the `task_id` (or a list of `task_id`s) of the task to run next; the directly downstream tasks that are not returned are skipped.

In this case, based on the `choose_branch` function, either Task B or Task C will run after Task A, but not both.

In [None]:
"""
Example: Task A >> [Task B or Task C] (Branching Workflow)
"""
from airflow.operators.python import BranchPythonOperator

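def choose_branch(**kwargs):
    # Placeholder condition; replace it with real logic (for example, a value read from kwargs)
    some_condition = True
    # Return the task_id of the branch to follow; the other branch is skipped
    if some_condition:
        return 'task_b'
    else:
        return 'task_c'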

with DAG(
    dag_id='branching_workflow',
    default_args=default_args,
    schedule_interval=None,
    description="A branching workflow"
) as dag:

    task_a = DummyOperator(task_id='task_a')
    task_b = DummyOperator(task_id='task_b')
    task_c = DummyOperator(task_id='task_c')

    branch_op = BranchPythonOperator(
        task_id='branch_task',
        python_callable=choose_branch
    )

    # Define branching flow
    task_a >> branch_op >> [task_b, task_c]
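
As a concrete illustration of a branch callable, the sketch below picks a branch from the run's date. It rests on two assumptions that are not part of the example above: that the task context passed as `kwargs` contains a `logical_date` entry (the case in Airflow 2.2 and later), and a made-up rule that sends weekday runs to `task_b` and weekend runs to `task_c`. Because the callable is an ordinary function, it can be tried outside Airflow with a hand-built context.

```python
from datetime import datetime


def choose_branch(**kwargs):
    # Assumption: the task context provides 'logical_date' (Airflow 2.2+).
    logical_date = kwargs["logical_date"]
    # Hypothetical rule: Monday to Friday go to task_b, weekends to task_c.
    if logical_date.weekday() < 5:
        return "task_b"
    return "task_c"


# Call the function directly with a hand-built context, outside Airflow.
print(choose_branch(logical_date=datetime(2024, 1, 1)))  # Monday → task_b
print(choose_branch(logical_date=datetime(2024, 1, 6)))  # Saturday → task_c
```

The returned strings must match the `task_id` values of tasks directly downstream of the branch task.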



## Diverging and Converging (Mixed Workflow)
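You can also combine diverging and converging workflows. For example, the flow diverges based on a condition but then converges again at a common task.

In this scenario:

- Task B or Task C is executed based on the branch logic; the other one is skipped.
- Task D runs after whichever of Task B or Task C was selected, provided its trigger rule tolerates the skipped branch (for example `none_failed_min_one_success`). Under the default `all_success` rule, the skipped branch would cause Task D to be skipped as well.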

In [None]:
"""
Example: Task A >> [Task B or Task C] >> Task D (Diverging and Converging)
"""
from airflow.operators.python import BranchPythonOperator

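def choose_branch(**kwargs):
    # Placeholder condition; replace it with real logic (for example, a value read from kwargs)
    some_condition = True
    # Return the task_id of the branch to follow; the other branch is skipped
    if some_condition:
        return 'task_b'
    else:
        return 'task_c'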

with DAG(
    dag_id='diverge_converge_workflow',
    default_args=default_args,
    schedule_interval=None,
    description="A diverging and converging workflow"
) as dag:

    task_a = DummyOperator(task_id='task_a')
    task_b = DummyOperator(task_id='task_b')
    task_c = DummyOperator(task_id='task_c')
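    # The join must tolerate the skipped branch (rule available in Airflow 2.2+);
    # with the default 'all_success' rule, task_d would be skipped as well
    task_d = DummyOperator(task_id='task_d', trigger_rule='none_failed_min_one_success')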

    branch_op = BranchPythonOperator(
        task_id='branch_task',
        python_callable=choose_branch
    )

    # Diverging and converging flow
    task_a >> branch_op >> [task_b, task_c] >> task_d
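
How the join task reacts to a skipped branch depends on its trigger rule. The sketch below is a simplified plain-Python model of two rules, `all_success` and `none_failed_min_one_success`, written only to make the difference visible; `join_state` and its return labels are hypothetical and do not reproduce the scheduler's actual code.

```python
def join_state(trigger_rule, upstream_states):
    """Simplified model: decide what happens to a join task given upstream states."""
    failed = any(s in ("failed", "upstream_failed") for s in upstream_states)
    succeeded = sum(s == "success" for s in upstream_states)

    if failed:
        # Under both modelled rules, an upstream failure blocks the join.
        return "upstream_failed"
    if trigger_rule == "all_success":
        # Every upstream task must have succeeded; a skip propagates.
        return "run" if succeeded == len(upstream_states) else "skipped"
    if trigger_rule == "none_failed_min_one_success":
        # Skips are tolerated as long as at least one upstream task succeeded.
        return "run" if succeeded >= 1 else "skipped"
    raise ValueError(f"rule not modelled in this sketch: {trigger_rule}")


# After branching, one branch succeeds and the other is skipped.
states = ["success", "skipped"]
print(join_state("all_success", states))                  # → skipped
print(join_state("none_failed_min_one_success", states))  # → run
```

This is why a join placed after a branch usually needs a rule other than the default.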

## Advanced Parallelism (Two Separate Lanes Merging)
You can create two separate lanes of tasks, each of which may contain several tasks, and then merge them at the end. Writing one dependency line per lane keeps longer lanes readable.

- Here, Task A splits into two independent lanes. For brevity, each lane holds a single task (Task B and Task C), so the resulting graph is the same as `task_a >> [task_b, task_c] >> task_d`.
- Task D will only run after both Task B and Task C have completed successfully.

In [None]:
"""
Example: Task A >> Task B >> Task D and Task A >> Task C >> Task D (Two Lanes Converging)
"""
with DAG(
    dag_id='two_lanes_converging',
    default_args=default_args,
    schedule_interval=None,
    description="Two lanes converging"
) as dag:

    task_a = DummyOperator(task_id='task_a')
    task_b = DummyOperator(task_id='task_b')
    task_c = DummyOperator(task_id='task_c')
    task_d = DummyOperator(task_id='task_d')

    # Two lanes
    task_a >> task_b >> task_d
    task_a >> task_c >> task_d


# When to Use Each Pattern
- `Sequential Workflow`: Use this when tasks must be executed one after another with strict order.
- `Parallel Workflow`: Use when tasks can be executed simultaneously and don’t depend on each other.
- `Converging Workflow`: Use when multiple parallel tasks must complete before moving on to the next task.
- `Branching Workflow`: Use when you need to choose between different paths based on conditions.
- `Diverging and Converging Workflow`: Use when branching is required, but the workflow needs to merge back into a common task later.
- `Two-Lane Workflow`: Use when there are two independent sets of tasks that eventually converge at a single task.

# Summary of Concepts
- `Sequential Tasks`: Tasks execute in order, one after another.
- `Parallel Tasks`: Tasks execute simultaneously.
- `Branching`: Diverging paths where only one is executed based on conditions.
- `Converging`: Multiple parallel paths converge into a single task.
- `Two Lanes`: Independent sets of tasks that later converge.