In [1]:
import pyspark
from pyspark.sql import *
from pyspark.sql import SQLContext

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext

from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.functions import udf, col
from pyspark.ml.evaluation import RegressionEvaluator

spark = SparkSession.builder.master("local[2]").config("spark.driver.host","localhost").appName("Recommendation_movie").getOrCreate()
sc = spark.sparkContext

# using SQLContext to read parquet file
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

spark

In [2]:
df = spark.read.csv('movie_ratings.csv', inferSchema= True, header = True)
df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



In [3]:
df.show(5)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows



In [4]:
df.describe().show()

+-------+------------------+----------------+------------------+--------------------+
|summary|            userId|         movieId|            rating|           timestamp|
+-------+------------------+----------------+------------------+--------------------+
|  count|            100836|          100836|            100836|              100836|
|   mean|326.12756356856676|19435.2957177992| 3.501556983616962|1.2059460873684695E9|
| stddev| 182.6184914635004|35530.9871987003|1.0425292390606342|2.1626103599513078E8|
|    min|                 1|               1|               0.5|           828124615|
|    max|               610|          193609|               5.0|          1537799250|
+-------+------------------+----------------+------------------+--------------------+



In [5]:
train, test = df.randomSplit([0.8, 0.2])

In [6]:
from pyspark.ml.recommendation import ALS
recom = ALS(maxIter=10, regParam=0.02, userCol='userId', itemCol='movieId', ratingCol='rating')

In [7]:
model = recom.fit(train)

In [8]:
print(model.params)

[Param(parent='ALS_ccb79194d7b0', name='coldStartStrategy', doc='strategy for dealing with unknown or new users/items at prediction time. This may be useful in cross-validation or production scenarios, for handling user/item ids the model has not seen in the training data. Supported values: nan,drop.'), Param(parent='ALS_ccb79194d7b0', name='itemCol', doc='column name for item ids. Ids must be within the integer value range.'), Param(parent='ALS_ccb79194d7b0', name='predictionCol', doc='prediction column name'), Param(parent='ALS_ccb79194d7b0', name='userCol', doc='column name for user ids. Ids must be within the integer value range.')]


In [9]:
predictions = model.transform(test)
predictions.show()

+------+-------+------+----------+----------+
|userId|movieId|rating| timestamp|prediction|
+------+-------+------+----------+----------+
|   603|    471|   4.0| 954482443| 1.1420857|
|   474|    471|   3.0| 974668858| 2.0798697|
|   462|    471|   2.5|1123890831| 1.2137065|
|   610|    471|   4.0|1479544381| 3.0456336|
|   469|    471|   5.0| 965425364| 2.0824976|
|   373|    471|   5.0| 846830388| 3.2065692|
|   357|    471|   3.5|1348627082| 3.7114053|
|   609|    833|   3.0| 847221080|  0.880474|
|   387|   1088|   1.5|1095040878| 2.8107529|
|   307|   1088|   3.0|1186162146| 2.6117928|
|    84|   1088|   3.0| 860398568| 3.5348916|
|   226|   1088|   1.0|1096420160|  3.182537|
|    68|   1088|   3.5|1158534614| 2.7108884|
|   116|   1088|   4.5|1337195649| 3.1035523|
|   483|   1088|   3.0|1215895737| 3.5715861|
|   503|   1342|   1.0|1335214611| 3.1918712|
|   307|   1342|   2.0|1186087552| 2.6360106|
|   137|   1580|   3.5|1204859475| 3.3717048|
|   593|   1580|   1.5|1181007882|

In [10]:
# predictions.select('rating').subtract(predictions.select('prediction')).collect()
predictions.withColumn('diff',predictions['rating']-predictions['prediction']).select('diff').show()

+--------------------+
|                diff|
+--------------------+
|  2.8579143285751343|
|  0.9201302528381348|
|  1.2862935066223145|
|  0.9543664455413818|
|  2.9175024032592773|
|  1.7934308052062988|
|-0.21140527725219727|
|  2.1195260286331177|
| -1.3107528686523438|
| 0.38820719718933105|
| -0.5348916053771973|
|  -2.182537078857422|
|  0.7891116142272949|
|  1.3964476585388184|
| -0.5715861320495605|
|  -2.191871166229248|
| -0.6360106468200684|
|  0.1282951831817627|
|  -1.458526372909546|
|0.010648012161254883|
+--------------------+
only showing top 20 rows



In [11]:
from pyspark.sql.functions import avg
# predictions.withColumn('diff',predictions['rating']-predictions['prediction']).select(avg('diff')).show()

In [12]:
evaluator = RegressionEvaluator(metricName = 'rmse', labelCol = 'rating', predictionCol = 'prediction')

In [13]:
rmse = evaluator.evaluate(predictions)
print(rmse)

nan


In [14]:
test_user = test.filter(test['userId'] == 12).select('userId', 'movieId')
test_user.show()

+------+-------+
|userId|movieId|
+------+-------+
|    12|   1265|
|    12|   2100|
|    12|   2485|
|    12|   3967|
|    12|   6942|
|    12|  40629|
+------+-------+



In [15]:
recommendation_test_user = model.transform(test_user)
recommendation_test_user.show()

+------+-------+----------+
|userId|movieId|prediction|
+------+-------+----------+
|    12|   1265| 4.9313936|
|    12|   2485| 3.5849001|
|    12|   3967| 4.9177637|
|    12|  40629| 7.2070017|
|    12|   2100| 3.6872206|
|    12|   6942|  4.566821|
+------+-------+----------+



In [16]:
recommendation_test_user.orderBy('prediction', ascending=False).show()

+------+-------+----------+
|userId|movieId|prediction|
+------+-------+----------+
|    12|  40629| 7.2070017|
|    12|   1265| 4.9313936|
|    12|   3967| 4.9177637|
|    12|   6942|  4.566821|
|    12|   2100| 3.6872206|
|    12|   2485| 3.5849001|
+------+-------+----------+

