# Royal Cybers: End-to-End Machine Learning Pipeline for Personalized Recommendations in Databricks

## THIS IS THE MAIN NOTEBOOK (not including EDA)

In [0]:

import importlib
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from scripts.data_cleaning import clean_cosmetic_df, clean_reviews_df
from scripts.feature_engineering import add_customer_engagement, add_predictor_features, process_reviews_df
from scripts.data_transformation import transform_cosmetic_data, transform_reviews_data
from pyspark.sql.functions import col
from scripts.EDA import perform_eda
!pip install mlxtend
!pip install rapidfuzz
from scripts.baseline_model import generate_cosine_sim_recs, run_fp_growth, run_als_recommender
from delta.tables import DeltaTable
from pyspark.sql.functions import size, substring
from pyspark.sql.functions import lit
from scripts.schema_definitions import (
    expected_cosmetic_schema,
    expected_reviews_schema,
    expected_mapping_schema
)
from scripts.quality_check.data_quality_check import (
    check_schema,
    check_null_values,
    check_duplicate_rows,
    check_data_types,
    check_row_count
)
from scripts.quality_check.baseline_model_quality_check import (
    check_cosine_sim_recs_output,
    check_fp_growth_output,
    check_als_output
)


## Load Dataset

In [0]:
# Load Dataset from S3 Bucket
# getOrCreate() returns the already-active session when one exists instead of
# building a second one.
spark = SparkSession.builder.appName("E-Commerce Pipeline").getOrCreate()

# Source CSV locations: store event data, product reviews, and the file that
# pairs product IDs between the two datasets.
cosmetic_store_data_path = "s3://e-commerce-pipeline-dataset/Cosmetic Store Website Data.csv"
reviews_data_path = "s3://e-commerce-pipeline-dataset/nyka_top_brands_cosmetics_product_reviews.csv"
product_mapping_path = "s3://e-commerce-pipeline-dataset/unique_product_id_pairings.csv"

In [0]:
# Read the three source CSVs. Every file carries a header row, and column
# types are inferred by Spark rather than declared up front.
csv_read_options = {"header": True, "inferSchema": True}

cosmetic_df = spark.read.csv(cosmetic_store_data_path, **csv_read_options)
reviews_df = spark.read.csv(reviews_data_path, **csv_read_options)
mapping_df = spark.read.csv(product_mapping_path, **csv_read_options)

## Data Check

In [0]:
# Run the same battery of quality checks over each raw DataFrame.
# Each entry: (banner to print, DataFrame, expected schema, duplicate-key columns).
quality_check_plan = [
    (
        "=== Checking cosmetic_df ===",
        cosmetic_df,
        expected_cosmetic_schema,
        ["event_time", "product_id", "user_id"],
    ),
    (
        "\n=== Checking reviews_df ===",
        reviews_df,
        expected_reviews_schema,
        ["review_id"],
    ),
    (
        "\n=== Checking mapping_df ===",
        mapping_df,
        expected_mapping_schema,
        ["product_id_events", "product_id_reviews"],
    ),
]

for banner, frame, expected_schema, duplicate_keys in quality_check_plan:
    print(banner)
    schema_ok = check_schema(frame, expected_schema)
    if not schema_ok:
        # The remaining checks only run once the schema check has passed.
        continue
    check_data_types(frame, expected_schema)
    check_null_values(frame, threshold=0.5)
    check_duplicate_rows(frame, subset_columns=duplicate_keys)
    check_row_count(frame, min_count=1)

## Clean Data

In [0]:
# Python caches imported modules, so edits to scripts/data_cleaning.py are not
# always picked up by the notebook; force a reload to be safe.
import importlib
from scripts import data_cleaning

# Reload the module, then re-import so the names point at the fresh code
importlib.reload(data_cleaning)
from scripts.data_cleaning import clean_cosmetic_df, clean_reviews_df

In [0]:
# Apply the project cleaning routines; the raw DataFrame names are rebound to
# the cleaned results.
cosmetic_df = clean_cosmetic_df(cosmetic_df)
reviews_df = clean_reviews_df(reviews_df)

In [0]:
# Preview the first five rows of each cleaned DataFrame.
cosmetic_df.show(5)
reviews_df.show(5)

## Product Matching

