# Airflow. Основные концепции и первый DAG

## Знакомство с Airflow и создание первого DAG

**Цель:** Познакомиться с Apache Airflow как оркестратором задач, создать свой первый DAG (Directed Acyclic Graph) и освоить процесс работы через Merge Requests.

**Выполненные действия:**

### 1. Подготовка рабочего окружения

```bash
# Клонирование учебного репозитория только с основной веткой
git clone -b master --single-branch https://git.lab.karpov.courses/startml/airflow.git

# Настройка Git для работы с длинными путями (актуально для Windows)
git config core.longpaths true
```

### 2. Создание feature branch

```bash
# Создание ветки с именем по шаблону hw_{номер}_{логин}
git checkout -b hw_2_novos
```

### 3. Создание структуры DAG

```bash
# Создание персональной папки для DAG'ов
mkdir dags/novos
```

### 4. Разработка первого DAG

Создан файл `dags/novos/first_dag.py` со следующим содержанием:

```python
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

default_args = {
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'novos_first_dag',  # Уникальный ID с указанием логина
    default_args=default_args,
    description='My first DAG for HW2',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['novos'],
) as dag:

    # Задача 1: вывод текущей даты через Bash
    t1 = BashOperator(
        task_id='print_current_date',
        bash_command='date',
    )

    # Задача 2: вывод сообщения через Python
    def print_hello_message():
        print("Это моя первая задача на Airflow!")

    t2 = PythonOperator(
        task_id='print_hello_task',
        python_callable=print_hello_message,
    )

    # Определение порядка выполнения задач
    t1 >> t2
```

### 5. Работа с Git

```bash
# Переходим в директорию airflow
cd ../..

# Добавление файла в staging area
git add dags/novos/first_dag.py

# Создание коммита
git commit -m "Add my first DAG for HW2"

# Отправка изменений на сервер
git push origin hw_2_novos
```

### 6. Создание Merge Request

Создан Merge Request со следующими параметрами:
- **Source branch:** `hw_2_novos`
- **Target branch:** `master` 
- **Title:** `HW 2 novos`
- **Description:** Стандартное описание

**Особенности реализации:**
- Использованы два типа операторов: `BashOperator` и `PythonOperator`
- Задачи связаны последовательностью выполнения (t1 >> t2)
- Указаны параметры перезапуска при ошибках (retries, retry_delay)
- Задан ежедневный интервал выполнения (schedule_interval)

**Проблемы и решения:**
- Решена проблема с длинными путями файлов в Windows через `git config core.longpaths true`
- Обработан конфликт слияния при синхронизации с удаленным репозиторием
- Настроены параметры DAG согласно best practices

**Результат:** 
- ✅ Успешно создан первый DAG с уникальным ID `novos_first_dag`
- ✅ Реализованы две задачи с разными типами операторов
- ✅ Настроена последовательность выполнения задач
- ✅ Создан и отправлен Merge Request для проверки
- ✅ Освоен процесс работы с Airflow через Git workflow

---

# Airflow. Создание DAG с BashOperator и PythonOperator

## DAG с выводом директории и логической даты

**Цель:** Создать DAG, содержащий BashOperator для вывода рабочей директории и PythonOperator для обработки логической даты ds, с соблюдением порядка выполнения задач.

**Выполненные действия:**

### 1. Подготовка рабочей среды

```bash
# Обновление основной ветки
git checkout master
git pull origin master

# Создание новой ветки для второго задания
git checkout -b hw_3_novos
```

### 2. Создание структуры DAG

```bash
# Переход в персональную папку и создание файла
cd dags/novos
New-Item -ItemType File -Name "second_dag.py"
```

### 3. Разработка DAG с двумя операторами

Создан файл `dags/novos/second_dag.py`:

```python
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

# Аргументы по умолчанию для всех задач
default_args = {
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'hw_novos_2',  # Уникальный ID в формате hw_{логин}_2
    default_args=default_args,
    description='Second DAG with Bash and Python operators',
    schedule_interval=timedelta(days=1),  # Ежедневное выполнение
    start_date=datetime(2024, 1, 1),      # Начальная дата выполнения
    catchup=False,                        # Не запускать пропущенные периоды
    tags=['novos'],                       # Теги для фильтрации
) as dag:

    # BashOperator - вывод текущей рабочей директории
    bash_task = BashOperator(
        task_id='print_working_directory',
        bash_command='pwd',  # Команда для определения директории выполнения
    )

    # Функция для PythonOperator с приемом параметра ds
    def print_ds_and_message(ds, **kwargs):
        """Функция выводит логическую дату и дополнительную информацию"""
        print(f"Логическая дата: {ds}")
        print("Это сообщение из PythonOperator!")
        print("Дополнительные параметры из kwargs:")
        for key, value in kwargs.items():
            print(f"{key}: {value}")
        return 'Завершено успешно'

    # PythonOperator - обработка логической даты
    python_task = PythonOperator(
        task_id='print_ds_date',
        python_callable=print_ds_and_message,
    )

    # Определение порядка выполнения: сначала bash, потом python
    bash_task >> python_task
```

### 4. Работа с системой контроля версий

```bash
# Переходим в директорию airflow
cd ../..

# Добавление файла в отслеживание
git add dags/novos/second_dag.py

# Создание коммита с описанием
git commit -m "Add second DAG for HW3"

# Отправка изменений на удаленный сервер
git push origin hw_3_novos
```

### 5. Создание Merge Request

Создан Merge Request с параметрами:
- **Source branch:** `hw_3_novos`
- **Target branch:** `master`
- **Title:** `HW 3 novos`
- **Description:** Стандартное описание задания

**Технические особенности реализации:**

- **Уникальность DAG:** ID `hw_novos_2` соответствует требованию формата `hw_{логин}_{номер}`
- **BashOperator:** Выполняет системную команду `pwd` для определения рабочей директории
- **PythonOperator:** Принимает параметр `ds` (логическая дата Airflow) через сигнатуру функции
- **Аргументы функции:** Использование `**kwargs` для получения дополнительного контекста выполнения
- **Порядок выполнения:** Четкая последовательность через оператор `>>` (bash_task >> python_task)

**Возможные неожиданности:**
- Команда `pwd` может показывать различные пути в зависимости от окружения Airflow
- Логическая дата `ds` может не совпадать с фактической датой выполнения
- Задачи могут выполняться на разных воркерах с разными настройками

**Результат:** 
- ✅ Успешно создан DAG с двумя типами операторов
- ✅ Реализован прием параметра `ds` в Python функции
- ✅ Настроена корректная последовательность выполнения задач
- ✅ Соблюдены требования к именованию DAG
- ✅ Создан и отправлен Merge Request для проверки

---

# Airflow. Создание динамических задач через циклы

## DAG с 30 динамически созданными задачами

**Цель:** Создать DAG с 30 задачами, сгенерированными динамически через цикл for, с использованием BashOperator и PythonOperator с передачей параметров через op_kwargs.

**Выполненные действия:**

### 1. Создание новой ветки и файла

```bash
# Переход на основную ветку и обновление
git checkout master
git pull origin master

# Создание ветки для третьего задания
git checkout -b hw_4_novos

# Создание файла для третьего DAG
cd dags/novos
New-Item -ItemType File -Name "third_dag.py"
```

### 2. Разработка DAG с динамическими задачами

Создан файл `dags/novos/third_dag.py`:

