In [1]:
import pandas as pd
import os

# Define the path to the archive folder and the CSV file
archive_folder = 'archive'
csv_file_items = 'items.csv'
csv_file_events = 'events1.csv'
csv_file_users = 'users.csv'
csv_file_items = os.path.join(archive_folder, csv_file_items)
csv_file_events = os.path.join(archive_folder, csv_file_events)
csv_file_users = os.path.join(archive_folder, csv_file_users)

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, count, avg, log1p, exp
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("MerchandisingPricingModel") \
    .getOrCreate()

# Optional: Set log level to WARN to reduce verbosity
spark.sparkContext.setLogLevel("WARN")

In [3]:
# Load Items DataFrame
items_df = spark.read.csv(csv_file_items, header=True, inferSchema=True)
print("Items DataFrame Schema:")
items_df.printSchema()

# Load Events DataFrame
events_df = spark.read.csv(csv_file_events, header=True, inferSchema=True)
print("Events DataFrame Schema:")
events_df.printSchema()

# Load Users DataFrame
users_df = spark.read.csv(csv_file_users, header=True, inferSchema=True)
print("Users DataFrame Schema:")
users_df.printSchema()

Items DataFrame Schema:
root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- variant: string (nullable = true)
 |-- category: string (nullable = true)
 |-- price_in_usd: integer (nullable = true)

Events DataFrame Schema:
root
 |-- user_id: integer (nullable = true)
 |-- ga_session_id: integer (nullable = true)
 |-- country: string (nullable = true)
 |-- device: string (nullable = true)
 |-- type: string (nullable = true)
 |-- item_id: integer (nullable = true)
 |-- date: timestamp (nullable = true)

Users DataFrame Schema:
root
 |-- id: integer (nullable = true)
 |-- ltv: integer (nullable = true)
 |-- date: timestamp (nullable = true)



In [4]:
from pyspark.sql.functions import to_date

# Drop rows with missing values in 'country' and 'variant' columns
events_df = events_df.dropna(subset=['country'])
items_df = items_df.dropna(subset=['variant'])

# Verify that there are no nulls in the specified columns
print("Events DataFrame Null Counts after dropping 'country' nulls:")
events_df.select([count(when(col(c).isNull(), c)).alias(c) for c in events_df.columns]).show()

print("Items DataFrame Null Counts after dropping 'variant' nulls:")
items_df.select([count(when(col(c).isNull(), c)).alias(c) for c in items_df.columns]).show()

# Convert date columns to DateType
events_df = events_df.withColumn("date", to_date(col("date"), "yyyy-MM-dd"))
users_df = users_df.withColumn("date", to_date(col("date"), "yyyy-MM-dd"))

Events DataFrame Null Counts after dropping 'country' nulls:
+-------+-------------+-------+------+----+-------+----+
|user_id|ga_session_id|country|device|type|item_id|date|
+-------+-------------+-------+------+----+-------+----+
|      0|            0|      0|     0|   0|      0|   0|
+-------+-------------+-------+------+----+-------+----+

Items DataFrame Null Counts after dropping 'variant' nulls:
+---+----+-----+-------+--------+------------+
| id|name|brand|variant|category|price_in_usd|
+---+----+-----+-------+--------+------------+
|  0|   0|    0|      0|       0|           0|
+---+----+-----+-------+--------+------------+



In [5]:

# Rename column 'id' to 'item_id' in items_df for consistency
items_df = items_df.withColumnRenamed("id", "item_id")

# Print schema to confirm renaming
items_df.printSchema()
# Merge events_df with items_df to add item details to transactions
all_transactions_name = events_df.join(
    items_df.select("item_id", "name", "brand", "variant", "category", "price_in_usd"),
    on="item_id",
    how="left"
)

# Preview the merged DataFrame
all_transactions_name.show(truncate=False)



