In [1]:
#!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m3.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488493 sha256=a8fad28035e732e4cd3556eaa6d4b1e2a4a74d1344d0a89d424a83967ed1f076
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


#Mount Shared Drive

Mount a google shared drive in order to develop ETL for model developmentation

In [2]:
import pandas as pd
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


# Create Spark Session

Create a spark session called spotify which will handle Spark functionalities.

In [3]:
from pyspark.sql import SparkSession

# Create Spark Enviroment
ss = SparkSession.builder.appName("spotify").getOrCreate()

#Sample Playlist Data

The dataset filled with a million playlists consists of 33.54 GB of json data files. On a cluster of servers, it should not be an issue, however, we are limited to a pro version of Google Colab which cannot handle the large dataset. We, therefore, need to resample the dataset to obtain a smaller working size. The sample size chosen was 5 files consisting of the portion of the playlist within the dataset. This, however, is a more manageable data size for our computational power.

In [5]:
import os
import random
import re
import shutil

#============= Sample Data ================
def random_sample_from_folder(folder_path, sample_size=10):
    """
    Select a random sample of files from the specified folder.

    :param folder_path: Path to the folder from which to sample files.
    :param sample_size: Number of files to sample.
    :return: A list of paths to the randomly sampled files.
    """
    # List all files in the folder
    all_files = [os.path.join(folder_path, f) for f in os.listdir(folder_path) if os.path.isfile(os.path.join(folder_path, f))]

    # Determine the sample size (can't be larger than the number of files)
    actual_sample_size = min(sample_size, len(all_files))

    # Randomly sample files
    sampled_files = random.sample(all_files, actual_sample_size)

    return sampled_files

# Create Sample
folder_path = "/content/gdrive/Shareddrives/737_Project/spotify_million_playlist_dataset/data/"
sample_size = 5  # Adjust as needed
sampled_files = random_sample_from_folder(folder_path, sample_size)


#============= Abstract File Name ================
# Regular expression pattern to match the file name at the end of the path
pattern = r'/([^/]+\.json)$'

# Initialize a list to hold the matched file names
matched_file_names = []

# Iterate over the file paths and match the pattern
for file_path in sampled_files:
    match = re.search(pattern, file_path)
    if match:
        matched_file_names.append(match.group(1))  # Append the matched file name

# Print all matched file names
#for file_name in matched_file_names:
#    print(file_name)

#print(len(sampled_files))

#============= Move Data To Sampled Path ================
source_folder = '/content/gdrive/Shareddrives/737_Project/spotify_million_playlist_dataset/data/'
destination_folder = '/content/gdrive/Shareddrives/737_Project/spotify_million_playlist_dataset/sampled_data'

# Create New File
if not os.path.exists(destination_folder):
    os.makedirs(destination_folder)

# Move each file to the destination folder
for file in matched_file_names:
  # Construct the full file path
  source_file_path = os.path.join(source_folder, file)
  destination_file_path = os.path.join(destination_folder, file)

  # Move the file
  shutil.move(source_file_path, destination_file_path)

5


## Load sampled data in to Pyspark

This loads our data files within a sampled folder and reads it to a spark data frame. It can handle json file as seen in the "View of sampled data files by track_uri" within the next block

In [53]:
# Folder Path of Playlist
folder_path = "/content/gdrive/Shareddrives/737_Project/spotify_million_playlist_dataset/sampled_data/*"

df_playlist = ss.read.option("multiline", "true").json(folder_path)
#df = ss.read.option("mergeSchema", "true").json(folder_path)

## View of sampled data files by track_uri

Here we can see that the files have been properly loaded as we can witness pid and track_uri. Pid is the playlist id and the track_uri is the track identifier. This is for future implementations to build out song recommendations for playlists. The current implementation only has recommendations based on a single song.

