**Задачи на запросы решены с тестовыми данными для наглядности**

**Задание 1**

Table: Transactions В таблице содержатся данные по транзациям клиентов в рублях.
* Client_id	Report_date	Txn_amount
* 123	     2017.01.01	  50000

Table: Rates. В таблице содержатся курсы валют по отношению к рублю.
* Report_date	Ccy_code	CCy_rate
* 2016.12.30	  840	      60,58
* 2017.01.09	  840	      61,01

Напишите sql запрос, который будет переводить сумму транзакций из rub в usd (ccy_code = 840) с учетом того, что в таблице rates данные только за рабочие дни. Транзакции, совершенные в выходные, пересчитываются по курсу последнего рабочего дня перед праздником/выходным. 

**Результат:**
* Клиент, дата, сумма операций в usd.

**Решение задачи с пересчетом сумм транзакций из RUB в USD включает:**

- *LATERAL JOIN для поиска курса*: подзапрос гарантирует, что для каждой даты транзакции выбирается наиболее подходящий курс – последний доступный курс до или в день транзакции;

- *условия фильтрации подзапроса*: в подзапросе устанавливается условие, что дата курса (в таблице `Rates`) должна быть меньше или равна дате транзакции. Сортировка записей по убыванию даты гарантирует использование последнего курса перед датой транзакции или в этот день;

- *округление и вывод результата*: результат перевода суммы транзакции из рублей в доллары округляется до двух десятичных знаков, обеспечивая аккуратность и удобство интерпретации данных.

In [1]:
import subprocess
import time
import psycopg2
from psycopg2 import sql

DB_HOST = 'localhost'
DB_PORT = '5432'
DB_NAME = 'telecome_db'
DB_USER = 'postgres'
DB_PASSWORD = 'password'


def create_postgres_container():
    """
    Создание контейнера Docker с PostgreSQL.
    """
    try:
        subprocess.run(["docker", "run", "--name", "telecome", "-e", f"POSTGRES_PASSWORD={DB_PASSWORD}", "-p", f"{DB_PORT}:5432", "-d", "postgres"])
    except subprocess.SubprocessError as e:
        print("Ошибка при создании контейнера Docker с PostgreSQL:", e)


def create_database():
    """
    Создание базы данных.
    """
    try:
        conn = psycopg2.connect(host=DB_HOST, port=DB_PORT, user=DB_USER, password=DB_PASSWORD)
        conn.autocommit = True
        cursor = conn.cursor()
        cursor.execute(sql.SQL("CREATE DATABASE {}").format(sql.Identifier(DB_NAME)))
        conn.close()
    except psycopg2.Error as e:
        print("Ошибка при создании базы данных:", e)


def connect_to_database():
    """
    Подключение к базе данных.
    """
    try:
        return psycopg2.connect(host=DB_HOST, port=DB_PORT, dbname=DB_NAME, user=DB_USER, password=DB_PASSWORD)
    except psycopg2.Error as e:
        print("Ошибка при подключении к базе данных:", e)


def create_test_data(conn):
    """
    Создание тестовых данных на основе исходных таблиц задания 1.
    """
    try:
        with conn.cursor() as cur:
            cur.execute("""
                CREATE TEMP TABLE IF NOT EXISTS Transactions (
                    Client_id INT,
                    Report_date DATE,
                    Txn_amount NUMERIC
                )
            """)
            cur.execute("""
                INSERT INTO Transactions (Client_id, Report_date, Txn_amount)
                VALUES (123, '2017-01-01', 50000),
                       (456, '2017-01-02', 75000),
                       (789, '2017-01-03', 100000),
                       (101, '2017-02-10', 25000),
                       (202, '2017-02-15', 30000),
                       (303, '2017-02-20', 35000),
                       (404, '2017-03-05', 40000),
                       (505, '2017-03-10', 45000),
                       (606, '2017-03-15', 50000)
            """)
            cur.execute("""
                CREATE TEMP TABLE IF NOT EXISTS Rates (
                    Report_date DATE,
                    Ccy_code INT,
                    CCy_rate NUMERIC
                )
            """)
            cur.execute("""
                INSERT INTO Rates (Report_date, Ccy_code, CCy_rate)
                VALUES ('2017-01-01', 840, 60.58),
                       ('2017-01-02', 840, 61.01),
                       ('2017-02-10', 840, 62.30),
                       ('2017-02-15', 840, 62.50),
                       ('2017-03-10', 840, 63.50),
                       ('2017-03-15', 840, 64.00)
            """)
            """"
            Проверка корректности созданных данных
            """
            cur.execute("SELECT * FROM Transactions")
            print("Таблица Transactions:")
            """
            Получение названий столбцов.
            """
            col_names = [desc[0] for desc in cur.description]
            print(col_names)
            rows = cur.fetchall()
            """
            Вывод результатов запроса.
            """
            for row in rows:
                print(row)
                
            cur.execute("SELECT * FROM Rates")
            print("Таблица Rates:")
            """
            Получение названий столбцов.
            """
            col_names = [desc[0] for desc in cur.description]
            print(col_names)
            rows = cur.fetchall()
            """
            Вывод результатов запроса.
            """
            for row in rows:
                print(row)
                
    except psycopg2.Error as e:
        print("Ошибка при создании тестовых данных:", e)


