# Инициализация

Загружаем библиотеки, необходимые для выполнения кода ноутбука.

### Imports

In [2]:
import os
import requests
from dotenv import load_dotenv

import pandas as pd
import polars as pl
import numpy as np
import scipy

import matplotlib.pyplot as plt
import seaborn as sns


from implicit.als import AlternatingLeastSquares
from sklearn.preprocessing import LabelEncoder

from catboost import CatBoostClassifier, Pool

#import calendar
import joblib
#import s3fs
import gc
import boto3
from botocore.exceptions import ClientError



  from .autonotebook import tqdm as notebook_tqdm


### Config

In [3]:
# Fix random state for reproducibility
RANDOM_STATE = 42

# Load environment variables
# (must run before the os.getenv calls below so values from a .env file are visible)
load_dotenv()

# Target file name -> download URL; a URL is None when its variable is unset
datasets = {
    'tracks.parquet': os.getenv('RAW_URL_TRACKS'),
    'catalog_names.parquet': os.getenv('RAW_URL_CATALOG_NAMES'),
    'interactions.parquet': os.getenv('RAW_URL_INTERACTIONS'),
}

# Local working directories; the second argument is the fallback used when
# the environment variable is not set
raw_dir = os.getenv('RAW_DATA_DIR', '../data/raw')
preprocessed_dir = os.getenv('PREPROCESSED_DATA_DIR', '../data/preprocessed')
encoder_dir = os.getenv('ENCODERS_DIR', '../encoders')

# S3 settings; bucket name and credentials have no fallback and are None when unset
s3_bucket = os.getenv('S3_BUCKET_NAME')
s3_prefix = os.getenv('S3_PREFIX', 'recsys/data/')
s3_region = os.getenv('S3_REGION', 'us-east-1')
aws_access_key_id = os.getenv('AWS_ACCESS_KEY_ID')
aws_secret_access_key = os.getenv('AWS_SECRET_ACCESS_KEY')


# === ЭТАП 1 ===

# Загрузка первичных данных

Загружаем первичные данные из файлов:
- tracks.parquet
- catalog_names.parquet
- interactions.parquet

In [None]:
# ---------- Download datasets and save locally ---------- #
# Make sure the target directory exists before writing into it
os.makedirs(raw_dir, exist_ok=True)

# Fail early with a readable message instead of handing a None URL to
# requests when one of the RAW_URL_* environment variables is missing
missing_urls = [filename for filename, url in datasets.items() if not url]
if missing_urls:
    raise ValueError(f'No download URL configured for: {", ".join(missing_urls)}')

# Download and save each dataset
for filename, url in datasets.items():
    save_path = os.path.join(raw_dir, filename)
    # stream=True + iter_content writes the file chunk by chunk instead of
    # holding the whole response body in memory; the (connect, read) timeout
    # keeps the cell from hanging forever on a stalled connection
    with requests.get(url, stream=True, timeout=(10, 300)) as response:
        response.raise_for_status()
        with open(save_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=1024 * 1024):
                f.write(chunk)
    print(f'Saved {filename} to {save_path}')

In [None]:
# ---------- Load datasets ---------- #
# Read the three raw parquet files from raw_dir, in the order of the targets
tracks, catalog_names, interactions = (
    pl.read_parquet(os.path.join(raw_dir, parquet_name))
    for parquet_name in ('tracks.parquet', 'catalog_names.parquet', 'interactions.parquet')
)

# Обзор данных

Проверяем, нет ли в данных явных проблем.

In [None]:
# ---------- Check data summary ---------- #
def data_summary(df: 'pl.DataFrame', name: str) -> None:
    '''
        Display a quick overview of a Polars DataFrame.

        Prints, in order: the first rows, the shape, the output of
        ``df.describe()``, the value counts of every column and the
        per-column null counts.

        Args:
            df: Polars DataFrame to inspect. ``null_count`` is a Polars
                method, so a pandas DataFrame does not work here.
            name: Label shown (upper-cased) in the section header.
    '''

    print(f'\n===== {name.upper()} =====')

    # Sample rows (display is the rich-output helper available in notebooks)
    print('\nSample rows:')
    display(df.head())

    # Shape
    rows, cols = df.shape
    print(f'\nShape: {rows} rows x {cols} columns')

    # Data info
    print('\nSummary for numeric columns:')
    print(df.describe())

    # Unique values (column-wise). The try sits inside the loop so that one
    # column that cannot be counted does not hide the counts of the others.
    print('\nUnique values (for each column):')
    for col in df.columns:
        print(f'\nColumn: {col}')
        try:
            print(df[col].value_counts())
        except Exception as e:  # deliberate best-effort: report and move on
            print(f'Skipped value_counts due to error: {e}')

    # Missing values
    print('\nMissing values:')
    print(df.null_count())

In [None]:
data_summary(tracks, 'tracks')

In [None]:
data_summary(catalog_names, 'catalog_names')

In [None]:
data_summary(interactions, 'interactions')

### Main takeaways
1. The tracks dataframe contains lists instead of scalar values. This can cause several problems:
- It is hard to get insights from the data: counting, merging, or joining on list columns is tricky.
- Missing values are hidden: `df.null_count()` (like `df.isna().sum()` in pandas) cannot detect empty lists, so tracks with no genres or artists do not show up as missing values.
- The data is hard to use in ML models: most algorithms expect scalar values, not lists.

Thus, it's necessary to explode the lists to get a dataframe with one row per track-item combination (artist, genre, album).

2. The catalog_names dataframe has a format where everything (tracks, albums, artists, genres) is stacked in one column, and the `type` column tells what each row represents. This format isn't convenient for further work. Thus, it's necessary to split catalog_names into several dataframes.

# Data Preprocessing

**Note:** For large-scale processing, use `python3 -m src.main` from terminal instead of running cells below. The notebook cells are for exploration only and may cause memory issues.

In [None]:
# ---------- Explode lists of tracks dataframe into separate rows ---------- #
# List column -> scalar column name it gets after exploding
exploded_names = {
    'albums': 'album_id',
    'artists': 'artist_id',
    'genres': 'genre_id',
}

# Explode one list column at a time (albums, then artists, then genres) so the
# result has one row per track/album/artist/genre combination
tracks_exploded = tracks
for list_column in exploded_names:
    tracks_exploded = tracks_exploded.explode(list_column)
tracks_exploded = tracks_exploded.rename(exploded_names)

data_summary(tracks_exploded, 'tracks_exploded')

In [None]:
# ---------- Helper functions (imported from src/preprocess_data.py) ---------- #
# These match the production pipeline logic
# NOTE(review): nothing is imported here — the helpers are defined inline in
# this cell; keep them in sync with src/preprocess_data.py by hand.

# Placeholder written wherever a cleaned name ends up empty
UNKNOWN_TOKEN = 'Unknown'
# An opening ( or [, any run of characters that are not ) or ], then a closing ) or ]
BRACKETS_PATTERN = r'[\(\[][^\)\]]*[\)\]]'
# A "feat" / "ft" / "featuring" marker and everything that follows it
FEATURE_PATTERN = r'\b(feat\.?|ft\.?|featuring)\b.*'
# Version words; "remastered" may be followed by a four-digit number
VERSION_TAG_PATTERN = r'\b(live|remix|extended|radio edit|acoustic|remastered(?:\s+\d{4})?)\b'
# "cover" / "covered" and everything that follows it
COVER_PATTERN = r'\bcover(ed)?\b.*'

