In [1]:
import os
exec(open(os.path.join(os.environ["SPARK_HOME"], "python/pyspark/shell.py")).read())

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.0
      /_/

Using Python version 3.7.1 (default, Dec 14 2018 13:28:58)
SparkSession available as 'spark'.


In [2]:
import os
import warnings
warnings.filterwarnings('ignore')
import pandas as pd

import pyspark.sql.functions as F
from pyspark.ml import Pipeline
from pyspark.ml.feature import Bucketizer, MinMaxScaler, VectorAssembler
from pyspark.sql.types import DoubleType
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml import Transformer
from pyspark.sql import Window


In [3]:
# this file contains 'uid', 'song_id' and freqency features for implicity rating

df = spark.read.csv("/Users/fanyang/Documents/musicbox/data/recommender_model01_0116.csv",
                   header=True, inferSchema=True).cache()

In [4]:
pd.DataFrame(df.take(5), columns=df.columns)

Unnamed: 0,_c0,uid,song_id,comp_play_last_7,comp_play_last_14,comp_play_last_21,comp_play_last_30,comp_play_last_44,freq_P_last_7,freq_P_last_14,freq_P_last_21,freq_P_last_30,freq_P_last_44,freq_D_last_7,freq_D_last_14,freq_D_last_21,freq_D_last_30,freq_D_last_44
0,0,103103073,572912,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0
1,1,104737814,0,0,0,0,0,0,0,0,0,0,127,0,0,0,0,0
2,2,10919480,277650,2,2,2,2,3,2,2,2,2,3,0,0,0,0,1
3,3,10919480,389413,0,0,0,1,1,0,0,0,1,1,0,0,0,0,0
4,4,10919480,461313,1,2,2,2,2,1,2,2,3,6,0,0,0,0,1


### 1. data validation (remove inactive uid/song_id, invalid frequency features)

In [5]:
print("Before data cleaning, original shape of dataframe is: ")
print(df.count(), len(df.columns))

Before data cleaning, original shape of dataframe is: 
2005120 18


In [6]:
df.agg(F.countDistinct('uid'), F.countDistinct('song_id')).show()

+-------------------+-----------------------+
|count(DISTINCT uid)|count(DISTINCT song_id)|
+-------------------+-----------------------+
|              56065|                 311180|
+-------------------+-----------------------+



In [7]:
# clean data 
df = df.drop('_c0')
df = df[df.song_id != 0]

In [8]:
# find the most active uid
df_user_select = df.groupBy('uid').count().orderBy(F.col('count'), ascending=False).show(10)

+---------+-----+
|      uid|count|
+---------+-----+
|169031835| 2223|
|168451768| 1503|
|168139162| 1448|
|168954949| 1425|
|168479098| 1404|
|167587977| 1327|
|168255392| 1324|
|168636306| 1318|
|168393361| 1250|
|168156556| 1248|
+---------+-----+
only showing top 10 rows



In [9]:
# find the least active uid
df_user_select = df.groupBy('uid').count().orderBy(F.col('count')).show(10)

+---------+-----+
|      uid|count|
+---------+-----+
|167723216|    1|
|168437519|    1|
|168612160|    1|
|168680386|    1|
|169030108|    1|
|168670275|    1|
|167903161|    1|
|167644114|    1|
|168730743|    1|
|168746784|    1|
+---------+-----+
only showing top 10 rows



In [10]:
# find the most popular song_id
df_song_select = df.groupBy('song_id').count().orderBy(F.col('count'), ascending=False).show(10)

+--------+-----+
| song_id|count|
+--------+-----+
|15249349| 9267|
| 9950164| 8729|
| 6468891| 5570|
| 5237384| 5343|
| 3287564| 4799|
|15807836| 4471|
| 5114569| 4230|
| 6657692| 3928|
| 3620537| 3599|
| 7149583| 3255|
+--------+-----+
only showing top 10 rows



In [11]:
# find the least popular song_id
df_song_select = df.groupBy('song_id').count().orderBy(F.col('count')).show(10)