root
 |-- item_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- variant: string (nullable = true)
 |-- category: string (nullable = true)
 |-- price_in_usd: integer (nullable = true)

+-------+-------+-------------+-------+-------+-----------+----------+---------------------------------+-------+------------------+---------+------------+
|item_id|user_id|ga_session_id|country|device |type       |date      |name                             |brand  |variant           |category |price_in_usd|
+-------+-------+-------------+-------+-------+-----------+----------+---------------------------------+-------+------------------+---------+------------+
|94     |2133   |16909        |US     |mobile |purchase   |2020-11-01|Google Large Tote White          |Google |Single Option Only|Bags     |10          |
|425    |2133   |16909        |US     |mobile |purchase   |2020-11-01|Google Heather Green Speckled Tee|Google | XL               |Appare

In [6]:
# Total number of rows in the DataFrame
total_rows = all_transactions_name.count()
print(f"Total Rows: {total_rows}")
# Count of unique item_id values
unique_item_ids = all_transactions_name.select("item_id").distinct().count()
print(f"Unique item_id Values: {unique_item_ids}")


Total Rows: 754329
Unique item_id Values: 1381


In [7]:
from pyspark.sql.functions import col, min
from pyspark.sql.window import Window

# Add a column with the smallest item_id for each name
all_transactions_name = all_transactions_name.withColumn(
    "item_id_min",
    min("item_id").over(Window.partitionBy("name"))
)
# Replace item_id with item_id_min
all_transactions_name = all_transactions_name.withColumn(
    "item_id",
    col("item_id_min")
)
# Drop item_id_min column
all_transactions_name = all_transactions_name.drop("item_id_min")

# Sort the DataFrame by item_id
sorted_transactions = all_transactions_name.orderBy("item_id", ascending=True)
sorted_transactions.show()


+-------+-------+-------------+-------+-------+-----------+----------+--------------------+------+------------------+--------+------------+
|item_id|user_id|ga_session_id|country| device|       type|      date|                name| brand|           variant|category|price_in_usd|
+-------+-------+-------------+-------+-------+-----------+----------+--------------------+------+------------------+--------+------------+
|      0|   5115|        17001|     US| mobile|   purchase|2020-11-02|Google Land & Sea...|Google|Single Option Only| Apparel|          14|
|      0|  20937|          217|     IN|desktop|add_to_cart|2020-11-18|Google Land & Sea...|Google|Single Option Only| Apparel|          14|
|      0|  10904|        16401|     TR|desktop|   purchase|2020-11-03|Google Land & Sea...|Google|Single Option Only| Apparel|          14|
|      0|  29457|        17113|     KR| mobile|   purchase|2020-11-05|Google Land & Sea...|Google|Single Option Only| Apparel|          14|
|      0|  30148|   

In [8]:
# Drop duplicates based on item_id
unique_items = sorted_transactions.dropDuplicates(["item_id"])
unique_items.show()


+-------+-------+-------------+-------+-------+--------+----------+--------------------+------+------------------+--------------------+------------+
|item_id|user_id|ga_session_id|country| device|    type|      date|                name| brand|           variant|            category|price_in_usd|
+-------+-------+-------------+-------+-------+--------+----------+--------------------+------+------------------+--------------------+------------+
|      0|   5115|        17001|     US| mobile|purchase|2020-11-02|Google Land & Sea...|Google|Single Option Only|             Apparel|          14|
|      1|   5789|        16908|     SE|desktop|purchase|2020-11-01|      Google KeepCup|Google|Single Option Only|                 New|          28|
|      2|  32033|        15864|     PT|desktop|purchase|2020-11-02|Google Land & Sea...|Google|Single Option Only|           Drinkware|          20|
|      3|  11348|        17000|     US|desktop|purchase|2020-11-02|Google Unisex Eco...|Google|           

In [9]:
# Rename columns to match the desired schema
transactions_new = unique_items.select(
    col("item_id"),
    col("name"),
    col("category")
)

