# Part A

In [0]:
#all spark imports
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [0]:
#instantiate the spark session
spark = SparkSession.builder.appName("Assi2").getOrCreate()

## Q1:
[Marks: 10] Count the odd and even numbers using file ‘integer.txt’ download it from the Quercus. Show your code and output.

In [0]:
integerRDD = spark.sparkContext.textFile("/FileStore/tables/integer.txt", 4) 

In [0]:
integerRDD = integerRDD.map(lambda line: int(line))
odd_numbers = integerRDD.filter(lambda num: num % 2 != 0)
even_numbers = integerRDD.filter(lambda num: num % 2 == 0)

In [0]:
odd_count = odd_numbers.count()
even_count = even_numbers.count()

print("Odd Numbers: ", odd_count)
print("Even Numbers: ", even_count)

Odd Numbers:  496
Even Numbers:  514


## Q2:
[Marks: 10] Calculate the salary sum per department using file ‘salary.txt’ download it from the Quercus. Show department name and salary sum. Show your code and output.

In [0]:
salaryRDD = spark.sparkContext.textFile("/FileStore/tables/salary.txt")

In [0]:
salaryRDD = salaryRDD.map(lambda line: line.split(" "))
salaryRDD = salaryRDD.map(lambda line: (line[0], float(line[1])))
salaryRDD = salaryRDD.reduceByKey(lambda x,y: x+y)

In [0]:
salaryRDD.collect()

Out[8]: [('Sales', 3488491.0),
 ('Research', 3328284.0),
 ('Developer', 3221394.0),
 ('QA', 3360624.0),
 ('Marketing', 3158450.0)]

##Q3: [TODO]
[Marks: 10] Implement MapReduce using Pyspark on file ‘shakespeare.txt’ download it from the Quercus. Show how many times these particular words appear in the document: Shakespeare, why, Lord, Library, GUTENBERG, WILLIAM, COLLEGE and WORLD. (Count exact words only (marks will be deducted for incorrect lowercase/uppercase))

In [0]:
wordsRDD = spark.sparkContext.textFile("/FileStore/tables/shakespeare_1.txt")

In [0]:
wordList = ["Shakespeare", "why", "Lord", "Library", "GUTENBERG", "WILLIAM", "COLLEGE", "WORLD"]

In [0]:
counts = (
    wordsRDD.flatMap(lambda line: line.split())
    .filter(lambda word: word in wordList)
    .map(lambda word: (word, 1))
    .reduceByKey(lambda x,y: x + y) #Reduce part of MapReduce
)

In [0]:
for word, count in counts.collect():
    print(f"{word}: {count}")

Shakespeare: 22
GUTENBERG: 99
WILLIAM: 115
WORLD: 98
COLLEGE: 98
why: 91
Lord: 341
Library: 2


## Q4
[Marks: 10] Calculate top 10 and bottom 10 words using file ‘shakespeare.txt’ download it from the Quercus. Show 10 words with most count and 10 words with least count. You can limit by 10 in ascending and descending order of count. Show your code and output.

In [0]:
wordsRDD = spark.sparkContext.textFile("/FileStore/tables/shakespeare_1.txt")

In [0]:
all_counts = (
    wordsRDD.flatMap(lambda line: line.split())
    .map(lambda word: (word, 1))
    .reduceByKey(lambda x,y: x + y)
    .sortBy(lambda x: x[1], ascending=False)
)

In [0]:
most_count = all_counts.take(10)
least_count = all_counts.sortBy(lambda x: x[1], ascending=True).take(10)

In [0]:
print("Most Common Words")
for m_word in most_count:
    print(f"{m_word}")

print("Least Common Words")
for l_word in least_count:
    print(f"{l_word}")

Most Common Words
('the', 11397)
('and', 8777)
('I', 8556)
('of', 7873)
('to', 7421)
('a', 5672)
('my', 4913)
('in', 4600)
('you', 4060)
('And', 3547)
Least Common Words
('anyone', 1)
('restrictions', 1)
('whatsoever.', 1)
('re-use', 1)
('online', 1)
('www.gutenberg.org', 1)
('COPYRIGHTED', 1)
('eBook,', 1)
('Details', 1)
('guidelines', 1)


# Part B

In [0]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