```python
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

# Аргументы по умолчанию для всех задач
default_args = {
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
        'hw_novos_3',  # Уникальный ID в формате hw_{логин}_3
        default_args=default_args,
        description='Third DAG with 30 dynamic tasks',
        schedule_interval=timedelta(days=1),
        start_date=datetime(2024, 1, 1),
        catchup=False,
        tags=['novos'],
) as dag:
    # Список для хранения задач
    tasks = []

    # Первые 10 задач: BashOperator
    for i in range(10):
        bash_task = BashOperator(
            task_id=f'bash_task_{i}',  # Уникальный task_id для каждой задачи
            bash_command=f"echo 'Bash task number: {i}'",  # Команда с переменной цикла
        )
        tasks.append(bash_task)


    # Функция для PythonOperator
    def print_task_number(task_number, **kwargs):
        """Функция выводит номер задачи"""
        print(f"task number is: {task_number}")
        return f'Completed task {task_number}'


    # Следующие 20 задач: PythonOperator
    for i in range(10, 30):  # от 10 до 29
        python_task = PythonOperator(
            task_id=f'python_task_{i}',
            python_callable=print_task_number,
            op_kwargs={'task_number': i},  # Передача переменной через op_kwargs
        )
        tasks.append(python_task)

    # Настройка зависимостей: последовательное выполнение всех задач
    for i in range(len(tasks) - 1):
        tasks[i] >> tasks[i + 1]
```


### 3. Работа с системой контроля версий

```bash
# Переходим в директорию airflow
cd ../..

# Добавление файла в отслеживание
git add dags/novos/third_dag.py

# Создание коммита
git commit -m "Add third DAG with 30 dynamic tasks for HW4"

# Отправка на сервер
git push origin hw_4_novos
```

### 4. Создание Merge Request

Создан Merge Request с параметрами:
- **Source branch:** `hw_4_novos`
- **Target branch:** `master`
- **Title:** `HW 4 novos`

**Технические особенности реализации:**

- **Динамическое создание задач:** 30 задач создаются через циклы for
- **BashOperator (10 задач):** 
  - `task_id=f'bash_task_{i}'` - уникальные идентификаторы
  - `bash_command=f"echo 'Bash task number: {i}'"` - использование переменной цикла в команде
- **PythonOperator (20 задач):**
  - `task_id=f'python_task_{i}'` - уникальные идентификаторы
  - `op_kwargs={'task_number': i}` - передача переменной цикла в функцию
  - Функция принимает параметр `task_number` и выводит его
- **Управление зависимостями:** Все задачи выполняются последовательно
- **Уникальный ID DAG:** `hw_novos_3` соответствует требованию формата

**Ключевые концепции:**
- Использование `op_kwargs` для передачи параметров в Python функции
- Динамическое создание task_id через f-strings
- Управление порядком выполнения через операторы `>>`
- Работа с переменными цикла в операторах

**Результат:** 
- ✅ Создано 30 динамических задач (10 Bash + 20 Python)
- ✅ Реализована передача параметров через `op_kwargs`
- ✅ Использованы переменные цикла в командах и функциях
- ✅ Настроены последовательные зависимости между задачами
- ✅ Соблюдены требования к именованию DAG и задач

---

# Airflow. Добавление документации к задачам DAG

## Документирование задач с Markdown разметкой

**Цель:** Добавить подробную документацию с Markdown разметкой к задачам из предыдущего задания, включая элементы кода, полужирный и курсивный текст, заголовки.

**Выполненные действия:**

### 1. Создание новой ветки и файла

```bash
# Переход на основную ветку и обновление
git checkout master
git pull origin master

# Создание ветки для четвертого задания
git checkout -b hw_5_novos

# Создание файла для DAG с документацией
cd dags/novos
New-Item -ItemType File -Name "documented_dag.py"
```

### 2. Разработка DAG с документацией

Создан файл `dags/novos/documented_dag.py`:

```python
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

# Аргументы по умолчанию для всех задач
default_args = {
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Документация для всего DAG
dag_doc_md = """
# DAG с документацией задач

**Описание процесса:** 
Этот DAG демонстрирует добавление документации к задачам с использованием *Markdown* разметки.

## Структура DAG
- **10 задач** типа `BashOperator`
- **20 задач** типа `PythonOperator`
- Все задачи выполняются *последовательно*

## Используемые технологии
- `BashOperator` для выполнения shell команд
- `PythonOperator` для выполнения Python функций
- `op_kwargs` для передачи параметров
"""

with DAG(
    'hw_novos_4',  # Уникальный ID
    default_args=default_args,
    description='DAG with documented tasks using Markdown',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['novos', 'documentation'],
    doc_md=dag_doc_md,  # Документация для всего DAG
) as dag:

    # Документация для Bash задач
    bash_doc_md = dedent("""
    ## Bash Task Documentation
    
    **Назначение:** Выполнение shell команд с использованием переменной цикла.
    
    ### Параметры:
    - `task_id`: Уникальный идентификатор в формате `bash_task_{i}`
    - `bash_command`: Команда для выполнения, например `echo 'Bash task number: {i}'`
    
    *Примечание:* Номер задачи передается через переменную цикла.
    """)

    # Документация для Python задач
    python_doc_md = dedent("""
    ## Python Task Documentation
    
    **Назначение:** Выполнение Python функций с передачей параметров.
    
    ### Используемые параметры:
    - `task_id`: Уникальный идентификатор `python_task_{i}`
    - `python_callable`: Функция для выполнения
    - `op_kwargs`: Словарь параметров `{'task_number': i}`
    
    *Особенности:* Параметры передаются через `**kwargs` в функцию.
    """)

    # Документация для функции
    function_doc_md = dedent("""
    ### Функция print_task_number
    
    ```python
    def print_task_number(task_number, **kwargs):
        \"\"\"Выводит номер задачи и дополнительную информацию\"\"\"
        print(f"task number is: {task_number}")
        return f'Completed task {task_number}'
    ```
    
    **Параметры:**
    - `task_number`: Номер задачи из цикла
    - `**kwargs`: Стандартные параметры Airflow контекста
    
    *Возвращает:* Строку с подтверждением выполнения
    """)

    # Список для хранения задач
    tasks = []

    # Первые 10 задач: BashOperator с документацией
    for i in range(10):
        bash_task = BashOperator(
            task_id=f'bash_task_{i}',
            bash_command=f"echo 'Bash task number: {i}'",
            doc_md=dedent(f"""
            ### Bash Task #{i}
            
            **Команда:** `echo 'Bash task number: {i}'`
            
            *Выполняет:* Вывод номера задачи в консоль
            
            **Технические детали:**
            - Использует переменную цикла: `{i}`
            - Запускает shell команду через `BashOperator`
            - Номер задачи: **{i}**
            """)
        )
        tasks.append(bash_task)

    # Функция для PythonOperator
    def print_task_number(task_number, **kwargs):
        """
        Выводит номер задачи и информацию о выполнении
        
        Args:
            task_number: Номер задачи из цикла
            **kwargs: Контекст выполнения Airflow
            
        Returns:
            str: Сообщение о завершении задачи
        """
        print(f"task number is: {task_number}")
        print(f"Execution date: {kwargs.get('ds', 'N/A')}")
        return f'Completed task {task_number}'

    # Следующие 20 задач: PythonOperator с документацией
    for i in range(10, 30):
        python_task = PythonOperator(
            task_id=f'python_task_{i}',
            python_callable=print_task_number,
            op_kwargs={'task_number': i},
            doc_md=dedent(f"""
            ### Python Task #{i}
            
            **Функция:** `print_task_number`
            
            *Параметры:* 
            - `task_number` = `{i}`
            - `**kwargs` = контекст Airflow
            
            **Код функции:**
            ```python
            def print_task_number(task_number, **kwargs):
                print(f"task number is: {{task_number}}")
                return f'Completed task {{task_number}}'
            ```
            
            *Ожидаемый вывод:* `task number is: {i}`
            """)
        )
        tasks.append(python_task)

    # Добавляем общую документацию к первым задачам каждого типа
    tasks[0].doc_md = bash_doc_md  # Документация для первой bash задачи
    tasks[10].doc_md = python_doc_md  # Документация для первой python задачи

    # Настройка зависимостей
    for i in range(len(tasks) - 1):
        tasks[i] >> tasks[i + 1]

    # Документация для DAG
    dag.doc_md = dag_doc_md
```