# Show the new DataFrame
transactions_new.show(truncate=False)


+-------+--------------------------------------+-----------------------+
|item_id|name                                  |category               |
+-------+--------------------------------------+-----------------------+
|0      |Google Land & Sea Cotton Cap          |Apparel                |
|1      |Google KeepCup                        |New                    |
|2      |Google Land & Sea Nalgene Water Bottle|Drinkware              |
|3      |Google Unisex Eco Tee Black           |Uncategorized Items    |
|4      |Google Chicago Campus Bottle          |Campus Collection      |
|5      |Google SF Campus Zip Hoodie           |Clearance              |
|6      |Google Chicago Campus Unisex Tee      |Campus Collection      |
|7      |Google SF Campus Unisex Tee           |Clearance              |
|8      |Super G Unisex Joggers                |Shop by Brand          |
|9      |Mommy Works at Google Book            |Small Goods            |
|10     |BLM Unisex Pullover Hoodie            |Bla

In [10]:
# Convert the Spark DataFrame to Pandas for further processing
data = transactions_new.toPandas()

# Verify the conversion
print(data.head())


   item_id                                    name             category
0        0            Google Land & Sea Cotton Cap              Apparel
1        1                          Google KeepCup                  New
2        2  Google Land & Sea Nalgene Water Bottle            Drinkware
3        3             Google Unisex Eco Tee Black  Uncategorized Items
4        4            Google Chicago Campus Bottle    Campus Collection


In [11]:
data['category'].value_counts()

category
Apparel                    109
Campus Collection           66
New                         43
Accessories                 41
Clearance                   28
Bags                        23
Office                      17
Shop by Brand               16
Lifestyle                   13
Uncategorized Items         12
Drinkware                   12
Stationery                   8
Small Goods                  6
Writing Instruments          5
Google                       4
Gift Cards                   4
Notebooks & Journals         2
Electronics Accessories      1
Black Lives Matter           1
Fun                          1
Eco-Friendly                 1
Name: count, dtype: int64

In [12]:
from pyspark.sql.functions import when, col

# Replace null or None values with "Unknown" in the category column
transactions_new = transactions_new.withColumn(
    "category",
    when(col("category").isNull(), "Uncategorized Items").otherwise(col("category"))
)



In [13]:
from pyspark.ml.feature import Tokenizer

# Tokenize the category column
tokenizer = Tokenizer(inputCol="category", outputCol="tokens")
tokenized_df = tokenizer.transform(transactions_new)

# Show tokenized data
tokenized_df.select("category", "tokens","name").show(truncate=False)


+-----------------------+--------------------------+--------------------------------------+
|category               |tokens                    |name                                  |
+-----------------------+--------------------------+--------------------------------------+
|Apparel                |[apparel]                 |Google Land & Sea Cotton Cap          |
|New                    |[new]                     |Google KeepCup                        |
|Drinkware              |[drinkware]               |Google Land & Sea Nalgene Water Bottle|
|Uncategorized Items    |[uncategorized, items]    |Google Unisex Eco Tee Black           |
|Campus Collection      |[campus, collection]      |Google Chicago Campus Bottle          |
|Clearance              |[clearance]               |Google SF Campus Zip Hoodie           |
|Campus Collection      |[campus, collection]      |Google Chicago Campus Unisex Tee      |
|Clearance              |[clearance]               |Google SF Campus Unisex Tee 

In [14]:
from pyspark.ml.feature import CountVectorizer, IDF

# Step 3.1: Compute term frequency (TF)
count_vectorizer = CountVectorizer(inputCol="tokens", outputCol="raw_features")
count_model = count_vectorizer.fit(tokenized_df)
tf_df = count_model.transform(tokenized_df)

# Step 3.2: Compute inverse document frequency (IDF)
idf = IDF(inputCol="raw_features", outputCol="features")
idf_model = idf.fit(tf_df)
tfidf_df = idf_model.transform(tf_df)

