This notebook is an example of using PySpark to explore big data, trying out the Spotify API, and building a deep-embedding recommendation system. It also includes some good examples of SQL queries for EDA.

In [None]:
%%capture
!pip install pyspark
import numpy as np 
import pandas as pd
import os
import matplotlib.pyplot as plt
import seaborn as sns
%config InlineBackend.figure_format = 'svg'
sns.set_palette('rainbow')
sns.set_style('whitegrid')
import plotly.express as px
# these 2 lines fix a sporadic loading error in plotly
from plotly.offline import plot, iplot, init_notebook_mode
init_notebook_mode(connected=True)
sns.set_style('darkgrid')
# pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext

from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.functions import udf, col


from sklearn.pipeline import Pipeline
# from sklearn.manifold import TSNE
from sklearn.decomposition import PCA
from sklearn.cluster import KMeans

CSV_FILE= '/kaggle/input/spotify-huge-database-daily-charts-over-3-years/Final database.csv'
# CSV_FILE= '/kaggle/input/spotify-huge-database-daily-charts-over-3-years/Database to calculate popularity.csv'

In [None]:
# from pyspark.ml.regression import LinearRegression
# from pyspark.mllib.evaluation import RegressionMetrics

# from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, CrossValidatorModel
# from pyspark.ml.feature import VectorAssembler, StandardScaler
# from pyspark.ml.evaluation import RegressionEvaluator

From SparkByExample:
> A spark session unifies all the different contexts, and you can access all the different contexts by invoking them on the spark session object. A Spark “driver” is an application that creates a SparkContext for executing one or more jobs in the Spark cluster. It allows your Spark/PySpark application to access Spark Cluster with the help of Resource Manager.
> 
> When you create a SparkSession object, SparkContext is also created and can be retrieved using spark.sparkContext. SparkContext will be created only once for an application; even if you try to create another SparkContext, it still returns existing SparkContext.

In [None]:
spark = SparkSession.builder.master("local[2]").appName("Spotify-Huge-Dataset").getOrCreate() #.enableHiveSupport()
spark

In [None]:
# Old way, deprecated in 3.0.0
sc = spark.sparkContext
sqlContext = SQLContext(sc)
# sqlContext = SQLContext(spark)

Note that I'm casting the numerical features after loading the DataFrame. This is much slower than defining the schema before loading into a Spark DataFrame. I'll come back and define the schema explicitly later when I have some free time.
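
As a rough sketch of what an explicit schema could look like, the helper below builds a DDL-style schema string from the CSV header. The helper name `build_ddl_schema` and the toy header are hypothetical. As far as I know, Spark applies a user-supplied CSV schema by column position, so the string has to cover every column in file order, which is why unknown columns default to `STRING` here.

```python
import csv
import io

def build_ddl_schema(header, float_cols, date_cols=()):
    """Return a DDL schema string that covers every column in file order."""
    float_cols, date_cols = set(float_cols), set(date_cols)
    parts = []
    for name in header:
        if name in float_cols:
            dtype = "FLOAT"
        elif name in date_cols:
            dtype = "DATE"
        else:
            dtype = "STRING"  # anything not listed stays a string
        parts.append(f"`{name}` {dtype}")
    return ", ".join(parts)

# Toy header standing in for the first line of the real CSV (hypothetical)
sample_csv = "Title,Artist,Release_date,danceability,tempo\n"
header = next(csv.reader(io.StringIO(sample_csv)))
ddl = build_ddl_schema(header,
                       float_cols=["danceability", "tempo"],
                       date_cols=["Release_date"])
print(ddl)
```

The resulting string could then be passed to the reader, for example `spark.read.option("header", True).schema(ddl).csv(CSV_FILE)`, instead of casting column by column afterwards.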

In [None]:
df = spark.read.option("header", True).csv(CSV_FILE)
df = df.withColumn("Release_date", F.to_date("Release_date", "yyyy-MM-dd"))
# each feature is listed once; a duplicate would produce duplicate columns in the SQL SELECTs below
numerical_features = ['danceability', 'energy', 'instrumentalness', 'valence', 'liveliness', 'speechiness', 'acoustics',
                      'tempo', 'duration_ms', 
                      'time_signature', 'Days_since_release', 'n_words']

for c in numerical_features:
    df = df.withColumn(c, df[c].cast("float"))
    
cols_to_drop = ['syuzhet_norm', 'bing_norm', 'afinn_norm', 'nrc_norm', 'syuzhet', 'bing'] 
# drop() returns a new DataFrame, so reassign it; no need to collect() the rows to the driver
df = df.drop(*cols_to_drop)
    
df.printSchema()
# df.show(n=1, truncate=False, vertical=True)

# EDA
Some good SQL queries, Plotly figures, and examples of using pyspark to filter results from a large dataset.

In [None]:
# how many unique songs are there in the dataset?
df.select(["Title","Artist"]).distinct().count()

In [None]:
print("Dataset Shape using spark syntax:\n",(df.count(), len(df.columns)))

Most popular artist, all countries. Each tally represents a song on a given day (during the last 3 years) that was one of the 200 most played songs on that day. An artist can have multiple songs per day, and the same song can be counted again on subsequent days.
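
To make the counting rule concrete, here is a minimal sketch in plain Python with made-up chart rows (the artists, titles, and dates are hypothetical). Each row is one song on one day, so an artist gains a tally for every song on every day it charts.

```python
from collections import Counter

# Hypothetical chart rows: (date, artist, title), one row per charting song per day
chart_rows = [
    ("2020-01-01", "Artist A", "Song 1"),
    ("2020-01-01", "Artist A", "Song 2"),  # second song by the same artist on the same day
    ("2020-01-01", "Artist B", "Song 3"),
    ("2020-01-02", "Artist A", "Song 1"),  # same song counted again on the next day
]

# Equivalent in spirit to groupBy("Artist").count()
tally = Counter(artist for _, artist, _ in chart_rows)
print(tally.most_common())
```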

In [None]:
# each count is a song that was in the top 200 most played on a day on spotify during the last 3 years
result_df = (df.groupBy("Artist")
               .count()
               .orderBy("count", ascending=False)
               .limit(10)
               .toPandas()
            )
px.bar(result_df, y='Artist', x='count', title='Most Prolific Artists')

In [None]:
# same as above but with seaborn (sometimes plotly doesn't show up in the published notebook)
sns.barplot(data=result_df, y='Artist', x='count').set_title('Most Prolific Artists');

`createOrReplaceTempView` registers the DataFrame as a temporary view for the current Spark session. The view is not materialized or persisted, but you can run SQL queries against it. To keep the data in memory, call `df.cache()`; to persist it as a table, write it with `df.write.saveAsTable`.

In [None]:
df.createOrReplaceTempView("df_table")

In [None]:
print("Now using the SQL Context. We can check it's the same length as before")
query = """
    SELECT Count(*) as Dataset_Length
    FROM df_table
"""
res = spark.sql(query).show()

In [None]:
# Most popular artists (by sum of popularity of songs) in the USA
query = """
SELECT
       Artist,
       ROUND(SUM(Popularity), 2) AS Total_Popularity
FROM df_table
WHERE USA == 1
GROUP BY Artist
ORDER BY Total_Popularity DESC
LIMIT 10
"""

res = spark.sql(query)
res.show(10, truncate=False)

In [None]:
# select only the songs released in 1939
(df.filter(F.year(df['Release_date']) == 1939)
   .select('Title', 'Artist','Release_date', 'Genre')
   .distinct()
   .show(5, truncate=False)
)

### Most Popular Song per Decade
First with a nested query and using pandas to drop duplicates. Then optimized with [scalar-aggregate reduction](https://www.stevenmoseley.com/blog/tech/high-performance-sql-correlated-scalar-aggregate-reduction-queries)
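
The idea behind the scalar-aggregate reduction can be sketched in plain Python: zero-pad the numeric value to a fixed width, append the payload (here the title), take the maximum of the combined strings per group, and slice the payload back off. The rows below are made up, and the sketch assumes non-negative popularity values, since padding only preserves numeric order in that case.

```python
from collections import defaultdict

# Hypothetical rows: (decade, popularity, title)
rows = [
    (1990, 12.5, "song a"),
    (1990, 101.0, "song b"),
    (2000, 7.25, "song c"),
    (2000, 3.0, "song d"),
]

PAD = 11  # same fixed width as LPAD(Popularity, 11, 0) in the SQL version

best = defaultdict(str)
for decade, popularity, title in rows:
    # Zero-pad so that string order matches numeric order, then append the payload
    key = f"{popularity:0{PAD}.2f}" + title
    best[decade] = max(best[decade], key)

# Split each winning string back into (popularity, title)
result = {decade: (float(k[:PAD]), k[PAD:]) for decade, k in best.items()}
print(result)
```

A single pass with one `max` per group replaces the self-join, which is the point of the technique.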

In [None]:
query = """
SELECT
        ROUND(Year(Release_date), -1) AS Decade,
        Round(Popularity, 2)          AS Popularity,
                                         Title,
                                         Artist
FROM df_table
INNER JOIN (SELECT Max(Popularity) as mp
            FROM df_table
            WHERE ROUND(Year(Release_date), -1) IS NOT NULL
            AND USA == 1
            GROUP BY ROUND(Year(Release_date), -1)
           ) AS temp
ON temp.mp = df_table.Popularity
ORDER BY Decade ASC, Popularity ASC
"""

res = spark.sql(query)
res.toPandas().drop_duplicates(subset='Decade', keep="last")

In [None]:
# highly optimized version of the above query via scalar-aggregate-reduction
query = """
SELECT
    ROUND(Year(Release_date), -1) as Decade,
    ROUND(Max(Popularity), 2) as Popularity,
    SUBSTRING(MAX(CONCAT(LPAD(Popularity, 11, 0), Title)), 12) AS Title,
    SUBSTRING(MAX(CONCAT(LPAD(Popularity, 11, 0), Artist)), 12) AS Artist
FROM
    df_table
WHERE
    ROUND(Year(Release_date), -1) IS NOT NULL
    AND USA == 1
GROUP BY Decade
ORDER BY Decade ASC
"""

spark.sql(query).show()

### Most popular Genre per decade

In [None]:
# Most popular genres, period.
query = """
SELECT Genre, COUNT(*) AS Tally
FROM df_table
GROUP BY Genre
ORDER BY Tally DESC
"""
spark.sql(query).show(5)

In [None]:
query = """
SELECT Decade, Genre, counts
FROM (
      SELECT *, ROW_NUMBER() OVER (PARTITION BY Decade ORDER BY counts DESC) AS rn
      FROM (
            SELECT
                  ROUND(Year(Release_date), -1) AS Decade,
                  Genre, COUNT(Genre)           AS counts
            FROM  df_table
            WHERE ROUND(Year(Release_date), -1) IS NOT NULL
            GROUP BY Decade, Genre
           ) AS tallies
     ) AS ranked
WHERE rn = 1
ORDER BY Decade
"""

# dropDuplicates() is not guaranteed to keep the first row of a sorted DataFrame,
# so the top genre per decade is picked with ROW_NUMBER() instead
spark.sql(query).show()

## For each track, what day was it most popular?
(Just for a small selection of them)

In [None]:
query = """
SELECT Title, Artist, Release_date, MAX(Popularity)
FROM df_table
WHERE Artist == "Paulo Londra"
GROUP BY Title, Artist, Release_date
LIMIT 10
"""

res = spark.sql(query).show()

## Let's see how music changed over the decades
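
A note on how decades are bucketed in this notebook: `ROUND(Year(Release_date), -1)` and `F.round(..., -1)` round to the nearest ten (Spark's `round` uses half-up rounding), so 1995 through 1999 land in the 2000 bucket. The sketch below compares that with the more conventional floor-based decade. Both function names are hypothetical.

```python
def decade_round_half_up(year: int) -> int:
    """Mimic ROUND(year, -1) with half-up rounding: the nearest multiple of ten."""
    return (year + 5) // 10 * 10

def decade_floor(year: int) -> int:
    """Conventional decade: the multiple of ten at or below the year."""
    return year // 10 * 10

for year in (1990, 1994, 1995, 1999):
    print(year, decade_round_half_up(year), decade_floor(year))
```

If floor-based decades are preferred, something like `FLOOR(Year(Release_date) / 10) * 10` should give the same buckets as `decade_floor` in Spark SQL.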

In [None]:
sound_features = ['danceability', 'energy', 'instrumentalness', 'valence', 'liveliness', 'speechiness', 'acoustics']

# alias each aggregate explicitly: with a dict passed to agg(), the output column order
# is not guaranteed, so renaming by position afterwards could mislabel features
df_music_features = (df.sample(.2, seed=42)
                       .groupBy(F.round(F.year(df.Release_date), -1).alias('Decade'))
                       .agg(*[F.mean(feature).alias(feature) for feature in sound_features])
                       .orderBy('Decade')
                       .toPandas()
                       .dropna(axis=0)
                    )
fig = px.line(df_music_features, x='Decade', y=sound_features, title='Song Characteristics Over the Decades')
fig.show()

In [None]:
# same as above but with seaborn (sometimes plotly doesn't show up in the published notebook)
sns.lineplot(data=pd.melt(df_music_features, ['Decade']), x='Decade', y='value', hue='variable').set_title('Song Characteristics Over the Decades');

# Let's check out the Spotify API

Currently based on [this notebook](https://www.kaggle.com/vatsalmavani/music-recommendation-system-using-spotify-dataset). By interacting with the Spotify API, we can extract more song information than the dataset provides. For example, spotipy's `audio_features()` method returns features like song length (`duration_ms`).

In [None]:
from kaggle_secrets import UserSecretsClient
user_secrets = UserSecretsClient()
SPOTIFY_CLIENT_ID = user_secrets.get_secret("SPOTIFY_CLIENT_ID")
SPOTIFY_CLIENT_SECRET = user_secrets.get_secret("SPOTIFY_CLIENT_SECRET")

In [None]:
%%capture
!pip install spotipy

In [None]:
import spotipy
from spotipy.oauth2 import SpotifyClientCredentials
from collections import defaultdict

sp = spotipy.Spotify(auth_manager=SpotifyClientCredentials(client_id=SPOTIFY_CLIENT_ID,
                                                           client_secret=SPOTIFY_CLIENT_SECRET
                                                          )
                    )

In [None]:
# to search for a specific song title and filter the returned JSON
sp.search(q='track: smells like teen spirit')['tracks']['items'][0]['album']

In [None]:
def find_song(name, year):
    """Look up a track by name and year; return a one-row DataFrame, or None if nothing is found."""
    results = sp.search(q=f'track: {name} year: {year}', limit=1)
    items = results['tracks']['items']
    if not items:
        return None

    track = items[0]
    # audio_features returns one entry per track id; guard against a missing entry
    audio_features = sp.audio_features(track['id'])[0]
    if audio_features is None:
        return None

    song_data = {
        'name': [name],
        'year': [year],
        'explicit': [int(track['explicit'])],
        'duration_ms': [track['duration_ms']],
        'popularity': [track['popularity']],
    }
    # scalar audio features are broadcast to the single row by pandas
    for key, value in audio_features.items():
        song_data[key] = value

    return pd.DataFrame(song_data)

# Clustering

There are a few possible approaches for comparing song similarities. One is to use just the continuous numerical variables (danceability, energy, etc.), optionally reducing dimensionality with PCA or grouping songs with k-means. With only these features, you can build a feature vector per song and use cosine similarity to find the most similar-sounding song. If you also want to take the one-hot-encoded countries into account, the vector mixes continuous and categorical features.

Some options: 
- [Non-linear PCA (NLPCA)](https://pubmed.ncbi.nlm.nih.gov/22176263/)
- [Factor Analysis of Mixed Data (FAMD)](https://github.com/MaxHalford/Prince#factor-analysis-of-mixed-data-famd)

Alternatively, we can create an embedding, where we map all the songs into an n-dimensional feature space and then look for the most similar vectors in this space (probably with k-NN), which gives us the k most similar songs.
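
As a minimal sketch of the nearest-neighbour idea, the snippet below ranks songs by cosine similarity to a query song using only the standard library. The song names and three-feature vectors are made up, and `k_most_similar` is a hypothetical helper, not part of any library.

```python
import heapq
import math

def cosine_similarity(u, v):
    """Cosine of the angle between two equal-length vectors."""
    dot = sum(a * b for a, b in zip(u, v))
    norm_u = math.sqrt(sum(a * a for a in u))
    norm_v = math.sqrt(sum(b * b for b in v))
    return dot / (norm_u * norm_v)

def k_most_similar(query, vectors, k):
    """Return the k (name, similarity) pairs closest to `query`, excluding the query itself."""
    scored = ((name, cosine_similarity(vectors[query], vec))
              for name, vec in vectors.items() if name != query)
    return heapq.nlargest(k, scored, key=lambda pair: pair[1])

# Hypothetical feature vectors: [danceability, energy, valence]
songs = {
    "song a": [0.9, 0.8, 0.7],
    "song b": [0.8, 0.9, 0.6],
    "song c": [0.1, 0.2, 0.9],
    "song d": [0.2, 0.1, 0.1],
}
neighbours = k_most_similar("song a", songs, k=2)
print(neighbours)
```

Note that "song d" ranks second even though its values are much smaller than those of "song a": cosine similarity compares direction only and ignores magnitude, which is worth keeping in mind when choosing how to scale the features.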

## First check out the cosine similarity of song feature vectors

In [None]:
###### First KPop ######
query_kpop = """
SELECT Title, Artist, {}
FROM df_table
WHERE `k-pop` = 1
""".format(', '.join(numerical_features))

df_kpop_songs = (spark.sql(query_kpop)
                      .sample(.1)
                      .dropna()
                      .toPandas() # don't do this, it's better to sample before querying
                )
####### Now Rap #######
query_rap = """
SELECT Title, Artist, {}
FROM df_table
WHERE rap = 1
""".format(', '.join(numerical_features))

df_rap_songs = (spark.sql(query_rap)
                     .sample(.1)
                     .dropna()
                     .toPandas() # don't do this, it's better to sample before querying
               )
df_rap_songs.head()

In [None]:
df_kpop_songs.head()

In [None]:
# it might be better to use a normalized cosine similarity instead of scaling first and then computing it.
from scipy import spatial
from sklearn.preprocessing import Normalizer
from sklearn.preprocessing import MinMaxScaler
scaler = MinMaxScaler()
transformer = Normalizer()

# fit the scaler once on both genres so the scaled features are comparable across them
scaler.fit(pd.concat([df_kpop_songs, df_rap_songs]).iloc[:, 2:]) # drop the title and artist with the iloc
scaled_kpop_df = scaler.transform(df_kpop_songs.iloc[:, 2:])
scaled_rap_df = scaler.transform(df_rap_songs.iloc[:, 2:])


# cos similarity of a rap and a k-pop song
song1 = np.array(scaled_rap_df[1])
song2 = np.array(scaled_kpop_df[2])
result = 1 - spatial.distance.cosine(song1, song2)
print("Cosine similarity of a rap and a k-pop song:", result)

In [None]:
# cos similarity of two rap songs
song1 = np.array(scaled_rap_df[1])
song2 = np.array(scaled_rap_df[10])
result = 1 - spatial.distance.cosine(song1, song2)
print("Cosine similarity of two rap songs:", result)

## Dimensionality reduction
This is useful for visualizing k-means clustering later. One mistake people make with PCA is assuming that the dimensions you get will be interpretable. In this case, we're going to take two types of music (K-pop and rap), and then try reducing all the numeric, musical features down to two dimensions. The two dimensions won't really represent the genre of the music, but we can pretend that this is true. When we do k-means clustering later on, we can visualize it on these two PCA axes.

In [None]:
# let's add an OHE genre encoding
df_rap_songs = df_rap_songs.assign(is_rap=1,
                                   is_kpop=0
                                   )
df_kpop_songs = df_kpop_songs.assign(is_rap=0,
                                     is_kpop=1
                                     )
df_rap_and_kpop = pd.concat([df_rap_songs, df_kpop_songs])
X = scaler.fit_transform(df_rap_and_kpop.iloc[:, 2:])

pca = PCA(n_components=10)
pca.fit(X)
print(pca.explained_variance_ratio_)

In [None]:
sns.lineplot(x=[x for x in range(1, 11)], y=pca.explained_variance_ratio_).set_title("% Variance Explained vs # Dimensions");

As expected, it's able to explain most of the variance using 1 dimension. This roughly corresponds to "genre," which we encoded as the `is_rap` and `is_kpop` indicator columns.
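
One plausible reason for this, offered as an assumption rather than something verified on this dataset: after min-max scaling, a roughly balanced 0/1 indicator has a variance near 0.25, which is usually much larger than the variance of a continuous feature clustered in a narrow range. Since PCA looks for directions of maximum variance, the indicator columns can dominate the first component. The values below are made up for illustration.

```python
import statistics

# Hypothetical min-max scaled columns for 8 songs (4 rap, 4 k-pop)
is_rap = [1.0, 1.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0]
danceability = [0.55, 0.60, 0.70, 0.65, 0.50, 0.62, 0.58, 0.68]

# Population variance of each column
var_indicator = statistics.pvariance(is_rap)
var_feature = statistics.pvariance(danceability)
print(var_indicator, var_feature)
```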

In [None]:
pca = PCA(n_components=2)
principalComponents = pca.fit_transform(X)
PCA_components = pd.DataFrame(principalComponents)

# sns.scatterplot(data=principalComponents, alpha=.1)
sns.scatterplot(x=PCA_components[0], y=PCA_components[1], alpha=.1).set_title("First 2 PCA Components");
plt.xlabel('PCA 1');
plt.ylabel('PCA 2');

The first component is particularly good at separating the data.

## Kmeans Clustering

Find the optimal number of clusters with an elbow plot: project the data onto the top 2 PCA components, then fit k-means with various numbers of clusters. The elbow plot below shows the inertia (the within-cluster sum of squared distances) as a function of the number of clusters used.
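
For reference, inertia can be computed by hand for a given cluster assignment. The sketch below uses four made-up 2-D points and fixed labels rather than running k-means, just to show why the value drops sharply once the number of clusters matches the structure of the data.

```python
def inertia(points, labels):
    """Sum of squared distances from each point to the centroid of its assigned cluster."""
    clusters = {}
    for point, label in zip(points, labels):
        clusters.setdefault(label, []).append(point)
    total = 0.0
    for members in clusters.values():
        # centroid = coordinate-wise mean of the cluster members
        centroid = [sum(coord) / len(members) for coord in zip(*members)]
        total += sum(sum((x - c) ** 2 for x, c in zip(p, centroid)) for p in members)
    return total

# Hypothetical 2-D points forming two obvious groups
points = [(0.0, 0.0), (0.0, 2.0), (10.0, 0.0), (10.0, 2.0)]
one_cluster = inertia(points, [0, 0, 0, 0])
two_clusters = inertia(points, [0, 0, 1, 1])
print(one_cluster, two_clusters)
```

Going from one cluster to two removes almost all of the inertia here; that kind of sharp drop followed by a flat tail is the "elbow" to look for.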

In [None]:
ks = range(1, 10)
inertias = []
for k in ks:
    # Create a KMeans instance with k clusters: model
    model = KMeans(n_clusters=k)
    
    # Fit model to samples
    model.fit(PCA_components.iloc[:,:2])
    
    # Append the inertia to the list of inertias
    inertias.append(model.inertia_)
    
sns.lineplot(x=ks, y=inertias, marker='o').set_title("Inertia vs # Clusters used")
plt.xlabel('number of clusters, k')
plt.ylabel('inertia')
plt.xticks(ks)
plt.show()

Again, as expected, 2 clusters seems to make sense

In [None]:
km = KMeans(
    n_clusters=2, init='random',
    n_init=10, max_iter=300, 
    tol=1e-04, random_state=0
)
y_km = km.fit_predict(X)

In [None]:
df_pca_kmeans_plot = pd.concat([PCA_components, pd.Series(y_km)], axis=1)
df_pca_kmeans_plot.columns = ['PCA_1', 'PCA_2', 'Cluster']

In [None]:
sns.scatterplot(data=df_pca_kmeans_plot, x='PCA_1', y='PCA_2', hue='Cluster')
plt.xlabel('Principal Component 1')
plt.ylabel('Principal Component 2')
plt.title('2 component PCA');

In [None]:
# now color by genre instead of cluster
df_final = pd.concat([df_pca_kmeans_plot, df_rap_and_kpop.reset_index()['is_rap']], axis=1)
df_final['is_rap'] = df_final['is_rap'].replace({1:'Rap', 0: 'KPop'})
df_final['Cluster'] = df_final['Cluster'].replace({1:'Cluster 2', 0: 'Cluster 1'})
df_final = df_final.rename(columns={'is_rap': 'Genre'})

sns.scatterplot(data=df_final, x='PCA_1', y='PCA_2', hue='Genre')
plt.xlabel('Principal Component 1')
plt.ylabel('Principal Component 2')
plt.title('2 component PCA');
# px.scatter(df_final, x='PCA_1', y='PCA_2', color='Genre', hover_data=['Genre'])

Now, would it still pick out genre and group the songs into two clusters as nicely if we excluded the genre indicator columns from the features?

In [None]:
df_rap_and_kpop = pd.concat([df_rap_songs, df_kpop_songs])
rap_kpop_labels = df_rap_and_kpop[['is_rap', 'is_kpop']]
df_rap_and_kpop = df_rap_and_kpop.drop(columns=['is_rap', 'is_kpop'])
X = scaler.fit_transform(df_rap_and_kpop.iloc[:, 2:])

pca = PCA(n_components=10)
pca.fit(X)
print(pca.explained_variance_ratio_)
sns.lineplot(x=[x for x in range(1, 11)], y=pca.explained_variance_ratio_).set_title("% Variance Explained vs # Dimensions")
plt.show()

pca = PCA(n_components=2)
principalComponents = pca.fit_transform(X)
PCA_components = pd.DataFrame(principalComponents)

# sns.scatterplot(data=principalComponents, alpha=.1)
sns.scatterplot(x=PCA_components[0], y=PCA_components[1], alpha=.1).set_title("First 2 PCA Components");
plt.xlabel('PCA 1');
plt.ylabel('PCA 2');
plt.show()


ks = range(1, 10)
inertias = []
for k in ks:
    # Create a KMeans instance with k clusters: model
    model = KMeans(n_clusters=k)
    
    # Fit model to samples
    model.fit(PCA_components.iloc[:,:2])
    
    # Append the inertia to the list of inertias
    inertias.append(model.inertia_)
    
sns.lineplot(x=ks, y=inertias, marker='o').set_title("Inertia vs # Clusters used")
plt.xlabel('number of clusters, k')
plt.ylabel('inertia')
plt.xticks(ks)
plt.show()

km = KMeans(
    n_clusters=3, init='random',
    n_init=10, max_iter=300, 
    tol=1e-04, random_state=0
)
y_km = km.fit_predict(X)

df_pca_kmeans_plot = pd.concat([PCA_components, pd.Series(y_km)], axis=1)
df_pca_kmeans_plot.columns = ['PCA_1', 'PCA_2', 'Cluster']
sns.scatterplot(data=df_pca_kmeans_plot, x='PCA_1', y='PCA_2', hue='Cluster')
plt.xlabel('Principal Component 1')
plt.ylabel('Principal Component 2')
plt.title('2 component PCA');

In [None]:
df_final = pd.concat([df_pca_kmeans_plot, rap_kpop_labels.reset_index()['is_rap']], axis=1)
df_final['is_rap'] = df_final['is_rap'].replace({1:'Rap', 0: 'KPop'})
df_final['Cluster'] = df_final['Cluster'].replace({2: 'Cluster 3', 1:'Cluster 2', 0: 'Cluster 1'})  # three clusters this time
df_final = df_final.rename(columns={'is_rap': 'Genre'})

# px.scatter(df_final, x='PCA_1', y='PCA_2', color='Genre', hover_data=['Cluster'])
sns.scatterplot(data=df_final, x='PCA_1', y='PCA_2', hue='Genre')
plt.xlabel('Principal Component 1')
plt.ylabel('Principal Component 2')
plt.title('2 component PCA');

Without the genre as a feature, PCA separates the data very differently. When we add the genre back in and color the points by genre instead of cluster, you can see that it makes no attempt to separate by genre. Interestingly, the elbow plot now suggests that three clusters give the best separation, instead of two.

# Find the most similar song
We could fit_transform the entire dataset, then write a function that loops through all the possible songs (only about 60k of them) and returns the one with the minimum cosine distance. We need to be careful, or at least selective, about the features, though, because one-hot encoding all the variables might exceed our RAM limit.
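
A quick back-of-the-envelope estimate shows why the RAM concern is real. The sketch assumes a dense float64 matrix; the cardinalities for the Artist and Genre columns are made-up placeholders, not values measured on this dataset.

```python
def dense_matrix_bytes(n_rows, n_numeric, category_cardinalities, bytes_per_value=8):
    """Estimate the size of a dense one-hot-encoded matrix (float64 by default)."""
    # each categorical column expands into one column per distinct value
    n_columns = n_numeric + sum(category_cardinalities)
    return n_rows * n_columns * bytes_per_value

# Assumed numbers for illustration: ~60k songs, 12 numeric features,
# and hypothetical cardinalities for the Artist and Genre columns
estimate = dense_matrix_bytes(60_000, 12, [15_000, 1_000])
print(f"{estimate / 1e9:.2f} GB")
```

With numbers in this range the dense matrix would take several gigabytes, so dropping high-cardinality columns or using a sparse representation may be worth considering.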

In [None]:
# we might want to grab the URI too, to compare how they sound later. I'll skip for now
query_all = """
SELECT Title, Artist, Genre, {}
FROM df_table
""".format(', '.join(numerical_features))

df_all_songs = (spark.sql(query_all)
                     .dropna()
                     .toPandas()
                     .drop_duplicates(['Title', 'Artist'])
                     .reset_index(drop=True)
                )

In [None]:
df_all_songs.columns

In [None]:
df_all_songs_ohe = pd.get_dummies(df_all_songs.drop(columns='Title'))
scaled_df_all_songs_ohe = scaler.fit_transform(df_all_songs_ohe)

In [None]:
def get_most_similar_song(title, artist):
    """Print the song closest (by cosine distance) to the given one and return its index."""
    title = title.lower()
    # boolean mask instead of DataFrame.query, so quotes in titles don't break the lookup
    matches = df_all_songs.index[(df_all_songs['Title'] == title) & (df_all_songs['Artist'] == artist)]
    if len(matches) == 0:
        raise ValueError(f"No song found for title={title!r}, artist={artist!r}")
    song_idx = matches[0]
    # get the vector for the requested song
    song_vector = scaled_df_all_songs_ohe[song_idx]
    # find the most similar song, skipping the requested song itself
    min_difference = float('inf')
    closest_song_idx = None
    for index, song in enumerate(scaled_df_all_songs_ohe):
        if index == song_idx:
            continue
        distance = spatial.distance.cosine(song_vector, song)
        if distance < min_difference:
            min_difference = distance
            closest_song_idx = index
    # get the title and the artist of the most similar song
    closest_song = df_all_songs.loc[closest_song_idx, ['Title', 'Artist']]
    print("Closest Song:\n-------------", closest_song, sep="\n")
    return closest_song_idx

In [None]:
get_most_similar_song("Numb", "Linkin Park")

In [None]:
spark.sql("SELECT Artist, Title FROM df_table WHERE Artist LIKE 'Radio%'").distinct().show(50)

In [None]:
get_most_similar_song("creep", "Radiohead")