In [1]:
import os
# os.environ["SPARK_HOME"]
exec(open(os.path.join(os.environ["SPARK_HOME"], 'python/pyspark/shell.py')).read())
sc

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.3.1
      /_/

Using Python version 3.6.5 (default, Mar 29 2018 13:14:23)
SparkSession available as 'spark'.


## Load Dataset

### Create Implicit Rating

In [2]:
import pyspark.sql.functions as F

from pyspark.ml import Pipeline
from pyspark.ml.feature import Bucketizer, MinMaxScaler, VectorAssembler

from pyspark.sql.types import DoubleType

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

In [3]:
df = spark.read.csv('../data/df_reco_final.csv',header=True,inferSchema=True).cache()
df.printSchema()
df.count()

root
 |-- uid: integer (nullable = true)
 |-- song_id: integer (nullable = true)
 |-- freq_P_last_1: integer (nullable = true)
 |-- freq_P_last_3: integer (nullable = true)
 |-- freq_P_last_7: integer (nullable = true)
 |-- freq_P_last_14: integer (nullable = true)
 |-- freq_P_last_30: integer (nullable = true)
 |-- freq_P_last_44: integer (nullable = true)
 |-- freq_D_last_1: integer (nullable = true)
 |-- freq_D_last_3: integer (nullable = true)
 |-- freq_D_last_7: integer (nullable = true)
 |-- freq_D_last_14: integer (nullable = true)
 |-- freq_D_last_30: integer (nullable = true)
 |-- freq_D_last_44: integer (nullable = true)
 |-- rec_P: integer (nullable = true)
 |-- rec_D: integer (nullable = true)
 |-- rela_freq_last_1: integer (nullable = true)
 |-- rela_freq_last_3: integer (nullable = true)
 |-- rela_freq_last_7: integer (nullable = true)
 |-- rela_freq_last_14: integer (nullable = true)
 |-- rela_freq_last_30: integer (nullable = true)
 |-- rela_freq_last_44: integer (nulla

553324

In [4]:
# user record greater than 5
df_user_gt5 = df.groupBy('uid').count().where(F.col('count')>5)

In [5]:
df_user_gt5.where(F.col('uid')==6871077).show()

+-------+-----+
|    uid|count|
+-------+-----+
|6871077|  176|
+-------+-----+



In [6]:
df = df_user_gt5.join(df, on=['uid'], how='left')

In [7]:
df.where(F.col('uid')==6871077).show()

+-------+-----+--------+-------------+-------------+-------------+--------------+--------------+--------------+-------------+-------------+-------------+--------------+--------------+--------------+-----+-----+----------------+----------------+----------------+-----------------+-----------------+-----------------+
|    uid|count| song_id|freq_P_last_1|freq_P_last_3|freq_P_last_7|freq_P_last_14|freq_P_last_30|freq_P_last_44|freq_D_last_1|freq_D_last_3|freq_D_last_7|freq_D_last_14|freq_D_last_30|freq_D_last_44|rec_P|rec_D|rela_freq_last_1|rela_freq_last_3|rela_freq_last_7|rela_freq_last_14|rela_freq_last_30|rela_freq_last_44|
+-------+-----+--------+-------------+-------------+-------------+--------------+--------------+--------------+-------------+-------------+-------------+--------------+--------------+--------------+-----+-----+----------------+----------------+----------------+-----------------+-----------------+-----------------+
|6871077|  176|22399766|            0|            0|

In [8]:
df_reco = df.select('uid', 'song_id', 'freq_P_last_44', 'freq_D_last_44', 'rela_freq_last_44') \
            .where((F.col('freq_D_last_44')!=0) & ((F.col('freq_P_last_44')!=0)|(F.col('rela_freq_last_44')!=0)))

In [9]:
df_reco = df_reco.withColumn('freq_P_last_44_log10', F.log10(F.col('freq_P_last_44')+1)) \
                .withColumn('freq_D_last_44_log10', F.log10(F.col('freq_D_last_44')+1)) \
                .withColumn('rela_freq_last_44_log10', F.log10(F.col('rela_freq_last_44')+1)) \
                .withColumn('rating', 0.2*F.col('freq_P_last_44_log10')+0.5*F.col('freq_D_last_44_log10')\
                            +0.3*F.col('rela_freq_last_44_log10'))
df_reco.printSchema()

root
 |-- uid: integer (nullable = true)
 |-- song_id: integer (nullable = true)
 |-- freq_P_last_44: integer (nullable = true)
 |-- freq_D_last_44: integer (nullable = true)
 |-- rela_freq_last_44: integer (nullable = true)
 |-- freq_P_last_44_log10: double (nullable = true)
 |-- freq_D_last_44_log10: double (nullable = true)
 |-- rela_freq_last_44_log10: double (nullable = true)
 |-- rating: double (nullable = true)



In [10]:
assembler = VectorAssembler(inputCols=['rating'], outputCol='rating_assembled')
scaler = MinMaxScaler(inputCol='rating_assembled', outputCol='rating_scaled')

## pp: pipeline
pp = Pipeline(stages=[assembler, scaler])
pp_model = pp.fit(df_reco)
df_reco= pp_model.transform(df_reco)
# df_reco.show(1)

In [11]:
## Convert column type back to double
Vec2num_udf = F.udf(lambda x: float(x[0]), DoubleType())
df_reco = df_reco.withColumn('rating_scaled_num', Vec2num_udf('rating_scaled'))
# df_reco.show()

In [12]:
splits = [.0, 1/30, 1/15, 5/15, 7/15, 1]
bucketizer = Bucketizer(splits=splits, inputCol='rating_scaled_num', outputCol='rating_final')
df_reco= bucketizer.transform(df_reco)
df_reco = df_reco.withColumn('rating_final', F.col('rating_final')+1)

In [13]:
df_reco_spark = df_reco.select('uid', 'song_id', 'rating_scaled_num', 'rating_final')
df_reco_spark.show(5)

+--------+--------+-------------------+------------+
|     uid| song_id|  rating_scaled_num|rating_final|
+--------+--------+-------------------+------------+
|16202430|23110924| 0.0857279276377621|         3.0|
|46307155| 9950164| 0.2816640595599923|         3.0|
|66767310| 4384363| 0.5713650852501141|         5.0|
|66767310|15706618| 0.5361336318936317|         5.0|
|76151459|  711119|0.21686141010778345|         3.0|
+--------+--------+-------------------+------------+
only showing top 5 rows



Remember that we don't handle the "cold-start". The <font color = red>cold start </font> strategy is following the strategy of Spark MLlib.

In [14]:
df_reco_spark.groupBy('rating_final').count().show()

+------------+-----+
|rating_final|count|
+------------+-----+
|         1.0|11983|
|         4.0| 9124|
|         3.0|37548|
|         2.0| 6624|
|         5.0| 7168|
+------------+-----+



In [15]:
df_reco_spark.where(F.col('uid')==6871077).show()

+---+-------+-----------------+------------+
|uid|song_id|rating_scaled_num|rating_final|
+---+-------+-----------------+------------+
+---+-------+-----------------+------------+



## Collaborative Filtering (PySpark ALS)

### Train Test Split

In [16]:
split_seed = 301
train, test = df_reco_spark.randomSplit(weights=[.7,.3],seed=split_seed)

### Spark ALS Recommender

In [17]:
# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(maxIter=10, regParam=0.2, userCol="uid", itemCol="song_id", ratingCol="rating_final",
          coldStartStrategy="drop")