def check_query(conn):
    """
    Выполняет запрос к базе данных для вычисления суммы транзакций в долларах на основе курса валюты из таблицы Rates.
    
    Аргументы:
    conn (psycopg2.connection): Соединение с базой данных PostgreSQL.
    
    Возвращает:
    list: Список кортежей с результатами запроса, каждый кортеж содержит идентификатор клиента, дату отчета и сумму транзакции в долларах.
    
    Исключения:
    psycopg2.Error: Возникает при ошибке выполнения запроса.
    """
    try:
        start_time = time.time()

        with conn.cursor() as cur:
            cur.execute("""
                SELECT
                    t.Client_id,
                    t.Report_date,
                    ROUND(t.Txn_amount / r.CCy_rate, 2) AS txn_amount_usd
                FROM Transactions t
                LEFT JOIN LATERAL
                        (SELECT CCy_rate FROM Rates
                         WHERE Ccy_code = 840 AND Report_date <= t.Report_date
                         ORDER BY Report_date DESC
                         LIMIT 1) r ON true
                         ORDER BY t.Report_date, t.Client_id;
            """)
            print("Результат запроса:")
            """
            Получение названий столбцов.
            """
            col_names = [desc[0] for desc in cur.description]
            print(col_names)
            rows = cur.fetchall()
            """
            Вывод результатов запроса.
            """
            if rows is not None:
                for row in rows:
                    print(row)
            else:
                print("Запрос вернул пустой результат.")
            """
            Измерение времени выполнения.
            """
            end_time = time.time()
            print("Время выполнения запроса: {:.2f} секунд".format(end_time - start_time))

            return rows
    except psycopg2.Error as e:
        print("Ошибка при выполнении запроса:", e)


def main():
    """
    Основная функция.
    """
    try:
        """
        Создание контейнера Docker с PostgreSQL и базы данных.
        """
        create_postgres_container()
        time.sleep(5)
        create_database()
        time.sleep(5)

        """
        Подключение к базе данных.
        """
        conn = connect_to_database()

        """
        Создание тестовых данных.
        """
        create_test_data(conn)
        time.sleep(5)

        """
        Выполнение запроса.
        """
        check_query(conn)

        conn.close()

    except psycopg2.Error as e:
        print("Ошибка при выполнении операции с базой данных:", e)
        return

    except Exception as e:
        print("Ошибка:", e)
        return

if __name__ == "__main__":
    main()