In [0]:
# Give every product-ID column a dataset-specific name so the join keys are
# unambiguous: review-side IDs become `review_product_id`, store-side IDs
# become `cosmetic_product_id`.
reviews_df = reviews_df.withColumnRenamed("product_id", "review_product_id")
cosmetic_df = cosmetic_df.withColumnRenamed("product_id", "cosmetic_product_id")
mapping_df = (
    mapping_df
    .withColumnRenamed("product_id_reviews", "review_product_id")
    .withColumnRenamed("product_id_events", "cosmetic_product_id")
)

In [0]:
# reviews_df = reviews_df.select("review_product_id", "review_title", "review_text")
# mapping_df = mapping_df.select("review_product_id", "cosmetic_product_id")
# cosmetic_df = cosmetic_df.select("cosmetic_product_id", "event_type")

In [0]:
# Attach the cosmetic-store product ID to each review through the mapping
# table, then join the store rows for the matched products.
# NOTE(review): cosmetic_df looks event-level (event_time / event_type columns),
# so the second inner join presumably emits one row per (review, event) pair —
# confirm the resulting size is acceptable.
reviews_mapped_df = reviews_df.join(mapping_df, ["review_product_id"], "inner")
final_mapped_df = reviews_mapped_df.join(cosmetic_df, ["cosmetic_product_id"], "inner")

In [0]:
# Keep a single row per (review product, cosmetic product) pair.
# NOTE(review): dropDuplicates keeps an arbitrary row from each group, so only
# one review/event combination survives per pair — confirm that is intended.
final_mapped_df = final_mapped_df.dropDuplicates(["review_product_id", "cosmetic_product_id"])

In [0]:
# Check if the join worked
# Each count() is a Spark action and runs the corresponding join plan.
print(f"✅ Total Mapped Reviews: {reviews_mapped_df.count()}")
print(f"✅ Total Mapped Final Products: {final_mapped_df.count()}")
# truncate=False prints full cell contents instead of shortening long strings
final_mapped_df.show(5, truncate=False)

## Unity Catalog Paths

In [0]:
%sql
-- CREATE CATALOG ecommerces_catalog;
-- CREATE SCHEMA ecommerces_catalog.recommendation_schema;

In [0]:
# Fully-qualified Unity Catalog table names, all under one catalog.schema prefix
catalog_schema = "ecommerces_catalog.recommendation_schema"
cosmetic_store_table = f"{catalog_schema}.cosmetic_store_data"
reviews_table = f"{catalog_schema}.product_reviews"
mapping_table = f"{catalog_schema}.product_mapping"

In [0]:
# Persist the cleaned DataFrames as Delta tables in Unity Catalog, replacing
# whatever each table held before.
tables_to_write = (
    (cosmetic_df, cosmetic_store_table),
    (reviews_df, reviews_table),
    (mapping_df, mapping_table),
)
for frame, table_name in tables_to_write:
    frame.write.format("delta").mode("overwrite").saveAsTable(table_name)

In [0]:
# Load data from Unity Catalog
# From here on the DataFrames are backed by the stored tables rather than by
# the CSV read/clean/rename lineage built above.
cosmetic_df = spark.read.table(cosmetic_store_table)
reviews_df = spark.read.table(reviews_table)
mapping_df = spark.read.table(mapping_table)

## Data Transformation

In [0]:
# Python caches imported modules, so edits to scripts/data_transformation.py
# are not always picked up by the notebook; force a reload to be safe.
import importlib
from scripts import data_transformation

# Reload the module, then re-import so the names point at the fresh code
importlib.reload(data_transformation)
from scripts.data_transformation import transform_cosmetic_data, transform_reviews_data

In [0]:
# Apply the project transformation to the store data; the untransformed
# cosmetic_df stays available under its own name.
transformed_cosmetic_df = transform_cosmetic_data(cosmetic_df)

In [0]:
# Replace the stored store-data table with the transformed version;
# overwriteSchema allows the table schema to change along with the data.
# NOTE(review): this overwrites the same table that cosmetic_df was read from —
# confirm that lazily re-evaluating cosmetic_df afterwards is not a problem.
transformed_cosmetic_df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable(cosmetic_store_table)

In [0]:
# Preview the first five transformed rows.
transformed_cosmetic_df.show(5)


## Feature Engineering

In [0]:
# Python caches imported modules, so edits to scripts/feature_engineering.py
# are not always picked up by the notebook; force a reload to be safe.
import importlib
from scripts import feature_engineering

# Reload the module, then re-import so the names point at the fresh code
importlib.reload(feature_engineering)
from scripts.feature_engineering import process_reviews_df, add_customer_engagement, add_predictor_features

