# Предобработка данных для рекомендательной системы

In [1]:
import pandas as pd


class Constants:
    USER_ID = "user_id"
    ITEM_ID = "item_id"
    TIMESTAMP = "time"

    TRANSACTIONS_PATH = "/Users/alfa/Documents/diplom/graphnn-recommendation-system/data/transactions_train.csv"
    CUSTOMERS_PATH = "/Users/alfa/Documents/diplom/graphnn-recommendation-system/data/customers.csv"
    ARTICLES_PATH = "/Users/alfa/Documents/diplom/graphnn-recommendation-system/data/articles.csv"

    RESULT_TRANSACTIONS_PATH = "/Users/alfa/Documents/diplom/graphnn-recommendation-system/data/processed_transactions_train.csv"
    RESULT_CUSTOMERS_PATH = "/Users/alfa/Documents/diplom/graphnn-recommendation-system/data/processed_customers_train.csv"
    RESULT_ARTICLES_PATH = "/Users/alfa/Documents/diplom/graphnn-recommendation-system/data/processed_articles_train.csv"


In [2]:
def prepare_filtered_dataset(
    transactions: pd.DataFrame,
    num_customers: int = 100,
    num_articles: int = 100,
    min_articles_per_user: int = 5,
    min_users_per_article: int = 5,
) -> tuple:
    """
    Отбираем сбалансированный набор данных:
    - num_customers: количество пользователей для отбора
    - min_articles_per_user: минимальное количество покупок у пользователя
    - min_users_per_article: минимальное количество покупателей у товара
    """

    # Фильтрация товаров
    article_popularity = transactions[Constants.ITEM_ID].value_counts()
    popular_articles = article_popularity[article_popularity >= min_users_per_article]

    # Отбираем топ-N самых активных пользователей
    selected_items = popular_articles.head(num_articles).index

    # Окончательный набор данных
    filtered_data = transactions[
        transactions[Constants.ITEM_ID].isin(selected_items)
    ]

    # Первичная фильтрация пользователей
    user_purchase_counts = filtered_data[Constants.USER_ID].value_counts()
    active_users = user_purchase_counts[user_purchase_counts >= min_articles_per_user]

    # Отбираем топ-N самых активных пользователей
    selected_users = active_users.head(num_customers).index

    # Фильтруем транзакции только по выбранным пользователям
    final_data = filtered_data[filtered_data[Constants.USER_ID].isin(selected_users)]
    final_data = final_data.reset_index(drop=True)

    print(f"Исходный размер данных: {len(transactions)} транзакций")
    print(f"Отобрано пользователей: {len(selected_users)}")
    print(f"Отобрано товаров: {len(selected_items)}")
    print(f"Финальный размер данных: {len(final_data)} транзакций")

    return final_data, selected_users, selected_items


In [3]:
transactions = pd.read_csv(Constants.TRANSACTIONS_PATH)
transactions = transactions.rename(
    columns={
        "t_dat": Constants.TIMESTAMP,
        "customer_id": Constants.USER_ID,
        "article_id": Constants.ITEM_ID,
    }
)
transactions[Constants.USER_ID] = transactions[Constants.USER_ID].astype(str)
transactions[Constants.ITEM_ID] = transactions[Constants.ITEM_ID].astype(str)

customers = pd.read_csv(Constants.CUSTOMERS_PATH)
customers = (
    customers.rename(
        columns={
            "customer_id": Constants.USER_ID,
            "Active": "is_active",
            "age": "age",
        }
    )
    .drop(columns=["FN", "postal_code"])
)
customers[Constants.USER_ID] = customers[Constants.USER_ID].astype(str)

articles = pd.read_csv(Constants.ARTICLES_PATH)
articles = (
    articles.rename(
        columns={"article_id": Constants.ITEM_ID}
    )
    [[
        Constants.ITEM_ID,
        "prod_name",
        "product_type_name",
        "product_group_name",
        "colour_group_name",
        "detail_desc",
    ]]
)
articles[Constants.ITEM_ID] = articles[Constants.ITEM_ID].astype(str)

In [4]:
articles.head()

Unnamed: 0,item_id,prod_name,product_type_name,product_group_name,colour_group_name,detail_desc
0,108775015,Strap top,Vest top,Garment Upper body,Black,Jersey top with narrow shoulder straps.
1,108775044,Strap top,Vest top,Garment Upper body,White,Jersey top with narrow shoulder straps.
2,108775051,Strap top (1),Vest top,Garment Upper body,Off White,Jersey top with narrow shoulder straps.
3,110065001,OP T-shirt (Idro),Bra,Underwear,Black,"Microfibre T-shirt bra with underwired, moulde..."
4,110065002,OP T-shirt (Idro),Bra,Underwear,White,"Microfibre T-shirt bra with underwired, moulde..."