+--------+-----+
| song_id|count|
+--------+-----+
|  123146|    1|
| 6188853|    1|
|19374223|    1|
| 3577158|    1|
| 1106674|    1|
| 3230157|    1|
| 2834769|    1|
| 4307451|    1|
| 4119915|    1|
|13438469|    1|
+--------+-----+
only showing top 10 rows



In [12]:
# remove most inactive uid
df_user_select = df.groupBy('uid').count().where(F.col('count')>10)
df = df_user_select.join(df, on=['uid'], how='left')

In [13]:
# remove most inactive song_id
df_song_select = df.groupBy('song_id').count().where(F.col('count')>10)
df = df_song_select.join(df, on=['song_id'], how='left')

In [14]:
df.agg(F.countDistinct('uid'), F.countDistinct('song_id')).show()

+-------------------+-----------------------+
|count(DISTINCT uid)|count(DISTINCT song_id)|
+-------------------+-----------------------+
|              30137|                  24144|
+-------------------+-----------------------+



In [15]:
print("After removing inactive uid and song_id, the current shape of dataframe is: ")
print(df.count(), len(df.columns))

After removing inactive uid and song_id, the current shape of dataframe is: 
1359307 19


### 2. generate implicit rating from frequency feature.

In [16]:
# method 1: select features from longest time frame 44 days for implicit rating
# remove rows that the selected feature contain zero

df_feature_select = df.select('uid', 'song_id','comp_play_last_44', 'freq_P_last_44', 'freq_D_last_44') \
.where(((F.col('comp_play_last_44')!=0)|(F.col('freq_P_last_44')!=0)) & (F.col('freq_D_last_44')!=0))

In [17]:
# check NULL before feature transformation

df_feature_select.select(*[F.sum(F.col(c).isNull().cast('int')) \
                           .alias(c) for c in df_feature_select.columns]).show()

+---+-------+-----------------+--------------+--------------+
|uid|song_id|comp_play_last_44|freq_P_last_44|freq_D_last_44|
+---+-------+-----------------+--------------+--------------+
|  0|      0|                0|             0|             0|
+---+-------+-----------------+--------------+--------------+



In [18]:
# transform features(take log10, and then combine the three features into one)
# implicit rating: assuming 'comp_play_last_44', 'freq_P_last_44', 'freq_D_last_44'  Equally contribute to rating

df_feature_transform = df_feature_select.withColumn('comp_play_last_44_transf', F.log10(F.col('comp_play_last_44')+1)) \
                        .withColumn('freq_P_last_44_transf', F.log10(F.col('freq_P_last_44')+1)) \
                        .withColumn('freq_D_last_44_transf', F.log10(F.col('freq_D_last_44')+1)) \
                        .withColumn('rating', 0.33*F.col('comp_play_last_44_transf')+0.33*F.col('freq_P_last_44_transf')\
                            +0.33*F.col('freq_D_last_44_transf')) 

In [19]:
# transforma and standardrize rating 
# VectorAssembler is a transformer that combines a given list of columns into a single vector column


assembler = VectorAssembler(inputCols=['rating'], outputCol='rating_assembled')
scaler = MinMaxScaler(inputCol='rating_assembled', outputCol='rating_scaled')
pp = Pipeline(stages=[assembler, scaler])
pp_model = pp.fit(df_feature_transform)
df_final = pp_model.transform(df_feature_transform)

In [20]:
df_final

DataFrame[uid: int, song_id: decimal(20,0), comp_play_last_44: int, freq_P_last_44: int, freq_D_last_44: int, comp_play_last_44_transf: double, freq_P_last_44_transf: double, freq_D_last_44_transf: double, rating: double, rating_assembled: vector, rating_scaled: vector]

In [21]:
# udf User defined function
Vec2num_udf = F.udf(lambda x: float(x[0]), DoubleType())
df_final = df_final.withColumn('rating_scaled_num', Vec2num_udf('rating_scaled'))

In [22]:
df_final

DataFrame[uid: int, song_id: decimal(20,0), comp_play_last_44: int, freq_P_last_44: int, freq_D_last_44: int, comp_play_last_44_transf: double, freq_P_last_44_transf: double, freq_D_last_44_transf: double, rating: double, rating_assembled: vector, rating_scaled: vector, rating_scaled_num: double]

