In [35]:
# You should change your bucket name below. See the book for more details
# (https://www.amazon.com/Data-Analytics-Google-Cloud-Hands-ebook/dp/B087XZZ2C6/)
data = spark.read.csv("gs://rs-movielens-2/ratings.csv", header=True, inferSchema=True)

In [36]:
data.show(5)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows



In [37]:
print(data.rdd.take(2)) # viewing 2 rows of the raw rdd

# Convert your dataframe into an RDD, and then into a list
ratings = data.rdd.map(list)
ratings.take(2)

[Row(userId=1, movieId=1, rating=4.0, timestamp=964982703), Row(userId=1, movieId=3, rating=4.0, timestamp=964981247)]


[[1, 1, 4.0, 964982703], [1, 3, 4.0, 964981247]]

In [38]:
# Import the Rating object
from pyspark.mllib.recommendation import Rating
# Convert the data into Rating objects
ratings_data = ratings.map(lambda line: Rating(int(line[0]), int(line[1]), float(line[2])));
# This is what a Rating object looks like
ratings_data.take(2)

[Rating(user=1, product=1, rating=4.0), Rating(user=1, product=3, rating=4.0)]

In [39]:
# Split the data into training and test, in 80-20% ratio
training_data, test_data = ratings_data.randomSplit([0.8,0.2]);

In [40]:
# Import the ALS method
from pyspark.mllib.recommendation import ALS
# Build the model based on the training data, with tank = 10 and iterations = 10
model = ALS.train(training_data, rank=10, iterations=10)

In [41]:
# Drop the ratings column
testdata_nr = test_data.map(lambda p: (p[0],p[1]))
testdata_nr.take(2)

[(1, 1), (1, 6)]

In [42]:
# Predict the model  
predictions = model.predictAll(testdata_nr)

In [43]:
# Print the first rows of the RDD
predictions.take(2)

[Rating(user=590, product=1084, rating=3.8205984311223125),
 Rating(user=414, product=1084, rating=4.12221289922118)]

In [44]:
# Prepare ratings data
ratings_kv = ratings_data.map(lambda r: ((r[0],r[1]),r[2]));
ratings_kv.take(2)

[((1, 1), 4.0), ((1, 3), 4.0)]

In [45]:
# Prepare predictions data
predictions_kv = predictions.map(lambda r: ((r[0],r[1]),r[2]))
predictions_kv.take(2)

[((590, 1084), 3.8205984311223125), ((414, 1084), 4.12221289922118)]

In [46]:
# Join the ratings data with predictions data
ratings_predictions = ratings_kv.join(predictions_kv)
ratings_predictions.take(5)

[((1, 1219), (2.0, 4.913380814462766)),
 ((1, 1377), (3.0, 3.155047446018721)),
 ((1, 1552), (4.0, 3.9478755850642604)),
 ((1, 1644), (3.0, 2.8124819971676938)),
 ((1, 1804), (5.0, 1.782714730433351))]

In [52]:
# Convert RDD to Dataframe
# First, create a clean version of the ratings_predictions RDD
ratings_predictions_clean = ratings_predictions. \
  map(lambda r: (r[0][0], r[0][1], r[1][0], r[1][1]))
# Next, convert this clean version to a PySpark dataframe
df_ratings_predictions = ratings_predictions_clean.toDF()
# Print the dataframe schema - just for your information
df_ratings_predictions.printSchema()
# Print some records in the dataframe - just for your information
df_ratings_predictions.show(5)
# Save the dataframe df_ratings_predictions into a 
# single csv file using "coalesce" command
df_ratings_predictions.coalesce(1).write.format('com.databricks.spark.csv'). \
save('gs://rs-movielens-2/ratings_predictions1.csv')

root
 |-- _1: long (nullable = true)
 |-- _2: long (nullable = true)
 |-- _3: double (nullable = true)
 |-- _4: double (nullable = true)

+---+----+---+------------------+
| _1|  _2| _3|                _4|
+---+----+---+------------------+
|  1|1219|2.0| 4.913380814462766|
|  1|1377|3.0| 3.155047446018721|
|  1|1552|4.0|3.9478755850642604|
|  1|1644|3.0|2.8124819971676938|
|  1|1804|5.0| 1.782714730433351|
+---+----+---+------------------+
only showing top 5 rows



In [48]:
# Calculate and print MAE
MAE = ratings_predictions.map(lambda r: abs(r[1][0] - r[1][1])).mean()
print("Mean Absolute Error of the model for the test data = {:.4f}".format(MAE))

Mean Absolute Error of the model for the test data = 0.8240


In [49]:
# Calculate and print MSE
MSE = ratings_predictions.map(lambda r: (r[1][0] - r[1][1])**2).mean()
print("Mean Squared Error of the model for the test data = {:.4f}".format(MSE))

Mean Squared Error of the model for the test data = 1.2286


In [53]:
# Calculate and print RMSE
RMSE = MSE**(1/2)
print("Root Mean Squared Error of the model for the test data = \
{:.4f}".format(RMSE))

Root Mean Squared Error of the model for the test data = 1.1084
