# Introduction and Motivation

In the era of digital media, music streaming platforms have transformed how we discover and enjoy music. Services like Spotify have millions of tracks available at our fingertips, making music recommendation systems essential for enhancing user experience. Recognizing the significance of these systems, our team chose to explore the “Spotify Tracks, Genre, Audio Features” dataset from Kaggle (https://www.kaggle.com/datasets/pepepython/spotify-huge-database-daily-charts-over-3-years/data) using PySpark to analyze trends and build a music recommendation model.

Our motivation for selecting this dataset is rooted in a desire to engage with more intriguing and relatable subject matter than the typical technical datasets we’ve encountered in previous classes. Music is a universal language, and understanding how recommendation systems work in this context is both exciting and relevant. As the amount of data generated online continues to grow, music recommendation models play an increasingly important role in how users interact with streaming services.

The primary objectives of our project are twofold:
1. Data Analysis: We aim to understand trends within the music data by examining how different genres relate to specific audio feature metrics. For example, we want to investigate whether certain genres consistently exhibit higher danceability scores or how energy levels vary across genres.
2. Model Building: Utilizing machine learning techniques, we plan to develop a music recommendation model. This model will leverage the insights gained from our data analysis to suggest tracks that align with user preferences based on audio features.

Our project focuses on mainstream music, as the dataset predominantly features popular tracks. While we have considered algorithms like K-Nearest Neighbors for recommendation, we will determine the most suitable machine learning methods as we progress, ensuring they align with our dataset characteristics and project goals.

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


# Data Collection

In [111]:
from pyspark.sql import SparkSession


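# Stop a session left over from an earlier run before building a new one
# (_instantiatedSession is a private PySpark attribute)
if SparkSession._instantiatedSession is not None:
    print("An existing SparkSession is detected.")
    SparkSession._instantiatedSession.stop()


# Start a new Spark session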
spark = SparkSession.builder \
    .appName("MusicDataAnalysisProject") \
    .config("spark.driver.memory", "4g") \
    .config("spark.driver.bindAddress", "127.0.0.1") \
    .getOrCreate()


spark.sparkContext.setLogLevel("ERROR")

# Path to your CSV file
csv_path = "drive/MyDrive/612_files/Final database.csv"

music_df = spark.read.option("header", "true").csv(csv_path, inferSchema=True)
music_df = music_df.withColumnRenamed("Country", "Country0")

# Show the first few rows to verify
music_df.show()


An existing SparkSession is detected.

# Data Inspection and Validation

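Although the CSV was read with inferSchema=True, most numerical columns are typed as strings in the schema below (for example, danceability and tempo), while Popularity was inferred as a double. We convert these columns to numeric types later, in the Data Transformation step.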

In [112]:
# Print the schema of the DataFrame
music_df.printSchema()

root
 |-- Country0: string (nullable = true)
 |-- Uri: string (nullable = true)
 |-- Popularity: double (nullable = true)
 |-- Title: string (nullable = true)
 |-- Artist: string (nullable = true)
 |-- Album/Single: string (nullable = true)
 |-- Genre: string (nullable = true)
 |-- Artist_followers: string (nullable = true)
 |-- Explicit: string (nullable = true)
 |-- Album9: string (nullable = true)
 |-- Release_date: string (nullable = true)
 |-- Track_number: string (nullable = true)
 |-- Tracks_in_album: string (nullable = true)
 |-- danceability: string (nullable = true)
 |-- energy: string (nullable = true)
 |-- key: string (nullable = true)
 |-- loudness: string (nullable = true)
 |-- mode: string (nullable = true)
 |-- speechiness: string (nullable = true)
 |-- acoustics: string (nullable = true)
 |-- instrumentalness: string (nullable = true)
 |-- liveliness: string (nullable = true)
 |-- valence: string (nullable = true)
 |-- tempo: string (nullable = true)
 |-- duration_ms: 

Check for null values and duplicate rows in the dataset. We also check the distribution of popularity scores, because we later categorize tracks into low, medium, and high popularity levels.

In [113]:
from pyspark.sql.functions import col, count, when
# Count null values in each column
null_counts = music_df.select([count(when(col(c).isNull(), c)).alias(c) for c in music_df.columns])
null_counts.show()

# Check for duplicates
duplicate_count = music_df.count() - music_df.dropDuplicates().count()
print(f"Number of duplicate rows: {duplicate_count}")

# Check popularity range for validity (e.g., ensure no extreme outliers)
music_df.select("Popularity").describe().show()


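The null count relies on how count treats nulls: `when(col(c).isNull(), c)` produces a non-null literal only for null cells (and null otherwise), so `count` over it counts exactly the missing values. A plain-Python sketch of both checks on a hypothetical list of row dictionaries (`null_counts` and `duplicate_count` are illustrative helpers, not part of the notebook):

```python
from typing import Any, Dict, List, Optional

Row = Dict[str, Optional[Any]]

def null_counts(rows: List[Row], columns: List[str]) -> Dict[str, int]:
    # For each column, count the rows whose value is missing (None)
    return {c: sum(1 for r in rows if r.get(c) is None) for c in columns}

def duplicate_count(rows: List[Row], columns: List[str]) -> int:
    # Rows equal on every column collapse to one key; the surplus are duplicates,
    # mirroring df.count() - df.dropDuplicates().count()
    keys = [tuple(r.get(c) for c in columns) for r in rows]
    return len(keys) - len(set(keys))

rows = [
    {"Title": "song a", "Popularity": 10.0},
    {"Title": "song a", "Popularity": 10.0},
    {"Title": "song b", "Popularity": None},
]
columns = ["Title", "Popularity"]
print(null_counts(rows, columns))      # {'Title': 0, 'Popularity': 1}
print(duplicate_count(rows, columns))  # 1
```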

# Data Transformation

In [114]:
from pyspark.sql.functions import col

# List of columns to cast to integer or double as appropriate
int_columns = ["Track_number", "Tracks_in_album", "time_signature", "Argentina", "Australia", "Austria",
               "Belgium", "Brazil", "Canada", "Chile", "Colombia", "Costa Rica", "Denmark", "Ecuador",
               "Finland", "France", "Germany", "Global", "Indonesia", "Ireland", "Italy", "Malaysia",
               "Mexico", "Netherlands", "New Zealand", "Norway", "Peru", "Philippines", "Poland",
               "Portugal", "Singapore", "Spain", "Sweden", "Switzerland", "Taiwan", "Turkey", "UK", "USA"]

double_columns = ["Popularity", "Artist_followers", "danceability", "energy", "key", "loudness",
                  "mode", "speechiness", "acoustics", "instrumentalness", "liveliness", "valence",
                  "tempo", "duration_ms", "Days_since_release", "syuzhet_norm", "bing_norm", "afinn_norm",
                  "nrc_norm", "syuzhet", "bing", "afinn", "nrc", "anger", "anticipation", "disgust", "fear",
                  "joy", "sadness", "surprise", "trust", "negative", "positive", "n_words", "anger_norm",
                  "anticipation_norm", "disgust_norm", "fear_norm", "joy_norm", "sadness_norm", "surprise_norm",
                  "trust_norm", "negative_norm", "positive_norm", "anger_norm2", "anticipation_norm2",
                  "disgust_norm2", "fear_norm2", "joy_norm2", "sadness_norm2", "surprise_norm2", "trust_norm2",
                  "negative_norm2", "positive_norm2", "negative_bog_jr", "positive_bog_jr", "Bayes",
                  "Negative_Bayes", "Neutral_Bayes", "Positive_Bayes", "Desire", "Explore", "Fun",
                  "Hope", "Love", "Nostalgia", "Thug", "bing_norm_negative"]

transformed_df = music_df

# Cast columns to integer
for column in int_columns:
    transformed_df = transformed_df.withColumn(column, col(column).cast("int"))

# Cast columns to double
for column in double_columns:
    transformed_df = transformed_df.withColumn(column, col(column).cast("double"))

# Fill remaining nulls in numeric columns with 0
# (placeholder: replace with a better imputation method)
transformed_df = transformed_df.fillna(0)

# Verify the updated schema
transformed_df.printSchema()


root
 |-- Country0: string (nullable = true)
 |-- Uri: string (nullable = true)
 |-- Popularity: double (nullable = false)
 |-- Title: string (nullable = true)
 |-- Artist: string (nullable = true)
 |-- Album/Single: string (nullable = true)
 |-- Genre: string (nullable = true)
 |-- Artist_followers: double (nullable = false)
 |-- Explicit: string (nullable = true)
 |-- Album9: string (nullable = true)
 |-- Release_date: string (nullable = true)
 |-- Track_number: integer (nullable = true)
 |-- Tracks_in_album: integer (nullable = true)
 |-- danceability: double (nullable = false)
 |-- energy: double (nullable = false)
 |-- key: double (nullable = false)
 |-- loudness: double (nullable = false)
 |-- mode: double (nullable = false)
 |-- speechiness: double (nullable = false)
 |-- acoustics: double (nullable = false)
 |-- instrumentalness: double (nullable = false)
 |-- liveliness: double (nullable = false)
 |-- valence: double (nullable = false)
 |-- tempo: double (nullable = false)
 |-
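`fillna(0)` turns every missing numeric value into a literal 0, which is misleading for a feature such as `tempo` (measured in beats per minute, where 0 is far outside the normal range). One common alternative is median imputation; Spark ML ships `pyspark.ml.feature.Imputer`, which treats nulls as missing and whose `strategy` parameter accepts `"median"`. If adopted, it would need to be fit before `fillna(0)`, since afterwards no nulls are left to impute. A minimal plain-Python sketch of the idea, with a hypothetical `impute_median` helper (one possible replacement for the placeholder, not the notebook's method):

```python
from statistics import median
from typing import List, Optional

def impute_median(values: List[Optional[float]]) -> List[float]:
    # Median of the observed (non-missing) values only
    observed = [v for v in values if v is not None]
    if not observed:
        raise ValueError("column has no observed values to impute from")
    fill = median(observed)
    # Replace each missing entry with that median
    return [fill if v is None else v for v in values]

tempo = [120.0, None, 95.0, 130.0]
print(impute_median(tempo))  # [120.0, 120.0, 95.0, 130.0]
```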

In [115]:
# Define popularity categories for later
transformed_df = transformed_df.withColumn(
    "PopularityCategory",
    when(transformed_df.Popularity < 5000, "Low")
    .when((transformed_df.Popularity >= 5000) & (transformed_df.Popularity < 18000), "Medium")
    .otherwise("High")
)
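The `when`/`otherwise` chain is evaluated top to bottom, so the three labels partition the score range: below 5000 is Low, 5000 up to (but not including) 18000 is Medium, and everything else is High. Because `fillna(0)` ran earlier, a missing score lands in Low; without it, a null Popularity would make both comparisons null (treated as false) and fall through to `otherwise("High")`. A plain-Python mirror of the rule, with a hypothetical `popularity_category` function, makes the boundaries easy to check:

```python
def popularity_category(popularity: float) -> str:
    # Mirrors the when/otherwise chain: < 5000 Low, [5000, 18000) Medium, otherwise High
    if popularity < 5000:
        return "Low"
    if popularity < 18000:
        return "Medium"
    return "High"

for score in (0, 4999.9, 5000, 17999, 18000):
    print(score, popularity_category(score))
```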

# Data Filtering

In [116]:
# Keep only rows whose country is in North America (USA, Canada, Mexico)
filtered_df = transformed_df.filter(col("Country0").isin("USA", "Canada", "Mexico"))

print(f"Number of rows: {filtered_df.count()}")


Number of rows: 15113


# Feature Selection

In [122]:
# Filter method for feature reduction: rank numeric columns by |Pearson correlation| with Popularity
from pyspark.sql.functions import corr
from pyspark.sql.types import StringType
import math

correlations = []

for column in filtered_df.columns:
    # Skip the target, the category derived from it, and non-numeric columns
    if column not in ('PopularityCategory', 'Popularity') and not isinstance(filtered_df.schema[column].dataType, StringType):
        correlation = filtered_df.select(corr(column, 'Popularity')).collect()[0][0]
        # Skip undefined coefficients (null or NaN)
        if correlation is not None and not math.isnan(correlation):
            correlations.append((column, abs(correlation)))

# Sort by absolute correlation, strongest first
correlations.sort(key=lambda x: x[1], reverse=True)
# Take only the top 40 results
correlations = correlations[:40]
print(correlations)

[('Top50_dummy', 0.4346431592670897), ('Popu_max', 0.4340389702408112), ('latin', 0.15300185707867855), ('Mexico', 0.1527521861578449), ('Artist_followers', 0.11137668540120713), ('danceability', 0.09937458018679889), ('USA', 0.08694518459194184), ('loudness', 0.08289056786066452), ('hip hop', 0.081238495360119), ('Track_number', 0.07619440454606241), ('single', 0.07152882386888473), ('valence', 0.07038985083763921), ('Tracks_in_album', 0.06620718654602113), ('album31', 0.065794229677695), ('Explore', 0.05704048392975408), ('Top10_dummy', 0.055777983690837266), ('Released_after_2017', 0.04954463542320316), ('anger_norm', 0.04935757335234637), ('negative_norm', 0.04629730720985017), ('Explicit_true', 0.04590524850461089), ('Explicit_false', 0.044767982751107945), ('positive_norm', 0.04358691767821842), ('fear_norm', 0.04340278886681954), ('trust_norm', 0.04287677048413806), ('anticipation_norm', 0.042529791221222434), ('sadness_norm', 0.04240922408778379), ('k-pop', 0.04167491599878745)
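Spark's `corr` returns the Pearson correlation coefficient, and the loop above ranks columns by its absolute value, so a strong negative relationship counts as much as a strong positive one. A plain-Python sketch of the same filter on hypothetical toy columns (`pearson` and `rank_by_abs_corr` are illustrative names), including the skip for coefficients that are undefined because a column is constant:

```python
import math
from typing import Dict, List, Optional, Sequence, Tuple

def pearson(x: Sequence[float], y: Sequence[float]) -> Optional[float]:
    # Pearson correlation; None when either column has zero variance (undefined)
    n = len(x)
    mx, my = sum(x) / n, sum(y) / n
    cov = sum((a - mx) * (b - my) for a, b in zip(x, y))
    vx = sum((a - mx) ** 2 for a in x)
    vy = sum((b - my) ** 2 for b in y)
    if vx == 0 or vy == 0:
        return None
    return cov / math.sqrt(vx * vy)

def rank_by_abs_corr(features: Dict[str, Sequence[float]],
                     target: Sequence[float], k: int) -> List[Tuple[str, float]]:
    scores = []
    for name, values in features.items():
        r = pearson(values, target)
        if r is not None:  # skip undefined coefficients
            scores.append((name, abs(r)))
    # Strongest absolute correlation first, then keep the top k
    scores.sort(key=lambda t: t[1], reverse=True)
    return scores[:k]

target = [1.0, 2.0, 3.0, 4.0]
features = {
    "up": [2.0, 4.0, 6.0, 8.0],    # r = +1
    "down": [4.0, 3.0, 2.0, 1.0],  # r = -1, so |r| = 1
    "flat": [5.0, 5.0, 5.0, 5.0],  # constant: undefined, skipped
    "noisy": [1.0, 3.0, 2.0, 4.0], # r = 0.8
}
print(rank_by_abs_corr(features, target, k=2))  # [('up', 1.0), ('down', 1.0)]
```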

In [64]:
# Convert the popularity category defined earlier into a numeric label
from pyspark.ml.feature import StringIndexer

# Drop the raw score: PopularityCategory is derived from it, so keeping it would leak the label
filtered_df = filtered_df.drop("Popularity")

indexer = StringIndexer(inputCol="PopularityCategory", outputCol="label")
model1_df = indexer.fit(filtered_df).transform(filtered_df)

# Keep only the non-string (numeric) columns as features
feature_columns = [c for c, dtype in filtered_df.dtypes if dtype != 'string']

In [65]:
# Assemble features
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=feature_columns, outputCol="features", handleInvalid = "keep")
model1_df = assembler.transform(model1_df).select("features", "label")
model1_df.select("features").show(truncate=False)


In [66]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

rf_tester = RandomForestClassifier(featuresCol="features", labelCol="label")
rf_wrapper_model = rf_tester.fit(model1_df)

importance = rf_wrapper_model.featureImportances
feature_importance = sorted(zip(importance, feature_columns), key=lambda x: x[0], reverse=True)
# take only the top 40 results
feature_importance = feature_importance[:40]
print(feature_importance)


[(0.3179169947541516, 'Top50_dummy'), (0.27548682994406687, 'Top10_dummy'), (0.23806578665253345, 'Popu_max'), (0.03904206994881784, 'Tracks_in_album'), (0.03353870982798258, 'Days_since_release'), (0.008514261686112214, 'metal'), (0.008388226379688895, 'latin'), (0.008144821108742232, 'Track_number'), (0.00616771658064309, 'danceability'), (0.00559774139963169, 'loudness'), (0.005381645535034464, 'single'), (0.004798939416514374, 'album31'), (0.004793718540906223, 'Artist_followers'), (0.0038489916913674134, 'Explicit_true'), (0.0038390211336067357, 'instrumentalness'), (0.002975111747403256, 'speechiness'), (0.0027353631880354923, 'energy'), (0.0026459868526021147, 'Explicit_false'), (0.00253809992421598, 'duration_ms'), (0.0021594765853787208, 'else'), (0.0021084674688736016, 'hip hop'), (0.0016257825716336567, 'n_words'), (0.0014651757422487863, 'tempo'), (0.0012311972839006249, 'positive_bog_jr'), (0.0012090689894723224, 'negative_norm2'), (0.0011375489441969634, 'pop'), (0.000898

In [123]:
# Combining results: the intent is to drop features that appear in neither feature_importance nor correlations;
# as written, only the names in the correlation ranking are kept

filtered_fnames = [tup[0] for tup in correlations]

#filtered_df = filtered_df.select(filtered_fnames)
#filtered_df.show()
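The comment in this cell describes keeping any feature that ranks in either list, while the code keeps only the correlation ranking. A sketch of that union, assuming the two lists keep the shapes printed above, `(name, |r|)` for the correlations and `(importance, name)` for the forest; `union_of_rankings` is a hypothetical helper and the sample scores are rounded from the outputs above:

```python
from typing import List, Tuple

def union_of_rankings(correlations: List[Tuple[str, float]],
                      feature_importance: List[Tuple[float, str]]) -> List[str]:
    # Note the tuple orders differ: (name, |r|) versus (importance, name)
    names = [n for n, _ in correlations] + [n for _, n in feature_importance]
    selected: List[str] = []
    for name in names:
        if name not in selected:  # keep the first occurrence, preserving rank order
            selected.append(name)
    return selected

corr_top = [("Top50_dummy", 0.43), ("latin", 0.15)]
rf_top = [(0.32, "Top50_dummy"), (0.03, "Days_since_release")]
print(union_of_rankings(corr_top, rf_top))  # ['Top50_dummy', 'latin', 'Days_since_release']
```

In the notebook this would amount to `filtered_fnames = union_of_rankings(correlations, feature_importance)`.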


# Model 1 (Popularity Predictor)

In [132]:
from pyspark.ml.classification import RandomForestClassifier, DecisionTreeClassifier, LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql.functions import col

# Index the popularity category as the numeric label
indexer = StringIndexer(inputCol="PopularityCategory", outputCol="label")
model1_df = indexer.fit(filtered_df).transform(filtered_df)

# Make sure every selected feature column is numeric
for fname in filtered_fnames:
    model1_df = model1_df.withColumn(fname, col(fname).cast("double"))

# Drop rows that contain a null in any column (string columns included)
model1_df = model1_df.dropna()

# Assemble only the selected features into a single vector column
assembler = VectorAssembler(inputCols=filtered_fnames, outputCol="features", handleInvalid="keep")
model1_df = assembler.transform(model1_df).select("features", "label")

model1_df.select("features").show(truncate=False)

# Split data into train and test sets
train_df, test_df = model1_df.randomSplit([0.8, 0.2], seed=42)
#big, small = model1_df.randomSplit([0.9, 0.1], seed=42)
#train_df, test_df = small.randomSplit([0.8, 0.2], seed=42)

# Initialize the classifiers
rf_classifier = RandomForestClassifier(featuresCol="features", labelCol="label", seed=42)
dt_classifier = DecisionTreeClassifier(featuresCol="features", labelCol="label", seed=42)
lr_classifier = LogisticRegression(featuresCol="features", labelCol="label", maxIter=10)


In [133]:
# Create parameter grids for each classifier
rf_paramGrid = (ParamGridBuilder()
                .addGrid(rf_classifier.numTrees, [50, 100])
                .addGrid(rf_classifier.maxDepth, [5, 10])
                .build())

dt_paramGrid = (ParamGridBuilder()
                .addGrid(dt_classifier.maxDepth, [5, 10, 15])
                .addGrid(dt_classifier.minInstancesPerNode, [1, 2, 4])
                .build())

lr_paramGrid = (ParamGridBuilder()
                .addGrid(lr_classifier.regParam, [0.01, 0.1, 1.0])
                .addGrid(lr_classifier.elasticNetParam, [0.0, 0.5, 1.0])
                .build())
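`CrossValidator` trains one model per parameter combination per fold, then refits the best combination on the whole training set, so the grids above directly determine the training cost. A quick plain-Python count (the `grids` dictionary restates the values above; `count_fits` is a hypothetical helper). In the logistic regression grid, `elasticNetParam` 0.0 is pure L2 (ridge) regularization and 1.0 is pure L1 (lasso).

```python
from itertools import product
from typing import Dict

# Grid values restated from the ParamGridBuilder calls above
grids = {
    "Random Forest": {"numTrees": [50, 100], "maxDepth": [5, 10]},
    "Decision Tree": {"maxDepth": [5, 10, 15], "minInstancesPerNode": [1, 2, 4]},
    "Logistic Regression": {"regParam": [0.01, 0.1, 1.0], "elasticNetParam": [0.0, 0.5, 1.0]},
}
num_folds = 3

def count_fits(grid: Dict[str, list], num_folds: int) -> int:
    # One model per (parameter combination, fold), plus one refit of the best combination
    combos = list(product(*grid.values()))
    return len(combos) * num_folds + 1

for name, grid in grids.items():
    print(name, count_fits(grid, num_folds))  # 13, 28, 28
```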

In [134]:
# Evaluator
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")

# Set up CrossValidators for each model
rf_cv = CrossValidator(estimator=rf_classifier,
                       estimatorParamMaps=rf_paramGrid,
                       evaluator=evaluator,
                       numFolds=3,
                       parallelism=2)

dt_cv = CrossValidator(estimator=dt_classifier,
                       estimatorParamMaps=dt_paramGrid,
                       evaluator=evaluator,
                       numFolds=3,
                       parallelism=2)

lr_cv = CrossValidator(estimator=lr_classifier,
                       estimatorParamMaps=lr_paramGrid,
                       evaluator=evaluator,
                       numFolds=3,
                       parallelism=2)

In [135]:
# Fit the models
print("Fitting Random Forest model...")
rf_model = rf_cv.fit(train_df)
print("Fitting Decision Tree model...")
dt_model = dt_cv.fit(train_df)
print("Fitting Logistic Regression model...")
lr_model = lr_cv.fit(train_df)

Fitting Random Forest model...
Fitting Decision Tree model...
Fitting Logistic Regression model...


In [136]:
# Make predictions on the test set
rf_predictions = rf_model.transform(test_df)
dt_predictions = dt_model.transform(test_df)
lr_predictions = lr_model.transform(test_df)

# Evaluate the models
rf_accuracy = evaluator.evaluate(rf_predictions)
dt_accuracy = evaluator.evaluate(dt_predictions)
lr_accuracy = evaluator.evaluate(lr_predictions)

print(f"Random Forest Accuracy: {rf_accuracy}")
print(f"Decision Tree Accuracy: {dt_accuracy}")
print(f"Logistic Regression Accuracy: {lr_accuracy}")

Random Forest Accuracy: 0.8570627802690582
Decision Tree Accuracy: 0.8239910313901345
Logistic Regression Accuracy: 0.8256726457399103
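Accuracy alone can hide poor performance on a minority class, and because the Low/Medium/High bins use fixed thresholds rather than quantiles, the three classes are not guaranteed to be balanced. `MulticlassClassificationEvaluator` also accepts `metricName="f1"` (a weighted F1). The plain-Python sketch below uses hypothetical labels to show a majority-class predictor scoring well on accuracy but poorly on macro-averaged F1; label 0 plays the most frequent category, which is how `StringIndexer` orders labels by default. `per_class_f1` is an illustrative helper.

```python
from collections import Counter
from typing import Dict, Sequence

def per_class_f1(y_true: Sequence[int], y_pred: Sequence[int]) -> Dict[int, float]:
    # Tally (true, predicted) pairs once, then derive TP/FP/FN per class
    pairs = Counter(zip(y_true, y_pred))
    scores = {}
    for c in sorted(set(y_true) | set(y_pred)):
        tp = pairs[(c, c)]
        fp = sum(n for (t, p), n in pairs.items() if p == c and t != c)
        fn = sum(n for (t, p), n in pairs.items() if t == c and p != c)
        denom = 2 * tp + fp + fn
        # F1 = 2TP / (2TP + FP + FN), guarding against an empty denominator
        scores[c] = 2 * tp / denom if denom else 0.0
    return scores

y_true = [0, 0, 0, 0, 1, 2]
y_pred = [0, 0, 0, 0, 0, 0]  # always predicts the majority class
accuracy = sum(t == p for t, p in zip(y_true, y_pred)) / len(y_true)
f1 = per_class_f1(y_true, y_pred)
macro_f1 = sum(f1.values()) / len(f1)
print(round(accuracy, 3), round(macro_f1, 3))  # 0.667 0.267
```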


In [137]:
# Determine the best model
accuracies = {'Random Forest': rf_accuracy, 'Decision Tree': dt_accuracy, 'Logistic Regression': lr_accuracy}
best_model_name = max(accuracies, key=accuracies.get)
print(f"The best model is: {best_model_name} with an accuracy of {accuracies[best_model_name]}")

# Access the best model and its parameters
if best_model_name == 'Random Forest':
    best_model = rf_model.bestModel
    best_params = rf_model.bestModel.extractParamMap()
elif best_model_name == 'Decision Tree':
    best_model = dt_model.bestModel
    best_params = dt_model.bestModel.extractParamMap()
else:
    best_model = lr_model.bestModel
    best_params = lr_model.bestModel.extractParamMap()

print("Best Model Parameters:")
for param, value in best_params.items():
    print(f"{param}: {value}")

The best model is: Random Forest with an accuracy of 0.8570627802690582
Best Model Parameters:
RandomForestClassifier_f763d3524bef__bootstrap: True
RandomForestClassifier_f763d3524bef__cacheNodeIds: False
RandomForestClassifier_f763d3524bef__checkpointInterval: 10
RandomForestClassifier_f763d3524bef__featureSubsetStrategy: auto
RandomForestClassifier_f763d3524bef__featuresCol: features
RandomForestClassifier_f763d3524bef__impurity: gini
RandomForestClassifier_f763d3524bef__labelCol: label
RandomForestClassifier_f763d3524bef__leafCol: 
RandomForestClassifier_f763d3524bef__maxBins: 32
RandomForestClassifier_f763d3524bef__maxDepth: 10
RandomForestClassifier_f763d3524bef__maxMemoryInMB: 256
RandomForestClassifier_f763d3524bef__minInfoGain: 0.0
RandomForestClassifier_f763d3524bef__minInstancesPerNode: 1
RandomForestClassifier_f763d3524bef__minWeightFractionPerNode: 0.0
RandomForestClassifier_f763d3524bef__numTrees: 50
RandomForestClassifier_f763d3524bef__predictionCol: prediction
RandomFore

# Model 2 (KNN Music Recommender)

In [None]:
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.sql import functions as F
from pyspark.sql.types import FloatType
from pyspark.sql import DataFrame

# Step 1: Aggregate features by Uri, retaining Title and Artist
model2_df = transformed_df.groupBy("Uri").agg(
    F.first("Title").alias("Title"),
    F.first("Artist").alias("Artist"),
    F.avg("danceability").alias("danceability"),
    F.avg("energy").alias("energy"),
    F.avg("loudness").alias("loudness"),
    F.avg("acoustics").alias("acoustics"),
    F.avg("instrumentalness").alias("instrumentalness"),
    F.avg("liveliness").alias("liveliness"),
    F.avg("valence").alias("valence"),
    F.avg("tempo").alias("tempo")
)

audio_features = ['danceability', 'energy', 'loudness', 'acoustics',
                  'instrumentalness', 'liveliness', 'valence', 'tempo']
model2_df = model2_df.fillna(0, subset=audio_features)
# Combine audio features into a single vector column
assembler = VectorAssembler(inputCols=audio_features, outputCol="features")
data_with_features = assembler.transform(model2_df)



In [None]:
# Step 2: Standardize the features
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")
scaler_model = scaler.fit(data_with_features)
data_scaled = scaler_model.transform(data_with_features)
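Spark's `StandardScaler` defaults to `withStd=True` and `withMean=False`, so this step rescales each feature to unit standard deviation (Spark uses the corrected sample standard deviation) without centering it. Centering would not change the recommender anyway, because subtracting the same mean from every track leaves Euclidean distances unchanged. What scaling does change is balance: unscaled, `tempo` (beats per minute) would swamp features on a 0 to 1 scale such as `energy`. A plain-Python sketch with hypothetical values for three tracks (`scale_to_unit_std` and `euclidean` are illustrative helpers):

```python
import math
from statistics import stdev
from typing import List, Sequence

def scale_to_unit_std(column: List[float], center: bool = False) -> List[float]:
    # Divide by the sample (n - 1) standard deviation; optionally subtract the mean first.
    # This sketch maps a constant column (std 0) to all zeros.
    s = stdev(column)
    m = sum(column) / len(column) if center else 0.0
    return [(v - m) / s if s > 0 else 0.0 for v in column]

def euclidean(a: Sequence[float], b: Sequence[float]) -> float:
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))

tempo = [100.0, 120.0, 140.0]  # beats per minute
energy = [0.5, 0.7, 0.9]       # 0 to 1 scale

raw = list(zip(tempo, energy))
scaled = list(zip(scale_to_unit_std(tempo), scale_to_unit_std(energy)))
centered = list(zip(scale_to_unit_std(tempo, center=True), scale_to_unit_std(energy, center=True)))

print(euclidean(raw[0], raw[1]))            # about 20.001: tempo dominates
print(euclidean(scaled[0], scaled[1]))      # about 1.414: both features contribute equally
print(euclidean(centered[0], centered[1]))  # same distance as the scaled version
```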

In [None]:
# Step 3: Define the recommendation function including Title and Artist in the output
def get_recommendations(df: DataFrame, target_uri: str, n_recommendations: int = 5) -> DataFrame:
    """
    Get song recommendations based on Euclidean distance to a target song by Uri.

    Parameters:
    df (DataFrame): The PySpark DataFrame containing scaled features.
    target_uri (str): The Uri of the target song.
    n_recommendations (int): Number of recommendations to retrieve.

    Returns:
    DataFrame: A PySpark DataFrame of recommended songs including Uri, Title, and Artist.
    """
    # Extract the feature vector of the target song by Uri
    target_song = df.where(F.col("Uri") == target_uri).select("scaled_features").first()

    # Check if target_song exists
    if not target_song:
        raise ValueError(f"Uri {target_uri} not found in the dataset.")

    target_vector = target_song["scaled_features"]

    # Define a UDF to calculate Euclidean distance
    def euclidean_distance(v1, v2):
        return float(v1.squared_distance(v2)) ** 0.5

    distance_udf = F.udf(lambda v: euclidean_distance(target_vector, v), FloatType())

    # Calculate distances and get the closest songs
    df_with_distances = df.withColumn("distance", distance_udf(F.col("scaled_features")))
    recommendations = (df_with_distances
                       .filter(F.col("Uri") != target_uri)  # Exclude the target song itself
                       .select("Uri", "Title", "Artist", "distance")  # Include Uri, Title, Artist
                       .orderBy("distance")
                       .limit(n_recommendations))

    return recommendations
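`get_recommendations` is an exact, brute-force nearest-neighbor search: it computes the distance from the target to every track through a Python UDF and orders the results. A plain-Python sketch of the same idea, using `heapq.nsmallest` to keep the closest `n` without a full sort (the `recommend` function and toy vectors are hypothetical). For much larger catalogs, Spark ML's `BucketedRandomProjectionLSH` offers approximate Euclidean neighbor search through `approxNearestNeighbors`, trading exactness for speed.

```python
import heapq
import math
from typing import Dict, List, Sequence, Tuple

def recommend(features: Dict[str, Sequence[float]], target_uri: str,
              n: int = 5) -> List[Tuple[str, float]]:
    if target_uri not in features:
        raise ValueError(f"Uri {target_uri} not found in the dataset.")
    target = features[target_uri]
    # Distance from the target to every other track (exact, brute-force k-NN)
    candidates = (
        (uri, math.dist(target, vec))
        for uri, vec in features.items()
        if uri != target_uri
    )
    # Keep the n smallest distances without sorting the whole collection
    return heapq.nsmallest(n, candidates, key=lambda t: t[1])

features = {"a": (0.0, 0.0), "b": (1.0, 0.0), "c": (0.0, 2.0), "d": (3.0, 4.0)}
print(recommend(features, "a", n=2))  # [('b', 1.0), ('c', 2.0)]
```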

In [None]:
# Example usage:
recommendations_df = get_recommendations(data_scaled, target_uri="https://open.spotify.com/track/6FyRXC8tJUh863JCkyWqtk", n_recommendations=5)
recommendations_df.show()

+--------------------+------------------+--------------------+----------+
|                 Uri|             Title|              Artist|  distance|
+--------------------+------------------+--------------------+----------+
|https://open.spot...|        adan y eva|        Paulo Londra|0.04859169|
|https://open.spot...|la jeepeta - remix|Nio Garcia - Anue...|0.43462214|
|https://open.spot...|la jeepeta - remix|Nio Garcia - Anue...|0.44388047|
|https://open.spot...|       fuego lento|          Drake Bell| 0.5159253|
|https://open.spot...|     supuestamente|    Ozuna - Anuel AA|0.57871056|
+--------------------+------------------+--------------------+----------+
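In the output above, `la jeepeta - remix` appears twice at slightly different distances, which suggests the same song is stored under more than one Uri. One option is to request a few extra neighbors and keep only the closest row per (Title, Artist) pair; in Spark this could be done with `row_number()` over `Window.partitionBy("Title", "Artist").orderBy("distance")`. A plain-Python sketch of that de-duplication, with hypothetical rows and a hypothetical `dedupe_by_title_artist` helper:

```python
from typing import List, Tuple

Rec = Tuple[str, str, str, float]  # (uri, title, artist, distance)

def dedupe_by_title_artist(recs: List[Rec], n: int) -> List[Rec]:
    # Sort by distance so the first row seen for each (title, artist) is the closest one
    seen = set()
    unique: List[Rec] = []
    for rec in sorted(recs, key=lambda r: r[3]):
        if len(unique) >= n:
            break
        key = (rec[1].strip().lower(), rec[2].strip().lower())
        if key not in seen:
            seen.add(key)
            unique.append(rec)
    return unique

recs = [
    ("uri:1", "song x", "artist a", 0.05),
    ("uri:2", "song y", "artist b", 0.43),
    ("uri:3", "song y", "artist b", 0.44),  # same song under a second Uri
    ("uri:4", "song z", "artist c", 0.52),
]
print([r[0] for r in dedupe_by_title_artist(recs, n=3)])  # ['uri:1', 'uri:2', 'uri:4']
```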

