<a href="https://colab.research.google.com/github/isa-bay/Ventra_tasks/blob/main/%D0%98%D1%81%D0%B0_%D0%91%D0%B0%D0%B9%D1%80%D0%B0%D0%BC%D0%BE%D0%B2_%D0%A2%D0%97.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

У вас есть три таблицы с данными:
1. stores — информация о магазинах:

● store_id (INT) — уникальный идентификатор магазина.

● city (VARCHAR) — город, в котором расположен магазин.

● state (VARCHAR) — регион, где находится магазин.

2. customers — информация о клиентах:

● customer_id (INT) — уникальный идентификатор клиента.

● name (VARCHAR) — имя клиента.

● signup_date (DATE) — дата регистрации клиента.

● store_id (INT) — идентификатор магазина, к которому относится клиент (где он чаще всего совершает покупки).

3. transactions — информация о транзакциях:

● transaction_id (INT) — уникальный идентификатор транзакции.

● customer_id (INT) — идентификатор клиента, совершившего транзакцию.

● store_id (INT) — идентификатор магазина, в котором была совершена транзакция.

● transaction_date (DATE) — дата транзакции.

● category (VARCHAR) — категория товара, связанная с транзакцией (например, "Табак", "Аксессуары").

● amount (DECIMAL) — сумма покупки.




1.   SQL часть


*   Определите общее количество транзакций, совершенных клиентами в каждом магазине в 2023 году. Выведите store_id, city, state и transactions_count.
*   Найдите всех клиентов, которые зарегистрировались в 2023 году и сделали хотя бы одну покупку за первый месяц после регистрации. Выведите customer_id, name и количество транзакций (transactions_count).
*   Выведите три категории товаров с наибольшей общей суммой продаж (total_sales) за 2023 год, указав category, total_sales.


2.   Python часть

Напишите Python-скрипт, который:
*   Создает базу данных SQLite и создает в ней три таблицы (stores, customers, transactions), используя предоставленные ниже данные.
*   Выполняет описанные SQL-запросы и сохраняет результаты в три CSV-файла:
store_transactions_2023.csv, new_customers.csv, top_categories.csv

Реализуйте функцию analyze_sales_growth, которая:
*   Принимает DataFrame с суммой продаж по месяцам за 2023 год (например, из
top_categories.csv).
*   Возвращает DataFrame с колонками month и growth_rate, где growth_rate — процентное изменение суммы продаж по сравнению с предыдущим месяцем


3.   Вопросы по Airflow


*   Что такое DAG?
*   Назовите три главных компонента архитектуры Airflow?
*   Можно ли менять код DAG'а прямо в веб версии приложения?

1. SQL

Определил общее количество транзакций за 2023г:

In [None]:
SELECT t.store_id, s.city, s.state, COUNT(t.transaction_id) AS transactions_count
FROM transactions t
INNER JOIN stores s ON t.store_id = s.store_id
WHERE strftime('%Y', t.transaction_date) = '2023'
GROUP BY t.store_id, s.city, s.state;

Клиенты, зарегистрировавшиеся в 2023 году, совершившие покупку в первый месяц:

In [None]:
SELECT c.customer_id, c.name, COUNT(t.transaction_id) AS transactions_count
FROM customers c
INNER JOIN transactions t ON c.customer_id = t.customer_id
WHERE strftime('%Y', c.signup_date) = '2023' AND
    t.transaction_date BETWEEN c.signup_date AND
    date(c.signup_date, '+1 month')
GROUP BY c.customer_id, c.name
HAVING COUNT(t.transaction_id) > 0;

Три категории товаров с наибольшей суммой продаж за 2023 год:

In [None]:
SELECT t.category, SUM(t.amount) AS total_sales
FROM transactions t
WHERE strftime('%Y', t.transaction_date) = '2023'
GROUP BY t.category
ORDER BY total_sales DESC
LIMIT 3;

2. Python

- подключаем библиотеки
- подключаемся к БД, если ее нет - создается автоматически
- создаем таблицы и наполняем данными
- создаем переменные запросов и читаем их
- сохраняем в csv
- исключенре ошибок
- функция analyze_sales_growth

In [None]:
import sqlite3
import pandas as pd


