# Assignment 2: Apache Spark

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import * 

# Create (or reuse) the Spark session that the rest of the notebook works through.
spark = (
    SparkSession.builder
    .appName("Kurama")
    .getOrCreate()
)

In [0]:
# dbutils.fs.rm("dbfs:/FileStore/tables/shakespeare_1.txt")

# Part A

##### Question 1: Count the odd and even numbers using the file ‘integer.txt’ and download it from Quercus. Show your code and output.

In [0]:
# Read integer.txt as an RDD of text lines, then show how many partitions it was split into.
INTEGER_FILE = "dbfs:/FileStore/tables/integer.txt"
integerRDD = spark.sparkContext.textFile(INTEGER_FILE)
integerRDD.getNumPartitions()

Out[30]: 2

In [0]:
# Count odd and even integers with a map / reduceByKey pipeline.
# Blank or whitespace-only lines are dropped first so that int() cannot fail on them.
integerArray    = (integerRDD
                   .filter(lambda line: line.strip())
                   .map(lambda line: int(line)))
# Tag every number with its parity and an occurrence count of 1.
tagsArray       = integerArray.map(lambda n: ("even" if n % 2 == 0 else "odd", 1))
# reduceByKey passes two partial counts for the same key (not a key/value pair).
countsArray     = tagsArray.reduceByKey(lambda left, right: left + right)
# Only two keys exist, so collecting to the driver is cheap.
countsArrayDict = dict(countsArray.collect())
display(countsArrayDict)

{'even': 514, 'odd': 496}

##### Question 2: Calculate the salary sum per department using the file ‘salary.txt’ and download it from Quercus. Show the department name and salary sum. Show your code and output.

In [0]:
# Read salary.txt as an RDD of text lines, then show how many partitions it was split into.
SALARY_FILE = "dbfs:/FileStore/tables/salary.txt"
salaryRDD = spark.sparkContext.textFile(SALARY_FILE)
salaryRDD.getNumPartitions()

Out[32]: 2

In [0]:
# Sum salaries per department.
# Each line is expected to look like "<department> <salary>". Splitting on any run of
# whitespace (split() with no argument) tolerates repeated spaces or tabs, and blank
# lines are skipped so they cannot raise an IndexError below.
salaryList          = (salaryRDD
                       .filter(lambda line: line.strip())
                       .map(lambda line: line.split()))
# Build (department, salary) pairs; a non-numeric salary still raises, so bad data is not hidden.
salaryArray         = salaryList.map(lambda fields: (fields[0], float(fields[1])))
# reduceByKey passes two partial sums for the same department.
departmentSalary    = salaryArray.reduceByKey(lambda left, right: left + right)
departmentSalaryDict= dict(departmentSalary.collect())
display(departmentSalaryDict)

{'Sales': 3488491.0,
 'Research': 3328284.0,
 'Developer': 3221394.0,
 'QA': 3360624.0,
 'Marketing': 3158450.0}

##### Question 3: Implement MapReduce using PySpark on file ‘shakespeare.txt’ and download it from the Quercus. Show how many times these particular words appear in the document: Shakespeare, When, Lord, Library, GUTENBERG, WILLIAM, COLLEGE and WORLD. (Count exact words only)

In [0]:
# Read shakespeare.txt as an RDD of text lines, then show how many partitions it was split into.
SHAKESPEARE_FILE = "dbfs:/FileStore/tables/shakespeare.txt"
shakespearRDD = spark.sparkContext.textFile(SHAKESPEARE_FILE)
shakespearRDD.getNumPartitions()

Out[34]: 2

In [0]:
# Target words for Question 3. Matching is case-sensitive, so the casing here matters.
wordList = set("Shakespeare When Lord Library GUTENBERG WILLIAM COLLEGE WORLD".split())
wordList

Out[35]: {'COLLEGE',
 'GUTENBERG',
 'Library',
 'Lord',
 'Shakespeare',
 'WILLIAM',
 'WORLD',
 'When'}

