## 1) Setup Spark + Drive

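Installs PySpark, mounts Google Drive, and starts a SparkSession for preprocessing. Loads the bronze Parquet dataset produced by the ingestion notebook, prints the row count and schema, and previews a small sample. Also defines the silver path for the cleaned and normalized output.
Note that most numeric fields (popularity, danceability, loudness, and others) arrive as strings in bronze; step 2 casts them to proper types.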

In [1]:
# Install PySpark runtime in Colab
!pip -q install pyspark

# Mount Google Drive for persistent I/O
from google.colab import drive
drive.mount('/content/drive')

# Core Spark imports for transforms and types
from pyspark.sql import SparkSession, functions as F, types as T, Window as W

# Start a Spark session for preprocessing tasks
spark = (
    SparkSession.builder
    .appName("SpotifyPreprocessing")
    .getOrCreate()
)

# Input/Output lakehouse layers
BRONZE = "file:///content/drive/MyDrive/data/spotify/bronze_parquet"  # from 01_Data_Ingestion.ipynb
SILVER = "file:///content/drive/MyDrive/data/spotify/silver_parquet"  # target for cleaned data

# Load bronze-level Parquet dataset
df = spark.read.parquet(BRONZE)

print("Rows:", df.count())

df.printSchema()

df.show(5, truncate=False)

Mounted at /content/drive
Rows: 114000
root
 |-- Unnamed: 0: integer (nullable = true)
 |-- track_id: string (nullable = true)
 |-- artists: string (nullable = true)
 |-- album_name: string (nullable = true)
 |-- track_name: string (nullable = true)
 |-- popularity: string (nullable = true)
 |-- duration_ms: string (nullable = true)
 |-- explicit: string (nullable = true)
 |-- danceability: string (nullable = true)
 |-- energy: string (nullable = true)
 |-- key: string (nullable = true)
 |-- loudness: string (nullable = true)
 |-- mode: string (nullable = true)
 |-- speechiness: string (nullable = true)
 |-- acousticness: string (nullable = true)
 |-- instrumentalness: double (nullable = true)
 |-- liveness: string (nullable = true)
 |-- valence: string (nullable = true)
 |-- tempo: double (nullable = true)
 |-- time_signature: double (nullable = true)
 |-- track_genre: string (nullable = true)


## 2) Column plan

Plans column handling and normalizes types: drops the junk `Unnamed: 0` index column, trims all string columns and converts empty strings to nulls in a single `select`, maps `explicit` to a boolean, and casts numeric-like fields to integer or double, with unparseable values becoming null.
This produces a cleaned, silver-ready DataFrame with consistent nulls and dtypes.
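To make the per-value rules of this step easy to check, here is a plain-Python sketch (not Spark code) of what the cell does to each value: trim spaces, turn empty strings into nulls, map `explicit` spellings to booleans, and parse numbers with null-on-failure. The helper names `normalize_string`, `to_bool`, `safe_int`, and `safe_float` are hypothetical; they only mirror the Spark expressions and could serve as a small test oracle on toy rows.

```python
from typing import Optional

# Truthy/falsy spellings mirrored from the `explicit` mapping in this step
TRUTHY = {"true", "1", "t", "yes"}
FALSY = {"false", "0", "f", "no"}


def normalize_string(value: Optional[str]) -> Optional[str]:
    """Trim spaces at both ends and turn empty strings into None."""
    if value is None:
        return None
    trimmed = value.strip(" ")  # like Spark's trim(), strip space characters only
    return trimmed if trimmed else None


def to_bool(value: Optional[str]) -> Optional[bool]:
    """Map recognized truthy/falsy strings to bool; anything else -> None."""
    v = normalize_string(value)
    if v is None:
        return None
    v = v.lower()
    if v in TRUTHY:
        return True
    if v in FALSY:
        return False
    return None


def safe_int(value: Optional[str]) -> Optional[int]:
    """Parse an int, returning None instead of raising on bad input."""
    v = normalize_string(value)
    if v is None:
        return None
    try:
        return int(v)
    except ValueError:
        return None


def safe_float(value: Optional[str]) -> Optional[float]:
    """Parse a float, returning None instead of raising on bad input."""
    v = normalize_string(value)
    if v is None:
        return None
    try:
        return float(v)
    except ValueError:
        return None
```

For example, `to_bool(" Yes ")` gives `True` and `safe_int("abc")` gives `None`, matching the null-on-failure intent of the Spark casts.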

In [2]:
# Logical column groups for downstream reference/validation
id_cols = ["track_id"]
cat_cols = ["artists", "album_name", "track_name", "track_genre", "key"]
bool_cols = ["explicit"]
num_cols_as_string = [
    "popularity","duration_ms","danceability","energy","loudness","mode",
    "speechiness","acousticness","instrumentalness","liveness","valence",
    "tempo","time_signature"
]
drop_cols = ["Unnamed: 0"]  # junk index from CSV export

# Drop junk columns if present
df1 = df.drop(*[c for c in drop_cols if c in df.columns])

# Detect string columns programmatically to avoid hard-coding
string_cols = [f.name for f in df1.schema.fields if isinstance(f.dataType, T.StringType)]

# Trim whitespace in all string cols and normalize "" -> null
exprs = []
for c in df1.columns:
    if c in string_cols:
        trimmed = F.trim(F.col(c))
        exprs.append(F.when(trimmed == "", None).otherwise(trimmed).alias(c))
    else:
        exprs.append(F.col(c))
df1 = df1.select(*exprs)

# Cast with null-on-failure semantics. try_cast (Spark 3.2+) returns null for
# unparseable values even when ANSI mode is enabled, where a plain .cast() raises.
def safe_cast(col, new_type):
    return F.expr(f"try_cast(`{col}` AS {new_type.simpleString()})")

# Begin type normalization on a working DataFrame
df2 = df1

# Robust explicit → boolean mapping from common truthy/falsey strings
if "explicit" in df2.columns:
    df2 = df2.withColumn(
        "explicit",
        F.when(F.lower(F.col("explicit")).isin("true","1","t","yes"), F.lit(True))
         .when(F.lower(F.col("explicit")).isin("false","0","f","no"), F.lit(False))
         .otherwise(None)  # unrecognized values -> null
         .cast(T.BooleanType())
    )

# Cast numeric-looking columns safely to their intended types
for c in num_cols_as_string:
    if c in df2.columns:
        if c in ["popularity", "duration_ms", "mode", "time_signature"]:
            df2 = df2.withColumn(c, safe_cast(c, T.IntegerType()))
        else:
            df2 = df2.withColumn(c, safe_cast(c, T.DoubleType()))

# 'key' is typically an integer musical key index; cast if currently string
if "key" in df2.columns and dict(df2.dtypes).get("key") == "string":
    df2 = df2.withColumn("key", safe_cast("key", T.IntegerType()))

df2.printSchema()
df2.show(5, truncate=False)

root
 |-- track_id: string (nullable = true)
 |-- artists: string (nullable = true)
 |-- album_name: string (nullable = true)
 |-- track_name: string (nullable = true)
 |-- popularity: integer (nullable = true)
 |-- duration_ms: integer (nullable = true)
 |-- explicit: boolean (nullable = true)
 |-- danceability: double (nullable = true)
 |-- energy: double (nullable = true)
 |-- key: integer (nullable = true)
 |-- loudness: double (nullable = true)
 |-- mode: integer (nullable = true)
 |-- speechiness: double (nullable = true)
 |-- acousticness: double (nullable = true)
 |-- instrumentalness: double (nullable = true)
 |-- liveness: double (nullable = true)
 |-- valence: double (nullable = true)
 |-- tempo: double (nullable = true)
 |-- time_signature: integer (nullable = true)
 |-- track_genre: string (nullable = true)


## 3) De-duplication & basic filters

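De-duplicates records and applies minimal quality filters.
Keeps one row per (track_id, album_name, track_name), choosing the row with the highest popularity and then the longest duration (nulls last), or drops exact duplicate rows if those key columns are missing. It then warns if any required column (track_id, track_name, track_genre) is absent, drops rows with nulls in the required columns, filters out non-positive durations, and removes rows whose track_genre is purely numeric.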
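The window-based de-duplication can be mirrored in plain Python as follows (a sketch with hypothetical helper names, assuming rows are dicts). One caveat: `row_number()` breaks ties arbitrarily, so when duplicate rows share the same popularity and duration, which row (and therefore which `track_genre`) survives depends on row order. The sketch adds an assumed alphabetical `track_genre` tie-breaker; in Spark, appending `F.col("track_genre").asc_nulls_last()` to the `orderBy` would play the same role.

```python
from typing import Any, Dict, Iterable, List, Optional, Tuple

Row = Dict[str, Any]


def desc_nulls_last(value: Optional[float]) -> Tuple[bool, float]:
    """Sort key putting larger values first and nulls last."""
    return (value is None, -value if value is not None else 0.0)


def keep_best_per_key(rows: Iterable[Row], keys: List[str]) -> List[Row]:
    """Keep one row per key by popularity desc, then duration_ms desc.

    The final track_genre term is an assumed tie-breaker (not in the notebook)
    that makes the winner deterministic when popularity and duration tie.
    """
    best: Dict[Tuple[Any, ...], Tuple[Tuple[Any, ...], Row]] = {}
    for row in rows:
        key = tuple(row.get(c) for c in keys)
        rank = (
            desc_nulls_last(row.get("popularity")),
            desc_nulls_last(row.get("duration_ms")),
            row.get("track_genre") or "",
        )
        # Lower rank tuple wins, mirroring row_number() == 1 over the ordering
        if key not in best or rank < best[key][0]:
            best[key] = (rank, row)
    return [row for _, row in best.values()]
```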

In [8]:
# Cache to avoid re-computation when counting before/after
df2 = df2.cache()
before = df2.count()  # triggers evaluation

# Keep the "best" row by popularity, then duration
keys = ["track_id", "album_name", "track_name"]
existing_keys = [k for k in keys if k in df2.columns]
if existing_keys:
    w = W.partitionBy(*existing_keys).orderBy(
        F.col("popularity").desc_nulls_last(),
        F.col("duration_ms").desc_nulls_last()
    )
    df3 = (
        df2.withColumn("_rn", F.row_number().over(w))
           .where(F.col("_rn") == 1)  # keep top-ranked per key
           .drop("_rn")
    )
else:
    # Fallback: if keys are missing, remove full-row duplicates
    df3 = df2.dropDuplicates()

after = df3.count()
print("Dropped duplicates:", before - after)

# Ensure essential identifiers/genre are present
required = [c for c in ["track_id", "track_name", "track_genre"] if c in df3.columns]
if len(required) < 3:
    missing = set(["track_id", "track_name", "track_genre"]) - set(required)
    print(f"Warning: missing expected columns: {sorted(missing)}")

if required:
    # Require a non-null value in every essential column that exists
    not_null = F.lit(True)
    for c in required:
        not_null = not_null & F.col(c).isNotNull()
    df3 = df3.where(not_null)

# Sanity check: drop rows with non-positive durations when available
if "duration_ms" in df3.columns:
    df3 = df3.where((F.col("duration_ms").isNull()) | (F.col("duration_ms") > 0))

# Remove rows where track_genre looks numeric (not a valid genre label)
numeric_pattern = r"^[+-]?\d+(\.\d+)?([eE][+-]?\d+)?$"
if "track_genre" in df3.columns:
    df3 = df3.filter(~F.col("track_genre").rlike(numeric_pattern))

df3.select("track_genre").distinct().show(truncate=False)

Dropped duplicates: 24259
+-----------------+
|track_genre      |
+-----------------+
|anime            |
|singer-songwriter|
|folk             |
|hardstyle        |
|pop              |
|alternative      |
|death-metal      |
|idm              |
|detroit-techno   |
|k-pop            |
|j-dance          |
|ambient          |
|guitar           |
|goth             |
|cantopop         |
|blues            |
|study            |
|malay            |
|dance            |
|breakbeat        |
+-----------------+
only showing top 20 rows



## 4) Missing values — median imputation per genre

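Imputes missing numeric values with approximate per-genre medians (`percentile_approx`), falling back to global medians when a genre's median is null.
Because the fill values are coalesced with Python float literals, integer columns such as popularity, duration_ms, mode, and key are promoted to double from this step on, which is why the output below prints 62.0 rather than 62.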
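A plain-Python sketch of the imputation rule (value, else genre median, else global median) follows. It assumes rows are dicts and uses a nearest-rank median that returns an observed value, because Spark's `percentile_approx` also returns a value from the column instead of averaging the two middle values; on large data Spark's result is approximate, so the two need not agree exactly. `nearest_rank` and `impute_by_group` are hypothetical names.

```python
import math
from typing import Any, Dict, List, Optional

Row = Dict[str, Any]


def nearest_rank(values: List[Optional[float]], p: float) -> Optional[float]:
    """Smallest observed value v such that at least a fraction p of values <= v."""
    data = sorted(v for v in values if v is not None)
    if not data:
        return None
    return data[max(0, math.ceil(p * len(data)) - 1)]


def impute_by_group(rows: List[Row], col: str, group: str = "track_genre") -> List[Row]:
    """Return copies of rows with nulls in `col` filled: group median, else global median."""
    global_median = nearest_rank([r[col] for r in rows], 0.5)
    groups: Dict[Any, List[Optional[float]]] = {}
    for r in rows:
        groups.setdefault(r.get(group), []).append(r[col])
    group_median = {g: nearest_rank(vals, 0.5) for g, vals in groups.items()}

    filled = []
    for r in rows:
        value = r[col]
        if value is None:
            value = group_median[r.get(group)]  # genre median (may itself be None)
        if value is None:
            value = global_median  # fallback when the whole genre is null
        filled.append({**r, col: value})
    return filled
```

A genre whose values are all null (like `ambient` in a toy input with only a null tempo) gets the global median, which is exactly the case the notebook's `F.lit(global_vals[...])` term covers.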

In [9]:
# Target numeric-like columns to impute if present in the DataFrame
target_num_cols = [
    "popularity","duration_ms","danceability","energy","loudness","mode",
    "speechiness","acousticness","instrumentalness","liveness","valence",
    "tempo","time_signature","key"
]
target_num_cols = [c for c in target_num_cols if c in df3.columns]

# If we lack genre or there are no numeric targets, fall back to global medians
if "track_genre" not in df3.columns or not target_num_cols:
    print("Note: track_genre not found or no numeric columns; using global medians only.")

    # Approximate global medians in one aggregation pass (percentile_approx)
    global_row = df3.select(
        *[F.expr(f"percentile_approx({c}, 0.5)").alias(f"med_{c}") for c in target_num_cols]
    ).first()

    # Convert Row to a simple dict of column -> float/None
    global_vals = {
        c: float(global_row[f"med_{c}"]) if global_row[f"med_{c}"] is not None else None
        for c in target_num_cols
    }

    # Impute using global medians only
    df4 = df3
    for c in target_num_cols:
        df4 = df4.withColumn(c, F.coalesce(F.col(c), F.lit(global_vals[c])))

else:
    # Compute per-genre medians for all target columns
    per_genre = (
        df3.groupBy("track_genre")
           .agg(*[F.expr(f"percentile_approx({c}, 0.5)").alias(f"med_{c}") for c in target_num_cols])
           .cache()
    )

    # Global medians serve as the fallback when a genre's median is null
    global_row = df3.select(
        *[F.expr(f"percentile_approx({c}, 0.5)").alias(f"med_{c}") for c in target_num_cols]
    ).first()
    global_vals = {
        f"med_{c}": (float(global_row[f"med_{c}"]) if global_row[f"med_{c}"] is not None else None)
        for c in target_num_cols
    }

    # Attach per-genre median columns to each row
    df4 = df3.join(per_genre, on="track_genre", how="left")

    # For each target column: value <- value or genre median or global median
    for c in target_num_cols:
        med_col = f"med_{c}"
        df4 = df4.withColumn(
            c,
            F.coalesce(F.col(c), F.col(med_col), F.lit(global_vals[med_col]))
        )

    # Clean up helper columns
    df4 = df4.drop(*[f"med_{c}" for c in target_num_cols])

df4.select("track_genre", *target_num_cols[:6]).show(5, truncate=False)

+--------------+----------+-----------+------------+------+--------+----+
|track_genre   |popularity|duration_ms|danceability|energy|loudness|mode|
+--------------+----------+-----------+------------+------+--------+----+
|hip-hop       |62.0      |190203.0   |0.679       |0.77  |-3.537  |1.0 |
|minimal-techno|19.0      |331240.0   |0.519       |0.431 |-13.606 |0.0 |
|comedy        |24.0      |127040.0   |0.536       |0.78  |-9.449  |0.0 |
|chill         |0.0       |176320.0   |0.613       |0.471 |-6.644  |0.0 |
|punk-rock     |38.0      |177266.0   |0.554       |0.921 |-4.589  |1.0 |
+--------------+----------+-----------+------------+------+--------+----+
only showing top 5 rows



## 5) Outliers — IQR capping per genre

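Caps numeric outliers per genre using the IQR rule: with IQR = Q3 - Q1 from approximate per-genre quartiles, values below Q1 - 1.5 * IQR or above Q3 + 1.5 * IQR are clipped to that bound. It then applies domain-aware clamps: audio-feature rates to [0, 1], duration_ms to at least 0, and popularity to [0, 100].
Skips categorical-like integers (mode, key, time_signature) and leaves a value unchanged when it or its genre's quartiles are null.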
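For a single genre, the capping rule reduces to clipping each value into [Q1 - 1.5 * IQR, Q3 + 1.5 * IQR]. This plain-Python sketch uses hypothetical nearest-rank quartiles as a stand-in for `percentile_approx`; the notebook applies the same rule once per `track_genre` group. One side effect is visible in the outputs: the `chill` row's popularity is 0.0 after imputation but 36.0 after capping, because a zero popularity fell below that genre's lower fence.

```python
import math
from typing import List, Optional


def nearest_rank(values: List[Optional[float]], p: float) -> Optional[float]:
    """Smallest observed value v such that at least a fraction p of values <= v."""
    data = sorted(v for v in values if v is not None)
    if not data:
        return None
    return data[max(0, math.ceil(p * len(data)) - 1)]


def iqr_cap(values: List[Optional[float]], k: float = 1.5) -> List[Optional[float]]:
    """Clip values to [Q1 - k*IQR, Q3 + k*IQR]; nulls and null stats pass through."""
    q1 = nearest_rank(values, 0.25)
    q3 = nearest_rank(values, 0.75)
    if q1 is None or q3 is None:
        return list(values)  # no stats for this group: leave values as-is
    iqr = q3 - q1
    low, high = q1 - k * iqr, q3 + k * iqr
    return [None if v is None else min(max(v, low), high) for v in values]


# One genre's values: Q1 = 12, Q3 = 15, so the fences are 7.5 and 19.5
print(iqr_cap([None, 10, 12, 13, 14, 15, 100]))  # -> [None, 10, 12, 13, 14, 15, 19.5]
```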

In [10]:
# Columns eligible for IQR capping
cap_cols = [c for c in target_num_cols if c not in ["mode", "key", "time_signature"]]

# If we can't segment by genre or nothing to cap, pass-through
if "track_genre" not in df4.columns or not cap_cols:
    df5 = df4
else:
    # Compute per-genre quartiles for each target column
    q1_exprs = [F.expr(f"percentile_approx({c}, 0.25)").alias(f"q1_{c}") for c in cap_cols]
    q3_exprs = [F.expr(f"percentile_approx({c}, 0.75)").alias(f"q3_{c}") for c in cap_cols]
    iqr_stats = df4.groupBy("track_genre").agg(*q1_exprs, *q3_exprs).cache()

    # Attach per-genre Q1/Q3 to each row
    df5 = df4.join(iqr_stats, on="track_genre", how="left")

    # Apply IQR capping per column; leave as-is when value or bounds are null
    for c in cap_cols:
        q1c, q3c = F.col(f"q1_{c}"), F.col(f"q3_{c}")
        iqr = (q3c - q1c)
        low = q1c - F.lit(1.5) * iqr
        high = q3c + F.lit(1.5) * iqr

        capped = (
            F.when(F.col(c).isNull() | q1c.isNull() | q3c.isNull(), F.col(c))
             .when(F.col(c) < low, low)
             .when(F.col(c) > high, high)
             .otherwise(F.col(c))
        )
        df5 = df5.withColumn(c, capped)

    # Domain-aware clamps after IQR capping:
    #  - bounded rates should fall within [0, 1]
    for c in set(cap_cols).intersection({
        "danceability","energy","speechiness","acousticness",
        "instrumentalness","liveness","valence"
    }):
        df5 = df5.withColumn(
            c,
            F.when(F.col(c).isNull(), None)
             .when(F.col(c) < 0, F.lit(0.0))
             .when(F.col(c) > 1, F.lit(1.0))
             .otherwise(F.col(c))
        )

    #  - duration must be non-negative
    if "duration_ms" in df5.columns:
        df5 = df5.withColumn(
            "duration_ms",
            F.when(F.col("duration_ms").isNull(), None)
             .when(F.col("duration_ms") < 0, F.lit(0))
             .otherwise(F.col("duration_ms"))
        )

    #  - popularity constrained to [0, 100]
    if "popularity" in df5.columns:
        df5 = df5.withColumn(
            "popularity",
            F.when(F.col("popularity").isNull(), None)
             .when(F.col("popularity") < 0, F.lit(0))
             .when(F.col("popularity") > 100, F.lit(100))
             .otherwise(F.col("popularity"))
        )

    # Drop helper quartile columns added during the join
    helper_cols = [f"q1_{c}" for c in cap_cols] + [f"q3_{c}" for c in cap_cols]
    df5 = df5.drop(*[h for h in helper_cols if h in df5.columns])

df5.select("track_genre", *[c for c in cap_cols[:6] if c in df5.columns]).show(5, truncate=False)

+--------------+----------+-----------+------------+------+--------+-----------+
|track_genre   |popularity|duration_ms|danceability|energy|loudness|speechiness|
+--------------+----------+-----------+------------+------+--------+-----------+
|hip-hop       |62.0      |190203.0   |0.679       |0.77  |-3.537  |0.19       |
|minimal-techno|19.0      |331240.0   |0.519       |0.431 |-13.606 |0.0291     |
|comedy        |24.0      |127040.0   |0.536       |0.78  |-9.449  |0.945      |
|chill         |36.0      |176320.0   |0.613       |0.471 |-6.644  |0.107      |
|punk-rock     |38.0      |177266.0   |0.554       |0.921 |-4.589  |0.0758     |
+--------------+----------+-----------+------------+------+--------+-----------+
only showing top 5 rows



## 6) Normalization — Z-score for numerics

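Standardizes continuous features with global Z-scores, adding a `{col}_z` column computed as (x - mean) / stddev from the global mean and population standard deviation; the raw columns are kept.
Skips discrete/categorical-like integers (mode, key, time_signature), and only adds a Z-score column when the mean is defined and the standard deviation is non-null and non-zero.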
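The arithmetic behind each `{col}_z` column can be checked with a plain-Python sketch of the same computation. The `zscores` helper is hypothetical; like Spark's `mean` and `stddev_pop` aggregates it ignores nulls, and it returns `None` where the notebook would skip the column.

```python
import statistics
from typing import List, Optional


def zscores(values: List[Optional[float]]) -> Optional[List[Optional[float]]]:
    """Population z-scores; None when mean/std are undefined or std == 0."""
    data = [v for v in values if v is not None]  # aggregates ignore nulls
    if not data:
        return None
    mean = statistics.fmean(data)
    std = statistics.pstdev(data)  # population stddev, matching F.stddev_pop
    if std == 0:
        return None  # mirrors skipping {col}_z when the std is zero
    return [None if v is None else (v - mean) / std for v in values]


print(zscores([2, 4, 4, 4, 5, 5, 7, 9]))  # mean 5, std 2 -> [-1.5, -0.5, -0.5, -0.5, 0.0, 0.0, 1.0, 2.0]
```

The population (rather than sample) standard deviation divides by n instead of n - 1; on roughly 90k rows the difference is negligible, but it keeps the sketch consistent with `stddev_pop`.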

In [11]:
# Continuous columns only; exclude categorical-like integer fields
cont_cols = [
    c for c in target_num_cols
    if c in df5.columns and c not in ["mode", "time_signature", "key"]
]

# Compute global means and population stddevs for all targets
agg_exprs = []
for c in cont_cols:
    agg_exprs.append(F.mean(c).alias(f"mean_{c}"))
    agg_exprs.append(F.stddev_pop(c).alias(f"std_{c}"))  # population stddev

stats_row = df5.agg(*agg_exprs).first()
stats = stats_row.asDict()

df6 = df5

# Add {col}_z only when mean/std are defined and std != 0
for c in cont_cols:
    mean_c = stats.get(f"mean_{c}")
    std_c  = stats.get(f"std_{c}")
    if mean_c is not None and std_c not in (None, 0.0):
        df6 = df6.withColumn(
            f"{c}_z",
            (F.col(c) - F.lit(float(mean_c))) / F.lit(float(std_c))
        )

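# Show only the z-score columns that were actually created
z_cols = [f"{c}_z" for c in cont_cols if f"{c}_z" in df6.columns]
df6.select(
    *(id_cols + ["track_name", "track_genre"] + cont_cols + z_cols)
).show(5, truncate=False)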

+----------------------+----------------------------------------+--------------+----------+-----------+------------+------+--------+-----------+------------+---------------------+--------+-------+-------+-------------------+--------------------+---------------------+-------------------+--------------------+--------------------+---------------------+--------------------+-------------------+--------------------+------------------+
|track_id              |track_name                              |track_genre   |popularity|duration_ms|danceability|energy|loudness|speechiness|acousticness|instrumentalness     |liveness|valence|tempo  |popularity_z       |duration_ms_z       |danceability_z       |energy_z           |loudness_z          |speechiness_z       |acousticness_z       |instrumentalness_z  |liveness_z         |valence_z           |tempo_z           |

## 7) Save silver dataset to Drive (Parquet)

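Saves the cleaned and normalized silver dataset to Google Drive as Parquet, overwriting any previous version, then reads it back to confirm the write and inspect the schema.
The silver layer holds de-duplicated, type-normalized, imputed, and outlier-capped data plus `{col}_z` columns, ready for analytics or modeling. `track_genre` appears first in the schema because the per-genre joins place the join key at the front.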

In [12]:
# Persist the preprocessed DataFrame as Parquet (overwrite existing version)
df6.write.mode("overwrite").parquet(SILVER)

# Reload the written dataset to confirm successful save
df_silver = spark.read.parquet(SILVER)

print("Silver rows:", df_silver.count())
df_silver.printSchema()

df_silver.show(5, truncate=False)

Silver rows: 89620
root
 |-- track_genre: string (nullable = true)
 |-- track_id: string (nullable = true)
 |-- artists: string (nullable = true)
 |-- album_name: string (nullable = true)
 |-- track_name: string (nullable = true)
 |-- popularity: double (nullable = true)
 |-- duration_ms: double (nullable = true)
 |-- explicit: boolean (nullable = true)
 |-- danceability: double (nullable = true)
 |-- energy: double (nullable = true)
 |-- key: double (nullable = true)
 |-- loudness: double (nullable = true)
 |-- mode: double (nullable = true)
 |-- speechiness: double (nullable = true)
 |-- acousticness: double (nullable = true)
 |-- instrumentalness: double (nullable = true)
 |-- liveness: double (nullable = true)
 |-- valence: double (nullable = true)
 |-- tempo: double (nullable = true)
 |-- time_signature: double (nullable = true)
 |-- popularity_z: double (nullable = true)
 |-- duration_ms_z: double (nullable = true)
 |-- danceability_z: double (nullable = true)
 |-- energy_z: doub