# Data Collection & Preprocessing

## Overview
This notebook downloads and preprocesses the Amazon Reviews 2023 dataset from the McAuley Lab. It downloads the raw CSV files, converts them to Parquet, creates user-based samples, and processes product metadata. The downloaded splits are 5-core filtered (each user and item has at least 5 interactions), and the validation/test samples are restricted to users and items that appear in the corresponding training sample. The samples are not re-filtered after sampling, so they do not necessarily preserve the 5-core property.

## Notebook Structure
1. **Setup**: Import libraries, configure paths, and define constants
2. **Download Functions**: Fetch raw data from URLs with retry logic
3. **Conversion**: Transform CSV.GZ to Parquet for efficient storage
4. **Sampling**: Create samples (small/medium/large/big) by keeping the most active users, optionally followed by a popularity-based item filter
5. **Metadata Processing**: Download and extract essential product information (title, features, description, etc.)
6. **Pipeline**: Automated execution for all categories

## Process Flow
**Data Collection:**
- For each category: Download train/valid/test CSV.GZ from McAuley Lab → Convert to Parquet
- Download metadata JSONL.GZ → Extract essential fields → Save as Parquet

**Sampling:**
- Create samples from full dataset: small (2k users), medium (20k), large (50k), big (50k with item filtering)
- For train: Sample top N active users → Filter items by popularity threshold
- For valid/test: Filter to match train users and items for consistency

**Output:**
- Full datasets: `category.5core.{split}.parquet`
- Samples: `category.5core.{split}.{size}.parquet`
- Metadata: `category.meta.parquet`

In [30]:
import sys, os, json, gzip, urllib.request
from pathlib import Path
import pandas as pd
import polars as pl
import numpy as np

# Make the project-local ``utilities`` directory importable so that the
# ``logger`` and ``configurations`` modules below can be found.
# NOTE: Path("..") is resolved against the process's current working
# directory, not this notebook's location.
module_path = str((Path("..") / "utilities").resolve())
if module_path not in sys.path:
    sys.path.append(module_path)

from logger import Logger
from configurations import Configurations

# Shared logger used by every function in this notebook; presumably also
# writes to Configurations.LOG_PATH (depends on Logger — TODO confirm).
logger = Logger(process_name="data_collection", log_file=Configurations.LOG_PATH)

# Raw downloads (review CSV.GZ splits and metadata JSONL.GZ) are stored here.
# The directory is created when this cell runs, if missing.
RAW_DIR = Path(Configurations.DATA_RAW_PATH)
RAW_DIR.mkdir(parents=True, exist_ok=True)

# Parquet outputs (full splits, samples, metadata) are written here.
PROCESSED_DIR = Path(Configurations.DATA_PROCESSED_PATH)
PROCESSED_DIR.mkdir(parents=True, exist_ok=True)

# Pipeline settings taken from the project configuration.
CATEGORIES = Configurations.CATEGORIES
CORES = Configurations.CORES
SPLITS = Configurations.SPLITS
BASE_URL = Configurations.BASE_URL
# Metadata files live at a fixed McAuley Lab location, independent of BASE_URL.
meta_base_url = "https://mcauleylab.ucsd.edu/public_datasets/data/amazon_2023/raw/meta_categories/meta_{category}.jsonl.gz"
# category -> metadata download URL; used as the default by download_meta().
meta_urls = {cat: meta_base_url.format(category=cat) for cat in CATEGORIES}

In [2]:
def build_url(core: str, category: str, split: str) -> str:
    """Return the download URL of one review split.

    The file is ``{category}.{split}.csv.gz`` inside the
    ``{BASE_URL}/{core}/last_out_w_his`` directory.
    """
    folder = f"{BASE_URL}/{core}/last_out_w_his"
    return f"{folder}/{category}.{split}.csv.gz"