In [5]:
customers.head()

Unnamed: 0,user_id,is_active,club_member_status,fashion_news_frequency,age
0,00000dbacae5abe5e23885899a1fa44253a17956c6d1c3...,,ACTIVE,NONE,49.0
1,0000423b00ade91418cceaf3b26c6af3dd342b51fd051e...,,ACTIVE,NONE,25.0
2,000058a12d5b43e67d225668fa1f8d618c13dc232df0ca...,,ACTIVE,NONE,24.0
3,00005ca1c9ed5f5146b52ac8639a40ca9d57aeff4d1bd2...,,ACTIVE,NONE,54.0
4,00006413d8573cd20ed7128e53b7b13819fe5cfc2d801f...,1.0,ACTIVE,Regularly,52.0


In [6]:
transactions.head()

Unnamed: 0,time,user_id,item_id,price,sales_channel_id
0,2018-09-20,000058a12d5b43e67d225668fa1f8d618c13dc232df0ca...,663713001,0.050831,2
1,2018-09-20,000058a12d5b43e67d225668fa1f8d618c13dc232df0ca...,541518023,0.030492,2
2,2018-09-20,00007d2de826758b65a93dd24ce629ed66842531df6699...,505221004,0.015237,2
3,2018-09-20,00007d2de826758b65a93dd24ce629ed66842531df6699...,685687003,0.016932,2
4,2018-09-20,00007d2de826758b65a93dd24ce629ed66842531df6699...,685687004,0.016932,2


In [7]:
filtered_transactions, selected_users, selected_items = prepare_filtered_dataset(transactions=transactions)
filtered_customers = customers[customers[Constants.USER_ID].isin(selected_users)]
filtered_customers = filtered_customers.reset_index(drop=True)
filtered_articles = articles[articles[Constants.ITEM_ID].isin(selected_items)]
filtered_articles = filtered_articles.reset_index(drop=True)

Исходный размер данных: 31788324 транзакций
Отобрано пользователей: 100
Отобрано товаров: 100
Финальный размер данных: 5542 транзакций


In [8]:
filtered_transactions.to_parquet(Constants.RESULT_TRANSACTIONS_PATH, index=False)
filtered_customers.to_parquet(Constants.RESULT_CUSTOMERS_PATH, index=False)
filtered_articles.to_parquet(Constants.RESULT_ARTICLES_PATH, index=False)

# Загрузка предобработанных данных в PostgreSQL

In [15]:
import psycopg2
from psycopg2 import sql
import pandas as pd
from io import StringIO

def clear_tables(conn, schema_name='recsys'):
    """Очищает все таблицы в схеме"""
    with conn.cursor() as cur:
        try:
            # Отключаем проверку внешних ключей для очистки
            cur.execute("SET CONSTRAINTS ALL DEFERRED;")
            
            # Получаем список всех таблиц в схеме
            cur.execute(sql.SQL("""
                SELECT table_name 
                FROM information_schema.tables 
                WHERE table_schema = %s
            """), [schema_name])
            
            tables = [row[0] for row in cur.fetchall()]
            
            # Очищаем каждую таблицу (в обратном порядке из-за foreign keys)
            for table in sorted(tables, reverse=True):
                cur.execute(sql.SQL("TRUNCATE TABLE {}.{} CASCADE").format(
                    sql.Identifier(schema_name),
                    sql.Identifier(table)
                ))
                print(f"Таблица {schema_name}.{table} очищена")
            
            conn.commit()
            print(f"Все таблицы в схеме {schema_name} успешно очищены")
        except Exception as e:
            print(f"Ошибка при очистке таблиц: {e}")
            conn.rollback()

