# **Content-Based Product Recommender System with PySpark & OpenAI**

### Task 1 - Set up the project

Installing the needed modules.

In [1]:
!pip install openai==1.16.2 python-dotenv pyspark findspark plotly pandas numpy



Import the modules

In [2]:
from dotenv import load_dotenv
import os
from openai import OpenAI
import pandas as pd
import numpy as np

from pyspark.sql import SparkSession
from pyspark.sql.functions import concat_ws
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, FloatType

from pyspark.ml.feature import VectorAssembler, PCA
from pyspark.ml.clustering import KMeans
import findspark
import plotly.express as px


Set up the OpenAI API

In [3]:
# Load environment variables
load_dotenv(dotenv_path='apikey.env.txt')
# Initialize OpenAI client
APIKEY=os.getenv('apikey')

#Create an Instance of the OpenAI client
client = OpenAI(api_key=APIKEY)
client

<openai.OpenAI at 0x22ed2c28d50>
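If the `.env` file path is wrong or the variable name does not match, `os.getenv` silently returns `None` and the failure only surfaces on the first API call. A minimal sketch of a fail-fast check follows; the helper name `require_env` is hypothetical and not part of the notebook.

```python
import os


def require_env(name: str) -> str:
    """Return the value of environment variable `name`, or raise if it is unset or empty."""
    value = os.getenv(name)
    if not value:
        raise RuntimeError(
            f"Environment variable {name!r} is not set; check the .env file path and key name."
        )
    return value


# Possible usage after load_dotenv(...), assuming the key is stored as 'apikey':
# APIKEY = require_env("apikey")
```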

Create a Spark session

In [4]:
#Create a Spark session
spark = SparkSession.builder.appName("ProductRecommenderSystem").getOrCreate()
spark

In [5]:
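# SparkSession and os are already imported above.
# Note: getOrCreate() reuses a session that is already running (such as the one
# from the previous cell), in which case spark.driver.memory is not re-applied.
# Run this cell in a fresh kernel if the 12 GB driver memory is required.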

print("Creating a new Spark session with 12GB of driver memory...")

try:
    spark = SparkSession.builder \
        .appName("RecommenderSystemWithMemory") \
        .master("local[*]") \
        .config("spark.driver.host", "127.0.0.1") \
        .config("spark.driver.memory", "12g") \
        .getOrCreate()

    print("\n✅ Spark session created successfully!")
    print(f"✅ Spark Version: {spark.version}")

except Exception as e:
    print("\n❌ Test failed. Error details:", e)

Creating a new Spark session with 12GB of driver memory...

✅ Spark session created successfully!
✅ Spark Version: 3.5.1


Loading the dataset

In [7]:
file_path = 'products_dataset.csv'
# Load the dataset
df = spark.read.csv(file_path, header=True, inferSchema=True, samplingRatio=1)
df.show(5)

+----------+--------------------+--------------------+
|product_id|               title|         description|
+----------+--------------------+--------------------+
|        P0|Men's 3X Large Ca...|This heavyweight,...|
|        P1|Turmode 30 ft. RP...|If you need more ...|
|        P2|Large Tapestry Bo...|Polyester cover r...|
|        P3|16-Gauge-Sinks Ve...|It features a rec...|
|        P4|Men's Crazy Horse...|This 9 in. black ...|
+----------+--------------------+--------------------+
only showing top 5 rows



List of 8 products recently viewed by the user.

In [8]:
recently_viewed_products = [
    'P316',
    'P333',
    'P1115',
    'P1691',
    'P1082',
    'P397',
    'P1441',
    'P1054',
]

### Task 2 - Prepare the dataset

Combine `title` and `description` Columns

In [9]:
df = df.withColumn('combined_text', concat_ws("", df.title, df.description))
df.show(5)

+----------+--------------------+--------------------+--------------------+
|product_id|               title|         description|       combined_text|
+----------+--------------------+--------------------+--------------------+
|        P0|Men's 3X Large Ca...|This heavyweight,...|Men's 3X Large Ca...|
|        P1|Turmode 30 ft. RP...|If you need more ...|Turmode 30 ft. RP...|
|        P2|Large Tapestry Bo...|Polyester cover r...|Large Tapestry Bo...|
|        P3|16-Gauge-Sinks Ve...|It features a rec...|16-Gauge-Sinks Ve...|
|        P4|Men's Crazy Horse...|This 9 in. black ...|Men's Crazy Horse...|
+----------+--------------------+--------------------+--------------------+
only showing top 5 rows
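`concat_ws` joins its non-null inputs with the given separator. With an empty separator, the last word of the title runs directly into the first word of the description. The pure-Python sketch below mimics that behaviour to show the effect of the separator; `concat_ws_py` is a hypothetical stand-in, not a Spark function.

```python
def concat_ws_py(sep, *values):
    """Join the non-None values with `sep`, similar in spirit to Spark's concat_ws."""
    return sep.join(v for v in values if v is not None)


title = "Large Tapestry Bolster Bed"
description = "Polyester cover"

fused = concat_ws_py("", title, description)    # words run together at the boundary
spaced = concat_ws_py(" ", title, description)  # a space keeps the boundary readable
print(fused)
print(spaced)
```

Whether a space separator changes the resulting embeddings in a meaningful way is not something this notebook measures; the sketch only illustrates the string-level difference.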



Get the `combined_text` column and convert it into a list

In [10]:
list_combined_text = df.select('combined_text').rdd.flatMap(lambda x: x).collect()
print(list_combined_text[:3])

["Men's 3X Large Carbon Heather Cotton/Polyester Rain Defender Paxton Heavyweight Hooded Zip-Front SweatshirtThis heavyweight, water-repellent hooded sweatshirt has a zip front for fast layering. ORIGINAL FIT. 13 oz., 75% cotton/25% polyester blend with Rain Defender durable water repellent. Attached, jersey-lined three-piece hood with drawcord closure. Antique-finish brass front zipper. Two front hand-warmer pockets have a hidden security pocket inside. Stretchable, spandex-reinforced rib-knit cuffs and waistband. Locker loop facilitates hanging.", "Turmode 30 ft. RP TNC Female to RP TNC Male Adapter CableIf you need more length between your existing wireless device and Hi-Gain Antenna, this is the product for you. It's compatible with most Wi-Fi Antennas, so it is easy for you to extend your wireless network. Just replace your existing cable that runs between your wireless device and Antenna and you're ready to use your network with extended range.", 'Large Tapestry Bolster BedPolyes

Use OpenAI text embedding model to create the vector embeddings.

In [11]:
#Use OpenAI's embeddings API to generate embeddings for the combined text
response = client.embeddings.create(input=list_combined_text, model="text-embedding-3-small", dimensions=512)

embedding_vectors = [d.embedding for d in response.data]
embedding_vectors[:2]

[[0.03746229410171509,
  0.030402367934584618,
  -0.013690895400941372,
  -0.0015493858372792602,
  0.007462074514478445,
  -0.03556773066520691,
  0.021412132307887077,
  0.08193089812994003,
  0.04915138706564903,
  -0.05765904486179352,
  0.04196634888648987,
  0.042502544820308685,
  -0.05869569256901741,
  0.03108154982328415,
  0.02277049794793129,
  0.06452237069606781,
  0.10430818051099777,
  -0.03749804198741913,
  -0.08779331296682358,
  0.06441512703895569,
  -0.05637217313051224,
  0.05848121643066406,
  -0.026702608913183212,
  -0.07714086771011353,
  0.02629152499139309,
  0.038034237921237946,
  -0.04811473935842514,
  0.032529283314943314,
  0.05569298937916756,
  -0.015916112810373306,
  0.003541134065017104,
  -0.023574793711304665,
  0.018516669049859047,
  -0.026041299104690552,
  0.0442541241645813,
  -0.07067076116800308,
  -0.021590864285826683,
  0.10938417911529541,
  -0.033816155046224594,
  0.02857929840683937,
  0.031260281801223755,
  -0.05748031288385391,
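The cell above sends every product text in a single request. For larger catalogs the request may exceed the API's size limits, so one option is to send the texts in batches. The sketch below is an assumption-laden outline: `chunked` and `embed_in_batches` are hypothetical helpers, the batch size of 500 is arbitrary, and the function that actually calls the API is passed in as `embed_batch` so the batching logic can be checked without network access.

```python
from typing import Callable, Iterator, List, Sequence


def chunked(items: Sequence[str], size: int) -> Iterator[Sequence[str]]:
    """Yield consecutive slices of `items` with at most `size` elements each."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def embed_in_batches(
    texts: Sequence[str],
    embed_batch: Callable[[Sequence[str]], List[List[float]]],
    batch_size: int = 500,
) -> List[List[float]]:
    """Embed `texts` batch by batch, preserving the input order."""
    vectors: List[List[float]] = []
    for batch in chunked(texts, batch_size):
        vectors.extend(embed_batch(batch))
    return vectors


# With the client from this notebook, embed_batch could look like this:
# def embed_batch(batch):
#     response = client.embeddings.create(
#         input=list(batch), model="text-embedding-3-small", dimensions=512
#     )
#     return [d.embedding for d in response.data]
# embedding_vectors = embed_in_batches(list_combined_text, embed_batch)
```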

Let's put the embedding vectors into our original DataFrame

Convert the list of embedding vectors into a PySpark DataFrame

In [12]:
features_column_names = [f"embedding_{i}" for i in range(len(embedding_vectors[0]))]
embeddings_df = spark.createDataFrame(embedding_vectors, schema=features_column_names)
embeddings_df.show(5)

(Output truncated: the first 5 rows of a table with 512 columns, `embedding_0` through `embedding_511`.)

Add a unique `id` to each row in the PySpark DataFrame of embeddings

In [13]:
embeddings_df = embeddings_df.repartition(1).withColumn("id", F.monotonically_increasing_id())
embeddings_df.show(5)

(Output truncated: the first 5 rows of the same 512 embedding columns plus the new `id` column.)

Add a unique `id` to each row in our main PySpark DataFrame `df`

In [14]:
df = df.repartition(1).withColumn("id", F.monotonically_increasing_id())
df.show(5)

+----------+--------------------+--------------------+--------------------+---+
|product_id|               title|         description|       combined_text| id|
+----------+--------------------+--------------------+--------------------+---+
|        P0|Men's 3X Large Ca...|This heavyweight,...|Men's 3X Large Ca...|  0|
|        P1|Turmode 30 ft. RP...|If you need more ...|Turmode 30 ft. RP...|  1|
|        P2|Large Tapestry Bo...|Polyester cover r...|Large Tapestry Bo...|  2|
|        P3|16-Gauge-Sinks Ve...|It features a rec...|16-Gauge-Sinks Ve...|  3|
|        P4|Men's Crazy Horse...|This 9 in. black ...|Men's Crazy Horse...|  4|
+----------+--------------------+--------------------+--------------------+---+
only showing top 5 rows



Let's join the two dataframes

In [15]:
df = df.join(embeddings_df, on="id", how="inner").drop("id")
df.show(5)

(Output truncated: the first 5 rows with `product_id`, `title`, `description`, `combined_text`, and the 512 embedding columns.)
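Joining on a generated `id` relies on both DataFrames keeping their original row order through `repartition(1)`. One alternative, which is not what the notebook does, is to attach each `product_id` to its embedding before the Spark DataFrame is created and then join on `product_id`. The sketch below assumes the product IDs were collected in the same order as the texts sent to the embeddings API; `build_embedding_rows` is a hypothetical helper.

```python
def build_embedding_rows(product_ids, embedding_vectors):
    """Pair each product ID with its embedding and return (rows, column_names)."""
    if len(product_ids) != len(embedding_vectors):
        raise ValueError("product_ids and embedding_vectors must have the same length")
    if not embedding_vectors:
        return [], ["product_id"]
    dim = len(embedding_vectors[0])
    columns = ["product_id"] + [f"embedding_{i}" for i in range(dim)]
    # One tuple per product: the ID followed by the embedding values
    rows = [(pid, *map(float, vec)) for pid, vec in zip(product_ids, embedding_vectors)]
    return rows, columns


# Possible usage with the objects from this notebook (assumed, untested here):
# collected = df.select("product_id", "combined_text").collect()
# product_ids = [r["product_id"] for r in collected]
# texts = [r["combined_text"] for r in collected]   # embed these, in this order
# rows, columns = build_embedding_rows(product_ids, embedding_vectors)
# embeddings_df = spark.createDataFrame(rows, schema=columns)
# df = df.join(embeddings_df, on="product_id", how="inner")
```

Joining on `product_id` keeps the pairing explicit, so it does not depend on how Spark orders rows within a partition.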

### Task 3 - Cluster products using K-means

Assemble the 512 Embedding Columns into a Single 'features' Column

In [16]:
# Create a VectorAssembler to combine the embedding columns into a single vector column
vector_assembler = VectorAssembler(inputCols=features_column_names, outputCol="features")
df_vector = vector_assembler.transform(df)
df_vector = df_vector.select("product_id", "title", "description", "features")
df_vector.show(5)

+----------+--------------------+--------------------+--------------------+
|product_id|               title|         description|            features|
+----------+--------------------+--------------------+--------------------+
|        P0|Men's 3X Large Ca...|This heavyweight,...|[0.03746229410171...|
|        P1|Turmode 30 ft. RP...|If you need more ...|[0.03523961082100...|
|        P2|Large Tapestry Bo...|Polyester cover r...|[0.03586056455969...|
|        P3|16-Gauge-Sinks Ve...|It features a rec...|[-0.0583403557538...|
|        P4|Men's Crazy Horse...|This 9 in. black ...|[0.01998496614396...|
+----------+--------------------+--------------------+--------------------+
only showing top 5 rows



Apply K-Means Clustering with 5 Clusters on the `features` Column

In [17]:
#Apply KMeans clustering to the vectorized data
kmeans = KMeans(k=5, featuresCol="features", predictionCol="cluster")
kmeans_model = kmeans.fit(df_vector)
# Make predictions
clustered_data = kmeans_model.transform(df_vector)
clustered_data.show(5)

+----------+--------------------+--------------------+--------------------+-------+
|product_id|               title|         description|            features|cluster|
+----------+--------------------+--------------------+--------------------+-------+
|        P0|Men's 3X Large Ca...|This heavyweight,...|[0.03746229410171...|      1|
|        P1|Turmode 30 ft. RP...|If you need more ...|[0.03523961082100...|      1|
|        P2|Large Tapestry Bo...|Polyester cover r...|[0.03586056455969...|      2|
|        P3|16-Gauge-Sinks Ve...|It features a rec...|[-0.0583403557538...|      3|
|        P4|Men's Crazy Horse...|This 9 in. black ...|[0.01998496614396...|      1|
+----------+--------------------+--------------------+--------------------+-------+
only showing top 5 rows
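To make the clustering step less of a black box, here is a small NumPy sketch of the basic k-means loop (assign each point to its nearest center, then move each center to the mean of its points). It is an illustration only: Spark's `KMeans` uses its own initialization and distributed implementation, and the function name `kmeans_sketch` is hypothetical. Spark's `KMeans` also accepts a `seed` parameter, which may help if reproducible cluster labels are needed.

```python
import numpy as np


def kmeans_sketch(points, k, n_iter=20, seed=0):
    """Plain k-means on a float array of shape (n_points, n_features)."""
    rng = np.random.default_rng(seed)
    # Start from k distinct points chosen at random (fancy indexing returns a copy)
    centers = points[rng.choice(len(points), size=k, replace=False)]
    for _ in range(n_iter):
        # Squared Euclidean distance from every point to every center: shape (n, k)
        dists = ((points[:, None, :] - centers[None, :, :]) ** 2).sum(axis=2)
        labels = dists.argmin(axis=1)
        for j in range(k):
            members = points[labels == j]
            if len(members) > 0:  # leave an empty cluster's center unchanged
                centers[j] = members.mean(axis=0)
    return labels, centers


# Toy data: two well-separated groups of 2D points
toy = np.array([[0.0, 0.0], [0.0, 0.1], [0.1, 0.0],
                [5.0, 5.0], [5.0, 5.1], [5.1, 5.0]])
toy_labels, toy_centers = kmeans_sketch(toy, k=2)
print(toy_labels)
```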



### Task 4 - Visualize the clusters

Let's reduce the dimensionality of our features for visualization purposes

`512 dimensions => 2 dimensions`

In [18]:
#Reduce the dimensionality of the data using PCA
pca = PCA(k=2, inputCol="features", outputCol="pca_features")
pca_model = pca.fit(clustered_data)
# Transform the data using PCA
pca_predictions = pca_model.transform(clustered_data)
pca_predictions.show(5)

+----------+--------------------+--------------------+--------------------+-------+--------------------+
|product_id|               title|         description|            features|cluster|        pca_features|
+----------+--------------------+--------------------+--------------------+-------+--------------------+
|        P0|Men's 3X Large Ca...|This heavyweight,...|[0.03746229410171...|      1|[0.17175422895204...|
|        P1|Turmode 30 ft. RP...|If you need more ...|[0.03523961082100...|      1|[-0.1712201838770...|
|        P2|Large Tapestry Bo...|Polyester cover r...|[0.03586056455969...|      2|[-0.0157152123515...|
|        P3|16-Gauge-Sinks Ve...|It features a rec...|[-0.0583403557538...|      3|[0.00184801379850...|
|        P4|Men's Crazy Horse...|This 9 in. black ...|[0.01998496614396...|      1|[-0.0290300480526...|
+----------+--------------------+--------------------+--------------------+-------+--------------------+
only showing top 5 rows
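The idea behind the projection can be sketched in NumPy: center the data, find the directions of greatest variance with an SVD, and project onto the first two. This is a conceptual illustration with a hypothetical function name (`pca_2d`); Spark's `PCA` may differ in details such as centering, so the numbers will not necessarily match the `pca_features` column above.

```python
import numpy as np


def pca_2d(X):
    """Project rows of X (n_samples, n_features) onto the top two principal directions."""
    X_centered = X - X.mean(axis=0)
    # Rows of Vt are the principal directions, ordered by decreasing singular value
    _, _, Vt = np.linalg.svd(X_centered, full_matrices=False)
    return X_centered @ Vt[:2].T


# Random stand-in data: 50 samples with 8 features each
rng = np.random.default_rng(0)
demo = rng.normal(size=(50, 8))
demo_2d = pca_2d(demo)
print(demo_2d.shape)
```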



In [19]:
pca_df = pca_predictions.select("product_id", "cluster", "pca_features").toPandas()
pca_df.head(5)

| | product_id | cluster | pca_features |
|---|---|---|---|
| 0 | P0 | 1 | [0.1717542289520449, -0.030842862242463177] |
| 1 | P1 | 1 | [-0.1712201838770119, 0.13222755896218577] |
| 2 | P2 | 2 | [-0.01571521235154917, -0.3101509581758502] |
| 3 | P3 | 3 | [0.0018480137985098844, -0.05692967020347345] |
| 4 | P4 | 1 | [-0.02903004805265498, 0.03971240652884337] |


Let's plot the Clusters

In [20]:
pca_df['x'] = pca_df['pca_features'].apply(lambda x: x[0])
pca_df['y'] = pca_df['pca_features'].apply(lambda x: x[1])
pca_df.head(5)

| | product_id | cluster | pca_features | x | y |
|---|---|---|---|---|---|
| 0 | P0 | 1 | [0.1717542289520449, -0.030842862242463177] | 0.171754 | -0.030843 |
| 1 | P1 | 1 | [-0.1712201838770119, 0.13222755896218577] | -0.17122 | 0.132228 |
| 2 | P2 | 2 | [-0.01571521235154917, -0.3101509581758502] | -0.015715 | -0.310151 |
| 3 | P3 | 3 | [0.0018480137985098844, -0.05692967020347345] | 0.001848 | -0.05693 |
| 4 | P4 | 1 | [-0.02903004805265498, 0.03971240652884337] | -0.02903 | 0.039712 |


In [21]:
def plot_clusters(pca_df, num_clusters=5):
    """
    Plots a 2D visualization of clusters using Plotly Express.

    Parameters:
    - pca_df (DataFrame): A Pandas DataFrame containing columns 'x', 'y', 'cluster', and 'product_id'.
      'x' and 'y' are the 2D PCA components, and 'cluster' indicates the cluster label.
    - num_clusters (int): The number of unique clusters to display.

    This function creates an interactive scatter plot where each point is colored according to its cluster.

    Returns:
    - fig (Figure): The Plotly figure object for the plot.
    """

    # Cast the cluster labels to strings so Plotly treats them as discrete categories
    plot_df = pca_df.assign(cluster=pca_df['cluster'].astype(str))

    # Create the base cluster plot
    fig = px.scatter(
        plot_df,
        x='x',
        y='y',
        opacity=0.6,
        size_max=4,
        color='cluster',
        title='2D Visualization of Clusters',
        labels={'x': 'PCA Component 1', 'y': 'PCA Component 2'},
        # category_orders must use the same string labels as the 'cluster' column
        category_orders={'cluster': [str(i) for i in range(num_clusters)]},
        # show the product id in the tooltip
        hover_data={'product_id': True}
    )

    # Update layout to add legend title and adjust plot settings
    fig.update_layout(legend_title_text='Clusters', legend=dict(x=1, y=1), width=600, height=500)

    return fig

fig = plot_clusters(pca_df)
fig.show()

### Task 5 - Highlight recently viewed products

In [25]:
print("The user has recently viewed the following products: ", recently_viewed_products)

The user has recently viewed the following products:  ['P316', 'P333', 'P1115', 'P1691', 'P1082', 'P397', 'P1441', 'P1054']


Let's have a look at the records in our `clustered_data` dataframe related to the recently viewed products.

In [26]:
filtered_data = clustered_data.filter(clustered_data.product_id.isin(recently_viewed_products))
filtered_data.show()

+----------+--------------------+--------------------+--------------------+-------+
|product_id|               title|         description|            features|cluster|
+----------+--------------------+--------------------+--------------------+-------+
|      P316|Mystic Fitz Roy B...|With its distress...|[-0.0203592479228...|      2|
|      P333|Florida Shag Beig...|Lavish natural mo...|[-0.0176252555102...|      2|
|      P397|1 gal. #M250-3 Ap...|BEHR ULTRA SCUFF ...|[-0.0041810967959...|      0|
|     P1054|1 gal. #HDPG60 Mi...|The improved PPG ...|[-0.0088056623935...|      0|
|     P1082|1 qt. #S220-7 Mol...|BEHR ULTRA SCUFF ...|[-0.0200616624206...|      0|
|     P1115|Modern Gray/Multi...|This Modern Gray/...|[-0.0280533097684...|      2|
|     P1441|1 qt. #PPU6-06 Ho...|BEHR PREMIUM PLUS...|[-0.0061895805411...|      0|
|     P1691|Genet Rust/Red-Br...|Add a refreshing ...|[-0.0354774855077...|      2|
+----------+--------------------+--------------------+--------------------+-------+

Let's see the distinct clusters of the recently viewed products.

In [27]:
unique_clusters = filtered_data.select("cluster").distinct().rdd.flatMap(lambda x:x).collect()
unique_clusters

[2, 0]
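One way to mark the recently viewed products on the cluster plot is to select their rows from `pca_df` and add them to the figure as a separate trace of black crosses. The sketch below assumes `pca_df` has the `product_id`, `x`, and `y` columns built earlier and that `fig` is the Plotly figure returned by `plot_clusters`; the helper names `select_recent` and `add_recent_markers` are hypothetical.

```python
import pandas as pd


def select_recent(pca_df: pd.DataFrame, recently_viewed) -> pd.DataFrame:
    """Return the rows of pca_df whose product_id is in the recently viewed list."""
    return pca_df[pca_df["product_id"].isin(recently_viewed)]


def add_recent_markers(fig, recent_df: pd.DataFrame):
    """Overlay the recently viewed products on an existing Plotly figure."""
    fig.add_scatter(
        x=recent_df["x"],
        y=recent_df["y"],
        mode="markers",
        marker=dict(symbol="x", color="black", size=10),
        name="Recently viewed",
        text=recent_df["product_id"],  # shown in the hover tooltip
    )
    return fig


# Possible usage with the objects from this notebook:
# recent_df = select_recent(pca_df, recently_viewed_products)
# fig = add_recent_markers(plot_clusters(pca_df), recent_df)
# fig.show()
```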

### Task 6 - Recommend products based on recently viewed products

Let's have a look at the titles of the recently viewed products.

In [28]:
filtered_data.select("title").rdd.flatMap(lambda x: x).collect()

["Mystic Fitz Roy Beige 9' 0 x 12' 0 Area Rug",
 'Florida Shag Beige/Multi 3 ft. x 5 ft. Floral Area Rug',
 '1 gal. #M250-3 Apple Turnover Extra Durable Flat Interior Paint & Primer',
 '1 gal. #HDPG60 Misty Emerald Lake Flat Interior Paint and Primer',
 '1 qt. #S220-7 Molasses Extra Durable Flat Interior Paint & Primer',
 'Modern Gray/Multi 9 ft. x 12 ft. Vibrant Abstract Polyester Area Rug',
 '1 qt. #PPU6-06 Honey Locust Eggshell Enamel Low Odor Interior Paint & Primer',
 'Genet Rust/Red-Brown 8 ft. x 11 ft. Abstract Wool Area Rug']

Let's find the possible products for the recommendation.

In [29]:
possible_recommendations = clustered_data.filter(clustered_data.cluster.isin(unique_clusters)).filter(~clustered_data.product_id.isin(recently_viewed_products))
possible_recommendations.show()

+----------+--------------------+--------------------+--------------------+-------+
|product_id|               title|         description|            features|cluster|
+----------+--------------------+--------------------+--------------------+-------+
|        P2|Large Tapestry Bo...|Polyester cover r...|[0.03586056455969...|      2|
|        P6|5 gal. #650C-2 Po...|BEHR PRO i300 Sem...|[0.00177027715835...|      0|
|       P11|1 qt. #350F-7 Wil...|BEHR PREMIUM PLUS...|[0.00107431644573...|      0|
|       P16|5 gal. #BL-W10 Ma...|BEHR PREMIUM PLUS...|[-0.0082466555759...|      0|
|       P18|1 qt. #M400-5 Bab...|BEHR PREMIUM PLUS...|[0.00725177722051...|      0|
|       P21|Whimsicle Blue Mu...|In true bohemian ...|[0.03499031439423...|      2|
|       P24|5-gal. #HDGO64U C...|The Glidden 5-gal...|[-0.0455830655992...|      0|
|       P26|5 gal. #W-B-320 W...|BEHR ULTRA SCUFF ...|[0.00336035597138...|      0|
|       P30|1 qt. Bermuda San...|This Glidden Exte...|[-0.0577026866376...| 

Let's perform a groupby and generate a list of product IDs that can be recommended for each of the clusters.

In [33]:
recommendations = possible_recommendations.groupby("cluster").agg(F.collect_list("product_id").alias("recommended_products"))
recommendations.show()

+-------+--------------------+
|cluster|recommended_products|
+-------+--------------------+
|      2|[P2, P21, P52, P7...|
|      0|[P6, P11, P16, P1...|
+-------+--------------------+



In [35]:
# Convert to pandas and randomly sample up to 5 recommendations per cluster
recommendations_df = recommendations.toPandas()
# min(5, len(x)) avoids an error when a cluster has fewer than 5 candidate products
recommendations_df['recommended_products'] = recommendations_df['recommended_products'].apply(lambda x: np.random.choice(x, size=min(5, len(x)), replace=False).tolist())
recommendations_df.head()

| | cluster | recommended_products |
|---|---|---|
| 0 | 2 | [P1146, P383, P180, P1667, P1448] |
| 1 | 0 | [P1503, P1693, P1830, P887, P1632] |

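The cell above picks recommendations at random within each cluster. A possible refinement, which is not what this notebook does, is to rank the candidates by cosine similarity to the average embedding of the recently viewed products in the same cluster. The NumPy sketch below assumes the candidate and viewed embeddings are available as plain arrays; `rank_by_similarity` is a hypothetical helper.

```python
import numpy as np


def rank_by_similarity(candidate_ids, candidate_vectors, viewed_vectors, top_n=5):
    """Return up to top_n (product_id, score) pairs, most similar first."""
    cand = np.asarray(candidate_vectors, dtype=float)
    viewed = np.asarray(viewed_vectors, dtype=float)
    # Summarize the user's interest as the mean of the viewed embeddings
    profile = viewed.mean(axis=0)
    # Cosine similarity = dot product of L2-normalized vectors
    cand_norm = cand / np.linalg.norm(cand, axis=1, keepdims=True)
    profile_norm = profile / np.linalg.norm(profile)
    scores = cand_norm @ profile_norm
    order = np.argsort(-scores)[:top_n]
    return [(candidate_ids[i], float(scores[i])) for i in order]


# Toy example with 2D vectors; real embeddings here would have 512 dimensions
toy_ranked = rank_by_similarity(
    ["A", "B", "C"],
    [[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]],
    viewed_vectors=[[1.0, 0.1]],
    top_n=2,
)
print(toy_ranked)
```

Ranking by similarity makes the output deterministic for a given set of viewed products, whereas random sampling gives more variety between runs.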

In [37]:
# write a python function to display the recommendations
def display_recommendations(row):
    # find the title of the product in df
    product_ids = row['recommended_products']
    cluster = row.cluster

    titles = df.filter(df["product_id"].isin(product_ids)).select("title").collect()

    print("\n")
    print("Recommendations for Cluster:", cluster)
    for title in titles:
        print(title[0])

# Use a plain loop: the function only prints, so apply() would just return a Series of None
for _, row in recommendations_df.iterrows():
    display_recommendations(row)
# End of the notebook



Recommendations for Cluster: 2
Vintage Hamadan Blue/Black 5 ft. x 8 ft. Floral Medallion Area Rug
Margie Tribal Fringe Black 8 ft. x 10 ft. Area Rug
Courtyard Natural/Red 2 ft. x 14 ft. Floral Indoor/Outdoor Runner Rug
Evie 1 Gold 5 ft. x 7 ft. 6 in. Area Rug
Watercolor Green/Fuchsia 2 ft. x 6 ft. Abstract Runner Rug


Recommendations for Cluster: 0
1 qt. PPG1231-6 Azure Tide Satin Interior Latex Paint
1 gal. #BXC-72 Evergreen Trail Satin Enamel Exterior Paint & Primer
5 gal. #P280-2 Gold Thread Eggshell Enamel Interior Paint & Primer
1 gal. #470C-2 Winter Fresh Eggshell Enamel Low Odor Interior Paint & Primer
1 gal. #S140-5 Red Gerbera Eggshell Enamel Low Odor Interior Paint & Primer