def local_path_for_parquet(core: str, category: str, split: str, sample: str = None, raw_dir=RAW_DIR) -> Path:
    """Return the local file path for one split of one category.

    Despite the name, the result depends on ``raw_dir``:

    * ``RAW_DIR``       -> ``RAW_DIR/{cat}.{core}.{split}.csv.gz`` (``sample`` is ignored)
    * ``PROCESSED_DIR`` -> ``PROCESSED_DIR/{cat}.{core}.{split}.parquet``, or
      ``PROCESSED_DIR/{cat}.{core}.{split}.{sample}.parquet`` when ``sample`` is given

    Any ``/`` in ``category`` is replaced by ``-`` so the name stays a single file.

    Raises:
        ValueError: if ``raw_dir`` is neither RAW_DIR nor PROCESSED_DIR.
    """
    base_name = f"{category.replace('/', '-')}.{core}.{split}"

    if raw_dir == RAW_DIR:
        return RAW_DIR / f"{base_name}.csv.gz"

    if raw_dir == PROCESSED_DIR:
        tail = "parquet" if sample is None else f"{sample}.parquet"
        return PROCESSED_DIR / f"{base_name}.{tail}"

    raise ValueError(f"Invalid directory: {raw_dir}")

def download_file(url: str, out_path: Path, max_retries: int = 3, retry_delay: float = 2.0) -> None:
    """Download ``url`` to ``out_path`` atomically, retrying on failure.

    Does nothing if ``out_path`` already exists with a non-zero size. Data is
    first written to ``<out_path>.part`` and then moved into place with
    ``os.replace``, so an interrupted download never looks complete.

    Args:
        url: Source URL.
        out_path: Destination file (``str`` is accepted as well).
        max_retries: Total number of attempts.
        retry_delay: Base delay in seconds between attempts; it doubles after
            each failure. Use 0 to retry immediately.

    Raises:
        RuntimeError: if every attempt fails. The last underlying error is
            chained as ``__cause__``.
    """
    import time

    out_path = Path(out_path)
    if out_path.exists() and out_path.stat().st_size > 0:
        logger.log_info(f"Exists, skip: {out_path.name}")
        return

    tmp = out_path.with_name(out_path.name + ".part")
    last_error = None
    for attempt in range(1, max_retries + 1):
        logger.log_info(f"Downloading (attempt {attempt}/{max_retries}): {url}")
        try:
            urllib.request.urlretrieve(url, str(tmp))
            os.replace(tmp, out_path)
        except Exception as e:
            last_error = e
            logger.log_warning(f"Failed attempt {attempt} for {url}: {e}")
            # Never leave a truncated partial download lying around.
            tmp.unlink(missing_ok=True)
            if attempt < max_retries and retry_delay > 0:
                # Exponential backoff: give a flaky server time to recover.
                time.sleep(retry_delay * 2 ** (attempt - 1))
            continue
        logger.log_info(f"Saved: {out_path.name}")
        return

    raise RuntimeError(f"Exceeded retries: {url}") from last_error

def save_dataset_to_parquet(csv_gz_path: Path, out_parquet_path: Path):
    """Convert one gzipped review CSV to Parquet, keeping only the configured columns.

    Does nothing if ``out_parquet_path`` already exists. Only
    ``Configurations.COLUMNS`` are read, in that order. Row, user and item
    counts are logged. The Parquet file is written to a temporary
    ``.part`` file first and then moved into place, so an interrupted write
    is never mistaken for a finished one.

    Args:
        csv_gz_path: Source ``.csv.gz`` file.
        out_parquet_path: Destination ``.parquet`` file.
    """
    if out_parquet_path.exists():
        logger.log_info(f"Skip: {out_parquet_path.name}")
        return

    logger.log_info(f"Reading: {csv_gz_path.name}")
    columns = list(Configurations.COLUMNS)
    # usecols avoids loading columns we drop anyway; usecols keeps the file's
    # column order, so re-select to get the configured order.
    df = pd.read_csv(csv_gz_path, compression='gzip', usecols=columns)[columns]
    logger.log_info(f"  Shape: {df.shape}")
    logger.log_info(f"  Users: {df['user_id'].nunique():,}")
    logger.log_info(f"  Items: {df['parent_asin'].nunique():,}")

    tmp_path = out_parquet_path.with_name(out_parquet_path.name + ".part")
    try:
        df.to_parquet(tmp_path, engine='pyarrow', index=False)
        os.replace(tmp_path, out_parquet_path)
    except BaseException:
        # Do not leave a partial file behind; re-raise for the caller to log.
        tmp_path.unlink(missing_ok=True)
        raise
    logger.log_info(f"Saved: {out_parquet_path.name}\n")

