In [1]:
#Import libraries
import warnings
warnings.filterwarnings('ignore')
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder, CrossValidator
from pyspark.sql import Row
import pandas as pd

#Create local Spark session
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

### 1. Import the MovieLens Dataset

In [2]:
# Reading the MovieLens dataset
data = spark.read.text("C:/Users/tuhina2/Desktop/Big_Data/HW4/ml-100k/u.data")
data = data.rdd  
data_split = data.map(lambda row: row.value.split("\t"))

# loading the ratings from the MovieLens dataset
ratings = data_split.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]),
                                     rating=float(p[2]), timestamp=float(p[3])))
final_ratings = spark.createDataFrame(ratings)

#Dropping the Timestamp column as we don't require it for our analysis
final_ratings = final_ratings.drop("timestamp")
final_ratings.show()

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|   196|    242|   3.0|
|   186|    302|   3.0|
|    22|    377|   1.0|
|   244|     51|   2.0|
|   166|    346|   1.0|
|   298|    474|   4.0|
|   115|    265|   2.0|
|   253|    465|   5.0|
|   305|    451|   3.0|
|     6|     86|   3.0|
|    62|    257|   2.0|
|   286|   1014|   5.0|
|   200|    222|   5.0|
|   210|     40|   3.0|
|   224|     29|   3.0|
|   303|    785|   3.0|
|   122|    387|   5.0|
|   194|    274|   2.0|
|   291|   1042|   4.0|
|   234|   1184|   2.0|
+------+-------+------+
only showing top 20 rows



#### Splitting the training and testing dataset

In [3]:
#Creating a random state while splitting, 
#so that the data doesn't change everytime we run it
(train_data, test_data) = final_ratings.randomSplit([0.8, 0.2], 50)

### 2. Build A Recommendation Model Using Alternating Least Squares

In [4]:
# Building the ALS Model and fitting the training data
als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", nonnegative=True)
als_model = als.fit(train_data)

# Evaluate the model through Root Mean Square Error
reg_evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse1 = reg_evaluator.evaluate(als_model.transform(test_data))

### 3. Reporting the Original Performance

In [5]:
print("Root Mean Square Error (Original Performance): " + str(rmse1))

Root Mean Square Error (Original Performance): nan


This case shown above is referred to as "cold start problem" because Root Mean Square Error is found to be NaN here.
 
In this example, Spark distributes NaN predictions across the ALS Model. Spark allows you to remove rows from the DataFrame containing forecasts with NaN values to overcome this problem. The Cold Start Strategy option is set to "drop" to accomplish this, which we will be doing next.

### 4. Performance Improvement By Solving The Cold Start Problem and Cross Validation

In [6]:
#Solving the cold start problem by setting the coldStartStrategy option set to "drop" 
als_final = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", nonnegative=True, coldStartStrategy= "drop")
als_final_model = als_final.fit(train_data)

# Evaluate the model through Root Mean Square Error
rmse2 = reg_evaluator.evaluate(als_final_model.transform(test_data))
print("After solving the cold start problem, Root Mean Square Error = " + str(rmse2))

After solving the cold start problem, Root Mean Square Error = 0.9206439343602205


After solving the Cold Start Problem, we got the RMSE of around 0.920, and now we will try to improve the performance using Cross Validation for the next step.

### 5.	Optimize the model based on step 4 and report the improvement of performance. 

In [7]:
# Parameters for tuning
paramGrid = ParamGridBuilder() \
    .addGrid(als_final.regParam, [0.1, 0.01, 0.001]) \
    .addGrid(als_final.rank, [5, 10, 15]) \
    .build()

#Cross Validation with 10 folds
crossvalidation_model = CrossValidator(estimator = als_final,
                     estimatorParamMaps = paramGrid,
                     evaluator = reg_evaluator,
                     numFolds=10)

#Cross Validation Using the Best Model
cv = crossvalidation_model.fit(train_data).bestModel

#Root Mean Square Error value after solving cold start problem
print("After performing cross validation, Root Mean Square Error value is: ", reg_evaluator.evaluate(cv.transform(test_data)))

After performing cross validation, Root Mean Square Error value is:  0.9138797746405847


Here, we can observe that, even after cross validation, we can't see much improvement. Earlier, it was 0.920 and now it has come out to be 0.9138, so it has not improved that much.

In [8]:
# Finding top recommenders and putting it in pandas dataframe
top_recomm = cv.recommendForAllUsers(10)
top_recommendations = top_recomm.toPandas()
top_recommendations

Unnamed: 0,userId,recommendations
0,1,"[(1449, 5.267789363861084), (1463, 5.177079200..."
1,3,"[(320, 4.690036773681641), (902, 4.65658712387..."
2,6,"[(1463, 4.903487682342529), (1449, 4.632701396..."
3,12,"[(313, 5.161916255950928), (318, 5.12897014617..."
4,13,"[(1463, 5.811648845672607), (1104, 5.050313949..."
...,...,...
938,934,"[(1463, 5.067784309387207), (1449, 4.711485862..."
939,940,"[(251, 4.165430068969727), (313, 4.16200637817..."
940,941,"[(1449, 5.261136054992676), (251, 4.9636077880..."
941,942,"[(1463, 5.247258186340332), (1169, 5.001148223..."


### 6.	Output top 10 movies for all the users 

In [9]:
#Putting all users and their recommendations by picking their movieIds
users = []
recomend = []

for i in range(len(top_recommendations)):
    users.append(top_recommendations.iloc[i,0])
    u_recom = "" 
    for item in top_recommendations.iloc[i,1]:
        u_recom = u_recom + ", " + str(item.asDict()["movieId"])
    recomend.append(u_recom[2:])

recom_df = pd.DataFrame(data = zip(users, recomend), columns=["user", "recommendations"])
recom_df

Unnamed: 0,user,recommendations
0,1,"1449, 1463, 169, 1367, 408, 114, 127, 12, 100, 50"
1,3,"320, 902, 1268, 1347, 1607, 1524, 114, 56, 180..."
2,6,"1463, 1449, 1367, 483, 718, 851, 1512, 1104, 5..."
3,12,"313, 318, 64, 251, 1450, 272, 22, 1449, 963, 1642"
4,13,"1463, 1104, 814, 851, 867, 57, 199, 1397, 483,..."
...,...,...
938,934,"1463, 1449, 1367, 408, 169, 318, 1122, 50, 114..."
939,940,"251, 313, 272, 1131, 1449, 1450, 1427, 963, 16..."
940,941,"1449, 251, 408, 1467, 114, 1463, 64, 12, 483, ..."
941,942,"1463, 1169, 318, 496, 1398, 64, 1449, 136, 163..."


In [10]:
#Exporting users and their recommendations to a text file
with open("Top_Recommendations.txt", "w") as f:
    f.write("UserId\t\t Recommendations \n")
    for i in range(len(recom_df)):
        f.write(str(recom_df.iloc[i,0]) + "\t" + recom_df.iloc[i,1] + "\n")