In [0]:
# Tokenise each line on single spaces and trim surrounding punctuation from every token.
ssWordList = (
    shakespearRDD
    .flatMap(lambda line: line.split(" "))
    .map(lambda token: token.strip(",.!?;:\"\'"))
)

# Keep only exact, case-sensitive matches for the target words.
filteredWordList = ssWordList.filter(lambda token: token in wordList)

# Classic word count: emit (word, 1) and add the ones up per word.
filteredWordListTags = filteredWordList.map(lambda token: (token, 1))
wordCounts = filteredWordListTags.reduceByKey(lambda a, b: a + b)

# At most eight keys can survive the filter, so collecting to the driver is safe.
wordCountsDict = dict(wordCounts.collect())
display(wordCountsDict)

{'Shakespeare': 22,
 'GUTENBERG': 99,
 'WILLIAM': 127,
 'WORLD': 98,
 'COLLEGE': 98,
 'When': 405,
 'Lord': 399,
 'Library': 4}

##### Question 4: Calculate the top 15 and bottom 15 words using the file ‘shakespeare.txt’ and download it from Quercus. Show 15 words with the most count and 15 words with the least count. You can limit by 15 in ascending and descending order of count. Show your code and output.

In [0]:
# Build a case-insensitive word count over the whole text.
ssWordList          = shakespearRDD.flatMap(lambda line: line.split(" ")).map(lambda word: word.strip(",.!?;:\"\'"))
normalizeWordList   = ssWordList.map(lambda word: word.lower())
# Stripping punctuation can leave empty tokens; drop them before counting.
filteredWordList    = normalizeWordList.filter(lambda word: word != "")
wordListTags        = filteredWordList.map(lambda word: (word, 1))
wordCounts          = wordListTags.reduceByKey(lambda a, b: a + b)

# takeOrdered selects the N smallest items by key on its own, so sorting the full RDD
# beforehand only adds shuffles. Ties on count are broken alphabetically by word so
# that the result is the same on every run.

# Get the top 15 words by count (highest count first)
top_15_words = wordCounts.takeOrdered(15, key=lambda pair: (-pair[1], pair[0]))

# Get the bottom 15 words by count (lowest count first)
bottom_15_words = wordCounts.takeOrdered(15, key=lambda pair: (pair[1], pair[0]))

In [0]:
# Show the 15 most frequent words as a word -> count mapping.
print("======== Top 15 words are ========")
top_15_counts = dict(top_15_words)
display(top_15_counts)

{'the': 13691,
 'and': 12935,
 'of': 9366,
 'i': 9091,
 'to': 8987,
 'a': 6655,
 'you': 6049,
 'my': 5691,
 'in': 5253,
 'that': 5230,
 'is': 4250,
 'not': 3895,
 'for': 3841,
 'with': 3798,
 'his': 3576}

In [0]:
# Show the 15 least frequent words as a word -> count mapping.
print("======== Bottom 15 words ========")
bottom_15_counts = dict(bottom_15_words)
display(bottom_15_counts)

{'#100]': 1,
 '(*)': 1,
 '(212-254-5093)': 1,
 '(3)': 1,
 '(72600.2026@compuserve.com)': 1,
 '(_)': 1,
 '(although': 1,
 '(by': 1,
 '(doctor-like)': 1,
 '(dressed': 1,
 '(first': 1,
 '(fore': 1,
 '(forsooth)': 1,
 '(from': 1,
 '(have': 1}

# Part B

In [0]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, TrainValidationSplit

In [0]:
# Load the ratings CSV. No schema is supplied, so the columns arrive as strings.
ratings_raw = (
    spark.read
    .format("csv")
    .option("header", "true")
    .load("dbfs:/FileStore/tables/movies.csv")
)

# ALS needs numeric id and rating columns: cast every column to integer in one projection,
# keeping the original column names and order.
df = ratings_raw.select(
    [col(name).cast(IntegerType()).alias(name) for name in ratings_raw.columns]
)

# Display the loaded dataset
display(df)

movieId,rating,userId
2,3,0
3,1,0
5,2,0
9,4,0
11,1,0
12,2,0
15,1,0
17,1,0
19,1,0
21,1,0


