In [None]:
%pip install mlflow

In [None]:
#all spark imports
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.functions import desc

from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import Imputer, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [None]:
# Create the Spark session (or reuse the one already running) under the app name "Demo".
spark = (
    SparkSession.builder
    .appName("Demo")
    .getOrCreate()
)

## Part A

### 1. Count the odd and even numbers using file ‘integer.txt’

In [None]:
# Read integer.txt as an RDD of text lines.
integerRDD = spark.sparkContext.textFile(
    "/FileStore/tables/integer.txt"
)

In [None]:
# Total number of lines in the file (presumably one integer per line — confirm against the data).
integerRDD.count()

In [None]:
# Create one RDD per parity.
# A number is even when it leaves no remainder after division by 2 and odd
# otherwise. The two predicates were previously swapped, so the "odd" RDD
# actually held the even numbers and vice versa.
odd_RDD = integerRDD.filter(lambda x: int(x) % 2 != 0)
even_RDD = integerRDD.filter(lambda x: int(x) % 2 == 0)

In [None]:
# Report how many elements each parity RDD contains.
odd_total = odd_RDD.count()
even_total = even_RDD.count()
print("Number of odd integers:", odd_total, ",", "number of even integers:", even_total)

### 2. Calculate the salary sum per department using file ‘salary.txt’. Show department name and salary sum.

In [None]:
# Read salary.txt as an RDD of text lines.
salaryRDD = spark.sparkContext.textFile(
    "/FileStore/tables/salary.txt"
)

In [None]:
# Cut every line on "," and keep only the first piece, then cut that piece on
# " " into [department, salary].
arrayRDD = (
    salaryRDD
    .map(lambda line: line.split(","))
    .map(lambda pieces: pieces[0].split(" "))
)
# Build (department, salary) pairs with the salary converted to int.
kvRDD = arrayRDD.map(lambda fields: (fields[0], int(fields[1])))
# Add up the salaries that share the same key (department name).
sumRDD = kvRDD.reduceByKey(lambda running, salary: running + salary)

In [None]:
# Bring the per-department totals to the driver and print them.
department_totals = sumRDD.collect()
print(department_totals)

### 3. Implement MapReduce using Pyspark on file ‘shakespeare.txt’. Show how many times these particular words appear in the document: Shakespeare, why, Lord, Library, GUTENBERG, WILLIAM, COLLEGE and WORLD. (Count exact words only (marks will be deducted for incorrect lowercase/uppercase))

In [None]:
# Read shakespeare.txt as an RDD of text lines.
shakespeareRDD = spark.sparkContext.textFile(
    "/FileStore/tables/shakespeare.txt"
)

In [None]:
# Helper that strips punctuation from a line of text.
import re


def removePunctuation(text):
    """Return `text` without punctuation and without surrounding whitespace.

    Every character that is not an ASCII letter, a digit or whitespace is
    deleted (not replaced by a space), so "why's" becomes "whys". Leading and
    trailing whitespace is then trimmed; inner whitespace is left untouched.
    """
    not_word_or_space = re.compile(r'[^a-zA-Z0-9\s]')
    without_symbols = not_word_or_space.sub("", text)
    return without_symbols.strip()

In [None]:
# Apply removePunctuation to every line of shakespeareRDD.
cleanRDD = shakespeareRDD.map(removePunctuation)

In [None]:
# Preview only the first few cleaned lines. collect() would pull every line of
# the file onto the driver and flood the notebook output.
cleanRDD.take(10)

In [None]:
# Flatten the cleaned lines into a single RDD of words (split on any whitespace).
wordRDD = cleanRDD.flatMap(
    lambda line: line.split()
)

In [None]:
# Preview only the first few words. collect() would pull every word of the
# document onto the driver and flood the notebook output.
wordRDD.take(10)

In [None]:
# Words to count, spelled with the exact casing that must be matched.
word_list = [
    'Shakespeare',
    'why',
    'Lord',
    'Library',
    'GUTENBERG',
    'WILLIAM',
    'COLLEGE',
    'WORLD',
]

In [None]:
# Build a {word: occurrences} dictionary for the words in word_list.
# All target words are counted in ONE pass over wordRDD: keep only the words we
# care about (case-sensitive match) and let countByValue() tally them. The
# previous loop ran a separate filter + count job for every word.
target_words = set(word_list)
occurrences = wordRDD.filter(lambda w: w in target_words).countByValue()
# Walk word_list so the result keeps its order and words that never appear get 0.
count_words = {word: occurrences.get(word, 0) for word in word_list}

