In [None]:
import pandas as pd
import numpy as np
import psycopg2
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
import random
from datetime import datetime, timedelta
import itertools
from sqlalchemy import create_engine

# Параметры подключения
DB_PARAMS = {
    'host': 'localhost',
    'port': 5442,
    'user': 'postgres',
    'password': 'postgres',
    'database': 'etl_demo'
}

def connect_db():
    """Подключение к БД"""
    conn = psycopg2.connect(**DB_PARAMS)
    conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
    return conn, conn.cursor()

def create_tables(cursor, conn):
    """Создание таблиц"""
    # Таблица клиентов
    cursor.execute("""
    CREATE TABLE IF NOT EXISTS c_python (
        client_id BIGINT PRIMARY KEY,
        gender CHAR(1) CHECK (gender IN ('M', 'F', 'U')),
        age INTEGER CHECK (age >= 0)
    );""")
    
    # Таблица торговых точек
    cursor.execute("""
    CREATE TABLE IF NOT EXISTS m_python (
        merchant_id BIGINT PRIMARY KEY,
        latitude DECIMAL(10, 6),
        longtitude DECIMAL(10, 6),
        mcc_cd smallint
    );""")
    
    # Таблица транзакций
    cursor.execute("""
    CREATE TABLE IF NOT EXISTS t_python (
        transaction_id SERIAL PRIMARY KEY,
        merchant_id BIGINT REFERENCES m_python(merchant_id),
        client_id BIGINT REFERENCES c_python(client_id),
        transaction_dttm TIMESTAMP,
        transaction_amt DECIMAL(15, 2)
    );""")
    conn.commit()
    print("Таблицы созданы")

def gen_clients(n=10000):
    """Генерация данных о клиентах"""
    client_ids = np.arange(1, n+1)
    # Пол: M, F или U (2% случаев)
    gender_probs = np.random.random(n)
    genders = np.where(gender_probs < 0.02, 'U', np.where(gender_probs < 0.51, 'M', 'F'))
    
    # Возраст: случайный от 14 до 88, 0 (неизвестно) в 4% случаев
    age_probs = np.random.random(n)
    ages = np.where(age_probs < 0.04, 0, np.random.randint(14, 89, n))
    
    return pd.DataFrame({'client_id': client_ids, 'gender': genders, 'age': ages})

def gen_merchants(n=500):
    """Генерация данных о торговых точках"""
    # MCC коды категорий
    mcc_codes = [5411, 5812, 5912, 5541, 5311, 5691, 5945, 7011, 4121, 7832]
    
    merchant_ids = np.arange(1, n+1)
    latitudes = 55 + np.random.random(n) * 10  # 55-65°
    longitudes = 37 + np.random.random(n) * 10  # 37-47°
    mccs = np.random.choice(mcc_codes, n)
    
    return pd.DataFrame({
        'merchant_id': merchant_ids,
        'latitude': latitudes,
        'longtitude': longitudes,
        'mcc_cd': mccs
    })

def gen_transactions(n=50000, client_ids=None, merchant_ids=None):
    """Генерация данных о транзакциях"""
    # Случайный выбор клиентов и торговых точек
    merchant_indices = np.random.randint(0, len(merchant_ids), n)
    client_indices = np.random.randint(0, len(client_ids), n)
    
    # Случайная дата в диапазоне 2020-2023 гг.
    start_date = datetime(2020, 1, 1)
    end_date = datetime(2023, 12, 31, 23, 59, 59)
    delta_seconds = int((end_date - start_date).total_seconds())
    random_seconds = np.random.randint(0, delta_seconds, n)
    transaction_dttms = [start_date + timedelta(seconds=int(sec)) for sec in random_seconds]
    
    # Суммы от 10 до 10000
    transaction_amts = np.round(np.random.uniform(10, 10000, n), 2)
    
    return pd.DataFrame({
        'merchant_id': [merchant_ids[idx] for idx in merchant_indices],
        'client_id': [client_ids[idx] for idx in client_indices],
        'transaction_dttm': transaction_dttms,
        'transaction_amt': transaction_amts
    })