##### Question 1: Describe your data. Calculate the top 12 movies with the highest ratings and the top 12 users who provided the highest ratings. Show your code and output. 
The provided data is a CSV file containing movie ratings given by different users. Each row represents one rating and has three columns: movieId, rating, and userId. The dataset covers 30 users and 100 movies, and each rating is on a scale of 1-5. The top 12 movies with the highest average ratings and the top 12 users who gave the highest average ratings are shown below.

In [0]:
# Mean rating per movie, rounded to 4 decimal places.
# round() and avg() here are the pyspark.sql.functions versions from the star import.
avg_rating_col = round(avg("rating"), 4).alias("avg_rating")
movie_ratings = df.groupBy("movieId").agg(avg_rating_col)

# Highest average first; keep the first 12 rows.
top_movies = movie_ratings.orderBy(col("avg_rating").desc()).limit(12)

print("Top 12 movies with highest average rating are:")
top_movies.show()

Top 12 movies with highest average rating are:
+-------+----------+
|movieId|avg_rating|
+-------+----------+
|     32|    2.9167|
|     90|    2.8125|
|     30|       2.5|
|     94|    2.4737|
|     23|    2.4667|
|     49|    2.4375|
|     29|       2.4|
|     18|       2.4|
|     52|    2.3571|
|     53|      2.25|
|     62|      2.25|
|     92|    2.2143|
+-------+----------+



In [0]:
# Average rating given by each user (rounded to 4 decimal places), sorted from highest to lowest.
# A dedicated name is used so the per-movie averages held in `movie_ratings` are not overwritten.
user_ratings = df.groupBy("userId").agg(round(avg("rating"), 4).alias("avg_rating"))
top_users = user_ratings.orderBy(desc("avg_rating")).limit(12)

# Show the top 12 users with the highest average rating
print("Top 12 users with highest average rating are:")
top_users.show()

Top 12 users with highest average rating are:
+------+----------+
|userId|avg_rating|
+------+----------+
|    11|    2.2857|
|    26|    2.2041|
|    22|    2.1607|
|    23|    2.1346|
|     2|    2.0652|
|    17|    1.9565|
|     8|     1.898|
|    24|    1.8846|
|    12|    1.8545|
|     3|    1.8333|
|    29|    1.8261|
|    28|      1.82|
+------+----------+



##### Question 2: Split the dataset into train and test. Try 2 different combinations for e.g. (60/40, 70/30, 75/25 and 80/20). (Train your model and use collaborative filtering approach on 70 percent of your data and test with the other 30 percent and so on). Show your code and output.

In [0]:
# Two hold-out configurations drawn from the same ratings frame (80/20 and 70/30).
# The same fixed seed is passed to both calls.
SPLIT_SEED = 42
train8020, test8020 = df.randomSplit([0.8, 0.2], seed=SPLIT_SEED)
train7030, test7030 = df.randomSplit([0.7, 0.3], seed=SPLIT_SEED)

In [0]:
# ALS estimator with default hyperparameters; it is fitted on each training split in later cells.
als = ALS(
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    coldStartStrategy="drop",
    nonnegative=True,
)


In [0]:
# 80/20 split: fit the default ALS estimator on the training part...
modelDefault8020 = als.fit(train8020)

# ...and score the held-out ratings.
predictions8020 = modelDefault8020.transform(test8020)

# Preview the first 10 predictions under a two-line banner.
for banner_line in ("======== ======== ",
                    "Predictions on test set of 80/20 split dataset"):
    print(banner_line)
predictions8020.show(10)

Predictions on test set of 80/20 split dataset
+-------+------+------+----------+
|movieId|rating|userId|prediction|
+-------+------+------+----------+
|      2|     4|    28| 2.3499846|
|     13|     2|    28| 1.9563231|
|     17|     1|    28| 1.0817308|
|     52|     1|    28| 0.9489822|
|     54|     1|    28| 1.0347327|
|     63|     1|    28|  0.867857|
|     78|     1|    28|0.86701816|
|      0|     1|    27|0.82873076|
|      3|     1|    26| 1.5309827|
|      4|     4|    26|   2.04725|
+-------+------+------+----------+
only showing top 10 rows



