## Scalable Dimension Reduction and Clustering with Spark

Today, we will be working [Spotify song data](https://www.google.com/url?q=https%3A%2F%2Fwww.kaggle.com%2Fdatasets%2Fjoebeachcapital%2F30000-spotify-songs%2Fdata) collected from Spotify API and publicly available on Kaggle. Our goal will be to construct coherent clusters that describe music based on perceived musical features -- allowing us to map the space of musical signs (beyond simply relying on their reported "genre," which often fails to recognize cross-genre work).

Spark has implementations of PCA and SVD, along with K-Means, so we will employ these methods in this notebook. For further detail on the methods, consult the MMDS textbook.

In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, Row
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import StandardScaler, PCA
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.mllib.feature import StandardScaler as StandardScalerRDD
from pyspark.mllib.linalg.distributed import RowMatrix
import pyspark.sql.functions as F

spark = SparkSession \
        .builder \
        .appName("dr_cluster") \
        .getOrCreate()

# Read Spotify data
df = spark.read.csv('/project/macs40123/spotify_songs.csv', header=True)

# Note potentially relevant features like danceability, energy, acousticness, etc.
df.columns

24/10/16 20:30:44 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


['track_id',
 'track_name',
 'track_artist',
 'track_popularity',
 'track_album_id',
 'track_album_name',
 'track_album_release_date',
 'playlist_name',
 'playlist_id',
 'playlist_genre',
 'playlist_subgenre',
 'danceability',
 'energy',
 'key',
 'loudness',
 'mode',
 'speechiness',
 'acousticness',
 'instrumentalness',
 'liveness',
 'valence',
 'tempo',
 'duration_ms']

### Data Preprocessing

In [2]:
# identify potentially relevant features and add to a feature dataframe
feature_cols = ['track_popularity', 'danceability', 'energy',
                'key', 'loudness', 'speechiness',
                'acousticness', 'instrumentalness', 'liveness',
                'valence', 'tempo', 'duration_ms']

# select feature columns and numeric data as floats
df_features = df.select(*(F.col(c).cast("float").alias(c) for c in feature_cols),'track_id', 'track_artist') \
                         .dropna()
df_features = df_features.withColumn('features', F.array(*[F.col(c) for c in feature_cols])) \
                         .select('track_id', 'track_artist', 'features')

# convert features to dense vector format (expected by K-Means, PCA)
vectors = df_features.rdd.map(lambda row: Vectors.dense(row.features))
features = spark.createDataFrame(vectors.map(Row), ["features_unscaled"])

# scale features (some values like duration_ms are much larger than others)
standardizer = StandardScaler(inputCol="features_unscaled", outputCol="features")
model = standardizer.fit(features)
features = model.transform(features) \
                .select('features')

# persist in memory before fit model
features.persist()
features.printSchema()

[Stage 2:>                                                          (0 + 2) / 2]

root
 |-- features: vector (nullable = true)



                                                                                

### K-means

Now, we could use the K-means clustering algorithm based on the features in our dataset:

In [3]:
# train model
kmeans = KMeans(k=3, seed=1)
model = kmeans.fit(features)

# make predictions (i.e. identify clusters)
predictions = model.transform(features)

# evaluate clustering by computing silhouette coef
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))

                                                                                

Silhouette with squared euclidean distance = 0.18148415767740186


This is not that great without first performing dimension reduction, though...

### PCA

Let's try to perform dimensionality reduction on the ```features``` -- using [PCA](https://spark.apache.org/docs/latest/ml-features.html#pca) before fitting our K-Means model:

In [4]:
# fit model
pca = PCA(k=2, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(features)

# transform feature data
pca_results = model.transform(features).select("pcaFeatures")
pca_features = pca_results.rdd.map(lambda row: Vectors.dense(row.pcaFeatures))
pca_features = spark.createDataFrame(pca_features.map(Row), ["features"])

# persist data before training model on PCA-discovered features
pca_features.persist()

# Note: we've reduced our dimensionality down to 2 dimensions
pca_features.show(5, truncate=False)

24/10/16 20:34:33 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
24/10/16 20:34:33 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK
+---------------------------------------+
|features                               |
+---------------------------------------+
|[3.088270786158075,2.7825003774651256] |
|[2.631248482088186,3.0145971151939386] |
|[3.153704155957454,2.757676584083471]  |
|[3.162586940929096,2.1786940454652988] |
|[2.6209791405449754,2.6867473908798885]|
+---------------------------------------+
only showing top 5 rows



Now let's run K-means with the same parameters as above, but on the ```pcaFeatures``` produced by the PCA reduction we just executed.

In [5]:
# fit model
pca_kmeans = KMeans(k=3, seed=1)
pca_model = pca_kmeans.fit(pca_features)

# make predictions (i.e. identify clusters)
pca_predictions = pca_model.transform(pca_features)

# evaluate clustering by computing silhouette coef
pca_evaluator = ClusteringEvaluator()
silhouette = pca_evaluator.evaluate(pca_predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))

Silhouette with squared euclidean distance = 0.5410930506879614


A bit better, but we can likely improve further. 

### SVD

Recall from MMDS that Singular Value Decomposition (SVD) can be another powerful approach for dimension reduction. Let's use [Spark's SVD implementation](https://spark.apache.org/docs/latest/mllib-dimensionality-reduction#svd-example) here (which is implemented only for RDDs, so we will need to convert our DataFrame into an RDD to use it):

In [6]:
# convert to RDD
vectors_rdd = df_features.rdd.map(lambda row: row["features"])

# use RDD-specific standardizer to re-scale data
standardizer_rdd = StandardScalerRDD()
model = standardizer_rdd.fit(vectors_rdd)
vectors_rdd = model.transform(vectors_rdd)
mat = RowMatrix(vectors_rdd)

# Compute SVD, retain 2 SVs to match 2 PCs of PCA
svd = mat.computeSVD(2, computeU=True)

# Access SVD components
U = svd.U
s = svd.s
V = svd.V

# convert U to DataFrame (and persist to memory) for clustering with K-Means
U_df = U.rows.map(lambda row: Row(features=Vectors.dense(row.toArray()))) \
             .toDF()
U_df.persist()

# Note: we've reduced our dimensionality down to 2 dimensions again
U_df.show(5, truncate=False)

                                                                                

24/10/16 20:35:35 WARN InstanceBuilder$NativeARPACK: Failed to load implementation from:dev.ludovic.netlib.arpack.JNIARPACK
24/10/16 20:35:35 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
24/10/16 20:35:35 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
+---------------------------------------------+
|features                                     |
+---------------------------------------------+
|[-0.005665575844501509,0.0052721985600063075]|
|[-0.005602501869667561,0.0034811359031196414]|
|[-0.005502030149423862,0.005737412004379039] |
|[-0.005516497700975487,0.0058838999703511525]|
|[-0.005453144073416807,0.003700134920677747] |
+---------------------------------------------+
only showing top 5 rows



Let's run K-means once more with the same parameters as above, but on ```U_df``` produced by the SVD reduction we just executed:

In [7]:
# train model
svd_kmeans = KMeans(k=3, seed=1)
svd_model = svd_kmeans.fit(U_df)

# make predictions (i.e. identify clusters)
svd_predictions = svd_model.transform(U_df)

# evaluate clustering by computing silhouette score
svd_evaluator = ClusteringEvaluator()
silhouette = svd_evaluator.evaluate(svd_predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))

Silhouette with squared euclidean distance = 0.6978275869985402


This is quite a bit better than our previous two tries. Let's take a closer look at the resulting clusters. Recall that we often don't want to plot all of our data points when we're working at scale (this can result in overplotting and we want to perform as many computations in parallel on our cluster before bringing data back to our primary node and risking that we run out of memory). Here, we are working with a smaller dataset, but we will apply the same logic of working at scale. 

