# Q1

In [6]:
from pyspark.sql import SparkSession

# Start (or reuse) the Spark session used by every cell in this notebook.
session_builder = SparkSession.builder.appName("MovieRecommendation")
spark = session_builder.getOrCreate()

# Read the raw JSON file, keep only the three fields the recommender needs,
# and mark the result for caching because later cells reuse it.
rating_columns = ("user_id", "product_id", "score")
ratings = spark.read.json("movies.json").select(*rating_columns).cache()

# Quick sanity check: inferred schema plus the first rows.
ratings.printSchema()
ratings.show()

root
 |-- user_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- score: double (nullable = true)

+--------------+----------+-----+
|       user_id|product_id|score|
+--------------+----------+-----+
|A141HP4LYPWMSR|B003AI2VGA|  3.0|
|A328S9RN3U5M68|B003AI2VGA|  3.0|
|A1I7QGUDP043DG|B003AI2VGA|  5.0|
|A1M5405JH9THP9|B003AI2VGA|  3.0|
| ATXL536YX71TR|B003AI2VGA|  3.0|
|A3QYDL5CDNYN66|B003AI2VGA|  2.0|
| AQJVNDW6YZFQS|B003AI2VGA|  1.0|
| AD4CDZK7D31XP|B00006HAXW|  5.0|
|A3Q4S5DFVPB70D|B00006HAXW|  5.0|
|A2P7UB02HAVEPB|B00006HAXW|  5.0|
|A2TX99AZKDK0V7|B00006HAXW|  4.0|
| AFC8IKR407HSK|B00006HAXW|  5.0|
|A1FRPGQYQTAOR1|B00006HAXW|  5.0|
|A1RSDE90N6RSZF|B00006HAXW|  5.0|
|A1OUBOGB5970AO|B00006HAXW|  4.0|
|A3NPHQVIY59Y0Y|B00006HAXW|  5.0|
| AFKMBAY28XO8A|B00006HAXW|  5.0|
| A66KMXH9V7OGU|B00006HAXW|  5.0|
| AFJ27ZV9183B8|B00006HAXW|  5.0|
| AXMKAXC0TR9AW|B00006HAXW|  5.0|
+--------------+----------+-----+
only showing top 20 rows



# Q2

In [7]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

# Seed shared by the train/validation split and by ALS so that a fresh
# "Restart & Run All" on the same input repeats the same split and model
# instead of producing different metrics on every run.
RANDOM_SEED = 42

# ALS needs numeric user/item ids, so map each string id column to an index
# column named "<column>_index". The indexers are handed to the Pipeline
# unfitted: the Pipeline fits them itself, so each one is fitted exactly once
# (previously every indexer was fitted here and the Pipeline was fitted again).
pipeline = Pipeline(stages=[
    StringIndexer(inputCol=column, outputCol=column + "_index")
    for column in ["user_id", "product_id"]
])
indexer_model = pipeline.fit(ratings)

# Keep `indexers` bound to the fitted StringIndexer models, as before.
indexers = indexer_model.stages
ratings_indexed = indexer_model.transform(ratings)

# 80% / 20% train/validation split (randomSplit normalises the weights).
training_data, validation_data = ratings_indexed.randomSplit(
    [8.0, 2.0], seed=RANDOM_SEED
)

# coldStartStrategy="drop" removes validation rows whose user or item was not
# seen during training, so the evaluation metrics are not NaN.
# NOTE(review): the recorded output contains predictions such as -1.34 and
# 10.19 while the visible scores lie between 1.0 and 5.0; regParam/rank may
# need tuning - confirm before relying on these predictions.
als = ALS(
    userCol="user_id_index",
    itemCol="product_id_index",
    ratingCol="score",
    rank=10,
    maxIter=5,
    regParam=0.01,
    coldStartStrategy="drop",
    seed=RANDOM_SEED,
)
evaluator = RegressionEvaluator(
    metricName="rmse", labelCol="score", predictionCol="prediction"
)

