In [1]:
from pyspark.ml.recommendation import ALSModel
from pyspark.sql import SparkSession
from pyspark.sql import Row
import pandas as pd
import os
import sys
import pickle

In [2]:
# Point both the PySpark worker and driver settings at the interpreter
# that is running this kernel.
os.environ.update(
    PYSPARK_PYTHON=sys.executable,
    PYSPARK_DRIVER_PYTHON=sys.executable,
)

In [3]:
# Dynamic path settings
# The quoted "__file__" is a plain string, so abspath() resolves it against
# the current working directory; BASE_DIR is therefore the parent of the
# directory the kernel was started in.
_NOTEBOOK_DIR = os.path.dirname(os.path.abspath("__file__"))
BASE_DIR = os.path.dirname(_NOTEBOOK_DIR)

DATA_DIR = os.path.join(BASE_DIR, "dataset")
TRAIN_TEST_SPLIT_DIR = os.path.join(BASE_DIR, "train_test_split")

In [4]:
# Load the user / product ID lookup tables from DATA_DIR
def _load_pickle(filename):
    """Return the object unpickled from ``filename`` inside DATA_DIR.

    NOTE(review): pickle can execute arbitrary code while loading, so these
    files must come from a trusted source.
    """
    with open(os.path.join(DATA_DIR, filename), 'rb') as handle:
        return pickle.load(handle)


user_id_map = _load_pickle('user_id_map.pkl')
product_id_map = _load_pickle('product_id_map.pkl')

In [5]:
# Settings applied to the session builder, in the order they are listed.
# NOTE(review): with master "local[*]" some executor-related settings may
# have no effect — confirm against the Spark configuration docs.
_SPARK_SETTINGS = {
    "spark.executor.memory": "8g",
    "spark.driver.memory": "8g",
    "spark.executor.cores": "4",
    "spark.driver.cores": "4",
    "spark.python.worker.reuse": "false",
    "spark.local.dir": "/tmp/spark-temp",
    "spark.network.timeout": "600s",
    "spark.executor.heartbeatInterval": "60s",
}

_spark_builder = SparkSession.builder.appName("RecommenderSystem").master("local[*]")
for _setting, _value in _SPARK_SETTINGS.items():
    _spark_builder = _spark_builder.config(_setting, _value)

# Reuses an already-running session if one exists, otherwise starts one.
spark = _spark_builder.getOrCreate()

In [6]:
# Load the train and test interaction splits from TRAIN_TEST_SPLIT_DIR
train_sales_path, test_sales_path = (
    os.path.join(TRAIN_TEST_SPLIT_DIR, csv_name)
    for csv_name in ("train_sales_data.csv", "test_sales_data.csv")
)
train_df, test_df = pd.read_csv(train_sales_path), pd.read_csv(test_sales_path)

In [7]:
# Verify that the train and test splits share at least one user and one product
def _shared_values(column):
    """Return the set of values of ``column`` present in both train_df and test_df."""
    return set(train_df[column]) & set(test_df[column])


common_user_ids = _shared_values('user_id')
common_product_ids = _shared_values('product_id')

print(f"Common user IDs: {len(common_user_ids)}")
print(f"Common product IDs: {len(common_product_ids)}")

# An empty overlap on either axis aborts the run.
if not common_user_ids or not common_product_ids:
    raise ValueError("No common user IDs or product IDs between training and test sets.")

Common user IDs: 1148
Common product IDs: 1148


In [8]:
# Abort on an empty test split, otherwise print a short preview of it
if test_df.empty:
    raise ValueError("Test DataFrame is empty.")

test_preview = test_df.head()
print("Test DataFrame:\n", test_preview, "\n")

Test DataFrame:
    user id                        product id Interaction type   