# Show TF-IDF results
tfidf_df.select("category", "features").show(truncate=False)

+-----------------------+-----------------------------------------------------------------------+
|category               |features                                                               |
+-----------------------+-----------------------------------------------------------------------+
|Apparel                |(32,[0],[1.3277981544382822])                                          |
|New                    |(32,[3],[2.2440888863124373])                                          |
|Drinkware              |(32,[14],[3.4633291627691616])                                         |
|Uncategorized Items    |(32,[11,12],[3.3892211906154395,3.3892211906154395])                   |
|Campus Collection      |(32,[1,2],[1.8235859008397322,1.8235859008397322])                     |
|Clearance              |(32,[5],[2.660982690244224])                                           |
|Campus Collection      |(32,[1,2],[1.8235859008397322,1.8235859008397322])                     |
|Clearance          

In [15]:
from pyspark.ml.functions import vector_to_array

# Convert 'features' column from Vector to array (dense representation)
dense_df = tfidf_df.withColumn("features_dense", vector_to_array("features"))

# Show the dense features



In [16]:
dense_df.select("name","category", "features_dense").show(truncate=False)

+--------------------------------------+-----------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|name                                  |category               |features_dense                                                                                                                                                                                            |
+--------------------------------------+-----------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Google Land & Sea Cotton Cap          |Apparel                |[1.3277981544382822, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0

In [17]:
dense_df.schema

StructType([StructField('item_id', IntegerType(), True), StructField('name', StringType(), True), StructField('category', StringType(), True), StructField('tokens', ArrayType(StringType(), True), True), StructField('raw_features', VectorUDT(), True), StructField('features', VectorUDT(), True), StructField('features_dense', ArrayType(DoubleType(), False), False)])

In [27]:
from pyspark.sql.functions import col, expr

# Select name and features_dense for self-join
df1 = dense_df.select(
    col("name").alias("name1"),
    col("features_dense").alias("features1")
)
df2 = dense_df.select(
    col("name").alias("name2"),
    col("features_dense").alias("features2")
)

# Perform cross join
cross_df = df1.crossJoin(df2)

# Compute dot product using aggregate and transform with proper casting
dot_product_expr = """
    aggregate(
        transform(features1, (x, i) -> x * features2[i]),
        cast(0.0 as double),
        (acc, x) -> acc + x
    )
"""

cross_df = cross_df.withColumn("dot_product", expr(dot_product_expr))

# Compute norms of feature vectors with proper casting
norm1_expr = """
    sqrt(
        aggregate(
            features1,
            cast(0.0 as double),
            (acc, x) -> acc + x * x
        )
    )
"""

norm2_expr = """
    sqrt(
        aggregate(
            features2,
            cast(0.0 as double),
            (acc, x) -> acc + x * x
        )
    )
"""

cross_df = cross_df.withColumn("norm1", expr(norm1_expr)) \
                   .withColumn("norm2", expr(norm2_expr))

# Compute cosine similarity and handle division by zero
cosine_similarity_expr = """
    CASE 
        WHEN norm1 * norm2 = 0 THEN 0 
        ELSE dot_product / (norm1 * norm2) 
    END
"""

cross_df = cross_df.withColumn("cosine_similarity", expr(cosine_similarity_expr))

# Filter out self-similarity
similarity_df = cross_df.filter(col("name1") != col("name2"))

# Select relevant columns
similarity_df = similarity_df.select("name1", "name2", "cosine_similarity")

# Show the similarity matrix
similarity_df.show(truncate=False)


+----------------------------+--------------------------------------+-----------------+
|name1                       |name2                                 |cosine_similarity|
+----------------------------+--------------------------------------+-----------------+
|Google Land & Sea Cotton Cap|Google KeepCup                        |0.0              |
|Google Land & Sea Cotton Cap|Google Land & Sea Nalgene Water Bottle|0.0              |
|Google Land & Sea Cotton Cap|Google Unisex Eco Tee Black           |0.0              |
|Google Land & Sea Cotton Cap|Google Chicago Campus Bottle          |0.0              |
|Google Land & Sea Cotton Cap|Google SF Campus Zip Hoodie           |0.0              |
|Google Land & Sea Cotton Cap|Google Chicago Campus Unisex Tee      |0.0              |
|Google Land & Sea Cotton Cap|Google SF Campus Unisex Tee           |0.0              |
|Google Land & Sea Cotton Cap|Super G Unisex Joggers                |0.0              |
|Google Land & Sea Cotton Cap|Mo

In [42]:
from pyspark.sql.functions import col

def query_cosine_matrix(item_name, similarity_df, items_df, k=10):
    """
    Query the cosine similarity matrix for top-k similar items.

    Parameters:
    ---
    item_name : str
        The name of the item to query.
    similarity_df : pyspark.sql.DataFrame
        DataFrame containing cosine similarity with ('name1', 'name2', 'cosine_similarity').
    items_df : pyspark.sql.DataFrame
        DataFrame containing item details with ('item_id', 'name', 'category').
    k : int
        The number of top similar items to return.

    Returns:
    ---
    pyspark.sql.DataFrame:
        DataFrame with top-k similar items and their details.
    """
    # Alias the DataFrames to avoid ambiguity
    similarity_alias = similarity_df.alias("similarity")
    items_alias = items_df.alias("items")

    # Filter the similarity_df for the given item
    filtered_df = similarity_alias.filter(col("similarity.name1") == item_name)

    # Get the top-k similar items
    top_k = filtered_df.orderBy(col("similarity.cosine_similarity").desc()).limit(k)

    # Join with items_df to get item details
    result = top_k.join(
        items_alias,
        col("similarity.name2") == col("items.name"),
        how='left'
    ).select(
        col("items.item_id"),
        col("items.name").alias("recommended_item"),
        col("items.category"),
        col("similarity.cosine_similarity")
    )

    return result

# Example Usage
input_item = "Google Flat Front Bag Grey"
top_similar_items = query_cosine_matrix(input_item, similarity_df, transactions_new, k=10)

# Show the top-k similar items
top_similar_items.show(truncate=False)


+-------+-------------------------------+--------+-----------------+
|item_id|recommended_item               |category|cosine_similarity|
+-------+-------------------------------+--------+-----------------+
|21     |Google Campus Bike Tote Navy   |Bags    |1.0              |
|103    |Google Incognito Zip Pack      |Bags    |1.0              |
|333    |Google Incognito Dopp Kit V2   |Bags    |1.0              |
|297    |Google Striped Penny Pouch     |Bags    |1.0              |
|315    |Google Utility Bag Grey        |Bags    |1.0              |
|58     |Google Utility BackPack        |Bags    |1.0              |
|299    |Google Packable Bag Black      |Bags    |1.0              |
|59     |Google Incognito Techpack V2   |Bags    |1.0              |
|365    |Google Confetti Accessory Pouch|Bags    |1.0              |
|94     |Google Large Tote White        |Bags    |1.0              |
+-------+-------------------------------+--------+-----------------+



In [46]:
all_transactions_name.show(truncate=False)

+-------+-------+-------------+-------+------+-----------+----------+----+-----+-------+--------+------------+
|item_id|user_id|ga_session_id|country|device|type       |date      |name|brand|variant|category|price_in_usd|
+-------+-------+-------------+-------+------+-----------+----------+----+-----+-------+--------+------------+
|835    |5808   |4267         |US     |mobile|add_to_cart|2020-11-01|NULL|NULL |NULL   |NULL    |NULL        |
|835    |5808   |4267         |US     |mobile|add_to_cart|2020-11-01|NULL|NULL |NULL   |NULL    |NULL        |
|835    |5808   |4267         |US     |mobile|add_to_cart|2020-11-01|NULL|NULL |NULL   |NULL    |NULL        |
|835    |5808   |4267         |US     |mobile|add_to_cart|2020-11-01|NULL|NULL |NULL   |NULL    |NULL        |
|835    |5808   |4267         |US     |mobile|add_to_cart|2020-11-01|NULL|NULL |NULL   |NULL    |NULL        |
|835    |5808   |4267         |US     |mobile|add_to_cart|2020-11-01|NULL|NULL |NULL   |NULL    |NULL        |
|

In [50]:
from pyspark.sql.functions import when

# Define the event type mapping
event_type_mapping = {
    'add_to_cart': 1,
    'begin_checkout': 2,
    'purchase': 3
}

# Apply the mapping to create a new 'event_type' column
all_transactions_name = all_transactions_name.withColumn(
    "event_type",
    when(col("type") == "add_to_cart", 1)
    .when(col("type") == "begin_checkout", 2)
    .when(col("type") == "purchase", 3)
    .otherwise(0)  # Assign 0 or another value for unknown types
)

# Verify the mapping
all_transactions_name.select("type", "event_type").show(10, truncate=False)


+-----------+----------+
|type       |event_type|
+-----------+----------+
|purchase   |3         |
|purchase   |3         |
|purchase   |3         |
|purchase   |3         |
|add_to_cart|1         |
|add_to_cart|1         |
|add_to_cart|1         |
|add_to_cart|1         |
|add_to_cart|1         |
|add_to_cart|1         |
+-----------+----------+
only showing top 10 rows



In [51]:
from pyspark.ml.feature import StringIndexer

# Initialize StringIndexer for 'user_id'
user_indexer = StringIndexer(inputCol="user_id", outputCol="user_id_encoded")

# Fit the indexer on the DataFrame
user_indexer_model = user_indexer.fit(all_transactions_name)

# Transform the DataFrame to include the encoded 'user_id'
all_transactions_name = user_indexer_model.transform(all_transactions_name)

# Verify the encoding
all_transactions_name.select("user_id", "user_id_encoded").show(10, truncate=False)


+-------+---------------+
|user_id|user_id_encoded|
+-------+---------------+
|2133   |13829.0        |
|2133   |13829.0        |
|5789   |14023.0        |
|5789   |14023.0        |
|5808   |4188.0         |
|5808   |4188.0         |
|5808   |4188.0         |
|5808   |4188.0         |
|5808   |4188.0         |
|5808   |4188.0         |
+-------+---------------+
only showing top 10 rows



In [67]:
from pyspark.ml.feature import StringIndexer

# Initialize and fit the StringIndexer for item_id
item_indexer = StringIndexer(inputCol="item_id", outputCol="item_id_encoded")
item_indexer_model = item_indexer.fit(all_transactions_name)

# Transform the dataset to include item_id_encoded
all_transactions_name = item_indexer_model.transform(all_transactions_name)

# Show the transformed dataset
all_transactions_name.select("item_id", "item_id_encoded").show(10, truncate=False)


+-------+---------------+
|item_id|item_id_encoded|
+-------+---------------+
|835    |0.0            |
|835    |0.0            |
|835    |0.0            |
|835    |0.0            |
|835    |0.0            |
|835    |0.0            |
|835    |0.0            |
|835    |0.0            |
|835    |0.0            |
|835    |0.0            |
+-------+---------------+
only showing top 10 rows



In [68]:
# Select the necessary columns
final_df = all_transactions_name.select(
    col("user_id_encoded").alias("user_id"),
    col("item_id_encoded").alias("item_id"),
    col("event_type")
)

# Show the final DataFrame
final_df.show(10, truncate=False)


+-------+-------+----------+
|user_id|item_id|event_type|
+-------+-------+----------+
|4188.0 |0.0    |1         |
|4188.0 |0.0    |1         |
|4188.0 |0.0    |1         |
|4188.0 |0.0    |1         |
|4188.0 |0.0    |1         |
|4188.0 |0.0    |1         |
|4188.0 |0.0    |1         |
|4188.0 |0.0    |1         |
|4188.0 |0.0    |1         |
|4188.0 |0.0    |1         |
+-------+-------+----------+
only showing top 10 rows



In [106]:
# Extract labels from the StringIndexer model
user_labels = user_indexer_model.labels

# Create mapping dictionaries
user_to_user_encoded = {label: idx for idx, label in enumerate(user_labels)}
user_encoded_to_user = {idx: label for idx, label in enumerate(user_labels)}



In [70]:
# Check the counts of each event_type
final_df.groupBy("event_type").count().show()

# Display sample data
final_df.show(10, truncate=False)


+----------+------+
|event_type| count|
+----------+------+
|         1|663305|
|         3| 15475|
|         2| 75549|
+----------+------+

+-------+-------+----------+
|user_id|item_id|event_type|
+-------+-------+----------+
|4188.0 |0.0    |1         |
|4188.0 |0.0    |1         |
|4188.0 |0.0    |1         |
|4188.0 |0.0    |1         |
|4188.0 |0.0    |1         |
|4188.0 |0.0    |1         |
|4188.0 |0.0    |1         |
|4188.0 |0.0    |1         |
|4188.0 |0.0    |1         |
|4188.0 |0.0    |1         |
+-------+-------+----------+
only showing top 10 rows



In [71]:
final_df.show(10, truncate=False)


+-------+-------+----------+
|user_id|item_id|event_type|
+-------+-------+----------+
|4188.0 |0.0    |1         |
|4188.0 |0.0    |1         |
|4188.0 |0.0    |1         |
|4188.0 |0.0    |1         |
|4188.0 |0.0    |1         |
|4188.0 |0.0    |1         |
|4188.0 |0.0    |1         |
|4188.0 |0.0    |1         |
|4188.0 |0.0    |1         |
|4188.0 |0.0    |1         |
+-------+-------+----------+
only showing top 10 rows



In [72]:
# Split the data into training (80%) and validation (20%) sets
train_df, val_df = final_df.randomSplit([0.8, 0.2], seed=42)

print(f"Training Data Count: {train_df.count()}")
print(f"Validation Data Count: {val_df.count()}")


Training Data Count: 603709
Validation Data Count: 150620


In [73]:
from pyspark.ml.recommendation import ALS

# Initialize the ALS model
als = ALS(
    maxIter=10,
    regParam=0.1,
    userCol="user_id",
    itemCol="item_id",
    ratingCol="event_type",
    coldStartStrategy="drop",  # To handle NaN predictions
    nonnegative=True
)

# Train the ALS model on the training data
als_model = als.fit(train_df)

print("ALS model training completed.")


ALS model training completed.


In [74]:
from pyspark.ml.evaluation import RegressionEvaluator

# Make predictions on the validation set
predictions = als_model.transform(val_df)

# Initialize the evaluator
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="event_type",
    predictionCol="prediction"
)

