# In [None]:  (notebook export marker, commented out so the file parses as Python)
# scripts/02_preprocess_spotify.py
# Purpose: Clean + standardize Spotify dataset for Power BI ingestion
# Output: data/processed/spotify_clean.csv

import os
import re
import unicodedata
import pandas as pd


# Input and output locations, relative to the working directory the script is
# run from. Both live under the same top-level data folder.
_DATA_DIR = "data"
RAW_PATH = os.path.join(_DATA_DIR, "raw", "spotify_raw.csv")
OUT_PATH = os.path.join(_DATA_DIR, "processed", "spotify_clean.csv")


def _normalize_text(x: str) -> str:
    if pd.isna(x):
        return ""
    x = str(x).strip()
    x = unicodedata.normalize("NFKD", x)
    x = "".join(ch for ch in x if not unicodedata.combining(ch))
    x = re.sub(r"\s+", " ", x)  # collapse whitespace
    return x


def _titlecase_artist(x: str) -> str:
    """Return the artist name normalized and title-cased.

    The value is first cleaned with ``_normalize_text`` and then passed
    through ``str.title()``, after which a few known patterns are repaired:

    * contraction suffixes after an apostrophe are lowercased again
      (``"Destiny'S Child"`` -> ``"Destiny's Child"``);
    * ``"Dj"`` becomes ``"DJ"`` and ``"Rnb"`` becomes ``"R&B"``.

    Title-casing still lowercases other all-caps stylings (``"AC/DC"`` comes
    out as ``"Ac/Dc"``); only the patterns listed above are restored.
    """
    name = _normalize_text(x).title()
    # str.title() treats an apostrophe as a word boundary and capitalizes the
    # next letter. Undo that only for common contraction suffixes, so names
    # such as "O'Neal" or "D'Angelo" keep their capital. The trailing \b makes
    # sure the suffix is the whole remainder of the word.
    name = re.sub(
        r"(?<=[^\W\d_])(['\u2019])(S|T|D|M|Re|Ve|Ll)\b",
        lambda m: m.group(1) + m.group(2).lower(),
        name,
    )
    name = re.sub(r"\bDj\b", "DJ", name)
    name = re.sub(r"\bRnb\b", "R&B", name)
    return name


def _standardize_album_type(x: str) -> str:
    """Map a raw album-type value onto the known labels, else ``"other"``.

    The value is cleaned with ``_normalize_text`` and lowercased before the
    comparison, so matching ignores case and surrounding whitespace.
    """
    known_types = ("album", "single", "compilation", "ep")
    label = _normalize_text(x).lower()
    if label in known_types:
        return label
    return "other"


def _to_bool_explicit(x):
    # handles True/False, 0/1, "explicit"/"non-explicit"
    if pd.isna(x):
        return False
    if isinstance(x, bool):
        return x
    s = str(x).strip().lower()
    if s in {"1", "true", "t", "yes", "explicit"}:
        return True
    return False


def _resolve_columns(columns):
    """Map each standard field name to the matching label in *columns*.

    Returns a dict ``{standard_name: original_column_label}`` containing only
    the fields for which a candidate header was found. Matching ignores case
    and surrounding whitespace; for each field the first candidate that is
    present wins.
    """
    # ---- Expected columns (adjust if your dataset uses different names) ----
    # track_name, artist_name, album_name, album_type, release_date, duration_ms, popularity, explicit
    col_map_candidates = {
        "track_name": ["track_name", "song", "track", "name"],
        "artist_name": ["artist_name", "artist", "artists"],
        "album_name": ["album_name", "album"],
        "album_type": ["album_type", "type"],
        "release_date": ["release_date", "released", "release"],
        "duration_ms": ["duration_ms", "duration", "durationmilliseconds"],
        "popularity": ["popularity", "popularity_score", "score"],
        "explicit": ["explicit", "is_explicit"],
    }

    # str() keeps non-string labels from raising; strip() tolerates padded
    # headers such as " Artist ".
    by_key = {str(c).strip().lower(): c for c in columns}

    resolved = {}
    for std, opts in col_map_candidates.items():
        for o in opts:
            if o.lower() in by_key:
                resolved[std] = by_key[o.lower()]
                break
    return resolved