### Process unstructured data (reviews)

In [0]:
# Run the review-processing step and rebind reviews_df to its result.
# NOTE(review): final_mapped_df was built from the earlier reviews_df, so it
# does not include whatever this step adds.
reviews_df = process_reviews_df(reviews_df)

### Add outcome variable (Y)

In [0]:
# Add the outcome variable to the (untransformed) store DataFrame.
# NOTE(review): the modelling cells below read transformed_cosmetic_df, not
# cosmetic_df — confirm where this column is meant to be consumed.
cosmetic_df = add_customer_engagement(cosmetic_df)

### Add predictor variables (X)

In [0]:
# Add the predictor columns on top of the outcome variable.
# NOTE(review): cosmetic_df is not referenced again later in this notebook —
# confirm these features are used elsewhere.
cosmetic_df = add_predictor_features(cosmetic_df)

## Baseline Model for Product Recommendation

In [0]:
# Python caches imported modules, so edits to scripts/baseline_model.py are not
# always picked up by the notebook; force a reload to be safe.
import importlib
from scripts import baseline_model

# Reload the module, then re-import so the names point at the fresh code
importlib.reload(baseline_model)
from scripts.baseline_model import generate_cosine_sim_recs, run_fp_growth, run_als_recommender

In [0]:
# Keep only purchase events and give each one a unit quantity, which the
# recommenders below use as the interaction strength.
is_purchase = transformed_cosmetic_df["event_type"] == "purchase"
purchase_df = (
    transformed_cosmetic_df
    .where(is_purchase)
    .withColumn("product_quantity", lit(1))
)

In [0]:
# Convert PySpark DataFrame to pandas DataFrame
# toPandas() collects every purchase row onto the driver, so the purchase set
# has to fit in driver memory.
purchase_df_pandas = purchase_df.toPandas()

In [0]:
# Generate Cosine Similarity Recommendations
# Sessions are passed as rows and products as columns, weighted by the unit
# quantity added above.
# NOTE(review): top=11 presumably leaves 10 neighbours once the product itself
# is excluded — confirm in generate_cosine_sim_recs.
prod_recs = generate_cosine_sim_recs(
    df=purchase_df_pandas,
    filename="product_recs_cosine_similarity.csv",
    rows="user_session",
    cols="cosmetic_product_id",
    quantity="product_quantity",
    top=11
)


In [0]:
# Quality Check: validate the cosine-similarity output before it is persisted
check_cosine_sim_recs_output(prod_recs)

In [0]:
# Save Recommendations to Unity Catalog
prod_recs_spark = spark.createDataFrame(prod_recs)

# Replace spaces and dots in the column names before writing the Delta table.
# The rename is done positionally with toDF(): looking the old names up through
# col() would parse a dot as struct-field access and fail to resolve a flat
# column such as "a.b", which is exactly the case this rename targets.
sanitized_columns = [
    name.replace(" ", "_").replace(".", "_") for name in prod_recs_spark.columns
]
prod_recs_spark = prod_recs_spark.toDF(*sanitized_columns)

prod_recs_spark.write.format("delta").mode("overwrite").saveAsTable("ecommerces_catalog.recommendation_schema.product_recommendations")

In [0]:
# Generate FP-Growth frequent itemsets and association rules from the purchase
# events, using explicit support and confidence thresholds.
frequent_itemsets, association_rules = run_fp_growth(
    purchase_df,
    min_support=0.001,
    min_confidence=0.1
)

In [0]:
# Quality Check: validate the FP-Growth outputs before they are persisted
check_fp_growth_output(frequent_itemsets, association_rules)

In [0]:
# Save Frequent Itemsets to Unity Catalog (converted to a Spark DataFrame first)
frequent_itemsets_spark = spark.createDataFrame(frequent_itemsets)
frequent_itemsets_spark.write.format("delta").mode("overwrite").saveAsTable("ecommerces_catalog.recommendation_schema.frequent_itemsets")

# Save Association Rules to Unity Catalog (converted to a Spark DataFrame first)
association_rules_spark = spark.createDataFrame(association_rules)
association_rules_spark.write.format("delta").mode("overwrite").saveAsTable("ecommerces_catalog.recommendation_schema.association_rules")

In [0]:
# Generate ALS Recommendations: returns the fitted model together with
# per-user and per-item recommendation tables
als_model, user_recs, item_recs = run_als_recommender(purchase_df)

In [0]:
# Quality Check: validate the ALS model and both recommendation tables
check_als_output(als_model, user_recs, item_recs)

