### Настройка Airflow

Для начала вам необходимо выполнить ряд команд чтобы настроить окружение для дальнейшей работы, это позволит первое время не заниматься настройкой среды исполнения, а сразу начать писать код и работать с Airflow.

In [None]:
# Установка Airflow
!pip install apache-airflow==2.1.4

# Инициализация базы данных
!airflow db init

In [None]:
# Создадим необходимые папки
!mkdir /root/airflow/dags
!mkdir /root/airflow/data
!touch /root/airflow/dags/dag.py

In [None]:
# Включим веб-сервер
!airflow webserver -p 18273 -D

In [None]:
# Создадим пользователя Airflow
!airflow users create \
          --username admin \
          --firstname admin \
          --lastname admin \
          --role Admin \
          --email admin@example.org \
          -p 12345

Поместите в dag.py следующий код.

```python
from airflow import DAG
from datetime import timedelta
from airflow.utils.dates import days_ago
from airflow.operators.dummy_operator import DummyOperator

dag = DAG('dag',schedule_interval=timedelta(days=1), start_date=days_ago(1))
t1 = DummyOperator(task_id='task_1', dag=dag)
t2 = DummyOperator(task_id='task_2',dag=dag)
t3 = DummyOperator(task_id='task_3',dag=dag)
t4 = DummyOperator(task_id='task_4',dag=dag)
t5 = DummyOperator(task_id='task_5',dag=dag)
t6 = DummyOperator(task_id='task_6',dag=dag)
t7 = DummyOperator(task_id='task_7',dag=dag)

[t1, t2]>>t5
t3>>t6
[t5,t6] >>  t7
t4
```

In [None]:
# Запуск шедулера
!airflow scheduler -D

  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/


In [None]:
# Последующие команды не имеют отношения к Airflow
# Они нужни только для корректной работы веб морды
# в среде Google Colab

!pip install pyngrok
!ngrok authtoken <2YiDMwQMH92zZdhbl3vDgtLjuw9_33CkEr973pN83ST34YyKk> # найти его можно https://dashboard.ngrok.com/get-started/setup

# Эта команда просто отображет веб морду на другой адрес
# Его вы можете найти https://dashboard.ngrok.com/cloud-edge/status
# При каждом отключении ссылка будет меняться

!nohup ngrok http 18273 > /dev/null &

/bin/bash: -c: line 1: syntax error near unexpected token `newline'
/bin/bash: -c: line 1: `ngrok authtoken <2YiDMwQMH92zZdhbl3vDgtLjuw9_33CkEr973pN83ST34YyKk> # найти его можно https://dashboard.ngrok.com/get-started/setup'
nohup: redirecting stderr to stdout


In [None]:
import os
from pyngrok import ngrok

#Задание переменной auth_token для аутентификации в сервисе ngrok.
auth_token = "2YiDMwQMH92zZdhbl3vDgtLjuw9_33CkEr973pN83ST34YyKk"
# Since we can't access Colab notebooks IP directly we'll use
# ngrok to create a public URL for the server via a tunnel

# Authenticate ngrok
# https://dashboard.ngrok.com/signup
# Then go to the "Your Authtoken" tab in the sidebar and copy the API key

#Аутентификация в сервисе ngrok с помощью auth_token
os.system(f"ngrok authtoken {auth_token}")

#Запуск ngrok, который создаст публичный URL для сервера через туннель
#для доступа к веб-интерфейсу Airflow из любого места.
#addr="18273" указывает на порт, на котором запущен веб-сервер Airflow, а proto="http" указывает на использование протокола HTTP
public_url = ngrok.connect(addr="18273", proto="http")

#Вывод публичного URL для доступа к веб-интерфейсу Airflow
print("Адрес Airflow GUI:", public_url)



Адрес Airflow GUI: NgrokTunnel: "https://5ae5-34-139-31-203.ngrok-free.app" -> "http://localhost:18273"


In [None]:
!pkill -f ngrok #поиск и завершение процесса, который запущен с помощью команды ngrok
# отключение публичного URL, созданного с помощью ngrok для веб-сервера Airflow
ngrok.disconnect(public_url=public_url)

После запуска команды выше, перейдите по адресу в ngrok и подождите  пока появится DAG с именем dag

### Задача на разработку

Вы реализовали ETL скрипт который выгружает данные из сторонних источников. Теперь я предлагаю вам взять небольшую его часть и переписать с помощью Airflow. Использовать только 1 дату 2021-01-01 можно прописать в функции напрямую, захардкодить.

Вам необходимо обернуть ваш код в СustomOperator(),который реализует следующую логику


