In [0]:
from pyspark.sql.functions import col, current_date

# Quick visual check: only the rows of the silver book table scraped today.
is_today = col("scrape_date") == current_date()
today_df = spark.table("layers.silver.book_master").where(is_today)

today_df.display()

In [0]:
import mlflow
import mlflow.sklearn
from pyspark.sql.window import Window
from pyspark.sql.functions import avg, lag, col, when, lead, current_date
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report

# Register models in Unity Catalog and log runs under the Buy_Wait experiment.
# NOTE(review): the experiment lives under a personal workspace path; consider
# moving it to a shared location or a config constant.
EXPERIMENT_PATH = "/Workspace/Users/jennoronha28@gmail.com/Buy_Wait"
mlflow.set_registry_uri("databricks-uc")
mlflow.set_experiment(EXPERIMENT_PATH)

# 1. CHECK BASE DATA FIRST
# Full history of scraped book prices; every later step derives from this frame.
base_df = spark.table("layers.silver.book_master")
base_count = base_df.count()
print(f"Total records in base table: {base_count}")
print("Schema:")
base_df.printSchema()


# 2. FEATURE ENGINEERING
# Every window partitions by (source, book_name) and orders by scrape_date.
# Offsets count ROWS, not calendar days: the "7d" / "7" features only mean
# seven days if there is exactly one scrape per book per day with no gaps.
# NOTE(review): confirm that one-row-per-day invariant holds upstream.

# The 7 rows before the current one (the current row itself is excluded).
w_avg = (Window
            .partitionBy("source", "book_name")
            .orderBy("scrape_date")
            .rowsBetween(-7, -1))

# Ordered window used by lag() / lead().
w_lag = (Window
            .partitionBy("source", "book_name")
            .orderBy("scrape_date"))

# Create features step by step (easier to debug)
feature_df = (base_df
    .withColumn("avg_7d_price", avg("price").over(w_avg))
    .withColumn("lag_1_price", lag("price", 1).over(w_lag))
    .withColumn("lag_7_price", lag("price", 7).over(w_lag))
    # Percent changes are NULL whenever the lagged / average price is NULL.
    .withColumn("pct_change_1d", (col("price") - col("lag_1_price")) / col("lag_1_price") * 100)
    .withColumn("pct_change_7d", (col("price") - col("lag_7_price")) / col("lag_7_price") * 100)
    .withColumn("pct_vs_avg", (col("price") - col("avg_7d_price")) / col("avg_7d_price") * 100)
    .withColumn("future_price", lead("price", 7).over(w_lag))
    # Label: 1 ("BUY") when the price 7 rows ahead is higher than today's,
    # 0 ("WAIT") when it is not. Rows without a future price yet (each book's
    # 7 most recent rows) get NULL rather than 0, so dropna() on "label"
    # excludes them from training instead of teaching the model they were WAIT.
    .withColumn("label", when(col("future_price").isNotNull(),
                              (col("future_price") > col("price")).cast("int")))
)

# Check feature_df
print(f"\nFeature DF record count: {feature_df.count()}")
feature_df.select("book_name", "source", "scrape_date", "price", "avg_7d_price", "lag_7_price", "label").show(5)


# 3. PREPARE TRAINING DATA (filter nulls)
# Model inputs, in the column order the classifier is fitted on. The
# prediction steps reuse this list to order their feature matrices.
feature_columns = ["price", "avg_7d_price", "lag_1_price", "lag_7_price",
                   "pct_change_1d", "pct_change_7d", "pct_vs_avg"]

# A row is trainable only when every feature and the label are present. This
# drops, among others, each book's earliest rows (lag_7_price needs 7 prior rows).
train_feature_df = feature_df.dropna(subset=feature_columns + ["label"])
print(f"\nTraining records (after dropna): {train_feature_df.count()}")

# Convert to Pandas
train_df = train_feature_df.select(feature_columns + ["label"]).toPandas()

# train_test_split needs at least 2 rows; fail early with an actionable message
# instead of an opaque sklearn error.
if len(train_df) < 2:
    raise ValueError(
        f"Need at least 2 training rows after dropna, got {len(train_df)}. "
        "Each book needs 7 earlier scrapes (for lag_7_price) and a known "
        "label before its rows become usable for training."
    )

X = train_df[feature_columns]
y = train_df["label"]

# NOTE(review): a random row-level split on time-series data puts days adjacent
# to training days (with overlapping 7-row windows) into the test set, so the
# reported accuracy is likely optimistic. Consider splitting on scrape_date.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

print(f"Training samples: {len(X_train)}, Test samples: {len(X_test)}")
print(f"Label distribution:\n{y.value_counts()}")


# 4. TRAIN AND LOG MODEL

catalog_name = "layers"
schema_name = "gold"
model_name = "buy_wait_model"
registered_model_name = f"{catalog_name}.{schema_name}.{model_name}"

# Single source of truth for the hyperparameters: the same values are passed
# to the estimator and logged to MLflow, so the two cannot drift apart.
rf_params = {"n_estimators": 100, "max_depth": 10, "random_state": 42}