def insert_data(cursor, conn, engine, clients_df, merchants_df, transactions_df):
    """Вставка данных в БД"""
    # Очистка таблиц
    cursor.execute("TRUNCATE TABLE t_python CASCADE;")
    cursor.execute("TRUNCATE TABLE c_python CASCADE;")
    cursor.execute("TRUNCATE TABLE m_python CASCADE;")
    conn.commit()
    

    try:
        # Вставка клиентов
        for _, row in clients_df.iterrows():
            cursor.execute(
                "INSERT INTO c_python (client_id, gender, age) VALUES (%s, %s, %s)",
                (row['client_id'], row['gender'], row['age'])
            )
        conn.commit()
        print("Данные о клиентах вставлены")
        
        # Вставка торговых точек
        for _, row in merchants_df.iterrows():
            cursor.execute(
                "INSERT INTO m_python (merchant_id, latitude, longtitude, mcc_cd) VALUES (%s, %s, %s, %s)",
                (row['merchant_id'], row['latitude'], row['longtitude'], row['mcc_cd'])
            )
        conn.commit()
        print("Данные о торговых точках вставлены")
        
        # Вставка транзакций
        for _, row in transactions_df.iterrows():
            cursor.execute(
                "INSERT INTO t_python (merchant_id, client_id, transaction_dttm, transaction_amt) VALUES (%s, %s, %s, %s)",
                (row['merchant_id'], row['client_id'], row['transaction_dttm'], row['transaction_amt'])
            )
        conn.commit()
        print("Данные о транзакциях вставлены")
        return True
    except Exception as e2:
        print(f"Ошибка при альтернативной вставке данных: {e2}")
        return False

def create_pandas_cube(transactions_df, clients_df, merchants_df):
    """Создание OLAP-куба через pandas"""
    # Объединяем таблицы
    merged_data = pd.merge(transactions_df, clients_df, on='client_id')
    merged_data = pd.merge(merged_data, merchants_df, on='merchant_id')
    
    # Группировка по возрасту
    merged_data['age_group'] = pd.cut(
        merged_data['age'], 
        bins=[0, 19, 31, float('inf')], 
        labels=['До 18', '19-30', 'От 31'],
        right=False
    )
    
    # Выделяем год и месяц
    merged_data['year'] = merged_data['transaction_dttm'].dt.year
    merged_data['month'] = merged_data['transaction_dttm'].dt.month
    
    # Группировки для CUBE
    group_columns = ['gender', 'age_group', 'mcc_cd', 'year', 'month']
    all_combinations = []
    for i in range(len(group_columns) + 1):
        all_combinations.extend(list(itertools.combinations(group_columns, i)))
    
    # Агрегируем данные по всем комбинациям
    cube_dfs = []
    for combo in all_combinations:
        if not combo:  # Для общей суммы (все NULL)
            agg_df = pd.DataFrame({
                'total_amount': [merged_data['transaction_amt'].sum()],
                'avg_amount': [merged_data['transaction_amt'].mean()],
                'transaction_count': [len(merged_data)]
            })
            for col in group_columns:
                agg_df[col] = None
        else:
            agg_df = merged_data.groupby(list(combo), observed=False).agg(
                total_amount=('transaction_amt', 'sum'),
                avg_amount=('transaction_amt', 'mean'),
                transaction_count=('transaction_amt', 'count')
            ).reset_index()
            
            # Добавляем отсутствующие столбцы как NULL
            for col in group_columns:
                if col not in combo:
                    agg_df[col] = None
        
        cube_dfs.append(agg_df)
    
    # Объединяем все в один DataFrame
    non_empty_dfs = [df for df in cube_dfs if not df.empty]
    cube_df = pd.concat(non_empty_dfs, ignore_index=True)
    cube_df = cube_df.rename(columns={'mcc_cd': 'industry'})
    
    print(f"Pandas-куб создан: {len(cube_df)} строк")
    return cube_df

def create_sql_cube(cursor, conn):
    """Создание OLAP-куба в SQL"""
    cursor.execute("DROP TABLE IF EXISTS sales_cube_python;")
    
    cursor.execute("""
    CREATE TABLE sales_cube_python AS
    WITH sales_data AS (
        SELECT 
            c.gender,
            CASE 
                WHEN c.age < 19 THEN 'До 18'
                WHEN c.age BETWEEN 19 AND 30 THEN '19-30'
                WHEN c.age > 30 THEN 'От 31'
            END AS age_group,
            m.mcc_cd AS industry,
            EXTRACT(YEAR FROM t.transaction_dttm)::INTEGER AS year,
            EXTRACT(MONTH FROM t.transaction_dttm)::INTEGER AS month,
            t.transaction_amt
        FROM 
            t_python t
        INNER JOIN 
            c_python c ON t.client_id = c.client_id
        INNER JOIN 
            m_python m ON t.merchant_id = m.merchant_id
    )
    SELECT 
        gender,
        age_group,
        industry,
        year,
        month,
        SUM(transaction_amt) AS total_amount,
        AVG(transaction_amt) AS avg_amount,
        COUNT(transaction_amt) AS transaction_count
    FROM 
        sales_data
    GROUP BY 
        CUBE(gender, age_group, industry, year, month);
    """)
    
    conn.commit()
    print("SQL OLAP-куб создан")

