## **Methodology**

Content-based filtering is a recommendation methodology that suggests items to a user based on the attributes of items they have previously interacted with or rated highly. The approach relies on the idea that if a user liked an item, they might like similar items based on their attributes or features.

1. **Item Representation**:
   Each item is represented as a vector of features, $ I_i = \{f_1, f_2, \ldots, f_n\} $, where $ f_k $ is a numerical or categorical feature of the item. Examples of features include:
   - Metadata (title, category, description)
   - Numerical attributes (price, rating)
   - Encoded features from text descriptions or other embeddings.

2. **User Profile Representation**:
   A user's preferences are represented as a vector, $ U $, typically calculated as an aggregation of the feature vectors of items the user has interacted with or rated. For example, the user profile $ U $ can be computed as:
   $
   U = \frac{\sum_{i \in \text{RatedItems}} R_u(i) \cdot I_i}{\sum_{i \in \text{RatedItems}} R_u(i)}
   $
   Where:
   - $ R_u(i) $: The rating given by user $ u $ to item $ i $
   - $ I_i $: The feature vector of item $ i $

3. **Similarity Computation**:
   The similarity between items is calculated using a similarity metric. Common metrics include:
   - **Cosine Similarity**:
     $
     \text{Similarity}(I_a, I_b) = \frac{I_a \cdot I_b}{\|I_a\| \|I_b\|}
     $
   - **Euclidean Distance** (converted to similarity):
     $
     \text{Similarity}(I_a, I_b) = \frac{1}{1 + \|I_a - I_b\|}
     $
   - **Pearson Correlation**:
     $
     \text{Similarity}(I_a, I_b) = \frac{\sum (I_a - \bar{I_a})(I_b - \bar{I_b})}{\sqrt{\sum (I_a - \bar{I_a})^2 \sum (I_b - \bar{I_b})^2}}
     $

4. **Recommendation Generation**:
   To generate recommendations for a user:
   - Compute the similarity of all items in the catalog to the items the user has rated or interacted with.
   - Aggregate the similarity scores and rank items by their relevance:
     $
     \text{Relevance}(i, u) = \sum_{j \in \text{RatedItems}} \text{Similarity}(I_i, I_j) \cdot R_u(j)
     $
   - Filter out items the user has already interacted with.

5. **Diversity and Novelty**:
   To improve diversity, penalties can be introduced for items that are too similar to each other in the recommendation list:
   $
   \text{Diversity Penalty}(I_i, I_j) = \text{PenaltyFactor} \cdot \text{Similarity}(I_i, I_j)
   $
   Adjust the final relevance scores:
   $
   \text{FinalRelevance}(i) = \text{Relevance}(i, u) - \sum_{j \in \text{Recommendations}} \text{Diversity Penalty}(I_i, I_j)
   $
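
The formulas in steps 2 to 5 can be sketched in a few lines of NumPy. This is a minimal illustration, not the notebook's implementation: the function names, the toy `catalog`, and the greedy re-ranking loop are assumptions made for the example, and cosine similarity is used as the similarity metric.

```python
import numpy as np

def user_profile(item_vectors, ratings):
    """Rating-weighted average of the feature vectors of the rated items (step 2)."""
    ratings = np.asarray(ratings, dtype=float)
    return ratings @ item_vectors / ratings.sum()

def cosine_similarity(a, b):
    """Cosine similarity between two feature vectors; 0.0 if either is all zeros (step 3)."""
    denom = np.linalg.norm(a) * np.linalg.norm(b)
    return float(a @ b / denom) if denom > 0 else 0.0

def relevance(candidate, rated_vectors, ratings):
    """Sum of similarities to the rated items, weighted by the user's ratings (step 4)."""
    return sum(cosine_similarity(candidate, v) * r for v, r in zip(rated_vectors, ratings))

def recommend(catalog, rated_ids, ratings, top_n=2, penalty_factor=0.5):
    """Greedy re-ranking: repeatedly pick the item with the highest FinalRelevance (step 5)."""
    rated_vectors = [catalog[i] for i in rated_ids]
    # Base relevance for every item the user has not interacted with yet
    base = {i: relevance(v, rated_vectors, ratings)
            for i, v in catalog.items() if i not in rated_ids}
    chosen = []
    while base and len(chosen) < top_n:
        def final_score(i):
            # Penalise similarity to items already placed in the recommendation list
            penalty = sum(penalty_factor * cosine_similarity(catalog[i], catalog[j])
                          for j in chosen)
            return base[i] - penalty
        best = max(base, key=final_score)
        chosen.append(best)
        del base[best]
    return chosen

# Toy catalog (hypothetical feature vectors); the user rated item "A" with 5 stars
catalog = {
    "A": np.array([1.0, 0.0, 0.0]),
    "B": np.array([0.9, 0.1, 0.0]),
    "C": np.array([0.95, 0.05, 0.0]),
    "D": np.array([0.5, 0.0, 0.5]),
}
profile = user_profile(np.array([catalog["A"]]), [5.0])
picks = recommend(catalog, rated_ids=["A"], ratings=[5.0], top_n=2, penalty_factor=6.0)
print(picks)
```

With `penalty_factor=0.0` the two near-duplicates of "A" are returned; with a large penalty the second slot goes to the less similar item "D", which is the intended effect of the diversity adjustment.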

### Implementation Steps:
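1. **Compute Similarity**: In the code, the score labelled similarity is the absolute difference between two products' ratings:
   $
   \text{Similarity}(I_a, I_b) = |R_a - R_b|
   $
   Because this is a distance (0 means identical ratings), smaller values indicate closer products. Product pairs whose score falls below the threshold are filtered out.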

2. **Generate Recommendations**:
   - Group and average similarity scores for potential items.
   - Rank items by average similarity (ascending in this case, implying lower differences are better).

3. **Diversity Penalty**:
   - Check similarity between recommendations and apply penalties to reduce redundancy.

4. **Mapping Recommendations**:
   - Map ASINs to product titles using metadata.

5. **Evaluate the Model**:
   - Root Mean Squared Error (RMSE), measuring the rating-prediction accuracy of the collaborative filtering (ALS) model:
     $
     \text{RMSE} = \sqrt{\frac{1}{n} \sum_{i=1}^{n} (R_i - \hat{R}_i)^2}
     $
   - Precision and recall to evaluate relevance:
     - Precision:
       $
       \text{Precision} = \frac{\text{True Positives}}{\text{True Positives} + \text{False Positives}}
       $
     - Recall:
       $
       \text{Recall} = \frac{\text{True Positives}}{\text{True Positives} + \text{False Negatives}}
       $
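
As a small illustration of the three metrics above, the sketch below computes them in plain Python on made-up ratings. The helper names and the sample numbers are assumptions for the example; the notebook itself relies on PySpark's `RegressionEvaluator` for RMSE.

```python
import math

def rmse(actual, predicted):
    """Root mean squared error between actual and predicted ratings."""
    n = len(actual)
    return math.sqrt(sum((a - p) ** 2 for a, p in zip(actual, predicted)) / n)

def precision_recall(actual, predicted, threshold):
    """Binarize both lists at the threshold, then count TP, FP and FN."""
    actual_pos = [a >= threshold for a in actual]
    predicted_pos = [p >= threshold for p in predicted]
    tp = sum(a and p for a, p in zip(actual_pos, predicted_pos))
    fp = sum((not a) and p for a, p in zip(actual_pos, predicted_pos))
    fn = sum(a and (not p) for a, p in zip(actual_pos, predicted_pos))
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    return precision, recall

# Hypothetical ratings on the raw 1-5 scale
actual = [5.0, 4.0, 2.0, 1.0]
predicted = [4.0, 2.0, 3.0, 1.0]

error = rmse(actual, predicted)
precision, recall = precision_recall(actual, predicted, threshold=3.0)
print(error, precision, recall)
```

Here one item is a true positive, one a false positive, and one a false negative, so precision and recall both come out at 0.5.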

This methodology ensures a structured approach to content-based recommendations, including both personalization and diversity adjustments.

### Installing Libraries

In [None]:
!pip install datasets scikit-learn pandas

Collecting datasets
  Downloading datasets-3.1.0-py3-none-any.whl.metadata (20 kB)
Collecting dill<0.3.9,>=0.3.0 (from datasets)
  Downloading dill-0.3.8-py3-none-any.whl.metadata (10 kB)
Collecting xxhash (from datasets)
  Downloading xxhash-3.5.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (12 kB)
Collecting multiprocess<0.70.17 (from datasets)
  Downloading multiprocess-0.70.16-py310-none-any.whl.metadata (7.2 kB)
Collecting fsspec<=2024.9.0,>=2023.1.0 (from fsspec[http]<=2024.9.0,>=2023.1.0->datasets)
  Downloading fsspec-2024.9.0-py3-none-any.whl.metadata (11 kB)
Downloading datasets-3.1.0-py3-none-any.whl (480 kB)
Downloading dill-0.3.8-py3-none-any.whl (116 kB)
Downloading fsspec-2024.9.0-py3-none-any.whl (1

### Loading dataset and preprocessing

In [None]:
from datasets import load_dataset
import pandas as pd

# Load the raw review data
print("Loading raw reviews...")
raw_reviews = load_dataset("McAuley-Lab/Amazon-Reviews-2023", name="raw_review_All_Beauty", trust_remote_code=True)
print("Raw reviews structure:")
print(raw_reviews)

# Load the metadata
print("Loading metadata...")
metadata = load_dataset("McAuley-Lab/Amazon-Reviews-2023", name="raw_meta_All_Beauty", trust_remote_code=True)
# Inspect the structure of the metadata
print("Metadata structure:")
print(metadata)
print("Sample raw reviews:")
print(raw_reviews['full'].to_pandas().head())

# Check a sample of the metadata
print("Sample metadata:")
print(metadata['full'].to_pandas().head())

Loading raw reviews...


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


README.md:   0%|          | 0.00/30.3k [00:00<?, ?B/s]

Amazon-Reviews-2023.py:   0%|          | 0.00/39.6k [00:00<?, ?B/s]

All_Beauty.jsonl:   0%|          | 0.00/327M [00:00<?, ?B/s]

Generating full split: 0 examples [00:00, ? examples/s]

Raw reviews structure:
DatasetDict({
    full: Dataset({
        features: ['rating', 'title', 'text', 'images', 'asin', 'parent_asin', 'user_id', 'timestamp', 'helpful_vote', 'verified_purchase'],
        num_rows: 701528
    })
})
Loading metadata...


meta_All_Beauty.jsonl:   0%|          | 0.00/213M [00:00<?, ?B/s]

Generating full split:   0%|          | 0/112590 [00:00<?, ? examples/s]

Metadata structure:
DatasetDict({
    full: Dataset({
        features: ['main_category', 'title', 'average_rating', 'rating_number', 'features', 'description', 'price', 'images', 'videos', 'store', 'categories', 'details', 'parent_asin', 'bought_together', 'subtitle', 'author'],
        num_rows: 112590
    })
})
Sample raw reviews:
   rating                                      title  \
0     5.0  Such a lovely scent but not overpowering.   
1     4.0     Works great but smells a little weird.   
2     5.0                                       Yes!   
3     1.0                          Synthetic feeling   
4     5.0                                         A+   

                                                text images        asin  \
0  This spray is really nice. It smells really go...     []  B00YQ6X8EO   
1  This product does what I need it to do, I just...     []  B081TJ8YS3   
2                          Smells good, feels great!     []  B07PNNCSP9   
3                          

In [None]:
import numpy as np

print("Loading datasets...")
raw_reviews = load_dataset("McAuley-Lab/Amazon-Reviews-2023", name="raw_review_All_Beauty", trust_remote_code=True)
metadata = load_dataset("McAuley-Lab/Amazon-Reviews-2023", name="raw_meta_All_Beauty", trust_remote_code=True)

# Convert raw reviews and metadata to DF
print("Converting to Pandas DataFrame...")
raw_reviews_df = raw_reviews['full'].to_pandas()
metadata_df = metadata['full'].to_pandas()
print("Raw Reviews Columns:", raw_reviews_df.columns)
print("Metadata Columns:", metadata_df.columns)

selected_reviews = raw_reviews_df[['user_id', 'parent_asin', 'rating', 'text', 'timestamp']]
selected_metadata = metadata_df[['parent_asin', 'main_category', 'title', 'average_rating', 'rating_number', 'price', 'categories']]

## Merge raw reviews with metadata on 'parent_asin'
print("Merging datasets...")
merged_data = pd.merge(selected_reviews, selected_metadata, on='parent_asin', how='inner')

# Handle missing values: drop rows lacking any of the key fields
print("Handling missing values...")
merged_data = merged_data.dropna(subset=['user_id', 'parent_asin', 'rating', 'timestamp', 'main_category'])

#Convert price column to numeric
print("Converting 'price' to numeric...")
merged_data['price'] = pd.to_numeric(merged_data['price'], errors='coerce')

## Fill missing prices with median
print("Filling missing prices...")
median_price = merged_data['price'].median()  # Compute median of valid numeric values
merged_data['price'] = merged_data['price'].fillna(median_price)
print("Converting timestamp to datetime...")
merged_data['timestamp'] = pd.to_datetime(merged_data['timestamp'], unit='ms')

# Prepare text data: append the product title (from the metadata) to the review text
print("Combining title and text...")
merged_data['review_text'] = merged_data['text'].fillna('') + " " + merged_data['title'].fillna('')

# Keep only the columns needed downstream
final_data = merged_data[['user_id', 'parent_asin', 'rating', 'timestamp', 'review_text', 'main_category', 'average_rating', 'price']]
output_file = "preprocessed_amazon_reviews_all_beauty.csv"
print(f"Saving preprocessed data to {output_file}...")
final_data.to_csv(output_file, index=False)

print("Preprocessing complete.")

Loading datasets...
Converting to Pandas DataFrame...
Raw Reviews Columns: Index(['rating', 'title', 'text', 'images', 'asin', 'parent_asin', 'user_id',
       'timestamp', 'helpful_vote', 'verified_purchase'],
      dtype='object')
Metadata Columns: Index(['main_category', 'title', 'average_rating', 'rating_number', 'features',
       'description', 'price', 'images', 'videos', 'store', 'categories',
       'details', 'parent_asin', 'bought_together', 'subtitle', 'author'],
      dtype='object')
Merging datasets...
Handling missing values...
Converting 'price' to numeric...
Filling missing prices...
Converting timestamp to datetime...
Combining title and text...
Saving preprocessed data to preprocessed_amazon_reviews_all_beauty.csv...
Preprocessing complete.


### Splitting into train-test

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, when
from pyspark.ml.recommendation import ALS
from pyspark.sql.types import IntegerType

print("Initializing PySpark...")
spark = SparkSession.builder.appName("AmazonRecommender").getOrCreate()

print("Loading data into PySpark...")
file_path = "/content/preprocessed_amazon_reviews_all_beauty.csv"
data = spark.read.csv(file_path, header=True, inferSchema=True)
print("Data preview:")
data.show(5)

## Casting columns to correct types
data = data.withColumn("rating", col("rating").cast(IntegerType()))
data = data.withColumn("price", col("price").cast("float"))
print("Splitting data...")
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)
train_data.write.csv("train_data_pyspark.csv", header=True, mode="overwrite")
test_data.write.csv("test_data_pyspark.csv", header=True, mode="overwrite")

print("Data preparation complete.")

Initializing PySpark...
Loading data into PySpark...
Data preview:
+--------------------+-----------+------+--------------------+--------------------+-------------+--------------+-----+
|             user_id|parent_asin|rating|           timestamp|         review_text|main_category|average_rating|price|
+--------------------+-----------+------+--------------------+--------------------+-------------+--------------+-----+
|AGKHLEW2SOWHNMFQI...| B00YQ6X8EO|   5.0|2020-05-05 14:08:...|This spray is rea...|   All Beauty|           4.3|15.99|
|AGKHLEW2SOWHNMFQI...| B081TJ8YS3|   4.0|2020-05-04 18:10:...|This product does...|   All Beauty|           4.0|15.99|
|AE74DYR3QUGVPZJ3P...| B097R46CSY|   5.0|2020-05-16 21:41:...|Smells good, feel...|   All Beauty|           4.4|21.98|
|AFQLNQNQYFWQZPJQZ...| B09JS339BZ|   1.0|2022-01-28 18:13:...|Felt synthetic mu...|   All Beauty|           1.0|15.99|
|AFQLNQNQYFWQZPJQZ...| B08BZ63GMJ|   5.0|2020-12-30 10:02:...|Love it Yinhua El...|   All Beauty|   

In [None]:
import os

# Define the paths for the uploaded CSV files
train_data_dir = '/content/train_data_pyspark.csv'
test_data_dir = '/content/test_data_pyspark.csv'

# List contents of the directories
train_files = os.listdir(train_data_dir)
test_files = os.listdir(test_data_dir)

# Print the contents of the directories
print("Contents of train_data_pyspark.csv directory:")
print(train_files)

print("\nContents of test_data_pyspark.csv directory:")
print(test_files)

Contents of train_data_pyspark.csv directory:
['.part-00001-47470fec-0089-4dad-b1cd-f12f2f945365-c000.csv.crc', 'part-00001-47470fec-0089-4dad-b1cd-f12f2f945365-c000.csv', '.part-00000-47470fec-0089-4dad-b1cd-f12f2f945365-c000.csv.crc', '_SUCCESS', 'part-00000-47470fec-0089-4dad-b1cd-f12f2f945365-c000.csv', '._SUCCESS.crc']

Contents of test_data_pyspark.csv directory:
['.part-00001-4269a9ff-5d57-4a25-ac76-d94db01f1b1d-c000.csv.crc', '_SUCCESS', 'part-00000-4269a9ff-5d57-4a25-ac76-d94db01f1b1d-c000.csv', '.part-00000-4269a9ff-5d57-4a25-ac76-d94db01f1b1d-c000.csv.crc', 'part-00001-4269a9ff-5d57-4a25-ac76-d94db01f1b1d-c000.csv', '._SUCCESS.crc']


In [None]:
# Get the current working directory
cwd = os.getcwd()
print("Current working directory:", cwd)

# Change the directory
os.chdir("/content/test_data_pyspark.csv")

# List all files and directories in the current directory
filenames = os.listdir()
for i in filenames:
  if "part-00000" in i and ".csv" in i and ".crc" not in i:
      test_file_path_1 = '/content/test_data_pyspark.csv/'+i
      print(test_file_path_1)
  if "part-00001" in i and ".csv" in i and ".crc" not in i:
      test_file_path_2 = '/content/test_data_pyspark.csv/'+i
      print(test_file_path_2)


# Change the directory
os.chdir("/content/train_data_pyspark.csv")

# List all files and directories in the current directory
filenames = os.listdir()

for i in filenames:
  if "part-00000" in i and ".csv" in i and ".crc" not in i:
      train_file_path_1 = '/content/train_data_pyspark.csv/'+i
      print(train_file_path_1)
  if "part-00001" in i and ".csv" in i and ".crc" not in i:
      train_file_path_2 = '/content/train_data_pyspark.csv/'+i
      print(train_file_path_2)

os.chdir("/content")

Current working directory: /content
/content/test_data_pyspark.csv/part-00000-4269a9ff-5d57-4a25-ac76-d94db01f1b1d-c000.csv
/content/test_data_pyspark.csv/part-00001-4269a9ff-5d57-4a25-ac76-d94db01f1b1d-c000.csv
/content/train_data_pyspark.csv/part-00001-47470fec-0089-4dad-b1cd-f12f2f945365-c000.csv
/content/train_data_pyspark.csv/part-00000-47470fec-0089-4dad-b1cd-f12f2f945365-c000.csv
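
The lookup above hard-codes two part files (`part-00000` and `part-00001`), but Spark may write a different number of partitions. A minimal sketch using `glob` that collects however many part files exist; the helper name `list_part_files` is an assumption for this example, not part of the notebook.

```python
import glob
import os

def list_part_files(output_dir):
    """Return the CSV part files Spark wrote into output_dir, sorted by name.

    The pattern skips the hidden '.crc' checksum files and the '_SUCCESS' marker.
    """
    pattern = os.path.join(output_dir, "part-*.csv")
    return sorted(glob.glob(pattern))
```

The returned list could then be passed to `pd.concat`, for example `pd.concat((pd.read_csv(f, on_bad_lines='skip') for f in list_part_files('/content/train_data_pyspark.csv')), ignore_index=True)`, without changing the working directory.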


In [None]:
# Read the CSV files into pandas DataFrames and skip bad lines
train_data = pd.concat([pd.read_csv(train_file_path_1, on_bad_lines='skip'),
                        pd.read_csv(train_file_path_2, on_bad_lines='skip')], ignore_index=True)

test_data = pd.concat([pd.read_csv(test_file_path_1, on_bad_lines='skip'),
                       pd.read_csv(test_file_path_2, on_bad_lines='skip')], ignore_index=True)

print("Train data preview:")
print(train_data.head())

print("\nTest data preview:")
print(test_data.head())


Train data preview:
                        user_id parent_asin rating                timestamp  \
0  AE222BBOVZIF42YOOPNBXL4UUMYA  B013HR1A92      5  2016-03-10 00:27:52.000   
1  AE222Y4WTST6BUZ4J5Y2H6QMBITQ  B00012FPSO      4  2013-06-24 21:11:42.000   
2  AE223UUOHC3V2XF4JOTTDDSBODSQ  B06Y5Y3R5L      1  2018-05-31 19:14:53.756   
3  AE224LWIR4ZDJLRKORFHMJGABP6Q  B000NMF45G      5  2011-01-11 13:45:27.000   
4  AE224QB4K4PDIZ6FVU6MGVFEYSPQ  B01LBESKBO      3  2018-10-06 20:19:13.934   

                                         review_text main_category  \
0  Great product....excellent price for good resu...    All Beauty   
1  The cleaning unit does a good job of cleaning ...    All Beauty   
2  junk Black Charcoal Peel Off Face Mask / Japan...    All Beauty   
3  I was paying 50$ for one filter from the manuf...    All Beauty   
4  A bottle was open. Cosmetic Art 30-Piece Nail ...    All Beauty   

  average_rating  price  
0            3.9  15.99  
1            4.5  15.99  
2     


In [None]:
# Checking for missing values
print("Train Data - Missing Values:")
print(train_data.isnull().sum())

print("\nTest Data - Missing Values:")
print(test_data.isnull().sum())

# Check the data types of each column
print("\nTrain Data - Data Types:")
print(train_data.dtypes)

print("\nTest Data - Data Types:")
print(test_data.dtypes)

# Checking for duplicate rows
print("\nTrain Data - Duplicate Rows:")
print(train_data.duplicated().sum())
print("\nTest Data - Duplicate Rows:")
print(test_data.duplicated().sum())

Train Data - Missing Values:
user_id                0
parent_asin            0
rating                 0
timestamp              0
review_text           26
main_category         54
average_rating      1320
price             281596
dtype: int64

Test Data - Missing Values:
user_id             0
parent_asin         0
rating              0
timestamp           0
review_text         0
main_category      13
average_rating     17
price             629
dtype: int64

Train Data - Data Types:
user_id            object
parent_asin        object
rating             object
timestamp          object
review_text        object
main_category      object
average_rating     object
price             float64
dtype: object

Test Data - Data Types:
user_id            object
parent_asin        object
rating             object
timestamp          object
review_text        object
main_category      object
average_rating     object
price             float64
dtype: object

Train Data - Duplicate Rows:
4611

Test Data

In [None]:
# Drop rows with missing values
train_data.dropna(inplace=True)
test_data.dropna(inplace=True)

In [None]:
print("Column types before scaling:")
print(train_data.dtypes)
print("Columns in the data:")
print(train_data.columns)

Column types before scaling:
user_id            object
parent_asin        object
rating             object
timestamp          object
review_text        object
main_category      object
average_rating     object
price             float64
dtype: object
Columns in the data:
Index(['user_id', 'parent_asin', 'rating', 'timestamp', 'review_text',
       'main_category', 'average_rating', 'price'],
      dtype='object')


In [None]:
print("Column types before scaling:")
print(test_data.dtypes)
print("Columns in the data:")
print(test_data.columns)

Column types before scaling:
user_id            object
parent_asin        object
rating             object
timestamp          object
review_text        object
main_category      object
average_rating     object
price             float64
dtype: object
Columns in the data:
Index(['user_id', 'parent_asin', 'rating', 'timestamp', 'review_text',
       'main_category', 'average_rating', 'price'],
      dtype='object')


### Label Encoding

In [None]:
from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import StandardScaler


train_data['parent_asin'] = train_data['parent_asin'].astype(str)
test_data['parent_asin'] = test_data['parent_asin'].astype(str)

# Encoding 'user_id' and 'parent_asin' columns
combined_user_data = pd.concat([train_data['user_id'], test_data['user_id']], axis=0)
combined_item_data = pd.concat([train_data['parent_asin'], test_data['parent_asin']], axis=0)
label_encoder_user = LabelEncoder()
label_encoder_user.fit(combined_user_data)

label_encoder_item = LabelEncoder()
label_encoder_item.fit(combined_item_data)

# Transform train and test data
train_data['user_id_encoded'] = label_encoder_user.transform(train_data['user_id'])
test_data['user_id_encoded'] = label_encoder_user.transform(test_data['user_id'])
train_data['parent_asin_encoded'] = label_encoder_item.transform(train_data['parent_asin'])
test_data['parent_asin_encoded'] = label_encoder_item.transform(test_data['parent_asin'])

#Drop non-numeric columns
columns_to_drop = ['timestamp', 'review_text', 'main_category']
train_data = train_data.drop(columns=[col for col in columns_to_drop if col in train_data.columns])
test_data = test_data.drop(columns=[col for col in columns_to_drop if col in test_data.columns])

train_data['average_rating'] = pd.to_numeric(train_data['average_rating'], errors='coerce')
test_data['average_rating'] = pd.to_numeric(test_data['average_rating'], errors='coerce')

train_data['rating'] = pd.to_numeric(train_data['rating'], errors='coerce')
test_data['rating'] = pd.to_numeric(test_data['rating'], errors='coerce')
numerical_columns = ['rating', 'average_rating', 'price']
train_data = train_data.dropna(subset=numerical_columns)
test_data = test_data.dropna(subset=numerical_columns)

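# Scaling: cast to float first so the scaled values can be written back without a dtype warning
train_data[numerical_columns] = train_data[numerical_columns].astype(float)
test_data[numerical_columns] = test_data[numerical_columns].astype(float)
scaler = StandardScaler()
train_data.loc[:, numerical_columns] = scaler.fit_transform(train_data[numerical_columns])
test_data.loc[:, numerical_columns] = scaler.transform(test_data[numerical_columns])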
print("\nTrain Data - After Encoding and Scaling:")
print(train_data.head())

print("\nTest Data - After Encoding and Scaling:")
print(test_data.head())


Train Data - After Encoding and Scaling:
                        user_id parent_asin    rating  average_rating  \
0  AE222BBOVZIF42YOOPNBXL4UUMYA  B013HR1A92  0.676357       -0.003402   
1  AE222Y4WTST6BUZ4J5Y2H6QMBITQ  B00012FPSO -0.023249       -0.003297   
2  AE223UUOHC3V2XF4JOTTDDSBODSQ  B06Y5Y3R5L -2.122066       -0.003437   
3  AE224LWIR4ZDJLRKORFHMJGABP6Q  B000NMF45G  0.676357       -0.003297   
4  AE224QB4K4PDIZ6FVU6MGVFEYSPQ  B01LBESKBO -0.722854       -0.003402   

      price  user_id_encoded  parent_asin_encoded  
0 -0.003544                0                15600  
1 -0.003544                1                  132  
2 -0.003544                4                29980  
3 -0.003544                7                  735  
4 -0.003544                8                25111  

Test Data - After Encoding and Scaling:
                        user_id parent_asin    rating  average_rating  \
0  AE223FOBUQAUOSGVCDRABQGKGVSQ  B0006NXBT8  0.676357       -0.003262   
1  AE224QB4K4PDIZ6FV



In [None]:
spark = SparkSession.builder.appName("ContentBasedFiltering").getOrCreate()
train_data = spark.createDataFrame(train_data)
test_data = spark.createDataFrame(test_data)

### Training the model

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql import functions as F

# Hyperparameters
similarity_threshold = 0.5
diversity_penalty = 0.1
metadata_df = train_data.select('parent_asin_encoded').distinct().withColumn(
    'product_title',
    F.concat(F.lit('Product '), F.col('parent_asin_encoded'))
)

def content_based_recommendation(user_id, train_data, top_n=10, similarity_threshold=0.5, diversity_penalty=0.1):
    # Get the ratings given by the user
    user_ratings = train_data.filter(train_data.user_id_encoded == user_id).select('user_id_encoded', 'parent_asin_encoded', 'rating')
    product_similarity = train_data.alias("a").join(train_data.alias("b"), F.col("a.parent_asin_encoded") != F.col("b.parent_asin_encoded"))

    product_similarity = product_similarity.withColumn(
        "similarity",
        F.abs(F.col("a.rating") - F.col("b.rating"))
    )

    # Keep only product pairs whose rating difference is at least the threshold
    product_similarity = product_similarity.filter(F.col("similarity") >= similarity_threshold)

    # Restrict to pairs whose first product was rated by this user
    recommendations = product_similarity.filter(
        F.col("a.user_id_encoded") == user_id
    ).select(
        "b.parent_asin_encoded",
        "similarity"
    ).groupBy("b.parent_asin_encoded").agg(
        F.avg("similarity").alias("average_similarity")
    ).orderBy("average_similarity", ascending=True).limit(top_n)

    recommendations.show(truncate=False)
    recommended_product_ids = recommendations.select('parent_asin_encoded').rdd.flatMap(lambda x: x).collect()

    ##Map recommended product ASINs to titles
    recommendations_df = spark.createDataFrame([(asin,) for asin in recommended_product_ids], ['parent_asin_encoded'])
    recommended_titles_df = recommendations_df.join(metadata_df, on="parent_asin_encoded", how="inner")

    ## Retrieve product titles
    product_titles = recommended_titles_df.select('product_title').rdd.flatMap(lambda x: x).collect()

    # Diversity check: report duplicate titles; no score adjustment is applied yet
    for i in range(len(product_titles)):
        for j in range(i+1, len(product_titles)):
            if product_titles[i] == product_titles[j]:
                print(f"Applying diversity penalty between {product_titles[i]} and {product_titles[j]}")
    print(f"\nTop Recommendations for User: {user_id}")
    for title in product_titles:
        print(f"- {title}")


user_id_example = label_encoder_user.transform(['AGKHLEW2SOWHNMFQIJGBECAF7INQ'])[0]  # Encode the user ID
content_based_recommendation(user_id_example, train_data, similarity_threshold=similarity_threshold, diversity_penalty=diversity_penalty)


als = ALS(userCol="user_id_encoded", itemCol="parent_asin_encoded", ratingCol="rating", coldStartStrategy="drop")

# Fit the model
model = als.fit(train_data)
# Make predictions on the test data
predictions = model.transform(test_data)

# Compute RMSE
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print(f"\nRoot Mean Squared Error (RMSE) = {rmse}")

# Precision and recall: binarize predictions and ratings at the threshold.
# Note: 'rating' was standardized earlier, so 3.0 is compared against scaled values, not the raw 1-5 scale.
threshold = 3.0  # Values at or above the threshold are considered "positive"
predictions = predictions.withColumn("prediction_binary", (F.col("prediction") >= threshold).cast("double"))
predictions = predictions.withColumn("rating_binary", (F.col("rating") >= threshold).cast("double"))

+-------------------+------------------+
|parent_asin_encoded|average_similarity|
+-------------------+------------------+
|15164              |0.6996056777520083|
|62849              |0.6996056777520083|
|29524              |0.6996056777520083|
|14117              |0.6996056777520084|
|29478              |0.6996056777520084|
|72203              |0.6996056777520084|
|39330              |0.6996056777520084|
|2882               |0.6996056777520084|
|9051               |0.6996056777520084|
|68166              |0.6996056777520084|
+-------------------+------------------+


Top Recommendations for User: 222421
- Product 14117
- Product 29478
- Product 15164
- Product 62849
- Product 29524
- Product 72203
- Product 68166
- Product 39330
- Product 9051
- Product 2882
Root Mean Squared Error (RMSE) = 0.9236358438829306
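
The evaluation cell compares ratings against `threshold = 3.0`, but the `rating` column was standardized before training, so both the labels and the ALS predictions live on the scaled axis. One way to express a raw 3-star threshold on that axis is sketched below. The helper name is an assumption for this example, and the two (raw, scaled) pairs are read off the "After Encoding and Scaling" output, where a rating of 5 maps to 0.676357 and a rating of 4 to -0.023249.

```python
def linear_scaling_from_points(raw_a, scaled_a, raw_b, scaled_b):
    """Recover (mean, std) of a standardization from two (raw, scaled) pairs."""
    std = (raw_a - raw_b) / (scaled_a - scaled_b)
    mean = raw_a - scaled_a * std
    return mean, std

# Pairs taken from the printed train data: rating 5 -> 0.676357, rating 4 -> -0.023249
mean, std = linear_scaling_from_points(5.0, 0.676357, 4.0, -0.023249)

raw_threshold = 3.0  # "3 stars or better" on the original 1-5 scale
scaled_threshold = (raw_threshold - mean) / std
print(f"mean={mean:.3f}, std={std:.3f}, scaled threshold={scaled_threshold:.4f}")
```

The resulting threshold is roughly -0.72, which agrees with the scaled value shown for the row whose raw rating was 3. In the notebook itself the same mean and standard deviation should be available as `scaler.mean_[0]` and `scaler.scale_[0]`, since `rating` is the first entry of `numerical_columns`.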