In [0]:
# Save User Recommendations to Unity Catalog (converted to a Spark DataFrame first)
user_rec_spark = spark.createDataFrame(user_recs)
user_rec_spark.write.format("delta").mode("overwrite").saveAsTable("ecommerces_catalog.recommendation_schema.als_user_recs")

# Save Item Recommendations to Unity Catalog (converted to a Spark DataFrame first)
item_recs_spark = spark.createDataFrame(item_recs)
item_recs_spark.write.format("delta").mode("overwrite").saveAsTable("ecommerces_catalog.recommendation_schema.als_item_recs")

In [0]:
# Display results: print the head of every baseline output for a quick
# visual sanity check
print(prod_recs.head())
print(frequent_itemsets.head())
print(association_rules.head())
print(user_recs.head())
print(item_recs.head())

## Baseline Model Evaluation

In [0]:
# Python caches imported modules, so edits to scripts/baseline_evaluation.py
# are not always picked up by the notebook; force a reload to be safe.
import importlib
from scripts import baseline_evaluation

# Reload the module, then re-import so the names point at the fresh code
importlib.reload(baseline_evaluation)
from scripts.baseline_evaluation import evaluate_cosine_model, log_baseline_metrics, calculate_precision_recall_at_k, calculate_precision_recall_frequent_itemsets, evaluate_als_with_mapping

In [0]:
# Evaluate Cosine Similarity Recommendations
# The pandas purchase data is passed as ground truth, with a cut-off of k=10.
precision_cosine, recall_cosine = evaluate_cosine_model(
    recs=prod_recs,
    ground_truth=purchase_df_pandas,
    model_name="Cosine Similarity",
    k=10
)

In [0]:
# Report the cosine-similarity metrics with default formatting.
print(f"Precision: {precision_cosine}")
print(f"Recall: {recall_cosine}")

In [0]:
# Evaluate FP-Growth Recommendations
# Ground truth is reduced to the (session, product) columns before scoring.
ground_truth = purchase_df_pandas[["user_session", "cosmetic_product_id"]]
precision_fp_growth, recall_fp_growth = calculate_precision_recall_frequent_itemsets(
    frequent_itemsets=frequent_itemsets,
    ground_truth=ground_truth,
    k=10
)


In [0]:
# Report the FP-Growth metrics with default formatting.
print(f"FP-Growth Precision: {precision_fp_growth}")
print(f"FP-Growth Recall: {recall_fp_growth}")

In [0]:
# Evaluate ALS Recommendations
# Give every distinct session a 1-based integer key, numbered in order of
# first appearance in the purchase data.
# NOTE(review): this numbering must match whatever integer user index
# run_als_recommender assigns internally — confirm, otherwise the recommendations
# are scored against the wrong sessions.
user_mapping = purchase_df_pandas[["user_session"]].drop_duplicates().reset_index(drop=True)
user_mapping["user_session_index"] = user_mapping.index + 1 

precision_als, recall_als = evaluate_als_with_mapping(
    recs=user_recs,
    ground_truth=purchase_df_pandas,
    user_mapping=user_mapping,
    k=10
)

In [0]:
# Report the ALS metrics with default formatting.
print(f"ALS Precision: {precision_als}")
print(f"ALS Recall: {recall_als}")

## Topic Sentiment Analysis

In [0]:
# Python caches imported modules, so edits to scripts/topic_sentiment_analysis.py
# are not always picked up by the notebook; force a reload to be safe.
import importlib
from scripts import topic_sentiment_analysis

# Reload the module, then re-import so the names point at the fresh code
importlib.reload(topic_sentiment_analysis)
from scripts.topic_sentiment_analysis import process_reviews, analyze_topics

In [0]:
# Run topic/sentiment processing over the mapped review rows (the deduplicated
# join of reviews, mapping and store data built earlier).
processed_df = process_reviews(final_mapped_df)

In [0]:
# Show the review text next to its topics and sentiment, untruncated.
processed_df.select("review_text", "topics", "sentiment").show(truncate=False)

In [0]:
# Analyze topic engagement trends on the processed reviews
topic_analysis_df = analyze_topics(processed_df)

In [0]:
# Show topic trends (first five rows, rendered with the notebook's display helper)
display(topic_analysis_df.limit(5))

## Product Embedding

In [0]:
# Python caches imported modules, so edits to scripts/product_embedding.py are
# not always picked up by the notebook; force a reload to be safe.
import importlib
from scripts import product_embedding