model = als.fit(training_data)
predictions = model.transform(validation_data)
predictions.show(10, False)

+--------------+----------+-----+-------------+----------------+----------+
|user_id       |product_id|score|user_id_index|product_id_index|prediction|
+--------------+----------+-----+-------------+----------------+----------+
|A2EIEXQVCPXZNG|B0001G6PZC|3.0  |897.0        |7.0             |1.095872  |
|A1YR4OM2QLQ1LY|B0002V7TJM|4.0  |1339.0       |680.0           |4.1145973 |
|ADEETJWRKD6OA |B000063W82|5.0  |5287.0       |6.0             |-1.3428309|
|AX9J0U8J6XTCY |B0001G6PZC|5.0  |1903.0       |7.0             |3.4624398 |
|AQ01Q3070LT29 |B000063W1R|1.0  |53.0         |37.0            |10.190427 |
|A1TW9ZGRDQQZ2Y|B0001G6PZC|5.0  |133.0        |7.0             |5.2124743 |
|APW72P4COWTBK |B0001G6PZC|5.0  |5670.0       |7.0             |2.5696666 |
|A3OIZEXS8CGBOD|B0001G6PZC|1.0  |1005.0       |7.0             |0.99860936|
|A3OIZEXS8CGBOD|B0001G6PZC|1.0  |1005.0       |7.0             |0.99860936|
|A3OIZEXS8CGBOD|B0001G6PZC|1.0  |1005.0       |7.0             |0.99860936|
+-----------

# Q3

In [8]:
# Validation rows that belong to the user whose indexed id is 1.0, reduced to
# the id columns (the score is left out).
is_user_one = validation_data["user_id_index"] == 1.0
user1 = validation_data.filter(is_user_one).select(
    "product_id", "user_id", "user_id_index", "product_id_index"
)
user1.show()

# Score those (user, item) pairs with the trained model and list them from the
# highest predicted score to the lowest.
recommendations = model.transform(user1)
recommendations.sort(recommendations["prediction"].desc()).show()


+----------+--------------+-------------+----------------+
|product_id|       user_id|user_id_index|product_id_index|
+----------+--------------+-------------+----------------+
|B000UGBOT0|A2NJO6YE954DBH|          1.0|            78.0|
|6303998690|A2NJO6YE954DBH|          1.0|           408.0|
|B000AMWIVM|A2NJO6YE954DBH|          1.0|            11.0|
|B000AMWIVM|A2NJO6YE954DBH|          1.0|            11.0|
|B001QB5SCM|A2NJO6YE954DBH|          1.0|            87.0|
|6301798708|A2NJO6YE954DBH|          1.0|           198.0|
|B0000DK4QK|A2NJO6YE954DBH|          1.0|            51.0|
|B0072V6PPE|A2NJO6YE954DBH|          1.0|            53.0|
|B000083EDB|A2NJO6YE954DBH|          1.0|            61.0|
|B000OYTPJO|A2NJO6YE954DBH|          1.0|           112.0|
|B00004CZR2|A2NJO6YE954DBH|          1.0|            55.0|
|B00005Y6YQ|A2NJO6YE954DBH|          1.0|           376.0|
|B00005Y6YQ|A2NJO6YE954DBH|          1.0|           376.0|
|B000XJSL9U|A2NJO6YE954DBH|          1.0|            14.

# Q4

In [9]:
# Second evaluator for an additional metric, Mean Absolute Error (MAE); it
# reads the same label and prediction columns as the RMSE evaluator.
evaluator_mae = RegressionEvaluator(
    labelCol="score", predictionCol="prediction", metricName="mae"
)

# Compute both error metrics on the validation predictions.
rmse = evaluator.evaluate(predictions)
mae = evaluator_mae.evaluate(predictions)

print(f"Root Mean Squared Error (RMSE) = {rmse}")
print(f"Mean Absolute Error (MAE) = {mae}")


Root Mean Squared Error (RMSE) = 5.111305408388869
Mean Absolute Error (MAE) = 3.259489457352753