In [3]:
def create_n_sample(input_path: Path, n: int, item_mult: float = 1.0, n_name_out: str = None):
    """Create a user-based sample of one split and save it next to the input.

    The input file name is expected to look like
    ``{category}.{core}.{split}.parquet``. Missing parts default to
    ``5core`` / ``train``. The output is
    ``{category}.{core}.{split}.{n_name_out}.parquet``. If that file already
    exists, the function returns without doing anything.

    * ``train``: keep the ``n`` users with the most ratings. Ties are broken
      by ``user_id`` so the selection is reproducible. If ``item_mult > 1``,
      then also keep only items whose rating count inside the sample is
      strictly greater than ``item_mult`` times the average count per item.
      Per-user minimums are not re-enforced after this filter.
    * any other split: keep only rows whose user AND item both occur in the
      train sample with the same ``n_name_out``. If that train sample does
      not exist yet, a warning is logged and nothing is written.

    Args:
        input_path: Parquet file of the full split.
        n: Number of most-active users to keep (train split only).
        item_mult: Multiple of the average item popularity used as the item
            threshold; values <= 1.0 disable item filtering.
        n_name_out: Sample tag used in the output file name. Defaults to
            ``str(n)`` when omitted.
    """
    parts = input_path.stem.split('.')
    category = parts[0]
    core = parts[1] if len(parts) > 1 else '5core'
    split = parts[2] if len(parts) > 2 else 'train'
    if n_name_out is None:
        # Without this, the file name would contain a literal ".None.".
        n_name_out = str(n)
    output_path = input_path.parent / f"{category}.{core}.{split}.{n_name_out}.parquet"

    if output_path.exists():
        logger.log_info(f"Skip: {output_path.name}")
        return

    logger.log_info(f"\nSampling {n} users")
    logger.log_info(f"  Input:  {input_path.name}")
    logger.log_info(f"  Output: {output_path.name}")

    df = pl.read_parquet(input_path)
    total = df['user_id'].n_unique()
    logger.log_info(f"  Total users: {total:,}")

    if split == 'train':
        if n >= total:
            df_sampled = df
        else:
            # group_by output order is unspecified, so users tied on activity
            # would be picked arbitrarily; the user_id tie-break makes the
            # top-n selection identical across runs.
            activity = (
                df.group_by('user_id')
                .agg(pl.len().alias('n'))
                .sort(['n', 'user_id'], descending=[True, False])
            )
            top_users = activity.head(n)['user_id'].to_list()
            df_sampled = df.filter(pl.col('user_id').is_in(top_users))

        if item_mult > 1.0:
            logger.log_info(f"  Before item filter: {df_sampled.shape} shape, {len(df_sampled):,} ratings, {df_sampled['user_id'].n_unique():,}  users, {df_sampled['parent_asin'].n_unique():,} items")
            item_counts = df_sampled.group_by('parent_asin').agg(pl.len().alias('n'))
            avg_item = item_counts['n'].mean()
            min_item = avg_item * item_mult
            logger.log_info(f"  Avg ratings/item: {avg_item:.2f}")
            logger.log_info(f"  Min threshold: {min_item:.2f} ({item_mult}x avg)")
            popular = item_counts.filter(pl.col('n') > min_item)['parent_asin'].to_list()
            df_sampled = df_sampled.filter(pl.col('parent_asin').is_in(popular))
            logger.log_info(f"  After item filter: {df_sampled.shape} shape, {len(df_sampled):,} ratings, {df_sampled['user_id'].n_unique():,}  users, {df_sampled['parent_asin'].n_unique():,} items")

        # Count users after item filtering: some users may have lost all rows.
        logger.log_info(f"  Sampled: {df_sampled.shape} shape, {len(df_sampled):,} ratings, {df_sampled['user_id'].n_unique():,} users")
    else:
        train_path = input_path.parent / f"{category}.{core}.train.{n_name_out}.parquet"
        if not train_path.exists():
            logger.log_warning(f"  Train sample not found: {train_path.name}")
            return
        df_train = pl.read_parquet(train_path)
        users_n = df_train['user_id'].unique().to_list()
        items_n = df_train['parent_asin'].unique().to_list()
        df_sampled = df.filter(pl.col('user_id').is_in(users_n) & pl.col('parent_asin').is_in(items_n))
        logger.log_info(f"  Filtered: {len(df_sampled):,} ratings, {df_sampled['user_id'].n_unique():,} users")

    df_sampled.to_pandas().to_parquet(output_path, engine='pyarrow', index=False)
    logger.log_info(f"  Saved: {output_path.name}\n")

