# Amazon Reviews Data Processing

Preprocessing pipeline for the Amazon All_Beauty reviews dataset: loading, cleaning, feature engineering, and CSV export.

In [None]:
import pandas as pd
import numpy as np
from pathlib import Path
from itertools import islice
import subprocess
import textwrap
import warnings
import sys

# NOTE(review): with no category/module filter this silences every warning
# for the whole session (including pandas deprecation notices) — consider
# narrowing it to the specific warnings that are known to be noisy.
warnings.filterwarnings('ignore')

In [None]:
# Configuration
# Dataset config name, row cap, and seed passed to load_reviews().
CATEGORY = "raw_review_All_Beauty"
MAX_REVIEWS = 200_000
SEED = 42

# Directory holding the raw-data cache and the exported CSV files.
OUTPUT_DIR = Path("data")
OUTPUT_DIR.mkdir(exist_ok=True)

# External Python interpreter for dataset loading fallback
EXTERNAL_PYTHON = "/Library/Frameworks/Python.framework/Versions/3.12/bin/python3"

# Check dependencies
# Both libraries are optional: a missing one only flips its *_AVAILABLE flag
# and prints a warning, so the rest of the notebook can still be executed.
try:
    from datasets import load_dataset
except ImportError:
    DATASETS_AVAILABLE = False
    print("Warning: datasets library not found")
else:
    DATASETS_AVAILABLE = True

try:
    from textblob import TextBlob
except ImportError:
    # Keep the name bound so later references do not raise NameError.
    TextBlob = None
    print("Warning: textblob not found, sentiment analysis disabled")
TEXTBLOB_AVAILABLE = TextBlob is not None

## Helper Functions

In [None]:
def _cache_path(category, max_reviews):
    """Return the CSV cache path under OUTPUT_DIR for a category/limit pair.

    A limit of ``None`` is tagged ``full``; any other value is rendered with
    ``str``. Slashes in the category are replaced so the name stays a single
    path component.
    """
    if max_reviews is None:
        limit = "full"
    else:
        limit = str(max_reviews)

    safe_cat = "_".join(category.split("/"))
    filename = f"raw_{safe_cat}_{limit}.csv"
    return OUTPUT_DIR / filename

def _load_external(category, max_reviews):
    """Load dataset using external Python interpreter.

    Runs a short script under ``EXTERNAL_PYTHON`` that downloads the requested
    split with ``datasets.load_dataset`` and writes it to the CSV cache, then
    reads that CSV back. An existing cache file is returned without spawning
    the interpreter.

    Args:
        category: Dataset configuration name passed to ``load_dataset``.
        max_reviews: Row cap, or ``None`` for the full split.

    Returns:
        The cached reviews as a ``pandas.DataFrame``.

    Raises:
        subprocess.CalledProcessError: the child interpreter exited non-zero.
        subprocess.TimeoutExpired: the child ran longer than 600 seconds.
    """
    cache = _cache_path(category, max_reviews)
    if cache.exists():
        return pd.read_csv(cache)

    # Test against None (not truthiness) so the split agrees with the limit
    # tag that _cache_path puts in the file name, including for 0.
    split = "full" if max_reviews is None else f"full[:{max_reviews}]"

    # The values travel through argv instead of being pasted into the source
    # text, so quotes or backslashes in them cannot break or alter the script.
    script = textwrap.dedent('''
        import sys
        import pandas as pd
        from datasets import load_dataset
        category, split, out_path = sys.argv[1:4]
        ds = load_dataset("McAuley-Lab/Amazon-Reviews-2023", category, split=split, trust_remote_code=True)
        df = ds.to_pandas() if hasattr(ds, "to_pandas") else pd.DataFrame(ds)
        df.to_csv(out_path, index=False)
    ''')

    # Write to a side file and rename only on success: a child that crashes or
    # is killed by the timeout must not leave a truncated CSV that the
    # cache.exists() check would later accept as complete.
    partial = cache.with_name(cache.name + ".part")
    try:
        subprocess.run(
            [EXTERNAL_PYTHON, "-c", script, category, split, str(partial)],
            check=True,
            timeout=600,
        )
        partial.replace(cache)
    finally:
        partial.unlink(missing_ok=True)

    return pd.read_csv(cache)

