## Importing SQL Table via SparkSQL

In [0]:
%pip install mlflow
# Restart the Python process after the install; presumably needed so the
# freshly installed package is picked up by later cells — confirm against
# the Databricks docs for your runtime.
dbutils.library.restartPython()

In [0]:
import os

import joblib
import mlflow
import mlflow.sklearn
import numpy as np
from mlflow.models.signature import infer_signature
from sklearn.metrics import accuracy_score, f1_score
from sklearn.model_selection import ParameterGrid, cross_val_score, cross_validate

In [0]:

# Load every column of the training table into a Spark DataFrame.
train_query = "SELECT * FROM workspace.default.train"
df = spark.sql(train_query)

# Quick look at the data: first five rows, then the schema.
df.show(n=5)
df.printSchema()


## Preliminary EDA with SparkSQL

### Checking total number of records and column names



In [0]:
# Gather the row count and the column names first, then report both.
total_records = df.count()
column_names = df.columns

print("Total records:", total_records)
print("Columns:", column_names)

### Analyzing class distribution for delivery status (on time vs delayed)


In [0]:
# Rows per value of the target column. The column name contains dots, so it
# is backtick-quoted when handed to Spark.
target_counts = df.groupBy("`Reached.on.Time_Y.N`").count()
target_counts.show()


### Viewing summary statistics for numeric features


In [0]:
# Summary statistics for the weight and customer-rating columns.
numeric_cols = ["Weight_in_gms", "Customer_rating"]
df.select(*numeric_cols).describe().show()


### Exploring delivery delay distribution across warehouse blocks and modes of shipment


In [0]:
# Target-value counts broken down by warehouse block, then by mode of
# shipment; each table is ordered by its grouping column.
for group_col in ("Warehouse_block", "Mode_of_Shipment"):
    (
        df.groupBy(group_col, "`Reached.on.Time_Y.N`")
        .count()
        .orderBy(group_col)
        .show()
    )


### Checking average shipment weight based on delivery delay status


In [0]:
from pyspark.sql.functions import avg

# Mean shipment weight for each value of the target column.
avg_weight_expr = avg("Weight_in_gms").alias("Avg_Weight")
avg_weight_by_status = df.groupBy("`Reached.on.Time_Y.N`").agg(avg_weight_expr)
avg_weight_by_status.show()


### What Did We Learn from the Data?

- A majority of deliveries (~60%) were delayed (`6563` out of `10999`).
- Weight appears to correlate with delays: on-time deliveries have a higher average weight (~4168g vs ~3272g).
- Warehouse F has the highest number of both on-time and delayed deliveries, suggesting it handles more volume overall.
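- Shipments by ship show a higher delay count than flight and road shipments. This could indicate a slower or less reliable logistics mode, but raw counts may simply reflect a higher shipment volume by ship, so delay rates should be compared before drawing that conclusion.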
- Customer ratings are roughly centered around 3 (mean ~2.99), so they may not be a strong predictive feature, but we’ll include them for now.

## Building the ML Model

### Converting data to Pandas Dataframe for training (Databricks Community Edition)

In [0]:
# Model inputs, listed separately from the target for readability.
feature_cols = [
    "Warehouse_block",
    "Mode_of_Shipment",
    "Product_importance",
    "Customer_rating",
    "Customer_care_calls",
    "Cost_of_the_Product",
    "Prior_purchases",
    "Discount_offered",
    "Weight_in_gms",
]
# The target name contains dots, so it is backtick-quoted for Spark.
target_col = "`Reached.on.Time_Y.N`"
selected_cols = feature_cols + [target_col]

# Keep only the selected columns in Spark, then convert to pandas.
spark_subset = df.select(*selected_cols)
pdf = spark_subset.toPandas()

# Preview the pandas DataFrame
pdf.head()

### Encoding Categorical Variables

In [0]:
from sklearn.preprocessing import LabelEncoder

# Work on a copy so the pandas frame `pdf` produced earlier stays untouched.
df_model = pdf.copy()

categorical_cols = ["Warehouse_block", "Mode_of_Shipment", "Product_importance"]

# Fit one encoder per column and keep each of them. Re-fitting a single shared
# encoder would overwrite its learned classes on every iteration, leaving no
# way to decode the earlier columns or to encode new data consistently.
label_encoders = {}
for col in categorical_cols:
    le = LabelEncoder()
    df_model[col] = le.fit_transform(df_model[col])
    label_encoders[col] = le

### Separating Labels and Features

In [0]:

# The delivery-status column is the label; every other column is a feature.
target_name = "Reached.on.Time_Y.N"
y = df_model[target_name]
X = df_model.drop(columns=[target_name])


### Train Test Split

In [0]:
from sklearn.model_selection import train_test_split

# Hold out 20% of the rows for testing; the fixed seed keeps the split
# repeatable across runs.
split_parts = train_test_split(X, y, test_size=0.2, random_state=42)
X_train, X_test, y_train, y_test = split_parts


### Defining Sweep Parameters for Model

In [0]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV

# Hyperparameter values to sweep: 2 x 3 x 3 = 18 combinations in total.
param_grid = dict(
    n_estimators=[100, 200],
    max_depth=[None, 10, 20],
    min_samples_split=[2, 5, 10],
)


### Initializing Model and GridSearchCV

In [0]:

# Base estimator with a fixed seed.
rf = RandomForestClassifier(random_state=42)

# Search settings gathered in one mapping, then unpacked into GridSearchCV.
# NOTE(review): in the visible notebook this object is never fitted; the
# MLflow cell only reads its `cv` and `param_grid` attributes.
search_settings = {
    "estimator": rf,
    "param_grid": param_grid,
    "cv": 3,  # 3-fold cross-validation
    "scoring": "f1_weighted",
    "n_jobs": -1,
    "verbose": 1,
}
grid_search = GridSearchCV(**search_settings)


In [0]:
# Manual hyperparameter sweep tracked in a single MLflow run.
#
# For every combination in `grid_search.param_grid` this cell cross-validates
# a RandomForestClassifier on the training split, logs accuracy and weighted
# F1 (mean, std and per-fold) with the combination index as the MLflow step,
# then refits the best combination on the full training split, evaluates it
# on the test split and logs the fitted model with an input/output signature.
#
# Names left in scope for later cells: best_rf / best_model, best_params,
# best_score, acc, f1_score_value.
with mlflow.start_run():

    # Fold count comes from the GridSearchCV configuration so the manual sweep
    # and that configuration cannot drift apart.
    cv_folds = grid_search.cv

    # Log dataset information
    mlflow.log_param("train_samples", len(X_train))
    mlflow.log_param("test_samples", len(X_test))
    mlflow.log_param("n_features", X_train.shape[1])
    mlflow.log_param("target_classes", len(np.unique(y_train)))

    # Log model metadata
    mlflow.log_param("model_type", "RandomForestClassifier")
    mlflow.log_param("cv_folds", cv_folds)

    # === TRACK EACH PARAMETER COMBINATION ===

    param_combinations = list(ParameterGrid(grid_search.param_grid))
    mlflow.log_param("total_combinations", len(param_combinations))

    print(f"Testing {len(param_combinations)} parameter combinations...")

    best_score = -1
    best_params = None
    best_model = None

    # Manually iterate through each parameter combination
    for i, params in enumerate(param_combinations):
        print(f"Testing combination {i+1}/{len(param_combinations)}: {params}")

        # Create model with current parameters
        rf_model = RandomForestClassifier(**params, random_state=42)

        # Score both metrics in a single cross-validation pass so each fold
        # is fitted once instead of once per metric.
        cv_results = cross_validate(
            rf_model,
            X_train,
            y_train,
            cv=cv_folds,
            scoring=["accuracy", "f1_weighted"],
        )
        cv_scores_acc = cv_results["test_accuracy"]
        cv_scores_f1 = cv_results["test_f1_weighted"]

        # Calculate mean scores
        mean_cv_acc = cv_scores_acc.mean()
        mean_cv_f1 = cv_scores_f1.mean()
        std_cv_acc = cv_scores_acc.std()
        std_cv_f1 = cv_scores_f1.std()

        # Log metrics for this parameter combination (using step for time series)
        mlflow.log_metric("cv_accuracy_mean", mean_cv_acc, step=i)
        mlflow.log_metric("cv_f1_mean", mean_cv_f1, step=i)
        mlflow.log_metric("cv_accuracy_std", std_cv_acc, step=i)
        mlflow.log_metric("cv_f1_std", std_cv_f1, step=i)

        # Log individual fold scores. The loop variables are deliberately not
        # called `f1_score`, which would shadow sklearn's f1_score function
        # that is needed for the test-set evaluation below.
        for fold_idx, (fold_acc, fold_f1) in enumerate(zip(cv_scores_acc, cv_scores_f1)):
            mlflow.log_metric(f"fold_{fold_idx}_accuracy", fold_acc, step=i)
            mlflow.log_metric(f"fold_{fold_idx}_f1", fold_f1, step=i)

        # Log parameters for this combination
        param_string = "_".join([f"{k}={v}" for k, v in params.items()])
        mlflow.log_param(f"combination_{i}_params", param_string)

        # Track best model (selected on mean CV accuracy)
        if mean_cv_acc > best_score:  # You can change this to f1 if preferred
            best_score = mean_cv_acc
            best_params = params
            best_model = rf_model

        print(f"  → CV Accuracy: {mean_cv_acc:.4f} (±{std_cv_acc:.4f})")
        print(f"  → CV F1: {mean_cv_f1:.4f} (±{std_cv_f1:.4f})")

    # Train the best model on full training data. `rf_model` itself is still
    # unfitted here: the fits above happened inside cross-validation.
    print(f"\nBest parameters found: {best_params}")
    best_model.fit(X_train, y_train)

    # Log best parameters
    for param_name, param_value in best_params.items():
        mlflow.log_param(f"best_{param_name}", param_value)

    mlflow.log_metric("best_cv_score", best_score)

    # Make predictions with best model
    best_rf = best_model
    y_pred_tuned = best_rf.predict(X_test)

    # Evaluate performance on test set
    acc = accuracy_score(y_test, y_pred_tuned)
    f1_score_value = f1_score(y_test, y_pred_tuned, average='weighted')

    # Log final test metrics
    mlflow.log_metric("final_test_accuracy", acc)
    mlflow.log_metric("final_test_f1_score", f1_score_value)

    # Log model with input signature. Inputs and outputs come from the same
    # rows (the test split) so the example pair is consistent.
    signature = infer_signature(X_test, y_pred_tuned)

    mlflow.sklearn.log_model(
        sk_model=best_rf,
        artifact_path="model",
        signature=signature
    )

    # Print final results
    print(f"\n{'='*60}")
    print("FINAL RESULTS:")
    print(f"Best CV Score: {best_score:.4f}")
    print(f"Final Test Accuracy: {acc:.4f}")
    print(f"Final Test F1-score: {f1_score_value:.4f}")
    print("Model logged to MLflow")
    print(f"All {len(param_combinations)} parameter combinations tracked!")
    print(f"{'='*60}")

### Visualizing Feature importance

In [0]:
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns

# Pair each training column with its importance from the fitted best model.
feature_names = X_train.columns
importances = best_rf.feature_importances_

# Build the table first, then sort so the largest importance comes first.
feature_importance_df = pd.DataFrame(
    {"Feature": feature_names, "Importance": importances}
)
feature_importance_df = feature_importance_df.sort_values(
    by="Importance", ascending=False
)

# Bar chart drawn on an explicit figure/axes pair.
fig, ax = plt.subplots(figsize=(10, 6))
sns.barplot(x="Importance", y="Feature", data=feature_importance_df, ax=ax)
ax.set_title("Feature Importances - Best Random Forest Model")
ax.set_xlabel("Importance Score")
ax.set_ylabel("Feature")
fig.tight_layout()
plt.show()


### Feature Importance Insights (Tuned Random Forest)

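- Discount Offered and Weight in gms are the most influential features, together accounting for over 70% of the model’s total feature importance. Feature importance measures how heavily the model relies on a feature, not the direction of its effect: the EDA above found that delayed shipments were lighter on average (~3272g vs ~4168g for on-time deliveries), so weight is strongly predictive even though heavier shipments are not the delayed ones. The direction of the discount effect is not examined in this notebook.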
- Cost of the Product and Prior Purchases also contribute meaningfully, suggesting pricing and customer loyalty may influence delivery performance.
- Features such as Mode of Shipment, Product Importance, and Customer Rating had minimal impact on predictions, implying they may be less predictive in this dataset.