In [0]:
# 70/30 split: fit the default ALS estimator on the training part...
modelDefault7030 = als.fit(train7030)

# ...and score the held-out ratings.
predictions7030 = modelDefault7030.transform(test7030)

# Preview the first 10 predictions under a two-line banner.
for banner_line in ("======== ======== ",
                    "Predictions on test set of 70/30 split dataset"):
    print(banner_line)
predictions7030.show(10)

Predictions on test set of 70/30 split dataset
+-------+------+------+----------+
|movieId|rating|userId|prediction|
+-------+------+------+----------+
|      0|     3|    28|0.60361654|
|      2|     4|    28| 1.9209852|
|      3|     1|    28| 0.7872818|
|     13|     2|    28|  1.297262|
|     17|     1|    28| 1.5905243|
|     19|     3|    28| 1.7393432|
|     29|     1|    28| 1.0909404|
|     50|     1|    28| 1.4662006|
|     52|     1|    28| 1.4912053|
|     54|     1|    28| 1.3346343|
+-------+------+------+----------+
only showing top 10 rows



##### Question 3: Explain MSE, RMSE and MAE. Compare and evaluate both of your models with evaluation metrics (RMSE or MAE), show your code and print your results. Describe which one works better and why.

 Mean Squared Error (MSE), Root Mean Squared Error (RMSE), and Mean Absolute Error (MAE) are evaluation metrics used to measure the performance of regression models. Here's an explanation of each metric:

- MSE: MSE calculates the average of the squared differences between the predicted values and the actual values. It gives more weight to larger errors, making it sensitive to outliers.
MSE = Σ(predicted_value - actual_value)^2 / n

- RMSE: RMSE is the square root of MSE. It has the same units as the target variable, making it easier to interpret.
RMSE = √(Σ(predicted_value - actual_value)^2 / n)

- MAE: MAE calculates the average of the absolute differences between the predicted values and the actual values. It treats all errors equally, making it less sensitive to outliers than MSE and RMSE.
MAE = Σ|predicted_value - actual_value| / n

Lower values of MSE, RMSE, and MAE indicate better model performance. RMSE and MAE are more interpretable than MSE since they have the same units as the target variable. The choice between RMSE and MAE depends on the problem and the nature of the data. RMSE is more sensitive to outliers, so if your data has many outliers, MAE might be a better choice. However, if outliers are not a concern, RMSE is generally preferred because it gives more weight to larger errors. In our dataset, there are no outliers as all the movie ratings are between 1 and 5, hence I used RMSE to evaluate the performance of my model.

The 80/20 split works better for this dataset because it has a relatively small number of instances (1,501 rows). By using 80% of the data for training, the model has enough data points to learn the underlying patterns and generalize well to unseen data. The results are shown below.

In [0]:
# One RMSE evaluator, reused for every model: it compares the "prediction" column with "rating".
evaluatorRMSE = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction",
)

# Test-set RMSE of the default-parameter model for each split.
rmse8020 = evaluatorRMSE.evaluate(predictions8020)
rmse7030 = evaluatorRMSE.evaluate(predictions7030)
print(f"Root-mean-square error with 80/20 split on test data = {rmse8020}")
print(f"Root-mean-square error with 70/30 split on test data = {rmse7030}")

Root-mean-square error with 80/20 split on test data = 0.9275246242850462
Root-mean-square error with 70/30 split on test data = 1.0780020578144989


##### Question 4: Now tune the parameters of your algorithm to get the best set of parameters. Explain different parameters of the algorithm which you have used for tuning your algorithm. Evaluate all your models again. Show your code with the best values and output.

I have tuned the following parameters:

- **rank** is the number of latent factors in the model (defaults to 10).
- **maxIter** is the maximum number of iterations to run (defaults to 10).
- **regParam** specifies the regularization parameter in ALS (defaults to 0.1).

