
## Notebook Summary: Product Recommendation Pipeline

**Steps performed in this notebook:**

1. **Configuration Setup**
   - Defined database and table names, model URI, and parameters for recommendations.

2. **Incremental Run Logic**
   - Checked for previous runs and determined the start time for incremental inference.

3. **Model Loading**
   - Loaded ML model for product ranking using MLflow.

4. **Data Preparation**
   - Loaded sales, customer, and product tables.
   - Derived customer age groups.

5. **User Segmentation**
   - Segmented users into new active, recently active, historical, and cold users.

6. **Recommendation Generation**
   - For active users: Generated ML + rule-based hybrid recommendations.
   - For cold users: Generated context-aware popular product recommendations.

7. **Result Union & Snapshot Write**
   - Combined recommendations and wrote results to the snapshot table.

8. **Preview**
   - Displayed the latest recommendations for review.

---
**Outcome:**  
Automated, incremental product recommendations for users, stored and previewed in a unified table.

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.utils import AnalysisException
import pandas as pd
import mlflow.pyfunc
from datetime import datetime

# Session-level Databricks setting, forced to "false" for this notebook.
# NOTE(review): presumably stops Databricks from rejecting self-joins when
# remote filtering is active — confirm against the Databricks documentation.
spark.conf.set("spark.databricks.remoteFiltering.blockSelfJoins", "false")

# ============================================================
# CONFIG
# ============================================================

# Both layers currently resolve to the same catalog.schema.
GOLD_DB   = "kusha_solutions.product_recomendation"
SILVER_DB = "kusha_solutions.product_recomendation"

# Snapshot table that every run appends its recommendations to.
RECO_TABLE = f"{GOLD_DB}.user_recommendation_snapshots_live"

# MLflow model URI (the "@prod" alias) and the version label stored with each snapshot row.
MODEL_URI     = "models:/kusha_solutions.default.product_recommendation_ranker@prod"
MODEL_VERSION = "v2"

# Number of products kept per customer.
TOP_K = 10
# Width, in days, of the look-back window used for "recently active" users.
REFRESH_DAYS = 7

# 🔒 STRICT INFERENCE START (AFTER JAN 5)
# Used as the start timestamp when no earlier snapshot exists, and as a hard
# lower bound for recently active users.
INFERENCE_START_TS = "2026-01-06 00:00:00"

# ============================================================
# 0️⃣ DETERMINE INCREMENTAL START TIME (FIXED)
# ============================================================

def table_exists(name):
    """Report whether `name` resolves to a table in the active Spark session.

    Args:
        name: Table identifier handed to ``spark.table``.

    Returns:
        True when ``spark.table(name)`` succeeds, False when it raises
        ``AnalysisException``. Any other exception propagates to the caller.
    """
    try:
        spark.table(name)
    except AnalysisException:
        # Spark could not resolve the identifier.
        return False
    return True

# Resume from the newest snapshot when there is one. MAX(run_ts) is NULL when
# the table exists but holds no rows, so that case is treated like a first run
# instead of propagating None into the EventTime filters (which would match
# no rows at all).
last_run_ts = None
if table_exists(RECO_TABLE):
    last_run_ts = (
        spark.table(RECO_TABLE)
             .select(F.max("run_ts").alias("ts"))
             .collect()[0]["ts"]
    )

if last_run_ts is not None:
    run_start_ts = last_run_ts                      # Python datetime
    data_cutoff_ts = last_run_ts                    # Python datetime
    print("üîÅ Incremental run from:", run_start_ts)
else:
    run_start_ts = datetime.fromisoformat(INFERENCE_START_TS)   # Python datetime
    data_cutoff_ts = run_start_ts
    print("üÜï First inference run from:", INFERENCE_START_TS)

# Spark Column expression (not a Python value): "now minus REFRESH_DAYS days",
# evaluated by Spark when a query using it runs.
refresh_cutoff_ts = F.current_timestamp() - F.expr(f"INTERVAL {REFRESH_DAYS} DAYS")

# ============================================================
# 1️⃣ LOAD MODEL (INFERENCE ONLY)
# ============================================================

# Load the registered ranking model through MLflow's generic pyfunc interface.
# In this notebook the model is only used for prediction, never retrained.
model = mlflow.pyfunc.load_model(MODEL_URI)
print("‚úÖ ML model loaded")

# ============================================================
# 2️⃣ LOAD & PREPARE BASE TABLES
# ============================================================

# Sales events enriched in the gold layer; the source of all activity signals.
sales = spark.table(f"{GOLD_DB}.gold_sales_enriched")

# ---- Customers with DERIVED AgeGroup ----
# The when() branches are evaluated in order, so every "< upper bound" test
# implicitly starts where the previous bucket ended. Half-open thresholds are
# used instead of inclusive between() ranges so that non-integer ages (e.g.
# 24.5) land in the expected bucket rather than falling through every range
# into "65+". For integer ages the result is identical.
_age = F.col("Age")