In [4]:
def download_meta(category: str, url: str = None):
    """Download the metadata JSONL.GZ for ``category`` into RAW_DIR.

    The file is saved as ``RAW_DIR/{category}.meta.jsonl.gz``. If it already
    exists with a non-zero size, nothing is done. The URL defaults to
    ``meta_urls[category]``. Data is written to a ``.part`` file first and
    moved into place only after a complete download, so an interrupted run
    is never mistaken for a finished file. Errors are logged, not raised.

    Args:
        category: Category name.
        url: Optional override for the download URL.
    """
    url = url or meta_urls.get(category)
    if not url:
        logger.log_warning(f"[META] No URL for category: {category}")
        return
    dst = RAW_DIR / f"{category}.meta.jsonl.gz"
    if dst.exists() and dst.stat().st_size > 0:
        logger.log_info(f"[META] Skip: {dst.name}")
        return
    logger.log_info(f"[META] Downloading: {category}")
    tmp = dst.with_name(dst.name + ".part")
    try:
        urllib.request.urlretrieve(url, str(tmp))
        os.replace(tmp, dst)
        logger.log_info(f"[META] {dst.name}")
    except Exception as e:
        # Remove the partial file so the next run retries the download.
        tmp.unlink(missing_ok=True)
        logger.log_exception(f"[META] Failed: {e}")

