
# 00 — Detailed Data Cleaning (Amazon Electronics Reviews)

This notebook performs **thorough data cleaning** on the Amazon Electronics reviews dataset
(e.g., `Electronics_5.json.gz`) **before any modeling**.

**Expected input columns** (matching your dataset):
```
['overall','vote','verified','reviewTime','reviewerID','asin','style',
 'reviewerName','reviewText','summary','unixReviewTime','image']
```

### What this notebook does
- Streams the input JSON-lines (gz OK) to be **memory safe**.
- Drops the **`image`** column (not used).
- Normalizes: whitespace, control chars, unescape HTML, **mask PII** (URLs, emails, phone numbers).
- Converts `vote` → **int**, `verified` → **bool**.
- Parses time (`unixReviewTime` preferred; falls back to `reviewTime`) → **unix_time** + **ISO datetime**.
- Detects **language**; keeps **English**, logs counts.
- Filters **too-short**/**nonsensical** reviews.
- **Deduplicates** on `(reviewerID, asin, unix_time, cleaned review text)`.
- Flattens `style` into `style_raw`, `style_color`, `style_size`, `style_other`.
- Writes:
  - `data/cleaned_reviews.csv`
  - `data/removed_reviews.csv`
  - `data/language_stats.json`
- Quick EDA with **matplotlib** (one chart per figure; no seaborn; default colors).

> Adjust the **paths & limits** in the next cell as needed.


In [None]:

# === Configuration ===
from pathlib import Path

# Project root; every data path below is resolved relative to it.
PROJECT_ROOT = Path.cwd()

# Raw reviews file (JSON-lines, optionally gzip-compressed).
RAW_PATH = PROJECT_ROOT.joinpath("data", "Electronics_5.json.gz")

# Files produced by the cleaning cell.
CLEANED = PROJECT_ROOT.joinpath("data", "cleaned_reviews.csv")
REMOVED = PROJECT_ROOT.joinpath("data", "removed_reviews.csv")
LANGJSON = PROJECT_ROOT.joinpath("data", "language_stats.json")

# Filtering thresholds.
MIN_TEXT_LEN = 20  # minimum cleaned-text length (chars) for a review to be kept
KEEP_LANGS = {"en"}  # detected language codes that are kept

# Number of cleaned rows buffered in memory before they are appended to the CSV.
WRITE_CHUNK_SIZE = 50000  # reduce when RAM is tight

# Cap on input lines for quick trial runs (e.g., 200_000); 0 disables the cap.
MAX_ROWS = 0

print("Input:", RAW_PATH)
print("Outputs:", CLEANED, REMOVED, LANGJSON)


In [None]:

# === Imports ===
import os, re, json, html, math, gzip, io, hashlib, ast, datetime as dt
from collections import Counter

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

from langdetect import detect, DetectorFactory
DetectorFactory.seed = 42

# Ensure output dirs exist (the set collapses identical parents into one mkdir call)
for _out_dir in {CLEANED.parent, REMOVED.parent, LANGJSON.parent}:
    _out_dir.mkdir(parents=True, exist_ok=True)

# Fail fast with a real exception: an `assert` is stripped when Python runs with -O,
# which would let a missing input slip through to the cleaning cell.
if not RAW_PATH.exists():
    raise FileNotFoundError(
        f"Input file not found: {RAW_PATH}\nPlace your Electronics_5.json.gz there."
    )
print("✅ Environment ready.")


### Helper functions: streaming, cleaning, parsing, PII masking, style flattening

In [None]:

# --- Streaming JSON-lines reader (handles .gz) ---
def _open_text(path: Path):
    """Open *path* as UTF-8 text, decompressing on the fly when it ends in ``.gz``.

    Undecodable bytes are dropped (``errors="ignore"``) instead of raising.
    """
    if str(path).endswith(".gz"):
        # Text mode makes gzip wrap its binary stream in a TextIOWrapper for us.
        return gzip.open(path, mode="rt", encoding="utf-8", errors="ignore")
    return open(path, mode="r", encoding="utf-8", errors="ignore")


def stream_jsonl(path: Path):
    """Yield one decoded JSON value per non-blank line of *path*.

    Lines that are empty after stripping, or that are not valid JSON, are
    skipped silently, so a few corrupt lines do not abort a long run.
    """
    with _open_text(path) as handle:
        for raw_line in handle:
            payload = raw_line.strip()
            if payload:
                try:
                    record = json.loads(payload)
                except json.JSONDecodeError:
                    # Malformed line: ignore it and keep streaming.
                    continue
                yield record

# --- ID and parsing helpers ---
def stable_id(*parts: str) -> str:
    """Return the MD5 hex digest of *parts* joined with ``"||"``.

    The same parts always give the same 32-character id. MD5 serves only as
    a fingerprint here, not for security.
    """
    joined = "||".join(parts)
    digest = hashlib.md5(joined.encode("utf-8"))
    return digest.hexdigest()

# Runs of whitespace (collapsed to one space by normalize_ws).
WS_RE = re.compile(r"\s+")
# ASCII control characters, including DEL.
CTRL_RE = re.compile(r"[\x00-\x1f\x7f]")
# http:// and https:// URLs, case-insensitive, up to the next whitespace.
URL_RE = re.compile(r"(https?://\S+)", re.I)
# E-mail addresses of the form local@domain.tld.
EMAIL_RE = re.compile(r"\b[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[A-Za-z]{2,}\b")
# Phone-like digit groups: optional country code, one or two 3-digit groups
# (optionally parenthesised), then 4 digits.
# The opening anchor is `(?<!\w)` rather than `\b`: a `\b` can never sit in front
# of a leading "+" or "(" that follows a space, so the optional `\+?` / `\(?` were
# dead at the start of a number and masking left "+[PHONE]" / "([PHONE]" behind.
PHONE_RE = re.compile(r"(?<!\w)(?:\+?\d{1,3}[-.\s]?)?(?:\(?\d{3}\)?[-.\s]?){1,2}\d{4}\b")

def normalize_ws(txt: str) -> str:
    """Trim *txt* and squeeze every whitespace run into one space.

    Falsy input (``None`` or ``""``) yields the empty string.
    """
    trimmed = txt.strip() if txt else ""
    return WS_RE.sub(" ", trimmed)

def strip_ctrl(txt: str) -> str:
    """Replace each character matched by ``CTRL_RE`` with a single space."""
    without_ctrl = CTRL_RE.sub(" ", txt)
    return without_ctrl

def strip_html_entities(txt: str) -> str:
    """Decode HTML character references (e.g. ``&amp;`` becomes ``&``).

    Despite the name, nothing is removed: entities are turned into the
    characters they stand for.
    """
    decoded = html.unescape(txt)
    return decoded

def mask_pii(txt: str) -> str:
    """Replace URLs, e-mail addresses and phone-like numbers with placeholder tokens.

    The order matters: URLs are masked first, then e-mails, then phone numbers.
    """
    replacements = (
        (URL_RE, "[URL]"),
        (EMAIL_RE, "[EMAIL]"),
        (PHONE_RE, "[PHONE]"),
    )
    for pattern, token in replacements:
        txt = pattern.sub(token, txt)
    return txt

def clean_text(txt: str) -> str:
    """Run the full text-normalization pipeline on *txt*.

    Falsy input becomes ``""``; anything else is stringified. The steps run in
    a fixed order: decode HTML entities, blank out control characters,
    collapse whitespace, then mask PII.
    """
    text = str(txt or "")
    pipeline = (strip_html_entities, strip_ctrl, normalize_ws, mask_pii)
    for step in pipeline:
        text = step(text)
    return text

def to_int_votes(v) -> int:
    """Convert a raw ``vote`` value to an integer count.

    Accepts ints, floats and strings (thousands separators such as ``"1,234"``
    are removed). Missing, non-finite or unparseable values map to ``0``.

    Args:
        v: Raw vote value; may be ``None``, a number or a string.

    Returns:
        The vote count as ``int``; ``0`` when the value cannot be interpreted.
    """
    # bool is an int subclass; a True/False vote carries no count, so treat it as missing.
    if v is None or isinstance(v, bool):
        return 0
    if isinstance(v, int):
        return v
    if isinstance(v, float):
        # Going through str() would turn 3.0 into "3.0", which int() rejects.
        return int(v) if math.isfinite(v) else 0

    text = str(v).replace(",", "").strip()
    try:
        return int(text)
    except ValueError:
        pass
    # Float-formatted strings such as "12.0" still carry a usable count.
    try:
        number = float(text)
    except ValueError:
        return 0
    return int(number) if math.isfinite(number) else 0

def to_bool_verified(v) -> bool:
    """Interpret a raw ``verified`` value as a boolean.

    Real booleans pass through unchanged and ``None`` is ``False``. Any other
    value is stringified, trimmed and lower-cased; it is ``True`` only when it
    spells one of the accepted truthy tokens.
    """
    if isinstance(v, bool):
        return v
    if v is None:
        return False
    truthy_tokens = {"true", "t", "1", "yes", "y"}
    return str(v).strip().lower() in truthy_tokens

def parse_review_time(rtime, unix_time):
    """Resolve a review's timestamp to ``(unix_seconds, iso_utc_string)``.

    ``unix_time`` is preferred. When it is missing or unusable, ``rtime`` is
    parsed with pandas instead. Naive parsed dates are taken to be UTC, and
    timezone-aware ones are converted to UTC, so the trailing ``"Z"`` is
    always truthful.

    Args:
        rtime: Human-readable review date (e.g. the ``reviewTime`` field).
        unix_time: Epoch seconds (e.g. the ``unixReviewTime`` field).

    Returns:
        ``(int, str)`` on success, e.g. ``(1404172800, "2014-07-01T00:00:00Z")``;
        ``(None, None)`` when neither input yields a valid time.
    """
    if unix_time is not None and not (isinstance(unix_time, float) and math.isnan(unix_time)):
        try:
            ut = int(unix_time)
            # utcfromtimestamp() is deprecated; build an aware UTC datetime and drop
            # the tzinfo so the string keeps the "...T00:00:00Z" shape.
            moment = dt.datetime.fromtimestamp(ut, tz=dt.timezone.utc)
            return ut, moment.replace(tzinfo=None).isoformat() + "Z"
        except (TypeError, ValueError, OverflowError, OSError):
            # Unusable epoch value (wrong type or out of range): try the text date.
            pass
    if rtime:
        try:
            dtobj = pd.to_datetime(rtime, errors="coerce")
            if pd.isna(dtobj):
                return None, None
            # Normalize to UTC explicitly. Appending "Z" to an aware timestamp's
            # isoformat() would give a malformed "...+02:00Z".
            if dtobj.tzinfo is None:
                dtobj = dtobj.tz_localize("UTC")
            else:
                dtobj = dtobj.tz_convert("UTC")
            ut = int(dtobj.timestamp())
            return ut, dtobj.tz_localize(None).isoformat() + "Z"
        except Exception:
            # Best effort: any parsing problem means "no usable time", not a crash.
            return None, None
    return None, None

def detect_lang_safe(txt: str) -> str:
    """Detect the language of *txt* without ever raising.

    Texts shorter than 20 characters are not sent to the detector and are
    reported as ``"en"``. Any failure yields ``"unknown"``.
    """
    try:
        if len(txt) < 20:
            return "en"
        return detect(txt)
    except Exception:
        return "unknown"

def is_nonsensical(txt: str) -> bool:
    """Flag long texts that are built from very few distinct characters.

    Spaces are ignored. A text is nonsensical when it has at least 30
    remaining characters and fewer than 10% of them are distinct
    (e.g. ``"aaaaaaaa..."``).
    """
    compact = txt.replace(" ", "")
    total = len(compact)
    return total >= 30 and len(set(compact)) / total < 0.1

def _style_pairs_from_text(text):
    """Turn a stringified style value into a dict (empty when nothing is recognisable)."""
    try:
        value = ast.literal_eval(text)
    except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
        # Not a Python literal: fall back to "key: val; key: val" or "key: val, key: val".
        pairs = {}
        for chunk in re.split(r"[;,]", text):
            key, sep, val = chunk.partition(":")
            if sep:
                pairs[key.strip()] = val.strip()
        return pairs
    return value if isinstance(value, dict) else {}


def parse_style(style_field):
    """Extract color- and size-like fields from the ``style`` column.

    Keys are matched case-insensitively: a key containing ``color``/``colour``
    feeds the color, and one containing ``size``/``capacity``/``storage`` feeds
    the size. The first matching key wins. Keys and values are trimmed, and a
    trailing ``:`` on a key is dropped, so ``{"Color:": " Black"}`` gives
    ``"Black"`` and the "other" summary never shows a doubled colon.

    Args:
        style_field: ``None``/NaN, a dict, or a string holding either a dict
            literal or ``key: value`` pairs separated by ``;`` or ``,``.

    Returns:
        ``(style_raw, style_color, style_size, style_other)``, all strings.
        ``style_raw`` is the unmodified input serialized as text.
        ``style_other`` lists every pair as ``"key: value; ..."`` and is filled
        only when neither a color nor a size was found.
    """
    if style_field is None or (isinstance(style_field, float) and math.isnan(style_field)):
        return "", "", "", ""

    if isinstance(style_field, dict):
        raw = json.dumps(style_field, ensure_ascii=False)
        parsed = style_field
    else:
        raw = str(style_field).strip()
        parsed = _style_pairs_from_text(raw)

    # str() guards against non-string keys and values (possible after literal_eval).
    items = [
        (str(key).strip().rstrip(":").strip(), str(value).strip())
        for key, value in parsed.items()
    ]

    color = None
    size = None
    for key, value in items:
        lowered = key.lower()
        if color is None and ("color" in lowered or "colour" in lowered):
            color = value
        if size is None and ("size" in lowered or "capacity" in lowered or "storage" in lowered):
            size = value

    other = ""
    if items and not (color or size):
        other = "; ".join(f"{key}: {value}" for key, value in items)

    return raw, color or "", size or "", other


### Cleaning loop: stream → clean → filter → dedupe → write CSV

In [None]:

from collections import Counter

COLUMNS = [
    "review_id", "product_id", "user_id",
    "rating", "verified", "vote", "vote_int",
    "unix_time", "review_time", "review_time_iso",
    "summary", "text", "lang",
    "text_len", "word_count",
    "style_raw", "style_color", "style_size", "style_other"
]
REMOVED_COLUMNS = ["reason", "reviewerID", "asin"]

# Initialize output CSVs with headers (this truncates files left by earlier runs)
pd.DataFrame(columns=COLUMNS).to_csv(CLEANED, index=False)
pd.DataFrame(columns=REMOVED_COLUMNS).to_csv(REMOVED, index=False)


def _append_rows(rows, columns, path):
    """Append buffered row dicts to an existing CSV (no header), then empty the buffer."""
    if not rows:
        return
    pd.DataFrame(rows, columns=columns).to_csv(path, mode="a", header=False, index=False)
    rows.clear()


def _dedupe_digest(reviewer_id, asin, unix_time, text):
    """Return a 16-byte fingerprint of the dedupe key (reviewer, product, time, cleaned text)."""
    joined = "\x1f".join((str(reviewer_id), str(asin), str(int(unix_time)), text))
    # surrogatepass: JSON escapes can decode to lone surrogates, which plain UTF-8 rejects.
    return hashlib.blake2b(joined.encode("utf-8", "surrogatepass"), digest_size=16).digest()


def _clean_record(rec, seen_digests, lang_counter):
    """Validate and normalize one raw review record.

    The checks run in a fixed order: required keys, text length and sanity,
    language, rating, time, duplicate. ``lang_counter`` is incremented for every
    record that reaches language detection. ``seen_digests`` gains the
    fingerprint of every kept record.

    Returns:
        ``(row, None)`` for a kept record, where ``row`` is a dict keyed by
        ``COLUMNS``; ``(None, reason)`` for a rejected one.
    """
    reviewer_id = rec.get("reviewerID")
    asin = rec.get("asin")
    if not reviewer_id or not asin:
        return None, "missing_keys"

    txt = clean_text(rec.get("reviewText", rec.get("review", "")))
    if len(txt) < MIN_TEXT_LEN or is_nonsensical(txt):
        return None, "short_or_nonsense"

    lang = detect_lang_safe(txt)
    lang_counter[lang] += 1
    if lang not in KEEP_LANGS:
        return None, f"lang_{lang}"

    try:
        rating = float(rec.get("overall"))
    except (TypeError, ValueError, OverflowError):
        rating = None
    # The range test also rejects NaN, since every comparison with NaN is False.
    if rating is None or not (1.0 <= rating <= 5.0):
        return None, "bad_rating"

    review_time = rec.get("reviewTime")
    ut, dt_iso = parse_review_time(review_time, rec.get("unixReviewTime"))
    if ut is None:
        return None, "bad_time"

    # A fixed-size digest stands in for the full text, so the dedupe set stays
    # small. A 128-bit collision is not a practical concern at this scale.
    digest = _dedupe_digest(reviewer_id, asin, ut, txt)
    if digest in seen_digests:
        return None, "duplicate"
    seen_digests.add(digest)

    vote = rec.get("vote")
    style_raw, style_color, style_size, style_other = parse_style(rec.get("style"))

    row = {
        # NOTE: only the first 64 chars of the text feed the id; kept as-is so ids
        # stay stable across runs.
        "review_id": stable_id(str(reviewer_id), str(asin), str(ut), txt[:64]),
        "product_id": str(asin),
        "user_id": str(reviewer_id),
        "rating": rating,
        "verified": to_bool_verified(rec.get("verified")),
        "vote": vote if vote is not None else "",
        "vote_int": int(to_int_votes(vote)),
        "unix_time": int(ut),
        "review_time": review_time if review_time is not None else "",
        "review_time_iso": dt_iso if dt_iso is not None else "",
        "summary": clean_text(rec.get("summary", "")),
        "text": txt,
        "lang": lang,
        "text_len": len(txt),
        "word_count": len(txt.split()),
        "style_raw": style_raw,
        "style_color": style_color,
        "style_size": style_size,
        "style_other": style_other,
    }
    return row, None


# State
dedupe_seen = set()       # 16-byte fingerprints of kept reviews
lang_counts = Counter()   # detected language -> number of reviews
clean_buffer = []         # kept rows waiting to be flushed
removed_all = []          # removed-row log entries waiting to be flushed
n_seen = 0
n_clean = 0
n_removed = 0

for rec in stream_jsonl(RAW_PATH):
    n_seen += 1

    if isinstance(rec, dict):
        row, reason = _clean_record(rec, dedupe_seen, lang_counts)
    else:
        # Valid JSON that is not an object (e.g. `null`, `[]`) cannot be a review.
        rec, row, reason = {}, None, "not_an_object"

    if row is None:
        removed_all.append(
            {"reason": reason, "reviewerID": rec.get("reviewerID"), "asin": rec.get("asin")}
        )
        n_removed += 1
    else:
        clean_buffer.append(row)
        n_clean += 1

    # Periodic flush keeps both buffers bounded
    if len(clean_buffer) >= WRITE_CHUNK_SIZE:
        _append_rows(clean_buffer, COLUMNS, CLEANED)
    if len(removed_all) >= WRITE_CHUNK_SIZE:
        _append_rows(removed_all, REMOVED_COLUMNS, REMOVED)

    # Progress + max rows guard. These run for every line, kept or removed, and use
    # in-memory counters instead of re-reading the output file.
    if n_seen % 10000 == 0:
        print(f"Processed {n_seen:,} lines... cleaned so far: {n_clean:,}")
    if MAX_ROWS and n_seen >= MAX_ROWS:
        print(f"Reached MAX_ROWS={MAX_ROWS}, stopping early.")
        break

# Final flush
_append_rows(clean_buffer, COLUMNS, CLEANED)
_append_rows(removed_all, REMOVED_COLUMNS, REMOVED)

# Write language stats
with open(LANGJSON, "w", encoding="utf-8") as f:
    json.dump({"counts": dict(lang_counts)}, f, indent=2)

print("✅ Cleaning complete.")
print(f"Lines seen: {n_seen:,} | kept: {n_clean:,} | removed: {n_removed:,}")
print("Wrote:", CLEANED, REMOVED, LANGJSON)


### Quick EDA (matplotlib-only, one chart per figure)

In [None]:

# Read only the leading rows of the cleaned CSV so EDA stays light on memory
df = pd.read_csv(
    CLEANED,
    nrows=200000,  # raise or lower to taste
)
print("EDA sample shape:", df.shape)
df.head()


In [None]:

# Histogram of star ratings: one unit-wide bin per star, bars left-aligned on 1..5
star_edges = [1, 2, 3, 4, 5, 6]
ratings = df['rating'].dropna()

fig, ax = plt.subplots()
ax.hist(ratings, bins=star_edges, align='left', rwidth=0.9)
ax.set_xlabel("Stars")
ax.set_ylabel("Count")
ax.set_title("Ratings distribution")
plt.show()


In [None]:

# Histogram of review lengths: lengths capped at 2000 chars, counts on a log axis
capped_lengths = df['text_len'].clip(upper=2000)

fig, ax = plt.subplots()
ax.hist(capped_lengths, bins=50)
ax.set_yscale('log')
ax.set_xlabel("Text length (chars)")
ax.set_ylabel("Count (log)")
ax.set_title("Review length distribution")
plt.show()


In [None]:

# Scatter of helpful-vote counts against star rating (small, translucent markers)
x_rating = df['rating']
y_votes = df['vote_int']

fig, ax = plt.subplots()
ax.scatter(x_rating, y_votes, s=2, alpha=0.3)
ax.set_xlabel("Rating")
ax.set_ylabel("Votes")
ax.set_title("Helpful votes vs Rating")
plt.show()


In [None]:

# Language stats
try:
    with open(LANGJSON, "r", encoding="utf-8") as f:
        lang_stats = json.load(f)["counts"]
except FileNotFoundError:
    print("Language stats file not found; run the cleaning cell first.")
else:
    # A bare expression inside a `try:` block is never rendered by the notebook,
    # so print the counts explicitly, most frequent language first.
    for lang_code, n_reviews in sorted(lang_stats.items(), key=lambda item: item[1], reverse=True):
        print(f"{lang_code}: {n_reviews:,}")
