# Regression Model - Purchase Prediction

**Cel (Goal):** Zbudować model predykcyjny do przewidywania wartości zakupu (Purchase) — build a predictive model for the purchase amount (`Purchase`).

**Modele do przetestowania:**
1. Linear Regression (baseline)
2. Random Forest Regressor
3. Gradient Boosted Trees (GBT)

**Metryki ewaluacji:**
- RMSE (Root Mean Squared Error)
- MAE (Mean Absolute Error)
- R² (R-squared, coefficient of determination)

---

## 1. Setup & Imports

In [None]:
# Imports
import sys
sys.path.append('../')

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, avg, stddev, min as _min, max as _max
from config.spark_config import SparkConfig

# ML imports
from pyspark.ml.regression import LinearRegression, RandomForestRegressor, GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Visualization
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import numpy as np

# Global plotting defaults for every figure in this notebook:
# seaborn "whitegrid" style and a wide default figure size.
sns.set_style(style='whitegrid')
plt.rcParams.update({'figure.figsize': (14, 6)})

## 2. Start Spark Session

In [None]:
# Obtain the Spark session through the project's SparkConfig helper.
app_name = "BlackFriday-RegressionModel"
spark = SparkConfig.get_spark_session(app_name)
print("‚úì Spark ready!")

## 3. Load Preprocessed Features

**Note:** Try Delta Lake first. If it fails, fall back to Parquet.

In [None]:
# Prefer the Delta Lake copy of the engineered features; if reading it fails
# for any reason, report the error and fall back to the Parquet copy.
delta_path = "../data/processed/delta/features"
parquet_path = "../data/processed/parquet/features"

try:
    print("Loading from Delta Lake...")
    df_features = spark.read.format("delta").load(delta_path)
    print("‚úì Loaded from Delta Lake")
except Exception as err:
    print(f"Delta Lake failed: {err}")
    print("\nTrying Parquet format...")
    df_features = spark.read.parquet(parquet_path)
    print("‚úì Loaded from Parquet")

row_total = df_features.count()
col_total = len(df_features.columns)
print(f"\nDataset: {row_total:,} rows x {col_total} columns")

## 4. Data Preparation

In [None]:
# Keep only the label and the scaled feature vector. The Spark ML estimators
# below read the vector from a column named "features".
ml_data = (
    df_features
    .select("Purchase", "scaled_features")
    .withColumnRenamed("scaled_features", "features")
)

# Count rows where either the label or the feature vector is missing.
has_null = col("Purchase").isNull() | col("features").isNull()
null_count = ml_data.filter(has_null).count()
print(f"Rows with nulls: {null_count}")

if null_count:
    print("Dropping rows with nulls...")
    ml_data = ml_data.dropna()
    print(f"‚úì Clean dataset: {ml_data.count():,} rows")

# Cache: this DataFrame is scanned repeatedly (statistics, split, sampling).
ml_data.cache()
print("\n‚úì Data cached for faster training")

In [None]:
# Summary statistics of the target, computed in a single Spark aggregation
# (an aggregation without grouping always yields exactly one row).
stat_exprs = [
    count("Purchase").alias("count"),
    avg("Purchase").alias("mean"),
    stddev("Purchase").alias("std"),
    _min("Purchase").alias("min"),
    _max("Purchase").alias("max"),
]
purchase_stats = ml_data.select(*stat_exprs).collect()[0]

banner = "="*60
print(banner)
print("TARGET VARIABLE STATISTICS (Purchase)")
print(banner)
print(f"Count:   {purchase_stats['count']:,}")
for label, key in (
    ("Mean:    ", "mean"),
    ("Std:     ", "std"),
    ("Min:     ", "min"),
    ("Max:     ", "max"),
):
    print(f"{label}${purchase_stats[key]:,.2f}")
print(banner)

In [None]:
# Bring a 1% sample of the target to the driver for plotting.
purchase_sample = (
    ml_data.select("Purchase")
    .sample(False, 0.01, seed=42)
    .toPandas()
)
amounts = purchase_sample['Purchase']
# Mean from the full-data statistics cell, not from the sample.
mean_purchase = purchase_stats['mean']

fig, (hist_ax, box_ax) = plt.subplots(1, 2, figsize=(14, 5))

# Left panel: histogram with the mean marked.
hist_ax.hist(amounts, bins=50, edgecolor='black', alpha=0.7, color='skyblue')
hist_ax.set_title('Purchase Amount Distribution (1% sample)', fontsize=14, fontweight='bold')
hist_ax.set_xlabel('Purchase ($)')
hist_ax.set_ylabel('Frequency')
hist_ax.axvline(mean_purchase, color='red', linestyle='--', label=f"Mean: ${mean_purchase:,.0f}")
hist_ax.legend()

# Right panel: box plot of the same sample.
box_ax.boxplot(amounts, vert=True)
box_ax.set_title('Purchase Amount Box Plot', fontsize=14, fontweight='bold')
box_ax.set_ylabel('Purchase ($)')

plt.tight_layout()
plt.show()

## 5. Train/Test Split

In [None]:
# Split data: 80% train, 20% test (fixed seed for reproducibility).
train_data, test_data = ml_data.randomSplit([0.8, 0.2], seed=42)

# Cache both sets: each is reused by all three models below.
train_data.cache()
test_data.cache()

# Count each split exactly once and reuse the numbers. Every .count() is a
# separate Spark job; these two also materialize the caches above.
n_train = train_data.count()
n_test = test_data.count()
# ml_data is cached, so randomSplit assigns each of its rows to exactly one
# split; the sum therefore equals ml_data.count() without another job.
n_total = n_train + n_test

# Guard against an empty dataset instead of raising ZeroDivisionError.
train_pct = n_train / n_total * 100 if n_total else 0.0
test_pct = n_test / n_total * 100 if n_total else 0.0

print("="*60)
print("TRAIN/TEST SPLIT")
print("="*60)
print(f"Training set:   {n_train:,} rows ({train_pct:.1f}%)")
print(f"Test set:       {n_test:,} rows ({test_pct:.1f}%)")
print("="*60)

## 6. Model Training

### 6.1 Baseline: Linear Regression

In [None]:
print("\n" + "="*60)
print("TRAINING: LINEAR REGRESSION (Baseline)")
print("="*60)

# Linear baseline. elasticNetParam=0.0 makes the penalty pure L2 (ridge);
# regParam=0.1 sets its strength.
lr_params = dict(
    featuresCol="features",
    labelCol="Purchase",
    predictionCol="prediction",
    maxIter=100,
    regParam=0.1,
    elasticNetParam=0.0,
)
lr = LinearRegression(**lr_params)

print("Training...")
lr_model = lr.fit(train_data)
print("‚úì Training complete!")

# Metrics that Spark computed on the training data during fit().
train_summary = lr_model.summary
print("\nTraining Summary:")
print(f"  RMSE: ${train_summary.rootMeanSquaredError:,.2f}")
print(f"  MAE:  ${train_summary.meanAbsoluteError:,.2f}")
print(f"  R¬≤:   {train_summary.r2:.4f}")
print(f"  Iterations: {train_summary.totalIterations}")

In [None]:
# Score the linear baseline on the held-out test set.
print("\nEvaluating on test set...")
lr_predictions = lr_model.transform(test_data)

# One evaluator per metric. The Random Forest and GBT cells below reuse
# evaluator_rmse / evaluator_mae / evaluator_r2.
evaluators = {
    metric: RegressionEvaluator(
        labelCol="Purchase",
        predictionCol="prediction",
        metricName=metric,
    )
    for metric in ("rmse", "mae", "r2")
}
evaluator_rmse = evaluators["rmse"]
evaluator_mae = evaluators["mae"]
evaluator_r2 = evaluators["r2"]

lr_rmse = evaluator_rmse.evaluate(lr_predictions)
lr_mae = evaluator_mae.evaluate(lr_predictions)
lr_r2 = evaluator_r2.evaluate(lr_predictions)

header_rule = "="*60
print("\n" + header_rule)
print("LINEAR REGRESSION - TEST SET RESULTS")
print(header_rule)
print(f"RMSE:  ${lr_rmse:,.2f}")
print(f"MAE:   ${lr_mae:,.2f}")
print(f"R¬≤:    {lr_r2:.4f}")
print(header_rule)

### 6.2 Random Forest Regressor

In [None]:
print("\n" + "="*60)
print("TRAINING: RANDOM FOREST REGRESSOR")
print("="*60)

# 100 trees, each at most 10 levels deep. A split is only kept if both
# children retain at least 5 training rows (minInstancesPerNode).
rf = RandomForestRegressor(
    seed=42,
    minInstancesPerNode=5,
    maxDepth=10,
    numTrees=100,
    predictionCol="prediction",
    labelCol="Purchase",
    featuresCol="features",
)

print("Training (this may take a few minutes)...")
rf_model = rf.fit(train_data)
print("‚úì Training complete!")
# getNumTrees is a property on Spark tree-ensemble models, hence no call.
print(f"\nTrees trained: {rf_model.getNumTrees}")
print(f"Max depth: {rf_model.getOrDefault('maxDepth')}")

In [None]:
# Score the Random Forest on the held-out test set.
print("\nEvaluating on test set...")
rf_predictions = rf_model.transform(test_data)

# Reuse the evaluators from the linear-regression cell, in RMSE/MAE/R² order.
rf_rmse, rf_mae, rf_r2 = (
    ev.evaluate(rf_predictions)
    for ev in (evaluator_rmse, evaluator_mae, evaluator_r2)
)

rule = "="*60
print("\n" + rule)
print("RANDOM FOREST - TEST SET RESULTS")
print(rule)
print(f"RMSE:  ${rf_rmse:,.2f}")
print(f"MAE:   ${rf_mae:,.2f}")
print(f"R¬≤:    {rf_r2:.4f}")
print(rule)

### 6.3 Gradient Boosted Trees (GBT)

In [None]:
print("\n" + "="*60)
print("TRAINING: GRADIENT BOOSTED TREES (GBT)")
print("="*60)

# Boosting: 100 iterations (maxIter) of depth-5 trees, each contribution
# scaled by the learning rate stepSize=0.1.
gbt_config = {
    "featuresCol": "features",
    "labelCol": "Purchase",
    "predictionCol": "prediction",
    "maxIter": 100,
    "maxDepth": 5,
    "stepSize": 0.1,
    "seed": 42,
}
gbt = GBTRegressor(**gbt_config)

print("Training (this may take several minutes)...")
gbt_model = gbt.fit(train_data)
print("‚úì Training complete!")
print(f"\nTrees trained: {gbt_model.getNumTrees}")
print(f"Max depth: {gbt_model.getOrDefault('maxDepth')}")

In [None]:
# Score the GBT model on the held-out test set.
print("\nEvaluating on test set...")
gbt_predictions = gbt_model.transform(test_data)

# Same evaluators as for the other models, applied in RMSE/MAE/R² order.
gbt_scores = [
    ev.evaluate(gbt_predictions)
    for ev in (evaluator_rmse, evaluator_mae, evaluator_r2)
]
gbt_rmse, gbt_mae, gbt_r2 = gbt_scores

gbt_rule = "="*60
print("\n" + gbt_rule)
print("GRADIENT BOOSTED TREES - TEST SET RESULTS")
print(gbt_rule)
print(f"RMSE:  ${gbt_rmse:,.2f}")
print(f"MAE:   ${gbt_mae:,.2f}")
print(f"R¬≤:    {gbt_r2:.4f}")
print(gbt_rule)

## 7. Model Comparison

In [None]:
# Collect the test-set metrics of all three models into one table.
model_metrics = [
    ('Linear Regression', lr_rmse, lr_mae, lr_r2),
    ('Random Forest', rf_rmse, rf_mae, rf_r2),
    ('Gradient Boosted Trees', gbt_rmse, gbt_mae, gbt_r2),
]
results = pd.DataFrame(model_metrics, columns=['Model', 'RMSE', 'MAE', 'R¬≤'])

# Lower RMSE is better, so the best model ends up in the first row.
results = results.sort_values(by='RMSE')

wide_rule = "="*80
print("\n" + wide_rule)
print("MODEL COMPARISON")
print(wide_rule)
print(results.to_string(index=False))
print(wide_rule)

top = results.iloc[0]
best_model_name = top['Model']
best_rmse = top['RMSE']
best_r2 = top['R¬≤']

print(f"\nüèÜ Best Model: {best_model_name}")
print(f"   RMSE: ${best_rmse:,.2f}")
print(f"   R¬≤: {best_r2:.4f}")

In [None]:
# Three side-by-side bar charts of the test metrics. Bars follow the row order
# of `results`, which is sorted by RMSE.
bar_colors = ['skyblue', 'coral', 'lightgreen']
panels = [
    ('RMSE', 'RMSE Comparison (lower is better)', 'RMSE ($)'),
    ('MAE', 'MAE Comparison (lower is better)', 'MAE ($)'),
    ('R¬≤', 'R¬≤ Comparison (higher is better)', 'R¬≤'),
]

fig, axes = plt.subplots(1, 3, figsize=(16, 5))
for ax, (metric, title, ylabel) in zip(axes, panels):
    ax.bar(results['Model'], results[metric], color=bar_colors)
    ax.set_title(title, fontsize=12, fontweight='bold')
    ax.set_ylabel(ylabel)
    if metric == 'R¬≤':
        # Reference line at 0: an R² below it is worse than predicting the mean.
        ax.axhline(y=0, color='red', linestyle='--', alpha=0.5)
    ax.tick_params(axis='x', rotation=45)

plt.tight_layout()
plt.show()

## 8. Feature Importance Analysis

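**Note:** Only the tree-based models (RF, GBT) expose Spark's `featureImportances`; this section uses the Random Forest model. (Linear Regression has coefficients instead, which are not analyzed here.)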

In [None]:
# Get feature importance from Random Forest
print("\n" + "="*60)
print("FEATURE IMPORTANCE (Random Forest)")
print("="*60)

# Hand-maintained feature names. They are ASSUMED to follow the order of the
# entries in the "scaled_features" vector built during feature engineering.
# NOTE(review): that order is not verified here. In production, persist the
# names together with the assembler/model instead of hard-coding them.
# NOTE(review): if the two 'purchase_vs_*_ratio' features really are in the
# vector, they are derived from Purchase itself and leak the target. The same
# risk applies to user/product aggregates computed before the train/test split.
# Confirm against the feature pipeline.
feature_names = [
    'Gender', 'Age', 'Occupation', 'City_Category', 'Stay_In_Current_City_Years', 'Marital_Status',
    'Product_Category_1', 'Product_Category_2', 'Product_Category_3',
    'user_purchase_count', 'user_total_spent', 'user_avg_purchase', 'user_std_purchase',
    'user_unique_products', 'product_purchase_count', 'product_unique_customers',
    'product_avg_price', 'product_total_revenue', 'product_popularity_score',
    'category_purchase_count', 'category_avg_price', 'rfm_frequency_score',
    'rfm_monetary_score', 'rfm_score', 'purchase_vs_user_avg_ratio',
    'purchase_vs_product_avg_ratio', 'is_high_value_customer'
]

# One importance score per entry of the feature vector.
rf_importance = rf_model.featureImportances.toArray()
n_importances = len(rf_importance)

# Align names with the importance vector in both directions:
# - surplus names are dropped;
# - unnamed slots (e.g. from one-hot encoded columns) get positional
#   placeholders, instead of pd.DataFrame raising a length-mismatch ValueError.
aligned_names = feature_names[:n_importances] + [
    f"feature_{i}" for i in range(len(feature_names), n_importances)
]
if len(feature_names) != n_importances:
    print(f"Warning: {len(feature_names)} feature names vs {n_importances} importance scores; "
          "names may not line up with the feature vector")

# Sort by importance. Reset the index so that it is the rank (0 = most
# important) rather than the feature's original position.
importance_df = (
    pd.DataFrame({'Feature': aligned_names, 'Importance': rf_importance})
    .sort_values('Importance', ascending=False)
    .reset_index(drop=True)
)

# Display top 15
print("\nTop 15 Most Important Features:")
print(importance_df.head(15).to_string(index=False))

In [None]:
# Horizontal bar chart of the 15 largest importances. importance_df is sorted
# in descending order, and the y-axis is inverted so the top feature is at the top.
top_features = importance_df.head(15)
plt.figure(figsize=(12, 8))
plt.barh(y=top_features['Feature'], width=top_features['Importance'], color='skyblue')
plt.xlabel('Importance Score', fontsize=12)
plt.ylabel('Feature', fontsize=12)
plt.title('Top 15 Feature Importances (Random Forest)', fontsize=14, fontweight='bold')
current_axes = plt.gca()
current_axes.invert_yaxis()
plt.tight_layout()
plt.show()

## 9. Prediction Analysis

In [None]:
# Analyze predictions of the Random Forest model. It is used as the example
# here regardless of which model ranked best in the comparison.
print("\n" + "="*60)
print("PREDICTION ANALYSIS (Random Forest)")
print("="*60)

# Pull a 0.1% sample of test-set predictions to the driver.
sample_predictions = rf_predictions.select("Purchase", "prediction").sample(False, 0.001, seed=42).toPandas()

# Residual = actual - predicted (positive means the model under-predicted).
sample_predictions['residual'] = sample_predictions['Purchase'] - sample_predictions['prediction']
sample_predictions['abs_residual'] = np.abs(sample_predictions['residual'])
# Absolute percentage error is undefined for a zero purchase. Map 0 to NaN
# (pandas mean/median skip NaN) instead of letting the division produce inf,
# which would swamp the mean.
nonzero_purchase = sample_predictions['Purchase'].replace(0, np.nan)
sample_predictions['pct_error'] = (sample_predictions['abs_residual'] / nonzero_purchase) * 100

print(f"\nSample size: {len(sample_predictions):,}")
if sample_predictions.empty:
    # A 0.1% sample of a small test set can easily be empty.
    print("  (sample is empty - increase the sampling fraction to analyze residuals)")
else:
    print(f"\nResidual Statistics:")
    print(f"  Mean Absolute Error: ${sample_predictions['abs_residual'].mean():,.2f}")
    print(f"  Median Absolute Error: ${sample_predictions['abs_residual'].median():,.2f}")
    # These two are absolute percentage errors (mean value = MAPE).
    print(f"  Mean Percentage Error: {sample_predictions['pct_error'].mean():.2f}%")
    print(f"  Median Percentage Error: {sample_predictions['pct_error'].median():.2f}%")

In [None]:
# Diagnostic plots for the sampled Random Forest predictions.
actual = sample_predictions['Purchase']
predicted = sample_predictions['prediction']
residuals = sample_predictions['residual']
# Endpoints of the y = x reference line, spanning the observed actual values.
lower, upper = actual.min(), actual.max()

fig, (scatter_ax, resid_ax) = plt.subplots(1, 2, figsize=(16, 6))

# Left: actual vs predicted. Points on the dashed diagonal are perfect predictions.
scatter_ax.scatter(actual, predicted,
                   alpha=0.5, s=20, color='skyblue', edgecolors='black', linewidth=0.5)
scatter_ax.plot([lower, upper], [lower, upper], 'r--', lw=2, label='Perfect Prediction')
scatter_ax.set_xlabel('Actual Purchase ($)', fontsize=12)
scatter_ax.set_ylabel('Predicted Purchase ($)', fontsize=12)
scatter_ax.set_title('Actual vs Predicted Values', fontsize=14, fontweight='bold')
scatter_ax.legend()
scatter_ax.grid(True, alpha=0.3)

# Right: residuals against predicted values, with a zero-error line.
resid_ax.scatter(predicted, residuals,
                 alpha=0.5, s=20, color='coral', edgecolors='black', linewidth=0.5)
resid_ax.axhline(y=0, color='red', linestyle='--', lw=2)
resid_ax.set_xlabel('Predicted Purchase ($)', fontsize=12)
resid_ax.set_ylabel('Residual ($)', fontsize=12)
resid_ax.set_title('Residual Plot', fontsize=14, fontweight='bold')
resid_ax.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

In [None]:
# Histogram of the sampled residuals (actual - predicted); the dashed
# vertical line marks zero error.
resid_values = sample_predictions['residual']

plt.figure(figsize=(12, 5))
plt.hist(resid_values, bins=50, edgecolor='black', alpha=0.7, color='lightgreen')
plt.axvline(x=0, color='red', linestyle='--', lw=2, label='Zero Error')
plt.xlabel('Residual ($)', fontsize=12)
plt.ylabel('Frequency', fontsize=12)
plt.title('Distribution of Residuals', fontsize=14, fontweight='bold')
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

## 10. Save Best Model

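**Note:** Saving a full Spark model on Windows typically requires a working Hadoop setup (e.g. `winutils.exe`). To avoid that, this cell pickles only the model's metadata (metrics, hyperparameters, feature importances), not the trained model itself.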

In [None]:
import pickle
import os

# Create models directory
models_dir = "../models"
os.makedirs(models_dir, exist_ok=True)
metadata_path = os.path.join(models_dir, "regression_model_metadata.pkl")

# Save model metadata (configuration and metrics, not the fitted model).
# Everything recorded here (metrics, hyperparameters, feature importances)
# describes the Random Forest, so the record is labeled as such. The winner of
# the RMSE comparison is stored separately, because it may be a different model.
model_metadata = {
    'model_type': 'RandomForestRegressor',
    'model_name': 'Random Forest',
    'best_model_name': best_model_name,
    'metrics': {
        'rmse': rf_rmse,
        'mae': rf_mae,
        'r2': rf_r2
    },
    'hyperparameters': {
        'numTrees': rf_model.getNumTrees,
        'maxDepth': rf_model.getOrDefault('maxDepth'),
        'minInstancesPerNode': rf_model.getOrDefault('minInstancesPerNode')
    },
    'feature_importance': importance_df.to_dict('records'),
    'training_date': pd.Timestamp.now().isoformat()
}

# Save metadata. Only load this pickle from trusted locations: unpickling
# can execute arbitrary code.
with open(metadata_path, 'wb') as f:
    pickle.dump(model_metadata, f)

print("="*60)
print("MODEL SAVED")
print("="*60)
print(f"Model metadata saved to: {metadata_path}")
if best_model_name != 'Random Forest':
    print(f"Note: metadata describes the Random Forest; the best model by RMSE was {best_model_name}")
print("\nNote: Full Spark model can be saved with:")
print("  rf_model.save('../models/rf_regression_model')")
print("  (May require proper Hadoop setup on Windows)")
print("="*60)

## 11. Business Insights & Recommendations

In [None]:
print("\n" + "="*80)
print("BUSINESS INSIGHTS & RECOMMENDATIONS")
print("="*80)

# MAE of the best model: first row of the RMSE-sorted comparison table. This
# keeps the reported error consistent with the model named above it.
best_mae = results.iloc[0]['MAE']

print("\n‚úÖ Model Performance:")
print(f"  - Best Model: {best_model_name}")
print(f"  - Prediction Accuracy (R¬≤): {best_r2:.2%}")
print(f"  - Average Error (MAE): ${best_mae:,.2f}")
print(f"  - Typical prediction is within ¬±${best_mae:,.2f} of actual purchase")

print("\nüìä Key Insights from Feature Importance:")
print("  Top 3 factors influencing purchase amount:")
# Number the rows with enumerate: after sort_values the DataFrame index can
# hold the feature's original position, which is not its rank.
for rank, (_, row) in enumerate(importance_df.head(3).iterrows(), start=1):
    print(f"    {rank}. {row['Feature']}: {row['Importance']:.4f}")

print("\nüéØ Business Applications:")
print("  1. Revenue Forecasting: Predict expected revenue for upcoming campaigns")
print("  2. Customer Targeting: Identify high-value purchase opportunities")
print("  3. Inventory Planning: Forecast demand by product category")
print("  4. Personalized Marketing: Tailor offers based on predicted purchase value")
print("  5. Budget Allocation: Optimize ad spend by predicted customer value")

print("\nüí° Recommendations:")
print("  1. Focus marketing on customers with high RFM scores")
print("  2. Optimize product mix based on category importance")
print("  3. Implement dynamic pricing using purchase predictions")
print("  4. Create targeted campaigns for predicted high-value transactions")
print("  5. Use model to score leads and prioritize sales efforts")

print("\nüîç Model Limitations:")
print(f"  - R¬≤ of {best_r2:.2%} means {(1-best_r2):.2%} of variance is unexplained")
print("  - Consider additional features: time/seasonality, promotions, external factors")
print("  - May not generalize to completely new products or customers")
print("  - Requires periodic retraining as customer behavior evolves")

print("\n" + "="*80)

## 12. Summary

In [None]:
print("\n" + "="*80)
print("‚úÖ REGRESSION MODEL COMPLETE")
print("="*80)

# MAE of the best model (first row of the RMSE-sorted comparison table), so
# all three metrics below describe the same model.
best_mae = results.iloc[0]['MAE']

print("\nüìà Models Trained:")
print("  ‚úì Linear Regression (Baseline)")
print("  ‚úì Random Forest Regressor")
print("  ‚úì Gradient Boosted Trees")

print(f"\nüèÜ Best Model: {best_model_name}")
print(f"  - RMSE: ${best_rmse:,.2f}")
print(f"  - MAE: ${best_mae:,.2f}")
print(f"  - R¬≤: {best_r2:.4f}")

print("\nüíæ Saved:")
print("  ‚úì Model metadata and metrics")
print("  ‚úì Feature importance scores")

print("\nüéØ Next Steps:")
print("  1. Build Clustering Model (Customer Segmentation)")
print("  2. Build Recommendation System (ALS)")
print("  3. Deploy model to production")
print("  4. Setup monitoring and retraining pipeline")

print("\n" + "="*80)

In [None]:
# Optional: Stop Spark
# spark.stop()