In [1]:
import pandas as pd
from pyspark.sql.functions import col, explode
from pyspark import SparkContext
import seaborn as sns
from matplotlib import pyplot as plt

In [2]:
import findspark
findspark.init()
findspark.find()

'C:\\folderconda\\envs\\new_env\\lib\\site-packages\\pyspark'

In [3]:
from pyspark.sql import SparkSession
sc = SparkContext
# sc.setCheckpointDir('checkpoint')
spark = SparkSession.builder.appName('Recommendations').getOrCreate()

In [4]:
data = [("Java", "20000"), ("Python", "100000"), ("Scala", "3000")]
columns = ['languages', 'user_account']
df = spark.createDataFrame(data).toDF(*columns)
df.show()

+---------+------------+
|languages|user_account|
+---------+------------+
|     Java|       20000|
|   Python|      100000|
|    Scala|        3000|
+---------+------------+



In [5]:
movies = spark.read.csv("movies.csv",header=True)
ratings = spark.read.csv("ratings.csv",header=True)

In [6]:
movies.printSchema()

root
 |-- movieId: string (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)



In [7]:
ratings.printSchema()

root
 |-- userId: string (nullable = true)
 |-- movieId: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [8]:
movies.show(5)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows



In [9]:
from pyspark.sql.functions import countDistinct, avg, col, when, count, mean

In [10]:
print(movies.count())
print(ratings.count())

9742
100836


In [11]:
ratings.show(5)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows



In [12]:
data = ratings.selectExpr("cast(userId as int) userId", "cast(movieId as int) movieId", "cast(rating as float) rating")

In [13]:
data.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: float (nullable = true)



In [14]:
data.show(5)

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|      1|   4.0|
|     1|      3|   4.0|
|     1|      6|   4.0|
|     1|     47|   5.0|
|     1|     50|   5.0|
+------+-------+------+
only showing top 5 rows



In [15]:
train, test = data.randomSplit([0.9, 0.1], seed = 91)

In [16]:
print(train.count())
print(test.count())

90697
10139


In [17]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

In [18]:
als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating")

In [19]:
paramGrid = ParamGridBuilder().addGrid(als.rank,
                                       [5,10,20]).addGrid(als.regParam,
                                                          [0.5, 1, 2]).addGrid(als.maxIter, [10,20]).build()

In [20]:
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")

In [21]:
crossval = CrossValidator(estimator=als,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=4)

In [22]:
cvModel = crossval.fit(train)

In [23]:
best_model = cvModel.bestModel

print("**Best Model**")

# # Print "Rank"
print("  Rank:", best_model._java_obj.parent().getRank())

# Print "MaxIter"
print("  MaxIter:", best_model._java_obj.parent().getMaxIter())

# Print "RegParam"
print("  RegParam:", best_model._java_obj.parent().getRegParam())

**Best Model**
  Rank: 5
  MaxIter: 10
  RegParam: 0.5


In [24]:
predictions = best_model.transform(test)

In [25]:
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = nan


In [26]:
predictions.show(5)

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|   171|    471|   3.0| 3.7458649|
|   287|    471|   4.5| 2.2593472|
|   414|    471|   5.0| 3.0046396|
|   260|    471|   4.5| 2.9986377|
|   104|    471|   4.5|  3.011997|
+------+-------+------+----------+
only showing top 5 rows



In [27]:
temp = predictions.withColumn('mean_squared', (predictions['rating']-predictions['prediction'])**2)


In [28]:
temp.show(5)

+------+-------+------+----------+------------------+
|userId|movieId|rating|prediction|      mean_squared|
+------+-------+------+----------+------------------+
|   171|    471|   3.0| 3.7458649|0.5563144015613943|
|   287|    471|   4.5| 2.2593472| 5.020524968383654|
|   414|    471|   5.0| 3.0046396|3.9814630239279722|
|   260|    471|   4.5| 2.9986377|2.2540888272089887|
|   104|    471|   4.5|  3.011997|2.2141529741912223|
+------+-------+------+----------+------------------+
only showing top 5 rows



In [29]:
temp.select(mean('mean_squared')).show()

+-----------------+
|avg(mean_squared)|
+-----------------+
|              NaN|
+-----------------+



In [30]:
predictions.select(*(count(when(col(c).isNull(), c)).alias(c) for c in ['rating', 'prediction'])).show()

+------+----------+
|rating|prediction|
+------+----------+
|     0|         0|
+------+----------+



In [31]:
pd_predictions = predictions.toPandas()

In [32]:
import numpy as np

np.mean((pd_predictions['rating']-pd_predictions['prediction'])**2)

1.006004810333252