### Installing and Configuring PySpark

In [None]:
!pip install -q pyspark

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m2.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, when, row_number, dense_rank,avg
from pyspark.sql.window import Window
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql.types import ArrayType



# Create a SparkSession
spark = SparkSession.builder.appName("Amazon Recommendation System on Electronics").getOrCreate()

### Uploading Files

In [None]:
from google.colab import files
files.upload()

Saving Amazon_Electronics_Ratings.csv to Amazon_Electronics_Ratings.csv


#### Read the CSV files into DataFrames using spark

In [67]:
amazon_df = spark.read.format("csv").option("header", "false").load("/content/Amazon_Electronics_Ratings.csv")

In [68]:
amazon_df.show(10)

+--------------+----------+---+----------+
|           _c0|       _c1|_c2|       _c3|
+--------------+----------+---+----------+
| AKM1MP6P0OYPR|0132793040|5.0|1365811200|
|A2CX7LUOHB2NDG|0321732944|5.0|1341100800|
|A2NWSAGRHCP8N5|0439886341|1.0|1367193600|
|A2WNBOD3WNDNKT|0439886341|3.0|1374451200|
|A1GI0U4ZRJA8WN|0439886341|1.0|1334707200|
|A1QGNMC6O1VW39|0511189877|5.0|1397433600|
|A3J3BRHTDRFJ2G|0511189877|2.0|1397433600|
|A2TY0BTJOTENPG|0511189877|5.0|1395878400|
|A34ATBPOK6HCHY|0511189877|5.0|1395532800|
| A89DO69P0XZ27|0511189877|5.0|1395446400|
+--------------+----------+---+----------+
only showing top 10 rows



### Data Pre-Processing

In [None]:
new_Columns = {
    '_c0': 'User_ID',
    '_c1': 'Product_ID',
    '_c2': 'Rating',
    '_c3': 'Timestamp'
}
amazon_df = amazon_df.withColumnsRenamed(new_Columns)

In [None]:
amazon_df.show(10)

+--------------+----------+------+----------+
|       User_ID|Product_ID|Rating| Timestamp|
+--------------+----------+------+----------+
| AKM1MP6P0OYPR|0132793040|   5.0|1365811200|
|A2CX7LUOHB2NDG|0321732944|   5.0|1341100800|
|A2NWSAGRHCP8N5|0439886341|   1.0|1367193600|
|A2WNBOD3WNDNKT|0439886341|   3.0|1374451200|
|A1GI0U4ZRJA8WN|0439886341|   1.0|1334707200|
|A1QGNMC6O1VW39|0511189877|   5.0|1397433600|
|A3J3BRHTDRFJ2G|0511189877|   2.0|1397433600|
|A2TY0BTJOTENPG|0511189877|   5.0|1395878400|
|A34ATBPOK6HCHY|0511189877|   5.0|1395532800|
| A89DO69P0XZ27|0511189877|   5.0|1395446400|
+--------------+----------+------+----------+
only showing top 10 rows



Checking for NULL values

In [None]:
# Checking null values in each column
null_values = amazon_df.select([count(when(col(c).isNull(), c)).alias(c) for c in amazon_df.columns])

# Showing null values
null_values.show()

+-------+----------+------+---------+
|User_ID|Product_ID|Rating|Timestamp|
+-------+----------+------+---------+
|      0|         0|     0|        0|
+-------+----------+------+---------+



Neither of the columns have null values, so we will not perform any imputations over here.

In [None]:
# Changing the type of 'Rating column to integer'
column_name = ['Rating']

for N in column_name:
    amazon_df = amazon_df.withColumn(N, col(N).cast('integer'))


For Modelling, we would need all the columns in our data to be numeric.After checking the Product_ID we understood that it had a lot of non-numeric charaters, so we created a new column "New_Product_ID" where all the values are numeric.

In [None]:
windowSpec = Window.orderBy("Product_ID")
amazon_df = amazon_df.withColumn("New_Product_ID", dense_rank().over(windowSpec))
# Showing the DataFrame after the conversion
amazon_df.show(10)