##Q1
[Marks: 10] Describe your data. Calculate top 20 movies with highest ratings and top 15 users who provided highest ratings. Show your code and output.

In [0]:
m_path = "/FileStore/tables/movies.csv"
df = (
    spark.read
    .format("csv")
    .option("inferSchema", True)
    .option("header", True)
    .option("sep", ",")
    .option("path", m_path)
    .load()
)

In [0]:
display(df)

movieId,rating,userId
2,3,0
3,1,0
5,2,0
9,4,0
11,1,0
12,2,0
15,1,0
17,1,0
19,1,0
21,1,0


In [0]:
display(df.describe())

summary,movieId,rating,userId
count,1501.0,1501.0,1501.0
mean,49.40572951365756,1.7741505662891406,14.383744170552964
stddev,28.937034065088994,1.187276166124803,8.591040424293272
min,0.0,1.0,0.0
max,99.0,5.0,29.0


In [0]:
top_movies = (
    df.groupBy("movieId")
    .agg(round(avg("rating"), 3).alias("avg_rating"))
    .sort(col("avg_rating").desc())
    .limit(20)
)

In [0]:
#Top Movies
top_movies.show()

+-------+----------+
|movieId|avg_rating|
+-------+----------+
|     32|     2.917|
|     90|     2.813|
|     30|       2.5|
|     94|     2.474|
|     23|     2.467|
|     49|     2.438|
|     29|       2.4|
|     18|       2.4|
|     52|     2.357|
|     53|      2.25|
|     62|      2.25|
|     92|     2.214|
|     46|       2.2|
|     68|     2.158|
|     87|     2.133|
|      2|     2.105|
|     69|     2.077|
|     27|     2.067|
|     88|     2.056|
|     22|      2.05|
+-------+----------+



In [0]:
top_users = (
    df.groupBy("userId")
    .agg(round(avg("rating"), 3).alias("avg_rating"))
    .sort(col("avg_rating").desc())
    .limit(15)
)

In [0]:
#Top Users
top_users.show()

+------+----------+
|userId|avg_rating|
+------+----------+
|    11|     2.286|
|    26|     2.204|
|    22|     2.161|
|    23|     2.135|
|     2|     2.065|
|    17|     1.957|
|     8|     1.898|
|    24|     1.885|
|    12|     1.855|
|     3|     1.833|
|    29|     1.826|
|    28|      1.82|
|     9|     1.792|
|    14|     1.789|
|    16|     1.778|
+------+----------+



##Q2
[Marks: 10] Split dataset into train and test. Try 2 different combinations for e.g. (60/40, 70/30, 75/25 and 80/20). (Train your model and use collaborative filtering approach on 70 percent of your data and test with the other 30 percent and so on). Show your code and output.

In [0]:
m_path = "/FileStore/tables/movies.csv"
df = (
    spark.read
    .format("csv")
    .option("inferSchema", True)
    .option("header", True)
    .option("sep", ",")
    .option("path", m_path)
    .load()
)

In [0]:
#Splitting the dataset
(train_70, test_30) = df.randomSplit([0.7, 0.3], seed=111) #just to keep consistant while I debug
(train_60, test_40) = df.randomSplit([0.6, 0.4], seed=111)

In [0]:
als = ALS(maxIter=5, 
          regParam=0.01, 
          userCol="userId", 
          itemCol="movieId", 
          ratingCol="rating",
          coldStartStrategy="drop"
    )

In [0]:
model_70 = als.fit(train_70)
predictions_70 = model_70.transform(test_30)

In [0]:
model_60 = als.fit(train_60)
predictions_60 = model_60.transform(test_40)

In [0]:
# Initialize evaluator
evaluate_metric = "rmse"
evaluator = RegressionEvaluator(metricName=evaluate_metric, labelCol="rating", predictionCol="prediction")

# Evaluate the model by computing the RMSE on the test data
rmse_70 = evaluator.evaluate(predictions_70)
rmse_60 = evaluator.evaluate(predictions_60)

print(f"{evaluate_metric} error (70/30 split) = " + str(rmse_70))
print(f"{evaluate_metric} error (60/40 split) = " + str(rmse_60))

rmse error (70/30 split) = 2.263824121150844
rmse error (60/40 split) = 2.2441067464804685


##Q2

[Marks: 10] Explain MSE, RMSE and MAE. Compare and evaluate both of your models with evaluation metrics (RMSE or MAE), show your code and print your results. Describe which one works better and why?