def load_reviews(category, max_reviews, seed):
    """Load reviews from Amazon dataset with fallback strategies.

    Strategies are tried in this order, returning at the first success:

    1. The CSV cache for this category/limit, if it already exists.
    2. A direct ``load_dataset`` call for the requested split (only when the
       ``datasets`` library imported successfully).
    3. Streaming the full split and keeping the first ``max_reviews`` rows
       (only when a limit is given and the direct load failed).
    4. The external-interpreter loader.

    Strategies 2 and 3 write the loaded frame to the cache before returning.

    Args:
        category: Dataset configuration name.
        max_reviews: Row cap, or ``None`` for the full split.
        seed: Currently unused; kept so existing callers keep working.

    Returns:
        The reviews as a ``pandas.DataFrame``.
    """
    cache = _cache_path(category, max_reviews)
    if cache.exists():
        print(f"Loading from cache: {cache}")
        return pd.read_csv(cache)

    # Test against None (not truthiness) so a limit of 0 is not silently
    # turned into the full split while the cache file is still tagged "_0".
    split = "full" if max_reviews is None else f"full[:{max_reviews}]"

    if DATASETS_AVAILABLE:
        try:
            print(f"Loading {category} [{split}]...")
            ds = load_dataset("McAuley-Lab/Amazon-Reviews-2023", category,
                              split=split, trust_remote_code=True)
            df = ds.to_pandas() if hasattr(ds, "to_pandas") else pd.DataFrame(ds)
            df.to_csv(cache, index=False)
            return df
        except Exception as e:
            # Deliberately broad: any failure here just moves on to the next
            # strategy instead of aborting the notebook.
            print(f"Direct load failed: {e}")

        # Only reached when the direct load failed. Streaming without a limit
        # would pull the entire split row by row, so it is skipped then.
        if max_reviews is not None:
            try:
                print("Trying streaming mode...")
                stream = load_dataset("McAuley-Lab/Amazon-Reviews-2023", category,
                                      split="full", streaming=True, trust_remote_code=True)
                rows = list(islice(stream, max_reviews))
                df = pd.DataFrame(rows)
                df.to_csv(cache, index=False)
                return df
            except Exception as e2:
                print(f"Streaming failed: {e2}")

    print("Using external interpreter...")
    return _load_external(category, max_reviews)

def clean_reviews(df):
    """Remove invalid records and duplicates.

    Args:
        df: Raw reviews. Must contain ``rating``, ``user_id``, ``parent_asin``,
            ``timestamp``, ``text`` and ``title``. ``verified_purchase`` and
            ``helpful_vote`` are optional and default to ``False`` / ``0``.

    Returns:
        A new DataFrame (the input is not modified) that keeps only rows with
        all required fields, a non-blank text or title, and a rating in
        [1, 5]; exact duplicates on (user_id, parent_asin, text, timestamp)
        are dropped, and ``review_time`` / ``review_date`` are derived from
        ``timestamp`` interpreted as milliseconds.
    """
    required = ["rating", "user_id", "parent_asin", "timestamp"]
    df = df.dropna(subset=required)

    has_text = (
        df["text"].fillna("").str.strip().ne("") |
        df["title"].fillna("").str.strip().ne("")
    )
    valid_rating = (df["rating"] >= 1) & (df["rating"] <= 5)
    # Explicit copy: the column assignments below must land on an independent
    # frame, not on a view of the caller's data.
    df = df[has_text & valid_rating].copy()

    def _optional(frame, column, default):
        # DataFrame.get(column, scalar) hands back the bare scalar when the
        # column is missing, and a scalar has no .fillna — so build a
        # constant Series in that case instead.
        if column in frame.columns:
            return frame[column].fillna(default)
        return pd.Series(default, index=frame.index)

    df["verified_purchase"] = _optional(df, "verified_purchase", False).astype(bool)
    df["helpful_vote"] = _optional(df, "helpful_vote", 0)
    df = df.drop_duplicates(subset=["user_id", "parent_asin", "text", "timestamp"], keep="first")

    df["review_time"] = pd.to_datetime(df["timestamp"], unit="ms")
    df["review_date"] = df["review_time"].dt.date

    return df

def lexical_diversity(text):
    """Return the share of distinct lowercase whitespace-separated tokens.

    Text with no tokens (empty or whitespace-only) yields 0.0.
    """
    words = [piece.lower() for piece in text.split() if piece.strip()]
    if not words:
        return 0.0
    distinct = set(words)
    return len(distinct) / len(words)

def get_sentiment(text):
    """Return the TextBlob sentiment polarity of ``text``.

    Falls back to 0.0 when TextBlob is not installed, when ``text`` is empty
    or whitespace-only, or when TextBlob raises while scoring.
    """
    if not TEXTBLOB_AVAILABLE or not text.strip():
        return 0.0
    try:
        return TextBlob(text).sentiment.polarity
    except Exception:
        # Best-effort scoring: one bad review should not abort the pipeline.
        # `Exception` (not a bare except) so KeyboardInterrupt / SystemExit
        # still stop a long-running apply().
        return 0.0