def _clean_frame(df):
    """Return a cleaned, de-duplicated copy of *df*.

    *df* must already use the standard column names and contain at least
    ``track_name``, ``artist_name``, ``release_date`` and ``popularity``.
    Rows whose release date or popularity cannot be parsed are dropped.
    """
    df = df.copy()  # leave the caller's frame untouched

    # ---- Cleaning / Standardization ----
    df["track_name"] = df["track_name"].apply(_normalize_text)
    df["artist_name"] = df["artist_name"].apply(_titlecase_artist)

    if "album_name" in df.columns:
        df["album_name"] = df["album_name"].apply(_normalize_text)

    if "album_type" in df.columns:
        df["album_type"] = df["album_type"].apply(_standardize_album_type)
    else:
        df["album_type"] = "other"

    if "explicit" in df.columns:
        df["explicit"] = df["explicit"].apply(_to_bool_explicit)
    else:
        df["explicit"] = False

    # release_date parsing; unparseable values become NaT and are dropped.
    # NOTE(review): how a column mixing precisions ("2010" next to
    # "2010-05-01") is parsed by a single to_datetime call may depend on the
    # installed pandas version -- confirm against the real dataset.
    df["release_date"] = pd.to_datetime(df["release_date"], errors="coerce")
    df = df[df["release_date"].notna()].copy()

    df["release_year"] = df["release_date"].dt.year.astype(int)
    df["release_month_num"] = df["release_date"].dt.month.astype(int)
    df["release_month"] = df["release_date"].dt.strftime("%b")  # Jan, Feb...
    df["release_year_month"] = df["release_date"].dt.to_period("M").astype(str)

    # duration
    if "duration_ms" in df.columns:
        df["duration_ms"] = pd.to_numeric(df["duration_ms"], errors="coerce")
        df["duration_minutes"] = (df["duration_ms"] / 1000 / 60).round(2)
    else:
        df["duration_minutes"] = pd.NA

    # popularity; non-numeric values become NaN and are dropped.
    df["popularity"] = pd.to_numeric(df["popularity"], errors="coerce")
    df = df[df["popularity"].notna()].copy()

    # Deduplicate: keep the most popular record for same artist+track+date.
    # A stable sort makes the surviving row deterministic when several
    # duplicates share the same popularity.
    dedupe_cols = ["artist_name", "track_name", "release_date"]
    return (
        df.sort_values("popularity", ascending=False, kind="mergesort")
          .drop_duplicates(subset=dedupe_cols, keep="first")
          .reset_index(drop=True)
    )


def main():
    """Read the raw Spotify CSV, clean it, and write the processed CSV.

    Reads ``RAW_PATH``, writes ``OUT_PATH`` (creating its folder if needed)
    and prints a short summary.

    Raises:
        FileNotFoundError: if ``RAW_PATH`` does not exist.
        ValueError: if a required field (track name, artist name, release
            date, popularity) cannot be matched to any column of the CSV.
    """
    if not os.path.exists(RAW_PATH):
        raise FileNotFoundError(
            f"Missing raw dataset at {RAW_PATH}. Put your CSV there as data/raw/spotify_raw.csv"
        )

    df = pd.read_csv(RAW_PATH)

    # Auto-map columns if naming differs
    resolved = _resolve_columns(df.columns)

    required = ["track_name", "artist_name", "release_date", "popularity"]
    missing = [k for k in required if k not in resolved]
    if missing:
        raise ValueError(
            f"Missing required fields in your CSV: {missing}. "
            f"Found columns: {list(df.columns)}. "
            f"Edit the mapping section in this script to match your dataset."
        )

    # Rename to standard names
    df = df.rename(columns={source: std for std, source in resolved.items()})

    df = _clean_frame(df)

    # Save
    os.makedirs(os.path.dirname(OUT_PATH), exist_ok=True)
    df.to_csv(OUT_PATH, index=False)

    # "\u2705" is the check-mark emoji; written as an escape so the source
    # stays ASCII and the character cannot be mis-decoded again.
    print("\u2705 Clean file created:", OUT_PATH)
    print("Rows:", len(df), " | Columns:", len(df.columns))
    print("Artists:", df["artist_name"].nunique(), " | Songs:", df["track_name"].nunique())


# Run the pipeline only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()