def basic_standardize(colname: str, alias: str) -> pl.Expr:
    """Build the STEP 0 expression that standardizes raw text.

    Args:
        colname: Name of the source column.
        alias: Name given to the resulting column.

    Returns:
        An expression that casts to string, replaces nulls with '', applies
        NFKD normalization and removes mark characters, lowercases, turns
        every character that is neither a word character nor whitespace into
        a space, collapses whitespace runs and strips the ends.
    """
    text = pl.col(colname).cast(pl.Utf8).fill_null('')
    # Decompose first so the marks can be removed as separate characters
    text = text.str.normalize('NFKD').str.replace_all(r'[\p{M}]', '')
    text = text.str.to_lowercase()
    # Punctuation -> space, then squeeze the resulting whitespace runs
    for pattern in (r'[^\w\s]', r'\s+'):
        text = text.str.replace_all(pattern, ' ')
    return text.str.strip_chars().alias(alias)

def clean_entity_name(std_col: str, alias: str, strip_versions: bool = False, strip_features: bool = False) -> pl.Expr:
    """Build the STEP 3 expression that cleans an entity name.

    Args:
        std_col: Name of the column holding the standardized name.
        alias: Name given to the resulting column.
        strip_versions: Also remove version tags and "cover ..." suffixes.
        strip_features: Also remove "feat ..." suffixes.

    Returns:
        A title-cased, whitespace-normalized expression aliased to ``alias``.
    """
    # Patterns are applied in this exact order; each match becomes a space
    patterns = [BRACKETS_PATTERN]
    if strip_features:
        patterns.append(FEATURE_PATTERN)
    if strip_versions:
        patterns.extend((VERSION_TAG_PATTERN, COVER_PATTERN))
    patterns.extend((r'[^\w\s]', r'\s+'))

    cleaned = pl.col(std_col).fill_null('')
    for pattern in patterns:
        cleaned = cleaned.str.replace_all(pattern, ' ')
    return cleaned.str.strip_chars().str.to_titlecase().alias(alias)

def normalize_track_title(source_col: str, alias: str = 'title_normalized') -> pl.Expr:
    """Build the STEP 4 expression that normalizes a track title for grouping.

    Args:
        source_col: Name of the column holding the title.
        alias: Name given to the resulting column.

    Returns:
        A lowercased expression with bracketed parts, "feat ..." suffixes,
        version tags, "cover ..." suffixes and punctuation replaced by spaces,
        whitespace collapsed and the ends stripped.
    """
    title = pl.col(source_col).cast(pl.Utf8).fill_null('').str.to_lowercase()
    # Same order as the replacements are meant to run; each match becomes a space
    ordered_patterns = (
        BRACKETS_PATTERN,
        FEATURE_PATTERN,
        VERSION_TAG_PATTERN,
        COVER_PATTERN,
        r'[^\w\s]',
        r'\s+',
    )
    for pattern in ordered_patterns:
        title = title.str.replace_all(pattern, ' ')
    return title.str.strip_chars().alias(alias)

def ensure_token(col: str, token: str = UNKNOWN_TOKEN) -> pl.Expr:
    """Replace null or zero-length values of ``col`` with ``token``.

    Args:
        col: Name of the string column; the result keeps the same name.
        token: Replacement value for null or empty entries.
    """
    value = pl.col(col)
    is_blank = value.is_null() | (value.str.len_chars() == 0)
    return pl.when(is_blank).then(pl.lit(token)).otherwise(value).alias(col)

In [None]:
# ---------- STEP 0: Basic standardization for catalog entities ---------- #

def _standardized_catalog(entity: str) -> pl.DataFrame:
    """Slice catalog_names down to one entity type and standardize its names.

    The result has the columns ``<entity>_id``, ``<entity>_name`` and
    ``<entity>_name_std``; a standardized name that comes out as an empty
    string is replaced with UNKNOWN_TOKEN.
    """
    id_col = f'{entity}_id'
    name_col = f'{entity}_name'
    std_col = f'{entity}_name_std'
    empty_to_unknown = (
        pl.when(pl.col(std_col) == '')
          .then(pl.lit(UNKNOWN_TOKEN))
          .otherwise(pl.col(std_col))
          .alias(std_col)
    )
    return (
        catalog_names
            .filter(pl.col('type') == entity)
            .select([
                pl.col('id').alias(id_col),
                pl.col('name').alias(name_col)
            ])
            .with_columns([basic_standardize(name_col, std_col)])
            .with_columns([empty_to_unknown])
    )


tracks_catalog = _standardized_catalog('track')
artists_catalog = _standardized_catalog('artist')
albums_catalog = _standardized_catalog('album')
genres_catalog = _standardized_catalog('genre')

data_summary(tracks_catalog, 'tracks_catalog_std')
data_summary(artists_catalog, 'artists_catalog_std')
data_summary(albums_catalog, 'albums_catalog_std')
data_summary(genres_catalog, 'genres_catalog_std')

In [None]:
# ---------- STEP 1.1: Canonical artists (deduplicate entities) ---------- #

# Artists sharing a standardized name collapse into one entity whose
# canonical id is the smallest raw id of the group
artist_clean_or_unknown = (
    pl.when(pl.col('artist_clean') == '')
      .then(pl.lit('Unknown'))
      .otherwise(pl.col('artist_clean'))
      .alias('artist_clean')
)
artists_dedup = (
    artists_catalog
        .group_by('artist_name_std')
        .agg(pl.col('artist_id').min().alias('artist_id_canonical'))
        .with_columns(clean_entity_name('artist_name_std', 'artist_clean'))
        .with_columns(artist_clean_or_unknown)
)

# Raw artist id -> canonical artist id; falls back to the raw id when the
# join finds no canonical one
artist_id_map = (
    artists_catalog
        .join(
            artists_dedup.select(['artist_name_std', 'artist_id_canonical']),
            on='artist_name_std',
            how='left'
        )
        .with_columns(
            pl.col('artist_id_canonical').fill_null(pl.col('artist_id'))
        )
        .select(['artist_id', 'artist_id_canonical'])
        .unique()
)

data_summary(artists_dedup, 'artists_dedup')

In [None]:
# ---------- STEP 1.2: Canonical albums (deduplicate per artist) ---------- #

# One row per (album, canonical artist) pair seen in the exploded tracks.
# NOTE(review): UNKNOWN_TOKEN is a string while artist_id_canonical is built
# from min(artist_id); if the ids are integers this mixes dtypes in one
# column — confirm the resulting dtype is what downstream code expects.
album_artist_bridge = (
    tracks_exploded
        .select(['album_id', 'artist_id'])
        .drop_nulls()
        .unique()
        .join(artist_id_map, on='artist_id', how='left')
        .with_columns([
            pl.when(pl.col('artist_id_canonical').is_null())
              .then(pl.lit(UNKNOWN_TOKEN))
              .otherwise(pl.col('artist_id_canonical'))
              .alias('artist_id_canonical')
        ])
        .select(['album_id', 'artist_id_canonical'])
)

# Album catalog with its artists attached. An album credited to several
# artists appears once per artist here.
albums_with_artist = (
    albums_catalog
        .join(album_artist_bridge, on='album_id', how='left')
        .with_columns([
            pl.when(pl.col('artist_id_canonical').is_null())
              .then(pl.lit(UNKNOWN_TOKEN))
              .otherwise(pl.col('artist_id_canonical'))
              .alias('artist_id_canonical')
        ])
)