0      3.0  2c55cae269aebf53838484b0d7dd931a             like  \
1      7.0  40d3cd16b41970ae6872e914aecf2c8e         purchase   
2      8.0  bc178f33a04dbccefa95b165f8b56830             view   
3      9.0  cc2083338a16c3fe2f7895289d2e98fe             like   
4     14.0  82c86a4d24dce5e14303033d7b658b78             view   

            Time stamp  Unnamed: 4  user_id  product_id  interaction_type  
0  2023-10-12 08:00:00         NaN        2         517                 2  
1  2023-10-16 08:00:00         NaN        6         759                 3  
2  2023-10-17 08:00:00         NaN        7        2235                 1  
3  2023-10-18 08:00:00         NaN        8        2405                 2  
4  2023-10-23 08:00:00         NaN       13        1510                 1   



In [9]:
# Coerce interaction_type to numeric (unparseable values become NaN) and
# discard the rows where the coercion produced NaN.
test_df = (
    test_df
    .assign(interaction_type=pd.to_numeric(test_df['interaction_type'], errors='coerce'))
    .dropna(subset=['interaction_type'])
)

In [10]:
# Re-check for emptiness: the numeric coercion and NaN drop on
# 'interaction_type' can remove rows, and the steps below need at least one.
if test_df.empty:
    raise ValueError("Test DataFrame is empty after converting 'interaction_type' to numeric and dropping NA values.")

In [11]:
# Convert test data to Spark DataFrame
# Only the three columns passed on to the model are carried over.
# itertuples() replaces iterrows(): it yields lightweight namedtuples instead
# of building a pandas Series (spanning every column) for each row.
_MODEL_COLUMNS = ['user_id', 'product_id', 'interaction_type']
test_data = [
    Row(
        user_id=int(record.user_id),
        product_id=int(record.product_id),
        interaction_type=float(record.interaction_type),
    )
    for record in test_df[_MODEL_COLUMNS].itertuples(index=False)
]
test_df_spark = spark.createDataFrame(test_data)

In [12]:
# Sanity check: print the first Row of the Spark test DataFrame.
print("Test DataFrame:\n", test_df_spark.head(), "\n")

Test DataFrame:
 Row(user_id=2, product_id=517, interaction_type=2.0) 



In [13]:
# Load the trained model using Spark's load method
# The model is read from the "als_model" directory under BASE_DIR.
model_path = os.path.join(BASE_DIR, "als_model")
model = ALSModel.load(model_path)

In [14]:
# Making predictions on test data
# transform() returns the input rows with an added "prediction" column.
# NOTE(review): how user/product ids unseen at training time are handled
# depends on the model's coldStartStrategy — confirm for this model.
predictions = model.transform(test_df_spark)

In [15]:
# head(1) returns a list holding at most one Row; an empty list means the
# model produced no prediction rows at all.
if not predictions.head(1):
    raise ValueError("Predictions DataFrame is empty after model.transform(). Ensure your test data has sufficient entries.")

In [16]:
# Preview the predictions next to the observed interaction_type values.
predictions.show()

+-------+----------+----------------+----------+
|user_id|product_id|interaction_type|prediction|
+-------+----------+----------------+----------+
|      2|       517|             2.0| 1.9074912|
|      6|       759|             3.0| 2.9219954|
|      7|      2235|             1.0| 0.8998625|
|      8|      2405|             2.0| 1.9074912|
|     13|      1510|             1.0| 0.8998625|
|     15|      2470|             2.0| 1.9074912|
|     18|       885|             2.0| 1.9074913|
|     20|      1990|             3.0| 2.9219954|
|     23|      1545|             1.0| 0.8998625|
|     25|      1263|             2.0| 1.9074911|
|     26|       589|             3.0|  2.921995|
|     28|      2980|             2.0| 1.9074911|
|     33|       431|             1.0| 0.8998625|
|     37|       854|             1.0| 0.8998625|
|     39|      2305|             3.0| 2.9219952|
|     41|      1912|             1.0| 0.8998625|
|     48|       929|             2.0| 1.9074911|
|     49|      1976|