In [54]:
from pyspark.sql.functions import explode, col
from pyspark.sql.functions import lit
tracks_df = df_playlist.select(explode("playlists").alias("playlist")) \
              .select("playlist.pid", explode("playlist.tracks").alias("track")) \
              .select("pid", "track.track_uri")

#tracks_df = tracks_df.withColumn("rating", lit(1))
tracks_df.show()

+------+--------------------+
|   pid|           track_uri|
+------+--------------------+
|150000|spotify:track:5CB...|
|150000|spotify:track:4Sn...|
|150000|spotify:track:50Y...|
|150000|spotify:track:636...|
|150000|spotify:track:2ay...|
|150000|spotify:track:5IM...|
|150000|spotify:track:5C9...|
|150000|spotify:track:4tD...|
|150000|spotify:track:15W...|
|150000|spotify:track:2kD...|
|150000|spotify:track:3yt...|
|150000|spotify:track:50K...|
|150000|spotify:track:0TY...|
|150000|spotify:track:0fy...|
|150000|spotify:track:2bh...|
|150001|spotify:track:76V...|
|150001|spotify:track:6eF...|
|150001|spotify:track:6Jn...|
|150001|spotify:track:0CK...|
|150001|spotify:track:7dC...|
+------+--------------------+
only showing top 20 rows



#Recommend Users who like a track to an Artist

## Read Music Data

Load the data into spark and clean out columns that won't be need in model developmentation.

In [4]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType, BooleanType

# Load dataset
spotify_song_df = ss.read.csv("/content/gdrive/Shareddrives/737_Project/spotify_million_playlist_dataset/spotify_data.csv", header=True, inferSchema = True)
col_drop = ['_c0','key', 'duration_ms', 'time_signature'] #['Unnamed: 0','artist_name', 'track_name', 'key', 'duration_ms', 'time_signature']
spotify_song_df = spotify_song_df.drop(*col_drop)
spotify_song_df.printSchema()

root
 |-- track_id: string (nullable = true)
 |-- artists: string (nullable = true)
 |-- album_name: string (nullable = true)
 |-- track_name: string (nullable = true)
 |-- popularity: string (nullable = true)
 |-- explicit: string (nullable = true)
 |-- danceability: string (nullable = true)
 |-- energy: string (nullable = true)
 |-- loudness: string (nullable = true)
 |-- mode: string (nullable = true)
 |-- speechiness: string (nullable = true)
 |-- acousticness: string (nullable = true)
 |-- instrumentalness: double (nullable = true)
 |-- liveness: string (nullable = true)
 |-- valence: string (nullable = true)
 |-- tempo: double (nullable = true)
 |-- track_genre: string (nullable = true)



In [5]:
spotify_song_df.show(truncate = False)

+----------------------+------------------------------------+------------------------------------------------------+--------------------------------+----------+--------+------------+------+--------+----+-----------+------------+----------------+--------+-------+-------+-----------+
|track_id              |artists                             |album_name                                            |track_name                      |popularity|explicit|danceability|energy|loudness|mode|speechiness|acousticness|instrumentalness|liveness|valence|tempo  |track_genre|
+----------------------+------------------------------------+------------------------------------------------------+--------------------------------+----------+--------+------------+------+--------+----+-----------+------------+----------------+--------+-------+-------+-----------+
|5SuOikwiRyPMVoIQDJUgSV|Gen Hoshino                         |Comedy                                                |Comedy                          |73

## ALS Model For Artist Recommendation Based on Popularity

The ALS model develops a recommendation system for a user who likes a song to explore other artists that are of the same popularity. The model takes into consideration the track_id, the artist_id, and popularity column to offer the top 10 recommended artist for a user who likes a song.

In [56]:
from pyspark.ml.feature import StringIndexer, IndexToString, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

