In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.config("spark.driver.memory", "6g").appName('chapter3').getOrCreate()

23/12/29 15:24:17 WARN Utils: Your hostname, quangtn933.local resolves to a loopback address: 127.0.0.1; using 192.168.1.90 instead (on interface en0)
23/12/29 15:24:17 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/29 15:24:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Loading data.

In [3]:
raw_user_artist_path = "/Users/quangtn/Desktop/01_work/01_job/02_ml/PySpark/chapter3/data/user_artist_data.txt"

In [4]:
raw_user_artist_data = spark.read.text(raw_user_artist_path)

In [5]:
raw_user_artist_data.show(5)

                                                                                

+-------------------+
|              value|
+-------------------+
|       1000002 1 55|
| 1000002 1000006 33|
|  1000002 1000007 8|
|1000002 1000009 144|
|1000002 1000010 314|
+-------------------+
only showing top 5 rows



In [6]:
raw_artist_data = spark.read.text("/Users/quangtn/Desktop/01_work/01_job/02_ml/PySpark/chapter3/data/artist_data.txt")
raw_artist_data.show(5)

+--------------------+
|               value|
+--------------------+
|1134999\t06Crazy ...|
|6821360\tPang Nak...|
|10113088\tTerfel,...|
|10151459\tThe Fla...|
|6826647\tBodensta...|
+--------------------+
only showing top 5 rows



In [7]:
raw_artist_alias = spark.read.text("/Users/quangtn/Desktop/01_work/01_job/02_ml/PySpark/chapter3/data/artist_alias.txt")
raw_artist_alias.show(5)

+-----------------+
|            value|
+-----------------+
| 1092764\t1000311|
| 1095122\t1000557|
| 6708070\t1007267|
|10088054\t1042317|
| 1195917\t1042317|
+-----------------+
only showing top 5 rows



## Preparing the Data

In [8]:
from pyspark.sql.functions import split, min, max
from pyspark.sql.types import IntegerType, StringType

user_artist_df = raw_user_artist_data.withColumn('user', split(raw_user_artist_data['value'], ' ').getItem(0).cast(IntegerType()))

In [9]:
user_artist_df.show(5)

+-------------------+-------+
|              value|   user|
+-------------------+-------+
|       1000002 1 55|1000002|
| 1000002 1000006 33|1000002|
|  1000002 1000007 8|1000002|
|1000002 1000009 144|1000002|
|1000002 1000010 314|1000002|
+-------------------+-------+
only showing top 5 rows



In [10]:
user_artist_df = user_artist_df.withColumn('artist',
                                        split(raw_user_artist_data['value'], ' '). \
                                        getItem(1). \
                                        cast(IntegerType()))

23/12/29 15:24:36 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [11]:
user_artist_df.show(5)

+-------------------+-------+-------+
|              value|   user| artist|
+-------------------+-------+-------+
|       1000002 1 55|1000002|      1|
| 1000002 1000006 33|1000002|1000006|
|  1000002 1000007 8|1000002|1000007|
|1000002 1000009 144|1000002|1000009|
|1000002 1000010 314|1000002|1000010|
+-------------------+-------+-------+
only showing top 5 rows



In [12]:
user_artist_df = user_artist_df.withColumn('count',
                                        split(raw_user_artist_data['value'], ' '). \
                                        getItem(2). \
                                        cast(IntegerType())).drop('value')

user_artist_df.show(5)

+-------+-------+-----+
|   user| artist|count|
+-------+-------+-----+
|1000002|      1|   55|
|1000002|1000006|   33|
|1000002|1000007|    8|
|1000002|1000009|  144|
|1000002|1000010|  314|
+-------+-------+-----+
only showing top 5 rows



In [13]:
user_artist_df.select([min("user"), max("user"), min("artist"), \
                      max("artist")]).show()

[Stage 6:>                                                          (0 + 8) / 8]

+---------+---------+-----------+-----------+
|min(user)|max(user)|min(artist)|max(artist)|
+---------+---------+-----------+-----------+
|       90|  2443548|          1|   10794401|
+---------+---------+-----------+-----------+



                                                                                

In [14]:
from pyspark.sql.functions import col
artist_by_id = raw_artist_data.withColumn('id', split(col('value'), '\s+', 2). \
                                          getItem(0). \
                                          cast(IntegerType()))
artist_by_id.show(5)