def add_textual_features(df):
    """Engineer text-based indicators.

    Returns a new frame with seven added columns: character and word counts
    for the review text, character count of the title, the shares of
    uppercase and punctuation characters in the text, a sentiment score, and
    lexical diversity. Missing text/title values are treated as empty
    strings.
    """
    body = df["text"].fillna("")
    headline = df["title"].fillna("")

    def upper_share(s):
        # max(..., 1) keeps empty strings from dividing by zero.
        return sum(c.isupper() for c in s) / max(len(s), 1)

    def punct_share(s):
        return sum(c in "!?.,;:" for c in s) / max(len(s), 1)

    # Insertion order of this dict is the order of the new columns.
    features = {
        "text_length_chars": body.str.len(),
        "text_length_words": body.str.split().str.len(),
        "title_length_chars": headline.str.len(),
        "uppercase_ratio": body.apply(upper_share),
        "punctuation_ratio": body.apply(punct_share),
        "sentiment": body.apply(get_sentiment),
        "lexical_diversity": body.apply(lexical_diversity),
    }
    return df.assign(**features)

def add_behavioral_features(df):
    """Aggregate user-level metrics.

    Sorts reviews by user and review time, records the gap in seconds to the
    same user's previous review, computes per-user aggregates, and joins them
    back onto every review row together with the rating's deviation from the
    user's mean rating. Returns a new frame; the input is left untouched.
    """
    ordered = df.sort_values(["user_id", "review_time"]).copy()

    gaps = ordered.groupby("user_id")["review_time"].diff()
    ordered["inter_review_seconds"] = gaps.dt.total_seconds()

    # Output column -> (source column, aggregation).
    user_metrics = {
        "user_review_count": ("rating", "size"),
        "user_rating_mean": ("rating", "mean"),
        "user_rating_var": ("rating", "var"),
        "user_verified_ratio": ("verified_purchase", "mean"),
        "user_helpful_mean": ("helpful_vote", "mean"),
        "user_helpful_sum": ("helpful_vote", "sum"),
        "user_time_delta_median": ("inter_review_seconds", "median"),
    }
    per_user = ordered.groupby("user_id").agg(**user_metrics).reset_index()

    # Variance and median gap are NaN for users with a single review.
    per_user = per_user.fillna({"user_rating_var": 0.0, "user_time_delta_median": 0.0})

    merged = ordered.merge(per_user, on="user_id", how="left")
    merged["rating_deviation_from_user_mean"] = (
        merged["rating"] - merged["user_rating_mean"]
    )
    return merged

## Load Data

In [None]:
# Fetch the raw reviews using the configured category, row cap and seed;
# load_reviews returns the cached CSV when one already exists.
df_raw = load_reviews(CATEGORY, MAX_REVIEWS, SEED)
print(f"Loaded {len(df_raw)} reviews")
# Trailing expression — presumably rendered as the cell output in the notebook.
df_raw.head()

## Clean and Feature Engineering

In [None]:
# Stage 1: drop invalid rows and duplicates, derive review_time / review_date.
df_clean = clean_reviews(df_raw)
print(f"After cleaning: {len(df_clean)} reviews")

# Stage 2: per-review text features, then per-user behavioral aggregates.
df_text = add_textual_features(df_clean)
df_features = add_behavioral_features(df_text)

# Difference in column count between the final and the raw frame, so it also
# counts columns added during cleaning (e.g. review_time, review_date).
print(f"Added {len(df_features.columns) - len(df_raw.columns)} new features")

## Summary Statistics

In [None]:
# Subset of engineered columns to summarize (text-level and user-level).
feature_cols = [
    "text_length_chars", "text_length_words", "sentiment", "lexical_diversity",
    "user_review_count", "user_rating_var", "user_verified_ratio",
    "user_helpful_sum", "user_time_delta_median"
]

# Descriptive statistics for those columns, rounded to three decimals.
df_features[feature_cols].describe().round(3)

## Save Outputs

In [None]:
# Output locations inside OUTPUT_DIR.
review_path = OUTPUT_DIR / "amazon_reviews_cleaned.csv"
user_path = OUTPUT_DIR / "user_behavior_features.csv"

# Review-level table: one row per cleaned review with all engineered columns.
df_features.to_csv(review_path, index=False)

# User-level table: the per-user aggregates are repeated on every review row,
# so keeping the first row per user_id yields one row per user.
user_cols = [
    "user_id", "user_review_count", "user_rating_mean", "user_rating_var",
    "user_verified_ratio", "user_helpful_mean", "user_helpful_sum",
    "user_time_delta_median"
]
df_features[user_cols].drop_duplicates("user_id").to_csv(user_path, index=False)

print(f"Saved review data: {review_path}")
print(f"Saved user data: {user_path}")
print(f"Unique users: {df_features['user_id'].nunique()}")
print(f"Unique products: {df_features['parent_asin'].nunique()}")