# Convert columns to data types
df = spotify_song_df \
    .withColumn('popularity', spotify_song_df['popularity'].cast('int')) \
    .withColumn('explicit', spotify_song_df['explicit'].cast('boolean')) \
    .withColumn('danceability', spotify_song_df['danceability'].cast('double')) \
    .withColumn('energy', spotify_song_df['energy'].cast('double')) \
    .withColumn('loudness', spotify_song_df['loudness'].cast('double')) \
    .withColumn('mode', spotify_song_df['mode'].cast('int')) \
    .withColumn('speechiness', spotify_song_df['speechiness'].cast('double')) \
    .withColumn('acousticness', spotify_song_df['acousticness'].cast('double')) \
    .withColumn('instrumentalness', spotify_song_df['instrumentalness'].cast('double')) \
    .withColumn('liveness', spotify_song_df['liveness'].cast('double')) \
    .withColumn('valence', spotify_song_df['valence'].cast('double')) \
    .withColumn('tempo', spotify_song_df['tempo'].cast('double'))

# Drop rows with NaN values in any column
spotify_song_df = spotify_song_df.dropna(how='any')

# StringIndexer to convert string fields to indices which will be used by ALS
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(df) for column in list(set(df.columns)-set(['popularity', 'duration_ms', 'explicit','track_genre', "_c0"])) ]

# Index the genre column
genre_indexer = StringIndexer(inputCol='track_genre', outputCol='track_genre_index')#.fit(df)

# One-hot encode the indexed genre column
genre_encoder = OneHotEncoder(inputCol='track_genre_index', outputCol='track_genre_vec')

assembler=VectorAssembler(inputCols=['track_id', 'artists', 'album_name', 'track_name', 'popularity',\
                                     'explicit', 'danceability', 'energy', 'loudness', 'mode', 'speechiness',\
                                     'acousticness', 'instrumentalness', 'liveness', 'valence', 'track_genre_vec'], outputCol='features')

# Define the ALS algorithm
als = ALS(
    userCol="track_id_index",
    itemCol="artists_index",
    ratingCol="popularity",
    nonnegative=True,
    implicitPrefs=False,
    coldStartStrategy="drop"
)

# Now, add these to your pipeline
pipeline_stages = indexers + [genre_indexer, genre_encoder, als]
pipeline = Pipeline(stages=pipeline_stages)

# Split the data into training and test sets
(training_data, test_data) = df.randomSplit([0.8, 0.2])

# Fit the model
model = pipeline.fit(training_data)

# Make predictions
predictions = model.transform(test_data)

# Evaluate the model by computing the RMSE on the test data
evaluator = RegressionEvaluator(metricName="rmse", labelCol="popularity", predictionCol="prediction")

rmse = evaluator.evaluate(predictions)

print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 2.04544


This portion recommends a user 10 artists based on songs they like.

In [57]:
# Get 10 artist to recommendation for a track
user_recs = model.stages[-1].recommendForAllUsers(10)

user_recs.show(truncate=False)

+--------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|track_id_index|recommendations                                                                                                                                                                                       |
+--------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|26            |[{29461, 136.20064}, {17011, 114.20156}, {18244, 113.6354}, {24863, 112.77322}, {12008, 106.92717}, {7920, 103.25787}, {12620, 102.65349}, {27968, 102.62045}, {29804, 98.584236}, {27586, 96.387054}]|
|27            |[{99, 0.0}, {98, 0.0}, {97, 0.0}, {96, 0.0}, {95, 0.0}, {94, 0.0}, {93, 0.0}, {92, 0.0}, {91, 0.0}, {90, 0.0}]          

# Song Recomender

The song recommender recommends songs to a user based on a song that they like. The model is built using cosine similarity which takes into account variables on aspects of the song to recommend similar ones. The model takes a song name and recommends 10 other songs that the user might like.

In [22]:
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder, Normalizer
from pyspark.ml import Pipeline

# Drop NaNs
spotify_song_df = spotify_song_df.dropna(how='any')