* Скачайте валюту за 2021-01-01 и положите в CSV файл на диске (использовать PythonOperator чтобы скачать данные, можно использовать pandas)
* Скачайте логи финансовых транзакций за 2021-01-01 и положите в CSV файл на диске (использовать PythonOperator чтобы скачать данные, можно использовать pandas)
* Объединить данные по дате и сложить в таблицу в SQLite


Даг нужно написать в файл /root/airflow/dags/dag.py. Проверку можно сделать в веб интерфейсе. Прежде чем даг появится, может пройти ~ 2-3 минут.

In [None]:
# Решение на разработку нужно оставить в этой ячейке. Прям  в файле с дагом реализовать CustomOperator() и использовать в даге
from airflow import DAG
from datetime import timedelta, datetime
from airflow.utils.dates import days_ago
from airflow.operators.dummy_operator import DummyOperator
from custom_operators import DownloadCurrencyOperator, DownloadTransactionLogsOperator, MergeDataOperator, LoadToSQLiteOperator

# dag = DAG('dag', schedule_interval=timedelta(days=1), start_date=days_ago(1))
# t1 = DummyOperator(task_id='task_1', dag=dag)
# t2 = DummyOperator(task_id='task_2', dag=dag)
# t3 = DummyOperator(task_id='task_3', dag=dag)
# t4 = DummyOperator(task_id='task_4', dag=dag)
# t5 = DummyOperator(task_id='task_5', dag=dag)
# t6 = DummyOperator(task_id='task_6', dag=dag)
# t7 = DummyOperator(task_id='task_7', dag=dag)

# [t1, t2] >> t5
# t3 >> t6
# [t5, t6] >> t7
# t4

# Задаем аргументы для DAG
default_args = {
    'owner': 'candidate_for_position',
    'start_date': datetime(2021, 1, 1),
    'retries': 1,
}

# Инициализируем DAG
dag = DAG(
    'get_transactions_dag',
    default_args=default_args,  # Передаем аргументы
    description='ETL DAG with CustomOperators',
    # schedule_interval="0 0 * * *", # Настройка интервала
    schedule_interval=None,  # Пока установлен None чтобы он не запускался 900+ раз
)

# Определяем аргументы, которые нужно передать в операторы
date = '2021-01-01'
currency_url = f"https://raw.githubusercontent.com/datanlnja/airflow_course/main/excangerate/{date}.csv"
transaction_logs_url = f"https://raw.githubusercontent.com/datanlnja/airflow_course/main/data/{date}.csv"

# Создаем экземпляры CustomOperator для каждой операции с передачей аргументов
download_currency_task = DownloadCurrencyOperator(
    task_id='download_currency',
    date=date,
    currency_url=currency_url,
    dag=dag,
)

download_transaction_logs_task = DownloadTransactionLogsOperator(
    task_id='download_transaction_logs',
    date=date,
    transaction_logs_url=transaction_logs_url,
    dag=dag,
)

merge_data_task = MergeDataOperator(
    task_id='merge_data',
    date=date,
    dag=dag,
)

load_to_sqlite_task = LoadToSQLiteOperator(
    task_id='load_to_sqlite',
    date=date,
    dag=dag,
)

# Прописываем порядок выполнения задач
download_currency_task >> merge_data_task
download_transaction_logs_task >> merge_data_task
merge_data_task >> load_to_sqlite_task


In [None]:
from airflow.models.baseoperator import BaseOperator
from airflow.utils.decorators import apply_defaults
import pandas as pd
import sqlite3

# Функция для загрузки валюты в CSV


def download_currency_to_csv(date, currency_url, **kwargs):
    """
    Загружает данные о валюте из CSV по указанной дате и сохраняет в файл.

    :param date: Дата в формате 'гггг-мм-дд'.
    :param currency_url: URL для загрузки данных о валюте.
    """
    try:
        currency_data = pd.read_csv(currency_url)
        currency_data.to_csv(f"./data/currency_{date}.csv", index=False)
    except Exception as e:
        print(f"Ошибка при загрузке валюты в CSV: {e}")

# Функция для загрузки логов транзакций в CSV


def download_transaction_logs_to_csv(date, transaction_logs_url, **kwargs):
    """
    Загружает логи транзакций из CSV по указанной дате и сохраняет в файл.

    :param date: Дата в формате 'гггг-мм-дд'.
    :param transaction_logs_url: URL для загрузки логов транзакций.
    """
    try:
        transaction_logs_data = pd.read_csv(transaction_logs_url)
        transaction_logs_data.to_csv(
            f"./data/transaction_logs_{date}.csv", index=False)
    except Exception as e:
        print(f"Ошибка при загрузке логов транзакций в CSV: {e}")