MSE:
- Mean Squared Error - Takes the squared difference between the two values and averages that value
$$
\text{MSE} = \frac{1}{n} \sum_{i=1}^{n} (Y_i - \hat{Y}_i)^2
$$
- gives more weight to larger differences because of the square

RMSE:
- Root Mean Squared Error - square root of MSE
$$
\text{RMSE} = \sqrt{\text{MSE}} = \sqrt{\frac{1}{n} \sum_{i=1}^{n} (Y_i - \hat{Y}_i)^2}
$$
- Returns error in same units as original values

MAE:
- Mean Absolute Error 
$$
\text{MAE} = \frac{1}{n} \sum_{i=1}^{n} |Y_i - \hat{Y}_i|
$$
- Less sensitive to outliers because it doesnt square the difference in terms

In [0]:
metrics = ["rmse", "mse", "mae"]

for evaluate_metric in metrics:
    evaluator = RegressionEvaluator(metricName=evaluate_metric, labelCol="rating", predictionCol="prediction")

    eval_70 = evaluator.evaluate(predictions_70)
    eval_60 = evaluator.evaluate(predictions_60)

    print(f"{evaluate_metric} (70/30 split) = " + str(eval_70))
    print(f"{evaluate_metric} (60/40 split) = " + str(eval_60))

rmse (70/30 split) = 2.2638241211508454
rmse (60/40 split) = 2.2441067464804685
mse (70/30 split) = 5.124899651504391
mse (60/40 split) = 5.03601508959915
mae (70/30 split) = 1.6504281881732055
mae (60/40 split) = 1.7097732553887643


Preforms the best according with mae error (for both split cases)

## Q4

[Marks: 20] Now tune the parameters of your algorithm to get the best set of parameters. Explain different parameters of the algorithm which you have used for tuning your algorithm. Evaluate all your models again. Show your code with best values and output.


I chose the following parameters to tune:
1. Rank - Indicates the number of latent factors used to model user and item profiles in matrix factorization, higher rank better at higher complexity, but may overfit
2. Max Iterations - Maximum number of iterations, larger means more iterations a likely better results at the cost of computing power
3. Regularization Parameter - Helps control model complexity, preventing over-specialization/fitting to the training data

In [0]:
als = ALS(maxIter=5, 
          regParam=0.01, 
          userCol="userId", 
          itemCol="movieId", 
          ratingCol="rating",
          coldStartStrategy="drop"
    )

In [0]:
evaluator = RegressionEvaluator(metricName=evaluate_metric, labelCol="rating", predictionCol="prediction")

In [0]:
paramGrid = (ParamGridBuilder()
             .addGrid(als.rank, [10, 50, 100])
             .addGrid(als.maxIter, [5, 10])
             .addGrid(als.regParam, [0.01, 0.1, 0.5])
             .build())

In [0]:
crossval = CrossValidator(estimator=als,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3)

In [0]:
cvModel = crossval.fit(train_70)

In [0]:
best_model = cvModel.bestModel

predictions = best_model.transform(test_30)
rmse = evaluator.evaluate(predictions)

print(f"The root mean squared error for our model is: {rmse}")
print(f"Best Rank: {best_model.rank}")
print(f"Best MaxIter: {best_model._java_obj.parent().getMaxIter()}")
print(f"Best RegParam: {best_model._java_obj.parent().getRegParam()}")

The root mean squared error for our model is: 0.7719996752686227
Best Rank: 50
Best MaxIter: 10
Best RegParam: 0.1


##Q5

[Marks: 10]: Calculate top 15 movies recommendations for user id 10 and user id 14. Show your code and output.

In [0]:
users = spark.createDataFrame([Row(userId=10), Row(userId=14)])

userRecs = best_model.recommendForUserSubset(users, 15)

userRecs.show(truncate=False)

+------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|userId|recommendations                                                                                                                                                                                                                                              |
+------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|10    |[{2, 3.4701433}, {40, 2.9946866}, {49, 2.9023652}, {81, 2.6283717}, {42, 2.5993888}, {62, 2.568149}, {82, 2.4535387}, {29, 2.3663242}, {0, 2.3144367}, {58, 2.279584}, {25, 2.2745152}, {43, 2.2139893}, {9