# Airflow

## Tasks

### Task 1.

Write a DAG that contains both a `BashOperator` and a `PythonOperator`. 
- In the `PythonOperator` function, accept the `ds` argument and print it. You can also print any additional message.
- In the `BashOperator`, execute the `pwd` command, which will output the directory where your Airflow code is being executed.
- Ensure that the `BashOperator` is executed first, followed by the `PythonOperator`.

In [1]:
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator


default_args={
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),  # timedelta from datetime
}

def print_ds(ds):
    print(ds)
    return 'This statement will be printed in the logs!'


with DAG (
    'hw_2_anikitin8',
    default_args=default_args,
    description='homework task 2 by a.nikitin-8',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2024, 10, 29),
    catchup=False,
    tags=['nktnlx'],
) as dag:
    t1 = BashOperator(
        task_id = 'print_working_directory',
        depends_on_past = False,
        bash_command = 'pwd',
        retries = 5
    )

    t2 = PythonOperator (
        task_id='print_ds_argument',
        depends_on_past = False,
        python_callable = print_ds,
        retries=5
    )

    t1 >> t2
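
To make the `ds` hand-off less magical, here is a simplified, Airflow-free sketch of the idea: the runner inspects the callable's signature and passes only the context entries it names. `call_with_context` and the sample context values are hypothetical and only illustrate the behaviour; this is not Airflow's actual implementation.

```python
import inspect


def print_ds(ds):
    print(ds)
    return 'This statement will be printed in the logs!'


def call_with_context(func, context):
    """Call func with only the context entries its signature asks for."""
    params = inspect.signature(func).parameters
    accepts_var_kwargs = any(
        p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values()
    )
    if accepts_var_kwargs:
        # A **kwargs parameter receives the whole context.
        return func(**context)
    wanted = {name: value for name, value in context.items() if name in params}
    return func(**wanted)


# Hypothetical context for one run; Airflow builds a much larger one.
sample_context = {'ds': '2024-10-29', 'run_id': 'manual__2024-10-29'}
result = call_with_context(print_ds, sample_context)
```

Because `print_ds` declares only `ds`, the `run_id` entry is silently left out. A function declared as `def f(**kwargs)` would receive both.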

### Task 2. 

Create a new DAG and declare 30 tasks within it. Make the first 10 tasks of type `BashOperator` and execute an arbitrary command in them that utilizes a loop variable in some way (for example, you can specify `f"echo {i}"`).

The remaining 20 tasks should be `PythonOperator` tasks whose function uses the loop variable. You can achieve this by passing the variable through `op_kwargs` and receiving it as a keyword argument of the function. The function should print `"task number is: {task_number}"`, where `task_number` is the task number from the loop.

In [None]:
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator


default_args={
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),  # timedelta from the datetime package
}

def print_ds(task_number):
    print(f'task number is: {task_number}')


with DAG (
    'hw_3_anikitin8',
    default_args=default_args,
    description='homework task 3 by a.nikitin-8',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2024, 10, 29),
    catchup=False,
    tags=['nktnlx'],
) as dag:
    for i in range(10):
        t1 = BashOperator(
            task_id = f'echo_task_{i}',
            depends_on_past = False,
            bash_command = f'echo {i}',
            retries = 5
        )

    for i in range(20):
        t2 = PythonOperator (
            task_id= f'print_ds_argument_{i}_times',
            depends_on_past = False,
            python_callable = print_ds,
            op_kwargs = {'task_number': i},
            retries=5
        )

    # t1 and t2 now refer to the last operator from each loop, so only that pair is linked.
    t1 >> t2
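
If all 30 tasks should be ordered, rather than one pair, one option is to keep the operators in lists instead of reusing a single variable name. The sketch below shows the pattern with a hypothetical stand-in class, `FakeTask`, so that it runs without Airflow. With real operators the lists would hold the `BashOperator` and `PythonOperator` instances, which accept `task >> [list_of_tasks]` in the same way.

```python
class FakeTask:
    """Hypothetical stand-in for an operator; records downstream task ids."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        # Accept a single task or a list of tasks, like `a >> [b, c]`.
        targets = other if isinstance(other, list) else [other]
        self.downstream.extend(t.task_id for t in targets)
        return other


bash_tasks = [FakeTask(f'echo_task_{i}') for i in range(10)]
python_tasks = [FakeTask(f'print_ds_argument_{i}_times') for i in range(20)]

# Every bash task now precedes every python task, not just the last pair.
for bash_task in bash_tasks:
    bash_task >> python_tasks
```

Whether all 200 edges are wanted depends on the assignment; the point is only that a list keeps every operator reachable after the loop ends.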


### Task 3. 

Add documentation to your tasks from the previous assignment. The documentation must include code elements (enclosed in backticks, like `code`), **bold text**, and *italic text*, as well as a heading (declared with a hash symbol, `#`).

In [None]:
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator


default_args={
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),  # timedelta from the datetime package
}

def print_ds(task_number):
    print(f'task number is: {task_number}')


with DAG (
    'hw_4_anikitin8',
    default_args=default_args,
    description='homework task 4 by a.nikitin-8',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2024, 10, 29),
    catchup=False,
    tags=['nktnlx'],
) as dag:
    for i in range(10):
        t1 = BashOperator(
            task_id = f'echo_task_{i}',
            depends_on_past = False,
            bash_command = f'echo {i}',
            retries = 5
        )
        # Set the docs inside the loop so every task gets them, not only the last one.
        t1.doc_md = dedent(
            '''
            # Documentation
            `code style text`
            **bold style text**
            *italic style text*
            '''
        )

    for i in range(20):
        t2 = PythonOperator (
            task_id= f'print_ds_argument_{i}_times',
            depends_on_past = False,
            python_callable = print_ds,
            op_kwargs = {'task_number': i},
            retries=5
        )
        # Set the docs inside the loop so every task gets them, not only the last one.
        t2.doc_md = dedent(
            '''
            # Documentation
            `code style text`
            **bold style text**
            *italic style text*
            '''
        )

    # t1 and t2 now refer to the last operator from each loop, so only that pair is linked.
    t1 >> t2
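
`dedent` matters here because the triple-quoted string is indented along with the surrounding code, and Markdown treats lines indented by four or more spaces as a code block rather than as formatted text. A small check of what `dedent` produces, using only the standard library:

```python
from textwrap import dedent

doc = dedent(
    '''
    # Documentation
    `code style text`
    **bold style text**
    *italic style text*
    '''
)

# The common leading whitespace is gone, so '#' starts its line
# and is read as a Markdown heading.
lines = doc.strip().splitlines()
print(lines[0])
```

In rendered Markdown the three lines after the heading typically flow into a single paragraph; separate them with blank lines if each should appear on its own line.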


### Task 4. 

Create a new DAG consisting of a single `BashOperator`. This operator should use a templated command of the following kind: `"For each i in the range from 0 to 5 not inclusive, print the value of ts and then print the value of run_id."` Here, `ts` and `run_id` are template variables.

In [None]:
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.operators.bash import BashOperator


default_args={
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),  # timedelta from the datetime package
}


with DAG (
    'hw_5_anikitin8',
    default_args=default_args,
    description='homework task 5 by a.nikitin-8',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2024, 10, 29),
    catchup=False,
    tags=['nktnlx'],
) as dag:
    jinja_template = dedent(
        '''
        {% for i in range(5) %}
            echo "{{ ts }}"
            echo "{{ run_id }}"
        {% endfor %}
        '''
    )

    t1 = BashOperator(
        task_id = 'jinja_template',
        depends_on_past = False,
        bash_command = jinja_template,
        retries = 5
    )

    t1
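
Airflow renders `bash_command` with Jinja before running it, so the shell never sees the `{% for %}` loop, only its expanded result. The plain-Python sketch below builds roughly what that expansion contains. The `ts` and `run_id` values are made-up samples, and the whitespace of the real rendering differs (the template's indentation and blank lines are kept).

```python
# Made-up sample values; Airflow supplies the real ones for each run.
ts = '2024-10-29T00:00:00+00:00'
run_id = 'scheduled__2024-10-29T00:00:00+00:00'

rendered_lines = []
for i in range(5):  # mirrors {% for i in range(5) %}
    rendered_lines.append(f'echo "{ts}"')
    rendered_lines.append(f'echo "{run_id}"')

rendered_command = '\n'.join(rendered_lines)
print(rendered_command)
```

The result is ten `echo` commands: the same `ts` and `run_id` pair repeated five times, since neither value depends on `i`.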


### Task 5. 

Take the `BashOperator` from the third task (where you created tasks in a loop) and add the environment variable `NUMBER` to it, with a value equal to `i` from the loop. Print this value in the operator's command (to do this, use `bash_command="echo $NUMBER"`).

In [None]:
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator


default_args={
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),  # timedelta from datetime
}


with DAG (
    'hw_6_anikitin8',
    default_args=default_args,
    description='homework task 6 by a.nikitin-8',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2024, 10, 29),
    catchup=False,
    tags=['nktnlx'],
) as dag:
    for i in range(10):
        t1 = BashOperator(
            task_id = f'echo_task_{i}',
            depends_on_past = False,
            env={'NUMBER': str(i)},
            bash_command = 'echo $NUMBER',
            retries = 5
        )

    t1
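
Because the command is a plain string rather than an f-string, Python leaves `$NUMBER` alone and the shell expands it from the environment at run time. A rough standard-library analogue of that mechanism, assuming a POSIX shell is available (this is the same idea, not how `BashOperator` is implemented):

```python
import os
import subprocess

outputs = []
for i in range(3):
    # Environment values must be strings, hence str(i).
    env = {**os.environ, 'NUMBER': str(i)}
    completed = subprocess.run(
        'echo $NUMBER',
        shell=True,
        env=env,
        capture_output=True,
        text=True,
        check=True,
    )
    outputs.append(completed.stdout.strip())

print(outputs)
```

One difference worth keeping in mind: the sketch merges `NUMBER` into the inherited environment, whereas `BashOperator` by default uses the `env` dict instead of the inherited environment. Recent Airflow 2 releases provide `append_env=True` for the merging behaviour.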


### Task 6. 

tbc..

### Task 7. 

tbc..

### Task 8. 

tbc..

### Task 9. 

tbc..

### Task 10. 

tbc..

### Task 11. 

tbc..

### Task 12. 

tbc..