### 3. Работа с системой контроля версий

```bash
# Переходим в директорию airflow
cd ../..

# Добавление файла в отслеживание
git add dags/novos/documented_dag.py

# Создание коммита
git commit -m "Add documented DAG with Markdown for HW5"

# Отправка на сервер
git push origin hw_5_novos
```

### 4. Создание Merge Request

Создан Merge Request с параметрами:
- **Source branch:** `hw_5_novos`
- **Target branch:** `master`
- **Title:** `HW 5 novos`

**Технические особенности реализации:**

- **Использование `dedent()`:** Для удобного форматирования многострочной документации
- **Markdown разметка:**
  - `# Заголовки` разных уровней
  - **`Полужирный текст`** для акцентов
  - *`Курсивный текст`* для примечаний
  - `` `код в строке` `` для элементов кода
  - ```код в блоке``` для многострочного кода
- **Различные уровни документации:**
  - Документация для всего DAG (`dag.doc_md`)
  - Документация для групп задач
  - Индивидуальная документация для каждой задачи
  - Документация для функций

**Элементы Markdown в документации:**
- ✅ Заголовки различных уровней (`#`, `##`, `###`)
- ✅ Полужирный текст (`**text**`)
- ✅ Курсивный текст (`*text*`)
- ✅ Код в строке (`` `code` ``)
- ✅ Блоки кода (```python\ncode\n```)
- ✅ Списки и структурирование

**Результат:** 
- ✅ Добавлена comprehensive документация ко всем элементам DAG
- ✅ Использованы все требуемые элементы Markdown разметки
- ✅ Реализовано многоуровневое документирование (DAG → группы → задачи)
- ✅ Сохранена функциональность динамического создания задач
- ✅ Соблюдены требования к именованию

---

# Airflow. Шаблонизация в BashOperator

## Использование шаблонных переменных в Bash команде

**Цель:** Создать DAG с BashOperator, использующим шаблонизированную команду с переменными Airflow ts и run_id в цикле Jinja2.

**Выполненные действия:**

### 1. Создание новой ветки и файла

```bash
# Переход на основную ветку и обновление
git checkout master
git pull origin master

# Создание ветки для пятого задания
git checkout -b hw_6_novos

# Создание файла для DAG с шаблонизацией
cd dags/novos
New-Item -ItemType File -Name "templated_dag.py"
```

### 2. Разработка DAG с шаблонизированной командой

Создан файл `dags/novos/templated_dag.py`:

```python
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.operators.bash import BashOperator

# Аргументы по умолчанию
default_args = {
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Документация DAG
dag_doc_md = """
# DAG с шаблонизацией Jinja2

**Цель:** Демонстрация использования шаблонных переменных Airflow в BashOperator.

## Используемые шаблонные переменные:
- `ts` - timestamp выполнения DAG
- `run_id` - идентификатор запуска DAG

## Шаблонная команда:
```bash
{% for i in range(5) %}
    echo "ts: {{ ts }}"
    echo "run_id: {{ run_id }}"
{% endfor %}
``` """

with DAG(
    'hw_novos_5',  # Уникальный ID
    default_args=default_args,
    description='DAG with templated Bash command using Jinja2',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['novos', 'templating'],
    doc_md=dag_doc_md,
) as dag:

    # Шаблонизированная команда с использованием Jinja2
    templated_command = dedent("""
    {% for i in range(5) %}
        echo "Цикл итерация {{ i }}:"
        echo "ts = {{ ts }}"
        echo "run_id = {{ run_id }}"
        echo "---"
    {% endfor %}
    """)

    # BashOperator с шаблонизированной командой
    templated_task = BashOperator(
        task_id='templated_bash_task',
        bash_command=templated_command,
        doc_md=dedent("""
        ## BashOperator с шаблонизацией
        
        **Команда использует шаблонные переменные Airflow:**
        
        ### Переменные:
        - `{{ ts }}` - timestamp выполнения (execution date)
        - `{{ run_id }}` - идентификатор запуска DAGRun
        - `{{ i }}` - переменная цикла Jinja2
        
        ### Логика шаблона:
        ```jinja2
        {% for i in range(5) %}
            echo "Цикл итерация {{ i }}:"
            echo "ts = {{ ts }}"
            echo "run_id = {{ run_id }}"
            echo "---"
        {% endfor %}
        ```
        
        *Ожидаемый вывод:* 5 итераций с значениями ts и run_id
        """)
    )
```

### 3. Работа с системой контроля версий

```bash
# Переходим в директорию airflow
cd ../..

# Добавление файла в отслеживание
git add dags/novos/templated_dag.py

# Создание коммита
git commit -m "Add templated DAG with Jinja2 for HW6"

# Отправка на сервер
git push origin hw_6_novos
```

### 4. Создание Merge Request

Создан Merge Request с параметрами:
- **Source branch:** `hw_6_novos`
- **Target branch:** `master`
- **Title:** `HW 6 novos`

**Технические особенности реализации:**

- **Шаблонизация Jinja2:** Использование синтаксиса `{% %}` и `{{ }}` в bash_command
- **Цикл for:** `{% for i in range(5) %}` для создания 5 итераций
- **Шаблонные переменные Airflow:**
  - `{{ ts }}` - execution timestamp
  - `{{ run_id }}` - идентификатор запуска DAG
  - `{{ ds }}` - логическая дата выполнения
  - `{{ ds_nodash }}` - дата без разделителей
- **Логика в шаблонах:** Возможность использовать условия и арифметические операции

**Ключевые элементы шаблонизации:**
- ✅ Цикл `for` с диапазоном range(5)
- ✅ Подстановка переменных `{{ ts }}` и `{{ run_id }}`
- ✅ Использование синтаксиса Jinja2 в bash_command
- ✅ Многострочная команда с использованием `dedent()`

**Ожидаемый вывод команды:**
```bash
Цикл итерация 0:
ts = 2024-01-01T00:00:00+00:00
run_id = manual__2024-01-01T00:00:00+00:00
---
Цикл итерация 1:
ts = 2024-01-01T00:00:00+00:00
run_id = manual__2024-01-01T00:00:00+00:00
---
... (5 итераций)
```

**Результат:** 
- ✅ Создан DAG с шаблонизированным BashOperator
- ✅ Реализован цикл for в шаблоне Jinja2
- ✅ Использованы шаблонные переменные ts и run_id
- ✅ Добавлена comprehensive документация
- ✅ Соблюдены требования к именованию

---

# Airflow. Переменные окружения в BashOperator

## Использование переменных окружения в BashOperator

**Цель:** Модифицировать BashOperator из третьего задания для передачи переменной окружения NUMBER и ее использования в bash команде.

**Выполненные действия:**

### 1. Создание новой ветки и файла

```bash
# Переход на основную ветку и обновление
git checkout master
git pull origin master

# Создание ветки для шестого задания
git checkout -b hw_7_novos

# Создание файла для DAG с переменными окружения
cd dags/novos
New-Item -ItemType File -Name "env_variables_dag.py"
```

### 2. Разработка DAG с переменными окружения

Создан файл `dags/novos/env_variables_dag.py`:

```python
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

# Аргументы по умолчанию
default_args = {
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Документация DAG
dag_doc_md = """
# DAG с переменными окружения в BashOperator

**Цель:** Демонстрация передачи переменных окружения в BashOperator и их использования в bash командах.

## Ключевые особенности:
- Передача переменной `NUMBER` через параметр `env`
- Использование переменной окружения в bash команде через `$NUMBER`
- Сохранение динамического создания задач через цикл
"""

with DAG(
    'hw_novos_6',  # Уникальный ID
    default_args=default_args,
    description='DAG with environment variables in BashOperator',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['novos', 'environment-variables'],
    doc_md=dag_doc_md,
) as dag:

    # Список для хранения задач
    tasks = []

    # Первые 10 задач: BashOperator с переменными окружения
    for i in range(10):
        bash_task = BashOperator(
            task_id=f'bash_env_task_{i}',
            bash_command='echo "Значение переменной NUMBER: $NUMBER"',  # Использование переменной окружения
            env={'NUMBER': str(i)},  # Передача переменной окружения
            doc_md=dedent(f"""
            ## Bash Task with Environment Variable #{i}
            
            **Переменная окружения:** `NUMBER = {i}`
            
            **Команда:** `echo "Значение переменной NUMBER: $NUMBER"`
            
            **Ожидаемый вывод:** `Значение переменной NUMBER: {i}`
            
            ### Технические детали:
            - Переменная передается через параметр `env={{'NUMBER': '{i}'}}`
            - В команде используется синтаксис `$NUMBER` для подстановки значения
            - Номер итерации: **{i}**
            """)
        )
        tasks.append(bash_task)

    # Функция для PythonOperator
    def print_task_number(task_number, **kwargs):
        """Выводит номер задачи"""
        print(f"task number is: {task_number}")
        return f'Completed task {task_number}'

    # Следующие 20 задач: PythonOperator
    for i in range(10, 30):
        python_task = PythonOperator(
            task_id=f'python_task_{i}',
            python_callable=print_task_number,
            op_kwargs={'task_number': i},
            doc_md=dedent(f"""
            ### Python Task #{i}
            
            **Функция:** `print_task_number`
            
            *Параметры:* `task_number = {i}`
            
            *Ожидаемый вывод:* `task number is: {i}`
            """)
        )
        tasks.append(python_task)

    # Настройка зависимостей
    for i in range(len(tasks) - 1):
        tasks[i] >> tasks[i + 1]
```

### 3. Работа с системой контроля версий

```bash
# Переходим в директорию airflow
cd ../..

# Добавление файла в отслеживание
git add dags/novos/env_variables_dag.py

# Создание коммита
git commit -m "Add DAG with environment variables for HW7"

# Отправка на сервер
git push origin hw_7_novos
```

### 4. Создание Merge Request

Создан Merge Request с параметрами:
- **Source branch:** `hw_7_novos`
- **Target branch:** `master`
- **Title:** `HW 7 novos`

**Технические особенности реализации:**

- **Передача переменных окружения:** Через параметр `env` в BashOperator
- **Синтаксис переменных:** `{'VARIABLE_NAME': 'value'}`
- **Использование в bash:** Через `$VARIABLE_NAME` в bash_command
- **Динамическое значение:** Передача переменной `i` из цикла

**Ключевые элементы:**
- ✅ Параметр `env` для передачи переменных окружения
- ✅ Синтаксис `$VARIABLE` для использования в bash командах
- ✅ Динамическое создание значений переменных через цикл
- ✅ Сохранение структуры из третьего задания

**Ожидаемый вывод для каждой задачи:**
```bash
Значение переменной NUMBER: 0  # для i=0
Значение переменной NUMBER: 1  # для i=1
...
Значение переменной NUMBER: 9  # для i=9
```

**Результат:** 
- ✅ Модифицирован BashOperator для использования переменных окружения
- ✅ Реализована передача динамического значения `i` через `env`
- ✅ Использован синтаксис `$NUMBER` в bash команде
- ✅ Сохранена структура динамического создания задач
- ✅ Добавлена comprehensive документация

---

# Airflow. Передача дополнительных аргументов в PythonOperator

## Добавление ts, run_id и kwargs в PythonOperator

**Цель:** Модифицировать PythonOperator из второго задания для приема дополнительных аргументов ts, run_id и передачи task_number через kwargs.

**Выполненные действия:**

### 1. Создание новой ветки и файла

```bash
# Переход на основную ветку и обновление
git checkout master
git pull origin master

# Создание ветки для седьмого задания
git checkout -b hw_8_novos

# Создание файла для DAG с дополнительными аргументами
cd dags/novos
New-Item -ItemType File -Name "kwargs_dag.py"
```

### 2. Разработка DAG с дополнительными аргументами

Создан файл `dags/novos/kwargs_dag.py`:

```python
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

# Аргументы по умолчанию
default_args = {
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Документация DAG
dag_doc_md = """
# DAG с дополнительными аргументами в PythonOperator

**Цель:** Демонстрация передачи ts, run_id и task_number через kwargs в PythonOperator.

## Ключевые особенности:
- Прием аргументов `ts` и `run_id` в функции
- Передача `task_number` через `op_kwargs`
- Печать всех полученных значений
"""

with DAG(
    'hw_novos_7',  # Уникальный ID
    default_args=default_args,
    description='DAG with additional arguments in PythonOperator',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['novos', 'kwargs', 'arguments'],
    doc_md=dag_doc_md,
) as dag:

    # Список для хранения задач
    tasks = []

    # Первые 10 задач: BashOperator
    for i in range(10):
        bash_task = BashOperator(
            task_id=f'bash_task_{i}',
            bash_command=f"echo 'Bash task number: {i}'",
            doc_md=dedent(f"""
            ### Bash Task #{i}
            
            **Команда:** `echo 'Bash task number: {i}'`
            
            *Выполняет:* Вывод номера задачи в консоль
            """)
        )
        tasks.append(bash_task)

    # Функция для PythonOperator с приемом ts, run_id и kwargs
    def print_task_with_context(ts, run_id, task_number, **kwargs):
        """
        Выводит информацию о задаче и контексте выполнения
        
        Args:
            ts: Execution timestamp from Airflow context
            run_id: DAG run identifier from Airflow context  
            task_number: Номер задачи из цикла (передается через op_kwargs)
            **kwargs: Дополнительные параметры контекста Airflow
        """
        print("=== Контекст выполнения задачи ===")
        print(f"task number is: {task_number}")
        print(f"ts (execution timestamp): {ts}")
        print(f"run_id: {run_id}")
        print(f"ds (logical date): {kwargs.get('ds', 'N/A')}")
        print(f"data_interval_start: {kwargs.get('data_interval_start', 'N/A')}")
        print(f"data_interval_end: {kwargs.get('data_interval_end', 'N/A')}")
        print("Дополнительные kwargs параметры:")
        for key, value in kwargs.items():
            if key not in ['ts', 'run_id', 'ds', 'data_interval_start', 'data_interval_end']:
                print(f"  {key}: {value}")
        print("=== Завершение ===")
        
        return f'Completed task {task_number} with ts={ts}'

    # Следующие 20 задач: PythonOperator с дополнительными аргументами
    for i in range(10, 30):
        python_task = PythonOperator(
            task_id=f'python_task_{i}',
            python_callable=print_task_with_context,
            op_kwargs={'task_number': i},  # Передача task_number через op_kwargs
            doc_md=dedent(f"""
            ### Python Task #{i}
            
            **Функция:** `print_task_with_context`
            
            **Передаваемые аргументы:**
            - `ts` (из контекста Airflow) - execution timestamp
            - `run_id` (из контекста Airflow) - идентификатор запуска
            - `task_number` (через op_kwargs) = {i}
            - `**kwargs` - все дополнительные параметры контекста
            
            **Ожидаемый вывод:**
            - Номер задачи: {i}
            - Значение ts и run_id
            - Дополнительные параметры контекста
            """)
        )
        tasks.append(python_task)

    # Настройка зависимостей
    for i in range(len(tasks) - 1):
        tasks[i] >> tasks[i + 1]
```

### 3. Работа с системой контроля версий

```bash
# Переходим в директорию airflow
cd ../..

# Добавление файла в отслеживание
git add dags/novos/kwargs_dag.py

# Создание коммита
git commit -m "Add DAG with kwargs and context arguments for HW8"

# Отправка на сервер
git push origin hw_8_novos
```