customers = (
    spark.table(f"{SILVER_DB}.silver_customers")
         .select("CustomerID", "Age", "Location", "PreferredSeason")
         .withColumn(
             "AgeGroup",
             F.when(_age.isNull(), "Unknown")
              .when(_age < 18, "Under 18")
              .when(_age < 25, "18-24")
              .when(_age < 35, "25-34")
              .when(_age < 45, "35-44")
              .when(_age < 55, "45-54")
              .when(_age < 65, "55-64")
              .otherwise("65+")
         )
         # Drop exact duplicate customer rows.
         .distinct()
)

# Product feature table; supplies names, brands and popularity signals.
products = spark.table(f"{GOLD_DB}.gold_product_features")

# ============================================================
# 3️⃣ USER SEGMENTATION (STRICT POST–JAN 5)
# ============================================================

# Event-time column shared by the activity filters below.
_event_ts = F.col("EventTime")

# Customers with at least one sale strictly after the start of this run's window.
_after_run_start = _event_ts > F.lit(run_start_ts)
new_active_users = sales.filter(_after_run_start).select("CustomerID").distinct()

# Customers with a sale inside the refresh window that is also strictly after
# the configured inference start.
_in_refresh_window = (_event_ts >= refresh_cutoff_ts) & (
    _event_ts > F.lit(INFERENCE_START_TS)
)
recent_active_users = sales.filter(_in_refresh_window).select("CustomerID").distinct()

# Every customer that has ever appeared in the sales data.
historical_active_users = sales.select("CustomerID").distinct()

# Customers with no sales history at all.
cold_users = customers.join(
    historical_active_users, on="CustomerID", how="left_anti"
)

# Users scored by the ML model: the deduplicated union of both active segments.
ml_users = new_active_users.union(recent_active_users).distinct()

# Each count() triggers its own Spark job.
for _label, _segment in (
    ("New active users:", new_active_users),
    ("Recently active users:", recent_active_users),
    ("Cold users:", cold_users),
):
    print(_label, _segment.count())

# ============================================================
# 4️⃣ ML + RULE BASED RECOMMENDATIONS (ACTIVE USERS)
# ============================================================

# Builds the hybrid (model + rule) top-K recommendations for active users.
# Stays None when there are no ML users or no candidate features for them, in
# which case the union step further down simply skips this frame.
active_reco_pdf = None

if ml_users.count() > 0:

    # Candidate (customer, product) feature rows, restricted to the users being scored.
    # NOTE(review): the table name is spelled "fs_canddiate_features" — confirm
    # it matches the catalog before "correcting" it.
    fs_df = spark.table(f"{GOLD_DB}.fs_canddiate_features")
    fs_active = fs_df.join(ml_users, "CustomerID", "inner")

    if fs_active.count() > 0:

        # Pulls every candidate row to the driver; missing values in all
        # columns are replaced with 0.
        pdf = fs_active.toPandas().fillna(0)

        # Columns handed to the model, in this order.
        # NOTE(review): presumably this must match the training feature order — confirm.
        FEATURE_COLS = [
            "src_same_category",
            "src_brand_affinity",
            "src_fbt",
            "src_trending",
            "src_age_group",
            "src_location",
            "user_views",
            "user_carts",
            "user_purchases",
            "recent_7d_interaction",
            "ProductRating",
            "ReviewsCount",
            "DiscountPercent",
            "log_reviews",
            "is_discounted",
            "AvgReviewRating",
            "age_group_encoded",
            "num_sources"
        ]

        pdf["prediction_score"] = model.predict(pdf[FEATURE_COLS])

        # Hand-weighted score over the candidate-source flags.
        # NOTE(review): the weights sum to 1.10, not 1.00 — confirm this is intended.
        pdf["rule_score"] = (
            0.30 * pdf["src_trending"] +
            0.25 * pdf["is_discounted"] +
            0.20 * pdf["src_brand_affinity"] +
            0.10 * pdf["src_fbt"] +
            0.10 * pdf["src_same_category"] +
            0.05 * pdf["src_location"] +
            0.05 * pdf["src_age_group"] +
            0.05 * pdf["num_sources"]
        )

        # 70/30 blend of the model prediction and the rule score.
        pdf["final_score"] = 0.7 * pdf["prediction_score"] + 0.3 * pdf["rule_score"]

        product_meta = products.select(
            "ProductID", "ProductName", "Brand"
        ).toPandas()

        # Left merge: candidates whose ProductID is absent from the product
        # table are kept, with NaN for ProductName / Brand.
        pdf = pdf.merge(product_meta, on="ProductID", how="left")

        def build_reason(r):
            """Return a comma-separated explanation built from the truthy source flags of row `r`.

            Falls back to "Personalized ranking" when none of the flags is set.
            """
            reasons = []
            if r.src_trending: reasons.append("Trending")
            if r.is_discounted: reasons.append("Discounted")
            if r.src_brand_affinity: reasons.append("Brand preference")
            if r.src_same_category: reasons.append("Same category")
            if r.src_fbt: reasons.append("Frequently bought together")
            if r.src_location: reasons.append("Popular in your location")
            if r.src_age_group: reasons.append("Popular in your age group")
            return ", ".join(reasons) if reasons else "Personalized ranking"

        # Row-wise apply: one Python call per candidate row.
        pdf["recommendation_reason"] = pdf.apply(build_reason, axis=1)

        # Sort by customer, then by descending final score, and keep the
        # first TOP_K rows of each customer.
        topk = (
            pdf.sort_values(["CustomerID", "final_score"], ascending=[True, False])
               .groupby("CustomerID")
               .head(TOP_K)
        )

        # Collapse to one row per customer whose "recommendations" cell is a
        # list of dicts, one dict per recommended product.
        active_reco_pdf = (
            topk.groupby("CustomerID")
                .apply(lambda x: [
                    {
                        "product_id": int(r.ProductID),
                        "product_name": r.ProductName,
                        "brand": r.Brand,
                        "score": round(r.final_score, 6),
                        "reason": r.recommendation_reason
                    }
                    for r in x.itertuples()
                ])
                .reset_index(name="recommendations")
        )

        active_reco_pdf["recommendation_type"] = "ML_HYBRID"