+--------------+----------+------+----------+--------------+
|       User_ID|Product_ID|Rating| Timestamp|New_Product_ID|
+--------------+----------+------+----------+--------------+
| AKM1MP6P0OYPR|0132793040|     5|1365811200|             1|
|A2CX7LUOHB2NDG|0321732944|     5|1341100800|             2|
|A2NWSAGRHCP8N5|0439886341|     1|1367193600|             3|
|A2WNBOD3WNDNKT|0439886341|     3|1374451200|             3|
|A1GI0U4ZRJA8WN|0439886341|     1|1334707200|             3|
|A1QGNMC6O1VW39|0511189877|     5|1397433600|             4|
|A3J3BRHTDRFJ2G|0511189877|     2|1397433600|             4|
|A2TY0BTJOTENPG|0511189877|     5|1395878400|             4|
|A34ATBPOK6HCHY|0511189877|     5|1395532800|             4|
| A89DO69P0XZ27|0511189877|     5|1395446400|             4|
+--------------+----------+------+----------+--------------+
only showing top 10 rows



Similarly, After checking the user_ID we understood that it had a lot of non-numeric charaters, so we created a new column "New_User_ID" where all the values are numeric.



In [None]:
windowSpec = Window.orderBy("user_ID")
amazon_df = amazon_df.withColumn("New_User_ID", dense_rank().over(windowSpec))

# Showing the DataFrame after the conversion
amazon_df.show(10)

+--------------------+----------+------+----------+--------------+-----------+
|             User_ID|Product_ID|Rating| Timestamp|New_Product_ID|New_User_ID|
+--------------------+----------+------+----------+--------------+-----------+
|A00000262KYZUE4J5...|B003UYU16G|     5|1353456000|        195535|          1|
|A000063614T1OE0BU...|B00419ZT3E|     5|1365120000|        203899|          2|
|A000063614T1OE0BU...|B00432ZSHG|     5|1365120000|        206616|          2|
|A00009182QVLSWIGH...|B009SXR7WE|     5|1364947200|        372948|          3|
|A00009661LC9LQPGK...|B004GWQBWY|     5|1351209600|        226215|          4|
|A00010809P09NUU6ZP6H|B002SSM5AU|     5|1365379200|        154933|          5|
|A00014061C2IZNE0Y...|B00EKSG8JU|     4|1390348800|        438092|          6|
|A000145014WOTZJ5N...|B00F3L19KQ|     5|1405382400|        442988|          7|
|A00015222LZ55IJSV...|B001MSVPM6|     1|1361491200|        121040|          8|
|A00015228CUPGPF957DS|B00474ORI6|     1|1402617600| 

### EDA of the Dataset

Checking the types of the new columns

In [None]:
amazon_df.printSchema()

root
 |-- User_ID: string (nullable = true)
 |-- Product_ID: string (nullable = true)
 |-- Rating: integer (nullable = true)
 |-- Timestamp: string (nullable = true)
 |-- New_Product_ID: integer (nullable = false)
 |-- New_User_ID: integer (nullable = false)



Getting a summary of the data

In [None]:
# Displaying the statistics
amazon_df.select(column_name).describe().show()

+-------+------------------+
|summary|            Rating|
+-------+------------------+
|  count|           7824482|
|   mean| 4.012336791112817|
| stddev|1.3809098185804336|
|    min|                 1|
|    max|                 5|
+-------+------------------+



Total Number of Users giving ratings


In [None]:
len(amazon_df.select('User_ID').distinct().collect())

4201696

Total Number of Products in the data.

In [None]:
len(amazon_df.select('Product_ID').distinct().collect())

476002

Re-checking for NULL values

In [None]:
# Checking null values in each column
null_values = amazon_df.select([count(when(col(c).isNull(), c)).alias(c) for c in amazon_df.columns])

# Showing nul values
null_values.show()

+-------+----------+------+---------+--------------+-----------+
|User_ID|Product_ID|Rating|Timestamp|New_Product_ID|New_User_ID|
+-------+----------+------+---------+--------------+-----------+
|      0|         0|     0|        0|             0|          0|
+-------+----------+------+---------+--------------+-----------+



In [65]:
rating_stats = amazon_df.groupBy('Rating').agg(count('Rating').alias('Total Ratings'))

In [66]:
rating_stats.show()

+------+-------------+
|Rating|Total Ratings|
+------+-------------+
|     1|       901765|
|     3|       633073|
|     5|      4347541|
|     4|      1485781|
|     2|       456322|
+------+-------------+



### Pre-Processing Post EDA

Dropping the Timestamp column, as it does not have much relevance in our recommendation system.

In [None]:
amazon_df = amazon_df.drop("Timestamp")

In [None]:
amazon_df.show(10)