In [None]:
# Print the exact-case occurrence count of each word in word_list.
print(count_words)

### 4. Calculate top 10 and bottom 10 words using file ‘shakespeare.txt’. Show 10 words with most count and 10 words with least count. You can limit by 10 in ascending and descending order of count. Show your code and output

In [None]:
# Pair every word with an initial count of 1: (word, 1).
# Note: this rebinds kvRDD, which held the (department, salary) pairs in Part A.
kvRDD = wordRDD.map(lambda word: (word, 1))

In [None]:
# Add up the 1s of each word to get (word, occurrences).
# Note: this rebinds sumRDD, which held the per-department salary sums in Part A.
sumRDD = kvRDD.reduceByKey(lambda left, right: left + right)

In [None]:
# Top 10 words: the 10 (word, count) pairs with the largest count.
# takeOrdered keeps only 10 candidates per partition, so the full word list is
# neither globally sorted nor collected onto the driver. Negating the count
# turns its ascending order into "largest first". The order among words with
# equal counts is arbitrary.
top10 = sumRDD.takeOrdered(10, key=lambda pair: -pair[1])
print(top10)

In [None]:
# Bottom 10 words: the 10 (word, count) pairs with the smallest count.
# takeOrdered keeps only 10 candidates per partition, so the full word list is
# neither globally sorted nor collected onto the driver. The order among words
# with equal counts is arbitrary.
bottom10 = sumRDD.takeOrdered(10, key=lambda pair: pair[1])
print(bottom10)

## Part B

### 1. Describe your data. Calculate top 20 movies with highest ratings and top 15 users who provided highest ratings. Show your code and output.

In [None]:
# Location of the ratings CSV
path = "/FileStore/tables/movies.csv"

# Read it as a comma-separated file with a header row, letting Spark infer the column types.
df = (
    spark.read
    .format("csv")
    .options(inferSchema=True, header=True, sep=',')
    .load(path)
)

In [None]:
# Number of partitions backing the DataFrame's underlying RDD.
df.rdd.getNumPartitions()

In [None]:
# Column names and the types inferred while reading the CSV.
df.printSchema()

In [None]:
# Summary statistics for every column.
df.describe().show()

This dataset contains 1501 ratings given by 30 users to 100 movies. It has three columns: movieId, rating and userId, all of integer type. The rating values range from 1 to 5.

In [None]:
#Top 20 movies(groupby 'movieId' and sort by count)
# Movies are ranked by how many ratings they received. show(20) so that 20 rows
# are printed; it was show(15), which cut the list five movies short.
df.groupBy('movieId').count().sort(desc('count')).show(20)

In [None]:
# Top 15 users, ranked by how many ratings each one submitted.
ratings_per_user = df.groupBy('userId').count()
ratings_per_user.sort(desc('count')).show(15)

### 2. Split dataset into train and test. Try 2 different combinations (60/40, 80/20). Show your code and output.

In [None]:
# Try 1: random 60/40 train/test split (fixed seed so the split is repeatable)
train_1, test_1 = df.randomSplit([0.6, 0.4], seed=2022)
# Row counts of the two parts
(train_1.count(), test_1.count())

In [None]:
# Train ALS on the 60/40 training set.
# coldStartStrategy="drop" removes predictions for users/movies unseen during
# training so they do not turn the evaluation metric into NaN.
# seed is fixed (same value as the data split) so the randomly initialised
# factors, and therefore the reported RMSE, are repeatable across runs.
als = ALS(
    maxIter=5,
    regParam=0.01,
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    coldStartStrategy="drop",
    seed=2022,
)
model_1 = als.fit(train_1)

In [None]:
# Predict ratings for the 60/40 test set and display the first rows.
predictions_1 = model_1.transform(test_1)
predictions_1.show()

In [None]:
# Try 2: random 80/20 train/test split (fixed seed so the split is repeatable)
train_2, test_2 = df.randomSplit([0.8, 0.2], seed=2022)
# Row counts of the two parts
(train_2.count(), test_2.count())

