<a href="https://colab.research.google.com/github/sergeymasl/my_lifehacks/blob/main/Airflow_base_of_knowledge.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Cron редактор https://crontab.guru
Книга [Data Pipelines with Apache Airflow](https://disk.yandex.ru/i/oIm9JcyOxEUkNA)

[GitHub для книги](https://github.com/BasPH/data-pipelines-with-apache-airflow)

#### **Настройка и установка**

1. Установка

```python
# Установка Airflow
pip install apache-airflow
# apache-airflow==2.1.4
```

Для установки коннекторов к сторонним сервисам используются 
```python
pip install 'apache-airflow[<PACKAGE>]'
```

Пример установки
```python
pip install 'apache-airflow[google,amazon,postgres]'
```
Также можно установить все пакеты сразу
```python
pip install 'apache-airflow[all]'
```

Начиная со 2 версии, все сторонние модули были вынесены в так называемые [провайдеры](https://airflow.apache.org/docs/apache-airflow-providers/packages-ref.html). Теперь чтобы установить сторонний модуль необходимо исполнить следующую строку (**у меня не сработало**)

```python
# Установка HTTP провайдера и телеграм провайдера
pip install apache-airflow-providers-http
pip install apache-airflow-providers-telegram

# Библиотека для работы с telegram
pip install python-telegram-bot
```

2. Инициализация

На данном этапе мы создаем базу метаданных, от версии к версии команда чутка меняется, это для >2.0

```python
# Инициализация базы данных
airflow db init
```
При инициализации создается папка с логами, конфиг файл и сама БД. Сама БД может быть любой, под капотом используется ORM, и все зависит лишь от вашего Executor и адреса БД. Которые задаются в глобальных переменных или данных файла конфигурации.

Затем можно создать пользователя. Первый пользователь должен иметь роль `Admin`

```python
# Создадим пользователя Airflow
airflow users create \
          --username admin \
          --firstname admin \
          --lastname admin \
          --role Admin \
          --email admin@example.org \
          -p 12345
```

3. Запуск Веб сервера и шедулера

После подготовки, можно запустить вебсервис с мордой на нужном вам порту и шедулер, что это такое мы уже обсуждали ранее. Для них будут созданы отдельные процессы, номера которых будут указаны в файлах рядом с фалом конфигурации. Расширение файлов .pid
```python
airflow webserver -p 18273 -D
airflow scheduler -D
```

#### **DAG**

Ссылка на документацию [dag](https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/dag/index.html?highlight=dag#module-airflow.models.dag)

>Примечание `start_date` работает по [**UTC**](https://vremya.org/)

А так же свой первый запуск **шедулер** начинает работу по истечению строка указанного в `start_date` (то есть как только наступит следующая минита/час/день после **start_date**

>Примечание в `schedule_interval` (`schedule` с версии 2.4) можно передать как **timedelta** так и варажение [cron](https://crontab.guru/)
***

#### **Operators**

**Operators** это задачки которые выполняются в **DAG**

[link to a lot of operators](https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/operators/index.html)

Все операторы строятся на [**BaseOperators**](https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/baseoperator/index.html?highlight=baseoperator#airflow.models.baseoperator.BaseOperator)

`classairflow.models.baseoperator.BaseOperator(task_id, owner=DEFAULT_OWNER, email=None, email_on_retry=conf.getboolean('email', 'default_email_on_retry', fallback=True), email_on_failure=conf.getboolean('email', 'default_email_on_failure', fallback=True), retries=DEFAULT_RETRIES, retry_delay=DEFAULT_RETRY_DELAY, retry_exponential_backoff=False, max_retry_delay=None, start_date=None, end_date=None, depends_on_past=False, ignore_first_depends_on_past=DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST, wait_for_downstream=False, dag=None, params=None, default_args=None, priority_weight=DEFAULT_PRIORITY_WEIGHT, weight_rule=DEFAULT_WEIGHT_RULE, queue=DEFAULT_QUEUE, pool=None, pool_slots=DEFAULT_POOL_SLOTS, sla=None, execution_timeout=DEFAULT_TASK_EXECUTION_TIMEOUT, on_execute_callback=None, on_failure_callback=None, on_success_callback=None, on_retry_callback=None, pre_execute=None, post_execute=None, trigger_rule=DEFAULT_TRIGGER_RULE, resources=None, run_as_user=None, task_concurrency=None, max_active_tis_per_dag=None, executor_config=None, do_xcom_push=True, inlets=None, outlets=None, task_group=None, doc=None, doc_md=None, doc_json=None, doc_yaml=None, doc_rst=None, **kwargs)`

- `task_id (str)`  - название задачи
- `owner (str)`  - владелец задачи
- `email (str | Iterable[str] | None) ` - email на который будут приходить оповещения по слудкющим параметрам. Может быть одним адресом или списком адресов
- `email_on_retry (bool) ` - отправка оповещения при повторном выполнении задачи
- `email_on_failure (bool) ` - отправка оповещения при провале задачи
- `retries (int | None)` - количество повторных попыток прежде чем задача будет провалена
- `retry_delay (timedelta | float)` - время между повторными попытками, the default is timedelta(seconds=300)
- `retry_exponential_backoff (bool) ` - каждый раз увеличивает время ожидания между перезапусками с помощью *Exponential Backoff*
- `max_retry_delay (timedelta | float | None)` - максимальное время между попытками при использовании **retry_exponential_backoff**
- `start_date (datetime | None)` - дата начала для задачи
- `end_date (datetime | None)` - если указана эта дату, планировщик прекратит выполнение задачи в эту дату
- `depends_on_past (bool)` - если установлено значение True задача будет выполнена только если предыдущая задача была успешна или пропущена
- `wait_for_downstream (bool)` - 
- `dag (DAG | None)` - даг в котором должна быть задача
- `priority_weight (int)` - вес задачи при определении приоритета во время создания резервной копии. Для более важных задач необходимо устанавливать наибольшие значения 
- `weight_rule (str)` - { `downstream` (defoult)| `upstream` | `absolute` } === дополнить ===
- `queue (str)` - === не знаю ===. речь про очереди, пока не встречал
- `pool (str | None)` - название **пула** в которых должна выполняться эта задача. С помощью пула можно как ограничить так и увеличить величину потоков для выполнения задачи
- `pool_slots (int)` - количество слотов пула которое будет использовать эта задача (должна быть >= 1)
- `sla (timedelta | None)`
- `execution_timeout (timedelta | None)`
- `on_failure_callback (TaskStateChangeCallback | None)` - вызывает указанную **функцию**, при падении
- `on_execute_callback (TaskStateChangeCallback | None)` - вызывает указанную **функцию**, перед непосредственным выполением задачи
- `on_retry_callback (TaskStateChangeCallback | None)` - вызывает указанную **функцию**, при повторной попытке выполнения задачи
- `on_success_callback (TaskStateChangeCallback | None)` - вызывает указанную **функцию**, при успешном выполнении задачи
- `pre_execute (TaskPreExecuteHook | None)`
- `post_execute (TaskPostExecuteHook | None)`
- `trigger_rule (str)` - условия при которых должна выполниться эта задача (описано ниже), если не указана, то будет использована та которая указана в ДАГ 
- `resources (dict[str, Any] | None)`
- `run_as_user (str | None)`
- `max_active_tis_per_dag (int | None)`
- `executor_config (dict | None)`
- `do_xcom_push (bool)`
- `task_group (TaskGroup | None)`
- `doc (str | None)`
- `doc_md (str | None)`
- `doc_rst (str | None)`
- `doc_json (str | None)`
- `doc_yaml (str | None)`

---



#### **Triger Rules**

**Triger rules** - условие при котором начинаются выполняться tasks

[link on trigger_rules](https://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html#trigger-rules)

`trigger_rule` может быть указан в параметрах `dag` и тогда он будет применен ко всем операторам (если в операторах не будет прописано другое)

Также параметр `trigger_rule` для каждого оператора

Виды `trigger_rule`:
- `all_success` (default) - все `tasks_parents` были **выполены успешно**
- `all_failed` - все `tasks_parents` были **провалены**
- `all_done` - запускает задачу после `upstream tasks` (`parents') были выполнены **вне зависимости от статуса** успешно, провалено или пропущено 
- `all_skipped` - все задачи были **пропущены**
- `one_failed` - срабатывает **как только одна из задач** была **провалена**
- `one_success` - срабатывает **как только одна из задач** была **успешна**
- `none_failed` - срабатывает когда задачи `parents` были **успешны** или **пропущены**
- `none_failed_min_one_success` - выполняется когда **ни одна из предыдущих** задач **не была провалена**, но **хотя бы одна** была **успешна**
- `none_skipped` - срабатывает если не было **пропусков** предыдущих задач
- `always` - никаких зависимостей задача может выполниться в любой момент

#### **Connections**

Connections представляют собой подключения к разным источникам данных, задаются они в веб интерфейсе, с указанием `conn_id`. Это `conn_id` в последствии можно вызвывать в тасках

При необходимости к Connections можно получиться как к обычному словарю.

[link on connections](https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/hooks/base/index.html)

```python
# Пример доступа к переменной connection
from airflow.hooks.base_hook import BaseHook
host = BaseHook.get_connection("postgres_default").host
pass = BaseHook.get_connection("postgres_default").password
```

#### **Variables**

Глобальные переменные которые также могут задаваться в веб интерфейсе.

Доступ к переменным осуществляется через `key`.
Глобальные переменные можно:
- получать - `get`
- записывать - `set`
- обновлять - `update`
- удалять - `delete`

[link on variables](https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/variable/index.html)

```python
# Пример доступа к глобальной переменной
from airflow.models import Variable
foo = Variable.get(key = "key")
# присер записи глобальной переменной
a_dict = {'login' : con}
Variable.set(key = 'test_key', value = a_dict, serialize_json = True)
```

#### **Sensors**

Сенсоры - это операторы которые выступает как **указатели наступило ли событие или нет** само действие необходимо прописывать в следующей задаче

[link to base_sensors](https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/sensors/base/index.html)

[link to a lot of sensors](https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/sensors/index.html)

Рассмотрим BaseSensorOperator

`airflow.sensors.base.BaseSensorOperator(*, poke_interval=60, timeout=conf.getfloat('sensors', 'default_timeout'), 
soft_fail=False, mode='poke', exponential_backoff=False, **kwargs)`

- `poke_interval` (float) - интервал через который будет перезапускаться задача для проверки

- `timeout` (float) - время через которое истечет время ожидания и задача будет fail

- `soft_fail` (bool) - пропустить задачу при сбое (я не знаю что это значит)

- `mode` {'poke', 'reschedule'} - default is 'poke' - 
> `poke` - сенсор занимает pool на все время до наступления **события**, сенсор переходит в спящий режим между **poke_interval**. Этот режим стоит использовать если работа сенсора предполагается небольшой или интервал проверки нужен небольшойю.

 > `reschedule` -  Сенсор занимает слот только пока работает потом умирает и тд. Необходимо использовать при большом **poke_interval** (от 1 минуты, иначе будет большая нагрузка на scheduler)

- `exponential_backoff` (bool) - каждый раз увеличивает время ожидания между перезапусками с помощью *Exponential Backoff*


```python
# пример python сенсора оператора
partner_b = PythonSensor(
    task_id='task',
    poke_interval=120, # Через какое время перезапускаться
    timeout=10,# Время до принудительного падения
    mode="reschedule", # Режим перезапуска
    python_callable=func, # функция которая должна вернуть True для выполнения сенсора
    soft_fail=True # Пропустить, скипнуть, задачу если она упадет
)
```

`HttpSensor` - делает запрос по какому либо адресу и при отзыве запускает функцию указанную в параметре `response_check`, в котором нужно проверить полученный ответ и вернуть `True`
**response представляет собой класс [requests.response](https://docs-python.ru/packages/modul-requests-python/obekt-otvet-servera-response/)

#### **Автогенерация DAG и автогенерация Task**

> **Автогенерация DAG**

Для автогенерации DAGs необходимо написать функцию для создания dag и затем в цикле записыать переменные с созданным дагом с использованием `globals()`

```python
# функция по созданию DAG
def create_dag(dag_id, start_date=airflow.utils.dates.days_ago(1), default_args = None, schedule_interval = "@daily"):
  
  # создаем DAG
  dag = DAG(dag_id = dag_id, description = f"This is a auto generated DAG",
  default_args=default_args, start_date=start_date, 
  schedule_interval=schedule_interval)
  
  with dag:
    # task для дага
    t1 = DummyOperator(task_id = "task_0", dag = dag)
  
  return dag

# генерация
for iDag in range(5):
  globals()[f"dag_{iDag}"] = create_dag(dag_id = f"dag_{iDag}")

```

> **Автогенерация Task**

Для автогенерации task в необходимом даге создается список и в него добавляются все сгенерированные таски
```python
dag = DAG(dag_id = dag_id, default_args=default_args, start_date=start_date, schedule_interval=schedule_interval)
with dag:
  # генерация task
  for iTask in range(10):
    # сама задача
    task_list.append(DummyOperator(task_id = f"task_{iTask}", dag = dag))
    # выстараивание очередности
    if iTask > 0:
      task_list[iTask - 1] >> task_list[iTask]
```

#### **Branch operator**

#### **context**

#### ShortCircuitOperator

Текст про shortCircuitOperator