# Convert columns to data types
spotify_song_df = spotify_song_df \
    .withColumn('popularity', spotify_song_df['popularity'].cast('int')) \
    .withColumn('explicit', spotify_song_df['explicit'].cast('boolean')) \
    .withColumn('danceability', spotify_song_df['danceability'].cast('double')) \
    .withColumn('energy', spotify_song_df['energy'].cast('double')) \
    .withColumn('loudness', spotify_song_df['loudness'].cast('double')) \
    .withColumn('mode', spotify_song_df['mode'].cast('int')) \
    .withColumn('speechiness', spotify_song_df['speechiness'].cast('double')) \
    .withColumn('acousticness', spotify_song_df['acousticness'].cast('double')) \
    .withColumn('instrumentalness', spotify_song_df['instrumentalness'].cast('double')) \
    .withColumn('liveness', spotify_song_df['liveness'].cast('double')) \
    .withColumn('valence', spotify_song_df['valence'].cast('double')) \
    .withColumn('tempo', spotify_song_df['tempo'].cast('double'))

# Example of handling categorical variables
features = ['popularity', 'danceability', 'energy', 'loudness', 'mode', 'speechiness',
            'acousticness', 'instrumentalness', 'liveness', 'valence', 'tempo']

assembler_audio = VectorAssembler(inputCols=features, outputCol="audio_vec")

normalizer = Normalizer(inputCol="audio_vec", outputCol="normFeatures")

stringIndexer = StringIndexer(inputCol="track_genre", outputCol="genreIndexed")
encoder = OneHotEncoder(inputCols=["genreIndexed"], outputCols=["genreVec"])

# Assuming other features are numerical and already in the desired format
assembler = VectorAssembler(inputCols=["normFeatures", "genreVec"], outputCol="features")

pipeline = Pipeline(stages=[assembler_audio, normalizer, stringIndexer, encoder, assembler])
pipelineModel = pipeline.fit(spotify_song_df)
df_transformed = pipelineModel.transform(spotify_song_df)

In [23]:
df_transformed.show(truncate=False)

+----------------------+------------------------------------+------------------------------------------------------+--------------------------------+----------+--------+------------+------+--------+----+-----------+------------+----------------+--------+-------+-------+-----------+------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+----------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|track_id              |artists                             |album_name                                          

Data cleaning process to convert a sparse vector to an array and then to a list. Needed to obtain a cosine similarity score as we will need to take the dot product of the vectors.

In [24]:
from pyspark.sql.types import ArrayType, DoubleType
from pyspark.sql.functions import udf

# UDF to convert a sparse vector to a dense vector
def sparse_to_dense(sparse_vector):
    return sparse_vector.toArray().tolist()

# Register the UDF with the appropriate return type
sparse_to_array_udf = udf(sparse_to_dense, ArrayType(DoubleType()))

# Apply the UDF to the normalized feature column
df_transformed = df_transformed.withColumn("features", sparse_to_array_udf("features"))

In [25]:
df_transformed.show(truncate=False)

+----------------------+------------------------------------+------------------------------------------------------+--------------------------------+----------+--------+------------+------+--------+----+-----------+------------+----------------+--------+-------+-------+-----------+------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+----------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

Cosine similarity score to determine the similarity between two songs.

In [26]:
from pyspark.sql.functions import udf
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.types import DoubleType
import numpy as np

# UDF to calculate the dot product of two vectors
def cosine_similarity(list_a, list_b):
    # Convert lists to numpy arrays
    np_a = np.array(list_a)
    np_b = np.array(list_b)
    # Calculate dot product using numpy
    #return float(np.dot(np_a, np_b)) #/ (np.linalg.norm(np_a) * np.linalg.norm(np_b))

    # Calculate dot product of features
    dot_product = np.dot(np_a, np_b)
    dot_product = float(dot_product)

    # Calculate the normalization of list a
    norm_vec1 = np.linalg.norm(list_a)
    norm_vec1 = float(norm_vec1)

    # Calculate the normalization of list b
    norm_vec2 = np.linalg.norm(list_b)
    norm_vec2 = float(norm_vec2)

    # Calculate the cosine similarity of all songs
    cosine_similarity = dot_product / (norm_vec1 * norm_vec2) if norm_vec1 != 0 and norm_vec2 != 0 else 0.0
    return cosine_similarity

