# Project data model
In the Project data model, we will present the details for the data preparation, model training and model evaluation.

The goal of this project is to build a Recommender System with using Google Analytics Ecommerce Data, the data is available on an open database from Google, coming from real obfscated data from the Google Merchandise Store ([Google Analytics Sample](https://console.cloud.google.com/marketplace/product/obfuscated-ga360-data/obfuscated-ga360-data?project=realtime-gan) ). We will be testing the performance of 3 different algorithms in the Collaborative filtering model:
1. Matrix Factorization (Latent Factor)
2. User Based Filtering
3. Item Based Filtering

## Data Preparation
The primary objective during the data preparation phase is to construct a Utility Matrix that will serve as the foundation for our recommendation system. To achieve this, we will generate implicit ratings derived from the amount of time a user spends on a particular item. This approach assumes that the duration of interaction directly correlates with user preference, thereby allowing us to quantify interest levels in a meaningful way. The utility Matrix will be built using the following structure:




| UserID    | ItemID    | Session_Duration |
|-----------|-----------|------------------|
| User_1    | Item_A    | 120              |
| User_1    | Item_B    | 60               |
| User_2    | Item_A    | 45               |
| User_2    | Item_C    | 30               |
| User_3    | Item_B    | 85               |
| User_3    | Item_C    | 90               |

### Step 1: Importing the data set

#### Feature Selection Details

In this step, we  selected specific features from the BigQuery database by running an SQL query. These features are pivotal for our analysis/model, offering insights into user behavior, session details, and product interactions. Below is an overview of the selected features and their significance:

1. **`fullVisitorId` | User ID**: A unique identifier for each user visiting the website. 

2. **`visitNumber` | Session/Visit Number**: Indicates the ordinal number of the user's visit. For example, the first visit is 1, the second visit is 2, and so on.

3. **`hits.eCommerceAction.action_type` | Ecommerce Action Type**: Categorizes the type of interaction a user had, such as viewing an item list (1), viewing a specific item (2), etc.

4. **`hits.time` | Action Time**: Timestamp indicating when the action occurred.

5. **`hits.hitNumber` | Event Number Within a Session**: Sequential number of the event/action within a session, starting from 1.

6. **`prod.productSKU` | Product ID**: Unique identifier for each product that was interacted with.



#### Data Retrieval Process


During this phase, we initiated the retrieval of raw data from the BigQuery public dataset. To conduct a preliminary evaluation of our analytical model, we opted to extract a dataset entries from a singular day's worth of data (August 1, 2017), we get 47,723 rows

```sql
SELECT fullVisitorId, visitNumber, h.eCommerceAction.action_type, prod.productSKU, h.time, h.hitNumber
FROM bigquery-public-data.google_analytics_sample.ga_sessions_20170801, UNNEST(hits) as h, UNNEST(h.product) as prod
ORDER BY fullVisitorId ASC, visitNumber ASC, h.time ASC
```
The results from the executed query have been saved to the file located at `data/ga_sessions_20170801.csv`.


The first goal is to extract the implicit rating from the data, which is the amount of time a user spent on a prouct page.

### Step 2: Data Preprocessing

#### Initialize Spark and necessary imports

In [2]:
from pyspark.sql import SparkSession, functions as F, Window
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Initialize Spark session
def init_spark():
    return SparkSession \
        .builder \
        .appName("GA360RECOMMENDER") \
        .getOrCreate()

spark = init_spark()

#### Defining the Sechma and loading the data into Spark DataFrame
Read the data from the csv and store in a Dataframe

In [3]:
schema = StructType([
    StructField("fullVisitorId", StringType(), True),
    StructField("visitNumber", IntegerType(), True),
    StructField("action_type", StringType(), True),
    StructField("productSKU", StringType(), True),
    StructField("time", IntegerType(), True),
    StructField("hitNumber", IntegerType(), True)
])

df = spark.read.csv("../data/ga_sessions_20170801.csv", header=True, schema=schema)

df.show(5)
print("Total number of rows in the dataframe: ", df.count(), "row")

+-------------------+-----------+-----------+--------------+----+---------+
|      fullVisitorId|visitNumber|action_type|    productSKU|time|hitNumber|
+-------------------+-----------+-----------+--------------+----+---------+
|0004915997121163857|          1|          0|GGOEYFKQ020699|   0|        1|
|0004915997121163857|          1|          0|GGOEYDHJ056099|   0|        1|
|0004915997121163857|          1|          0|GGOEYHPB072210|   0|        1|
|0004915997121163857|          1|          0|GGOEYOCR077799|   0|        1|
|0004915997121163857|          1|          0|  GGOEGAAX0351|   0|        1|
+-------------------+-----------+-----------+--------------+----+---------+
only showing top 5 rows

Total number of rows in the dataframe:  47723 row


#### Calculate the Session Duration 
Calculate the session duration (implicit rating) by substracting the time of 2 consecutive events, for the same user within the same session. 

By the end of this step, we should get the (user, item, rating) matrix.

In [4]:
# Define window specification for calculating pageview durations
windowSpec = Window.partitionBy("fullVisitorId", "visitNumber").orderBy("time")

# Calculate the next hit's time and pageview duration
df_with_durations = df.withColumn("next_time", F.lead("time", 1).over(windowSpec)) \
                      .withColumn("pageview_duration", F.when(F.isnull(F.col("next_time") - F.col("time")), 1)
                                                          .otherwise(F.col("next_time") - F.col("time")))

# Filter for product detail views only 
prodview_durations = df_with_durations.filter(df_with_durations.action_type == '2') \
                                      .select("fullVisitorId", "visitNumber", "productSKU", "pageview_duration")

# Aggregate pageview durations by fullVisitorId and productSKU
aggregate_web_stats = prodview_durations.groupBy("fullVisitorId", "productSKU") \
                                        .agg(F.sum("pageview_duration").alias("session_duration"))

user_item_rating = aggregate_web_stats

# Display the aggregated results
user_item_rating.orderBy(user_item_rating.fullVisitorId.asc()).show(10)

+-------------------+--------------+----------------+
|      fullVisitorId|    productSKU|session_duration|
+-------------------+--------------+----------------+
|0049931492016965831|GGOEGEVA022399|            9821|
|0052381813974609729|GGOEAOCB077499|           14292|
|0052381813974609729|GGOEGOCB017499|            6931|
|0052381813974609729|GGOEGOCC077299|            4745|
| 008016723867009901|GGOEGESB015099|            1488|
| 008016723867009901|GGOEGBJL013999|            1419|
| 008016723867009901|GGOEGDHC074099|            1394|
| 008016723867009901|GGOEGESC014099|               0|
| 008016723867009901|GGOEGCKQ013199|               1|
| 008016723867009901|GGOEACCQ017299|               0|
+-------------------+--------------+----------------+
only showing top 10 rows



#### Normalization Functions

We will define 3 normalization functions that will be later used to normalize the ratings

Split Data To Prevent Information Leakage

In [5]:
(trainSet, testSet) = user_item_rating.randomSplit([0.90, 0.10], seed= 123)

##### Z-Score Normalization
Normalizes the data by subtracting the mean rating and dividing by the standard deviation, for users or items.

In [6]:
from pyspark.sql.functions import mean, stddev

def z_score_normalization(train_df, test_df):
    """
    Apply Z-score normalization to the rating column of a DataFrame.
    """
    # Calculate the mean and standard deviation of session_duration of the training DataFrame
    training_mean_val = train_df.select(mean(train_df['session_duration'])).collect()[0][0]
    training_stddev_val = train_df.select(stddev(train_df['session_duration'])).collect()[0][0]

    testing_mean_val = test_df.select(mean(test_df['session_duration'])).collect()[0][0]
    testing_stddev_val = test_df.select(stddev(test_df['session_duration'])).collect()[0][0]



    # Apply Z-score normalization
    training_normalized_df = train_df.withColumn('normalized_duration', 
                   (train_df['session_duration'] - training_mean_val) / training_stddev_val)
    
    testing_normalized_df = test_df.withColumn('normalized_duration', 
                   (test_df['session_duration'] - testing_mean_val) / testing_stddev_val)
    
    return (training_normalized_df, testing_normalized_df)


(normalized_train_df, normalized_test_df) = z_score_normalization(trainSet, testSet)

normalized_train_df.orderBy(normalized_train_df.normalized_duration.desc()).show(10)

+-------------------+--------------+----------------+-------------------+
|      fullVisitorId|    productSKU|session_duration|normalized_duration|
+-------------------+--------------+----------------+-------------------+
|0834628261584717467|  GGOEGAAX0325|         1527925| 14.070723159999357|
|0834628261584717467|  GGOEGAAX0686|         1470110| 13.530952922804648|
|0485797735449723544|GGOEGESB015199|         1172597| 10.753323197326667|
| 431781159932899381|GGOEGBRJ037299|          759499|  6.896573124773528|
|7484497031611210287|GGOEYHPB072210|          748694|  6.795695888539179|
|5873059317509196502|  GGOEGAAX0104|          594894|  5.359794090034532|
|2863022817351466072|GGOEYFKQ020699|          536689|  4.816382749449596|
|7641607978785523241|GGOEGGCX056199|          443459| 3.9459723159347173|
|2827498353821012092|  GGOEGAAX0680|          427854|  3.800281499512968|
|1933634293342529288|GGOEGDHQ015399|          370320|   3.26313472399173|
+-------------------+--------------+--

##### Min-Max Normalization: 
Scale the data within the range [0, 1]

In [7]:
from pyspark.sql.functions import  min, max

def min_max_normalization(train_df, test_df):
    """
    Apply Min-Max Normalization on a specified column of a PySpark DataFrame.
    """
    # Calculate the minimum and maximum values of the specified column
    train_column_min_max = train_df.select(min(train_df['session_duration']).alias("min"), max(train_df['session_duration']).alias("max")).collect()[0]
    train_min_value, train_max_value = train_column_min_max["min"], train_column_min_max["max"]

    test_column_min_max = test_df.select(min(test_df['session_duration']).alias("min"), max(test_df['session_duration']).alias("max")).collect()[0]
    test_min_value, test_max_value = test_column_min_max["min"], test_column_min_max["max"]
    
    # Apply Min-Max Normalization
    train_df = train_df.withColumn('normalized_duration', (train_df['session_duration'] - train_min_value) / (train_max_value - train_min_value))
    test_df = test_df.withColumn('normalized_duration', (test_df['session_duration'] - test_min_value) / (test_max_value - test_min_value))
    
    return (train_df, test_df)

(min_max_train_normalized_df, min_max_test_normalized_df )= min_max_normalization(trainSet, testSet)
min_max_train_normalized_df.orderBy(min_max_train_normalized_df.normalized_duration.desc()).show(10)

+-------------------+--------------+----------------+-------------------+
|      fullVisitorId|    productSKU|session_duration|normalized_duration|
+-------------------+--------------+----------------+-------------------+
|0834628261584717467|  GGOEGAAX0325|         1527925|                1.0|
|0834628261584717467|  GGOEGAAX0686|         1470110| 0.9621611008393737|
|0485797735449723544|GGOEGESB015199|         1172597| 0.7674440826611254|
| 431781159932899381|GGOEGBRJ037299|          759499|0.49707871786900537|
|7484497031611210287|GGOEYHPB072210|          748694| 0.4900070356856521|
|5873059317509196502|  GGOEGAAX0104|          594894| 0.3893476446815125|
|2863022817351466072|GGOEYFKQ020699|          536689|0.35125349739025147|
|7641607978785523241|GGOEGGCX056199|          443459| 0.2902361045208371|
|2827498353821012092|  GGOEGAAX0680|          427854| 0.2800229068835185|
|1933634293342529288|GGOEGDHQ015399|          370320|0.24236791727342638|
+-------------------+--------------+--

##### Logarithmic Transformation
Apply the logarithm function to each data point, to reduce the impact of outliers and diminish the skewness of the original distribution, making the data more symmetrical.

In [8]:
from pyspark.sql.functions import log

def logarithmic_transformation(train_df, test_df):
    """
    Apply Logarithmic Transformation on a specified column of a PySpark DataFrame.
    """
    # Adding 1 to avoid log(0) which is undefined
    train_df = train_df.withColumn('normalized_duration', log(train_df['session_duration'] + 1))
    test_df = test_df.withColumn('normalized_duration', log(test_df['session_duration'] + 1))
    
    return (train_df, test_df)

(logarithmic_train_df, logarithmic_test_df) = logarithmic_transformation(trainSet, testSet)
logarithmic_train_df.orderBy(logarithmic_train_df.normalized_duration.desc()).show(10)

+-------------------+--------------+----------------+-------------------+
|      fullVisitorId|    productSKU|session_duration|normalized_duration|
+-------------------+--------------+----------------+-------------------+
|0834628261584717467|  GGOEGAAX0325|         1527925| 14.239421818216494|
|0834628261584717467|  GGOEGAAX0686|         1470110|  14.20084846610825|
|0485797735449723544|GGOEGESB015199|         1172597| 13.974732357899336|
| 431781159932899381|GGOEGBRJ037299|          759499| 13.540415601017965|
|7484497031611210287|GGOEYHPB072210|          748694| 13.526086969954191|
|5873059317509196502|  GGOEGAAX0104|          594894| 13.296140198366766|
|2863022817351466072|GGOEYFKQ020699|          536689| 13.193175925608253|
|7641607978785523241|GGOEGGCX056199|          443459| 13.002362885006988|
|2827498353821012092|  GGOEGAAX0680|          427854| 12.966539632116586|
|1933634293342529288|GGOEGDHQ015399|          370320| 12.822125476068756|
+-------------------+--------------+--

### Step 3: Building the Utility Matrix:

In [9]:
def user_user_utility_matrix(df):
    """
    Create a user-user utility matrix from a DataFrame.
    """
    # Pivot the DataFrame to create a user-user utility matrix
    utility_matrix = df.groupBy("fullVisitorId").pivot("productSKU").agg(F.first("normalized_duration"))
    return utility_matrix

def item_item_utility_matrix(df):
    """
    Create an item-item utility matrix from a DataFrame.
    """
    # Pivot the DataFrame to create an item-item utility matrix
    utility_matrix = df.groupBy("productSKU").pivot("fullVisitorId").agg(F.first("normalized_duration"))
    return utility_matrix

utility_matrix_user_user = user_user_utility_matrix(normalized_train_df)
utility_matrix_item_item = item_item_utility_matrix(normalized_train_df)
# Show the result
utility_matrix_user_user.show(5)
utility_matrix_item_item.show(5)

+-------------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------------+--------------+--------------+--------------+------------+------------+------------+------------+------------+------------+------------+--------------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+--------------------+------------+------------+------------+------------+------------+------------+------------+------------

The utility matrix is sparse since the data is coming from only one day and it is a known disadvantage of collaborative filtering.

## Model Development

In this step, we will implement three collaborative filtering algorithms:
1. **Latent Factor Model**
2. **Item-Based Filtering**
3. **User-Based Filtering**

#### Utility Functions


In [10]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

def string_to_Numeric(df):
    '''
    Convert string item and user ids to numeric for ALS, since ALS only accepts numberical IDs
    '''
    userIndexer = StringIndexer(inputCol="fullVisitorId", outputCol="userId").setHandleInvalid("skip")
    itemIndexer = StringIndexer(inputCol="productSKU", outputCol="itemId").setHandleInvalid("skip")

    # Pipeline to apply the transformations
    pipeline = Pipeline(stages=[userIndexer, itemIndexer])

    # Fit and transform
    transformed_ratings_dataFrame = pipeline.fit(df).transform(df)

    transformed_ratings_dataFrame.orderBy(transformed_ratings_dataFrame.fullVisitorId).show()
    
    return transformed_ratings_dataFrame

def row_mean(row):
    '''
    This function must return the mean of the non-zero elements in the row.
    '''
    row = row[1:]
    non_NULL = [x for x in row if x != None]
    if(len(non_NULL) == 0):
        return 0
    return sum(non_NULL) / len(non_NULL)

def pearson_Correlation(row1, row2):
    '''
    This function must return the Pearson correlation between two rows.
    '''
    row1 = row1[1:]
    row2 = row2[1:]
    mean1 = row_mean(row1)
    mean2 = row_mean(row2)

    #subtract the mean from the row
    row1 = [(x1 - mean1) if x1 is not None else 0 for x1 in row1]
    row2 = [(x2 - mean2) if x2 is not None else 0 for x2 in row2]

    #calculate cossine similarity with centered rows
    numerator = sum([row1[i] * row2[i] for i in range(len(row1))])
    denominator = (sum([x ** 2 for x in row1]) ** 0.5) * (sum([x ** 2 for x in row2]) ** 0.5)

    if denominator == 0:
        return 0
    else:
        return numerator / denominator
    
ratings_train_dataframe = string_to_Numeric(normalized_train_df)
ratings_test_dataframe = string_to_Numeric(normalized_test_df)


+-------------------+--------------+----------------+--------------------+------+------+
|      fullVisitorId|    productSKU|session_duration| normalized_duration|userId|itemId|
+-------------------+--------------+----------------+--------------------+------+------+
|0049931492016965831|GGOEGEVA022399|            9821|-0.10254253517034471| 153.0|   8.0|
|0052381813974609729|GGOEAOCB077499|           14292|-0.06080055245828...|  85.0|  21.0|
|0052381813974609729|GGOEGOCC077299|            4745|-0.14993289621852152|  85.0|  19.0|
| 008016723867009901|GGOEACCQ017299|               0|-0.19423298746627546|  13.0|  38.0|
| 008016723867009901|GGOEGBJL013999|            1419|-0.18098497282337497|  13.0|  11.0|
| 008016723867009901|GGOEGCKQ013199|               1|-0.19422365130373642|  13.0| 117.0|
| 008016723867009901|GGOEGDHC074099|            1394|-0.18121837688685102|  13.0|  12.0|
| 008016723867009901|GGOEGESB015099|            1488|-0.18034077760818107|  13.0|  34.0|
| 008016723867009901|

#### Latent Factor Model

##### Basic ALS Recommender System with Hyperparameter tuning

Before applying the ALS API, we needed to convert the string IDs to numeric using a String Indexer.
The result is displayed in the previous section.

The tuned hyperparameters are: 
1. Rank: latent factor
2. RegParam: regularization parameter 
3. MaxIter: Maximum number of iterations to run

In [30]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder


def basic_als_recommender(ratings_train_dataframe, ratings_test_dataframe, seed):
    '''
    This function prints the RMSE of recommendations obtained
    through ALS collaborative filtering after hyperparameter tuning
    and returns the best model.
    
    The following parameters must be used in the ALS
    optimizer:
    - coldStartStrategy: 'drop'
    '''

    #Build the recommendation model
    als = ALS(coldStartStrategy="drop", userCol="userId", itemCol="itemId", ratingCol="normalized_duration", seed=seed)

    param_grid = ParamGridBuilder()\
             .addGrid(als.rank, [30, 50, 70])\
             .addGrid(als.maxIter, [5, 10])\
             .addGrid(als.regParam, [0.01, 0.05, 0.15])\
             .build()
    
    evaluator = RegressionEvaluator(metricName="rmse", labelCol="normalized_duration", predictionCol="prediction")

    cv = CrossValidator(
        estimator=als,
        estimatorParamMaps=param_grid,
        evaluator=evaluator,
        numFolds=3)
    
    model = cv.fit(ratings_train_dataframe)

    best_model = model.bestModel

    print('rank: ', best_model.rank)
    print('MaxIter: ', best_model._java_obj.parent().getMaxIter())
    print('RegParam: ', best_model._java_obj.parent().getRegParam())
    

    #Evaluate the model
    predictions = best_model.transform(ratings_test_dataframe)

    rmse = evaluator.evaluate(predictions)

    print("Root-mean-square error = " + str(rmse))
    return best_model, ratings_train_dataframe, ratings_test_dataframe


(best_model, _, _) = basic_als_recommender(ratings_train_dataframe, ratings_test_dataframe, 123)

rank:  50
MaxIter:  5
RegParam:  0.15
Root-mean-square error = 0.9931059986632015


##### Recommend Items with Basic ALS
Use the trained model with the optimzed hyperparamters to predict the ratings of items that users may like and compare with the actual rating

In [None]:
from pyspark.sql.functions import explode, col


userRecs = best_model.recommendForAllUsers(50)  # Top-50 recommendations for each user

# Explode the recommendations to have one item per row for each user
userRecsExploded = userRecs.withColumn("rec_exp", explode("recommendations")).select(
    col("userId"),
    col("rec_exp.itemId").alias("recItemId"),
    col("rec_exp.rating").alias("predictedRating")
)

# Join the exploded recommendations with the actual ratings on userId and itemId
comparisonDf = userRecsExploded.join(
    ratings_test_dataframe,
    (userRecsExploded.userId == ratings_test_dataframe.userId) & (userRecsExploded.recItemId == ratings_test_dataframe.itemId),
    "left_outer"
).select(
    userRecsExploded.userId,
    userRecsExploded.recItemId,
    userRecsExploded.predictedRating,
    ratings_test_dataframe.normalized_duration.alias("actualRating")
).where(col("actualRating").isNotNull())

comparisonDf.show(100)


+------+---------+---------------+--------------------+
|userId|recItemId|predictedRating|        actualRating|
+------+---------+---------------+--------------------+
|     0|       52|    0.065282546| -0.1387090434347468|
|     2|       46|  -0.0041253087|-0.19952488323171533|
|     4|       29|   -0.005222268|-0.34136435769489826|
|     6|       13|    0.028011408|  -0.264735116516123|
|     8|       37|    0.023654576| -0.3625344285102987|
|    10|        7|  -0.0069774063| -0.4039764459246887|
|    11|        0|   -0.013281795|  0.7242600250766675|
|    14|        7|   -0.007751298| -0.3398567920459228|
|    22|       15|  -0.0049604583|-0.18066427468708585|
|    26|        2|    0.030051872|-0.40394437005981687|
|    28|       34|     0.47217545|-0.40394437005981687|
|    30|       39|     0.07707061|-0.02208119876081...|
|    31|        3|  -0.0013461298| -0.3002110230643547|
|    32|        0|    -0.02916442| -0.2672049581112531|
|    41|        3|   0.0062961765|   3.379692574

##### ALS Recommender with Bias

In this step we add bias to the basic ALS by factoring in:
1. User mean
2. Item mean
3. Global mean

In [None]:
from pyspark.sql.functions import mean

def global_average(ratings_train_df):
    '''
    This function must print the global average rating for all users and
    all products in the training set. Training and test
    sets should be determined as before (e.g: as in function basic_als_recommender).
    '''
    #now just return the average of the ratings in the training_set
    rating_average = ratings_train_df.agg({"normalized_duration": "avg"}).collect()[0][0]

    return rating_average



def als_with_bias_recommender(ratings_train_df, ratings_test_dataframe, seed):
    '''
    This function must return the RMSE of recommendations obtained 
    using ALS + biases. The ALS model should make predictions for *i*, 
    the user-item interaction, then it should recompute the predicted 
    rating with the formula *i+user_mean+item_mean-m* (*m* is the 
    global rating).
    '''

    user_mean = ratings_train_df.groupby("userId").agg(mean("normalized_duration").alias("user_mean"))
    item_mean = ratings_train_df.groupby("itemId").agg(mean("normalized_duration").alias("item_mean"))

    training_set_with_means = ratings_train_df.join(user_mean, "userId").join(item_mean, "itemId")
    test_set_with_means = ratings_test_dataframe.join(user_mean, "userId").join(item_mean, "itemId")

    global_mean = global_average(ratings_train_df)

    final_training_set = training_set_with_means.withColumn("user_item_interaction", training_set_with_means.normalized_duration - (
                training_set_with_means.user_mean + training_set_with_means.item_mean - global_mean))


    #Build the recommendation model
    als = ALS(coldStartStrategy="drop", userCol="userId", itemCol="itemId", ratingCol="user_item_interaction", seed=seed)

    param_grid = ParamGridBuilder()\
             .addGrid(als.rank, [30, 50, 70])\
             .addGrid(als.maxIter, [5, 10])\
             .addGrid(als.regParam, [0.01, 0.05, 0.15])\
             .build()
    
    
    evaluator = RegressionEvaluator(metricName="rmse", labelCol="normalized_duration", predictionCol="prediction")

    cv = CrossValidator(
        estimator=als,
        estimatorParamMaps=param_grid,
        evaluator=evaluator,
        numFolds=3)
    
    model = cv.fit(final_training_set)

    best_model = model.bestModel

    print('rank: ', best_model.rank)
    print('MaxIter: ', best_model._java_obj.parent().getMaxIter())
    print('RegParam: ', best_model._java_obj.parent().getRegParam())

    # Evaluate the model by computing the RMSE on the test data
    predictions = best_model.transform(test_set_with_means)
    predictions = predictions.withColumn("prediction", predictions['prediction']+predictions['user_mean']+predictions['item_mean']-global_mean)

    rmse = evaluator.evaluate(predictions)

    print("Root-mean-square error = " + str(rmse))
    return best_model

best_model = als_with_bias_recommender(ratings_train_dataframe, ratings_test_dataframe, 123)

rank:  50
MaxIter:  10
RegParam:  0.15
Root-mean-square error = 1.3112573281898512


##### Intermediate Results Discussion

the ALS RMSE = 0.9993922401899552

the ALS+biases RMSE = 1.3112573281898512

Implicit ratings, unlike explicit ratings (e.g., a 1-5 star rating), are derived from user behavior (such as session duration, page views, purchases) and not directly from user preferences. This means they are inherently noisy and less precise. Implicit signals might not always indicate preference but rather engagement or necessity. When biases based on these signals are introduced, they might amplify the noise or inaccuracies in the data, leading to a higher RMSE. 

In our specific case, a user may have the habit to leave their screen open on a product then going on to do another task. Or maybe they are convinced about a product that they like so much that they did not hesitate to buy without wasting much time on the details page. 

An interesting direction the project can head to is to try incorporating regularization techniques, to avoid overfitting if this is what is happening.

#### Item Based Collaborative filtering

In [12]:
def k_most_similar_items_with_ratings_rdd(utility_matrix, utility_matrix_t, item_id, userId, k):
    '''
    Calculates the top k items most similar to the specified item_id for a given user,
    based on Pearson correlation of ratings.
    '''
    # Fetch the row corresponding to the specified item_id
    item_row = utility_matrix.filter(utility_matrix.productSKU == item_id).collect()

    if len(item_row) > 0:
        item_row = item_row[0]
    else:
        return []
    
    # Calculate Pearson correlation for each item with the specified item, excluding the item itself
    pearson_correlation_rdd = utility_matrix.rdd.map(lambda row: (row[0], pearson_Correlation(item_row, row))).filter(lambda x: x[0] != item_id)
        
    # Fetch the user-specific ratings for all items
    user_ratings = utility_matrix_t.filter(utility_matrix_t.fullVisitorId == userId).collect()

    if len(user_ratings) > 0:
        user_ratings = user_ratings[0]
    else:
        return []

    # Map each item to its Pearson correlation and the user's rating for that item
    items_with_with_user_rating = pearson_correlation_rdd.map(lambda x: (x[0], x[1], user_ratings[x[0]]))

    # Remove items with no rating
    items_with_with_user_rating = items_with_with_user_rating.filter(lambda x: x[2] is not None)
    
    return items_with_with_user_rating.takeOrdered(k, key=lambda x: -x[1])


def item_item_recommender_rdd(utility_matrix, utility_matrix_t, ratings_test_dataframe, k):
    '''
    Predicts a rating for a specified item based on item-item similarity.
    '''
    
    list_of_predictions = [] # List to store the predicted ratings

    for row in ratings_test_dataframe.rdd.collect():
        itemId = row[1]
        userId = row[0]
        
        actual_rating = ratings_test_dataframe.filter(ratings_test_dataframe.fullVisitorId == userId).filter(ratings_test_dataframe.productSKU == itemId).collect()[0][3]

        # Find similar items and their ratings by the user
        similar_items_with_ratings = k_most_similar_items_with_ratings_rdd(utility_matrix, utility_matrix_t, itemId, userId, k)
    
        # Calculate the weighted sum of ratings (numerator) and sum of similarities (denominator)
        numerator = sum([item[1] * item[2] for item in similar_items_with_ratings]) # weighted sum
        denominator = sum([item[1] for item in similar_items_with_ratings]) # sum of similarities

        if denominator != 0:
            list_of_predictions.append((numerator / denominator, actual_rating))
    
    return list_of_predictions


item_based_prediction_actual = item_item_recommender_rdd(utility_matrix_item_item, utility_matrix_user_user, ratings_test_dataframe.limit(1), 500)

print("The list of prediction vs actual ratings: ", item_based_prediction_actual)


The list of prediction vs actual ratings:  []


#### User Based Collaborative Filtering

In [13]:

def k_most_similar_users_with_ratings_rdd(utility_matrix, utility_matrix_t, item_id, userId, k):
    '''
    Calculates the top k users most similar to the specified user for a given item,
    '''
    # Collect the row corresponding to the specified user
    user_row = utility_matrix.filter(utility_matrix.fullVisitorId == userId).collect()

    if len(user_row) > 0:
        user_row = user_row[0]
    else:
        return []
    
    # Calculate Pearson correlation for each user with the specified user, excluding the user themselves
    pearson_correlation_rdd = utility_matrix.rdd.map(lambda row: (row[0], pearson_Correlation(user_row, row))).filter(lambda x: x[0] != userId)

    # Collect the ratings for the specified item across users
    item_ratings = utility_matrix_t.filter(utility_matrix_t.productSKU == item_id).collect()

    if len(item_ratings) > 0:
        item_ratings = item_ratings[0]
    else:
        return []
    
    # Map each user to their Pearson correlation with the specified user and their rating for the specified item
    users_with_item_ratings = pearson_correlation_rdd.map(lambda x: (x[0], x[1], item_ratings[x[0]]))
    
    # Remove users with no rating
    users_with_item_ratings = users_with_item_ratings.filter(lambda x: x[2] is not None)
    
    # Return the top k similar users and their ratings
    return users_with_item_ratings.takeOrdered(k, key=lambda x: -x[1])


def user_user_recommender_rdd(utility_matrix, utility_matrix_t, ratings_test_dataframe, k):
    '''
    Predicts a rating for a specified item based on user-user similarity.
    '''

    list_of_predictions = [] # List to store the predicted ratings

    for row in ratings_test_dataframe.rdd.collect():
        itemId = row[1]
        userId = row[0]

        actual_rating = ratings_test_dataframe.filter(ratings_test_dataframe.fullVisitorId == userId).filter(ratings_test_dataframe.productSKU == itemId).collect()[0][3]

        # Calculate similarities and get ratings for the top k similar users
        similar_items_with_ratings = k_most_similar_users_with_ratings_rdd(utility_matrix, utility_matrix_t, itemId, userId, k)
        
        # Calculate the weighted sum of ratings (numerator) and sum of similarities (denominator)
        numerator = sum([item[1] * item[2] for item in similar_items_with_ratings]) # weighted sum of ratings
        denominator = sum([item[1] for item in similar_items_with_ratings]) # sum of similarities

        if denominator != 0:
            list_of_predictions.append((numerator / denominator, actual_rating))
    
    return list_of_predictions
    

user_based_prediction_actual = user_user_recommender_rdd( utility_matrix_user_user, utility_matrix_item_item, ratings_test_dataframe.limit(1), 500)

print("The list of preduction vs actual ratings: ", user_based_prediction_actual)


The list of preduction vs actual ratings:  []


### Normalization Function Selection: 
The goal is to identify which normalization technique—MinMax, Z-Score, or Log Transformation—optimizes the performance of the recommendation system.

In [26]:
import math
from pyspark.sql.types import StructType, StructField, DoubleType

def evaluate_recommender_with_RMSE(function, utility_matrix,utility_matrix_t, ratings_test_df, k, num_of_users= None):
    '''
    This function must evaluate the recommender system using the specified function
    and return the RMSE of the predictions.
    '''
    
    ratings_test_df = ratings_test_df if num_of_users is None else ratings_test_df.limit(num_of_users)

    predicted_actual_rating = function(
        utility_matrix, 
        utility_matrix_t,  
        ratings_test_df, 
        k
    )
        
    # Convert the list to a DataFrame for easier display 
    # Define the schema for the DataFrame
    schema = StructType([
        StructField("predicted_rating", DoubleType(), nullable=False),
        StructField("actual_rating", DoubleType(), nullable=False)
    ])
    
    predictions_df = spark.createDataFrame([(float(pred), float(actual)) for pred, actual in predicted_actual_rating], schema)
    predictions_df.show()

    squared_errors = [(pred[0] - pred[1]) ** 2 for pred in predicted_actual_rating]
    # Calculate RMSE
    rmse = math.sqrt(sum(squared_errors) / len(squared_errors))

    if(function == user_user_recommender_rdd):
        print(f"RMSE for User Based Recommender: {rmse}")
    else:
        print(f"RMSE for Item Based Recommender: {rmse}")


In [28]:
print("\n===============================================")
print("Evaluating the Z Score Normalization function")
print("===============================================")
(z_score_normalized_train_df, z_score_normalized_test_df) = z_score_normalization(trainSet, testSet)
print("train set std", z_score_normalized_train_df.select(stddev(z_score_normalized_train_df['normalized_duration'])).collect()[0][0])
utility_matrix_item_item = item_item_utility_matrix(z_score_normalized_train_df)
utility_matrix_user_user = user_user_utility_matrix(z_score_normalized_train_df)

evaluate_recommender_with_RMSE(item_item_recommender_rdd, utility_matrix_item_item, utility_matrix_user_user, z_score_normalized_test_df, 500, 10) # Item Based Evaluation
evaluate_recommender_with_RMSE(user_user_recommender_rdd, utility_matrix_user_user, utility_matrix_item_item, z_score_normalized_test_df, 500, 10) # User Based Evaluation

print("\n===============================================")
print("Evaluating the Min-Max Normalization function")
print("===============================================")
(min_max_normalized_train_df, min_max_normalized_test_df) = min_max_normalization(trainSet, testSet)
print("train set std", min_max_normalized_test_df.select(stddev(min_max_normalized_test_df['normalized_duration'])).collect()[0][0])
utility_matrix_item_item = item_item_utility_matrix(min_max_normalized_train_df)
utility_matrix_user_user = user_user_utility_matrix(min_max_normalized_train_df)

evaluate_recommender_with_RMSE(item_item_recommender_rdd, utility_matrix_item_item, utility_matrix_user_user, min_max_normalized_test_df, 500, 10) # Item Based Evaluation
evaluate_recommender_with_RMSE(user_user_recommender_rdd, utility_matrix_user_user, utility_matrix_item_item, min_max_normalized_test_df, 500, 10) # User Based Evaluation

print("\n===============================================")
print("Evaluating the Logarithmic Transformation function")
print("===============================================")
(log_normalized_train_df, log_normalized_test_df) = logarithmic_transformation(trainSet, testSet)
print("train set std", log_normalized_test_df.select(stddev(log_normalized_test_df['normalized_duration'])).collect()[0][0])
utility_matrix_item_item = item_item_utility_matrix(log_normalized_train_df)
utility_matrix_user_user = user_user_utility_matrix(log_normalized_train_df)

evaluate_recommender_with_RMSE(item_item_recommender_rdd, utility_matrix_item_item, utility_matrix_user_user, log_normalized_test_df, 500, 10) # Item Based Evaluation
evaluate_recommender_with_RMSE(user_user_recommender_rdd, utility_matrix_user_user, utility_matrix_item_item, log_normalized_test_df, 500, 10) # User Based Evaluation


Evaluating the Z Score Normalization function
train set std 0.9999999999999998
+--------------------+--------------------+
|    predicted_rating|       actual_rating|
+--------------------+--------------------+
|-0.18396846315227863| -0.3625344285102987|
|-0.14663694865816213|-0.12690512516191746|
|-0.14910121044162003| -0.4039764459246887|
|-0.19422365130373642| -0.4039764459246887|
|  -0.078795043069357| -0.3398567920459228|
+--------------------+--------------------+

RMSE for Item Based Recommender: 0.20463919767985853
+--------------------+--------------------+
|    predicted_rating|       actual_rating|
+--------------------+--------------------+
|-0.12174154048772799| -0.3625344285102987|
|-0.11852710156736367|-0.12690512516191746|
|-0.17083355675391376| -0.4039764459246887|
|-0.19422365130373642| -0.4039764459246887|
|-0.16157886034634159| -0.3398567920459228|
+--------------------+--------------------+

RMSE for User Based Recommender: 0.1940032312900367

Evaluating the Min-M

#### Results:
Z-Score Normalization appears to perform exceptionally well with the item-based collaborative filtering approach, yielding a very low RMSE. This suggests that standardizing the ratings to have a mean of 0 and a standard deviation of 1 effectively captures the preference patterns across items, leading to accurate predictions. However, for the user-based approach, the performance is slightly weaker, although still competitive.

Given these results, we chose Z-Score for User and Item Based Filtering

## Model Evalutation: User Based and Item Based

In [29]:
z_score_normalized_train_df, z_score_normalized_test_df = min_max_normalization(trainSet, testSet)
utility_matrix_item_item = item_item_utility_matrix(z_score_normalized_train_df)
utility_matrix_user_user = user_user_utility_matrix(z_score_normalized_train_df)

print("Evaluation for Item Based Recommender:")
evaluate_recommender_with_RMSE(item_item_recommender_rdd, utility_matrix_item_item, utility_matrix_user_user, z_score_normalized_test_df, 500)
print("Evaluation for User Based Recommender")
evaluate_recommender_with_RMSE(user_user_recommender_rdd, utility_matrix_user_user, utility_matrix_item_item, z_score_normalized_test_df, 500)

Evaluation for Item Based Recommender:
+--------------------+--------------------+
|    predicted_rating|       actual_rating|
+--------------------+--------------------+
|7.195622761041902E-4| 0.00583535447971853|
|0.003336570986695...|0.039013770894588747|
|0.003163821645022983|                 0.0|
|6.544823862427803E-7|                 0.0|
|0.008092414950565945|0.009028539941917447|
|0.006751640296480521|0.006779308880849468|
|0.001775038082629...|                 0.0|
|9.188932702848634E-4|4.516528235076262E-6|
|0.003783232167321...|0.053773785166817974|
|0.007611630152003534|0.014610968840471706|
|1.767102442855506...|  0.5327696706095958|
|0.002534090720589429| 0.08807681711222218|
|2.949314536686665E-7|                 0.0|
|0.002192515993913314|0.012614663360567999|
|8.377374543907587E-4|                 0.0|
|0.009777312368080894|  0.2674913847223916|
|0.001432771055920...|  0.0312408258020225|
|0.002771732905738...| 0.01960624906846605|
|0.001306346842940...|0.0087168994936

# Conclusion
Given the RMSE values for the three collaborative filtering algorithms applied on the test set (10%) of an e-commerce dataset covering a span of 1 day:

1. Item-Based Recommender RMSE: 0.8716780363753216
2. User-Based Recommender RMSE: 0.21951523321964755
3. Latent Factor Model (ALS) RMSE: 0.9993922401899552

The Item-Based Filtering algorithm, while not performing as well as the Item-Based approach, still shows a reasonable level of accuracy. We assume that this is because an item has less ratings than users in general.

The Latent Factor Model, has the highest RMSE among the three methods. This performance might be attributed to the characteristics of the dataset, which encompasses e-commerce data for only a single day. Latent factor models, often require a more extensive range of interactions to effectively uncover the underlying patterns and relationships between users and items. The limited temporal scope of the data resulted to poorer performance compared to the other methods. 

Given these observations, the **User-Based Collaborative Filtering** algorithm is the preferred choice for this dataset. 