def save_meta_for_training_ui(category: str):
    """Extract a compact product table from the raw metadata and save it as Parquet.

    Reads ``RAW_DIR/{category}.meta.jsonl.gz`` line by line. Only records
    that are JSON objects with a truthy ``parent_asin`` are kept. Malformed
    lines are skipped and their count is logged. Writes
    ``PROCESSED_DIR/{safe_cat}.meta.parquet`` (``/`` in the category is
    replaced by ``-``) atomically, via a temporary ``.part`` file.

    Kept fields: parent_asin, title, price, average_rating, rating_number,
    features (<= 10), description (<= 2000 chars + "..."), categories (<= 5),
    images (<= 3, hi_res/thumb only) and store. price, average_rating and
    rating_number are coerced to numbers (invalid values -> NaN).

    Returns:
        The output path, or ``None`` if the raw file is missing or yields no rows.
    """
    safe_cat = category.replace('/', '-')
    out_path = PROCESSED_DIR / f"{safe_cat}.meta.parquet"

    if out_path.exists():
        logger.log_info(f"[META] Skip: {out_path.name}")
        return out_path

    fp = RAW_DIR / f"{category}.meta.jsonl.gz"
    if not fp.exists():
        logger.log_warning(f"[META] Not found: {fp}")
        return None

    logger.log_info(f"[META] Reading: {fp.name}")

    def extract_item(obj):
        """Map one raw metadata record (a dict) to the compact row layout."""
        def process_list(field, max_items):
            # Some records wrap the list in another list; unwrap the first level.
            if not isinstance(field, list):
                return []
            if field and isinstance(field[0], list):
                field = field[0]
            return field[:max_items]

        def process_desc(desc, max_len=2000):
            # Descriptions may be a list of paragraphs or a single value.
            if isinstance(desc, list):
                desc = " ".join(str(d) for d in desc if d)
            elif not desc:
                return ""
            else:
                desc = str(desc)
            return desc[:max_len] + ("..." if len(desc) > max_len else "")

        def process_images(img_list):
            if not img_list or not isinstance(img_list, list):
                return []
            return [{"hi_res": img.get("hi_res"), "thumb": img.get("thumb")}
                    for img in img_list[:3] if isinstance(img, dict)]

        return {
            "parent_asin": obj.get("parent_asin"),
            "title": obj.get("title", ""),
            "price": obj.get("price"),
            "average_rating": obj.get("average_rating"),
            "rating_number": obj.get("rating_number"),
            "features": process_list(obj.get("features"), 10),
            "description": process_desc(obj.get("description")),
            "categories": process_list(obj.get("categories"), 5),
            "images": process_images(obj.get("images")),
            "store": obj.get("store", "")
        }

    rows = []
    bad_lines = 0
    with gzip.open(fp, "rt", encoding="utf-8") as f:
        for line in f:
            try:
                obj = json.loads(line)
            except json.JSONDecodeError:
                bad_lines += 1
                continue
            # A valid JSON line may still not be an object (e.g. a list).
            if not isinstance(obj, dict):
                bad_lines += 1
                continue
            if obj.get("parent_asin"):
                rows.append(extract_item(obj))

    if bad_lines:
        logger.log_warning(f"[META] Skipped {bad_lines:,} malformed lines in {fp.name}")

    if not rows:
        logger.log_warning(f"[META] No data for {category}")
        return None

    df = pd.DataFrame(rows)
    for col in ['price', 'average_rating', 'rating_number']:
        df[col] = pd.to_numeric(df[col], errors='coerce')
    for col in ['title', 'description', 'store']:
        df[col] = df[col].fillna('')

    tmp_path = out_path.with_name(out_path.name + ".part")
    try:
        df.to_parquet(tmp_path, index=False)
        os.replace(tmp_path, out_path)
    except BaseException:
        # A partial file would otherwise be skipped as "done" on the next run.
        tmp_path.unlink(missing_ok=True)
        raise
    logger.log_info(f"[META] {out_path.name}: {len(df):,} items")
    return out_path

