# Spotify Recommendation System Project

**Purpose:** Build a scalable music recommendation system using PySpark and Spotify datasets.

**Group members:**

* Raakin Bhatti
* Aneesh Bulusu
* Walid Farhat
* Long Nguyen
* Strahinja Radakovic

# Introduction

**What is our big data problem? What is our goal?**
* Build a scalable music recommendation system using a large dataset of songs and their audio features from Spotify.
* Given the names of some songs, the algorithm will predict and recommend songs similar to the input songs based on their audio features (e.g. danceability, energy, acoustics, etc.) and categorical data like genres.
* Use Spark/PySpark to process large-scale data and develop machine learning algorithms.

**Why we chose building Recommendation Systems:**
* Recommendation systems are widely used in modern digital platforms to enhance user experience (e.g., Spotify, Netflix, Amazon).
* Help users discover relevant content, which helps to increase engagement and satisfaction.

**Why Spotify dataset?**
* Spotify is a leading music streaming platform which has rich data on songs, audio features, and artists.
* The dataset provides an opportunity to analyze music preferences and recommend personalized songs or playlists.

**What is a big data challenge?**
* Spotify data involves thousands of users, songs, and interactions, requiring storage and processing at scale.
* PySpark is well-suited for handling and analyzing big data with its distributed computing capabilities.

**Broader applications:**
* Insights from the project can extend to other recommendation systems.
* Demonstrates the integration of big data tools and machine learning for real-world applications.


**Filtering method:**

Our group's recommendation system will use Content-Based Filtering method instead of Collaborative filtering. Content-Based Filtering method analyzes the audio characteristics of songs you’ve previously enjoyed, then the model will make personalized suggestions. We do not use Collaborative filtering or user-based filtering method becuase we cannot collect the information related to Spotify's users such as `user_id`.

**What is the Target Variable for this project?**

In a content-based recommendation system, we do not have a traditional "target variable" like in supervised learning. Instead, the goal is to calculate similarity metrics between songs based on their features. However, you can think of the **song similarity score** (e.g., cosine similarity, Euclidean distance) as the implicit target metric for creating recommendations.

* The goal is to recommend songs similar to the input songs based on their audio features and genres.
* For our recommendation system, the focus is on matching songs based on audio features like danceability, energy, acousticness, etc., and categorical data like genre.

**What are the interesting Features of the dataset?**

*Numerical Features (Audio Characteristics):* 

* danceability: Indicates how suitable a track is for dancing.
* energy: Represents the intensity and activity level of a track.
* loudness: Measures the decibel level of the track.
* acousticness: Likelihood of the track being acoustic.
* instrumentalness: Determines the degree to which a track is instrumental.
* valence: Describes the musical positiveness conveyed by a track.
* tempo: The speed of the song in beats per minute (BPM).
* duration_ms: Song duration, which can help differentiate between shorter and longer tracks.

*Categorical Features:*

* genre: A key factor in identifying similar songs. 
* key: Musical key in which the song is composed.
* mode: Indicates whether the song is in a major or minor scale.

*Meta Information (Optional):*

* popularity: While not directly linked to audio characteristics, it can serve as a secondary ranking factor in your recommendations.
* year: Could help in filtering songs by era if needed.

# Setting up Spark

This section is optional. In case that you have not installed Spark, Hadoop, etc. in your local machine, then this part will help setting up Spark in the Jupyter Notebook for running.

In [1]:
## OPTIONAL: Setting up Spark in Jupyter Notebook

# !apt-get install openjdk-8-jdk-headless -qq > /dev/null
# !wget https://dlcdn.apache.org/spark/spark-3.5.3/spark-3.5.3-bin-hadoop3.tgz
# !tar -xvf spark-3.5.3-bin-hadoop3.tgz
# !pip install findspark
# 
# import os
# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# os.environ["SPARK_HOME"] = "/content/spark-3.5.3-bin-hadoop3"
# 
# import findspark
# findspark.init()
# findspark.find()

In [2]:
## OPTIONAL: Test if PySpark is ready to go

# from pyspark.sql import SparkSession
# 
# spark = SparkSession.builder.appName("Test").getOrCreate()
# print(f"Spark version:", spark.version)
# 
# spark.stop()

## expected result: 3.5.3 or similar

# Data Collection

**Required Tasks:**
* Load the dataset into a PySpark DataFrame.
* Verify the dataset schema and check if the data is loaded correctly.

**Output:** A PySpark DataFrame loaded and ready for processing, with the schema verified.


### How we get the dataset?

We extracted the dataset from Spotify using Spotify API and the `Spotipy` library in Python.

Attached the Python files for extraction and transformation (from JSON to CSV). Please note that these files are for references only, because in order to run those files, you will have to set up a virtual environment.

### Loading datasets

In [3]:
import findspark
findspark.init()
findspark.find()

'C:\\Program Files\\spark-3.5.3'

In [4]:
# import libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, count, lit
import pandas as pd
import matplotlib.pyplot as plt