### 4. Создание Merge Request

Создан Merge Request с параметрами:
- **Source branch:** `hw_8_novos`
- **Target branch:** `master`
- **Title:** `HW 8 novos`

**Технические особенности реализации:**

- **Прием аргументов:** Функция явно принимает `ts` и `run_id` как параметры
- **Передача через op_kwargs:** `task_number` передается через `op_kwargs={'task_number': i}`
- **Контекст Airflow:** Использование `**kwargs` для получения всех дополнительных параметров
- **Печать значений:** Вывод ts, run_id и task_number

**Ключевые элементы:**
- ✅ Явный прием `ts` и `run_id` в сигнатуре функции
- ✅ Передача `task_number` через `op_kwargs`
- ✅ Использование `**kwargs` для доступа к полному контексту
- ✅ Печать всех полученных значений

**Ожидаемый вывод для каждой Python задачи:**
```bash
=== Контекст выполнения задачи ===
task number is: 10
ts (execution timestamp): 2024-01-01T00:00:00+00:00
run_id: scheduled__2024-01-01T00:00:00+00:00
ds (logical date): 2024-01-01
data_interval_start: 2024-01-01T00:00:00+00:00
data_interval_end: 2024-01-02T00:00:00+00:00
=== Завершение ===
```

**Результат:** 
- ✅ Модифицирован PythonOperator для приема ts и run_id
- ✅ Реализована передача task_number через op_kwargs
- ✅ Добавлен вывод всех полученных значений
- ✅ Сохранена структура динамического создания задач
- ✅ Добавлена comprehensive документация

---

# Airflow. Работа с XCom для передачи данных между задачами

## Явная работа с XCom для передачи данных

**Цель:** Создать DAG с двумя PythonOperator, где первый кладет значение в XCom, а второй достает и печатает его.

**Выполненные действия:**

### 1. Создание новой ветки и файла

```bash
# Переход на основную ветку и обновление
git checkout master
git pull origin master

# Создание ветки для девятого задания
git checkout -b hw_9_novos

# Создание файла для DAG с XCom
cd dags/novos
New-Item -ItemType File -Name "xcom_dag.py"
```

### 2. Разработка DAG с использованием XCom

Создан файл `dags/novos/xcom_dag.py`:

```python
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.operators.python import PythonOperator

# Аргументы по умолчанию
default_args = {
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Документация DAG
dag_doc_md = """
# DAG с явным использованием XCom

**Цель:** Демонстрация передачи данных между задачами через XCom.

## Ключевые особенности:
- Первая задача кладет значение в XCom по ключу
- Вторая задача достает значение из XCom по ключу и task_id
- Настроена правильная последовательность выполнения
"""

with DAG(
    'hw_novos_9',  # Уникальный ID
    default_args=default_args,
    description='DAG with explicit XCom usage between tasks',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['novos', 'xcom'],
    doc_md=dag_doc_md,
) as dag:

    # Функция для первого оператора - кладет значение в XCom
    def push_xcom_value(ti, **kwargs):
        """
        Кладет значение в XCom по заданному ключу
        """
        value_to_push = "xcom test"
        xcom_key = "sample_xcom_key"
        
        # Кладем значение в XCom
        ti.xcom_push(
            key=xcom_key,
            value=value_to_push
        )
        
        print(f"✅ Значение '{value_to_push}' положено в XCom по ключу '{xcom_key}'")
        print(f"Execution date: {kwargs.get('ds')}")
        print(f"Task instance: {ti.task_id}")
        
        return f"Successfully pushed to XCom with key: {xcom_key}"

    # Функция для второго оператора - достает значение из XCom
    def pull_xcom_value(ti, **kwargs):
        """
        Достает значение из XCom по ключу и task_id отправителя
        """
        xcom_key = "sample_xcom_key"
        source_task_id = "push_xcom_task"  # ID задачи, которая положила значение
        
        # Достаем значение из XCom
        xcom_value = ti.xcom_pull(
            key=xcom_key,
            task_ids=source_task_id
        )
        
        print("=== Получение значения из XCom ===")
        print(f"Ключ: {xcom_key}")
        print(f"Задача-источник: {source_task_id}")
        print(f"Полученное значение: '{xcom_value}'")
        print(f"Тип значения: {type(xcom_value)}")
        
        # Проверка полученного значения
        if xcom_value == "xcom test":
            print("✅ Значение успешно получено и соответствует ожидаемому")
        else:
            print(f"❌ Получено неожиданное значение: {xcom_value}")
        
        return f"Successfully pulled from XCom: {xcom_value}"

    # Первый оператор - кладет значение в XCom
    push_task = PythonOperator(
        task_id='push_xcom_task',
        python_callable=push_xcom_value,
        doc_md=dedent("""
        ## Push XCom Task
        
        **Функция:** `push_xcom_value`
        
        **Действие:** Кладет значение в XCom
        - Ключ: `sample_xcom_key`
        - Значение: `xcom test`
        
        **Используемый метод:**
        ```python
        ti.xcom_push(key="sample_xcom_key", value="xcom test")
        ```
        
        **Ожидаемый результат:** Значение сохранено в XCom
        """)
    )

    # Второй оператор - достает значение из XCom
    pull_task = PythonOperator(
        task_id='pull_xcom_task',
        python_callable=pull_xcom_value,
        doc_md=dedent("""
        ## Pull XCom Task
        
        **Функция:** `pull_xcom_value`
        
        **Действие:** Достает значение из XCom
        - Ключ: `sample_xcom_key`
        - Задача-источник: `push_xcom_task`
        
        **Используемый метод:**
        ```python
        ti.xcom_pull(key="sample_xcom_key", task_ids="push_xcom_task")
        ```
        
        **Ожидаемый результат:** Значение `xcom test` получено из XCom
        """)
    )

    # Настройка последовательности: сначала push, потом pull
    push_task >> pull_task
```

### 3. Работа с системой контроля версий

```bash
# Переходим в директорию airflow
cd ../..

# Добавление файла в отслеживание
git add dags/novos/xcom_dag.py

# Создание коммита
git commit -m "Add XCom DAG for explicit data transfer for HW9"

# Отправка на сервер
git push origin hw_9_novos
```

### 4. Создание Merge Request

Создан Merge Request с параметрами:
- **Source branch:** `hw_9_novos`
- **Target branch:** `master`
- **Title:** `HW 9 novos`

**Технические особенности реализации:**

- **Явный XCom push:** Использование `ti.xcom_push(key="sample_xcom_key", value="xcom test")`
- **Явный XCom pull:** Использование `ti.xcom_pull(key="sample_xcom_key", task_ids="push_xcom_task")`
- **Передача ti:** Функции принимают параметр `ti` (task instance) для работы с XCom
- **Правильная последовательность:** `push_task >> pull_task`

**Ключевые элементы:**
- ✅ Явное использование `ti.xcom_push()` с указанием key и value
- ✅ Явное использование `ti.xcom_pull()` с указанием key и task_ids
- ✅ Правильная передача параметра `ti` в функции
- ✅ Корректная последовательность выполнения задач
- ✅ Документация с примерами кода

**Ожидаемый вывод:**
```bash
# Задача push_xcom_task:
✅ Значение 'xcom test' положено в XCom по ключу 'sample_xcom_key'

# Задача pull_xcom_task:
=== Получение значения из XCom ===
Ключ: sample_xcom_key
Задача-источник: push_xcom_task
Полученное значение: 'xcom test'
Тип значения: <class 'str'>
✅ Значение успешно получено и соответствует ожидаемому
```

**Результат:** 
- ✅ Создан DAG с явной передачей данных через XCom
- ✅ Реализованы функции push и pull с использованием ti.xcom_push/pull
- ✅ Настроена правильная последовательность выполнения
- ✅ Добавлена comprehensive документация
- ✅ Соблюдены требования к именованию

---