# Reload the module, then re-import so the names point at the fresh code
importlib.reload(product_embedding)
from scripts.product_embedding import (
    generate_review_embeddings, 
    combine_review_embeddings, 
    generate_topic_sentiment_embeddings, 
    merge_embeddings, 
    clean_aggregated_embeddings
)



In [0]:
# Step 1: Generate review embeddings from the topic/sentiment-processed reviews
review_embedding_df = generate_review_embeddings(processed_df)

In [0]:
# Step 2: Combine review embeddings (input: the output of step 1)
review__combine_embedding_df = combine_review_embeddings(review_embedding_df)

In [0]:
# Step 3: Generate topic & sentiment embeddings on top of the combined review
# embeddings from step 2
topic_embedding_df = generate_topic_sentiment_embeddings(review__combine_embedding_df)


In [0]:
# Step 4: Merge all embeddings into `final_embedding` (input: the output of step 3)
combine_embedding_df = merge_embeddings(topic_embedding_df)

In [0]:
# Step 5: Clean final embeddings; this is the end product of the embedding pipeline
final_embedding_df = clean_aggregated_embeddings(combine_embedding_df)

In [0]:
# Inspect the column names and types of the cleaned embeddings.
final_embedding_df.printSchema()

In [0]:
# Preview five embedding vectors in full (no truncation).
final_embedding_df.select("final_embedding").show(5, truncate=False)

In [0]:
# Save embeddings to Unity Catalog, replacing the previous contents of the table
final_embedding_df.write.format("delta").mode("overwrite").saveAsTable("ecommerces_catalog.recommendation_schema.topic_embeddings")

## Add Product Embedding to Baseline Model for Improvements

In [0]:
# Python caches imported modules, so edits to
# scripts/recommendation_with_embeddings.py are not always picked up by the
# notebook; force a reload to be safe.
import importlib
from scripts import recommendation_with_embeddings

# Reload the module, then re-import so the names point at the fresh code
importlib.reload(recommendation_with_embeddings)
from scripts.recommendation_with_embeddings import (
    generate_cosine_sim_recs_with_embeddings,
    rerank_fp_growth_rules,
    run_als_recommender_with_embeddings
)

In [0]:
# `cleaned_embeddings_df` is not assigned anywhere in this notebook, so using it
# raised NameError on a clean run. The cleaned output of the embedding pipeline
# is `final_embedding_df`; bind the name the following cells expect to it.
cleaned_embeddings_df = final_embedding_df

# Build a driver-side lookup of product ID -> embedding vector for re-ranking.
# collect() pulls every row to the driver, so this assumes one modest row per
# product.
# NOTE(review): relies on an `aggregated_embedding` column; the earlier preview
# only selects `final_embedding` — confirm which column holds the vector.
product_embeddings_dict = {
    row["review_product_id"]: row["aggregated_embedding"]
    for row in cleaned_embeddings_df.collect()
}


In [0]:
# 1. Cosine Similarity Recommendations
# The embeddings are converted to pandas first, i.e. collected onto the driver.
cosine_sim_recs = generate_cosine_sim_recs_with_embeddings(
    product_embeddings_df=cleaned_embeddings_df.toPandas(),
    filename="cosine_similarity_with_embeddings.csv"
)

In [0]:
# Print the embedding-based cosine recommendations for inspection.
print(cosine_sim_recs)

In [0]:
# 2. FP-Growth Recommendations with Reranking
# Mine with the same thresholds as the baseline run so that the rules being
# re-ranked are the same rule set the baseline section evaluated; the previous
# call relied on the function's defaults instead.
frequent_itemsets, association_rules = run_fp_growth(
    purchase_df,
    min_support=0.001,
    min_confidence=0.1
)

In [0]:
# Re-rank the association rules using the product-ID -> embedding lookup,
# then print the result for inspection.
reranked_rules = rerank_fp_growth_rules(association_rules, product_embeddings_dict)
print(reranked_rules)

In [0]:
from pyspark.sql.functions import array, lit

# Product IDs that were purchased vs. product IDs that have an embedding.
# NOTE(review): the first set holds cosmetic-store IDs and the second holds
# review IDs, and they are compared without going through mapping_df — confirm
# both columns share one ID space.
purchase_ids = purchase_df.select("cosmetic_product_id").distinct()
embedding_ids = cleaned_embeddings_df.select("review_product_id").distinct()