try:
    with sqlite3.connect('retail_db') as conn:
        cursor = conn.cursor()

        cursor.execute('''
        CREATE TABLE IF NOT EXISTS stores (
            store_id INTEGER PRIMARY KEY,
            city TEXT,
            state TEXT
        )
        ''')

        stores_data = [
            (1, 'Москва', 'ЦФО'),
            (2, 'Санкт-Петербург', 'СЗФО'),
            (3, 'Екатеринбург', 'УФО')
        ]
        cursor.executemany('INSERT INTO stores VALUES (?, ?, ?)', stores_data)

        cursor.execute('''
        CREATE TABLE IF NOT EXISTS customers (
            customer_id INTEGER PRIMARY KEY,
            name TEXT,
            signup_date DATE,
            store_id INTEGER,
            FOREIGN KEY (store_id) REFERENCES stores(store_id)
        )
        ''')

        customers_data = [
            (1, 'Иван', '2023-01-05', 1),
            (2, 'Ольга', '2023-02-10', 2),
            (3, 'Сергей', '2023-01-15', 3),
            (4, 'Мария', '2023-03-22', 2),
            (5, 'Анна', '2023-05-13', 1)
        ]
        cursor.executemany('INSERT INTO customers VALUES (?, ?, ?, ?)', customers_data)

        cursor.execute('''
        CREATE TABLE IF NOT EXISTS transactions (
            transaction_id INTEGER PRIMARY KEY,
            customer_id INTEGER,
            store_id INTEGER,
            transaction_date DATE,
            category TEXT,
            amount REAL,
            FOREIGN KEY (customer_id) REFERENCES customers(customer_id),
            FOREIGN KEY (store_id) REFERENCES stores(store_id)
        )
        ''')

        transactions_data = [
            (1, 1, 1, '2023-01-10', 'Табак', 150),
            (2, 1, 1, '2023-01-15', 'Аксессуары', 50),
            (3, 2, 2, '2023-03-15', 'Табак', 200),
            (4, 3, 3, '2023-01-18', 'Напитки', 75),
            (5, 4, 2, '2023-04-20', 'Аксессуары', 30),
            (6, 5, 1, '2023-06-18', 'Табак', 250)
        ]
        cursor.executemany('INSERT INTO transactions VALUES (?, ?, ?, ?, ?, ?)', transactions_data)

        conn.commit()


        query1 = '''
        SELECT t.store_id, s.city, s.state,
            COUNT(t.transaction_id) AS transactions_count
        FROM transactions t
        INNER JOIN stores s ON t.store_id = s.store_id
        WHERE strftime('%Y', t.transaction_date) = '2023'
        GROUP BY t.store_id, s.city, s.state;
        '''

        query2 = '''
        SELECT c.customer_id, c.name, COUNT(t.transaction_id) AS transactions_count
        FROM customers c
        INNER JOIN transactions t ON c.customer_id = t.customer_id
        WHERE strftime('%Y', c.signup_date) = '2023' AND
            t.transaction_date BETWEEN c.signup_date AND
            date(c.signup_date, '+1 month')
        GROUP BY c.customer_id, c.name
        HAVING COUNT(t.transaction_id) > 0;
        '''

        query3 = '''
        SELECT t.category, SUM(t.amount) AS total_sales
        FROM transactions t
        WHERE strftime('%Y', t.transaction_date) = '2023'
        GROUP BY t.category
        ORDER BY total_sales DESC
        LIMIT 3;
        '''
        query4 = '''
        SELECT strftime('%Y-%m', transaction_date) AS month, SUM(amount) AS total_sales
        FROM transactions
        WHERE strftime('%Y', transaction_date) = '2023'
        GROUP BY month
        ORDER BY month;
        '''


        df1 = pd.read_sql_query(query1, conn)
        df2 = pd.read_sql_query(query2, conn)
        df3 = pd.read_sql_query(query3, conn)
        df4 = pd.read_sql_query(query4, conn)

        df1.to_csv('store_transactions_2023.csv', index=False)
        df2.to_csv('new_customers.csv', index=False)
        df3.to_csv('top_categories.csv', index=False)
        df4.to_csv('monthly_sales.csv', index=False)

except sqlite3.Error as e:
    print(f"произошла ошибка: {e.args[0]}")


def analyze_sales_growth(sales_df):
    sales_df = sales_df.sort_values('month')
    sales_df['growth_rate'] = sales_df['total_sales'].pct_change() * 100
    sales_df['growth_rate'] = sales_df['growth_rate'].fillna(0)
    return sales_df[['month', 'growth_rate']]

3.   Вопросы по Airflow:


*   DAG - это направленный ацикличный граф, который задает порядок выполнения задач(data pipeline).
*   Scheduler - распределяет задачи по воркерам;

Executor - управляет запуском задач на воркерах;

Workers - узлы, на которых выполняются задачи.
*   Нет, чтобы изменить код DAG'а нужно обновить python-файл, в котором определён DAG. После обновления файла Airflow сам автоматически обнаружит изменения и обновит DAG.


