In [1]:
import os
import sys

# Directory added to the import search path so that modules living there can be
# imported by later cells (presumably `recommend` and `eval_model` — confirm).
new_path = r'/workspace/src/main/python'
# Guard against duplicates so re-running this cell does not keep growing sys.path.
if new_path not in sys.path:
    sys.path.append(new_path)
import datetime
def print_time():
    """Print the current local time as ``Current time: YYYY-MM-DD HH:MM:SS``."""
    now = datetime.datetime.now()
    stamp = now.strftime("%Y-%m-%d %H:%M:%S")
    print("Current time:", stamp)

In [2]:
import configparser

import mlflow
import json
import requests
from recommend import ComplementaryProductRecommender
from eval_model import ModelEvaluator
# Display the installed MLflow version so it is recorded next to the results.
mlflow.__version__

  from .autonotebook import tqdm as notebook_tqdm


'3.9.0'

# Model Performance and Scaling Test

In [34]:
# Endpoint that the timed POST in the next cell is sent to.
URL = "http://localhost:8000/recommend"

# Load-test parameters, named so the batch size and query can be tuned in one place.
N_BASKETS = 1_000_000
TOP_K = 5
SAMPLE_BASKET = ("678629d50c3e843442e674ae", "68952cf7fd2d7f6215293db1")

# A single JSON body made of N_BASKETS identical basket queries. Each entry gets
# its own dict and list so that entries can be modified independently if needed.
payload = [{"basket": list(SAMPLE_BASKET), "top_k": TOP_K} for _ in range(N_BASKETS)]

In [35]:
%%timeit
requests.post(URL, headers={"Content-Type": "application/json"}, json=payload, timeout=50)

7.3 s ± 121 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


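#### Takes about 7.3 seconds per POST, where each POST is a single HTTP request whose JSON payload contains 1 million basket queries (not 1 million separate requests)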

# Model Performance Metrics

In [3]:
# INI file holding the settings read by the cells below.
CONFIG_PATH = '/workspace/src/main/resources/train_params.ini'

config = configparser.ConfigParser()
# ConfigParser.read() silently skips files it cannot open and returns only the
# names it actually parsed. Fail here, with a clear message, instead of letting
# a later config.get() raise a confusing NoSectionError.
loaded_files = config.read(CONFIG_PATH)
if not loaded_files:
    raise FileNotFoundError(f"Could not read config file: {CONFIG_PATH}")
loaded_files

['/workspace/src/main/resources/train_params.ini']

In [4]:
# Read the model directory from the [training] section of the config and build
# the recommender from it.
model_dir = config.get("training", "latest_model_dir")
recommender = ComplementaryProductRecommender(model_dir)

2026-02-11 00:19:22,255 - INFO - Loading model from: /workspace/models/latest
2026-02-11 00:19:22,462	INFO util.py:154 -- Missing packages: ['ipywidgets']. Run `pip install -U ipywidgets`, then restart the notebook server for rich notebook output.
2026-02-11 00:19:22,578	INFO util.py:154 -- Missing packages: ['ipywidgets']. Run `pip install -U ipywidgets`, then restart the notebook server for rich notebook output.
2026-02-11 00:19:22,706	INFO util.py:154 -- Missing packages: ['ipywidgets']. Run `pip install -U ipywidgets`, then restart the notebook server for rich notebook output.
2026-02-11 00:19:23,267 - INFO -   ✓ Loaded model weights
2026-02-11 00:19:23,290 - INFO -   Items: 62,138
2026-02-11 00:19:23,291 - INFO -   Factors: 16


In [5]:
import torch
from torchinfo import summary
# from torchviz import make_dot


# Number of (user, item) index pairs passed to the summary.
batch_size = 16
index_shape = (batch_size,)

# Dummy integer inputs on the recommender's device: random indices drawn from
# [0, n_users) and [0, n_items) respectively.
user_ids = torch.randint(
    low=0, high=recommender.n_users, size=index_shape, device=recommender.device
)
item_ids = torch.randint(
    low=0, high=recommender.n_items, size=index_shape, device=recommender.device
)

# Per-layer table of input shape, output shape and parameter count.
# verbose=0 leaves printing to the notebook, which displays the returned object.
summary(
    recommender.model,
    input_data=[user_ids, item_ids],
    col_names=["input_size", "output_size", "num_params"],
    verbose=0,
)

# output = recommender.model(user_ids, item_ids)
# graph = make_dot(output, params=dict(recommender.model.named_parameters()))
# graph.render("SimpleCNN", format="png", cleanup=True)

Layer (type:depth-idx)                   Input Shape               Output Shape              Param #
MatrixFactorization                      [16]                      [16]                      1
├─Embedding: 1-1                         [16]                      [16, 16]                  3,157,328
├─Embedding: 1-2                         [16]                      [16, 16]                  994,208
├─Embedding: 1-3                         [16]                      [16, 1]                   197,333
├─Embedding: 1-4                         [16]                      [16, 1]                   62,138
Total params: 4,411,008
Trainable params: 4,411,008
Non-trainable params: 0
Total mult-adds (Units.MEGABYTES): 70.58
Input size (MB): 0.00
Forward/backward pass size (MB): 0.00
Params size (MB): 17.64
Estimated Total Size (MB): 17.65

In [8]:
# Display the model's dimensions as a (n_users, n_items, n_factors) tuple.
recommender.n_users, recommender.n_items, recommender.model.n_factors

(197333, 62138, 16)

In [9]:
# Evaluate the latest model on the test data; the two print_time() calls
# bracket the run so the wall-clock duration can be read off the output.
print_time()

# Paths and the ranking cut-off come from the shared config.
model_dir = config.get("training", "latest_model_dir")
test_dir = os.path.join(config.get("data", "train_base"), "test_data")
k = config.getint("evaluation", "ndcg_k", fallback=10)

# Value passed as `sample_users` to the evaluation call below.
sample_users = 2000
evaluator = ModelEvaluator(model_dir=model_dir, k=k)

# Column names of the test data, as configured in the [model] section.
column_names = dict(
    user_col=config.get("model", "user_col"),
    item_col=config.get("model", "item_col"),
    rating_col=config.get("model", "rating_col"),
)
metrics = evaluator.evaluate_on_test_data(
    test_dir=test_dir,
    sample_users=sample_users,
    **column_names,
)

print_time()

2026-02-11 00:21:48,515 - INFO - Loading model from: /workspace/models/latest


Current time: 2026-02-11 00:21:48


2026-02-11 00:21:48,718 - INFO -   ✓ Loaded model weights
2026-02-11 00:21:48,736 - INFO -   Users: 197,333
2026-02-11 00:21:48,737 - INFO -   Items: 62,138
2026-02-11 00:21:48,760 - INFO - MODEL EVALUATION
2026-02-11 00:21:48,831 - INFO - Test records: 128,608
2026-02-11 00:21:48,911 - INFO - User-item pairs: 126,828
2026-02-11 00:21:48,912 - INFO - Calculating RMSE on test data...
2026-02-11 00:21:49,059 - INFO - Test RMSE: 1.3788
2026-02-11 00:21:49,069 - INFO - Sampling 2000 users for evaluation
2026-02-11 00:22:49,169 - INFO - EVALUATION RESULTS
2026-02-11 00:22:49,170 - INFO - RMSE: 1.3788
2026-02-11 00:22:49,170 - INFO - Users evaluated: 2,000
2026-02-11 00:22:49,171 - INFO - NDCG@10: 0.0019
2026-02-11 00:22:49,172 - INFO - Precision@10: 0.0011
2026-02-11 00:22:49,172 - INFO - Recall@10: 0.0033


Current time: 2026-02-11 00:22:49


In [10]:
# Display the value returned by evaluate_on_test_data() in the previous cell.
metrics

{'rmse': 1.3787560314472806,
 'ndcg@10': 0.0018945381073924743,
 'precision@10': 0.0011,
 'recall@10': 0.0032975514953456127,
 'num_users_evaluated': 2000,
 'k': 10}

# Market Basket Analysis

In [3]:
from pyspark.sql import SparkSession
import os, sys
# $example on$
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.sql import functions as F
from pyspark.ml.fpm import FPGrowth
from pyspark.sql.types import DoubleType, FloatType
from pyspark.ml.feature import StringIndexer

# $example off$
# Point both PySpark interpreter variables at the interpreter running this kernel.
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

# Checkpoint location. Set SPARK_CHECKPOINT_DIR to run on another machine;
# the default keeps the original path. Use an absolute path.
CHECKPOINT_DIR = os.environ.get("SPARK_CHECKPOINT_DIR", "/home/rajde/spark-checkpoints")

# Local 8-thread session; memory and shuffle settings are listed in one chain.
spark = (
    SparkSession.builder
    .appName("ALSExample")
    .config("spark.driver.memory", "4g")
    .config("spark.sql.autoBroadcastJoinThreshold", "100m")
    .config("spark.driver.maxResultSize", "4g")
    .config("spark.sql.shuffle.partitions", "10")
    .config("spark.memory.offHeap.enabled", "true")
    .config("spark.memory.offHeap.size", "2g")
    .master("local[8]")
    .getOrCreate()
)

spark.sparkContext.setLogLevel("ERROR")
spark.sparkContext.setCheckpointDir(CHECKPOINT_DIR)

# Verification: print which Python interpreter Spark has been configured with.
print("env PYSPARK_PYTHON:", os.environ.get("PYSPARK_PYTHON"))
print("spark.conf pyspark.python:", spark.conf.get("spark.pyspark.python","<not set>"))
print("sparkContext.pythonVer:", spark.sparkContext.pythonVer)

env PYSPARK_PYTHON: /home/rajde/miniforge3/envs/py312/bin/python
spark.conf pyspark.python: <not set>
sparkContext.pythonVer: 3.12


In [24]:
# NOTE(review): this cell stops the Spark session, yet cells further down
# (e.g. the spark.read.csv load) still use `spark`. Its recorded output is a
# NameError and its execution count (24) is out of sequence with its neighbours.
# It looks like a teardown cell that belongs at the very end of the notebook —
# confirm and move it so that Restart & Run All works.
spark.stop()

NameError: name 'spark' is not defined

In [5]:
# Load the raw order events; the first row is the header and column types are
# inferred from the data. Print the inferred schema as a check.
events_csv_path = '/workspace/data/surfside-eng-ml-take-home-6-recommender.csv'
csv_options = dict(header=True, inferSchema=True)

surf_events = spark.read.csv(events_csv_path, **csv_options)
surf_events.printSchema()

[Stage 1:>                                                          (0 + 8) / 8]

root
 |-- datestamp: timestamp (nullable = true)
 |-- order_id: integer (nullable = true)
 |-- order_value: double (nullable = true)
 |-- item_id: string (nullable = true)
 |-- item_brand: string (nullable = true)
 |-- item_price: double (nullable = true)
 |-- item_quantity: integer (nullable = true)
 |-- user_ipaddress: string (nullable = true)
 |-- network_userid: string (nullable = true)
 |-- domain_userid: string (nullable = true)
 |-- domain_sessionid: string (nullable = true)
 |-- domain_sessionidx: integer (nullable = true)
 |-- geo_region: string (nullable = true)
 |-- geo_city: string (nullable = true)
 |-- geo_zipcode: string (nullable = true)
 |-- dvce_ismobile: integer (nullable = true)
 |-- useragent: string (nullable = true)



                                                                                

In [6]:
# One row per order: `items` is the de-duplicated set of item ids in that order.
basket_data = (
    surf_events
    .groupBy("order_id")
    .agg(F.collect_set("item_id").alias("items"))
    .repartition(10)
)

# Fixed seed so the 80/20 split (and everything fitted on it) can be repeated
# from run to run instead of changing on every execution.
SPLIT_SEED = 42
(training, test) = basket_data.randomSplit([0.8, 0.2], seed=SPLIT_SEED)

In [7]:
# Fit FP-Growth on the training baskets with support and confidence thresholds
# of 0.0001 each.
fp = FPGrowth(itemsCol="items", minSupport=0.0001, minConfidence=0.0001)

# Rename the model's prediction column, then display the fitted model.
fpm = fp.fit(training).setPredictionCol("newPrediction")
fpm


                                                                                

FPGrowthModel: uid=FPGrowth_79e31baf5be8, numTrainingRecords=132832

In [8]:
# Show the 20 largest frequent itemsets: add each itemset's length as
# `items_count` and sort on it in descending order, without truncating cells.
itemset_size = F.size(F.col("items"))
(
    fpm.freqItemsets
    .withColumn("items_count", itemset_size)
    .orderBy(F.col("items_count").desc())
    .show(20, truncate=False)
)



+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----+-----------+
|items                                                                                                                                                                                                                                                               |freq|items_count|
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----+-----------+
|[6855f24d8226c2beeda7cd95, 674644845f5592bae28256bc, 6717ba729fc98de9428dca09, 67463cf18c7dde0089442b5b, 6711764f843769b7baef3c0e, 6717b80719ea5159083b42ea, 67

                                                                                