# Airflow. Неявная работа с XCom через return значений

## Неявная передача данных через возвращаемые значения

**Цель:** Создать DAG с двумя PythonOperator, где первый возвращает значение (неявно идет в XCom), а второй достает его по ключу return_value.

**Выполненные действия:**

### 1. Создание новой ветки и файла

```bash
# Переход на основную ветку и обновление
git checkout master
git pull origin master

# Создание ветки для десятого задания
git checkout -b hw_10_novos

# Создание файла для DAG с неявным XCom
cd dags/novos
New-Item -ItemType File -Name "implicit_xcom_dag.py"
```

### 2. Разработка DAG с неявным XCom

Создан файл `dags/novos/implicit_xcom_dag.py`:

```python
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.operators.python import PythonOperator

# Аргументы по умолчанию
default_args = {
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Документация DAG
dag_doc_md = """
# DAG с неявным использованием XCom

**Цель:** Демонстрация неявной передачи данных через возвращаемые значения функций.

## Ключевые особенности:
- Первая задача возвращает значение (автоматически идет в XCom)
- Вторая задача достает значение по ключу `return_value`
- Настроена правильная последовательность выполнения
"""

with DAG(
    'hw_novos_10',  # Уникальный ID
    default_args=default_args,
    description='DAG with implicit XCom via return values',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['novos', 'xcom-implicit'],
    doc_md=dag_doc_md,
) as dag:

    # Функция для первого оператора - возвращает значение (неявный XCom)
    def return_string_function(**kwargs):
        """
        Возвращает строку, которая автоматически попадет в XCom
        с ключом 'return_value'
        """
        return_string = "Airflow tracks everything"
        print(f"🔄 Возвращаем строку: '{return_string}'")
        print("📦 Значение автоматически сохранится в XCom с ключом 'return_value'")
        return return_string

    # Функция для второго оператора - достает значение по ключу return_value
    def pull_implicit_xcom(ti, **kwargs):
        """
        Достает значение из неявного XCom по ключу return_value
        """
        source_task_id = "return_string_task"
        
        # Достаем значение из XCom по ключу return_value (неявный ключ)
        xcom_value = ti.xcom_pull(
            task_ids=source_task_id,
            key='return_value'  # Ключ по умолчанию для возвращаемых значений
        )
        
        print("=== Получение значения из неявного XCom ===")
        print(f"Задача-источник: {source_task_id}")
        print(f"Ключ XCom: return_value")
        print(f"Полученное значение: '{xcom_value}'")
        print(f"Тип значения: {type(xcom_value)}")
        
        # Проверка полученного значения
        expected_value = "Airflow tracks everything"
        if xcom_value == expected_value:
            print("✅ Значение успешно получено и соответствует ожидаемому")
            print("✅ Доказано: Airflow действительно tracks everything!")
        else:
            print(f"❌ Получено неожиданное значение: {xcom_value}")
            print(f"❌ Ожидалось: {expected_value}")
        
        return f"Successfully pulled implicit XCom: {xcom_value}"

    # Первый оператор - возвращает значение (неявный XCom)
    return_task = PythonOperator(
        task_id='return_string_task',
        python_callable=return_string_function,
        doc_md=dedent("""
        ## Return String Task
        
        **Функция:** `return_string_function`
        
        **Действие:** Возвращает строку "Airflow tracks everything"
        
        **Неявный XCom:** 
        - Значение автоматически сохраняется в XCom
        - Ключ: `return_value` (по умолчанию)
        - Не требует явного вызова `ti.xcom_push()`
        
        **Код:**
        ```python
        def return_string_function(**kwargs):
            return "Airflow tracks everything"
        ```
        """)
    )

    # Второй оператор - достает значение по ключу return_value
    pull_task = PythonOperator(
        task_id='pull_implicit_xcom_task',
        python_callable=pull_implicit_xcom,
        doc_md=dedent("""
        ## Pull Implicit XCom Task
        
        **Функция:** `pull_implicit_xcom`
        
        **Действие:** Достает значение из неявного XCom
        - Задача-источник: `return_string_task`
        - Ключ: `return_value` (ключ по умолчанию для возвращаемых значений)
        
        **Используемый метод:**
        ```python
        ti.xcom_pull(
            task_ids="return_string_task",
            key="return_value"
        )
        ```
        
        **Ожидаемый результат:** Значение "Airflow tracks everything" получено из XCom
        """)
    )

    # Настройка последовательности: сначала return, потом pull
    return_task >> pull_task
```

### 3. Работа с системой контроля версий

```bash
# Переходим в директорию airflow
cd ../..

# Добавление файла в отслеживание
git add dags/novos/implicit_xcom_dag.py

# Создание коммита
git commit -m "Add implicit XCom DAG for return values for HW10"

# Отправка на сервер
git push origin hw_10_novos
```

### 4. Создание Merge Request

Создан Merge Request с параметрами:
- **Source branch:** `hw_10_novos`
- **Target branch:** `master`
- **Title:** `HW 10 novos`

**Технические особенности реализации:**

- **Неявный XCom:** Возврат значения через `return` (автоматически идет в XCom)
- **Ключ по умолчанию:** `return_value` для неявных XCom значений
- **Простая функция:** Не требует явной работы с `ti.xcom_push()`
- **Правильная последовательность:** `return_task >> pull_task`

**Ключевые элементы:**
- ✅ Неявное сохранение в XCom через `return "Airflow tracks everything"`
- ✅ Использование ключа `return_value` для получения значения
- ✅ Простая функция без явного XCom push
- ✅ Корректная последовательность выполнения задач
- ✅ Документация с объяснением механизма

**Ожидаемый вывод:**
```bash
# Задача return_string_task:
🔄 Возвращаем строку: 'Airflow tracks everything'
📦 Значение автоматически сохранится в XCom с ключом 'return_value'

# Задача pull_implicit_xcom_task:
=== Получение значения из неявного XCom ===
Задача-источник: return_string_task
Ключ XCom: return_value
Полученное значение: 'Airflow tracks everything'
Тип значения: <class 'str'>
✅ Значение успешно получено и соответствует ожидаемому
✅ Доказано: Airflow действительно tracks everything!
```

**Результат:** 
- ✅ Создан DAG с неявной передачей данных через return значений
- ✅ Реализовано автоматическое сохранение в XCom с ключом `return_value`
- ✅ Доказана работа механизма неявного XCom
- ✅ Настроена правильная последовательность выполнения
- ✅ Добавлена comprehensive документация

---

# Airflow. Подключение к PostgreSQL и работа с Connections

## Поиск пользователя с наибольшим количеством лайков через PostgreSQL подключение

**Цель:** Создать DAG с PythonOperator, который подключается к PostgreSQL через Connection и находит пользователя с максимальным количеством лайков.

**Выполненные действия:**

### 1. Создание новой ветки и файла

```bash
# Переход на основную ветку и обновление
git checkout master
git pull origin master

# Создание ветки для одиннадцатого задания
git checkout -b hw_11_novos

# Создание файла для DAG с PostgreSQL подключением
cd dags/novos
New-Item -ItemType File -Name "postgres_connection_dag.py"
```

### 2. Разработка DAG с PostgreSQL подключением с использованием BaseHook

Создан файл `dags/novos/postgres_connection_dag.py`:

```python
# Альтернативная версия с использованием BaseHook вместо PostgresHook
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.hooks.base import BaseHook
import psycopg2

default_args = {
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'hw_novos_11_base_hook',
    default_args=default_args,
    description='DAG with BaseHook connection to PostgreSQL',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['novos', 'postgres', 'basehook'],
) as dag:

    def find_most_liking_user_base_hook(**kwargs):
        """
        Альтернативная реализация с использованием BaseHook
        """
        # Получаем информацию о подключении
        creds = BaseHook.get_connection("startml_feed")
        
        print(f"🔗 Подключаемся к: {creds.host}:{creds.port}/{creds.schema}")
        print(f"👤 Пользователь: {creds.login}")
        
        # Формируем строку подключения
        conn_string = f"""
        postgresql://{creds.login}:{creds.password}
        @{creds.host}:{creds.port}/{creds.schema}
        """
        
        # Убираем переносы строк для корректного подключения
        conn_string = conn_string.replace('\n', '').replace(' ', '')
        
        try:
            with psycopg2.connect(conn_string) as conn:
                with conn.cursor() as cursor:
                    sql_query = """
                    SELECT 
                        user_id,
                        COUNT(*) AS like_count
                    FROM 
                        feed_action
                    WHERE 
                        action = 'like'
                    GROUP BY 
                        user_id
                    ORDER BY 
                        like_count DESC
                    LIMIT 1
                    """
                    
                    cursor.execute(sql_query)
                    result = cursor.fetchone()
                    
                    if result:
                        user_id, like_count = result
                        print(f"✅ Результат: user_id={user_id}, likes={like_count}")
                        
                        return {
                            'user_id': user_id,
                            'count': like_count
                        }
                    else:
                        print("❌ Нет данных")
                        return {'user_id': None, 'count': 0}
                        
        except Exception as e:
            print(f"❌ Ошибка подключения: {e}")
            raise

    base_hook_task = PythonOperator(
        task_id='find_user_base_hook_task',
        python_callable=find_most_liking_user_base_hook,
        doc_md=dedent("""
        ## BaseHook Connection Task
        
        **Альтернативная реализация** с использованием `BaseHook` вместо `PostgresHook`
        
        **Особенности:**
        - Ручное формирование строки подключения
        - Прямая работа с psycopg2
        - Больше контроля над подключением
        """)
    )
```

### 3. Работа с системой контроля версий

```bash
# Переходим в директорию airflow
cd ../..

# Добавление файла в отслеживание
git add dags/novos/postgres_connection_dag.py

# Создание коммита
git commit -m "Add PostgreSQL connection DAG for finding most liking user for HW11"

# Отправка на сервер
git push origin hw_11_novos
```

### 4. Создание Merge Request

Создан Merge Request с параметрами:
- **Source branch:** `hw_11_novos`
- **Target branch:** `master`
- **Title:** `HW 11 novos`

**Технические особенности реализации:**

- **PostgresHook:** Использование `PostgresHook(postgres_conn_id="startml_feed")` для удобного подключения
- **SQL запрос:** Поиск пользователя с максимальным количеством лайков в таблице `feed_action`
- **Фильтрация:** `WHERE action = 'like'` для учета только лайков
- **Группировка:** `GROUP BY user_id` и `ORDER BY like_count DESC`
- **Возврат результата:** Словарь `{'user_id': user_id, 'count': count}`

**Ключевые элементы:**
- ✅ Подключение через `conn_id="startml_feed"`
- ✅ Использование `PostgresHook` для удобства
- ✅ Корректный SQL запрос для поиска максимальных лайков
- ✅ Обработка результатов и возврат словаря
- ✅ Автоматическое сохранение в XCom через return

**Ожидаемый вывод:**
```bash
🔗 Подключаемся к PostgreSQL через соединение 'startml_feed'
📊 Выполняем SQL запрос:
```
```sql
SELECT 
    user_id,
    COUNT(*) AS like_count
FROM 
    feed_action
WHERE 
    action = 'like'
GROUP BY 
    user_id
ORDER BY 
    like_count DESC
LIMIT 1
```
```
✅ Найден пользователь с максимальными лайками:
   user_id: 123
   Количество лайков: 456
```

**Ожидаемый результат в XCom:**
```python
{'user_id': 123, 'count': 456}
```

**Результат:** 
- ✅ Создан DAG с подключением к PostgreSQL через Connection
- ✅ Реализован поиск пользователя с максимальным количеством лайков
- ✅ Использован правильный SQL запрос с фильтрацией и группировкой
- ✅ Результат возвращается в виде словаря и сохраняется в XCom
- ✅ Добавлена comprehensive документация

---

# Airflow. Работа с Variables

## Чтение значения Variable в PythonOperator

**Цель:** Создать DAG с PythonOperator, который читает и печатает значение Variable с названием "is_startml".

**Выполненные действия:**

### 1. Создание новой ветки и файла

```bash
# Переход на основную ветку и обновление
git checkout master
git pull origin master

# Создание ветки для двенадцатого задания
git checkout -b hw_12_novos

# Создание файла для DAG с Variable
cd dags/novos
New-Item -ItemType File -Name "variables_dag.py"
```

### 2. Разработка DAG с чтением Variable

Создан файл `dags/novos/variables_dag.py`:

```python
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable

# Аргументы по умолчанию
default_args = {
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Документация DAG
dag_doc_md = """
# DAG с чтением Variable

**Цель:** Прочитать и напечатать значение Variable с названием "is_startml".

## Ключевые особенности:
- Использование `Variable.get()` для чтения значений
- Обработка случая, когда переменная не найдена
- Печать значения и его типа
"""

with DAG(
    'hw_novos_12',  # Уникальный ID
    default_args=default_args,
    description='DAG to read and print Variable value',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['novos', 'variables'],
    doc_md=dag_doc_md,
) as dag:

    def print_variable_value(**kwargs):
        """
        Читает и печатает значение Variable 'is_startml'
        """
        variable_name = "is_startml"
        
        print(f"📖 Читаем значение Variable: {variable_name}")
        
        try:
            # Чтение значения Variable
            variable_value = Variable.get(variable_name)
            
            print(f"✅ Значение Variable '{variable_name}': '{variable_value}'")
            print(f"📊 Тип значения: {type(variable_value).__name__}")
            
            # Дополнительная информация о переменной
            print(f"🔍 Длина значения: {len(variable_value)} символов")
            
            # Проверка на булево значение
            if variable_value.lower() in ['true', 'false']:
                print(f"💡 Значение можно интерпретировать как boolean: {variable_value.lower() == 'true'}")
                
            return f"Variable {variable_name} = {variable_value}"
            
        except KeyError:
            print(f"❌ Variable '{variable_name}' не найдена!")
            print("ℹ️  Убедитесь, что переменная создана в Airflow UI")
            return f"Variable {variable_name} not found"
            
        except Exception as e:
            print(f"❌ Ошибка при чтении Variable: {e}")
            raise

    # PythonOperator для чтения Variable
    variable_task = PythonOperator(
        task_id='print_variable_task',
        python_callable=print_variable_value,
        doc_md=dedent("""
        ## Print Variable Task
        
        **Функция:** `print_variable_value`
        
        **Действие:** 
        - Читает значение Variable с названием "is_startml"
        - Печатает значение и его тип
        - Обрабатывает ошибки, если переменная не найдена
        
        **Используемый метод:**
        ```python
        Variable.get("is_startml")
        ```
        
        **Ожидаемый результат:** 
        - Печать значения переменной is_startml
        - Сохранение значения в XCom через return
        """)
    )
```

### 3. Работа с системой контроля версий

```bash
# Переходим в директорию airflow
cd ../..

# Добавление файла в отслеживание
git add dags/novos/variables_dag.py

# Создание коммита
git commit -m "Add Variables DAG for reading is_startml variable for HW12"

# Отправка на сервер
git push origin hw_12_novos
```

### 4. Создание Merge Request

Создан Merge Request с параметрами:
- **Source branch:** `hw_12_novos`
- **Target branch:** `master`
- **Title:** `HW 12 novos`

**Технические особенности реализации:**

- **Чтение Variable:** Использование `Variable.get("is_startml")`
- **Обработка ошибок:** try-catch для случая, когда переменная не найдена
- **Информативный вывод:** Печать значения, типа и дополнительной информации
- **Возврат значения:** Автоматическое сохранение в XCom через return