In [23]:
# Bucketizer transforms a column of continuous features to a column of feature buckets
# put scaled_rating into 5 bins

splits = [.0, 1/30, 1/15, 5/15, 7/15, 1]
bucketizer = Bucketizer(splits=splits, inputCol='rating_scaled_num', outputCol='implicit_ratings')
df_final = bucketizer.transform(df_final)
df_final = df_final.withColumn('implicit_ratings', F.col('implicit_ratings')+1)

In [24]:
df_final

DataFrame[uid: int, song_id: decimal(20,0), comp_play_last_44: int, freq_P_last_44: int, freq_D_last_44: int, comp_play_last_44_transf: double, freq_P_last_44_transf: double, freq_D_last_44_transf: double, rating: double, rating_assembled: vector, rating_scaled: vector, rating_scaled_num: double, implicit_ratings: double]

In [25]:
df_final_model = df_final.select('uid', 'song_id', 'implicit_ratings')

In [26]:
df_final_model.printSchema()

root
 |-- uid: integer (nullable = true)
 |-- song_id: decimal(20,0) (nullable = true)
 |-- implicit_ratings: double (nullable = true)



In [27]:
df_final_model.groupBy('implicit_ratings').count().show()

+----------------+------+
|implicit_ratings| count|
+----------------+------+
|             1.0| 21681|
|             4.0| 16371|
|             3.0|120856|
|             2.0| 23822|
|             5.0|  3759|
+----------------+------+



### 3. build recommendation model

In [28]:
def randomSplitByUID(df, weights, seed=None):
    trainingRation = weights[0]
    fractions = {row['uid']: trainingRation for row in df.select('uid').distinct().collect()}
    training = df.sampleBy('uid', fractions, seed)
    testRDD = df.rdd.subtract(training.rdd)
    test = spark.createDataFrame(testRDD, df.schema)
    return training, test

train, test = randomSplitByUID(df_final_model, weights=[0.7, 0.3])

In [29]:
# implicitPrefs: specifies whether to use the explicit rating (defaults to false, the explicit rating).
# rank: the number of latent factors in the model (defaults to 10).
# maxIter: maximum number of iterations to run (defaults to 10).
# regParam: specifies the regularization parameter in ALS (defaults to 1.0)
# alpha: a parameter for ALS implicit rating governing baseline confidence (defaults to 1.0).

# model_1

als = ALS(implicitPrefs=True, seed=42,userCol="uid", itemCol="song_id", ratingCol="implicit_ratings") \
    .setRank(50) \
    .setMaxIter(22) \
    .setRegParam(0.5) \
    .setAlpha(40)

model = als.fit(train)

In [30]:
recommendations = model.transform(test)
recommendations.show(10)

+---------+-------+----------------+------------+
|      uid|song_id|implicit_ratings|  prediction|
+---------+-------+----------------+------------+
|168497431| 118989|             2.0|   0.5367587|
|168966848| 118989|             2.0|   0.8157581|
|168398182| 118989|             2.0|  0.68088716|
|168794096| 118989|             3.0|   0.1478242|
|168666973| 133948|             3.0|  -0.3624314|
|168761984| 167532|             1.0|         NaN|
| 74921309| 200878|             4.0| 0.061903685|
|168053065| 235318|             3.0|  0.21362168|
|167637737| 235318|             4.0|-0.056609396|
|167963231| 255362|             1.0|   1.0998604|
+---------+-------+----------------+------------+
only showing top 10 rows



In [31]:
recommendations.show(10)

+---------+-------+----------------+------------+
|      uid|song_id|implicit_ratings|  prediction|
+---------+-------+----------------+------------+
|168497431| 118989|             2.0|   0.5367587|
|168966848| 118989|             2.0|   0.8157581|
|168398182| 118989|             2.0|  0.68088716|
|168794096| 118989|             3.0|   0.1478242|
|168666973| 133948|             3.0|  -0.3624314|
|168761984| 167532|             1.0|         NaN|
| 74921309| 200878|             4.0| 0.061903685|
|168053065| 235318|             3.0|  0.21362168|
|167637737| 235318|             4.0|-0.056609396|
|167963231| 255362|             1.0|   1.0998604|
+---------+-------+----------------+------------+
only showing top 10 rows