In [5]:
def run():
    """Run the whole collection pipeline for every configured core and category.

    1. Download each split's CSV.GZ and convert it to Parquet.
    2. Download each category's metadata and convert it to Parquet.
    3. Create every sample in ``Configurations.SAMPLE_SIZES`` except "full".

    Errors are logged per item, so one failing category/split/sample does not
    abort the remaining work.
    """
    logger.log_info("="*70)
    logger.log_info("DATASET COLLECTION")
    logger.log_info("="*70 + "\n")

    for core in CORES:
        for cat in CATEGORIES:
            logger.log_info(f"\n{'='*70}")
            logger.log_info(f"{cat}")
            logger.log_info(f"{'='*70}")

            for split in SPLITS:
                logger.log_info(f"\nProcessing {split.upper()}...")
                url = build_url(core, cat, split)
                in_path = local_path_for_parquet(core, cat, split, raw_dir=RAW_DIR)
                out_path = local_path_for_parquet(core, cat, split, raw_dir=PROCESSED_DIR)
                try:
                    download_file(url, in_path)
                    save_dataset_to_parquet(in_path, out_path)
                except Exception as e:
                    logger.log_exception(f"{split} error: {e}")

    logger.log_info("\n" + "="*70)
    logger.log_info("METADATA")
    logger.log_info("="*70)

    for cat in CATEGORIES:
        try:
            download_meta(cat)
            save_meta_for_training_ui(cat)
        except Exception as e:
            logger.log_exception(f"[META] {cat} error: {e}")

    logger.log_info("\n" + "="*70)
    logger.log_info("CREATING SAMPLES")
    logger.log_info("="*70)

    # Valid/test samples are filtered against the train sample of the same
    # name, so train must be sampled first whatever the order of SPLITS.
    # sorted() is stable: the relative order of the other splits is kept.
    ordered_splits = sorted(SPLITS, key=lambda s: s != 'train')

    for core in CORES:
        for cat in CATEGORIES:
            for split in ordered_splits:
                for sample in Configurations.SAMPLE_SIZES:
                    if sample == "full":
                        continue
                    n = Configurations.SAMPLE_SIZES[sample]
                    logger.log_info(f"\ncat={cat} - split={split} - sample={sample} - n={n} sampling...")
                    in_path = local_path_for_parquet(core, cat, split, raw_dir=PROCESSED_DIR)
                    out_path = local_path_for_parquet(core, cat, split, sample, raw_dir=PROCESSED_DIR)
                    logger.log_info(f"Input: {in_path.name} \n → Output: {out_path.name}")
                    try:
                        create_n_sample(in_path, n, Configurations.ITEM_MULTI, sample)
                    except Exception as e:
                        logger.log_exception(f"{cat} {split} {sample} sampling error: {e}")

    logger.log_info("\n COMPLETED")

In [6]:
# Execute the full pipeline: download and convert the splits, then the
# metadata, then create the samples.
run()

2025-10-24 21:13:38,411 - INFO - DATASET COLLECTION

2025-10-24 21:13:38,411 - INFO - 
2025-10-24 21:13:38,411 - INFO - Electronics
2025-10-24 21:13:38,412 - INFO - 
Processing TRAIN...
2025-10-24 21:13:38,412 - INFO - Exists, skip: Electronics.5core.train.csv.gz
2025-10-24 21:13:38,413 - INFO - Skip: Electronics.5core.train.parquet
2025-10-24 21:13:38,413 - INFO - 
Processing VALID...
2025-10-24 21:13:38,428 - INFO - Exists, skip: Electronics.5core.valid.csv.gz
2025-10-24 21:13:38,428 - INFO - Skip: Electronics.5core.valid.parquet
2025-10-24 21:13:38,429 - INFO - 
Processing TEST...
2025-10-24 21:13:38,429 - INFO - Exists, skip: Electronics.5core.test.csv.gz
2025-10-24 21:13:38,450 - INFO - Skip: Electronics.5core.test.parquet
2025-10-24 21:13:38,451 - INFO - 
2025-10-24 21:13:38,451 - INFO - Beauty_and_Personal_Care
2025-10-24 21:13:38,452 - INFO - 
Processing TRAIN...
2025-10-24 21:13:38,452 - INFO - Exists, skip: Beauty_and_Personal_Care.5core.train.csv.gz
2025-10-24 21:13:38,453 -