+--------------------+--------+
|               value|      id|
+--------------------+--------+
|1134999\t06Crazy ...| 1134999|
|6821360\tPang Nak...| 6821360|
|10113088\tTerfel,...|10113088|
|10151459\tThe Fla...|10151459|
|6826647\tBodensta...| 6826647|
+--------------------+--------+
only showing top 5 rows



In [15]:
artist_by_id = artist_by_id.withColumn('name', split(col('value'), '\s+', 2). \
                                       getItem(1). \
                                       cast(StringType())).drop('value')
artist_by_id.show(5)

+--------+--------------------+
|      id|                name|
+--------+--------------------+
| 1134999|        06Crazy Life|
| 6821360|        Pang Nakarin|
|10113088|Terfel, Bartoli- ...|
|10151459| The Flaming Sidebur|
| 6826647|   Bodenstandig 3000|
+--------+--------------------+
only showing top 5 rows



In [16]:
artist_alias = raw_artist_alias.withColumn('artist', split(col('value'), '\s+'). \
                                            getItem(0). \
                                            cast(IntegerType())). \
                                withColumn('alias', split(col('value'), '\s+'). \
                                           getItem(1). \
                                           cast(StringType())). \
                                drop('value')

artist_alias.show(5)

+--------+-------+
|  artist|  alias|
+--------+-------+
| 1092764|1000311|
| 1095122|1000557|
| 6708070|1007267|
|10088054|1042317|
| 1195917|1042317|
+--------+-------+
only showing top 5 rows



In [17]:
artist_by_id.filter(artist_by_id.id.isin(1092764, 1000311)).show()

                                                                                

+-------+--------------+
|     id|          name|
+-------+--------------+
|1000311| Steve Winwood|
|1092764|Winwood, Steve|
+-------+--------------+



## Building a first Model

In [18]:
from pyspark.sql.functions import broadcast, when

In [19]:
train_data = user_artist_df.join(broadcast(artist_alias), 'artist', how = 'left')

In [20]:
train_data.show(5)

+-------+-------+-----+-----+
| artist|   user|count|alias|
+-------+-------+-----+-----+
|      1|1000002|   55| NULL|
|1000006|1000002|   33| NULL|
|1000007|1000002|    8| NULL|
|1000009|1000002|  144| NULL|
|1000010|1000002|  314| NULL|
+-------+-------+-----+-----+
only showing top 5 rows



In [21]:
train_data = train_data.withColumn('artist', when(col('alias').isNull(), col('artist')). \
                                   otherwise(col('alias')))
train_data.show(5)

+-------+-------+-----+-----+
| artist|   user|count|alias|
+-------+-------+-----+-----+
|      1|1000002|   55| NULL|
|1000006|1000002|   33| NULL|
|1000007|1000002|    8| NULL|
|1000009|1000002|  144| NULL|
|1000010|1000002|  314| NULL|
+-------+-------+-----+-----+
only showing top 5 rows



In [22]:
train_data = train_data.withColumn('artist', col('artist'). \
                                   cast(IntegerType())).drop('alias')

train_data.show(5)

+-------+-------+-----+
| artist|   user|count|
+-------+-------+-----+
|      1|1000002|   55|
|1000006|1000002|   33|
|1000007|1000002|    8|
|1000009|1000002|  144|
|1000010|1000002|  314|
+-------+-------+-----+
only showing top 5 rows



In [23]:
train_data.cache()
train_data.show(5)



+-------+-------+-----+
| artist|   user|count|
+-------+-------+-----+
|      1|1000002|   55|
|1000006|1000002|   33|
|1000007|1000002|    8|
|1000009|1000002|  144|
|1000010|1000002|  314|
+-------+-------+-----+
only showing top 5 rows



                                                                                

In [24]:
train_data.count()

24296858

In [25]:
len(train_data.columns)

3

In [26]:
train_copy = train_data

In [29]:
train_data_ = train_copy.where(train_copy['artist'] < 100)

In [30]:
train_data_.count()

301713

In [31]:
from pyspark.ml.recommendation import ALS

model = ALS(rank=10, seed=0, maxIter=5, regParam=0.1,
            implicitPrefs=True, alpha=1.0, userCol='user',
            itemCol='artist', ratingCol='count').fit(train_data_)

23/12/29 15:25:51 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/12/29 15:25:51 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
23/12/29 15:25:52 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
                                                                                