# Albums with the same standardized name and the same artist collapse into
# one entity whose canonical id is the smallest raw id of the group
albums_dedup = (
    albums_with_artist
        .group_by(['album_name_std', 'artist_id_canonical'])
        .agg(pl.min('album_id').alias('album_id_canonical'))
        .with_columns([
            clean_entity_name('album_name_std', 'album_clean', strip_versions=True, strip_features=True)
        ])
        .with_columns([
            pl.when(pl.col('album_clean') == '')
              .then(pl.lit('Unknown'))
              .otherwise(pl.col('album_clean'))
              .alias('album_clean')
        ])
)

# Raw album id -> canonical album id.
# A multi-artist album has one row per artist above and each (name, artist)
# group may pick a different canonical id, so deduplicating the pairs is not
# enough. Aggregating per album_id keeps exactly one canonical id per album
# (the smallest), which makes joins and dict lookups on this map
# deterministic and free of row fan-out.
album_id_map = (
    albums_with_artist
        .join(
            albums_dedup.select(['album_name_std', 'artist_id_canonical', 'album_id_canonical']),
            on=['album_name_std', 'artist_id_canonical'],
            how='left'
        )
        .with_columns([
            pl.when(pl.col('album_id_canonical').is_null())
              .then(pl.col('album_id'))
              .otherwise(pl.col('album_id_canonical'))
              .alias('album_id_canonical')
        ])
        .group_by('album_id')
        .agg(pl.min('album_id_canonical'))
)

data_summary(albums_dedup, 'albums_dedup')

In [None]:
# ---------- STEP 1.3: Canonical tracks (per artist) ---------- #

# One row per (track, canonical artist) pair seen in the exploded tracks.
# NOTE(review): UNKNOWN_TOKEN is a string while artist_id_canonical is built
# from min(artist_id); if the ids are integers this mixes dtypes in one
# column — confirm the resulting dtype is what downstream code expects.
track_artist_bridge = (
    tracks_exploded
        .select(['track_id', 'artist_id'])
        .drop_nulls()
        .unique()
        .join(artist_id_map, on='artist_id', how='left')
        .with_columns([
            pl.when(pl.col('artist_id_canonical').is_null())
              .then(pl.lit(UNKNOWN_TOKEN))
              .otherwise(pl.col('artist_id_canonical'))
              .alias('artist_id_canonical')
        ])
        .select(['track_id', 'artist_id_canonical'])
)

# Track catalog with its artists attached. A track credited to several
# artists appears once per artist here.
tracks_with_artist = (
    tracks_catalog
        .join(track_artist_bridge, on='track_id', how='left')
        .with_columns([
            pl.when(pl.col('artist_id_canonical').is_null())
              .then(pl.lit(UNKNOWN_TOKEN))
              .otherwise(pl.col('artist_id_canonical'))
              .alias('artist_id_canonical')
        ])
)

# Tracks with the same standardized name and the same artist collapse into
# one entity whose canonical id is the smallest raw id of the group
tracks_dedup = (
    tracks_with_artist
        .group_by(['track_name_std', 'artist_id_canonical'])
        .agg(pl.min('track_id').alias('track_id_canonical'))
        .with_columns([
            clean_entity_name('track_name_std', 'track_clean', strip_versions=True, strip_features=True)
        ])
        .with_columns([
            normalize_track_title('track_clean', 'title_normalized')
        ])
        .with_columns([
            pl.when(pl.col('track_clean') == '')
              .then(pl.lit('Unknown'))
              .otherwise(pl.col('track_clean'))
              .alias('track_clean'),
            pl.when(pl.col('title_normalized') == '')
              .then(pl.lit(UNKNOWN_TOKEN))
              .otherwise(pl.col('title_normalized'))
              .alias('title_normalized')
        ])
)

# Canonical tracks of one artist that share a normalized title form a group;
# the group id is the smallest canonical track id in it
track_group_lookup = (
    tracks_dedup
        .group_by(['title_normalized', 'artist_id_canonical'])
        .agg(pl.min('track_id_canonical').alias('track_group_id'))
)

tracks_dedup = (
    tracks_dedup
        .join(track_group_lookup, on=['title_normalized', 'artist_id_canonical'], how='left')
)

# Raw track id -> canonical track id.
# A multi-artist track has one row per artist above and each (name, artist)
# group may pick a different canonical id, so deduplicating the pairs is not
# enough. Aggregating per track_id keeps exactly one canonical id per track
# (the smallest), so joining interactions on this map cannot multiply rows.
track_id_map = (
    tracks_with_artist
        .join(
            tracks_dedup.select(['track_name_std', 'artist_id_canonical', 'track_id_canonical']),
            on=['track_name_std', 'artist_id_canonical'],
            how='left'
        )
        .with_columns([
            pl.when(pl.col('track_id_canonical').is_null())
              .then(pl.col('track_id'))
              .otherwise(pl.col('track_id_canonical'))
              .alias('track_id_canonical')
        ])
        .group_by('track_id')
        .agg(pl.min('track_id_canonical'))
)

data_summary(tracks_dedup, 'tracks_dedup')

In [None]:
# ---------- STEP 1.4: Canonical genres ---------- #

# Genres sharing a standardized name collapse into one entity whose
# canonical id is the smallest raw id of the group
genre_clean_or_unknown = (
    pl.when(pl.col('genre_clean') == '')
      .then(pl.lit('Unknown'))
      .otherwise(pl.col('genre_clean'))
      .alias('genre_clean')
)
genres_dedup = (
    genres_catalog
        .group_by('genre_name_std')
        .agg(pl.col('genre_id').min().alias('genre_id_canonical'))
        .with_columns(clean_entity_name('genre_name_std', 'genre_clean'))
        .with_columns(genre_clean_or_unknown)
)

# Raw genre id -> canonical genre id; falls back to the raw id when the
# join finds no canonical one
genre_id_map = (
    genres_catalog
        .join(
            genres_dedup.select(['genre_name_std', 'genre_id_canonical']),
            on='genre_name_std',
            how='left'
        )
        .with_columns(
            pl.col('genre_id_canonical').fill_null(pl.col('genre_id'))
        )
        .select(['genre_id', 'genre_id_canonical'])
        .unique()
)

data_summary(genres_dedup, 'genres_dedup')

In [None]:
# ---------- STEP 5–7: Canonical items dataframe (dedupe, drop nulls, validate) ---------- #

CHECKPOINT_DIR = os.path.join(preprocessed_dir, 'checkpoints')
os.makedirs(CHECKPOINT_DIR, exist_ok=True)
FACT_IDS_PATH = os.path.join(CHECKPOINT_DIR, 'fact_ids_checkpoint.parquet')
ITEMS_PATH = os.path.join(preprocessed_dir, 'items_canonical.parquet')
# Write the id-only fact table to disk and scan it back instead of keeping it in memory
CACHE_FACT_IDS = True
# Also persist the final items table to ITEMS_PATH
CACHE_ITEMS = True

ID_COLUMNS = ['track_id', 'artist_id', 'album_id', 'genre_id']


def _id_lookup(id_map: pl.DataFrame, raw_col: str, canonical_col: str) -> dict:
    """Return a raw-id -> canonical-id dict with one deterministic entry per raw id.

    An id map can hold several canonical ids for the same raw id (for example
    a track credited to several artists). ``dict(zip(...))`` would silently
    keep whichever row came last, so the smallest canonical id is picked
    explicitly.
    """
    one_to_one = id_map.group_by(raw_col).agg(pl.col(canonical_col).min())
    return dict(zip(one_to_one[raw_col], one_to_one[canonical_col]))


# Phase 1: map raw IDs to canonical IDs using dict lookups
track_lookup = _id_lookup(track_id_map, 'track_id', 'track_id_canonical')
artist_lookup = _id_lookup(artist_id_map, 'artist_id', 'artist_id_canonical')
album_lookup = _id_lookup(album_id_map, 'album_id', 'album_id_canonical')
genre_lookup = _id_lookup(genre_id_map, 'genre_id', 'genre_id_canonical')

