In [1]:
from numpy import array
from math import sqrt
import random
import hdfs
from pyspark import SparkContext, SparkConf

from urllib.request import urlretrieve
import pandas as pd
import os

from pyspark.sql import SparkSession, SQLContext
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder

sqlContext = SQLContext(sc)

# Reading the input file

In [16]:
# This is the type of path if you read from either a local hdfs cluster or your C Drive 
# (I have a local cluster setup for practice)
# file_path = "hdfs://localhost:9000/books/ratings.csv" # For reading from local hadoop setup
# file_path = "C://Working_Directory//books//ratings.csv" # For reading from local C Drive

In [5]:
DF_Initial = sqlContext.read.format('csv').options(header='true', inferSchema='true').load(file_path)
DF_Initial

DataFrame[book_id: int, user_id: int, rating: int]

In [6]:
DF_Initial.show(5)

+-------+-------+------+
|book_id|user_id|rating|
+-------+-------+------+
|      1|    314|     5|
|      1|    439|     3|
|      1|    588|     5|
|      1|   1169|     4|
|      1|   1185|     4|
+-------+-------+------+
only showing top 5 rows



In [10]:
book_ratings = DF_Initial

# Creating the Training and Test Dataset

In [19]:
(training, test) = book_ratings.randomSplit([0.8,0.2])

# Initializing the model

In [32]:
# Initialize model
als = ALS(userCol = 'user_id', itemCol = 'book_id', ratingCol = 'rating', coldStartStrategy = 'drop', nonnegative = True)

# Setting up the parameter Grid
param_grid = ParamGridBuilder().addGrid(als.rank, [12,13,14]).addGrid(als.maxIter, [18,19,20]).addGrid(als.regParam, [.17, .18, .19]).build()

# For Mode Evaluation
evaluator = RegressionEvaluator(metricName = 'rmse', labelCol = 'rating', predictionCol = 'prediction')

# Creating the Training Validation Split
tvs = TrainValidationSplit(estimator=als, estimatorParamMaps=param_grid, evaluator = evaluator)

# Fitting the model

In [35]:
model = tvs.fit(training)

In [36]:
best_model = model.bestModel

In [38]:
predictions = best_model.transform(test)
rmse = evaluator.evaluate(predictions)

In [39]:
print("RMSE = " + str(rmse))
print("**Best Model**")
print(" Rank: "), best_model.rank
print(" MaxIter:"), best_model._java_obj.parent().getMaxIter()
print(" RegParam:"), best_model._java_obj.parent().getRegParam()

RMSE = 0.8744368620722411
**Best Model**
 Rank: 
 MaxIter:
 RegParam:


(None, 0.19)

# Generating Recommendations

In [40]:
user_recs = best_model.recommendForAllUsers(10)

In [44]:
def get_recs_for_books(recs):
    recs = recs.select("recommendations.book_id", "recommendations.rating")
    movies = recs.select("book_id").toPandas().iloc[0,0]
    ratings = recs.select("rating").toPandas().iloc[0,0]
    rating_matrix = pd.DataFrame(movies, columns = ["book_id"])
    rating_matrix["ratings"] = ratings
    ratings_matrix_ps = sqlContext.createDataFrame(rating_matrix)
    return ratings_matrix_ps

In [45]:
df1 = get_recs_for_user(user_recs)

In [49]:
df1.show(5)

+-------+-----------------+
|book_id|          ratings|
+-------+-----------------+
|   6590|4.298887252807617|
|   5207|4.290401935577393|
|   3628|4.287137985229492|
|   9566|4.286754131317139|
|   6920|4.254033088684082|
+-------+-----------------+
only showing top 5 rows