In [7]:
def diagnose_dataset(category: str, suffix: str = 'small', core: str = '5core'):
    """Log size, sparsity and train-overlap statistics for one sample.

    Reads ``PROCESSED_DIR/{safe_cat}.{core}.{split}.{suffix}.parquet`` for
    train, valid and test (``/`` in the category is replaced by ``-``). Train
    is required; valid and test are optional. For each of valid and test,
    logs how many of its users/items also appear in train and warns about
    any that do not. Empty splits are reported without dividing by zero.

    Args:
        category: Category name as configured.
        suffix: Sample tag used in the file names (e.g. ``small``).
        core: Core tag used in the file names; defaults to ``5core``.
    """
    safe_cat = category.replace('/', '-')
    logger.log_info("="*70)
    logger.log_info(f"DIAGNOSTIC: {category} (suffix={suffix})")
    logger.log_info("="*70)

    def path_for(split):
        return PROCESSED_DIR / f"{safe_cat}.{core}.{split}.{suffix}.parquet"

    def pct(part, whole):
        # Empty splits would otherwise raise ZeroDivisionError.
        return part / whole * 100 if whole else 0.0

    def stats(df, name):
        u, i, r = df['user_id'].n_unique(), df['parent_asin'].n_unique(), len(df)
        cells = u * i
        sparsity = f"{1 - (r / cells):.2%}" if cells else "n/a"
        logger.log_info(f"{name}: {r:,} ratings, {u:,} users, {i:,} items, sparsity {sparsity}")
        return u, i, r

    train = path_for("train")
    if not train.exists():
        logger.log_error(f"File not found: {train.name}")
        return

    df_train = pl.read_parquet(train)
    stats(df_train, "TRAIN")
    train_users = set(df_train['user_id'].unique().to_list())
    train_items = set(df_train['parent_asin'].unique().to_list())

    # Valid and test are both filtered against the train sample, so check both.
    for split in ("valid", "test"):
        path = path_for(split)
        if not path.exists():
            continue
        df_split = pl.read_parquet(path)
        split_u, split_i, _ = stats(df_split, split.upper())
        user_overlap = len(train_users & set(df_split['user_id'].unique().to_list()))
        item_overlap = len(train_items & set(df_split['parent_asin'].unique().to_list()))
        logger.log_info(f"\nOVERLAP ({split.upper()} vs TRAIN):")
        logger.log_info(f"  Users: {user_overlap:,} / {split_u:,} ({pct(user_overlap, split_u):.1f}%)")
        logger.log_info(f"  Items: {item_overlap:,} / {split_i:,} ({pct(item_overlap, split_i):.1f}%)")
        if user_overlap < split_u:
            logger.log_warning(f"  {split_u - user_overlap:,} {split} users NOT in train!")
        if item_overlap < split_i:
            logger.log_warning(f"  {split_i - item_overlap:,} {split} items NOT in train!")

    logger.log_info("="*70 + "\n")

In [8]:
# Sanity-check the "small" sample of every configured category.
for cat in Configurations.CATEGORIES:
    diagnose_dataset(category=cat, suffix="small")

2025-10-24 21:13:40,961 - INFO - DIAGNOSTIC: Electronics (suffix=big)
2025-10-24 21:13:40,983 - INFO - TRAIN: 85,890 ratings, 35,494 users, 82 items, sparsity 97.05%
2025-10-24 21:13:40,983 - INFO - VALID: 858 ratings, 858 users, 81 items, sparsity 98.77%
2025-10-24 21:13:40,990 - INFO - 
OVERLAP:
2025-10-24 21:13:40,990 - INFO -   Users: 858 / 858 (100.0%)
2025-10-24 21:13:40,991 - INFO -   Items: 81 / 81 (100.0%)
2025-10-24 21:13:40,991 - INFO - TEST: 675 ratings, 675 users, 76 items, sparsity 98.68%

2025-10-24 21:13:40,993 - INFO - DIAGNOSTIC: Beauty_and_Personal_Care (suffix=big)
2025-10-24 21:13:40,997 - INFO - TRAIN: 7,718 ratings, 6,613 users, 8 items, sparsity 85.41%
2025-10-24 21:13:40,998 - INFO - VALID: 66 ratings, 66 users, 8 items, sparsity 87.50%
2025-10-24 21:13:40,999 - INFO - 
OVERLAP:
2025-10-24 21:13:40,999 - INFO -   Users: 66 / 66 (100.0%)
2025-10-24 21:13:40,999 - INFO -   Items: 8 / 8 (100.0%)
2025-10-24 21:13:41,000 - INFO - TEST: 65 ratings, 65 users, 8 items,