Tuning the hyperparameter grid did not improve the RMSE for either split. The best test RMSE after tuning the hyperparameters is 0.94 for the 80/20 split and 1.08 for the 70/30 split, which is slightly worse than the default models (0.93 and 1.08).

In [0]:
# Hyperparameter search space for the 80/20 split: 3 ranks x 3 iteration caps x 3 regParams.
paramGrid = (
    ParamGridBuilder()
    .addGrid(als.rank, [12, 15, 18])
    .addGrid(als.maxIter, [15, 20, 25])
    .addGrid(als.regParam, [0.075, 0.085, 0.1])
    .build()
)

# A 3-fold CrossValidator over the same grid was tried as an alternative and left disabled;
# a single train/validation split is used instead.
trainValSplit = TrainValidationSplit(
    estimator=als,
    estimatorParamMaps=paramGrid,
    evaluator=evaluatorRMSE,
    trainRatio=0.8,
    parallelism=2,
    seed=42,
)



In [0]:
# Run the grid search on the training part of the 80/20 split and keep the winning model.
model8020 = trainValSplit.fit(train8020)
bestmodel8020 = model8020.bestModel

# The hyperparameters are read from the model's parent estimator, reached through the
# underlying Java object.
bestEstimator8020 = bestmodel8020._java_obj.parent()
bestRank = bestEstimator8020.getRank()
bestMaxIter = bestEstimator8020.getMaxIter()
bestRegParam = bestEstimator8020.getRegParam()

for param_label, param_value in (("Rank", bestRank),
                                 ("MaxIter", bestMaxIter),
                                 ("RegParam", bestRegParam)):
    print(f"Best {param_label}: {param_value}")

Best Rank: 15
Best MaxIter: 20
Best RegParam: 0.075


In [0]:
# Score the 80/20 test set with the tuned model and preview the first 10 rows.
# Note: this rebinds predictions8020 / rmse8020, which previously held the default model's results.
predictions8020 = bestmodel8020.transform(test8020)
predictions8020.show(10)

# Test-set RMSE of the tuned model for the 80/20 split.
rmse8020 = evaluatorRMSE.evaluate(predictions8020)
print("Root-mean-square error = " + str(rmse8020))

+-------+------+------+----------+
|movieId|rating|userId|prediction|
+-------+------+------+----------+
|      2|     4|    28|  2.486236|
|     13|     2|    28| 2.3335807|
|     17|     1|    28| 1.0599709|
|     52|     1|    28| 1.0590941|
|     54|     1|    28| 0.8223515|
|     63|     1|    28| 1.1412219|
|     78|     1|    28|0.79123265|
|      0|     1|    27|0.77941245|
|      3|     1|    26| 1.3478315|
|      4|     4|    26| 1.6788038|
+-------+------+------+----------+
only showing top 10 rows

Root-mean-square error = 0.9384031582310887


In [0]:
# Rebuild an ALS estimator from the best hyperparameters found by the grid search.
alsBest = ALS(
    rank=bestRank,
    maxIter=bestMaxIter,
    regParam=bestRegParam,
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    coldStartStrategy="drop",
    nonnegative=True,
)

# Fit on the 80/20 training part (model8020 is rebound to this fitted ALS model)
# and score the held-out ratings.
model8020 = alsBest.fit(train8020)
predictions8020 = model8020.transform(test8020)

# Preview the first 10 predictions under a two-line banner.
for banner_line in ("======== ======== ",
                    "Predictions on test set of 80/20 split dataset"):
    print(banner_line)
predictions8020.show(10)

Predictions on test set of 80/20 split dataset
+-------+------+------+----------+
|movieId|rating|userId|prediction|
+-------+------+------+----------+
|      2|     4|    28|  2.486236|
|     13|     2|    28| 2.3335807|
|     17|     1|    28| 1.0599709|
|     52|     1|    28| 1.0590941|
|     54|     1|    28| 0.8223515|
|     63|     1|    28| 1.1412219|
|     78|     1|    28|0.79123265|
|      0|     1|    27|0.77941245|
|      3|     1|    26| 1.3478315|
|      4|     4|    26| 1.6788038|
+-------+------+------+----------+
only showing top 10 rows



