In [1]:
%%configure -f 
{
"conf":{
        "spark.pyspark.python": "python3",
        "spark.pyspark.virtualenv.enabled": "true",
        "spark.pyspark.virtualenv.type":"native",
        "spark.pyspark.virtualenv.bin.path":"/usr/bin/virtualenv",
    
        "spark.executor.heartbeatInterval":"10800s",
        "spark.network.timeout":"24h",
    
        "spark.driver.memory": "8G",
        "spark.executor.memory": "8G",
        "spark.executor.cores":"4",
    
        "livy.server.session.timeout":"24h",
    
        "spark.dynamicAllocation.enabled":"false",
        "spark.ext.h2o.fail.on.unsupported.spark.param":"false",    
        
        "spark.app.name":"msds694-group7"
      }
}

ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
22,application_1615435774710_0013,pyspark,idle,Link,Link,


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import Row
from pyspark.sql.functions import *
from pyspark.sql.window import Window

from pyspark.ml.feature import HashingTF as MLHashingTF
from pyspark.ml.feature import IDF as MLIDF
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

from collections import Counter

Starting Spark application

ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
25,application_1615435774710_0016,pyspark,idle,,,✔

SparkSession available as 'spark'.

In [3]:
ss = SparkSession.builder.getOrCreate()
sc = ss.sparkContext

# Analytics Goal:

* ### Artist Recommendation: Clustering Artists by the Tags of Their Music

    * Extract the top tag of each track, and collect all top tags for each artist
    * Keep only artists with more than 2 tracks, since listeners are unlikely to follow artists with very few tracks
    * Keep only artists whose most common tag is shared by more than 2 artists, since tags used by very few artists are too specific to generalize
    * Apply TF-IDF to transform each artist's tags into a TF-IDF vector
    * Apply KMeans to cluster artists by their TF-IDF vectors
    * Compare the clusters with an assumed label, each artist's "most common tag", and compute the Gini impurity of that label within each cluster

# I. Load Data

In [4]:
train_path = "s3a://msds694-final-group7/lastfm_train_clean/All/"
test_path = "s3a://msds694-final-group7/lastfm_test_clean/All/"
lastfm_train_raw = ss.read.parquet(train_path).cache()
lastfm_test_raw = ss.read.parquet(test_path).cache()

In [5]:
from collections import Counter
def most_common_tag(tags):
    return Counter(tags).most_common(1)[0][0]
most_common_tag_udf = udf(most_common_tag)
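
`Counter.most_common(1)` breaks ties by returning the tag encountered first, so when several tags tie for the top count, the label depends on list order. Note that `collect_list` over a window without an `orderBy` does not guarantee element order, so ties may resolve differently across runs. Below is a plain-Python check that reuses the tag list printed for Los Toreros Muertos in the output of cell [7], where "Rock en Espanol" and "800" both appear twice:

```python
from collections import Counter

def most_common_tag(tags):
    # Same logic as the UDF above: Counter keeps first-seen order, and
    # most_common(1) returns the first tag among those tied for the top count
    return Counter(tags).most_common(1)[0][0]

# Tag list of "Los Toreros Muertos" as printed by artist_tags_train.show(1)
tags = ["Rock en Espanol", "marchosa", "autocantantes", "Spanish Rock", "800", "80s",
        "Rock en Espanol", "domingo", "classic rock", "spanish", "Pilar", "800", "divertida"]

print(most_common_tag(tags))                  # Rock en Espanol
print(most_common_tag(["b", "a", "a", "b"]))  # b (tie: the first-seen tag wins)
```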

In [6]:
# Train
artist_tags_train = lastfm_train_raw.select("artist", 
                                               collect_list(col("top_tag")).over(Window.partitionBy("artist")).alias("tags"))\
                                       .distinct()\
                                       .select("artist", "tags", 
                                               most_common_tag_udf("tags").alias("most_common_tag"))
# keep artists with more than 2 tracks
artist_tags_train = artist_tags_train.filter(size(col("tags")) > 2)

# keep artists whose most common tag is shared by more than 2 artists
artist_tags_train = artist_tags_train.select("artist", "tags", "most_common_tag",
                                             count("most_common_tag").over(Window.partitionBy("most_common_tag")).alias("mc_tag_count"))\
                                     .filter("mc_tag_count > 2").cache()


# Test
artist_tags_test = lastfm_test_raw.select("artist", 
                                        collect_list(col("top_tag")).over(Window.partitionBy("artist")).alias("tags"))\
                                  .distinct()\
                                  .select("artist", "tags", 
                                          most_common_tag_udf("tags").alias("most_common_tag"))
# keep artists with more than 2 tracks
artist_tags_test = artist_tags_test.filter(size(col("tags")) > 2)

# keep artists whose most common tag is shared by more than 2 artists
artist_tags_test = artist_tags_test.select("artist", "tags", "most_common_tag",
                                            count("most_common_tag").over(Window.partitionBy("most_common_tag")).alias("mc_tag_count"))\
                                   .filter("mc_tag_count > 2").cache()
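
The window-plus-`distinct()` pattern above can be hard to follow. The sketch below applies the same two filters in plain Python to made-up (artist, top_tag) rows, one row per track. In PySpark, `groupBy("artist").agg(collect_list("top_tag").alias("tags"))` is a more direct way to build the per-artist tag lists. The function name `build_artist_tags` is hypothetical.

```python
from collections import Counter, defaultdict

def build_artist_tags(rows):
    """rows: (artist, top_tag) pairs, one per track; a toy stand-in for the Spark DataFrame."""
    tags_by_artist = defaultdict(list)
    for artist, tag in rows:
        tags_by_artist[artist].append(tag)  # like collect_list(top_tag) per artist

    # keep artists with more than 2 tracks, as in size(col("tags")) > 2
    kept = {a: tags for a, tags in tags_by_artist.items() if len(tags) > 2}
    most_common = {a: Counter(tags).most_common(1)[0][0] for a, tags in kept.items()}

    # keep artists whose most common tag is shared by more than 2 artists (mc_tag_count > 2)
    mc_tag_count = Counter(most_common.values())
    return {a: (kept[a], tag) for a, tag in most_common.items() if mc_tag_count[tag] > 2}

# Made-up rows for illustration only
rows = [("A", "rock"), ("A", "rock"), ("A", "pop"),
        ("B", "rock"), ("B", "rock"), ("B", "rock"),
        ("C", "rock"), ("C", "indie"), ("C", "rock"),
        ("D", "rock"), ("D", "rock"),                 # only 2 tracks: dropped
        ("E", "jazz"), ("E", "jazz"), ("E", "jazz")]  # "jazz" leads for 1 artist only: dropped
result = build_artist_tags(rows)
print(sorted(result))  # ['A', 'B', 'C']
```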

In [7]:
artist_tags_train.show(1, truncate=False)

+-------------------+------------------------------------------------------------------------------------------------------------------------------------------+---------------+------------+
|artist             |tags                                                                                                                                      |most_common_tag|mc_tag_count|
+-------------------+------------------------------------------------------------------------------------------------------------------------------------------+---------------+------------+
|Los Toreros Muertos|[Rock en Espanol, marchosa, autocantantes, Spanish Rock, 800, 80s, Rock en Espanol, domingo, classic rock, spanish, Pilar, 800, divertida]|Rock en Espanol|15          |
+-------------------+------------------------------------------------------------------------------------------------------------------------------------------+---------------+------------+
only showing top 1 row

In [8]:
artist_tags_test.show(1, truncate=False)

+-----------+-------------------------------------------------------+-----------------+------------+
|artist     |tags                                                   |most_common_tag  |mc_tag_count|
+-----------+-------------------------------------------------------+-----------------+------------+
|Phil Keaggy|[chilled worship, singer-songwriter, singer-songwriter]|singer-songwriter|7           |
+-----------+-------------------------------------------------------+-----------------+------------+
only showing top 1 row

# II. Convert tags to TFIDF Vectors

In [9]:
htf = MLHashingTF(inputCol="tags", outputCol="tf")
tf_train = htf.transform(artist_tags_train)

idf = MLIDF(inputCol="tf", outputCol="features", minDocFreq=2)
tfidf_train = idf.fit(tf_train).transform(tf_train)
tfidf_train.show(1, truncate=False)

+-------------------+------------------------------------------------------------------------------------------------------------------------------------------+---------------+------------+------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|artist             |tags                                                                                                                                      |most_common_tag|mc_tag_count|tf                                                                                                                            |features                                                                                                          
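
Spark's `IDF` uses the smoothed formula idf(t) = log((m + 1) / (df(t) + 1)), where m is the number of documents (artists here) and df(t) is the number of artists whose tag list contains t. It sets idf to 0 for tags that appear in fewer than `minDocFreq` documents. `HashingTF` maps each tag to one of 2^18 = 262,144 buckets by hashing and counts occurrences. The plain-Python sketch below skips hashing and uses tag strings as keys, which assumes no hash collisions. It separates fitting the weights from applying them, mirroring `IDF.fit` and `IDFModel.transform`. The helper names `fit_idf` and `transform` are hypothetical.

```python
import math
from collections import Counter

def fit_idf(docs, min_doc_freq=2):
    """Learn idf weights from a list of tag lists (one list per artist)."""
    m = len(docs)
    df = Counter()
    for doc in docs:
        df.update(set(doc))  # document frequency: count each tag once per artist
    return {t: math.log((m + 1) / (d + 1)) if d >= min_doc_freq else 0.0
            for t, d in df.items()}

def transform(doc, idf):
    """Term frequency times idf; tags never seen while fitting get weight 0."""
    return {t: tf * idf.get(t, 0.0) for t, tf in Counter(doc).items()}

train_docs = [["rock", "rock", "indie"], ["rock", "jazz"], ["jazz", "blues"]]
idf = fit_idf(train_docs)
print(transform(train_docs[0], idf))  # rock -> 2*log(4/3) (about 0.575), indie -> 0.0 (df 1 < 2)
```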

# III. Fit KMeans Model with different k

In [25]:
# Hand-tune k by comparing silhouette scores (computed with squared Euclidean distance)
# Cross-validation is not used: cluster sizes can be highly imbalanced, which makes it a poor fit for KMeans here
k_list = [10, 20, 30, 40, 50, 60]
silhouettes = []
evaluator = ClusteringEvaluator()  # defaults: metricName="silhouette", distanceMeasure="squaredEuclidean"
for k in k_list:
    kmeans = KMeans(k=k, maxIter=100)
    model = kmeans.fit(tfidf_train)
    predictions = model.transform(tfidf_train)
    silhouette = evaluator.evaluate(predictions)
    silhouettes.append(silhouette)
    print(f"k = {k:>5}, Silhouette with squared euclidean distance = {silhouette}")

k =    10, Silhouette with squared euclidean distance = 0.6202441204634463
k =    20, Silhouette with squared euclidean distance = 0.20013679952638855
k =    30, Silhouette with squared euclidean distance = 0.5240330855828701
k =    40, Silhouette with squared euclidean distance = 0.3709844252926842
k =    50, Silhouette with squared euclidean distance = 0.3219939147575743
k =    60, Silhouette with squared euclidean distance = 0.23789567096324507
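
With its defaults, `ClusteringEvaluator` computes the silhouette with squared Euclidean distance. For each point, a is the mean distance to the other members of its own cluster, and b is the smallest mean distance to the members of any other cluster. The point's score is (b - a) / max(a, b), and the reported value is the mean over all points, so values near 1 indicate tight, well-separated clusters. Below is a brute-force plain-Python version of that definition. It is O(n²) and meant for illustration only; Spark uses a more scalable formulation. Giving singleton clusters a score of 0 follows the usual convention.

```python
def sq_dist(p, q):
    return sum((a - b) ** 2 for a, b in zip(p, q))

def silhouette(points, labels):
    """Mean silhouette using squared Euclidean distance (needs at least 2 clusters)."""
    clusters = set(labels)
    scores = []
    for i, (p, c) in enumerate(zip(points, labels)):
        # mean squared distance from p to each cluster, excluding p itself
        mean_dist = {}
        for k in clusters:
            members = [q for j, (q, d) in enumerate(zip(points, labels)) if d == k and j != i]
            if members:
                mean_dist[k] = sum(sq_dist(p, q) for q in members) / len(members)
        if c not in mean_dist:  # singleton cluster: score 0 by convention
            scores.append(0.0)
            continue
        a = mean_dist[c]
        b = min(v for k, v in mean_dist.items() if k != c)
        denom = max(a, b)
        scores.append((b - a) / denom if denom > 0 else 0.0)
    return sum(scores) / len(scores)

points = [(0, 0), (0, 1), (10, 0), (10, 1)]
print(silhouette(points, [0, 0, 1, 1]))  # about 0.99: two tight, well-separated clusters
print(silhouette(points, [0, 1, 0, 1]))  # negative: points sit closer to the other cluster
```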

# IV. Best Model

We pick k = 30, the local maximum of the silhouette score. Although k = 10 scores highest (0.62), 10 groups are usually not enough to describe artists across different genres, popularity levels, and other features.
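
The selection rule can be written out explicitly using the scores printed in cell [25]. The global maximum is k = 10, which is set aside as too coarse, and the only interior local maximum (a score higher than both neighbours) is k = 30:

```python
# Silhouette scores copied from the output of cell [25]
k_list = [10, 20, 30, 40, 50, 60]
silhouettes = [0.6202441204634463, 0.20013679952638855, 0.5240330855828701,
               0.3709844252926842, 0.3219939147575743, 0.23789567096324507]

best_overall = k_list[silhouettes.index(max(silhouettes))]

# interior local maxima: higher than both neighbours
local_maxima = [k_list[i] for i in range(1, len(k_list) - 1)
                if silhouettes[i] > silhouettes[i - 1] and silhouettes[i] > silhouettes[i + 1]]

print(best_overall)  # 10
print(local_maxima)  # [30]
```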

In [10]:
kmeans = KMeans(k=30, maxIter=100)
model = kmeans.fit(tfidf_train)
predictions = model.transform(tfidf_train)

* ## Check Components in Each Cluster

In [15]:
predictions.groupBy("prediction").count().orderBy("prediction").show(30)

+----------+-----+
|prediction|count|
+----------+-----+
|         0|14460|
|         1|   32|
|         2|    4|
|         3|   80|
|         4|    8|
|         5|    2|
|         6|   73|
|         7|   66|
|         8|   22|
|         9|   21|
|        10|   25|
|        11|   14|
|        12|   27|
|        13|   50|
|        14|   92|
|        15|   18|
|        16|    9|
|        17|   13|
|        18|   95|
|        19|   18|
|        20|   29|
|        21|    5|
|        22|   10|
|        23|  119|
|        24|   19|
|        25|   77|
|        26|  242|
|        27|  129|
|        28|    3|
|        29|   72|
+----------+-----+

   * ### Check what's going on in cluster 0

In [14]:
predictions.select("prediction", 
                   min(size(col("tags"))).over(Window.partitionBy("prediction")).alias("min_tracks"))\
            .distinct().orderBy("prediction").show(30, truncate=False)

+----------+----------+
|prediction|min_tracks|
+----------+----------+
|0         |3         |
|1         |26        |
|2         |67        |
|3         |26        |
|4         |57        |
|5         |118       |
|6         |29        |
|7         |19        |
|8         |16        |
|9         |28        |
|10        |12        |
|11        |29        |
|12        |22        |
|13        |20        |
|14        |17        |
|15        |17        |
|16        |30        |
|17        |22        |
|18        |21        |
|19        |23        |
|20        |14        |
|21        |46        |
|22        |31        |
|23        |22        |
|24        |20        |
|25        |26        |
|26        |21        |
|27        |16        |
|28        |92        |
|29        |27        |
+----------+----------+

In [13]:
predictions.filter("prediction == 0")\
           .select("prediction", size(col("tags")).alias("num_tracks"))\
           .filter("num_tracks < 12").count()

7014

* Cluster 0 is far larger than any other cluster. The minimum number of tracks per artist in each cluster shows that every artist with fewer than 12 tracks (about the number of tracks on a typical album) falls into cluster 0. These 7,014 artists make up about half of the cluster's 14,460 members. Because these artists have so few tagged tracks, their TF-IDF vectors carry little distinguishing signal, so KMeans lumps them together in the catch-all cluster 0.
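
The argument can be checked against the recorded outputs. Only cluster 0 has a minimum below 12 tracks, and the 7,014 short-catalogue artists make up just under half of its 14,460 members:

```python
# Minimum tracks per artist in each cluster, copied from the output of cell [14]
min_tracks = {0: 3, 1: 26, 2: 67, 3: 26, 4: 57, 5: 118, 6: 29, 7: 19, 8: 16, 9: 28,
              10: 12, 11: 29, 12: 22, 13: 20, 14: 17, 15: 17, 16: 30, 17: 22, 18: 21, 19: 23,
              20: 14, 21: 46, 22: 31, 23: 22, 24: 20, 25: 26, 26: 21, 27: 16, 28: 92, 29: 27}

clusters_below_12 = [c for c, m in sorted(min_tracks.items()) if m < 12]
share = 7014 / 14460  # cell [13] count over cluster 0 size from cell [15]

print(clusters_below_12)  # [0]
print(round(share, 3))    # 0.485
```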

* ## Calculate the Gini Impurity of Each Cluster, Using Each Artist's "Most Common Tag" as the Label

In [16]:
def gini(tags):
    """
    Use "the most common tag" as an assumed label and
    return the Gini impurity of the labels in one cluster.
    """
    tags_ct = Counter(tags)
    n = len(tags)
    gini_score = 1.0
    for ct in tags_ct.values():
        pi = ct / n
        gini_score -= pi ** 2
    return gini_score
gini_udf = udf(gini, DoubleType())  # declare the return type; udf() defaults to StringType
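
Gini impurity is 0 for a cluster whose artists all share one label. When K labels are equally common, it approaches 1 - 1/K, which is what to expect from a catch-all cluster containing many different tags. Below is a quick plain-Python check of the formula used by `gini`; the tag lists are illustrative:

```python
from collections import Counter

def gini(tags):
    """Gini impurity of the assumed labels in one cluster: 1 - sum of squared label shares."""
    n = len(tags)
    return 1.0 - sum((ct / n) ** 2 for ct in Counter(tags).values())

print(gini(["rap", "rap", "rap", "rock"]))    # 1 - (0.75**2 + 0.25**2) = 0.375
print(gini(["folk"] * 5))                     # 0.0: a pure cluster
print(gini([f"tag{i}" for i in range(100)]))  # about 0.99: 100 equally common labels
```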

In [17]:
pred_gini = predictions.select("prediction",
                               collect_list(col("most_common_tag")).over(Window.partitionBy("prediction")).alias("mc_tags"),
                               count("artist").over(Window.partitionBy("prediction")).alias("artist_count"))\
                       .distinct()\
                       .withColumn("gini_score", round(gini_udf("mc_tags"), 4))\
                       .orderBy("prediction")

In [19]:
pred_gini.show(30)

+----------+--------------------+------------+----------+
|prediction|             mc_tags|artist_count|gini_score|
+----------+--------------------+------------+----------+
|         0|[vocal, vocal, vo...|       14460|    0.9869|
|         1|[death metal, dea...|          32|       0.0|
|         2|[Grunge, Grunge, ...|           4|       0.0|
|         3|[blues rock, blue...|          80|    0.0247|
|         4|[heavy metal, hea...|           8|       0.0|
|         5|[guitar virtuoso,...|           2|       0.0|
|         6|[blues rock, rock...|          73|    0.5885|
|         7|[rap, rap, rap, r...|          66|    0.3586|
|         8|[salsa, salsa, sa...|          22|       0.0|
|         9|[rock, ska, ska, ...|          21|    0.0907|
|        10|[Stoner Rock, Sto...|          25|    0.0768|
|        11|[Melodic Death Me...|          14|       0.0|
|        12|[black metal, bla...|          27|       0.0|
|        13|[folk, folk, folk...|          50|    0.1152|
|        14|[p

   * ### Artist-Weighted Average Gini Impurity for All Clusters

In [20]:
pred_gini.withColumn("weighted", col("gini_score") * col("artist_count"))\
         .select((sum("weighted") / sum("artist_count")).alias("weighted_avg_gini")).show()

+------------------+
| weighted_avg_gini|
+------------------+
|0.9174335733232285|
+------------------+

   * ### Artist-Weighted Average Gini Impurity for All Clusters Except Cluster 0

In [21]:
pred_gini.filter("prediction != 0")\
         .withColumn("weighted", col("gini_score") * col("artist_count"))\
         .select((sum("weighted") / sum("artist_count")).alias("weighted_avg_gini")).show()

+-------------------+
|  weighted_avg_gini|
+-------------------+
|0.18636768558951963|
+-------------------+
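
Because the overall figure is an artist-count-weighted average, it can be split into cluster 0 and the remaining clusters. Plugging in the recorded numbers shows that cluster 0, which holds about 91% of the training artists, accounts for almost all of the 0.917:

```python
# Numbers copied from the notebook output
n0, gini0 = 14460, 0.9869  # cluster 0: count from cell [15], gini_score from cell [19]
other_counts = [32, 4, 80, 8, 2, 73, 66, 22, 21, 25, 14, 27, 50, 92, 18,
                9, 13, 95, 18, 29, 5, 10, 119, 19, 77, 242, 129, 3, 72]  # clusters 1-29, cell [15]
gini_rest = 0.18636768558951963  # cell [21]: weighted Gini without cluster 0

n_rest = sum(other_counts)
overall = (gini0 * n0 + gini_rest * n_rest) / (n0 + n_rest)
share0 = n0 / (n0 + n_rest)

print(n_rest)             # 1374
print(round(overall, 4))  # 0.9174, matching cell [20]
print(round(share0, 2))   # 0.91
```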

# V. Apply the best k (30) to the test set

* ### Convert to TFIDF Vector

In [22]:
htf = MLHashingTF(inputCol="tags", outputCol="tf")
tf_test = htf.transform(artist_tags_test)

idf = MLIDF(inputCol="tf", outputCol="features", minDocFreq=2)
tfidf_test = idf.fit(tf_test).transform(tf_test)
tfidf_test.show(1, truncate=False)

+-----------+-------------------------------------------------------+-----------------+------------+----------------------------------+------------------------------------------------+
|artist     |tags                                                   |most_common_tag  |mc_tag_count|tf                                |features                                        |
+-----------+-------------------------------------------------------+-----------------+------------+----------------------------------+------------------------------------------------+
|Phil Keaggy|[chilled worship, singer-songwriter, singer-songwriter]|singer-songwriter|7           |(262144,[251512,251872],[1.0,2.0])|(262144,[251512,251872],[0.0,7.234571601853855])|
+-----------+-------------------------------------------------------+-----------------+------------+----------------------------------+------------------------------------------------+
only showing top 1 row

* ### Fit a KMeans Model (k = 30)

In [24]:
kmeans = KMeans(k=30, maxIter=100)
model = kmeans.fit(tfidf_test)
predictions = model.transform(tfidf_test)
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print(f"Silhouette with squared euclidean distance = {silhouette}")

Silhouette with squared euclidean distance = 0.1948518883606591
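
The cell above fits a fresh KMeans model on the test set, so its cluster ids and centroids are unrelated to the training clusters. To place test artists into the training clusters instead, one option is to keep the models fitted on training data. For example, store `idf_model = idf.fit(tf_train)` and the k = 30 training model under a separate, hypothetical name such as `train_model` (cell [24] overwrites `model`). Then call `idf_model.transform(tf_test)` followed by `train_model.transform(...)`. The assignment step that `KMeansModel.transform` performs is a nearest-centroid lookup under the model's distance measure (Euclidean by default). Below is a plain-Python sketch with made-up centroids; `assign_to_centroids` is a hypothetical helper:

```python
def sq_dist(p, q):
    return sum((a - b) ** 2 for a, b in zip(p, q))

def assign_to_centroids(points, centers):
    """Index of the nearest centroid for each point (minimizing squared distance = Euclidean)."""
    return [min(range(len(centers)), key=lambda k: sq_dist(p, centers[k])) for p in points]

train_centers = [(0.0, 0.0), (10.0, 10.0)]  # hypothetical centroids learned on training data
new_points = [(1.0, 1.0), (9.0, 8.0), (0.0, 2.0)]
print(assign_to_centroids(new_points, train_centers))  # [0, 1, 0]
```

With this approach the cluster ids stay comparable between training and test data, so per-cluster Gini scores on the test set would measure how well the training clusters generalize.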

* ### Check Components of Each Cluster

In [30]:
predictions.groupBy("prediction").count().orderBy("prediction").show(30)

+----------+-----+
|prediction|count|
+----------+-----+
|         0|    6|
|         1| 1817|
|         2|    5|
|         3|    3|
|         4|    4|
|         5|    1|
|         6|    1|
|         7|    1|
|         8|    2|
|         9|   11|
|        10|    2|
|        11|    7|
|        12|    3|
|        13|   28|
|        14|   44|
|        15|    2|
|        16|    1|
|        17|   27|
|        18|    3|
|        19|   10|
|        20|    3|
|        21|    1|
|        22|   17|
|        23|    1|
|        24|   11|
|        25|    2|
|        26|   14|
|        27|    2|
|        28|    1|
|        29|   17|
+----------+-----+

* ### Calculate the Gini Impurity of Each Cluster

In [26]:
pred_gini = predictions.select("prediction",
                               collect_list(col("most_common_tag")).over(Window.partitionBy("prediction")).alias("mc_tags"),
                               count("artist").over(Window.partitionBy("prediction")).alias("artist_count"))\
                       .distinct()\
                       .withColumn("gini_score", round(gini_udf("mc_tags"), 4))\
                       .orderBy("prediction")

In [29]:
pred_gini.show(30)

+----------+--------------------+------------+----------+
|prediction|             mc_tags|artist_count|gini_score|
+----------+--------------------+------------+----------+
|         0|[electronic, elec...|           6|       0.0|
|         1|[singer-songwrite...|        1817|    0.9821|
|         2|[blues, blues, bl...|           5|       0.0|
|         3|[Progressive rock...|           3|       0.0|
|         4|[Gothic Rock, Got...|           4|       0.0|
|         5|         [metalcore]|           1|       0.0|
|         6|    [christian rock]|           1|       0.0|
|         7|            [comedy]|           1|       0.0|
|         8|[rockabilly, rock...|           2|       0.0|
|         9|[dub, reggae, reg...|          11|    0.1653|
|        10|[bluegrass, blueg...|           2|       0.0|
|        11|[industrial metal...|           7|    0.2449|
|        12|[heavy metal, hea...|           3|       0.0|
|        13|[folk, folk, folk...|          28|     0.199|
|        14|[r

In [31]:
pred_gini.withColumn("weighted", col("gini_score") * col("artist_count"))\
         .select((sum("weighted") / sum("artist_count")).alias("weighted_avg_gini")).show()

+------------------+
| weighted_avg_gini|
+------------------+
|0.9031123595505618|
+------------------+

In [32]:
pred_gini.filter("prediction != 1")\
         .withColumn("weighted", col("gini_score") * col("artist_count"))\
         .select((sum("weighted") / sum("artist_count")).alias("weighted_avg_gini")).show()

+-----------------+
|weighted_avg_gini|
+-----------------+
|          0.27911|
+-----------------+

# VI. Summary:

## Pros:
* Tags combine several kinds of information, such as genre, era, and popularity, so they can often identify an artist better than any single attribute.
* Clustering looks good for artists with many tracks (on the training set, every artist outside cluster 0 has at least 12) and for artists with common tags. In other words, the approach seems better suited to popular artists.

## Cons:
* Tags are very diverse, but each artist has only a handful of distinct tags. As a result, each artist's TF-IDF vector is extremely sparse: 262,144 hashed dimensions with only a few non-zero entries. Clustering on this extremely sparse matrix produces a very imbalanced result, in which artists with few tracks and tags all end up in one large cluster. For artist recommendation, this means the model cannot distinguish niche artists, who may be exactly the ones of special interest.

* Clusters are not stable across refits. Refitting KMeans on the test set with the same number of clusters produces clusters that do not correspond to the training clusters (the catch-all cluster is 0 on the training set but 1 on the test set), because the centroids are learned anew from different data. If a recommendation system refit the model whenever new artists arrived, existing clusters would be reshuffled, which is expensive. A fitted KMeans model can instead assign new artists to its existing clusters with `transform`, provided their TF-IDF vectors are built with the IDF model fitted on the training data. Whether the training clusters suit new artists would still need to be evaluated.

## Future Directions:
* Try other, more general features, such as birth year, genre, and nationality, to cluster or classify artists instead of tags, and compare the results with this one.
* Decompose cluster 0 more thoroughly, and find a more appropriate way to cluster or classify less popular artists, who make up the majority of all artists.