In [None]:
# Train ALS on the 80/20 training set.
# coldStartStrategy="drop" removes predictions for users/movies unseen during
# training so they do not turn the evaluation metric into NaN.
# seed is fixed (same value as the data split) so the randomly initialised
# factors, and therefore the reported RMSE, are repeatable across runs.
als = ALS(
    maxIter=5,
    regParam=0.01,
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    coldStartStrategy="drop",
    seed=2022,
)
model_2 = als.fit(train_2)

In [None]:
# Predict ratings for the 80/20 test set and display the first rows.
predictions_2 = model_2.transform(test_2)
predictions_2.show()

### 3. Explain MSE, RMSE and MAE. Compare and evaluate both of your models with evaluation metrics (RMSE), show your code and print your results. Describe which one works better and why?

1. Mean absolute error (MAE) represents the average of the absolute differences between the actual and predicted values in the data set, and it measures the average of the residuals in the data set.
2. Mean squared error (MSE) represents the average of the squared differences between the original and predicted values in the data set. It measures the variance of the residuals.
3. The root mean squared error (RMSE) is the square root of the mean squared error (MSE). It measures the standard deviation of the residuals.

MSE and RMSE penalize large prediction errors more heavily than MAE does, because each error is squared before averaging. RMSE is more widely used than MSE to assess and compare the performance of regression models because it is expressed in the same units as the dependent variable (Y-axis).

In [None]:
# RMSE of model 1 (60/40) on its held-out test predictions
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction",
)
rmse_1 = evaluator.evaluate(predictions_1)
rmse_1_text = str(rmse_1)
print("Root-mean-square error = " + rmse_1_text)

In [None]:
# RMSE of model 2 (80/20) on its held-out test predictions
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction",
)
rmse_2 = evaluator.evaluate(predictions_2)
rmse_2_text = str(rmse_2)
print("Root-mean-square error = " + rmse_2_text)

A lower RMSE means a more accurate model. Of these two cases, model 2, trained on the 80/20 train/test split, performs better. Increasing the share of data used for training can make the model's predictions more accurate.

### 4. Now tune the parameters of your algorithm to get the best set of parameters. Explain different parameters of the algorithm which you have used for tuning your algorithm. Evaluate all your models again. Show your code with best values and output.

The four parameters I used to tune the model are: rank, maxIter, regParam, and alpha.
1. Rank is the number of latent factors in the model.
2. MaxIter is the maximum number of iterations to run.
3. RegParam specifies the regularization parameter in ALS.
4. Alpha is a parameter applicable to the implicit feedback variant of ALS that governs the baseline confidence in preference observations.

In [None]:
# Base ALS estimator for cross-validation; rank, maxIter, regParam and alpha
# are supplied by the parameter grid.
# seed is fixed so factor initialisation is repeatable across runs.
als = ALS(
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    coldStartStrategy="drop",
    seed=2022,
)

In [None]:
# Hyper-parameter grid: 2 x 2 x 2 x 2 = 16 candidate settings.
# NOTE(review): alpha only influences ALS when implicitPrefs=True. If `als` is
# the explicit-feedback estimator (implicitPrefs left at its default), both
# alpha values train identical models; the entry is kept so the grid still
# matches the written explanation.
paramGrid = (ParamGridBuilder()
                    .addGrid(als.rank, [8, 12])
                    .addGrid(als.maxIter, [5, 10])
                    .addGrid(als.regParam, [0.01, 0.001])
                    .addGrid(als.alpha, [1.0, 2.0])
                    .build())

# Candidates are compared by RMSE between the "rating" label and the prediction.
evaluatorR = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

# 5-fold cross-validation over the grid; the setting with the best average
# metric is refit on the full training data.
# seed fixes the assignment of rows to folds so the chosen parameters are
# repeatable across runs.
alsCV = CrossValidator(estimator=als,
                       estimatorParamMaps=paramGrid,
                       evaluator=evaluatorR,
                       numFolds=5,
                       seed=2022)

In [None]:
# Run the cross-validated grid search on the 60/40 training set.
modelCV_1 = alsCV.fit(train_1)