# 384-dimensional zero vector used for products without an embedding.
# The array column is built from scalar literals because lit() only accepts a
# Python list on newer PySpark releases.
default_embedding = [0.0] * 384
zero_embedding_col = array(*[lit(value) for value in default_embedding])
missing_ids_df = purchase_ids.subtract(embedding_ids).withColumn("aggregated_embedding", zero_embedding_col)

# Combine missing IDs with cleaned_embeddings_df.
# union() matches columns by position and needs the same column count on both
# sides, so project the embeddings down to (id, embedding) first.
final_embeddings_df = cleaned_embeddings_df.select(
    "review_product_id", "aggregated_embedding"
).union(missing_ids_df)

## Evaluation of Embedding-Enhanced Models

In [0]:
# Python caches imported modules, so edits to scripts/embedding_evaluation.py
# are not always picked up by the notebook; force a reload to be safe.
import importlib
from scripts import embedding_evaluation

# Reload the module, then re-import so the names point at the fresh code
importlib.reload(embedding_evaluation)
from scripts.embedding_evaluation import evaluate_precision_recall

In [0]:
from pyspark.sql import functions as F

# Ground truth for the evaluation: the distinct (session, product) pairs among
# the purchase events in `purchase_df`.
actual_purchases = (
    purchase_df
    .filter(F.col("event_type") == "purchase")
    .select("user_session", "cosmetic_product_id")
    .distinct()
)


In [0]:
from pyspark.sql.functions import col, explode

# Convert `cosine_sim_recs` to PySpark DataFrame
cosine_recs_df = spark.createDataFrame(cosine_sim_recs)

# Explode recommendations: one output row per entry of the `scores` array,
# keyed by the product the scores belong to
cosine_recs_df = cosine_recs_df.select(
    col("product_id").alias("recommended_product_id"),
    explode(col("scores")).alias("recommended_score")
)

# Join with actual purchases. The left join keeps every recommendation row;
# rows without a matching purchase carry a null `cosmetic_product_id`.
# NOTE(review): the join is on product ID only, so a row is repeated once per
# session that bought the product and "recall" can exceed 1 — confirm this is
# the intended metric.
cosine_eval = cosine_recs_df.join(
    actual_purchases,
    cosine_recs_df.recommended_product_id == actual_purchases.cosmetic_product_id,
    "left"
)

# Calculate precision and recall.
# Each count() is a full Spark job, so run every one exactly once, and guard
# the divisions so an empty recommendation or purchase set yields 0.0 instead
# of raising ZeroDivisionError.
cosine_total = cosine_eval.count()
cosine_hits = cosine_eval.filter(col("cosmetic_product_id").isNotNull()).count()
cosine_actual_total = actual_purchases.count()

cosine_precision = cosine_hits / cosine_total if cosine_total else 0.0
cosine_recall = cosine_hits / cosine_actual_total if cosine_actual_total else 0.0


In [0]:
# Convert `reranked_rules` to PySpark DataFrame
fp_recs_df = spark.createDataFrame(reranked_rules)

# Explode the antecedent items for comparison: one output row per antecedent
# product, carrying the rule's embedding similarity.
# NOTE(review): only the antecedent is exploded; a rule's recommendation is
# usually its consequent — confirm which side should be scored.
fp_recs_df = fp_recs_df.select(
    explode(col("antecedent")).alias("recommended_product_id"),
    col("embedding_similarity")
)

# Join with actual purchases. The left join keeps every rule row; rows without
# a matching purchase carry a null `cosmetic_product_id`.
fp_eval = fp_recs_df.join(
    actual_purchases,
    fp_recs_df.recommended_product_id == actual_purchases.cosmetic_product_id,
    "left"
)

# Calculate precision and recall.
# Each count() is a full Spark job, so run every one exactly once, and guard
# the divisions so an empty rule or purchase set yields 0.0 instead of raising
# ZeroDivisionError.
fp_total = fp_eval.count()
fp_hits = fp_eval.filter(col("cosmetic_product_id").isNotNull()).count()
fp_actual_total = actual_purchases.count()

fp_precision = fp_hits / fp_total if fp_total else 0.0
fp_recall = fp_hits / fp_actual_total if fp_actual_total else 0.0


In [0]:
# Report the embedding-based metrics for both models with default formatting.
print(f"Cosine Similarity - Precision: {cosine_precision}, Recall: {cosine_recall}")
print(f"FP-Growth - Precision: {fp_precision}, Recall: {fp_recall}")