id_lookups = {
    'track_id': track_lookup,
    'artist_id': artist_lookup,
    'album_id': album_lookup,
    'genre_id': genre_lookup,
}

lazy_fact_ids = (
    tracks_exploded
        .lazy()
        .select(ID_COLUMNS)
        .with_columns([
            # replace_strict(..., default=None) turns ids missing from the
            # lookup into null; it is the non-deprecated spelling of
            # replace(..., default=None)
            pl.col(col).replace_strict(lookup, default=None).alias(col)
            for col, lookup in id_lookups.items()
        ])
        .with_columns([
            pl.col(col).cast(pl.Int64).alias(col)
            for col in ID_COLUMNS
        ])
        # Rows with any unmapped or missing id are dropped
        .drop_nulls(ID_COLUMNS)
)

if CACHE_FACT_IDS:
    fact_ids = lazy_fact_ids.collect(engine='streaming')
    fact_ids.write_parquet(FACT_IDS_PATH)
    # Release the materialized frame before the joins below
    del fact_ids
    gc.collect()
    fact_ids_lazy = pl.scan_parquet(FACT_IDS_PATH)
else:
    fact_ids_lazy = lazy_fact_ids

# Phase 2: add cleaned names and group ids (dimensions are smaller)
lazy_items = (
    fact_ids_lazy
        .join(
            tracks_dedup.select(['track_id_canonical', 'track_clean', 'track_group_id']).lazy(),
            left_on='track_id',
            right_on='track_id_canonical',
            how='left'
        )
        .join(
            artists_dedup.select(['artist_id_canonical', 'artist_clean']).lazy(),
            left_on='artist_id',
            right_on='artist_id_canonical',
            how='left'
        )
        .join(
            albums_dedup.select(['album_id_canonical', 'album_clean']).lazy(),
            left_on='album_id',
            right_on='album_id_canonical',
            how='left'
        )
        .join(
            genres_dedup.select(['genre_id_canonical', 'genre_clean']).lazy(),
            left_on='genre_id',
            right_on='genre_id_canonical',
            how='left'
        )
        .select([
            pl.col('track_id').alias('track_id'),
            'track_clean',
            'track_group_id',
            pl.col('artist_id').alias('artist_id'),
            'artist_clean',
            pl.col('album_id').alias('album_id'),
            'album_clean',
            pl.col('genre_id').alias('genre_id'),
            'genre_clean'
        ])
        .with_columns([
            # A track without a group forms its own group
            pl.when(pl.col('track_group_id').is_null())
              .then(pl.col('track_id'))
              .otherwise(pl.col('track_group_id'))
              .alias('track_group_id')
        ])
        .with_columns([
            pl.when(pl.col(col).is_null() | (pl.col(col).str.len_chars() == 0))
              .then(pl.lit('Unknown'))
              .otherwise(pl.col(col))
              .alias(col)
            for col in ['track_clean', 'artist_clean', 'album_clean', 'genre_clean']
        ])
)

# Both settings of CACHE_ITEMS need the same materialized table; only the
# write to disk is optional
items_df = (
    lazy_items
        .collect(engine='streaming')
        .unique(subset=ID_COLUMNS)
)
if CACHE_ITEMS:
    items_df.write_parquet(ITEMS_PATH)

data_summary(items_df, 'items_df')

In [None]:
# ---------- STEP 8: Final canonical catalog tables ---------- #

# One row per track: the first value found in each track's group is kept
# for every descriptive column
tracks_catalog_clean = (
    items_df
        .group_by('track_id')
        .agg([
            pl.col(column).first()
            for column in ('track_clean', 'track_group_id', 'artist_id', 'album_id', 'genre_id')
        ])
)

# One row per canonical artist id
artists_catalog_clean = (
    artists_dedup
        .select(pl.col('artist_id_canonical').alias('artist_id'), 'artist_clean')
        .unique(subset='artist_id')
)

# One row per canonical album id, with the artist the album was grouped under
albums_catalog_clean = (
    albums_dedup
        .select(
            pl.col('album_id_canonical').alias('album_id'),
            'album_clean',
            pl.col('artist_id_canonical').alias('artist_id'),
        )
        .unique(subset='album_id')
)

# One row per canonical genre id
genres_catalog_clean = (
    genres_dedup
        .select(pl.col('genre_id_canonical').alias('genre_id'), 'genre_clean')
        .unique(subset='genre_id')
)

for summary_name, summary_frame in (
    ('tracks_catalog_clean', tracks_catalog_clean),
    ('artists_catalog_clean', artists_catalog_clean),
    ('albums_catalog_clean', albums_catalog_clean),
    ('genres_catalog_clean', genres_catalog_clean),
):
    data_summary(summary_frame, summary_name)

In [4]:
# ---------- Load preprocessed data (recommended approach) ---------- #
# If you ran: python3 -m src.main
# Load the outputs directly instead of running memory-heavy cells above

items_path = os.path.join(preprocessed_dir, 'items.parquet')
events_path = os.path.join(preprocessed_dir, 'events.parquet')
catalog_paths = {
    table: os.path.join(preprocessed_dir, f'{table}.parquet')
    for table in (
        'tracks_catalog_clean',
        'artists_catalog_clean',
        'albums_catalog_clean',
        'genres_catalog_clean',
    )
}

# Check every file that is read below, not only items/events. Otherwise a
# missing catalog file raises halfway through, after the two large tables
# have already been loaded.
missing_paths = [
    required_path
    for required_path in (items_path, events_path, *catalog_paths.values())
    if not os.path.exists(required_path)
]

if not missing_paths:
    print('✓ Loading preprocessed data from disk...')
    items = pl.read_parquet(items_path)
    events = pl.read_parquet(events_path)

    tracks_catalog_clean = pl.read_parquet(catalog_paths['tracks_catalog_clean'])
    artists_catalog_clean = pl.read_parquet(catalog_paths['artists_catalog_clean'])
    albums_catalog_clean = pl.read_parquet(catalog_paths['albums_catalog_clean'])
    genres_catalog_clean = pl.read_parquet(catalog_paths['genres_catalog_clean'])

    print(f'Items: {items.shape}')
    print(f'Events: {events.shape}')
    print('All catalog tables loaded.')
else:
    print('⚠ Preprocessed data not found.')
    for missing_path in missing_paths:
        print(f'  missing: {missing_path}')
    print('Run from terminal: python3 -m src.main --raw-dir data/raw --preprocessed-dir data/preprocessed')
    print('Or execute preprocessing cells above (may cause memory issues on this VM)')

✓ Loading preprocessed data from disk...
Items: (5606516, 9)
Events: (213656164, 4)
All catalog tables loaded.


In [5]:
# A value is "safe" when it consists only of word characters and whitespace
safe_pattern = r'^[\w\s]+$'

# Flag a row as soon as any of the cleaned name columns fails the safe pattern
name_columns = ['track_clean', 'artist_clean', 'album_clean', 'genre_clean']
has_unsafe_text = ~pl.col(name_columns[0]).str.contains(safe_pattern)
for name_column in name_columns[1:]:
    has_unsafe_text = has_unsafe_text | ~pl.col(name_column).str.contains(safe_pattern)

problematic_rows = items.filter(has_unsafe_text)

print(f'Number of problematic rows: {problematic_rows.height}')

Number of problematic rows: 0


In [10]:
display(items.head())