In [None]:
# Best model and its parameters of Model 1 (60/40)
bestModel_1 = modelCV_1.bestModel
# Look the winning hyper-parameters up through the public CrossValidatorModel
# API instead of the private `_java_obj` handle: avgMetrics[i] is the
# cross-validated RMSE of getEstimatorParamMaps()[i], and for RMSE the smallest
# value identifies the setting bestModel was refit with.
avgMetrics_1 = modelCV_1.avgMetrics
bestIndex_1 = min(range(len(avgMetrics_1)), key=avgMetrics_1.__getitem__)
bestParams_1 = {
    param.name: value
    for param, value in modelCV_1.getEstimatorParamMaps()[bestIndex_1].items()
}
print ('Best Param (rank): ', bestParams_1['rank'])
print ('Best Param (MaxIter): ', bestParams_1['maxIter'])
print ('Best Param (regParam): ', bestParams_1['regParam'])
print ('Best Param (Alpha): ', bestParams_1['alpha'])

In [None]:
# Score the tuned model 1 (60/40) on its test set with RMSE
predictionsCV_1 = bestModel_1.transform(test_1)
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction",
)
rmseCV_1 = evaluator.evaluate(predictionsCV_1)
rmseCV_1_text = str(rmseCV_1)
print("Root-mean-square error = " + rmseCV_1_text)

In [None]:
# Run the same cross-validated grid search on the 80/20 training set.
modelCV_2 = alsCV.fit(train_2)

In [None]:
# Best model and its parameters of Model 2 (80/20)
bestModel_2 = modelCV_2.bestModel
# Look the winning hyper-parameters up through the public CrossValidatorModel
# API instead of the private `_java_obj` handle: avgMetrics[i] is the
# cross-validated RMSE of getEstimatorParamMaps()[i], and for RMSE the smallest
# value identifies the setting bestModel was refit with.
avgMetrics_2 = modelCV_2.avgMetrics
bestIndex_2 = min(range(len(avgMetrics_2)), key=avgMetrics_2.__getitem__)
bestParams_2 = {
    param.name: value
    for param, value in modelCV_2.getEstimatorParamMaps()[bestIndex_2].items()
}
# Printed in the same order as for Model 1 so the two outputs compare line by line.
print ('Best Param (rank): ', bestParams_2['rank'])
print ('Best Param (MaxIter): ', bestParams_2['maxIter'])
print ('Best Param (regParam): ', bestParams_2['regParam'])
print ('Best Param (Alpha): ', bestParams_2['alpha'])

In [None]:
# Score the tuned model 2 (80/20) on its test set with RMSE
predictionsCV_2 = bestModel_2.transform(test_2)
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction",
)
rmseCV_2 = evaluator.evaluate(predictionsCV_2)
rmseCV_2_text = str(rmseCV_2)
print("Root-mean-square error = " + rmseCV_2_text)

After tuning, the RMSE of model 1 (split by 60/40) decreased from 2.045 to 1.783. The RMSE of model 2 (split by 80/20) decreased from 1.982 to 1.652. 

Best Parameters: 
1. rank:  12
2. regParam:  0.01
3. maxIter:  10
4. alpha:  1.0

The values of maxIter and alpha remain at their defaults (10 and 1.0, respectively), whereas rank changed from its default of 10 to 12 and regParam changed from its default of 0.1 to 0.01.

### 5. Calculate top 15 movies recommendations for user id 10 and user id 14. Show your code and output.

In [None]:
# Generate the top 15 movie recommendations for every user, using the tuned 80/20 model.
userRecs = bestModel_2.recommendForAllUsers(15)

In [None]:
#get top 15 movies recommendations for user id 10
userId10 = userRecs.where(col('userId')==10).select(col('recommendations'))
Top_10 = userId10.collect()
# Top_10 holds at most one row, whose only column is the list of recommendation
# entries; element 0 of each entry is the movie id. When the model produced no
# recommendations for this user the row is missing, so fall back to an empty
# list instead of raising IndexError.
Top15_userid10 = [rec[0] for rec in Top_10[0][0]] if Top_10 else []
#show the result of top 15 movies
print('Top 15 movies recommendations for user id 10 is:',Top15_userid10)

In [None]:
#get top 15 movies recommendations for user id 14
userId14 = userRecs.where(col('userId')==14).select(col('recommendations'))
Top_14 = userId14.collect()
# Top_14 holds at most one row, whose only column is the list of recommendation
# entries; element 0 of each entry is the movie id. When the model produced no
# recommendations for this user the row is missing, so fall back to an empty
# list instead of raising IndexError.
Top15_userid14 = [rec[0] for rec in Top_14[0][0]] if Top_14 else []
#show the result of top 15 movies
print('Top 15 movies recommendations for user id 14 is:',Top15_userid14)