def query_pandas_cube(cube_df, year=None, month=None, gender=None, age_group=None, industry=None):
    """Запрос к pandas кубу"""
    query = cube_df.copy()
    
    # Фильтрация
    if year is not None:
        query = query[query['year'] == year]
    else:
        query = query[query['year'].isna()]
        
    if month is not None:
        query = query[query['month'] == month]
    else:
        query = query[query['month'].isna()]
        
    if gender is not None:
        query = query[query['gender'] == gender]
    else:
        query = query[query['gender'].isna()]
        
    if age_group is not None:
        query = query[query['age_group'] == age_group]
    else:
        query = query[query['age_group'].isna()]
        
    if industry is not None:
        query = query[query['industry'] == industry]
    else:
        query = query[query['industry'].isna()]
    
    return query[['total_amount', 'avg_amount', 'transaction_count']]

def run_sample_queries(cube_df, cursor):
    """Выполняем примеры запросов"""
    print("\n--- Pandas запросы ---")
    
    # Запрос 1: Сумма всех покупок за 2020 год
    result = query_pandas_cube(cube_df, year=2020)
    print("\nСумма всех покупок (2020):")
    print(result)
    
    # Запрос 2: Сумма всех покупок за апрель 2020 года
    result = query_pandas_cube(cube_df, year=2020, month=4)
    print("\nСумма всех покупок (апрель 2020):")
    print(result)
    
    # Запрос 3: Сумма покупок мужчин за 2020 год
    result = query_pandas_cube(cube_df, year=2020, gender='M')
    print("\nСумма покупок мужчин (2020):")
    print(result)
    
    # Запрос 4: Сумма покупок мужчин 19-30 лет за 2020 год
    result = query_pandas_cube(cube_df, year=2020, gender='M', age_group='19-30')
    print("\nСумма покупок мужчин 19-30 лет (2020):")
    print(result)
    
    print("\n--- SQL запросы ---")
    
    # SQL-запросы
    queries = [
        ("Сумма всех покупок (2020)", """
        SELECT total_amount FROM sales_cube_python
        WHERE year = 2020 AND gender IS NULL AND age_group IS NULL 
        AND industry IS NULL AND month IS NULL;
        """),
        
        ("Сумма всех покупок (апрель 2020)", """
        SELECT total_amount FROM sales_cube_python
        WHERE year = 2020 AND month = 4 AND gender IS NULL 
        AND age_group IS NULL AND industry IS NULL;
        """),
        
        ("Сумма покупок мужчин (2020)", """
        SELECT total_amount FROM sales_cube_python
        WHERE year = 2020 AND gender = 'M' AND age_group IS NULL 
        AND industry IS NULL AND month IS NULL;
        """),
        
        ("Сумма покупок мужчин 19-30 лет (2020)", """
        SELECT total_amount FROM sales_cube_python
        WHERE year = 2020 AND gender = 'M' AND age_group = '19-30' 
        AND industry IS NULL AND month IS NULL;
        """)
    ]
    
    for name, sql in queries:
        cursor.execute(sql)
        result = cursor.fetchone()
        print(f"\n{name}: {result[0] if result else 'Нет данных'}")

def main():
    """Основная функция"""
    print("Запуск ETL-процесса")
    
    # Параметры
    num_clients = 10000
    num_merchants = 500
    num_transactions = 50000
    
    # Подключение к БД
    conn, cursor = connect_db()
    if not conn:
        return
    
    # SQLAlchemy для pandas
    try:
        engine = create_engine(f"postgresql://{DB_PARAMS['user']}:{DB_PARAMS['password']}@{DB_PARAMS['host']}:{DB_PARAMS['port']}/{DB_PARAMS['database']}")
        print("SQLAlchemy engine создан")
    except Exception as e:
        print(f"Ошибка при создании SQLAlchemy engine: {e}")
        conn.close()
        return
    
    # Выполняем ETL
    create_tables(cursor, conn)
    
    # Генерация данных
    clients_df = gen_clients(num_clients)
    merchants_df = gen_merchants(num_merchants)
    transactions_df = gen_transactions(
        num_transactions, 
        clients_df['client_id'].tolist(), 
        merchants_df['merchant_id'].tolist()
    )
    
    # Загрузка в БД
    if not insert_data(cursor, conn, engine, clients_df, merchants_df, transactions_df):
        print("Не удалось вставить данные в БД. Остановка процесса.")
        conn.close()
        return
    
    # Создание кубов
    cube_df = create_pandas_cube(transactions_df, clients_df, merchants_df)
    create_sql_cube(cursor, conn)
    
    # Запросы
    run_sample_queries(cube_df, cursor)
    
    conn.close()
    print("\nETL завершен")

if __name__ == "__main__":
    main()

Запуск ETL-процесса
SQLAlchemy engine создан
Таблицы созданы
Данные о клиентах вставлены
Данные о торговых точках вставлены