In [5]:
# initialize a SparkSession
spark = SparkSession.builder \
    .appName("SpotifyRecommendationSystem") \
    .config("spark.executor.memory", "8g") \
    .config("spark.driver.memory", "8g") \
    .config("spark.executor.cores", "4") \
    .getOrCreate()

print(f"Spark version:", spark.version)

Spark version: 3.5.3


In [6]:
# load dataset into a DataFrame
file_path = "./dataset/spotify_dataset.csv" # Note: The dataset file is too large (>250 MB) to commit to a GitHub repo.
spotify_df = spark.read.csv(file_path, header=True, inferSchema=True)

# display the first few rows
spotify_df.show(5)


+---+-------------+----------------+--------------------+----------+----+--------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+-----------+--------------+
|_c0|  artist_name|      track_name|            track_id|popularity|year|   genre|danceability|energy|key|loudness|mode|speechiness|acousticness|instrumentalness|liveness|valence|  tempo|duration_ms|time_signature|
+---+-------------+----------------+--------------------+----------+----+--------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+-----------+--------------+
|  0|   Jason Mraz| I Won't Give Up|53QF56cjZA9RTuuMZ...|        68|2012|acoustic|       0.483| 0.303|  4| -10.058|   1|     0.0429|       0.694|             0.0|   0.115|  0.139|133.406|   240166.0|           3.0|
|  1|   Jason Mraz|93 Million Miles|1s8tP3jP4GZcyHDsj...|        50|2012|acoustic|       0.572| 0.454|  3| -10.286|   1|     0.0258|       0

# Data Inspection and Validation

In this section, we will do the following tasks:

* Check the data schema with column names and data types.
* Convert data types if needed.
* Check for the summary statistics of the dataset.
* Check for missing values. Handle missing values properly.
* Check for outliers. Handle the outliers.
* Check for distinct values.

In [7]:
# print schema to verify column names and data types
spotify_df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- track_name: string (nullable = true)
 |-- track_id: string (nullable = true)
 |-- popularity: string (nullable = true)
 |-- year: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- danceability: string (nullable = true)
 |-- energy: string (nullable = true)
 |-- key: string (nullable = true)
 |-- loudness: string (nullable = true)
 |-- mode: string (nullable = true)
 |-- speechiness: string (nullable = true)
 |-- acousticness: string (nullable = true)
 |-- instrumentalness: string (nullable = true)
 |-- liveness: string (nullable = true)
 |-- valence: double (nullable = true)
 |-- tempo: double (nullable = true)
 |-- duration_ms: double (nullable = true)
 |-- time_signature: double (nullable = true)



**Data type conversins:**