+--------------------+----------+------+--------------+-----------+
|             User_ID|Product_ID|Rating|New_Product_ID|New_User_ID|
+--------------------+----------+------+--------------+-----------+
|A00000262KYZUE4J5...|B003UYU16G|     5|        195535|          1|
|A000063614T1OE0BU...|B00419ZT3E|     5|        203899|          2|
|A000063614T1OE0BU...|B00432ZSHG|     5|        206616|          2|
|A00009182QVLSWIGH...|B009SXR7WE|     5|        372948|          3|
|A00009661LC9LQPGK...|B004GWQBWY|     5|        226215|          4|
|A00010809P09NUU6ZP6H|B002SSM5AU|     5|        154933|          5|
|A00014061C2IZNE0Y...|B00EKSG8JU|     4|        438092|          6|
|A000145014WOTZJ5N...|B00F3L19KQ|     5|        442988|          7|
|A00015222LZ55IJSV...|B001MSVPM6|     1|        121040|          8|
|A00015228CUPGPF957DS|B00474ORI6|     1|        212397|          9|
+--------------------+----------+------+--------------+-----------+
only showing top 10 rows



### Recommendation Models

### **Using Popularity Based Method**

In [None]:
# Split the data into training and testing sets
train_df_split, test_df_split = amazon_df.randomSplit([0.7, 0.3], seed=97)

#### Calculate the global mean rating
##### This gives us the mean rating of all the products that have been rated by the users

In [None]:
global_mean_rating = train_df_split.select(avg('Rating')).first()[0]
global_mean_rating

4.012564681493862

#### Calculate the mean rating for each user and product respectively in the training set


In [None]:
user_column = Window().partitionBy('New_User_ID')
mean_rating_per_user = train_df_split.withColumn('mean_rating_user', avg('Rating').over(user_column)).select('New_User_ID', 'mean_rating_user')

In [None]:
product_column = Window().partitionBy('New_Product_ID')
mean_rating_per_product = train_df_split.withColumn('mean_rating_product', avg('Rating').over(product_column)).select('New_Product_ID', 'mean_rating_product')

#### Mean Rating for every User

In [None]:
mean_rating_per_user.show(10)

+-----------+----------------+
|New_User_ID|mean_rating_user|
+-----------+----------------+
|          1|             5.0|
|          2|             5.0|
|          2|             5.0|
|          3|             5.0|
|          5|             5.0|
|          6|             4.0|
|          7|             5.0|
|          8|             1.0|
|         10|             4.5|
|         10|             4.5|
+-----------+----------------+
only showing top 10 rows



#### Mean Rating for every Product

In [None]:
mean_rating_per_product.show(10)

+--------------+-------------------+
|New_Product_ID|mean_rating_product|
+--------------+-------------------+
|             1|                5.0|
|             2|                5.0|
|             3|                1.0|
|             4|                5.0|
|             4|                5.0|
|             4|                5.0|
|             5|  2.869565217391304|
|             5|  2.869565217391304|
|             5|  2.869565217391304|
|             5|  2.869565217391304|
+--------------+-------------------+
only showing top 10 rows



#### Compute the standard deviations from the global average for each user and product

In [42]:
user_deviations = mean_rating_per_user.rdd.map(lambda row: (row['New_User_ID'], row['mean_rating_user'] - global_mean_rating)).collectAsMap()

In [47]:
product_deviations = mean_rating_per_product.rdd.map(lambda row: (row['New_Product_ID'], row['mean_rating_product'] - global_mean_rating)).collectAsMap()

#### Predicting rating for test users for given product based on the training data

In [48]:
def prediction_function(New_User_ID, New_Product_ID):
    user_deviation = user_deviations.get(New_User_ID, 0)
    product_deviation = product_deviations.get(New_Product_ID, 0)
    return global_mean_rating + user_deviation + product_deviation

#### Adding a column for the predicted rating of the test data

In [52]:
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType
final_prediction = udf(prediction_function, FloatType())
test_df_split = test_df_split.withColumn('Final_predicted_rating', final_prediction('New_User_ID', 'New_Product_ID'))

In [53]:
test_df_split.show(5)

+--------------------+----------+------+--------------+-----------+----------------------+
|             User_ID|Product_ID|Rating|New_Product_ID|New_User_ID|Final_predicted_rating|
+--------------------+----------+------+--------------+-----------+----------------------+
|A00009661LC9LQPGK...|B004GWQBWY|     5|        226215|          4|             3.7337663|
|A00015228CUPGPF957DS|B00474ORI6|     1|        212397|          9|              2.689655|
|A00018041RRVMCICC...|B004EIJXES|     5|        222075|         11|             4.3535557|
|A000186437REL8X2R...|B007X26T3A|     4|        327500|         12|              3.857143|
|A000187635I595IAV...|B003EO1H7E|     5|        178699|         13|              4.889912|
+--------------------+----------+------+--------------+-----------+----------------------+
only showing top 5 rows