In [17]:
# Evaluation metrics
# RMSE between the observed interaction_type and the model's prediction.
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(
    predictionCol="prediction",
    labelCol="interaction_type",
    metricName="rmse",
)
rmse = evaluator.evaluate(predictions)
print(f"Root-mean-square error = {rmse}")

Root-mean-square error = 0.0911734855821343


In [18]:
# Getting recommendations for users
# Number of products recommended per user; the resulting pandas frame is what
# the Neo4j export loop iterates over.
N_RECOMMENDATIONS = 100

# The original computed the recommendations twice (top-10 for display, then
# top-100 for export). Compute the top-N table once and cache it so the
# preview and the pandas conversion can share the work.
user_recs_spark = model.recommendForAllUsers(N_RECOMMENDATIONS).cache()
user_recs_spark.show()

# `user_recs` is a pandas DataFrame with columns user_id and recommendations.
user_recs = user_recs_spark.toPandas()

# The Spark-side copy is no longer needed once the data is in pandas.
user_recs_spark.unpersist()


+-------+--------------------+
|user_id|     recommendations|
+-------+--------------------+
|      1|[{1744, 1.4342929...|
|      3|[{405, 1.3465422}...|
|      5|[{2574, 2.063318}...|
|      6|[{759, 2.9219956}...|
|     12|[{117, 2.0395474}...|
|     13|[{198, 1.5998125}...|
|     16|[{2540, 2.9219954...|
|     19|[{2976, 2.9219952...|
|     20|[{1990, 2.9219954...|
|     22|[{1796, 2.1820056...|
|     26|[{589, 2.921995},...|
|     27|[{2176, 1.4673496...|
|     28|[{313, 2.0426118}...|
|     31|[{1064, 1.5370151...|
|     34|[{275, 2.3007212}...|
|     40|[{723, 2.921995},...|
|     47|[{1268, 1.625262}...|
|     48|[{1197, 2.0176582...|
|     52|[{1084, 2.075826}...|
|     53|[{61, 1.4719193},...|
+-------+--------------------+
only showing top 20 rows



In [19]:
from py2neo import Graph, Node, Relationship

In [20]:
# Connecting to the Neo4j database
# Credentials are read from the environment so that no password is stored in
# the notebook. NEO4J_URI and NEO4J_USER fall back to the local defaults;
# NEO4J_PASSWORD has no default and must be set before running this cell.
NEO4J_URI = os.environ.get("NEO4J_URI", "bolt://localhost:7687")
NEO4J_USER = os.environ.get("NEO4J_USER", "neo4j")
NEO4J_PASSWORD = os.environ.get("NEO4J_PASSWORD")
if not NEO4J_PASSWORD:
    raise RuntimeError(
        "Set the NEO4J_PASSWORD environment variable before connecting to Neo4j."
    )

graph = Graph(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD))

In [None]:
# Add Recommendations to Neo4j
# For every user row, link the matching Customer node to each recommended
# Product node with a RECOMMENDED relationship carrying the predicted score.
# Users or products without a matching node in the graph are skipped.
# NOTE(review): assumes user_id_map / product_id_map translate the model's
# integer ids back to the ids stored on the graph nodes — confirm against the
# code that builds the maps. A missing key still raises KeyError.
for _, row in user_recs.iterrows():
    original_user_id = user_id_map[row['user_id']]
    user_node = graph.nodes.match("Customer", id=original_user_id).first()
    if user_node is None:
        # No Customer node for this user: skip the per-product lookups
        # entirely instead of issuing one query per recommendation for nothing.
        continue

    for rec in row['recommendations']:
        original_product_id = product_id_map[rec['product_id']]
        product_node = graph.nodes.match("Product", id=original_product_id).first()
        if product_node is None:
            continue

        recommendation = Relationship(
            user_node, "RECOMMENDED", product_node, score=float(rec['rating'])
        )
        # merge() matches on start node, end node and relationship type, so
        # re-running this cell updates the existing relationship's score
        # instead of creating a duplicate as create() would.
        graph.merge(recommendation)