For instance, we can take a look at how many items are in each cluster:

In [8]:
svd_predictions.groupby('prediction') \
               .count() \
               .show()

+----------+-----+
|prediction|count|
+----------+-----+
|         1| 4336|
|         2|13584|
|         0|14901|
+----------+-----+



We can also merge our cluster information back with the song IDs and track artists (and any other data about the songs that you would like to investigate):

In [9]:
# Add an index to U_df that matches df_features (to enable merging)
df_features_with_id = df_features.withColumn("id", F.monotonically_increasing_id())
svd_predictions_with_id = svd_predictions.withColumn("id", F.monotonically_increasing_id())

# Perform an inner join on the 'id' column to merge df_features with U_df
df_merged = df_features_with_id.join(svd_predictions_with_id, on="id", how="inner")
df_merged.show(5)

+---+--------------------+----------------+--------------------+--------------------+----------+
| id|            track_id|    track_artist|            features|            features|prediction|
+---+--------------------+----------------+--------------------+--------------------+----------+
|  0|6f807x0ima9a1j3VP...|      Ed Sheeran|[66.0, 0.748, 0.9...|[-0.0056655758445...|         0|
|  1|0r7CVbZTWZgbTCYdf...|        Maroon 5|[67.0, 0.726, 0.8...|[-0.0056025018696...|         0|
|  2|1z1Hg7Vb0AhHDiEmn...|    Zara Larsson|[70.0, 0.675, 0.9...|[-0.0055020301494...|         0|
|  3|75FpbthrwQmzHlBJL...|The Chainsmokers|[60.0, 0.718, 0.9...|[-0.0055164977009...|         0|
|  4|1e8PAfcKUYoKkxPhr...|   Lewis Capaldi|[69.0, 0.65, 0.83...|[-0.0054531440734...|         0|
+---+--------------------+----------------+--------------------+--------------------+----------+
only showing top 5 rows



Here, considering only artists of the songs, it is clear that there are discernable patterns in the way in which the clusters have been defined -- seemingly, clustering has identified a DJ-specific cluster, a singer/song-writer cluster, as well as a cluster that leans more toward rap, reggaeton (and perhaps DJs who sample this music):

In [10]:
cluster_artist_count = df_merged.groupby(['prediction', 'track_artist']) \
                                .count() \
                                .orderBy(['count', 'track_artist'],
                                         ascending=False)
# DJs
cluster_artist_count.filter(F.col('prediction') == 0) \
                    .show(5)

# Singer/Song-writers
cluster_artist_count.filter(F.col('prediction') == 1) \
                    .show(5)

# Some rap, reggaeton, (DJs sampling?)
cluster_artist_count.filter(F.col('prediction') == 2) \
                    .show(10)

+----------+--------------------+-----+
|prediction|        track_artist|count|
+----------+--------------------+-----+
|         0|       Martin Garrix|  120|
|         0|Dimitri Vegas & L...|   91|
|         0|        David Guetta|   86|
|         0|            Hardwell|   76|
|         0|       Calvin Harris|   76|
+----------+--------------------+-----+
only showing top 5 rows

+----------+-------------+-----+
|prediction| track_artist|count|
+----------+-------------+-----+
|         1|Billie Eilish|   40|
|         1|        Queen|   33|
|         1|  Frank Ocean|   25|
|         1|       Khalid|   23|
|         1|Daniel Caesar|   23|
+----------+-------------+-----+
only showing top 5 rows

+----------+----------------+-----+
|prediction|    track_artist|count|
+----------+----------------+-----+
|         2|           Drake|   69|
|         2|The Chainsmokers|   62|
|         2|            Kygo|   62|
|         2|           Queen|   60|
|         2|       Bad Bunny|   45|
|    

That's all for now, but you're encouraged to dig further into the specific songs within each cluster to describe the logic of the clusters in more detail on your own!