In [0]:
# Test-set RMSE for the 80/20 split, using the most recent predictions8020.
rmse8020 = evaluatorRMSE.evaluate(predictions8020)
print("Root-mean-square error = " + str(rmse8020))

Root-mean-square error = 0.9384031582310887


In [0]:
# Hyperparameter search for the 70/30 split, using the same grid as for the 80/20 split.
paramGrid = (
    ParamGridBuilder()
    .addGrid(als.rank, [12, 15, 18])
    .addGrid(als.maxIter, [15, 20, 25])
    .addGrid(als.regParam, [0.075, 0.085, 0.1])
    .build()
)

trainValSplit = TrainValidationSplit(
    estimator=als,
    estimatorParamMaps=paramGrid,
    evaluator=evaluatorRMSE,
    trainRatio=0.8,
    parallelism=2,
    seed=42,
)

# Run the grid search on the training part of the 70/30 split and keep the winning model.
model7030 = trainValSplit.fit(train7030)
bestmodel7030 = model7030.bestModel

# The hyperparameters are read from the model's parent estimator, reached through the
# underlying Java object. These assignments rebind bestRank / bestMaxIter / bestRegParam.
bestEstimator7030 = bestmodel7030._java_obj.parent()
bestRank = bestEstimator7030.getRank()
bestMaxIter = bestEstimator7030.getMaxIter()
bestRegParam = bestEstimator7030.getRegParam()

for param_label, param_value in (("Rank", bestRank),
                                 ("MaxIter", bestMaxIter),
                                 ("RegParam", bestRegParam)):
    print(f"Best {param_label}: {param_value}")

Best Rank: 18
Best MaxIter: 25
Best RegParam: 0.1


In [0]:
# Score the 70/30 test set with the tuned model and preview the first 10 rows.
# Note: this rebinds predictions7030 / rmse7030, which previously held the default model's results.
predictions7030 = bestmodel7030.transform(test7030)
predictions7030.show(10)

# Test-set RMSE of the tuned model for the 70/30 split.
rmse7030 = evaluatorRMSE.evaluate(predictions7030)
print("Root-mean-square error = " + str(rmse7030))

+-------+------+------+----------+
|movieId|rating|userId|prediction|
+-------+------+------+----------+
|     31|     1|    27| 1.5220054|
|     31|     1|    13| 1.1410546|
|     31|     1|     5| 1.5448587|
|     31|     1|    19| 1.0114135|
|     31|     3|    14| 1.7945395|
|     31|     1|     0| 1.8213575|
|     85|     1|    26| 2.3645499|
|     85|     1|    13| 1.2472625|
|     85|     3|     6| 1.6347357|
|     85|     1|     5| 1.3983475|
+-------+------+------+----------+
only showing top 10 rows

Root-mean-square error = 1.0815026297803374


In [0]:
# Rebuild an ALS estimator from the best hyperparameters found by the grid search.
alsBest = ALS(
    rank=bestRank,
    maxIter=bestMaxIter,
    regParam=bestRegParam,
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    coldStartStrategy="drop",
    nonnegative=True,
)

# Fit on the 70/30 training part (model7030 is rebound to this fitted ALS model)
# and score the held-out ratings.
model7030 = alsBest.fit(train7030)
predictions7030 = model7030.transform(test7030)

# Preview the first 10 predictions under a two-line banner.
for banner_line in ("======== ======== ",
                    "Predictions on test set of 70/30 split dataset"):
    print(banner_line)
predictions7030.show(10)

# Test-set RMSE for the 70/30 split.
rmse7030 = evaluatorRMSE.evaluate(predictions7030)
print("Root-mean-square error = " + str(rmse7030))