# Функция для объединения данных


def merge_data(date, **kwargs):
    """
    Объединяет данные о валюте и логи транзакций по указанной дате и сохраняет в файл.

    :param date: Дата в формате 'гггг-мм-дд'.
    """
    try:
        currency_data = pd.read_csv(f"./data/currency_{date}.csv")
        transaction_logs_data = pd.read_csv(
            f"./data/transaction_logs_{date}.csv")
        merged_data = pd.merge(currency_data, transaction_logs_data, on='date')
        merged_data.to_csv(f"./data/merged_data_{date}.csv", index=False)
    except Exception as e:
        print(f"Ошибка при объединении данных: {e}")

# Функция для загрузки в SQLite


def load_to_sqlite(date, **kwargs):
    """
    Загружает объединенные данные в базу данных SQLite по указанной дате.

    :param date: Дата в формате 'гггг-мм-дд'.
    """
    try:
        merged_data = pd.read_csv(f"./data/merged_data_{date}.csv")
        conn = sqlite3.connect("./data/transactions_db.db")
        merged_data.to_sql('transactions', conn,
                           index=False, if_exists='replace')
        print("Данные успешно загружены в SQLite.")
    except Exception as e:
        print(f"Ошибка при загрузке данных в SQLite: {e}")
    finally:
        conn.close()

# CustomOperator для выполнения загрузки валюты


class DownloadCurrencyOperator(BaseOperator):
    """
    CustomOperator для выполнения загрузки данных о валюте.

    :param date: Дата в формате 'гггг-мм-дд'.
    :param currency_url: URL для загрузки данных о валюте.
    """
    @apply_defaults
    def __init__(self, date, currency_url, *args, **kwargs):
        super(DownloadCurrencyOperator, self).__init__(*args, **kwargs)
        self.date = date
        self.currency_url = currency_url

    def execute(self, context):
        download_currency_to_csv(self.date, self.currency_url)

# CustomOperator для выполнения загрузки логов транзакций


class DownloadTransactionLogsOperator(BaseOperator):
    """
    CustomOperator для выполнения загрузки логов транзакций.

    :param date: Дата в формате 'гггг-мм-дд'.
    :param transaction_logs_url: URL для загрузки логов транзакций.
    """
    @apply_defaults
    def __init__(self, date, transaction_logs_url, *args, **kwargs):
        super(DownloadTransactionLogsOperator, self).__init__(*args, **kwargs)
        self.date = date
        self.transaction_logs_url = transaction_logs_url

    def execute(self, context):
        download_transaction_logs_to_csv(self.date, self.transaction_logs_url)

# CustomOperator для выполнения объединения данных


class MergeDataOperator(BaseOperator):
    """
    CustomOperator для выполнения объединения данных.

    :param date: Дата в формате 'гггг-мм-дд'.
    """
    @apply_defaults
    def __init__(self, date, *args, **kwargs):
        super(MergeDataOperator, self).__init__(*args, **kwargs)
        self.date = date

    def execute(self, context):
        merge_data(self.date)

# CustomOperator для выполнения загрузки в SQLite


class LoadToSQLiteOperator(BaseOperator):
    """
    CustomOperator для выполнения загрузки данных в SQLite.

    :param date: Дата в формате 'гггг-мм-дд'.
    """
    @apply_defaults
    def __init__(self, date, *args, **kwargs):
        super(LoadToSQLiteOperator, self).__init__(*args, **kwargs)
        self.date = date

    def execute(self, context):
        load_to_sqlite(self.date)


In [11]:
# чтобы првоерить решение можете обратиться к вашей базе данных таким образом
%load_ext sql
%config SqlMagic.feedback=False
%config SqlMagic.autopandas=True
%sql sqlite:////root//airflow/airflow.db
%sql select * from transactions

The sql extension is already loaded. To reload it, use:
  %reload_ext sql
 * sqlite:////root//airflow/airflow.db


Unnamed: 0,date,currency_from,currency_to,amount,currency,value
0,2021-01-01,eur,usd,1.21,EUR,38
1,2021-01-01,eur,usd,1.21,EUR,65
2,2021-01-01,eur,usd,1.21,EUR,74
3,2021-01-01,eur,usd,1.21,EUR,42
4,2021-01-01,eur,usd,1.21,EUR,23
5,2021-01-01,eur,usd,1.21,EUR,48
6,2021-01-01,eur,usd,1.21,EUR,86
7,2021-01-01,eur,usd,1.21,EUR,74
8,2021-01-01,eur,usd,1.21,EUR,24