beeba4699100adfad7132e885c3ed9244dfc91e718e83bd4938204e67604e545
Таблица Transactions:
['client_id', 'report_date', 'txn_amount']
(123, datetime.date(2017, 1, 1), Decimal('50000'))
(456, datetime.date(2017, 1, 2), Decimal('75000'))
(789, datetime.date(2017, 1, 3), Decimal('100000'))
(101, datetime.date(2017, 2, 10), Decimal('25000'))
(202, datetime.date(2017, 2, 15), Decimal('30000'))
(303, datetime.date(2017, 2, 20), Decimal('35000'))
(404, datetime.date(2017, 3, 5), Decimal('40000'))
(505, datetime.date(2017, 3, 10), Decimal('45000'))
(606, datetime.date(2017, 3, 15), Decimal('50000'))
Таблица Rates:
['report_date', 'ccy_code', 'ccy_rate']
(datetime.date(2017, 1, 1), 840, Decimal('60.58'))
(datetime.date(2017, 1, 2), 840, Decimal('61.01'))
(datetime.date(2017, 2, 10), 840, Decimal('62.30'))
(datetime.date(2017, 2, 15), 840, Decimal('62.50'))
(datetime.date(2017, 3, 10), 840, Decimal('63.50'))
(datetime.date(2017, 3, 15), 840, Decimal('64.00'))
Результат запроса:
['client_id', 'report

**Задание 2.1**

*Table: oper_data*
* Client_id	Report_date	office_number	Txn_type	Txn_amount
* 1233455	2017.05.02	1234/0123	debit	10000

В таблице oper_data содержится информация по транзакциям клиентов в офисах физической сети. txn_type принимает значения debit, credit

**Задание 2.1:**
* напишите sql запрос, который для каждого клиента выводит сумму debit, credit операций и последний посещенный офис по месяцам. 

**Результат представьте в виде:**
* Client_id	 Report_date Debit_amount Credit_amount	last_office


**Решение задачи включает:**

- *комплексное использование CTE*: для одновременного расчёта дебетовой и кредитовой сумм по месяцам для каждого клиента, а также для определения последнего посещённого офиса;

- *применение оконных функций*: `ROW_NUMBER()` определяет последнюю транзакцию по дате (и номеру офиса при совпадении дат), `SUM() OVER` вычисляет суммы по дебетам и кредитам в рамках каждого месяца;

- *фильтрацию и сортировка*: выводит только строки с последней транзакцией для каждого клиента по месяцам, упорядочивая итоги по клиенту и месяцу.

In [2]:
def create_test_data_2(conn):
    """
    Создание тестовых данных на основе исходной таблицы задания 2.
    """
    try:
        with conn.cursor() as cur:
            cur.execute("""
                CREATE TEMP TABLE IF NOT EXISTS oper_data (
                    Client_id INT,
                    Report_date DATE,
                    Office_number VARCHAR(20),
                    Txn_type VARCHAR(10),
                    Txn_amount NUMERIC
                )
            """)
            cur.execute("""
                INSERT INTO oper_data (Client_id, Report_date, 
                        Office_number, Txn_type, Txn_amount)
                VALUES (1233455, '2017-05-02', '1234/0123', 'debit', 10000),
                       (1233455, '2017-05-15', '5678/4567', 'credit', 15000),
                       (1233455, '2017-05-20', '9012/8901', 'credit', 5000),
                       (1233455, '2017-06-05', '3456/2345', 'debit', 20000),
                       (1233455, '2017-06-10', '6789/5678', 'credit', 10000),
                       (6789012, '2017-05-03', '2345/1234', 'debit', 8000),
                       (6789012, '2017-05-18', '6789/5678', 'debit', 5000),
                       (6789012, '2017-05-25', '9012/8901', 'credit', 3000),
                       (6789012, '2017-06-08', '3456/2345', 'debit', 12000),
                       (6789012, '2017-06-12', '7890/6789', 'credit', 7000),
                       (3456789, '2017-05-05', '7890/1234', 'debit', 12000),
                       (3456789, '2017-05-18', '2345/6789', 'debit', 7000),
                       (3456789, '2017-05-25', '5678/0123', 'debit', 3000),
                       (3456789, '2017-06-08', '9012/4567', 'debit', 15000),
                       (3456789, '2017-06-12', '3456/7890', 'debit', 6000);
            """)
            """"
            Проверка корректности созданных данных.
            """
            cur.execute("SELECT * FROM oper_data")
            print("Таблица oper_data:")
            """
            Получение названий столбцов.
            """
            col_names = [desc[0] for desc in cur.description]
            print(col_names)
            rows = cur.fetchall()
            """
            Вывод результатов запроса.
            """
            for row in rows:
                print(row)
                
    except psycopg2.Error as e:
        print("Ошибка при создании тестовых данных:", e)


def get_client_transactions(conn):
    """
    Получает данные о транзакциях клиентов из базы данных.

    Аргументы:
    conn (psycopg2.connection): Соединение с базой данных PostgreSQL.

    Исключения:
    psycopg2.Error: Возникает при ошибке выполнения запроса.
    """
    try:
        start_time = time.time()
        
        with conn.cursor() as cur:
            cur.execute("""
                WITH cte AS (
                    SELECT
                        client_id,
                        TO_CHAR(DATE_TRUNC('month', report_date), 'YYYY-MM') AS report_month,
                        txn_type,
                        txn_amount,
                        office_number,
                        ROW_NUMBER() OVER (PARTITION BY client_id, DATE_TRUNC('month', report_date)
                                           ORDER BY report_date DESC, office_number DESC) as rn,
                        SUM(CASE WHEN txn_type = 'debit' THEN txn_amount ELSE 0 END)
                            OVER (PARTITION BY client_id, DATE_TRUNC('month', report_date)) AS debit_amount,
                        SUM(CASE WHEN txn_type = 'credit' THEN txn_amount ELSE 0 END)
                            OVER (PARTITION BY client_id, DATE_TRUNC('month', report_date)) AS credit_amount
                    FROM oper_data
                )
                SELECT
                    client_id,
                    report_month,
                    debit_amount,
                    credit_amount,
                    office_number AS last_office
                FROM cte
                WHERE rn = 1
                ORDER BY client_id, report_month;
            """)
            
            print("Результат запроса:")
            
            col_names = [desc[0] for desc in cur.description]
            print(col_names)
            
            rows = cur.fetchall()
            
            if rows:
                for row in rows:
                    print(row)
            else:
                print("Запрос вернул пустой результат.")
            
            end_time = time.time()
            print("Время выполнения запроса: {:.2f} секунд".format(end_time - start_time))

            return rows
    except psycopg2.Error as e:
        print("Ошибка при выполнении запроса:", e)

if __name__ == "__main__":
    try:
        conn = connect_to_database()
        create_test_data_2(conn)
        get_client_transactions(conn)
        
    except psycopg2.Error as e:
        print("Ошибка при подключении к базе данных:", e)

Таблица oper_data:
['client_id', 'report_date', 'office_number', 'txn_type', 'txn_amount']
(1233455, datetime.date(2017, 5, 2), '1234/0123', 'debit', Decimal('10000'))
(1233455, datetime.date(2017, 5, 15), '5678/4567', 'credit', Decimal('15000'))
(1233455, datetime.date(2017, 5, 20), '9012/8901', 'credit', Decimal('5000'))
(1233455, datetime.date(2017, 6, 5), '3456/2345', 'debit', Decimal('20000'))
(1233455, datetime.date(2017, 6, 10), '6789/5678', 'credit', Decimal('10000'))
(6789012, datetime.date(2017, 5, 3), '2345/1234', 'debit', Decimal('8000'))
(6789012, datetime.date(2017, 5, 18), '6789/5678', 'debit', Decimal('5000'))
(6789012, datetime.date(2017, 5, 25), '9012/8901', 'credit', Decimal('3000'))
(6789012, datetime.date(2017, 6, 8), '3456/2345', 'debit', Decimal('12000'))
(6789012, datetime.date(2017, 6, 12), '7890/6789', 'credit', Decimal('7000'))
(3456789, datetime.date(2017, 5, 5), '7890/1234', 'debit', Decimal('12000'))
(3456789, datetime.date(2017, 5, 18), '2345/6789', 'debi

**Задача 2.2**

Напишите sql запрос, который для каждого клиента выведет долю debit операций клиента к debit операциям всех клиентов по месяцам. 

**Результат в виде таблицы:**

* Client_id	Report_date	Ratio


**Решение задачи использует:**

- *фильтрацию дебетовых операций*: ограничивает анализ только операциями типа 'debit';

- *группировку и оконные функции*: агрегирует операции по клиентам и месяцам, вычисляя долю операций каждого клиента относительно общего числа за месяц, с округлением до двух десятичных знаков;

- *сортировку по месяцу и клиенту*: обеспечивает логичную и удобную последовательность просмотра результатов.

In [3]:
def get_transactions_ratio(conn):
    """
    Функция для получения доли операций 'debit' по месяцам для каждого клиента.

    Args:
    conn: Подключение к базе данных.

    Returns:
    rows: Результат запроса.
    """
    try:
        start_time = time.time()
        
        with conn.cursor() as cur:
            cur.execute("""
                SELECT
                    client_id,
                    TO_CHAR(DATE_TRUNC('month', report_date), 'YYYY-MM') AS report_date,
                    ROUND((CAST(COUNT(*) AS FLOAT) /
                           SUM(COUNT(*)) OVER (PARTITION BY DATE_TRUNC('month', report_date)))::numeric, 2) AS ratio
                FROM oper_data
                WHERE txn_type = 'debit'
                GROUP BY client_id, DATE_TRUNC('month', report_date)
                ORDER BY DATE_TRUNC('month', report_date), client_id;
            """)
            
            print("Результат запроса:")
            col_names = [desc[0] for desc in cur.description]
            print(col_names)
            rows = cur.fetchall()
            
            if rows:
                for row in rows:
                    print(row)
            else:
                print("Запрос вернул пустой результат.")
            
            end_time = time.time()
            print("Время выполнения запроса: {:.2f} секунд".format(end_time - start_time))

            return rows
    except psycopg2.Error as e:
        print("Ошибка при выполнении запроса:", e)

if __name__ == "__main__":
    try:
        conn = connect_to_database()
        create_test_data_2(conn)
        get_transactions_ratio(conn)
        
    except psycopg2.Error as e:
        print("Ошибка при подключении к базе данных:", e)

Таблица oper_data:
['client_id', 'report_date', 'office_number', 'txn_type', 'txn_amount']
(1233455, datetime.date(2017, 5, 2), '1234/0123', 'debit', Decimal('10000'))
(1233455, datetime.date(2017, 5, 15), '5678/4567', 'credit', Decimal('15000'))
(1233455, datetime.date(2017, 5, 20), '9012/8901', 'credit', Decimal('5000'))
(1233455, datetime.date(2017, 6, 5), '3456/2345', 'debit', Decimal('20000'))
(1233455, datetime.date(2017, 6, 10), '6789/5678', 'credit', Decimal('10000'))
(6789012, datetime.date(2017, 5, 3), '2345/1234', 'debit', Decimal('8000'))
(6789012, datetime.date(2017, 5, 18), '6789/5678', 'debit', Decimal('5000'))
(6789012, datetime.date(2017, 5, 25), '9012/8901', 'credit', Decimal('3000'))
(6789012, datetime.date(2017, 6, 8), '3456/2345', 'debit', Decimal('12000'))
(6789012, datetime.date(2017, 6, 12), '7890/6789', 'credit', Decimal('7000'))
(3456789, datetime.date(2017, 5, 5), '7890/1234', 'debit', Decimal('12000'))
(3456789, datetime.date(2017, 5, 18), '2345/6789', 'debi

**Задача 3.**

Начнем с определения структуры данных и проектирования модели хранения, после этого перейдем к описанию последовательности шагов ETL процесса.

Мы получаем данные из двух источников - dealer и abon, на основе которых необходимо собрать витрину для визуализации в BI. Я бы создала две таблицы с сырыми данными "как есть", а для витрины создала материальное представление, кторое бы обновлялось ежемесячно (согласно условию задачи). Этот подход удобен тем, что если бизнес требования изменятся, мы легко сможем переписать код для витрины.

**Структура данных витрины для BI:**
* ym - Год и месяц операции (например, 2301 для января 2023 года)
* source - Источник данных (dealer или abon)
* region - Регион
* channel - Канал
* operation_type - Тип операции
* contract - Количество контрактов
* e_contract - Количество электронных контрактов

**Последовательность шагов ETL процесса:**

*Предположим, что данные поступают в виде csv и нам нужно загрузить их в базу данных PostgreSQL, трансформировать и собрать витрину для BI. Я бы написала скрипт python, который потом поместила бы в папку с DAGs. Скрипт включал бы несколько шагов.*

* Шаг 1: Извлечение данных
- Создание двух таблиц в базе данных для хранения из "Источника dealer" и "Источника abon".
- Проверка данных на дубли и полноту.
- Загрузка данных из CSV файлов в базу данных PostgreSQL в две таблицы, как есть.

* Шаг 2: Трансформация данных для будущей витрины
- Создание SQL запроса, который будет:
- - объединять данных из двух таблиц витрины с меткой источника 'source';
- - транспонировать столбцы 'contract' и 'e_contract' с агрегацией;
- - отфильтровывать лишние значения при необходимости;
- - сохранять результаты в материализованное представление final_view.

* Шаг 3: Написание DAG для автоматического обновления данных @monthly (Apache Airflow)
- В нашем DAG были бы следующие задачи (tasks), основанные на предыдущих двух шагах, которые создали все необходимые функции для tasks:
- - проверка наличия csv файлов с помощью FileSensor;
- - проверка данных dealer на дубли и полноту;
- - проверка данных abon на дубли и полноту;
- - загрузка данных "Источник dealer" в базу;
- - загрузка данных "Источника abon" в базу;
- - обновление материального представления для витрины;

* Шаг 4: Настройка мониторинга и управления (DummyOperator)
- Мониторинг выполнения DAG в Apache Airflow для отслеживания успешности выполнения процесса и его продолжительности.
- Настройка уведомлений об ошибках или сбоях в выполнении процесса.







In [None]:
"""
Зависимости задач могли бы выглядеть так.
"""
sensor_task1 >> load_dealer_data_task
sensor_task2 >> load_abon_data_task

[load_dealer_data_task, load_abon_data_task] >> check_data_task
check_data_task >> update_final_view_task

"""
Пример зависимостей для отправки уведомлений.
"""
update_final_view_task >> notify_success_task
[load_dealer_data_task, load_abon_data_task, check_data_task] >> notify_error_task