In [32]:
# make recommendation for each uid

from pyspark.sql.functions import col
from pyspark.sql.functions import expr

windowSpec = Window.partitionBy('uid').orderBy(col('prediction').desc())
predicted_recommendation = recommendations \
            .select('uid', 'song_id', 'prediction', F.rank().over(windowSpec).alias('rank')) \
            .where('rank <= {0}'.format(10)) \
            .groupBy('uid') \
            .agg(expr('collect_list(song_id) as song_id'))

predicted_recommendation.show()

+---------+--------------------+
|      uid|             song_id|
+---------+--------------------+
|167582087|[3247615, 6203964...|
|167627297|  [6212893, 6660691]|
|167674030|[96891, 6771014, ...|
|167683346|[6196652, 3973161...|
|167697454|[850803, 3223114,...|
|167727745|           [9918220]|
|167735352|            [516330]|
|167746855|  [4849640, 5390879]|
|167760432|[1128081, 507941,...|
|167888996|[1149606, 908531,...|
|167894223|[3565454, 7116549...|
|167910793| [6435339, 24013319]|
|167913407|[6651583, 4982519...|
|167979490|  [6159657, 4152712]|
|167993496|           [7187500]|
|168045751|           [6700790]|
|168054336|[157606, 1041659,...|
|168150258|            [328037]|
|168236162|[874711, 1166670,...|
|168275526|[6686279, 708518,...|
+---------+--------------------+
only showing top 20 rows



In [33]:
predicted_recommendation.show(10)

+---------+--------------------+
|      uid|             song_id|
+---------+--------------------+
|167582087|[3247615, 6203964...|
|167627297|  [6212893, 6660691]|
|167674030|[96891, 6771014, ...|
|167683346|[6196652, 3973161...|
|167697454|[850803, 3223114,...|
|167727745|           [9918220]|
|167735352|            [516330]|
|167746855|  [4849640, 5390879]|
|167760432|[1128081, 507941,...|
|167888996|[1149606, 908531,...|
+---------+--------------------+
only showing top 10 rows



In [34]:
# list actual recommendations based on uid's implicit ratings

windowSpec = Window.partitionBy('uid').orderBy(col('implicit_ratings').desc())
rated_recommendation = recommendations \
            .select('uid', 'song_id', 'implicit_ratings', F.rank().over(windowSpec).alias('rank')) \
            .where('rank <= {0}'.format(30)) \
            .groupBy('uid') \
            .agg(expr('collect_list(song_id) as song_id'))

rated_recommendation.show(10)

+---------+--------------------+
|      uid|             song_id|
+---------+--------------------+
|167582087|[1102831, 868744,...|
|167627297|  [6212893, 6660691]|
|167674030|[7186052, 2365697...|
|167683346|[3973161, 6357412...|
|167697454|[1928478, 590383,...|
|167727745|           [9918220]|
|167735352|            [516330]|
|167746855|  [4849640, 5390879]|
|167760432|[94232, 978692, 1...|
|167888996|[1149606, 908531,...|
+---------+--------------------+
only showing top 10 rows



In [35]:
rated_recommendation.show()

+---------+--------------------+
|      uid|             song_id|
+---------+--------------------+
|167582087|[1102831, 868744,...|
|167627297|  [6212893, 6660691]|
|167674030|[7186052, 2365697...|
|167683346|[3973161, 6357412...|
|167697454|[1928478, 590383,...|
|167727745|           [9918220]|
|167735352|            [516330]|
|167746855|  [4849640, 5390879]|
|167760432|[94232, 978692, 1...|
|167888996|[1149606, 908531,...|
|167894223|[3565454, 7024394...|
|167910793| [24013319, 6435339]|
|167913407|[7051854, 4982519...|
|167979490|  [6159657, 4152712]|
|167993496|           [7187500]|
|168045751|           [6700790]|
|168054336|[3401253, 86762, ...|
|168150258|            [328037]|
|168236162|[955482, 6412113,...|
|168275526|[708528, 4441094,...|
+---------+--------------------+
only showing top 20 rows



In [36]:
# use ranking for model evaluation

recommendaton_evaluation = predicted_recommendation.join(F.broadcast(rated_recommendation), 'uid', 'inner') \
            .rdd \
            .map(lambda row: (row[1], row[2]))    

In [37]:
from pyspark.mllib.evaluation import RankingMetrics

rankingMetrics = RankingMetrics(recommendaton_evaluation)
metric = rankingMetrics.ndcgAt(30)

print('evaluation of model_1: ')
print('metric: ', '%.4f' %metric)
print('meanAveragePrecision: ', '%.4f' %rankingMetrics.meanAveragePrecision)
print('precisionAt: ', '%.4f'% rankingMetrics.precisionAt(30))

evaluation of model_1: 
metric:  0.9684
meanAveragePrecision:  0.9551
precisionAt:  0.1336


### 4. tune parameter of recommendation model

In [39]:
# implicitPrefs: specifies whether to use the explicit rating (defaults to false, the explicit rating).
# rank: the number of latent factors in the model (defaults to 10).
# maxIter: maximum number of iterations to run (defaults to 10).
# regParam: specifies the regularization parameter in ALS (defaults to 1.0)
# alpha: a parameter for ALS implicit rating governing baseline confidence (defaults to 1.0).

als = ALS(implicitPrefs=True, seed=42,userCol="uid", itemCol="song_id", ratingCol="implicit_ratings") \
    .setRank(100) \
    .setMaxIter(40) \
    .setRegParam(0.1) \
    .setAlpha(1)

model = als.fit(train)
recommendations = model.transform(test)

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/local/Cellar/apache-spark/2.4.0/libexec/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1159, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/Cellar/apache-spark/2.4.0/libexec/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 985, in send_command
    response = connection.send_command(command)
  File "/usr/local/Cellar/apache-spark/2.4.0/libexec/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1164, in send_command
    "Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving


Py4JError: org does not exist in the JVM

In [None]:
windowSpec = Window.partitionBy('uid').orderBy(col('prediction').desc())
predicted_recommendation = recommendations \
            .select('uid', 'song_id', 'prediction', F.rank().over(windowSpec).alias('rank')) \
            .where('rank <= {0}'.format(10)) \
            .groupBy('uid') \
            .agg(expr('collect_list(song_id) as song_id'))



windowSpec = Window.partitionBy('uid').orderBy(col('implicit_ratings').desc())
rated_recommendation = recommendations \
            .select('uid', 'song_id', 'implicit_ratings', F.rank().over(windowSpec).alias('rank')) \
            .where('rank <= {0}'.format(30)) \
            .groupBy('uid') \
            .agg(expr('collect_list(song_id) as song_id'))




In [None]:
recommendaton_evaluation = predicted_recommendation.join(F.broadcast(rated_recommendation), 'uid', 'inner') \
            .rdd \
            .map(lambda row: (row[1], row[2]))  


rankingMetrics = RankingMetrics(recommendaton_evaluation)
metric = rankingMetrics.ndcgAt(30)
print('evaluation of model_2: ')
print('metric: ', '%.4f' %metric)
print('meanAveragePrecision: ', '%.4f' %rankingMetrics.meanAveragePrecision)
print('precisionAt: ', '%.4f'% rankingMetrics.precisionAt(30))

----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 53474)
Traceback (most recent call last):
  File "/anaconda3/lib/python3.7/socketserver.py", line 313, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/anaconda3/lib/python3.7/socketserver.py", line 344, in process_request
    self.finish_request(request, client_address)
  File "/anaconda3/lib/python3.7/socketserver.py", line 357, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/anaconda3/lib/python3.7/socketserver.py", line 717, in __init__
    self.handle()
  File "/usr/local/Cellar/apache-spark/2.4.0/libexec/python/pyspark/accumulators.py", line 268, in handle
    poll(accum_updates)
  File "/usr/local/Cellar/apache-spark/2.4.0/libexec/python/pyspark/accumulators.py", line 241, in poll
    if func():
  File "/usr/local/Cellar/apache-spark/2.4.0/libexec/python/pyspark/accumulators.py", line 245, in accum