Predictions on test set of 70/30 split dataset
+-------+------+------+----------+
|movieId|rating|userId|prediction|
+-------+------+------+----------+
|     31|     1|    27| 1.5220054|
|     31|     1|    13| 1.1410546|
|     31|     1|     5| 1.5448587|
|     31|     1|    19| 1.0114135|
|     31|     3|    14| 1.7945395|
|     31|     1|     0| 1.8213575|
|     85|     1|    26| 2.3645499|
|     85|     1|    13| 1.2472625|
|     85|     3|     6| 1.6347357|
|     85|     1|     5| 1.3983475|
+-------+------+------+----------+
only showing top 10 rows

Root-mean-square error = 1.0815026297803372


I am getting worse results after hyperparameter tuning, despite trying different combinations. I believe the main cause is that TrainValidationSplit compares the candidate models using 80% of the training data for training and 20% for validation, which effectively reduces the data used during tuning to only 64% of the original dataset for the 80/20 split (56% for the 70/30 split). The dataset is already very small (1501 data points), so this reduction likely leads to overfitting, because the grid search is optimized against a very limited validation set. Hence, in Q5 I will use the original model with default hyperparameters (i.e. rank = 10, maxIter = 10, regParam = 0.1, numUserBlocks = 10, numItemBlocks = 10), which performs best overall.

##### Question 5: Calculate the top 12 movie recommendations for user ID 10 and user ID 12. Show your code and output. 


In [0]:
# Users to produce recommendations for, and how many movies to recommend to each.
TARGET_USER_IDS = [10, 12]
NUM_RECOMMENDATIONS = 12

# Create a DataFrame holding just the requested user IDs (those present in the ratings data)
users_to_recommend = (
    df.select(als.getUserCol())
    .distinct()
    .filter(col("userId").isin(TARGET_USER_IDS))
)

# Get the top movie recommendations for each requested user from the default 80/20 model
user_recommendations = modelDefault8020.recommendForUserSubset(users_to_recommend, NUM_RECOMMENDATIONS)

# Flatten the recommendations struct array into parallel movieId / rating arrays
user_recommendations_df = user_recommendations.select("userId", "recommendations.movieId", "recommendations.rating")

# Collect once and group rows by user, instead of running one Spark job per user.
recommendation_rows_by_user = {}
for recommendation_row in user_recommendations_df.collect():
    recommendation_rows_by_user.setdefault(recommendation_row.userId, []).append(recommendation_row)


def print_user_recommendations(user_id, rows, top_n=NUM_RECOMMENDATIONS):
    """Print the recommended movie IDs and predicted ratings for one user.

    Args:
        user_id: ID shown in the heading line.
        rows: Collected rows for that user, each exposing ``userId``, ``movieId``
            (list of movie IDs) and ``rating`` (list of predicted ratings).
            May be empty, in which case only the heading is printed.
        top_n: Number of recommendations named in the heading.
    """
    print(f"Top {top_n} movie recommendations for user ID {user_id}:")
    for row in rows:
        # Ratings are shown with three decimals, as strings, to keep the line compact.
        formatted_ratings = [f"{rating:.3f}" for rating in row.rating]
        print(f"UserID: {row.userId}, MovieIDs: {row.movieId}, Ratings: {formatted_ratings}")


for position, target_user_id in enumerate(TARGET_USER_IDS):
    if position > 0:
        print()  # blank line between users
    print_user_recommendations(target_user_id, recommendation_rows_by_user.get(target_user_id, []))

# Keep the per-user result lists available under their original names.
user_10_recommendations = recommendation_rows_by_user.get(10, [])
user_12_recommendations = recommendation_rows_by_user.get(12, [])


Top 12 movie recommendations for user ID 10:
UserID: 10, MovieIDs: [92, 40, 12, 81, 49, 25, 89, 42, 62, 4, 9, 82], Ratings: ['3.401', '2.894', '2.884', '2.694', '2.605', '2.574', '2.555', '2.466', '2.390', '2.347', '2.317', '2.240']

Top 12 movie recommendations for user ID 12:
UserID: 12, MovieIDs: [17, 55, 27, 64, 65, 46, 35, 50, 48, 90, 20, 32], Ratings: ['4.570', '4.453', '4.127', '4.122', '3.907', '3.815', '3.809', '3.555', '3.482', '3.465', '3.437', '3.354']