#### Compute the Root Mean Square Error on the test data by comapring the predicted and actual ratings

In [54]:
evaluator = RegressionEvaluator(metricName='rmse', labelCol='Rating', predictionCol='Final_predicted_rating')
rmse = evaluator.evaluate(test_df_split)
print(f'RMSE: {rmse}')

RMSE: 1.422104915105256


We have got a score RMSE score of 1.422, which is not a good score for our model.

## **Collaborative Filtering using Alternate least square method**

#### Dividing the Data into Training and Test

In [55]:
(training, test) = amazon_df.randomSplit([0.70, 0.30], seed=97)

In [56]:
als = ALS(maxIter=8, regParam=0.1, coldStartStrategy="drop", userCol="New_User_ID", itemCol="New_Product_ID", ratingCol="Rating")
model = als.fit(training)

#### Fitting the model on Test Data and generating predictions

In [57]:
als_model = als.fit(test)
predicted_rating = als_model.transform(test)
predicted_rating.show()

+--------------+----------+------+--------------+-----------+----------+
|       User_ID|Product_ID|Rating|New_Product_ID|New_User_ID|prediction|
+--------------+----------+------+--------------+-----------+----------+
|A1GI0U4ZRJA8WN|0439886341|     1|             3|     511953| 0.9656209|
|A2WNBOD3WNDNKT|0439886341|     3|             3|    2118941| 2.8968623|
|A1E4WG8HRWWK4R|0528881469|     5|             5|     439076| 4.8160067|
|A29LPQQDG7LD5J|0528881469|     1|             5|    1409800| 0.9744198|
| AR84FMFYCQCWF|0528881469|     1|             5|    3932068| 0.9744198|
|A28B1G1MSJ6OO1|0528881469|     4|             5|    1369550|  3.897679|
|A1NQPG5IJ43HJI|0558835155|     3|             6|     735676| 2.9445274|
|A2TKKYL3GKFS2M|0594033926|     5|            12|    2024572| 4.8438396|
|A3I1C8WM8DLSMM|0594033926|     5|            12|    2778084|  4.937534|
| AHYURLVH267MA|0594033926|     5|            12|    3647039|  4.937534|
|A29LNVZLBG0IYN|0594033926|     5|            12|  

#### Calculating RMSE and evaluating the model

In [58]:
evaluator = RegressionEvaluator(metricName="rmse", labelCol="Rating", predictionCol="prediction")
rmse = evaluator.evaluate(predicted_rating)
print("RMSE:", rmse)

RMSE: 0.1966442961699962


We have got a score RMSE score of 0.196, which is a good score for our model.

#### Building a Model on Collaborative filtering using ALS

In [59]:
distinct_user_ID = amazon_df.select('New_User_ID').distinct().rdd.flatMap(lambda x: x)
distinct_user_ID.collect()[:10]

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

In [60]:
distinct_users_df = spark.createDataFrame([(user,) for user in distinct_user_ID.collect()[:500] ], ['New_User_ID'])

In [61]:
new_recommended_users = als_model.recommendForUserSubset(distinct_users_df, 2)

#### Recommending two new products for users

In [62]:
from pyspark.sql.functions import col, expr
recommedation_result = new_recommended_users.select(col("New_User_ID"), expr("transform(recommendations, x -> x.New_Product_ID) as New_Product_ID"))


recommedation_result.show(truncate=False)


+-----------+----------------+
|New_User_ID|New_Product_ID  |
+-----------+----------------+
|4          |[298989, 236556]|
|9          |[315113, 337359]|
|11         |[324323, 319345]|
|12         |[101953, 469658]|
|13         |[327851, 251577]|
|15         |[76573, 331093] |
|17         |[398358, 91992] |
|23         |[214020, 462020]|
|24         |[402597, 363908]|
|25         |[327851, 129981]|
|26         |[50902, 2140]   |
|28         |[217943, 26022] |
|29         |[129580, 15264] |
|32         |[112385, 207018]|
|33         |[62398, 41116]  |
|35         |[220718, 36956] |
|36         |[318669, 70587] |
|38         |[402597, 132526]|
|39         |[349622, 221548]|
|41         |[129580, 15264] |
+-----------+----------------+
only showing top 20 rows