def create_schema_and_tables(conn, schema_name='recsys'):
    """Создает схему и таблицы, если они не существуют"""
    with conn.cursor() as cur:
        try:
            # Создаем схему
            cur.execute(sql.SQL("CREATE SCHEMA IF NOT EXISTS {};").format(sql.Identifier(schema_name)))
            
            # Сначала создаем таблицы customers и articles, так как transactions будет на них ссылаться
            cur.execute(sql.SQL("""
            CREATE TABLE IF NOT EXISTS {}.customers (
                user_id VARCHAR(255) PRIMARY KEY,
                is_active BOOLEAN,
                club_member_status VARCHAR(255),
                fashion_news_frequency VARCHAR(255),
                age INTEGER
            );
            """).format(sql.Identifier(schema_name)))
            
            cur.execute(sql.SQL("""
            CREATE TABLE IF NOT EXISTS {}.articles (
                item_id VARCHAR(255) PRIMARY KEY,
                prod_name VARCHAR(255),
                product_type_name VARCHAR(255),
                product_group_name VARCHAR(255),
                colour_group_name VARCHAR(255),
                detail_desc TEXT
            );
            """).format(sql.Identifier(schema_name)))
            
            # Теперь создаем transactions с внешними ключами
            cur.execute(sql.SQL("""
            CREATE TABLE IF NOT EXISTS {}.transactions (
                time TEXT,
                user_id VARCHAR(255) REFERENCES {}.customers(user_id),
                item_id VARCHAR(255) REFERENCES {}.articles(item_id),
                price NUMERIC(10, 2),
                sales_channel_id INTEGER,
                CONSTRAINT fk_user FOREIGN KEY(user_id) REFERENCES {}.customers(user_id),
                CONSTRAINT fk_item FOREIGN KEY(item_id) REFERENCES {}.articles(item_id)
            );
            """).format(
                sql.Identifier(schema_name),
                sql.Identifier(schema_name),
                sql.Identifier(schema_name),
                sql.Identifier(schema_name),
                sql.Identifier(schema_name)
            ))
            
            conn.commit()
            print(f"Схема {schema_name} и таблицы успешно созданы")
        except Exception as e:
            print(f"Ошибка при создании схемы и таблиц: {e}")
            conn.rollback()

def fast_load(conn, df, table_name, schema_name='recsys'):
    """Быстрая загрузка данных с использованием copy_from"""
    try:
        # Подготовка данных
        buffer = StringIO()
        df.to_csv(buffer, index=False, header=False, sep='\t', na_rep='NULL')
        buffer.seek(0)
        
        with conn.cursor() as cur:
            # Временно отключаем проверку внешних ключей для загрузки данных
            cur.execute("SET CONSTRAINTS ALL DEFERRED;")
            
            # Копирование данных
            cur.copy_expert(
                sql.SQL("COPY {}.{} FROM STDIN WITH DELIMITER AS '\t' NULL AS 'NULL'").format(
                    sql.Identifier(schema_name),
                    sql.Identifier(table_name)
                ),
                buffer
            )
            conn.commit()
            print(f"Успешно загружено {len(df)} записей в таблицу {schema_name}.{table_name}")
    except Exception as e:
        print(f"Ошибка при загрузке данных в {schema_name}.{table_name}: {e}")
        conn.rollback()

def main():
    # Подключение к PostgreSQL
    conn = psycopg2.connect(
        host="localhost",
        database="appdata",
        user="appdata",
        password="appdata123",
        port=5436
    )
    
    try:
        # Создаем схему и таблицы (если не существуют)
        create_schema_and_tables(conn, schema_name='recsys')
        
        # Очищаем таблицы перед загрузкой новых данных
        clear_tables(conn, schema_name='recsys')

        # Важно сначала загрузить customers и articles, так как transactions на них ссылается
        filtered_customers['is_active'] = filtered_customers['is_active'].astype(bool)
        filtered_customers['age'] = filtered_customers['age'].astype('Int64')
        fast_load(conn, filtered_customers, 'customers', 'recsys')
        
        fast_load(conn, filtered_articles, 'articles', 'recsys')
        
        # Теперь можно загружать transactions
        fast_load(conn, filtered_transactions, 'transactions', 'recsys')
        
    except Exception as e:
        print(f"Общая ошибка: {e}")
    finally:
        conn.close()

if __name__ == "__main__":
    main()

Схема recsys и таблицы успешно созданы
Таблица recsys.transactions очищена
Таблица recsys.customers очищена
Таблица recsys.articles очищена
Все таблицы в схеме recsys успешно очищены
Успешно загружено 100 записей в таблицу recsys.customers
Успешно загружено 100 записей в таблицу recsys.articles
Успешно загружено 5542 записей в таблицу recsys.transactions


# Загрузка изображений в S3

In [None]:
import boto3
import os
from pathlib import Path


def upload_to_s3(local_folder: str, bucket_name: str):
    # Инициализация клиента
    s3 = boto3.client(
        's3',
        endpoint_url='http://localhost:9000',
        aws_access_key_id='minio',
        aws_secret_access_key='minio123'
    )

    # Создание бакета если не существует
    try:
        s3.create_bucket(Bucket=bucket_name)
    except Exception as e:
        print(f"Bucket exists: {e}")

    # Рекурсивная загрузка файлов
    for root, _, files in os.walk(local_folder):
        for file in files:
            local_path = Path(root) / file
            s3_path = str(local_path.relative_to(local_folder))
            
            s3.upload_file(
                str(local_path),
                bucket_name,
                s3_path
            )
            print(f"Uploaded: {s3_path}")


upload_to_s3("/Users/alfa/Documents/diplom/graphnn-recommendation-system/data/images", "product-images")