model = als.fit(train)

# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating_final",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 1.1888604993089464


In [18]:
# Generate top 10 movie recommendations for each user
userRecs = model.recommendForAllUsers(10)
# Generate top 10 user recommendations for each movie
movieRecs = model.recommendForAllItems(10)

In [19]:
userRecs.where(F.col('uid')==16202430).show(truncate=False)

+--------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|uid     |recommendations                                                                                                                                                                                                               |
+--------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|16202430|[[7052957, 3.2940733], [6854188, 3.227839], [11359814, 3.1276214], [11170240, 3.1276214], [12233050, 3.1276214], [10817575, 3.1276214], [22828525, 3.1276214], [6240020, 3.0941505], [824701, 3.0319943], [532945, 3.0319943]]|
+--------+------------------------------------------------------

In [20]:
# Generate top 10 movie recommendations for a specified set of users
users = df_reco_spark.select(als.getUserCol()).distinct().limit(3)
userSubsetRecs = model.recommendForUserSubset(users, 10)
# Generate top 10 user recommendations for a specified set of movies
movies = df_reco_spark.select(als.getItemCol()).distinct().limit(3)
movieSubSetRecs = model.recommendForItemSubset(movies, 10)

In [21]:
userSubsetRecs.show(truncate=False)

+---------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|uid      |recommendations                                                                                                                                                                                                           |
+---------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|168914470|[[1865786, 3.7717478], [22840950, 3.741018], [4135731, 3.6468449], [6855406, 3.6468449], [4316348, 3.6468449], [6953665, 3.645421], [22810480, 3.645421], [1137753, 3.645421], [2288377, 3.6043441], [4173999, 3.6043441]]|
|168886705|[[6188952, 2.7968771], [5383825, 2.7342765], [815176, 2.7132921],