From [Spotify's API documentation](https://developer.spotify.com/documentation/web-api/reference/get-audio-features) about track's audio features, each feature has its own meaning and data types. So we will convert the data types of features in our dataset to match Spotify's documentation.

In [8]:
# casting columns to their appropriate data types as per Spotify's documentation
spotify_df = spotify_df \
    .withColumn("popularity", col("popularity").cast("int")) \
    .withColumn("year", col("year").cast("int")) \
    .withColumn("danceability", col("danceability").cast("float")) \
    .withColumn("energy", col("energy").cast("float")) \
    .withColumn("key", col("key").cast("int")) \
    .withColumn("loudness", col("loudness").cast("float")) \
    .withColumn("mode", col("mode").cast("int")) \
    .withColumn("speechiness", col("speechiness").cast("float")) \
    .withColumn("acousticness", col("acousticness").cast("float")) \
    .withColumn("instrumentalness", col("instrumentalness").cast("float")) \
    .withColumn("liveness", col("liveness").cast("float")) \
    .withColumn("tempo", col("tempo").cast("float")) \
    .withColumn("time_signature", col("time_signature").cast("int"))

# re-check the updated schema
spotify_df.printSchema()


root
 |-- _c0: integer (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- track_name: string (nullable = true)
 |-- track_id: string (nullable = true)
 |-- popularity: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- genre: string (nullable = true)
 |-- danceability: float (nullable = true)
 |-- energy: float (nullable = true)
 |-- key: integer (nullable = true)
 |-- loudness: float (nullable = true)
 |-- mode: integer (nullable = true)
 |-- speechiness: float (nullable = true)
 |-- acousticness: float (nullable = true)
 |-- instrumentalness: float (nullable = true)
 |-- liveness: float (nullable = true)
 |-- valence: double (nullable = true)
 |-- tempo: float (nullable = true)
 |-- duration_ms: double (nullable = true)
 |-- time_signature: integer (nullable = true)



In [9]:
# count total rows in the dataset
total_rows = spotify_df.count()
print(f"Total rows in the dataset: {total_rows}")

Total rows in the dataset: 1159764


Remarks: The dataset contains 1,159,764 rows, which is quite large, indicating the need for big data tools like PySpark.

**Check for Summary Statistics:**

Next, we want to check for the summary statistics of the dataset. In [Spark](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.describe.html), if using `.describe()` method, it will by default calculate the stats for all columns including both numerical and non-numerical (string) columns. Therefore, we have to filter out the columns based on their data types as follows.

In [10]:
# filter numeric columns using dtypes
numeric_columns = [name for name, dtype in spotify_df.dtypes if dtype in ('int', 'bigint', 'double', 'float', 'decimal')]

# select only numeric columns
numeric_df = spotify_df.select(*numeric_columns)

# show summary statistics for numerical columns only
numeric_df.describe().show()

+-------+-----------------+------------------+------------------+-----------------+------------------+------------------+------------------+-----------------+-------------------+-------------------+-------------------+-------------------+-------------------+------------------+------------------+------------------+
|summary|              _c0|        popularity|              year|     danceability|            energy|               key|          loudness|             mode|        speechiness|       acousticness|   instrumentalness|           liveness|            valence|             tempo|       duration_ms|    time_signature|
+-------+-----------------+------------------+------------------+-----------------+------------------+------------------+------------------+-----------------+-------------------+-------------------+-------------------+-------------------+-------------------+------------------+------------------+------------------+
|  count|          1159764|           1158091|      

**Remarks for summary statistics:**

* `popularity`: Ranges from 0 to 100, indicating a reasonable scale for popularity.
* `year`: The dataset includes tracks from 0 to 2023. The value 0 seems anomalous and might need further investigation.
* Other features like `danceability`, `energy`, `tempo`, and `duration_ms` have a wide range of values, which may need normalization or standardization for machine learning.
* Potential outliers: Columns like `tempo` (min = -24.073) and `loudness` (min = -58.1) have unusual values that might indicate outliers or data entry issues.

In [11]:
# check for missing values in each column
missing_values = spotify_df.select([
    count(when(col(c).isNull(), c)).alias(c) for c in spotify_df.columns
])
missing_values.show()

+---+-----------+----------+--------+----------+----+-----+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-----+-----------+--------------+
|_c0|artist_name|track_name|track_id|popularity|year|genre|danceability|energy|key|loudness|mode|speechiness|acousticness|instrumentalness|liveness|valence|tempo|duration_ms|time_signature|
+---+-----------+----------+--------+----------+----+-----+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-----+-----------+--------------+
|  0|          0|         0|       0|      1673| 623|    0|        1184|   408|162|      94|  33|         16|           8|               3|       2|      0|    0|          0|             0|
+---+-----------+----------+--------+----------+----+-----+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-----+-----------+--------------+



**Remarks for missing values:**

* Columns with missing data: popularity (1,673 missing values), year (623), and others such as  danceability, energy, key, loudness, and mode.
* Given the dataset has over 1.15M rows, the proportion of missing data is extremely small (less than 0.1% for all affected columns).
* In this case, we will drop rows with missing values in features that are critical for our recommendation system.

In [12]:
# drop rows with missing data in critical columns
columns_to_check = ['popularity', 'year', 'danceability', 'energy', 'key', 'loudness', 'mode', 'speechiness', 'acousticness', 'instrumentalness', 'liveness']

spotify_df_clean = spotify_df.na.drop(subset=columns_to_check)

# verify the number of rows after dropping
cleaned_rows = spotify_df_clean.count()
print(f"Rows after dropping missing data: {cleaned_rows}")

dropped_rows = total_rows - cleaned_rows
print(f"Number of rows dropped: {dropped_rows}")


Rows after dropping missing data: 1158091
Number of rows dropped: 1673


**Anomalies and Outliers**:

We will investigate the folloiwng things:

* Check for distinct values of `year` column to make sure no abnormal values (such as 1500).
* 



In [13]:
# get distinct years and their frequencies
year_distribution = spotify_df_clean.groupBy("year").count().orderBy("year")

year_distribution.show(50)

+----+-----+
|year|count|
+----+-----+
|2000|43944|
|2001|42316|
|2002|42084|
|2003|42250|
|2004|43293|
|2005|43708|
|2006|45419|
|2007|45920|
|2008|47336|
|2009|46810|
|2010|46818|
|2011|46381|
|2012|54725|
|2013|53105|
|2014|53120|
|2015|51569|
|2016|40246|
|2017|56171|
|2018|56541|
|2019|55739|
|2020|55035|
|2021|53529|
|2022|53637|
|2023|38395|
+----+-----+



Remarks for `year`: The years range from 2000 to 2023, which are expected and normal.

**Check for anomalies in numerical columns:**

Compare data against the value ranges specified in the [Spotify documentation](https://developer.spotify.com/documentation/web-api/reference/get-audio-features):
* acousticness: [0, 1]
* danceability: [0, 1]
* energy: [0, 1] 
* instrumentalness: [0, 1]
* key: [-1, 11]
* liveness: [0, 1]
* mode: {0, 1}
* speechiness: [0, 1]
* tempo: Typically [1, 250], (note: this is a typical range; Spotify docs don't strictly enforce this.)
* time_signature: [3, 7]
* valence: [0, 1]

In [14]:
# define the range checks for each feature
range_checks = {
    "acousticness": (0, 1),
    "danceability": (0, 1),
    "energy": (0, 1),
    "instrumentalness": (0, 1),
    "key": (-1, 11),
    "liveness": (0, 1),
    "mode": (0, 1),
    "speechiness": (0, 1),
    "tempo": (1, 350),
    "time_signature": (3, 7),
    "valence": (0, 1),
}

# identify anomalies in each feature
anomalies = {}
for feature, (min_val, max_val) in range_checks.items():
    anomalies[feature] = spotify_df_clean.filter((col(feature) < min_val) | (col(feature) > max_val)).count()

# print the anomalies count for each feature
for feature, count in anomalies.items():
    print(f"Number of anomalies in {feature}: {count}")


Number of anomalies in acousticness: 0
Number of anomalies in danceability: 0
Number of anomalies in energy: 0
Number of anomalies in instrumentalness: 0
Number of anomalies in key: 0
Number of anomalies in liveness: 0
Number of anomalies in mode: 0
Number of anomalies in speechiness: 0
Number of anomalies in tempo: 1198
Number of anomalies in time_signature: 13816
Number of anomalies in valence: 0


From the results:

* No Anomalies:
    * Most features (acousticness, danceability, energy, instrumentalness, etc.) have no anomalies. These are clean and can be used as-is.
* tempo Anomalies:
    * 1,198 anomalies fall outside the range [1, 250]. These might include invalid or outlier values (e.g., extremely high or low BPM).
* time_signature Anomalies:
    * 13,816 anomalies fall outside the range [3, 7]. These could represent tracks with unusual time signatures, potentially errors or special cases.

In [15]:
# Show rows with tempo anomalies
tempo_anomalies = spotify_df_clean.filter((col("tempo") < 1) | (col("tempo") > 350))

# Descriptive statistics for tempo anomalies
tempo_anomalies.describe("tempo").show()


+-------+-----+
|summary|tempo|
+-------+-----+
|  count| 1198|
|   mean|  0.0|
| stddev|  0.0|
|    min|  0.0|
|    max|  0.0|
+-------+-----+



**Remarks for Tempo:**

* As you can see, the min and max values of tempo anomalies are 0.0, which means these values are missing in the dataset (i.e. 0.0 == missing values). We can eliminate them since the quantity is very small in the dataset.

In [16]:
# Show rows with time_signature anomalies
time_signature_anomalies = spotify_df_clean.filter((col("time_signature") < 3) | (col("time_signature") > 7))

# Descriptive statistics for time_signature anomalies
time_signature_anomalies.describe("time_signature").show()


+-------+-------------------+
|summary|     time_signature|
+-------+-------------------+
|  count|              13816|
|   mean| 0.9111899247249565|
| stddev|0.28447970650303633|
|    min|                  0|
|    max|                  1|
+-------+-------------------+



# Data Filtering

In this section, we will identify and retain only the relevant features for building the recommendation system. We will consider dropping irrelevant or redundant columns like `_c0` that may not contribute to the model.

* Since `time_signature` feature has more than 13K values of anamolies and has little importance to finding similarities between songs, we will drop this feature.
* `_c0` feature is only the index of the dataset and will contribute to the model so we will drop it.

In [17]:
# retain only relevant features for the recommendation system
selected_columns = [
    'artist_name', 'track_name', 'track_id', 'popularity', 'year', 
    'genre', 'danceability', 'energy', 'key', 'loudness', 'mode', 
    'speechiness', 'acousticness', 'instrumentalness', 'liveness', 
    'valence', 'tempo', 'duration_ms'
]

# create a filtered DataFrame
spotify_df_filtered = spotify_df_clean.select(*selected_columns)

# show the schema of the filtered DataFrame
spotify_df_filtered.printSchema()

# verify the filtering process
spotify_df_filtered.show(5)


root
 |-- artist_name: string (nullable = true)
 |-- track_name: string (nullable = true)
 |-- track_id: string (nullable = true)
 |-- popularity: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- genre: string (nullable = true)
 |-- danceability: float (nullable = true)
 |-- energy: float (nullable = true)
 |-- key: integer (nullable = true)
 |-- loudness: float (nullable = true)
 |-- mode: integer (nullable = true)
 |-- speechiness: float (nullable = true)
 |-- acousticness: float (nullable = true)
 |-- instrumentalness: float (nullable = true)
 |-- liveness: float (nullable = true)
 |-- valence: double (nullable = true)
 |-- tempo: float (nullable = true)
 |-- duration_ms: double (nullable = true)

+-------------+----------------+--------------------+----------+----+--------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+-----------+
|  artist_name|      track_name|            track_id|popularity|year|   gen

In [18]:
# Count distinct values in key columns
key_columns = ['artist_name', 'track_name', 'genre']
distinct_counts = {col_name: spotify_df_filtered.select(col_name).distinct().count() for col_name in key_columns}

for col_name, count in distinct_counts.items():
    print(f"Distinct values in {col_name}: {count}")

Distinct values in artist_name: 64144
Distinct values in track_name: 880697
Distinct values in genre: 82


# Data Transformations

Here are the goals of this section:

1. **Encode categorical features:** Features like `genre` and `artist_name` are categorical and need to be encoded to numeric values for modeling.
2. **Normalize or scale numeric features:** Standardized scales are crucial for similarity metrics like cosine similarity or Euclidean distance. Features like `danceability`, `energy`, `tempo`, etc., may have varying ranges, which can affect model performance. Therefore, we will normalize or standardize these values.
3. **Generate additional features (if needed):** Create new features based on existing ones. For example, you might engineer a feature for "popularity bucket" if grouping songs by popularity levels makes sense for your recommendation algorithm. Or group songs by year into broader time periods (e.g., 5-year period) for optional temporal filtering.

## Encoding categorial features

There are mutilple approaches for encoding categorical features, such as one-hot encoding, label encoding, embedding, frequency/popularity encoding, etc. We will choose the most suitable approach for each categorical feature based on their characteristics in the dataset.

We will encode `genre` and `artist_name` to numeric values using PySpark's `StringIndexer`.

### 1. Encoding `genre` feature

We will use one-hot encoding technique for `genre` because this feature has a limited number of unique values (392 in the dataset). Also, this technique works well with similarity metrics since each genre becomes a distinct dimension.


In [19]:
# Get distinct values of the 'genre' column
unique_genres = spotify_df_filtered.select("genre").distinct()
unique_genres_list = [row['genre'] for row in unique_genres.collect()]
print(unique_genres_list)

['singer-songwriter', 'folk', 'hardstyle', 'pop', 'death-metal', 'detroit-techno', 'k-pop', 'ambient', 'guitar', 'goth', 'cantopop', 'blues', 'breakbeat', 'dance', 'groove', 'indian', 'german', 'sad', 'spanish', 'french', 'electronic', 'dub', 'deep-house', 'edm', 'rock-n-roll', 'power-pop', 'progressive-house', 'swedish', 'chill', 'party', 'hip-hop', 'techno', 'hard-rock', 'indie-pop', 'jazz', 'new-age', 'show-tunes', 'trip-hop', 'punk-rock', 'country', 'hardcore', 'industrial', 'metalcore', 'songwriter', 'metal', 'soul', 'psych-rock', 'grindcore', 'pop-film', 'salsa', 'dancehall', 'club', 'electro', 'samba', 'drum-and-bass', 'heavy-metal', 'house', 'chicago-house', 'funk', 'alt-rock', 'sleep', 'dubstep', 'gospel', 'acoustic', 'rock', 'ska', 'opera', 'black-metal', 'romance', 'emo', 'tango', 'punk', 'disco', 'classical', 'sertanejo', 'afrobeat', 'garage', 'forro', 'trance', 'minimal-techno', 'comedy', 'piano']


**Remarks for genre's unique values:** The distinct values in the genre column are single-valued entries, including words with hyphens such as "black-metal", "alt-rock", "new-age", etc. These hyphenated values are treated as single entities and do not represent multi-valued genres. Therefore, we can safely proceed with encoding this column using StringIndexer without needing to split multi-valued genres.

In [20]:
# Encoding genre feature

from pyspark.ml.feature import StringIndexer, OneHotEncoder

# StringIndexer: Converts genre into numerical indices
genre_indexer = StringIndexer(inputCol="genre", outputCol="genre_index")
spotify_df_filtered = genre_indexer.fit(spotify_df_filtered).transform(spotify_df_filtered)

# OneHotEncoder: Converts indices into one-hot encoded vectors
genre_encoder = OneHotEncoder(inputCol="genre_index", outputCol="genre_onehot", dropLast=False)
spotify_df_encoded = genre_encoder.fit(spotify_df_filtered).transform(spotify_df_filtered)

# show the results
spotify_df_encoded.select("genre", "genre_index", "genre_onehot").show(5, truncate=False)


+--------+-----------+--------------+
|genre   |genre_index|genre_onehot  |
+--------+-----------+--------------+
|acoustic|3.0        |(82,[3],[1.0])|
|acoustic|3.0        |(82,[3],[1.0])|
|acoustic|3.0        |(82,[3],[1.0])|
|acoustic|3.0        |(82,[3],[1.0])|
|acoustic|3.0        |(82,[3],[1.0])|
+--------+-----------+--------------+
only showing top 5 rows



*Note:* For `genre_onehot` column, the `(82,[3],[1.0])` format means that the vector is sparse, with 82 possible genres (total unique categories), and the index `3` corresponds to the genre `acoustic`.

In [21]:
# Optional: Show expanded one-hot encoding for better readability
# by converting sparse vector to array

from pyspark.ml.functions import vector_to_array

spotify_df_encoded = spotify_df_encoded.withColumn("genre_onehot_array", vector_to_array(col("genre_onehot")))

spotify_df_encoded.select("genre", "genre_index", "genre_onehot_array").show(5, truncate=False)

+--------+-----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|genre   |genre_index|genre_onehot_array                                                                                                                                                                                                                                                                                                                                                                                                        |
+--------+-----------+--------------------------------------------------------------------------------------------------------------

*Note:* For `genre_onehot_array` column, the `[0.0, 0.0, 0.0, 1.0, 0.0, ...]` format represents a dense vector with 82 possible genres. For example, the `1.0` at the 3rd index (0-based indexing) corresponds to the genre `acoustic`, as indicated by the `genre_index` of `3.0`.
All other indices are `0.0`, indicating the absence of other genres.

In [22]:
# review the dataframe
# note that there are 3 new columns: genre_index, genre_onehot, genre_onehot_array
spotify_df_encoded.show(5)

+-------------+----------------+--------------------+----------+----+--------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+-----------+-----------+--------------+--------------------+
|  artist_name|      track_name|            track_id|popularity|year|   genre|danceability|energy|key|loudness|mode|speechiness|acousticness|instrumentalness|liveness|valence|  tempo|duration_ms|genre_index|  genre_onehot|  genre_onehot_array|
+-------------+----------------+--------------------+----------+----+--------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+-----------+-----------+--------------+--------------------+
|   Jason Mraz| I Won't Give Up|53QF56cjZA9RTuuMZ...|        68|2012|acoustic|       0.483| 0.303|  4| -10.058|   1|     0.0429|       0.694|             0.0|   0.115|  0.139|133.406|   240166.0|        3.0|(82,[3],[1.0])|[0.0, 0.0, 0.0, 1...|
|   Jason Mraz|93 Millio

### 2. Encoding `artist_name` feature

We will use the Frequency (or Popularity) Encoding technique because `artist_name` has a high cardinality (64,159 distinct values). ALso, frequency encoding reduces the feature space while retaining meaningful patterns (e.g., more popular artists might appear in recommendations more frequently).

In [23]:
# from pyspark.sql.functions import col, count, lit

# Calculate the frequency of each artist
artist_frequency = spotify_df_encoded.groupBy("artist_name").count() \
                             .withColumnRenamed("count", "artist_frequency")

# Join the frequency data back to the main DataFrame
spotify_df_encoded = spotify_df_encoded.join(artist_frequency, on="artist_name", how="left")

# Show the results
spotify_df_encoded.select("artist_name", "artist_frequency").distinct().show(5, truncate=False)


+-----------------+----------------+
|artist_name      |artist_frequency|
+-----------------+----------------+
|Ramshackle Glory |31              |
|Zach Berkman     |17              |
|The Black Keys   |209             |
|Black Pistol Fire|77              |
|Jane's Addiction |55              |
+-----------------+----------------+
only showing top 5 rows



*Note:* The frequencies (31, 17, etc.) reflect the occurrence count of each artist in the dataset, which is intuitive and interpretable.

In [24]:
# review the dataframe
# note that there are one new columns: artist_frequency

spotify_df_encoded.show(5)

+-------------+----------------+--------------------+----------+----+--------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+-----------+-----------+--------------+--------------------+----------------+
|  artist_name|      track_name|            track_id|popularity|year|   genre|danceability|energy|key|loudness|mode|speechiness|acousticness|instrumentalness|liveness|valence|  tempo|duration_ms|genre_index|  genre_onehot|  genre_onehot_array|artist_frequency|
+-------------+----------------+--------------------+----------+----+--------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+-----------+-----------+--------------+--------------------+----------------+
|   Jason Mraz| I Won't Give Up|53QF56cjZA9RTuuMZ...|        68|2012|acoustic|       0.483| 0.303|  4| -10.058|   1|     0.0429|       0.694|             0.0|   0.115|  0.139|133.406|   240166.0|        3.0|(82,[3],[1

In [25]:
## OPTIONAL: use one-hot encoding method - this is not recommended because:
## Treats all artists equally, which may lose meaningful patterns like artist popularity.
## May introduce an artificial ordinal relationship between artists (e.g., artist_index = 0 vs. artist_index = 1).

# ## Check the unique values in the 'artist_name' column 
# 
# # group by 'genre', count occurrences, and order by count in descending order
# artist_name_counts = spotify_df.groupBy("artist_name").count().orderBy(col("count").desc())
# 
# artist_name_counts.show(10)
# 
# # encode the 'artist_name' column
# artist_indexer = StringIndexer(inputCol="artist_name", outputCol="artist_index")
# spotify_df_encoded = artist_indexer.fit(spotify_df_encoded).transform(spotify_df_encoded)
# 
# # View the distinct values of artist_name and their corresponding artist_index
# distinct_artist_mapping = spotify_df_encoded.select("artist_name", "artist_index").distinct()
# 
# distinct_artist_mapping = distinct_artist_mapping.orderBy("artist_index")
# 
# distinct_artist_mapping.show(10, truncate=False)

In [26]:
## OPTIONAL: 
# # drop the original categorical columns to avoid redundancy (optional)
# spotify_df_transformed = spotify_df_encoded.drop("genre", "artist_name")
# 
# # Verify encoding
# spotify_df_transformed.show(5)

In [27]:
# spotify_df_encoded.show(5)

In [28]:
## OPTIONAL: Save the encoded dataframe to a Parquet file for reuse
# spotify_df_encoded.write.parquet("./spotify_df_encoded.parquet", mode="overwrite")

## Normalize Numerical Features

We will normalize numerical features like `danceability`, `energy`, `tempo`, etc., to have values between 0 and 1 using `MinMaxScaler`. 

Min-Max scaling is typically the best choice for recommendation systems, especially when working with similarity metrics like cosine similarity or Euclidean distance.

There are a few reasons why we should do scaling for these numerical values:

* Scaling helps to standardize different scales across features. For example, features like `loudness` (negative values) and `tempo` (positive, larger range) have much different scales.
* Scaling also improves model performance because many ML algorithms, such as neural networks and matrix factorization (used in recommendation systems), will converge faster and perform better with normalized input data.

In [29]:
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import udf, col
from pyspark.sql.types import DoubleType

# define numerical features to normalize
numerical_features = [
    'danceability', 'energy', 'key', 'loudness', 'mode', 
    'speechiness', 'acousticness', 'instrumentalness', 'liveness', 
    'valence', 'tempo', 'duration_ms', 'popularity'
]

# assemble numerical features into a single vector column
assembler = VectorAssembler(inputCols=numerical_features, outputCol="numerical_vector")
spotify_df_vectorized = assembler.transform(spotify_df_encoded)

# apply Min-Max Scaler
scaler = MinMaxScaler(inputCol="numerical_vector", outputCol="scaled_features")
scaler_model = scaler.fit(spotify_df_vectorized)
spotify_df_normalized = scaler_model.transform(spotify_df_vectorized)

# extract scaled features into separate columns
def extract_column(index):
    """UDF to extract specific column from dense vector."""
    return udf(lambda vector: float(vector[index]), returnType=DoubleType())

# loop through features and create new columns for each scaled feature
for i, feature in enumerate(numerical_features):
    spotify_df_normalized = spotify_df_normalized.withColumn(f"{feature}_scaled", extract_column(i)(col("scaled_features")))

# drop original and intermediate columns
columns_to_drop = numerical_features + ["numerical_vector", "scaled_features"]
spotify_df_normalized = spotify_df_normalized.drop(*columns_to_drop)

# show the scaled features
spotify_df_normalized.select(*[f"{feature}_scaled" for feature in numerical_features]).show(5, truncate=False)


+-------------------+-------------------+-------------------+------------------+-----------+--------------------+-------------------+-----------------------+-------------------+--------------+------------------+--------------------+------------------+
|danceability_scaled|energy_scaled      |key_scaled         |loudness_scaled   |mode_scaled|speechiness_scaled  |acousticness_scaled|instrumentalness_scaled|liveness_scaled    |valence_scaled|tempo_scaled      |duration_ms_scaled  |popularity_scaled |
+-------------------+-------------------+-------------------+------------------+-----------+--------------------+-------------------+-----------------------+-------------------+--------------+------------------+--------------------+------------------+
|0.4864048584089521 |0.30300000309944153|0.36363636363636365|0.747479463142717 |1.0        |0.044181255481279255|0.6967871600104057 |0.0                    |0.11500000208616257|0.139         |0.5336389734062985|0.03969260582199785 |0.68        

*Note:* The values in new columns (e.g. `energy_scaled`) are normalized to be between 0 and 1.

In [30]:
# review the dataframe
spotify_df_normalized.show(5)

+----------------+--------------------+--------------------+----+--------+-----------+--------------+--------------------+----------------+-------------------+-------------------+------------------+------------------+-----------+-------------------+--------------------+-----------------------+-------------------+--------------+-------------------+--------------------+-----------------+
|     artist_name|          track_name|            track_id|year|   genre|genre_index|  genre_onehot|  genre_onehot_array|artist_frequency|danceability_scaled|      energy_scaled|        key_scaled|   loudness_scaled|mode_scaled| speechiness_scaled| acousticness_scaled|instrumentalness_scaled|    liveness_scaled|valence_scaled|       tempo_scaled|  duration_ms_scaled|popularity_scaled|
+----------------+--------------------+--------------------+----+--------+-----------+--------------+--------------------+----------------+-------------------+-------------------+------------------+------------------+-----

## Group time periods (bucketing)

We want to group the years because bucketing years into ranges helps identify patterns or trends in specific time periods.

In [32]:
from pyspark.ml.feature import Bucketizer

# define time period splits and labels
splits = [1999, 2004, 2009, 2014, 2019, 2024]
bucket_labels = ["2000-2004", "2005-2009", "2010-2014", "2015-2019", "2020-2023"]

# add a new column for bucket indexes
bucketizer = Bucketizer(
    splits=splits, 
    inputCol="year", 
    outputCol="year_bucket"
)
spotify_df_bucket = bucketizer.transform(spotify_df_normalized)

# map bucket indices to labels
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# create a UDF to map bucket indices to bucket labels
bucket_label_udf = udf(lambda idx: bucket_labels[int(idx)], StringType())
spotify_df_bucket = spotify_df_bucket.withColumn("year_period", bucket_label_udf(col("year_bucket")))

# drop intermediate column (year_bucket) if no longer needed
spotify_df_bucket = spotify_df_bucket.drop("year_bucket")

# show results
spotify_df_bucket.select("track_name", "year", "year_period").show(10)


+--------------------+----+-----------+
|          track_name|year|year_period|
+--------------------+----+-----------+
|     I Won't Give Up|2012|  2010-2014|
|    93 Million Miles|2012|  2010-2014|
|    Do Not Let Me Go|2012|  2010-2014|
|            Fast Car|2012|  2010-2014|
|    Sky's Still Blue|2012|  2010-2014|
|       What They Say|2012|  2010-2014|
|Walking in a Wint...|2012|  2010-2014|
|       Dancing Shoes|2012|  2010-2014|
|Living in the Moment|2012|  2010-2014|
|              Heaven|2012|  2010-2014|
+--------------------+----+-----------+
only showing top 10 rows



In [33]:
# StringIndexer: Convert year_period into numerical indices
period_indexer = StringIndexer(inputCol="year_period", outputCol="year_period_index")
spotify_df_bucket = period_indexer.fit(spotify_df_bucket).transform(spotify_df_bucket)

# OneHotEncoder: Convert indices into one-hot encoded vectors
period_encoder = OneHotEncoder(inputCol="year_period_index", outputCol="year_period_onehot", dropLast=False)
spotify_df_bucket = period_encoder.fit(spotify_df_bucket).transform(spotify_df_bucket)

# Show the results
spotify_df_bucket.select("track_name", "year", "year_period", "year_period_index", "year_period_onehot").show(10, truncate=False)


+------------------------------+----+-----------+-----------------+------------------+
|track_name                    |year|year_period|year_period_index|year_period_onehot|
+------------------------------+----+-----------+-----------------+------------------+
|I Won't Give Up               |2012|2010-2014  |2.0              |(5,[2],[1.0])     |
|93 Million Miles              |2012|2010-2014  |2.0              |(5,[2],[1.0])     |
|Do Not Let Me Go              |2012|2010-2014  |2.0              |(5,[2],[1.0])     |
|Fast Car                      |2012|2010-2014  |2.0              |(5,[2],[1.0])     |
|Sky's Still Blue              |2012|2010-2014  |2.0              |(5,[2],[1.0])     |
|What They Say                 |2012|2010-2014  |2.0              |(5,[2],[1.0])     |
|Walking in a Winter Wonderland|2012|2010-2014  |2.0              |(5,[2],[1.0])     |
|Dancing Shoes                 |2012|2010-2014  |2.0              |(5,[2],[1.0])     |
|Living in the Moment          |2012|2010-2

*Note:* For the `year_period_onehot` column, the `(5, [2], [1.0])` format indicates a sparse one-hot encoded vector with 5 possible time periods (buckets), where the index `2` corresponds to the time period `2010-2014`.

In [35]:
# rename the finalized dataframe before moving on to EDA
spotify_df_final = spotify_df_bucket

In [36]:
spotify_df_final.show(5)

+----------------+--------------------+--------------------+----+--------+-----------+--------------+--------------------+----------------+-------------------+-------------------+------------------+------------------+-----------+-------------------+--------------------+-----------------------+-------------------+--------------+-------------------+--------------------+-----------------+-----------+-----------------+------------------+
|     artist_name|          track_name|            track_id|year|   genre|genre_index|  genre_onehot|  genre_onehot_array|artist_frequency|danceability_scaled|      energy_scaled|        key_scaled|   loudness_scaled|mode_scaled| speechiness_scaled| acousticness_scaled|instrumentalness_scaled|    liveness_scaled|valence_scaled|       tempo_scaled|  duration_ms_scaled|popularity_scaled|year_period|year_period_index|year_period_onehot|
+----------------+--------------------+--------------------+----+--------+-----------+--------------+--------------------+--

In [37]:
# print schema to verify data types
spotify_df_final.printSchema()


root
 |-- artist_name: string (nullable = true)
 |-- track_name: string (nullable = true)
 |-- track_id: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- genre: string (nullable = true)
 |-- genre_index: double (nullable = false)
 |-- genre_onehot: vector (nullable = true)
 |-- genre_onehot_array: array (nullable = false)
 |    |-- element: double (containsNull = false)
 |-- artist_frequency: long (nullable = true)
 |-- danceability_scaled: double (nullable = true)
 |-- energy_scaled: double (nullable = true)
 |-- key_scaled: double (nullable = true)
 |-- loudness_scaled: double (nullable = true)
 |-- mode_scaled: double (nullable = true)
 |-- speechiness_scaled: double (nullable = true)
 |-- acousticness_scaled: double (nullable = true)
 |-- instrumentalness_scaled: double (nullable = true)
 |-- liveness_scaled: double (nullable = true)
 |-- valence_scaled: double (nullable = true)
 |-- tempo_scaled: double (nullable = true)
 |-- duration_ms_scaled: double (nullable 

*Note:* Reviewing the schema of the finalized dataframe:
* One-hot encoded features (`genre_onehot`, `year_period_onehot`) are represented as `vectors`. This `vector` type is suitable for recommendation systems and similarity algorithms.
* Scaled numerical features (`*_scaled`) are of `double` type, ensuring compatibility with analysis and modeling tasks.