In [39]:
#import pickle
#with open("/Users/quangtn/Desktop/01_work/01_job/02_ml/PySpark/chapter3/model.pkl", "wb") as file:
#    pickle.dump(model, file)
model.save("/Users/quangtn/Desktop/01_work/01_job/02_ml/PySpark/chapter3/model.h5")

                                                                                

In [79]:
user_id = 2093760

existing_artist_ids = train_data.filter(train_data_.user==user_id).select("artist").collect()

In [80]:
existing_artist_ids = [i[0] for i in existing_artist_ids]

In [81]:
artist_by_id.filter(col('id').isin(existing_artist_ids)).show()

+-------+---------------+
|     id|           name|
+-------+---------------+
|   1180|     David Gray|
|    378|  Blackalicious|
|    813|     Jurassic 5|
|1255340|The Saw Doctors|
|    942|         Xzibit|
+-------+---------------+



In [82]:
user_subset = train_data.select('user').where(col('user')==user_id).distinct()

In [83]:
user_subset.show(5)

+-------+
|   user|
+-------+
|2093760|
+-------+



In [84]:
top_predictions = model.recommendForUserSubset(user_subset, 5)

In [85]:
top_predictions.show(5)

+----+---------------+
|user|recommendations|
+----+---------------+
+----+---------------+



In [86]:
top_predictions_pandas = top_predictions.toPandas()

## Evaluating Recommendation quality

In [131]:
import random
from pyspark.sql.functions import col, lit, count, mean, coalesce


def area_under_curve(positive_data, b_all_artist_ids, predict_function) -> float:
    positive_predictions = predict_function(positive_data.select("user", "artist")).withColumnRenamed("prediction", "positivePrediction")

    def negative_data_generation(user_artist_tuples):
        user_negative_artists = []
        for user, pos_artist_ids in user_artist_tuples:
            pos_artist_id_set = set(pos_artist_ids)
            negative_artists = set()
            while len(negative_artists) < len(pos_artist_id_set):
                artist_id = b_all_artist_ids[random.randint(0, len(b_all_artist_ids) - 1)]
                if artist_id not in  pos_artist_id_set:
                    negative_artists.add(artist_id)
            user_negative_artists.extend([(user, artist_id) for artist_id in negative_artists])
        return user_negative_artists

    user_artist_rdd = positive_data.select("user", "artist").rdd.groupByKey().mapValues(list).collect()
    negative_data = spark.createDataFrame(negative_data_generation(user_artist_rdd), schema=["user", "artist"])

    negative_predictions = predict_function(negative_data).withColumnRenamed("prediction", "negativePrediction")

    joined_predictions = positive_predictions.join(negative_predictions, "user").select("user", "positivePrediction", "negativePrediction").cache()

    all_counts = joined_predictions.groupBy("user").agg(count(lit(1)).alias("total")).select("user", "total")
    correct_counts = joined_predictions.filter(col("positivePrediction") > col("negativePrediction")).groupby("user").agg(count("user").alias("correct")).select("user", "correct")

    mean_auc = all_counts.join(correct_counts, ["user"], "left_outer").select(col("user"), (coalesce(col("correct"), lit(0)) / col("total")).alias("auc")).agg(mean("auc")).collect()[0][0]
    
    joined_predictions.unpersist()
    return mean_auc

In [96]:
all_data = user_artist_df.join(broadcast(artist_alias), 'artist', how='left') \
                        .withColumn('artist', when(col('alias').isNull(), col('artist')) \
                        .otherwise(col('alias'))) \
                        .withColumn('artist', col('artist').cast(IntegerType())).drop('alias')

In [97]:
all_data.count()

                                                                                

24296858

In [98]:
copy_data = all_data

In [99]:
all_data.show(5)

+-------+-------+-----+
| artist|   user|count|
+-------+-------+-----+
|      1|1000002|   55|
|1000006|1000002|   33|
|1000007|1000002|    8|
|1000009|1000002|  144|
|1000010|1000002|  314|
+-------+-------+-----+
only showing top 5 rows



In [102]:
all_data_ = all_data.where(all_data.artist < 500)
all_data_.count()

1026625

In [103]:
train_data, cv_data = all_data_.randomSplit([0.9, 0.1], seed=54321)
train_data.cache()
cv_data.cache()

DataFrame[artist: int, user: int, count: int]