cosine_similarity_udf = udf(cosine_similarity, DoubleType())

Recommend the top 10 songs based on a song track.

In [53]:
from pyspark.sql.functions import col, lit
import pyspark.sql.functions as F

# Assuming `song_id` is a unique identifier for each song
def recommend_songs(song_id, df):
    # Filter out the target song
    target_song = df.filter(df.track_id == song_id).select("features").collect()[0][0]
    #return target_song

    # Add a column for the target song's features
    df_with_similarity = df.withColumn("targetFeatures", F.array([F.lit(x) for x in target_song]))
    #df_with_similarity = df.withColumn("targetFeatures", lit(target_song))

    # Calculate cosine similarity between target song and all songs
    df_with_similarity = df_with_similarity.withColumn("similarity", cosine_similarity_udf("features", "targetFeatures"))

    # Sort by similarity and fetch top 10 songs
    df_filtered = df_with_similarity.filter(df_with_similarity.track_id != song_id)
    top_songs = df_filtered.sort(col("similarity").desc()).limit(10)

    return top_songs

# Example usage
song_id = '6DCZcSspjsKoFjzjrWoCdn'
top_recommendations = recommend_songs(song_id, df_transformed)
top_recommendations
top_recommendations.show(truncate=False)

+----------------------+------------------------+----------------------------------------------------+-----------------------------------------+----------+--------+------------+------+--------+----+-----------+------------+----------------+--------+-------+------+-----------+----------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+----------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [32]:
df_transformed.filter(df_transformed.artists == 'Drake').show(truncate=False)

+----------------------+-------+----------+----------+----------+--------+------------+------+--------+----+-----------+------------+----------------+--------+-------+------+-----------+---------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+----------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

# Find similar songs

The function takes in a query and returns the data we want about a song to search for.

In [43]:
def search_songs_in_df(query):
    # Filter DataFrame where song_name contains the query string (case-insensitive)
    filtered_df = df_transformed.filter(df_transformed.track_name.contains(query)).select("track_id", "track_name", 'artists', 'album_name').limit(10)
    return filtered_df

In [59]:
import ipywidgets as widgets
from IPython.display import display, clear_output

# Input text and button for the search
text = widgets.Text(value='', placeholder='Type a song name', description='Search:', disabled=False)
button = widgets.Button(description="Search")

# Dropdown for selecting a song from search results
dropdown = widgets.Dropdown(options=['Select a song'], description='Results:', disabled=False)

# Output widget for displaying the track ID
output = widgets.Output()

display(text, button, dropdown, output)


def on_button_clicked(b):
    with output:
        clear_output()
        query = text.value
        if query:
            results_df = search_songs_in_df(query).collect()
            # Update dropdown options with search results
            dropdown.options = [(row.track_name, row.artists, row.album_name, row.track_id) for row in results_df]
            dropdown.disabled = False if results_df else True
        else:
            print("Please enter a query.")

def on_dropdown_change(change):
    with output:
        if change['name'] == 'value' and change['new']:
            clear_output()
            print(f"Track Name: {change['new'][0]}")
            print(f"Artist: {change['new'][1]}")
            print(f"Album: {change['new'][2]}")
            print(f"Track ID: {change['new'][3]}")

            song_id = change['new'][3]
            top_recommendations = recommend_songs(song_id, df_transformed)
            top_recommendations.select("track_name", "artists", "album_name").show(truncate=False)


button.on_click(on_button_clicked)
dropdown.observe(on_dropdown_change, names='value')


Text(value='', description='Search:', placeholder='Type a song name')

Button(description='Search', style=ButtonStyle())

Dropdown(description='Results:', options=('Select a song',), value='Select a song')

Output()

Fin