# Recommender Data Generation 

In [174]:
from pyspark import SparkContext
import pyspark.sql.functions as F
import pandas as pd

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

In [131]:
df_play = spark.read.csv('../data/play_ds.csv',header=True)
df_play.show(5)

+---------+------+--------+----------+---------+-----------+
|      uid|device| song_id|      date|play_time|song_length|
+---------+------+--------+----------+---------+-----------+
|168551247|    ar|11881432|2017-03-30|       78|        149|
|168549788|    ip|  295469|2017-03-30|       16|        242|
|168543026|    ar| 6623026|2017-03-30|        0|          0|
|168550571|    ar|       0|2017-03-30|       24|        251|
|168548101|    ip| 6913185|2017-03-30|       40|        198|
+---------+------+--------+----------+---------+-----------+
only showing top 5 rows



In [132]:
df_play_time = df_play.withColumn("play_time_tot", F.when(
    F.col('play_time')<=0, 0
).otherwise(F.col('play_time')))

### Generate ratings of each user i to each song j as :
 (total play time of user i to song j)/ (user i's longest playing time of all songs)

In [133]:
def playtime_generation(df):

    df_play_time = df \
        .groupBy('uid','song_id') \
        .agg(F.sum(F.col('play_time_tot')).alias('playtime_tot') 
            )
    return df_play_time

df_ratings_time = playtime_generation(df_play_time)

In [134]:
df_ratings_tmp = df_ratings_time.groupBy('uid').agg(F.max(F.col('playtime_tot')).alias('max_playtime'))

In [135]:
#df_rating is the long format of the ratings
df_ratings = df_ratings_time.join(df_ratings_tmp,on='uid',how='left')
df_rating = df_ratings.withColumn('ratings', F.col('playtime_tot')/F.col('max_playtime')).select('uid','song_id','ratings')

In [136]:
type(df_rating.rdd)

pyspark.rdd.RDD

In [137]:
df_rating.show(3)

+--------+--------+-------------------+
|     uid| song_id|            ratings|
+--------+--------+-------------------+
|11596711|15807836|0.06303972366148532|
|11596711|  169744|0.14709268854346574|
|11596711|  149745| 0.1079447322970639|
+--------+--------+-------------------+
only showing top 3 rows



### Remove missing values from df_ratings.
The missingness is from the down-sampled play_ds.csv data

In [138]:
df_rating2 = df_rating.filter(~(df_rating["ratings"].isNull() | df_rating["song_id"].isNull()))
print(df_rating2.filter(df_rating2["ratings"].isNull() ).count())
print(df_rating2.count())


0
3093222


### Convert uid and song_id to interger format


In [139]:
# df_rating2["ratings"].max()
# print(df_rating2.agg({"ratings": "max"}).collect())
# print(df_rating2.agg({"song_id": "max"}).collect())
# print(df_rating2.agg({"uid": "max"}).collect())
print(df_rating2.dtypes)
from pyspark.sql.types import IntegerType
df_rating3 = df_rating2.withColumn(
    "uid", df_rating2["uid"].cast(IntegerType())
    ).withColumn(
    "song_id", df_rating2["song_id"].cast(IntegerType())
    ).dropna()
print(df_rating3.dtypes)


[('uid', 'string'), ('song_id', 'string'), ('ratings', 'double')]
[('uid', 'int'), ('song_id', 'int'), ('ratings', 'double')]


In [None]:
# df_rating3.show()
# df_rating3.subtract(df_rating3.dropna()).show()

In [141]:
df_rating3.toPandas().to_csv('../data/df_rating.csv',index=False)

## Fit model with spark ALS

In [143]:
(training, test) = df_rating3.randomSplit([0.8, 0.2])

In [153]:
# model = ALS.train(training.rdd, rank = 10, iterations = 5, lambda_=0.01)
als = ALS(maxIter=5, regParam=0.01, userCol="uid", itemCol="song_id", ratingCol="ratings",
          coldStartStrategy="drop")
model1 = als.fit(training)

### Model evaluation

In [178]:
predictions = model1.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="ratings",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))


Root-mean-square error = 0.19986818827118968


In [182]:
test.agg(F.mean('ratings')).show()

+-------------------+
|       avg(ratings)|
+-------------------+
|0.14028382220048344|
+-------------------+