# Calculate RMSE
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) on validation data: {rmse:.4f}")


Root Mean Squared Error (RMSE) on validation data: 0.2222


In [75]:
# Generate top 10 item recommendations for each user
user_recommendations = als_model.recommendForAllUsers(10)

# Show recommendations for a few users
user_recommendations.select("user_id", "recommendations").show(5, truncate=False)


+-------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|user_id|recommendations                                                                                                                                                                    |
+-------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|31     |[{404, 2.1045675}, {334, 2.0156727}, {296, 1.9900073}, {300, 1.9811398}, {360, 1.9764503}, {384, 1.9761128}, {283, 1.9755534}, {353, 1.974976}, {245, 1.9745234}, {374, 1.9645573}]|
|34     |[{404, 1.8041916}, {360, 1.7999492}, {334, 1.7909286}, {375, 1.7745564}, {353, 1.7631757}, {391, 1.7532632}, {384, 1.7379528}, {296, 1.737398}, {245, 1.733621}, {283, 1.729852}]  |
|53     |[{404, 1.9829658}, {334, 1.9108629}, {360

In [63]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

# Define a parameter grid
param_grid = ParamGridBuilder() \
    .addGrid(als.rank, [10, 20, 30]) \
    .addGrid(als.regParam, [0.05, 0.1, 0.15]) \
    .build()

# Define the evaluator
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="event_type",
    predictionCol="prediction"
)

# Initialize CrossValidator
crossval = CrossValidator(
    estimator=als,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=5
)

# Run cross-validation, and choose the best set of parameters.
cv_model = crossval.fit(train_df)

# Evaluate the best model
best_model = cv_model.bestModel
predictions = best_model.transform(val_df)
rmse = evaluator.evaluate(predictions)
print(f"Best Model RMSE: {rmse:.4f}")


Best Model RMSE: 0.2000


In [None]:
user_recommendations = als_model.recommendForAllUsers(5)
user_recommendations.show(5, truncate=False)


+-------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|user_id|recommendations                                                                                                                                                                    |
+-------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|31     |[{404, 2.1045675}, {334, 2.0156727}, {296, 1.9900073}, {300, 1.9811398}, {360, 1.9764503}, {384, 1.9761128}, {283, 1.9755534}, {353, 1.974976}, {245, 1.9745234}, {374, 1.9645573}]|
|34     |[{404, 1.8041916}, {360, 1.7999492}, {334, 1.7909286}, {375, 1.7745564}, {353, 1.7631757}, {391, 1.7532632}, {384, 1.7379528}, {296, 1.737398}, {245, 1.733621}, {283, 1.729852}]  |
|53     |[{404, 1.9829658}, {334, 1.9108629}, {360

In [None]:
def pretty_print_recommendations(user_id, recommendations, transactions_new):
    """
    Pretty-print recommendations for a user.
    
    Args:
    - user_id: The user_id to display recommendations for.
    - recommendations: List of recommendations from ALS.
    - transactions_new: DataFrame containing item metadata (item_id, name, category).
    """
    # Get recommendations for the specified user
    user_recs = next((row for row in recommendations if row.user_id == user_id), None)
    if not user_recs:
        print(f"No recommendations found for user: {user_id}")
        return
    
    # Extract recommended item IDs
    recommended_items = [row.item_id for row in user_recs.recommendations]

    # Pretty print recommendations
    print(f"\nShowing recommendations for user: {user_id}")
    print("=" * 40)
    print(recommended_items)
    # final all recommended items from the transactions_new dataframe
    recommended_items_df = transactions_new.filter(col("item_id").isin(recommended_items))
    recommended_items_df.show(truncate=False)
    return recommended_items_df


In [None]:
# Example user_id
test_user_id = 10

# Pretty print recommendations for the user
pretty_print_recommendations(test_user_id, user_recommendations_list, transactions_new)



Showing recommendations for user: 10
[404, 334, 296, 360, 353, 300, 384, 283, 245, 391]
+-------+--------------------------------------+-----------------+
|item_id|name                                  |category         |
+-------+--------------------------------------+-----------------+
|245    |Google Men's Softshell Moss           |Apparel          |
|283    |Google Cambridge Campus Bottle        |Campus Collection|
|296    |Google LA Campus Lapel Pin            |Campus Collection|
|300    |Google Men's Puff Jacket Black        |Apparel          |
|360    |Google Seattle Campus Sticker         |Campus Collection|
|384    |Google Tee Green                      |Clearance        |
|391    |Google Mountain View Campus Ladies Tee|Campus Collection|
|404    |Google Cork Pencil Pouch              |New              |
+-------+--------------------------------------+-----------------+



DataFrame[item_id: int, name: string, category: string]