# ============================================================
# 5️⃣ CONTEXT-AWARE COLD USER RECOMMENDATIONS
# ============================================================

# Builds popularity-based recommendations for customers with no sales history.
# Stays None when there are no cold users, in which case the union step
# further down skips this frame.
cold_reco_pdf = None

# Only the customer key is needed on the driver. Collecting it directly also
# replaces the separate count() job with a simple emptiness check.
cold_pdf = cold_users.select("CustomerID").toPandas()

if not cold_pdf.empty:

    product_pdf = products.select(
        "ProductID", "ProductName", "Brand", "ProductRating", "ReviewsCount"
    ).toPandas()

    # The ranking does not depend on the individual user, so it is computed
    # once here instead of re-sorting the whole product table per customer.
    # NOTE(review): no customer attribute (age group, location, season) feeds
    # the ranking even though the type label is COLD_CONTEXTUAL — confirm
    # whether per-segment ranking was intended.
    top_products = (
        product_pdf
        .sort_values(["ProductRating", "ReviewsCount"], ascending=False)
        .head(TOP_K)
    )

    cold_recommendations = [
        {
            "product_id": int(r.ProductID),
            "product_name": r.ProductName,
            "brand": r.Brand,
            "score": 0.0,
            "reason": "Popular products for new users"
        }
        for r in top_products.itertuples()
    ]

    # One output row per cold-user row; each row gets its own list object.
    cold_reco_pdf = pd.DataFrame([
        {
            "CustomerID": customer_id,
            "recommendations": list(cold_recommendations),
            "recommendation_type": "COLD_CONTEXTUAL"
        }
        for customer_id in cold_pdf["CustomerID"].tolist()
    ])

# ============================================================
# 6️⃣ UNION + SNAPSHOT WRITE (FIXED)
# ============================================================

# Keep only the recommendation frames that were actually produced in this run.
frames = [
    produced
    for produced in (active_reco_pdf, cold_reco_pdf)
    if produced is not None
]

if not frames:
    print("‚ö†Ô∏è No users to recommend in this run")
else:
    final_df = pd.concat(frames, ignore_index=True)

    # Run metadata stamped on every row of this snapshot.
    final_df["run_ts"] = datetime.now()            # Python datetime, taken once per run
    final_df["model_version"] = MODEL_VERSION
    final_df["data_cutoff_ts"] = data_cutoff_ts   # Python datetime

    # Append-only write: earlier snapshots stay in the table.
    snapshot_sdf = spark.createDataFrame(final_df)
    snapshot_sdf.write.mode("append").saveAsTable(RECO_TABLE)

    print("‚úÖ Recommendations generated & stored successfully")

# ============================================================
# 7️⃣ PREVIEW
# ============================================================

# Render the five snapshot rows with the most recent run_ts.
latest_snapshots = (
    spark.table(RECO_TABLE)
         .sort(F.desc("run_ts"))
         .limit(5)
)
display(latest_snapshots)


In [0]:
# Load the complete recommendation snapshot table and render it in the notebook.
# The fully qualified name is spelled out here rather than taken from a variable.
df = spark.table("kusha_solutions.product_recomendation.user_recommendation_snapshots_live")
display(df)