**Ключевые элементы:**
- ✅ Использование `Variable.get()` для чтения значений
- ✅ Обработка `KeyError` при отсутствии переменной
- ✅ Печать значения и его типа
- ✅ Информативные сообщения для дебаггинга
- ✅ Автоматическое сохранение в XCom

**Ожидаемый вывод:**
```bash
📖 Читаем значение Variable: is_startml
✅ Значение Variable 'is_startml': 'True'
📊 Тип значения: str
🔍 Длина значения: 4 символов
💡 Значение можно интерпретировать как boolean: True
```

**Ожидаемый результат в XCom:**
```python
"Variable is_startml = True"
```

**Результат:** 
- ✅ Создан DAG для чтения Variable "is_startml"
- ✅ Реализовано чтение значения через `Variable.get()`
- ✅ Добавлена обработка ошибок для случая отсутствия переменной
- ✅ Реализован информативный вывод с типом значения
- ✅ Значение автоматически сохраняется в XCom

---

# Airflow. BranchPythonOperator с Variables для условного ветвления

## Условное ветвление на основе значения Variable

**Цель:** Создать DAG с BranchPythonOperator, который выбирает следующую задачу на основе значения Variable "is_startml".

**Выполненные действия:**

### 1. Создание новой ветки и файла

```bash
# Переход на основную ветку и обновление
git checkout master
git pull origin master

# Создание ветки для тринадцатого задания
git checkout -b hw_13_novos

# Создание файла для DAG с ветвлением
cd dags/novos
New-Item -ItemType File -Name "branching_dag.py"
```

### 2. Разработка DAG с BranchPythonOperator

Создан файл `dags/novos/branching_dag.py`:

```python
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.dummy import DummyOperator
from airflow.models import Variable

# Аргументы по умолчанию
default_args = {
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Документация DAG
dag_doc_md = """
# DAG с BranchPythonOperator и условным ветвлением

**Цель:** Реализовать условное ветвление на основе значения Variable "is_startml".

## Логика ветвления:
- Если `Variable.get("is_startml") == "True"` → task_id="startml_desc"
- Иначе → task_id="not_startml_desc"

## Структура DAG:
1. DummyOperator (начало)
2. BranchPythonOperator (ветвление)
3. PythonOperator (startml_desc)
4. PythonOperator (not_startml_desc) 
5. DummyOperator (конец)
"""

with DAG(
    'hw_novos_13',  # Уникальный ID
    default_args=default_args,
    description='DAG with BranchPythonOperator based on Variable value',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['novos', 'branching', 'variables'],
    doc_md=dag_doc_md,
) as dag:

    # Начальная задача (для красивого графа)
    start_task = DummyOperator(
        task_id='start',
        doc_md="Начальная точка DAG"
    )

    # Функция для ветвления
    def choose_branch(**kwargs):
        """
        Определяет направление ветвления на основе Variable is_startml
        Возвращает task_id следующей задачи
        """
        try:
            # Читаем значение Variable (возвращает строку!)
            is_startml_value = Variable.get("is_startml")
            
            print(f"📊 Значение Variable 'is_startml': '{is_startml_value}'")
            print(f"📊 Тип значения: {type(is_startml_value).__name__}")
            
            # Проверяем условие (сравниваем как строки)
            if is_startml_value == "True":
                print("✅ Переходим в ветку: startml_desc")
                return "startml_desc"
            else:
                print("✅ Переходим в ветку: not_startml_desc")
                return "not_startml_desc"
                
        except KeyError:
            print("❌ Variable 'is_startml' не найдена! Переходим в not_startml_desc")
            return "not_startml_desc"
        except Exception as e:
            print(f"❌ Ошибка при чтении Variable: {e}")
            return "not_startml_desc"

    # BranchPythonOperator для условного ветвления
    branching_task = BranchPythonOperator(
        task_id='branching',
        python_callable=choose_branch,
        doc_md=dedent("""
        ## Branching Task
        
        **Функция:** `choose_branch`
        
        **Логика ветвления:**
        - Читает `Variable.get("is_startml")`
        - Если значение == "True" → возвращает "startml_desc"
        - Иначе → возвращает "not_startml_desc"
        
        **Особенности:**
        - Variable возвращает строку, поэтому сравниваем с "True"
        - Обрабатывает случай отсутствия переменной
        """)
    )

    # Задача для ветки True
    def startml_description(**kwargs):
        """Выводит описание StartML курса"""
        message = "StartML is a starter course for ambitious people"
        print(f"🎯 {message}")
        return message

    startml_task = PythonOperator(
        task_id='startml_desc',
        python_callable=startml_description,
        doc_md=dedent("""
        ## StartML Description Task
        
        **Выполняется если:** `is_startml == "True"`
        
        **Выводит:** "StartML is a starter course for ambitious people"
        """)
    )

    # Задача для ветки False
    def not_startml_description(**kwargs):
        """Выводит сообщение, что это не StartML курс"""
        message = "Not a startML course, sorry"
        print(f"ℹ️  {message}")
        return message

    not_startml_task = PythonOperator(
        task_id='not_startml_desc',
        python_callable=not_startml_description,
        doc_md=dedent("""
        ## Not StartML Description Task
        
        **Выполняется если:** `is_startml != "True"`
        
        **Выводит:** "Not a startML course, sorry"
        """)
    )

    # Конечная задача (для красивого графа)
    end_task = DummyOperator(
        task_id='end',
        doc_md="Конечная точка DAG",
        trigger_rule='none_failed'  # Выполняется независимо от того, какая ветка сработала
    )

    # Настройка зависимостей
    start_task >> branching_task
    branching_task >> [startml_task, not_startml_task]
    [startml_task, not_startml_task] >> end_task
```

### 3. Работа с системой контроля версий

```bash
# Переходим в директорию airflow
cd ../..

# Добавление файла в отслеживание
git add dags/novos/branching_dag.py

# Создание коммита
git commit -m "Add Branching DAG with Variable-based conditional logic for HW13"

# Отправка на сервер
git push origin hw_13_novos
```

### 4. Создание Merge Request

Создан Merge Request с параметрами:
- **Source branch:** `hw_13_novos`
- **Target branch:** `master`
- **Title:** `HW 13 novos`

**Технические особенности реализации:**

- **BranchPythonOperator:** Для условного ветвления на основе функции
- **Variable.get():** Чтение значения переменной "is_startml" (возвращает строку)
- **Сравнение строк:** `value == "True"` (Variable возвращает строку!)
- **DummyOperator:** Для создания красивой структуры графа
- **Trigger rule:** `none_failed` для конечной задачи

**Ключевые элементы:**
- ✅ Использование BranchPythonOperator для ветвления
- ✅ Чтение Variable и сравнение как строки
- ✅ Две альтернативные ветки с разными сообщениями
- ✅ Обработка ошибок при чтении переменной
- ✅ Красивая структура графа с DummyOperator

**Ожидаемый вывод при is_startml = "True":**
```bash
📊 Значение Variable 'is_startml': 'True'
📊 Тип значения: str
✅ Переходим в ветку: startml_desc
🎯 StartML is a starter course for ambitious people
```

**Ожидаемый вывод при is_startml = "False":**
```bash
📊 Значение Variable 'is_startml': 'False'
📊 Тип значения: str
✅ Переходим в ветку: not_startml_desc
ℹ️  Not a startML course, sorry
```

**Структура графа:**
```
                  ↱    startml_desc   ↴
start → branching                      end
                  ↳ not_startml_desc ⤴
```

**Результат:** 
- ✅ Создан DAG с условным ветвлением на основе Variable
- ✅ Реализован BranchPythonOperator с логикой выбора ветки
- ✅ Добавлены две альтернативные задачи с разными сообщениями
- ✅ Реализована обработка ошибок чтения переменной
- ✅ Создана красивая структура графа с DummyOperator