In [5]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg
import hashlib
from pyspark.sql.functions import udf

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS


import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import norm

from pyspark.sql.types import *
from pyspark.sql import Row

from pyspark.ml.feature import StringIndexer

In [6]:
conf = SparkConf().setAppName("test").setMaster("*")
spark = SparkSession.builder.getOrCreate()

In [7]:
def fix_ids(s):
    return int(int(hashlib.sha1(s).hexdigest(), 16) % (10 ** 8))

fix_ids_udf = udf(fix_ids)

def fix_decimal_values(s):
    return round(s,2)

fix_round_udf = udf(fix_decimal_values)

def conv_to_int(v):
    return int(v)

to_int_udf = udf(conv_to_int)

In [8]:
df = spark.read.json('data/dataset/review.json')
rating_df = df.select(df['user_id'],df['stars'].cast('float'),df['business_id'],df['date'].cast('date') )

In [9]:
# indexer = StringIndexer(inputCol="user_id", outputCol="userId")
# indexed = indexer.fit(rating_df).transform(rating_df)
# rating_df = indexed.withColumn('userId',to_int_udf(indexed['userId']))
#rating_df.show()

In [10]:
# indexer = StringIndexer(inputCol="business_id", outputCol="businessId")
# indexed = indexer.fit(rating_df).transform(rating_df)
# rating_df = indexed.withColumn('businessId',to_int_udf(col('businessId')))
#rating_df.show()

In [11]:
# rating_df = rating_df.select('userId','businessId','stars','date')
# rating_df.show(5)

In [12]:
rating_df= rating_df.withColumn('userId',fix_ids_udf(rating_df['user_id']).cast('int'))
rating_df= rating_df.withColumn('businessId',fix_ids_udf(rating_df['business_id']).cast('int'))

In [13]:
(training,test,spill) = rating_df.randomSplit([0.20,0.02,.78])

In [14]:
gavg=training.groupBy().avg().collect()[0][0]
print(gavg)

3.72698403945


In [15]:
df_user = training.groupby(col('userId')).avg().select(col('userId'),col('avg(stars)'))
df_user = df_user.select(col('userId'),df_user['avg(stars)'].alias('user-mean').cast('float'))
df_item = training.groupby(col('businessId')).avg().select(col('businessId'),col('avg(stars)'))
df_item = df_item.select(col('businessId'),df_item['avg(stars)'].alias('item-mean').cast('float'))

training = training.join(df_user,'userId')
training = training.join(df_item,'businessId')

training = training.withColumn('user-item-interaction',training.stars-(training['user-mean']+\
    training['item-mean']-gavg))

training = training.select(training.userId,training.businessId,training.stars,training['user-mean'],training['item-mean'],\
training['user-item-interaction'].cast('float'))
training = training.withColumn('user-mean',fix_round_udf(training['user-mean']))
training = training.withColumn('user-mean',fix_round_udf(training['item-mean']))
training = training.withColumn('user-mean',fix_round_udf(training['user-item-interaction']))
training.show(20)

+--------+----------+-----+---------+---------+---------------------+
|  userId|businessId|stars|user-mean|item-mean|user-item-interaction|
+--------+----------+-----+---------+---------+---------------------+
|85306473|    272391|  5.0|    -0.23|      5.0|          -0.23134898|
|43941930|    533542|  4.0|    -0.27|      4.0|          -0.27301595|
|58058439|    919571|  1.0|     0.06|      1.0|          0.060317054|
| 5959682|    919571|  1.0|     2.73|      1.0|             2.726984|
|50575816|    919571|  1.0|    -0.02|      1.0|         -0.023015961|
|17809023|    919571|  1.0|     0.73|      1.0|             0.726984|
|84355756|    919571|  1.0|     0.39|      1.0|           0.39365104|
|72852787|   1088310|  5.0|    -1.27|      5.0|            -1.273016|
|45421067|   1166132|  1.0|     2.73|      1.0|             2.726984|
|79393828|   1192456|  2.0|    -0.27|      2.0|          -0.27301595|
|89514075|   1210943|  4.0|     0.52|      4.0|            0.5235939|
|72529602|   1342122

In [20]:
test_user = test.groupby('userId').avg().select('userId','avg(stars)')
test_user = test_user.select('userId',test_user['avg(stars)'].alias('user-mean').cast('float'))
test_item = test.groupby('businessId').avg().select('businessId','avg(stars)')
test_item = test_item.select('businessId',test_item['avg(stars)'].alias('item-mean').cast('float'))
test_df = test.join(test_user,'userId')
df_test = test_df.join(test_item,'businessId')

df_test = df_test.withColumn('user-mean',fix_round_udf(df_test['user-mean']))
df_test = df_test.withColumn('user-mean',fix_round_udf(df_test['item-mean']))
df_test.show(20)

+----------+--------+--------------------+-----+--------------------+----------+---------+---------+
|businessId|  userId|             user_id|stars|         business_id|      date|user-mean|item-mean|
+----------+--------+--------------------+-----+--------------------+----------+---------+---------+
|  60200681|87631795|--NIc98RMssgy0mSZ...|  4.0|xidr6_d3fwKSb_XaB...|2017-08-08|     3.67|3.6666667|
|  54932015|63928832|--xdSgqUJmcvJot-3...|  4.0|64A4CTvJ2uRQVPkJ5...|2013-09-30|     2.89|2.8888888|
|  17399192|60851158|-04zuZ0tQoGpgG49P...|  5.0|KxlRX3ORVZ2R80icu...|2013-05-18|      5.0|      5.0|
|  59385589|87437728|-0Hbf-cgvSsu8749n...|  4.0|Ljknr0VF5Ia2DlTzE...|2016-05-14|     4.33|4.3333335|
|  35747616|98918747|-0SqALqeWmInVftG_...|  4.0|rMrymOj6RcBBddGuO...|2011-10-23|      3.8|      3.8|
|  26310381|89642291|-4BEUkLvHQntN6qPf...|  4.0|55X2pom73IhiP19UF...|2009-06-22|     3.71|3.7142856|
|  88094466|63833796|-57uOzAWlx__p6QlX...|  3.0|aGDo7GDN5YLvpYykp...|2010-12-03|      4.4| 

In [21]:
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="businessId", ratingCol='user-item-interaction',
          coldStartStrategy="drop",rank=40)
model = als.fit(training)

In [22]:
predictions = model.transform(test)
df_test = df_test.join(predictions,'userId').\
    select(df_test['userId'],df_test['businessId'],df_test['stars'],df_test['user-mean']\
    ,df_test['item-mean'],predictions['prediction'])
df_test2 = df_test.withColumn('actual_interaction',df_test.stars-(df_test['user-mean']+\
df_test['item-mean']-gavg))
evaluator = RegressionEvaluator(metricName="rmse", labelCol="actual_interaction",predictionCol="prediction")
rmse = evaluator.evaluate(df_test2)

In [23]:
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 1.37019432634