In [104]:
all_artist_ids = all_data_.select("artist").distinct()

In [105]:
all_artist_ids = [i[0] for i in all_artist_ids.collect()]

In [107]:
len(all_artist_ids)

360

In [108]:
model = ALS(rank=10, seed=0, maxIter=5, regParam=0.1,
            implicitPrefs=True, alpha=1.0, userCol='user',
            itemCol="artist", ratingCol="count").fit(train_data)

                                                                                

In [109]:
model.save("/Users/quangtn/Desktop/01_work/01_job/02_ml/PySpark/chapter3/model2train.h5")

                                                                                

In [132]:
area_under_curve(cv_data, all_artist_ids, model.transform)

                                                                                

0.8286901438248726

In [135]:
from pyspark.sql.functions import sum as _sum

def predict_most_listened(train):
    listen_counts = train.groupBy("artist").agg(_sum("count").alias("prediction")).select("artist", "prediction")
    return train.join(listen_counts, "artist", "left_outer").select("user", "artist", "prediction")

In [137]:
area_under_curve(cv_data, all_artist_ids, predict_most_listened(train_data))

## Hyperparameter selection

In [138]:
from pprint import pprint
from itertools import product

In [140]:
ranks = [5, 30]
reg_params = [4.0, 0.0001]
alphas = [1.0, 40.0]
hyperparam_combinations = list(product(*[ranks, reg_params, alphas]))

In [144]:
evaluations = []

for c in hyperparam_combinations:
    rank = c[0]
    reg_param = c[1]
    alpha = c[2]
    model = ALS().setSeed(0).setImplicitPrefs(True).setRank(rank).setRegParam(reg_param).setAlpha(alpha).setMaxIter(20).setUserCol("user").setItemCol("artist").setRatingCol("count").setPredictionCol("prediction").fit(train_data)

    auc = area_under_curve(cv_data, all_artist_ids, model.transform)

    model.userFactors.unpersist()
    model.itemFactors.unpersist()

    evaluations.append((auc, (rank, reg_param, alpha)))

23/12/29 17:51:25 WARN BlockManager: Block rdd_5763_0 already exists on this machine; not re-adding it
                                                                                

In [149]:
evaluations.sort(key=lambda x: x[0], reverse=True)
pprint(evaluations)

[(0.8662409297413768, (30, 4.0, 1.0)),
 (0.8646001772550876, (5, 4.0, 1.0)),
 (0.8510079003145494, (30, 4.0, 40.0)),
 (0.8415942950410648, (5, 0.0001, 1.0)),
 (0.840553188245789, (5, 4.0, 40.0)),
 (0.8042284110688335, (5, 0.0001, 40.0)),
 (0.7250629011496921, (30, 0.0001, 1.0)),
 (0.6889394822574169, (30, 0.0001, 40.0))]


In [154]:
some_users = all_data.select("user").distinct().limit(100)

def make_recommendation(model, user_id, num_recs):
    user_subset = train_data.select('user').where(col('user') == user_id).distinct()
    recommendations = model.recommendForUserSubset(user_subset, num_recs)
    return recommendations

some_recommendations = [(user_id[0], make_recommendation(model, user_id[0], 5)) for user_id in some_users.collect()]


In [161]:
for user_id, recs_df in some_recommendations:
    recs_df = recs_df.select("recommendations")
    recommedend_artists = [row.asDict()["artist"] for row in recs_df.collect()[0][0]]
    print(f"{user_id} -> {', '.join(map(str, recommedend_artists))}")

1000190 -> 82, 441, 452, 15, 316


                                                                                

1001043 -> 59, 15, 2, 275, 62
1001129 -> 435, 250, 181, 434, 18


                                                                                

1001139 -> 100, 228, 281, 421, 75
1002431 -> 82, 30, 377, 15, 181


                                                                                

1002605 -> 15, 313, 407, 189, 242
1004666 -> 18, 425, 360, 4, 217


                                                                                

1005158 -> 30, 33, 405, 181, 137
1005439 -> 275, 189, 304, 253, 407


                                                                                

1005697 -> 393, 250, 49, 420, 300
1005853 -> 234, 425, 393, 478, 83


                                                                                

1007007 -> 78, 82, 122, 316, 463


IndexError: list index out of range

In [None]:
recommended_artists = [row.asDict()["artist"] for row in recs_df.collect()[0][0]]