with mlflow.start_run(run_name="buy_wait_classifier") as run:
    mlflow.set_tag("created_by", "Janishia")
    mlflow.set_tag("model_type", "classification")
    mlflow.set_tag("description", "Buy and Wait Model for Book Prices")

    rf = RandomForestClassifier(**rf_params)
    rf.fit(X_train, y_train)

    y_pred = rf.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)

    mlflow.log_metric("accuracy", accuracy)
    mlflow.log_param("n_estimators", rf_params["n_estimators"])
    mlflow.log_param("max_depth", rf_params["max_depth"])
    mlflow.log_param("features", str(feature_columns))

    # Logs the fitted model under artifact path "model" and registers a new
    # version of registered_model_name in the registry.
    mlflow.sklearn.log_model(
        rf,
        artifact_path="model",
        registered_model_name=registered_model_name,
        input_example=X_train.head(1)
    )

    print(f"✓ Model registered: {registered_model_name}")
    print(f"✓ Accuracy: {accuracy:.2%}")
    print(f"✓ Run ID: {run.info.run_id}")
    print("\nClassification Report:")
    # labels=[0, 1] keeps the report valid when the test split (or the
    # predictions) contain only one class; without it sklearn raises because
    # the 2 target_names no longer match the observed classes.
    print(classification_report(
        y_test, y_pred,
        labels=[0, 1],
        target_names=["WAIT", "BUY"],
        zero_division=0,
    ))


# 5. MAKE PREDICTIONS - FIXED TO GET CORRECT PRICE & URL

# Load the model trained in the run above. Pinning registry version 1 would
# keep scoring with the first model ever registered after any retraining.
model_uri = f"runs:/{run.info.run_id}/model"
loaded_model = mlflow.sklearn.load_model(model_uri)

# Resolve Spark's current_date() to a concrete date once, so the same value is
# used for filtering and is printed as a real date rather than a Column repr.
target_date = spark.range(1).select(current_date()).first()[0]

# Today's rows that have every model input present, the same rule used to
# build the training set. Columns are selected explicitly to keep the output clear.
today_features = (feature_df
    .filter(col("scrape_date") == target_date)
    .dropna(subset=feature_columns)
    .select(
        col("book_name"),
        col("source"),
        col("url"),
        col("price").alias("current_price"),  # Rename to be clear this is today's price
        col("avg_7d_price"),
        col("lag_1_price"),
        col("lag_7_price"),
        col("pct_change_1d"),
        col("pct_change_7d"),
        col("pct_vs_avg")
    )
    .toPandas()
)

if len(today_features) > 0:
    # The model was fitted on columns named exactly as in feature_columns
    # (including "price"), so map the display name back and select in order.
    X_predict = today_features.rename(columns={"current_price": "price"})[feature_columns]

    # Predict
    predictions = loaded_model.predict(X_predict)
    probabilities = loaded_model.predict_proba(X_predict)

    today_features["recommendation"] = ["🟢 BUY" if p == 1 else "🔴 WAIT" for p in predictions]
    today_features["confidence"] = [f"{max(prob)*100:.1f}%" for prob in probabilities]

    # Display results
    result_df = today_features[[
        "book_name", "source", "url", "current_price",
        "avg_7d_price", "pct_vs_avg", "recommendation", "confidence"
    ]]

    print(f"\n📚 Recommendations for {target_date}:")
    display(spark.createDataFrame(result_df))

    # Also show a nice summary
    print("\n📊 Summary by Book:")
    for _, row in result_df.iterrows():
        price_status = "below avg ✓" if row["pct_vs_avg"] < 0 else "above avg"
        print(f"  {row['book_name']} ({row['source']}): ₹{row['current_price']:.0f} ({price_status}) → {row['recommendation']} ({row['confidence']})")
        print(f"    URL: {row['url']}")
else:
    print(f"No data for {target_date}")

In [0]:
def get_recommendations(target_date=None):
    """Print BUY/WAIT recommendations for every book scraped on one date.

    Reads the notebook globals ``feature_df`` (engineered features),
    ``feature_columns`` (model input columns, in fit order) and
    ``loaded_model`` (fitted classifier) defined in the previous cell.

    Args:
        target_date: Value compared against ``scrape_date``. When None, the
            latest ``scrape_date`` present in ``feature_df`` is used.

    Returns:
        None. Recommendations are printed, BUY rows first.
    """
    from pyspark.sql.functions import max as spark_max

    if target_date is None:
        target_date = feature_df.select(spark_max("scrape_date")).collect()[0][0]

    print(f"📅 Getting recommendations for: {target_date}\n")

    # Only rows with every model input present, the same rule used to build
    # the training set.
    today_features = (feature_df
        .filter(col("scrape_date") == target_date)
        .dropna(subset=feature_columns)
        .select(
            col("book_name"), col("source"), col("url"),
            col("price").alias("current_price"),
            col("avg_7d_price"), col("lag_1_price"), col("lag_7_price"),
            col("pct_change_1d"), col("pct_change_7d"), col("pct_vs_avg")
        )
        .toPandas()
    )

    if len(today_features) == 0:
        print("No data for this date")
        return

    # Map the display name back to the model's "price" column and select the
    # features in the exact order the model was fitted on.
    X_predict = today_features.rename(columns={"current_price": "price"})[feature_columns]

    predictions = loaded_model.predict(X_predict)
    probabilities = loaded_model.predict_proba(X_predict)

    today_features["prediction"] = predictions
    today_features["recommendation"] = ["🟢 BUY" if p == 1 else "🔴 WAIT" for p in predictions]
    today_features["confidence"] = [f"{max(prob)*100:.1f}%" for prob in probabilities]

    # Show BUY recommendations first. Sort on the numeric class (1 = BUY)
    # rather than the recommendation text, whose ordering depends on how the
    # emoji labels happen to compare; a stable sort keeps row order within groups.
    ordered = today_features.sort_values("prediction", ascending=False, kind="stable")
    for _, row in ordered.iterrows():
        status = "below avg ✓" if row["pct_vs_avg"] < 0 else "above avg"
        print(f"{row['recommendation']} {row['book_name']} ({row['source']}): ₹{row['current_price']:.0f} ({status}) - {row['confidence']}")
        print(f"   {row['url']}\n")

# Run it
get_recommendations()