track_id,track_clean,track_group_id,artist_id,artist_clean,album_id,album_clean,genre_id,genre_clean
i64,str,i64,i64,str,i64,str,i64,str
15421156,"""Saint Saens Le Carnaval Des An…",15421156,378040,"""Pascal Roge""",10515261,"""Classical 100""",69,"""Classicalmusic"""
15431752,"""Verdi Rigoletto Act Ii Cortigi…",15431752,83201,"""Wiener Philharmoniker""",31947,"""Verdi Rigoletto""",117,"""Classicalmasterpieces"""
15432989,"""Vivaldi The Four Seasons Winte…",15432989,87464,"""Albrecht Mayer""",13427069,"""In The Mood For Vivaldi""",69,"""Classicalmusic"""
15432989,"""Vivaldi The Four Seasons Winte…",15432989,87464,"""Albrecht Mayer""",14145191,"""Vivaldi Baroque Legends""",23,"""Classical"""
9883457,"""World Wide Sunrise""",9883457,555749,"""Damian Wasse""",2711145,"""Dreamtrance Party""",86,"""Trance"""


In [15]:
lp=items.filter(pl.col('track_group_id') == 178480)


In [None]:
lp.select(pl.col('artist_clean').unique())


In [12]:
items.filter(pl.col('track_clean').str.to_lowercase() == 'in the end'.lower()).head(10)



track_id,track_clean,track_group_id,artist_id,artist_clean,album_id,album_clean,genre_id,genre_clean
i64,str,i64,i64,str,i64,str,i64,str
19237423,"""In The End""",19237423,3127658,"""Tomx""",3545921,"""Electronic Dance Music Miami T…",101,"""House"""
178480,"""In The End""",178480,36800,"""Linkin Park""",16252,"""In The End""",41,"""Numetal"""
470224,"""In The End""",470224,131549,"""Charlotte Gainsbourg""",49596,"""Irm""",11,"""Pop"""
1621152,"""In The End""",1621152,352469,"""Stream Of Passion""",162492,"""The Flame Within""",43,"""Epicmetal"""
1621152,"""In The End""",1621152,352469,"""Stream Of Passion""",7210462,"""Flame Within""",43,"""Epicmetal"""
5500863,"""In The End""",5500863,452194,"""Black Veil Brides""",16525363,"""Punk Music""",47,"""Metal"""
5500863,"""In The End""",5500863,452194,"""Black Veil Brides""",20342138,"""Emo Pop Punk""",47,"""Metal"""
35951269,"""In The End""",35951269,564433,"""Barbara Mendes""",4506355,"""Superstar""",138,"""Latinfolk"""
35951269,"""In The End""",35951269,564433,"""Barbara Mendes""",4506355,"""Superstar""",21,"""Folk"""
39075991,"""In The End""",39075991,1087991,"""Akacia""",5027979,"""In The End""",68,"""Electronics"""


In [7]:
display(events.head(10))

user_id,track_id,listen_count,last_listen
i32,i64,u32,date
668,74566,1,2022-09-04
673,78780542,1,2022-11-19
677,676087,1,2022-08-26
677,693906,1,2022-08-27
677,4777100,1,2022-09-04
677,20412797,1,2022-09-18
677,33315053,1,2022-10-07
677,35419758,1,2022-11-10
677,15397472,1,2022-11-12
678,61886459,1,2022-09-03


In [6]:
# ---------- Clean up interactions dataframe (canonical track ids) ---------- #

# Keep only the date part of the timestamp and drop the leftover index column
interactions = (
    interactions
        .with_columns(pl.col('started_at').dt.date().alias('started_at'))
        .drop('__index_level_0__')
)

# Drop observations with track_seq bigger than 95% percentile
track_seq_upper_threshold = (
    interactions
        .select(pl.col('track_seq').quantile(0.95))
        .item()
)

print('Upper threshold track_seq:', track_seq_upper_threshold)

# track_id_map is built from one row per (track, artist), so a track credited
# to several artists can appear with several canonical ids. Joining on it
# directly would duplicate those interactions and inflate listen counts.
# Collapse it to exactly one canonical id (the smallest) per raw track id.
track_id_lookup = (
    track_id_map
        .group_by('track_id')
        .agg(pl.col('track_id_canonical').min())
)

interactions = (
    interactions
        .filter(pl.col('track_seq') <= track_seq_upper_threshold)
        .join(track_id_lookup, on='track_id', how='left')
        .with_columns([
            # From here on track_id holds the canonical id
            pl.col('track_id_canonical').alias('track_id')
        ])
        # Interactions whose track has no canonical id are discarded
        .drop_nulls(['track_id'])
        .select(['user_id', 'track_id', 'track_seq', 'started_at'])
)

data_summary(interactions, 'interactions (canonical tracks)')

NameError: name 'interactions' is not defined

# Выводы

# === ЭТАП 2 ===

# EDA

Распределение количества прослушанных треков.

Наиболее популярные треки

In [None]:
# Listens per track (non-null track_seq values), with the cleaned track
# title attached, most listened first
listens_per_track = pl.col('track_seq').count().alias('listen_count')

tracks_by_listen_number = (
    interactions
        .group_by('track_id')
        .agg(listens_per_track)
        .join(
            tracks_catalog_clean.select('track_id', 'track_clean'),
            on='track_id',
            how='left',
        )
        .sort('listen_count', descending=True)
)

top_tracks = tracks_by_listen_number.limit(10)
display(top_tracks)

Наиболее популярные жанры

In [None]:
# Top 5 genres by number of listens.
# Listens are counted per track first, then attributed to every genre the
# track is tagged with and summed per genre.
track_genre_pairs = pl.col('track_id', 'genre_clean')

genres_by_listen_count = (
    interactions
        .group_by('track_id')
        .agg(pl.len().alias('track_listen_count'))
        .join(
            items.select(track_genre_pairs).unique(),
            on='track_id',
            how='left',
        )
        .group_by('genre_clean')
        .agg(pl.col('track_listen_count').sum().alias('listen_count'))
        .sort('listen_count', descending=True)
)

top_5_genres = genres_by_listen_count.limit(5)
display(top_5_genres)

Треки, которые никто не прослушал

In [None]:
# Catalog tracks that never occur in the interactions table: the anti join
# keeps only the item rows whose track_id has no match among listened ids
descriptive_columns = ['track_id', 'track_clean', 'artist_clean', 'album_clean', 'genre_clean']

unlistened_tracks = (
    items
        .select(descriptive_columns)
        .unique(subset='track_id')
        .join(
            interactions.select(pl.col('track_id').unique()),
            on='track_id',
            how='anti',
        )
)

print(f'Number of unlistened tracks: {unlistened_tracks.height}')
print(unlistened_tracks.head(10))

# Преобразование данных

Преобразуем данные в формат, более пригодный для дальнейшего использования в расчётах рекомендаций.

In [None]:
# Build the events table: one row per (user, track) pair carrying the number
# of listens and the date of the most recent one.
# The query runs lazily on the streaming engine so the whole interactions
# table is not processed in memory at once.
per_pair_stats = [
    pl.len().alias('listen_count'),
    pl.col('started_at').max().alias('last_listen'),
]

events = (
    interactions.lazy()
        .group_by('user_id', 'track_id')
        .agg(per_pair_stats)
        # users ascending, each user's most listened tracks first
        .sort(by=['user_id', 'listen_count'], descending=[False, True])
        .collect(engine='streaming')
)
print(f"Aggregated interactions: {events.shape}")
print(events.head())

In [None]:
# ALS works on consecutive zero-based integer indices, so raw user and track
# ids are label-encoded
user_encoder = LabelEncoder()
track_encoder = LabelEncoder()

# fit_transform learns the id -> index mapping and encodes the column in one
# call; the encoders stay fitted for decoding later
events_data = events.with_columns([
    pl.Series('user_idx', user_encoder.fit_transform(events['user_id'].to_numpy())),
    pl.Series('track_idx', track_encoder.fit_transform(events['track_id'].to_numpy())),
])

print(f"Encoded data shape: {events_data.shape}")
print(f"Unique users: {events_data['user_idx'].n_unique()}")
print(f"Unique tracks: {events_data['track_idx'].n_unique()}")

In [None]:
# Build the user x track matrix: assemble it in COO form from
# (value, (row, col)) triplets, then convert to CSR for efficient row access.
# Values are log1p-scaled listen counts used as implicit feedback weights.
matrix_shape = (
    events_data['user_idx'].max() + 1,
    events_data['track_idx'].max() + 1,
)
user_track_sparse = scipy.sparse.coo_matrix(
    (
        np.log1p(events_data['listen_count'].to_numpy()),
        (events_data['user_idx'].to_numpy(), events_data['track_idx'].to_numpy()),
    ),
    shape=matrix_shape,
).tocsr()

n_rows, n_cols = user_track_sparse.shape
print(f"Sparse matrix shape: {user_track_sparse.shape}")
print(f"Sparsity: {1 - user_track_sparse.nnz / (n_rows * n_cols):.4%}")

In [None]:
# Save encoders for decoding predictions back to original ids

def _class_to_index(encoder: LabelEncoder) -> dict:
    """Return the raw id -> encoded index mapping of a fitted LabelEncoder.

    LabelEncoder encodes the value stored at ``classes_[i]`` as ``i``, so the
    mapping is read straight from ``classes_``. The earlier version zipped the
    results of two separate ``Series.unique()`` calls, whose ordering is not
    guaranteed to match, and re-ran ``transform`` just to recover the indices.
    """
    return {raw_id: idx for idx, raw_id in enumerate(encoder.classes_.tolist())}


encoder_mappings = {
    'user_encoder': user_encoder,
    'track_encoder': track_encoder,
    'user_id_to_idx': _class_to_index(user_encoder),
    'track_id_to_idx': _class_to_index(track_encoder)
}

# Save encoder locally
os.makedirs(encoder_dir, exist_ok=True)
with open(os.path.join(encoder_dir, 'encoder_mappings.joblib'), 'wb') as f:
    joblib.dump(encoder_mappings, f)


# Сохранение данных

Сохраним данные в двух файлах в персональном S3-бакете по пути `recsys/data/`:
- `items.parquet` — все данные о музыкальных треках,
- `events.parquet` — все данные о взаимодействиях.

In [None]:
# Write the items and events tables as parquet files into preprocessed_dir
os.makedirs(preprocessed_dir, exist_ok=True)
for output_frame, output_name in ((items, 'items.parquet'), (events, 'events.parquet')):
    output_frame.write_parquet(os.path.join(preprocessed_dir, output_name))
print('Files saved locally')

In [None]:
## Save data to S3 bucket
#
## Initialize S3 client
#s3_client = boto3.client(
#    's3',
#    region_name=s3_region,
#    aws_access_key_id=aws_access_key_id,
#    aws_secret_access_key=aws_secret_access_key
#)
#
#def upload_to_s3(local_path, s3_key):
#    '''
#        Upload a file to S3 bucket
#    '''
#    try:
#        s3_client.upload_file(local_path, s3_bucket, s3_key)
#        print(f'Uploaded {local_path} to s3://{s3_bucket}/{s3_key}')
#    except ClientError as e:
#        print(f'Error uploading {local_path}: {e}')
#        raise
#
## Upload to S3
#upload_to_s3(
#    os.path.join(preprocessed_dir, 'items.parquet'),
#    f'{s3_prefix}items.parquet'
#)
#upload_to_s3(
#    os.path.join(preprocessed_dir, 'events.parquet'),
#    f'{s3_prefix}events.parquet'
#)
#
#print(f'All files uploaded to S3 bucket: {s3_bucket}')

# Очистка памяти

Здесь может понадобиться очистка памяти, чтобы высвободить ресурсы для выполнения кода ниже.

Приведите соответствующие код, комментарии, например:
- код для удаления более не нужных переменных,
- комментарий, что следует перезапустить kernel, выполнить такие-то начальные секции и продолжить с этапа 3.

In [None]:
# Release intermediate objects that are not referenced by the cells below.

# Names of notebook-level variables to drop, grouped by the step that created them
variables_to_delete = [
    # raw inputs and first-pass derivatives
    'interactions', 'tracks', 'catalog_names', 'tracks_exploded', 'user_track_interactions',
    # per-entity catalogs
    'albums_catalog', 'artists_catalog', 'genres_catalog', 'tracks_catalog',
    # duplicate reports
    'album_duplicates', 'artist_duplicates', 'genre_duplicates', 'track_duplicates',
    # id remapping tables
    'album_id_map', 'artist_id_map', 'genre_id_map', 'track_id_map',
    # deduplicated catalogs
    'albums_dedup', 'artists_dedup', 'genres_dedup', 'tracks_dedup',
]

# Remove only the names that actually exist, so the cell can be re-run safely
notebook_globals = globals()
for name in variables_to_delete:
    if name not in notebook_globals:
        continue
    notebook_globals.pop(name)
    print(f"Deleted {name}")

# Ask the garbage collector to reclaim the released objects right away
gc.collect()

# Instructions for a full reset (a kernel restart releases everything)
for line in (
    'Memory cleanup complete',
    'To fully free memory, restart the kernel:',
    '  1. Click "Kernel" → "Restart Kernel..."',
    '  2. Re-run initial cells:',
    '     - Cell 3: Imports',
    '     - Cell 5: Config',
    '  3. Load preprocessed data:',
    "     items = pl.read_parquet('../data/preprocessed/items.parquet')",
    "     events = pl.read_parquet('../data/preprocessed/events.parquet')",
    "  4. Continue from Stage 3",
):
    print(line)

# === ЭТАП 3 ===

# Загрузка данных

Если необходимо, то загружаем items.parquet, events.parquet.

In [None]:
#items = pl.read_parquet('../data/preprocessed/items.parquet')
#events = pl.read_parquet('../data/preprocessed/events.parquet')

# Разбиение данных

Разбиваем данные на тренировочную, тестовую выборки.

In [None]:
# ---------- Split data chronologically ---------- #

# Find the 80% quantile of the listen dates. The dates are converted to their
# physical integer form (days since epoch) first, because Polars cannot take a
# quantile of a Date column; the result is cast to Int32 so that it can be
# turned back into a Date.
threshold_days = events.select(
    pl.col('last_listen').cast(pl.Date).to_physical().quantile(0.8).cast(pl.Int32)
).item()

# Integer day count -> Date
date_threshold = pl.Series([threshold_days]).cast(pl.Date).item()
print(f'Split date: {date_threshold}')

# Everything up to and including the split date is train, the rest is test
on_or_before_split = pl.col('last_listen') <= date_threshold
after_split = pl.col('last_listen') > date_threshold
train_events = events.filter(on_or_before_split)
test_events = events.filter(after_split)

n_total = events.shape[0]
n_train = train_events.shape[0]
n_test = test_events.shape[0]
print(f'Train set: {n_train:,}')
print(f'Test set: {n_test:,}')
print(f'Split ratio: {n_train/n_total:.1%} / {n_test/n_total:.1%}')

# Топ популярных

Рассчитаем рекомендации как топ популярных.

In [None]:
# Find popularity score from training data and get top 100 tracks

def get_popular_tracks(train_events, tracks_catalog, items=None, top_n=100, min_users=10, max_avg_listens=50):
    '''
        Get the most popular tracks, with basic protection against corrupted
        or bot-driven data.

        Parameters:
        - train_events: user-track interactions with `track_id` and
          `listen_count` columns. Every row is counted as one user of the
          track, so the frame is expected to be aggregated to one row per
          (user, track) pair.
        - tracks_catalog: track catalog with `track_id` and `track_clean`
          columns. NOTE(review): assumed to hold one row per track_id;
          duplicated ids would duplicate rows in the result.
        - items: optional DataFrame with `track_id`, `genre_clean`,
          `artist_clean` and `album_clean` columns, used to attach metadata
        - top_n: number of top tracks to return
        - min_users: minimum number of rows (users) a track needs to be kept
        - max_avg_listens: maximum average listens per user (anti-bot protection)

        Popularity score is log1p(total_listens) * log1p(user_count), so a low
        value in either factor pulls the score down.

        Ranking is deterministic: ties on the score are broken by ascending
        track_id, because neither `group_by` nor `sort` promises a stable
        order. The result is sorted again after the joins so that callers can
        rely on `head(n)` returning the n best tracks.

        Returns:
        - DataFrame sorted by descending popularity_score with track_id,
          total_listens, user_count, avg_per_user, popularity_score,
          track_clean and, when `items` is given, the most frequent
          genre_clean, artist_clean and album_clean per track (ties between
          equally frequent values are resolved arbitrarily).
    '''
    # Sort keys shared by the top-N cut and the final ordering
    rank_by = ['popularity_score', 'track_id']
    rank_descending = [True, False]

    popular_tracks = (
        train_events
            .group_by('track_id')
            .agg([
                pl.sum('listen_count').alias('total_listens'),
                pl.len().alias('user_count')
            ])
            # Calculate average listens per user
            .with_columns([
                (pl.col('total_listens') / pl.col('user_count')).alias('avg_per_user')
            ])
            # Filter suspicious tracks
            .filter(
                (pl.col('user_count') >= min_users) &  # Minimum user diversity
                (pl.col('avg_per_user') <= max_avg_listens)  # Anti-bot filter
            )
            # Multiplicative popularity score (both must be high in order to get a high popularity score)
            .with_columns([
                (pl.col('total_listens').log1p() *
                 pl.col('user_count').log1p()).alias('popularity_score')
            ])
            # track_id is the tie-breaker that makes the cut reproducible
            .sort(rank_by, descending=rank_descending)
            .head(top_n)
    )

    # Join with the track catalog to get track names
    popular_tracks_with_info = (
        popular_tracks
            .join(
                tracks_catalog.select(['track_id', 'track_clean']),
                on='track_id',
                how='left'
            )
    )

    # Add most common genre, artist and album for each track
    if items is not None:
        track_meta = (
            items
                # Only the selected tracks need metadata; skip the rest of the catalog
                .join(popular_tracks.select('track_id'), on='track_id', how='semi')
                .group_by('track_id')
                .agg([
                    pl.col('genre_clean').mode().first().alias('genre_clean'),
                    pl.col('artist_clean').mode().first().alias('artist_clean'),
                    pl.col('album_clean').mode().first().alias('album_clean')
                ])
        )
        popular_tracks_with_info = popular_tracks_with_info.join(
            track_meta,
            on='track_id',
            how='left'
        )

    # Joins are not required to keep the left frame's row order; restore the ranking
    return popular_tracks_with_info.sort(rank_by, descending=rank_descending)

In [None]:
# Build the popularity baseline: top 100 tracks computed from the training split
top_popular = get_popular_tracks(
    train_events,
    tracks_catalog_clean,
    items=items,
    top_n=100,
    min_users=10,
    max_avg_listens=50,
)

print('Top 10 Popular Tracks:')
display(top_popular.head(10))


# Персональные

Рассчитаем персональные рекомендации.

In [None]:
# Cell: Prepare Data for ALS Model

# One label encoder per id space, fitted on the training split only
user_encoder = LabelEncoder()
track_encoder = LabelEncoder()

# Fit and encode in one pass, then attach the integer indices as new columns
train_events_encoded = train_events.with_columns([
    pl.Series('user_idx', user_encoder.fit_transform(train_events['user_id'].to_numpy())),
    pl.Series('track_idx', track_encoder.fit_transform(train_events['track_id'].to_numpy())),
])

print(f"Encoded training data shape: {train_events_encoded.shape}")
print(f"Unique users: {train_events_encoded['user_idx'].n_unique()}")
print(f"Unique tracks: {train_events_encoded['track_idx'].n_unique()}")

# Matrix dimensions: largest index + 1 on each axis
n_user_rows = train_events_encoded['user_idx'].max() + 1
n_track_cols = train_events_encoded['track_idx'].max() + 1

# Sparse user-track matrix with log1p-scaled listen counts as weights,
# assembled in COO form and converted to CSR
user_track_sparse = scipy.sparse.coo_matrix(
    (
        np.log1p(train_events_encoded['listen_count'].to_numpy()),
        (
            train_events_encoded['user_idx'].to_numpy(),
            train_events_encoded['track_idx'].to_numpy(),
        ),
    ),
    shape=(n_user_rows, n_track_cols),
).tocsr()

matrix_rows, matrix_cols = user_track_sparse.shape
print(f"\nSparse matrix shape: {user_track_sparse.shape}")
print(f"Sparsity: {1 - user_track_sparse.nnz / (matrix_rows * matrix_cols):.4%}")

In [None]:
# Cell: Train ALS Model

# Initialize ALS model
als_model = AlternatingLeastSquares(
    factors=64,              # Number of latent factors
    regularization=0.01,     # L2 regularization
    iterations=15,           # Number of training iterations
    calculate_training_loss=True,
    random_state=RANDOM_STATE
)

# Train on the users x tracks matrix.
# The recommend() calls in this notebook pass a single user's row and unpack an
# (ids, scores) pair, which is the implicit>=0.5 API. In that API fit() also
# expects the user_items (users x items) matrix; passing the transpose would
# swap the user and item factors, so user indices would address track factors.
print("Training ALS model...")
als_model.fit(user_track_sparse, show_progress=True)

print("\n✓ ALS model trained successfully")
print(f"Factors: {als_model.factors}")
print(f"User factors shape: {als_model.user_factors.shape}")
print(f"Item factors shape: {als_model.item_factors.shape}")

In [None]:
# Cell: Generate Personal Recommendations

def get_personal_recommendations(
    user_id,
    als_model,
    user_encoder,
    track_encoder,
    user_track_sparse,
    items,
    n_recommendations=10,
    filter_already_listened=True,
    top_popular=None
):
    """
    Get personalized recommendations for a user using ALS model.

    Parameters:
    - user_id: original user ID
    - als_model: trained ALS model
    - user_encoder: fitted LabelEncoder for users
    - track_encoder: fitted LabelEncoder for tracks
    - user_track_sparse: sparse user-track matrix (rows indexed by encoded user)
    - items: items DataFrame with `track_id`, `albums`, `artists`, `genres` columns
    - n_recommendations: number of recommendations to return
    - filter_already_listened: whether to filter out already listened tracks
    - top_popular: optional precomputed popular-tracks DataFrame used for users
      that are unknown to `user_encoder`. When omitted, the popular tracks are
      computed with `get_popular_tracks` from the notebook-level `train_events`
      and `tracks_catalog_clean`.

    Returns:
    - For known users: DataFrame with track_id, score and the albums, artists,
      genres columns joined from `items`.
    - For unknown users (cold start): the first `n_recommendations` rows of the
      popular-tracks DataFrame, which has the popular-tracks schema rather than
      the personalized one.
    """
    try:
        # Encode user_id
        user_idx = user_encoder.transform([user_id])[0]
    except ValueError:
        # User not in training set (cold start)
        print(f"User {user_id} not found in training data. Returning popular tracks.")
        if top_popular is not None:
            return top_popular.head(n_recommendations)
        # The second positional argument of get_popular_tracks is the track
        # catalog; `items` belongs in the `items` keyword.
        return get_popular_tracks(
            train_events,
            tracks_catalog_clean,
            items=items,
            top_n=n_recommendations
        )

    # Get recommendations from ALS. Already-listened tracks are filtered by the
    # library itself, so exactly n_recommendations are requested.
    track_ids, scores = als_model.recommend(
        user_idx,
        user_track_sparse[user_idx],
        N=n_recommendations,
        filter_already_liked_items=filter_already_listened
    )

    # Decode track indices to original track IDs
    recommended_track_ids = track_encoder.inverse_transform(track_ids)

    # Create DataFrame with recommendations
    recommendations = pl.DataFrame({
        'track_id': recommended_track_ids,
        'score': scores
    })

    # Join with items to get track details
    recommendations_with_info = recommendations.join(
        items.select(['track_id', 'albums', 'artists', 'genres']),
        on='track_id',
        how='left'
    )

    return recommendations_with_info


# Smoke-test the recommender on a handful of users from the training split
test_user_ids = train_events['user_id'].unique().head(5).to_list()
separator = '=' * 70

for user_id in test_user_ids:
    print(f"\n{separator}")
    print(f"Recommendations for User ID: {user_id}")
    print(separator)

    recs = get_personal_recommendations(
        user_id,
        als_model,
        user_encoder,
        track_encoder,
        user_track_sparse,
        items,
        n_recommendations=10,
    )

    print(recs)

In [None]:
# Cell: Generate Recommendations for All Users

def generate_all_recommendations(
    user_ids,
    als_model,
    user_encoder,
    track_encoder,
    user_track_sparse,
    items,
    top_popular,
    n_recommendations=10
):
    """
    Generate recommendations for all users (with fallback to popular for cold start).

    Parameters:
    - user_ids: sequence of original user IDs
    - als_model: trained ALS model
    - user_encoder: fitted LabelEncoder for users
    - track_encoder: fitted LabelEncoder for tracks
    - user_track_sparse: sparse user-track matrix (rows indexed by encoded user)
    - items: not used by this function; kept for interface compatibility
    - top_popular: DataFrame with `track_id` and `popularity_score` columns,
      used for users that are unknown to `user_encoder`
    - n_recommendations: number of recommendations per user

    Users for which the model or the track decoder raises ValueError/IndexError
    also fall back to the popular list; their number is reported at the end
    instead of being hidden, since such errors usually indicate mismatched
    model/matrix dimensions.

    Returns:
    - DataFrame with user_id, track_id, score, rank (rank starts at 1). The
      columns are present even when `user_ids` is empty.
    """
    # Loop invariants: O(1) id -> row index lookup (instead of one
    # LabelEncoder.transform call per user) and the cold-start fallback arrays.
    user_id_to_idx = {
        known_id: idx for idx, known_id in enumerate(user_encoder.classes_.tolist())
    }
    popular_track_ids = top_popular['track_id'].head(n_recommendations).to_numpy()
    popular_scores = top_popular['popularity_score'].head(n_recommendations).to_numpy()

    # Column-wise accumulation avoids allocating one dict per recommendation row
    rec_user_ids, rec_track_ids, rec_scores, rec_ranks = [], [], [], []
    failed_users = 0
    n_users = len(user_ids)

    for i, user_id in enumerate(user_ids):
        if i % 10000 == 0:
            print(f"Processing user {i}/{n_users}...")

        # Start from the popular list; replaced below for known users
        recommended_track_ids, scores = popular_track_ids, popular_scores

        user_idx = user_id_to_idx.get(user_id)
        if user_idx is not None:
            try:
                track_idxs, als_scores = als_model.recommend(
                    user_idx,
                    user_track_sparse[user_idx],
                    N=n_recommendations,
                    filter_already_liked_items=True
                )
                recommended_track_ids = track_encoder.inverse_transform(track_idxs)
                scores = als_scores
            except (ValueError, IndexError):
                # Best-effort fallback to the popular list, counted for the report below
                failed_users += 1

        # Same pairing as zip(): stop at the shorter of the two arrays
        n_recs = min(len(recommended_track_ids), len(scores))
        rec_user_ids.extend([user_id] * n_recs)
        rec_track_ids.extend(recommended_track_ids[:n_recs].tolist())
        rec_scores.extend(scores[:n_recs].tolist())
        rec_ranks.extend(range(1, n_recs + 1))

    if failed_users:
        print(
            f"Warning: {failed_users} known user(s) fell back to popular tracks "
            "because the model or track decoder raised an error; "
            "check that the model and matrix dimensions match."
        )

    return pl.DataFrame({
        'user_id': rec_user_ids,
        'track_id': rec_track_ids,
        'score': rec_scores,
        'rank': rec_ranks
    })


# Score every user that appears in the test split
print("Generating recommendations for test set users...")
test_user_ids = test_events['user_id'].unique().sort().to_list()

test_recommendations = generate_all_recommendations(
    test_user_ids,
    als_model,
    user_encoder,
    track_encoder,
    user_track_sparse,
    items,
    top_popular,
    n_recommendations=10,
)

# Summary: total rows, distinct users covered, and a preview
n_rows = len(test_recommendations)
n_users_covered = test_recommendations['user_id'].n_unique()
print(f"\n✓ Generated {n_rows:,} recommendations")
print(f"For {n_users_covered:,} users")
print(test_recommendations.head(20))

In [None]:
# Cell: Save ALS Model and Encoders

import joblib

# Make sure the target directory for encoders and the model exists
os.makedirs(encoder_dir, exist_ok=True)

# Pickled artifacts (encoders and model) go to the encoder directory
pickled_artifacts = (
    ('user_encoder.pkl', user_encoder),
    ('track_encoder.pkl', track_encoder),
    ('als_model.pkl', als_model),
)
for filename, artifact in pickled_artifacts:
    joblib.dump(artifact, os.path.join(encoder_dir, filename))

# Tabular outputs go to the preprocessed directory:
# popular tracks (cold-start fallback) and the generated recommendations
parquet_outputs = (
    ('popular_tracks.parquet', top_popular),
    ('test_recommendations.parquet', test_recommendations),
)
for filename, frame in parquet_outputs:
    frame.write_parquet(os.path.join(preprocessed_dir, filename))

# Report everything that was written, in write order
print("✓ Saved:")
for filename, _ in pickled_artifacts + parquet_outputs:
    print(f"  - {filename}")

# Похожие

Рассчитаем похожие, они позже пригодятся для онлайн-рекомендаций.

# Построение признаков

Построим три признака, можно больше, для ранжирующей модели.

# Ранжирование рекомендаций

Построим ранжирующую модель, чтобы сделать рекомендации более точными. Отранжируем рекомендации.

# Оценка качества

Проверим оценку качества трёх типов рекомендаций: 

- топ популярных,
- персональных, полученных при помощи ALS,
- итоговых
  
по четырем метрикам: recall, precision, coverage, novelty.

# === Выводы, метрики ===

Основные выводы при работе над расчётом рекомендаций, рассчитанные метрики.