**Exploratory Data Analysis (EDA) Summary**

* The sample from [reynolds_apps_wksp_catalog.inventory_analytics.store_sales](#table) shows that `warehouse_id`, `category_id`, and `item_name` are well-populated, with a diverse set of values for each feature.
* The label column `ss_quantity` contains some nulls, which will need to be imputed or removed during preprocessing.
* There is a wide range of item names, indicating high cardinality for this categorical feature. One-hot encoding or target encoding may be required.
* Next steps:
  * Impute missing values in `ss_quantity`.
  * Encode categorical features, especially `item_name`.
  * Prepare the dataset for train/validation/test split.

Proceeding to feature preprocessing and encoding.

In [0]:
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType
from sklearn.preprocessing import OneHotEncoder
from sklearn.impute import SimpleImputer

# Pull only the modelling columns and down-sample to roughly 100,000 rows so
# the result can be collected into a pandas DataFrame.
sample_fraction = 100000/164427736
df = (
    spark.table('reynolds_apps_wksp_catalog.inventory_analytics.store_sales')
    .select('warehouse_id', 'category_id', 'item_name', 'ss_quantity', 'date')
    .sample(False, sample_fraction, seed=42)
)
pdf = df.toPandas()

# Derive the calendar month from the sale date.
sale_dates = pd.to_datetime(pdf['date'])
pdf['month'] = sale_dates.dt.month

# Replace missing ss_quantity values with the column mean, then cast the
# label to int.
imputer = SimpleImputer(strategy='mean')
pdf['ss_quantity'] = imputer.fit_transform(pdf[['ss_quantity']])
pdf['ss_quantity'] = pdf['ss_quantity'].astype(int)

# One-hot encode every categorical feature (month included).
categorical_cols = ['warehouse_id', 'category_id', 'item_name', 'month']
encoder = OneHotEncoder(sparse_output=False, handle_unknown='ignore')
encoded = encoder.fit_transform(pdf[categorical_cols])
encoded_df = pd.DataFrame(
    encoded,
    columns=encoder.get_feature_names_out(categorical_cols),
)

# Raw features + label on the left, one-hot columns on the right.
final_df = pd.concat(
    [pdf[categorical_cols + ['ss_quantity']], encoded_df],
    axis=1,
)

final_df.head()

In [0]:
from sklearn.model_selection import train_test_split

# Features are the raw (un-encoded) columns of 'final_df'; the label is ss_quantity.
raw_feature_cols = ['warehouse_id', 'category_id', 'item_name', 'month']
X, y = final_df[raw_feature_cols], final_df['ss_quantity']

# 60/20/20 split: hold out 40% first, then cut that hold-out in half to get
# the validation and test sets.
X_train, X_temp, y_train, y_temp = train_test_split(
    X, y, test_size=0.4, random_state=42
)
X_val, X_test, y_val, y_test = train_test_split(
    X_temp, y_temp, test_size=0.5, random_state=42
)

for split_name, split_features in (("Train", X_train), ("Validation", X_val), ("Test", X_test)):
    print(f"{split_name} set count: {len(split_features)}")

In [0]:
%sql
-- Create the Unity Catalog volume if it does not exist yet. A later cell sets
-- MLFLOW_DFS_TMP to this volume's /Volumes path before registering the model.
CREATE VOLUME IF NOT EXISTS reynolds_apps_wksp_catalog.inventory_analytics.mlflow_vol

In [0]:
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error
import mlflow
import numpy as np
from mlflow.models import infer_signature
from sklearn.preprocessing import OneHotEncoder
import pandas as pd

feature_cols = ['warehouse_id', 'category_id', 'item_name', 'month']

# Fit the encoder on the training split only; validation and test reuse the
# fitted categories (unseen categories are ignored rather than raising).
encoder = OneHotEncoder(sparse_output=False, handle_unknown='ignore')
X_train_enc = encoder.fit_transform(X_train[feature_cols])
X_val_enc, X_test_enc = (
    encoder.transform(split[feature_cols]) for split in (X_val, X_test)
)

rf = RandomForestRegressor(random_state=42, n_jobs=-1)
model = rf.fit(X_train_enc, y_train)

# RMSE on the validation split.
val_pred = model.predict(X_val_enc)
val_rmse = np.sqrt(mean_squared_error(y_val, val_pred))
print(f"Validation RMSE: {val_rmse}")

# RMSE on the held-out test split.
test_pred = model.predict(X_test_enc)
test_rmse = np.sqrt(mean_squared_error(y_test, test_pred))
print(f"Test RMSE: {test_rmse}")
import mlflow.pyfunc
class DemandForecastingModel(mlflow.pyfunc.PythonModel):
    """pyfunc wrapper that one-hot encodes raw features and predicts demand.

    Args:
        model: Fitted estimator exposing ``predict`` on encoded features.
        encoder: Fitted encoder exposing ``transform`` on the raw features.
        feature_cols: Names of the raw feature columns, in the order the
            encoder was fitted on.
        train_df: Training frame containing ``warehouse_id``,
            ``category_id``, ``month`` and ``ss_quantity``; only read by the
            fallback path.
    """

    def __init__(self, model, encoder, feature_cols, train_df):
        self.model = model
        self.encoder = encoder
        self.feature_cols = feature_cols
        self.train_df = train_df

    def _group_average(self, row):
        """Return the mean training ss_quantity for the row's group.

        Tries warehouse/category/month first and widens to
        warehouse/category when no training row matches. The result is NaN
        when the wider group has no training rows either.
        """
        same_group = (
            (self.train_df['warehouse_id'] == row['warehouse_id'])
            & (self.train_df['category_id'] == row['category_id'])
        )
        same_month = same_group & (self.train_df['month'] == row['month'])
        chosen = same_month if same_month.any() else same_group
        return self.train_df.loc[chosen, 'ss_quantity'].mean()

    def _predict_row(self, row):
        """Predict one row, using the group average if the model raises."""
        enc = self.encoder.transform([row.values])
        try:
            return self.model.predict(enc)[0]
        except Exception:
            return self._group_average(row)

    def predict(self, context, model_input):
        """Return one prediction per input row as a NumPy array.

        Args:
            context: pyfunc context supplied by MLflow; not used here.
            model_input: Anything ``pd.DataFrame`` accepts that contains
                every column listed in ``feature_cols``.

        Returns:
            ``np.ndarray`` with one value per row (empty for empty input).
        """
        X = pd.DataFrame(model_input)[self.feature_cols]
        if X.empty:
            # The encoder cannot transform zero rows; nothing to predict.
            return np.array([])

        # Encode and predict the whole batch in one call instead of calling
        # the encoder and the model once per row.
        encoded = self.encoder.transform(X)
        try:
            return np.asarray(self.model.predict(encoded))
        except Exception:
            # Batch prediction failed: fall back to row-by-row prediction so
            # only the rows that still fail use the training-set averages.
            return np.array([self._predict_row(row) for _, row in X.iterrows()])
# Cast the integer-valued feature columns to float before inferring the
# signature, so the logged schema accepts the float ids/month that the
# batch-inference cell sends. Inferring from the 5-row example also avoids
# running predict over the full training set just to get an output schema.
input_example = X_train[feature_cols].iloc[:5].copy()
for col in ['warehouse_id', 'category_id', 'month']:
    input_example[col] = input_example[col].astype(float)
signature = infer_signature(input_example, model.predict(X_train_enc[:5]))

# Close any run left open by an earlier cell so this model gets its own run.
if mlflow.active_run():
    mlflow.end_run()
with mlflow.start_run():
    mlflow.pyfunc.log_model(
        artifact_path="model",
        # The training features + label are stored on the wrapper for its fallback averages.
        python_model=DemandForecastingModel(model, encoder, feature_cols, pd.concat([X_train, y_train], axis=1)),
        input_example=input_example,
        signature=signature
    )
    mlflow.log_metric("val_rmse", val_rmse)
    mlflow.log_metric("test_rmse", test_rmse)
    mlflow.log_param("model_type", "RandomForestRegressor + OneHotEncoder (pyfunc) + month")
    mlflow.log_param("features", ", ".join(feature_cols))
    mlflow.log_param("label", "ss_quantity")
    print("Model logged to MLflow as pyfunc with month feature and fallback logic.")

In [0]:
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error
import mlflow
import numpy as np
from mlflow.models import infer_signature
from sklearn.preprocessing import OneHotEncoder
import pandas as pd

feature_cols = ['warehouse_id', 'category_id', 'item_name', 'month']

# One-hot encoder learned from the training split and applied unchanged to
# the validation and test splits.
encoder = OneHotEncoder(sparse_output=False, handle_unknown='ignore')
X_train_enc = encoder.fit_transform(X_train[feature_cols])
X_val_enc = encoder.transform(X_val[feature_cols])
X_test_enc = encoder.transform(X_test[feature_cols])

# Random forest trained on the encoded training features.
rf = RandomForestRegressor(random_state=42, n_jobs=-1)
model = rf.fit(X_train_enc, y_train)

# Validation error.
val_pred = model.predict(X_val_enc)
val_mse = mean_squared_error(y_val, val_pred)
val_rmse = np.sqrt(val_mse)
print(f"Validation RMSE: {val_rmse}")

# Test error.
test_pred = model.predict(X_test_enc)
test_mse = mean_squared_error(y_test, test_pred)
test_rmse = np.sqrt(test_mse)
print(f"Test RMSE: {test_rmse}")
import mlflow.pyfunc
class DemandForecastingModel(mlflow.pyfunc.PythonModel):
    """pyfunc wrapper that one-hot encodes raw features and predicts demand.

    Args:
        model: Fitted estimator exposing ``predict`` on encoded features.
        encoder: Fitted encoder exposing ``transform`` on the raw features.
        feature_cols: Names of the raw feature columns, in the order the
            encoder was fitted on.
        train_df: Training frame containing ``warehouse_id``,
            ``category_id``, ``month`` and ``ss_quantity``; only read by the
            fallback path.
    """

    def __init__(self, model, encoder, feature_cols, train_df):
        self.model = model
        self.encoder = encoder
        self.feature_cols = feature_cols
        self.train_df = train_df

    def _group_average(self, row):
        """Return the mean training ss_quantity for the row's group.

        Tries warehouse/category/month first and widens to
        warehouse/category when no training row matches. The result is NaN
        when the wider group has no training rows either.
        """
        same_group = (
            (self.train_df['warehouse_id'] == row['warehouse_id'])
            & (self.train_df['category_id'] == row['category_id'])
        )
        same_month = same_group & (self.train_df['month'] == row['month'])
        chosen = same_month if same_month.any() else same_group
        return self.train_df.loc[chosen, 'ss_quantity'].mean()

    def _predict_row(self, row):
        """Predict one row, using the group average if the model raises."""
        enc = self.encoder.transform([row.values])
        try:
            return self.model.predict(enc)[0]
        except Exception:
            return self._group_average(row)

    def predict(self, context, model_input):
        """Return one prediction per input row as a NumPy array.

        Args:
            context: pyfunc context supplied by MLflow; not used here.
            model_input: Anything ``pd.DataFrame`` accepts that contains
                every column listed in ``feature_cols``.

        Returns:
            ``np.ndarray`` with one value per row (empty for empty input).
        """
        X = pd.DataFrame(model_input)[self.feature_cols]
        if X.empty:
            # The encoder cannot transform zero rows; nothing to predict.
            return np.array([])

        # Encode and predict the whole batch in one call instead of calling
        # the encoder and the model once per row.
        encoded = self.encoder.transform(X)
        try:
            return np.asarray(self.model.predict(encoded))
        except Exception:
            # Batch prediction failed: fall back to row-by-row prediction so
            # only the rows that still fail use the training-set averages.
            return np.array([self._predict_row(row) for _, row in X.iterrows()])
# The MLflow signature is inferred from a 5-row example whose integer-valued
# feature columns have been cast to float.
float_cols = {col: float for col in ['warehouse_id', 'category_id', 'month']}
input_example = X_train[feature_cols].iloc[:5].astype(float_cols)
example_output = model.predict(X_train_enc[:5])
signature = infer_signature(input_example, example_output)

# Make sure no earlier run is still open before starting a new one.
if mlflow.active_run() is not None:
    mlflow.end_run()

with mlflow.start_run():
    # Training features + label travel with the wrapper for its fallback averages.
    training_frame = pd.concat([X_train, y_train], axis=1)
    mlflow.pyfunc.log_model(
        artifact_path="model",
        python_model=DemandForecastingModel(model, encoder, feature_cols, training_frame),
        input_example=input_example,
        signature=signature,
    )
    for metric_name, metric_value in (("val_rmse", val_rmse), ("test_rmse", test_rmse)):
        mlflow.log_metric(metric_name, metric_value)
    run_params = (
        ("model_type", "RandomForestRegressor + OneHotEncoder (pyfunc) + month"),
        ("features", ", ".join(feature_cols)),
        ("label", "ss_quantity"),
    )
    for param_name, param_value in run_params:
        mlflow.log_param(param_name, param_value)
    print("Model logged to MLflow as pyfunc with month feature and fallback logic.")

In [0]:
def get_workspace_url_and_headers():
    """Return ``(workspace_url, headers)`` for calling the workspace REST API.

    ``workspace_url`` is the browser host name read from the notebook
    context (no scheme is added here). ``headers`` is whatever
    ``WorkspaceClient().config.authenticate()`` returns; callers pass it as
    the ``headers`` argument of their HTTP requests.
    """
    from databricks.sdk import WorkspaceClient

    notebook_context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
    host_name = notebook_context.browserHostName().get()
    auth_headers = WorkspaceClient().config.authenticate()
    return host_name, auth_headers

In [0]:
import mlflow
import time
import requests
import os

# Unity Catalog has to be selected as the registry before any MLflow
# client/experiment code below runs.
mlflow.set_registry_uri('databricks-uc')
# MLFLOW_DFS_TMP is pointed at the mlflow_vol volume path.
os.environ.update(MLFLOW_DFS_TMP="/Volumes/reynolds_apps_wksp_catalog/inventory_analytics/mlflow_vol")

def get_workspace_url_and_headers():
    """Look up the workspace host name and authentication headers.

    Returns:
        A ``(workspace_url, headers)`` tuple: the browser host name taken
        from the notebook context, and the result of
        ``WorkspaceClient().config.authenticate()``.
    """
    from databricks.sdk import WorkspaceClient

    ctx = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
    url = ctx.browserHostName().get()
    sdk_client = WorkspaceClient()
    return url, sdk_client.config.authenticate()

workspace_url, headers = get_workspace_url_and_headers()

client = mlflow.tracking.MlflowClient()

# Scope the search to the active run's experiment when a run is open.
# Otherwise experiment_ids=None is passed and search_runs falls back to
# MLflow's currently active experiment. The experiment is fetched once by id;
# re-fetching it by its own name returned the same experiment.
active_run = mlflow.active_run()
experiment = mlflow.get_experiment(active_run.info.experiment_id) if active_run else None
experiment_id = experiment.experiment_id if experiment else None

# Walk the 20 most recent runs, newest first, and take the first one that has
# a top-level "model" artifact.
runs = mlflow.search_runs(experiment_ids=[experiment_id] if experiment_id else None, order_by=["start_time DESC"], max_results=20)
run_id = None
for _, row in runs.iterrows():
    artifacts = client.list_artifacts(row['run_id'])
    if any(a.path == 'model' for a in artifacts):
        run_id = row['run_id']
        break
if not run_id:
    raise RuntimeError("No run with a logged model artifact found.")
print(f"Using MLflow run ID: {run_id}")

# Register the logged model as a new version of the Unity Catalog model.
uc_model_name = "reynolds_apps_wksp_catalog.inventory_analytics.demand_forecasting_model"
result = mlflow.register_model(
    model_uri=f"runs:/{run_id}/model",
    name=uc_model_name
)
print(f"Registered Unity Catalog model version: {result.version}")

# Poll (up to 20 times, 5 s apart) until the version is READY. Stop with an
# error if registration fails or never completes, so the endpoint is not
# pointed at a model version that cannot be served.
for _ in range(20):
    model_version_details = client.get_model_version(uc_model_name, result.version)
    status = model_version_details.status
    print(f"Model version status: {status}")
    if status == "READY":
        break
    if status == "FAILED_REGISTRATION":
        raise RuntimeError(
            f"Registration of {uc_model_name} version {result.version} failed."
        )
    time.sleep(5)
else:
    raise TimeoutError(
        f"{uc_model_name} version {result.version} was not READY after polling; last status: {status}"
    )

# Point the serving endpoint at the new Unity Catalog model version.
endpoint_name = "demand-forecasting-endpoint"
payload = {
    "name": endpoint_name,
    "config": {
        "served_models": [
            {
                "model_name": uc_model_name,
                "model_version": str(result.version),
                "workload_type": "CPU",
                "workload_size": "Small",
                "scale_to_zero_enabled": True
            }
        ]
    }
}
endpoint_url = f"https://{workspace_url}/api/2.0/serving-endpoints/{endpoint_name}/config"
# The documented method for updating an endpoint's config is PUT (not PATCH).
# Only the "config" part of the payload is sent when updating.
response = requests.put(endpoint_url, headers=headers, json=payload["config"])
if response.status_code == 404:
    # The endpoint does not exist yet: create it with the full payload.
    response = requests.post(
        f"https://{workspace_url}/api/2.0/serving-endpoints",
        headers=headers,
        json=payload
    )
print(response.json())

In [0]:
# Show the URL that scoring requests for the deployed endpoint are sent to.
endpoint_message = f"Model Serving Endpoint URL: https://{workspace_url}/serving-endpoints/{endpoint_name}/invocations"
print(endpoint_message)

In [0]:

# Imported here so this cell does not depend on imports made by other cells
# (datetime is otherwise only imported by a later cell).
import time
from datetime import datetime

import requests

# Check endpoint status
status_url = f"https://{workspace_url}/api/2.0/serving-endpoints/{endpoint_name}"
response = requests.get(status_url, headers=headers)
status_info = response.json()

# Print status and readiness
print("Model Serving Endpoint Status")

# Poll every 120 s while a config update is in progress. Sleeping happens
# before each re-poll, so the loop exits as soon as the update has finished
# instead of waiting one more interval.
while status_info.get("state", {}).get("config_update", False) == 'IN_PROGRESS':
    time.sleep(120)

    response = requests.get(status_url, headers=headers)
    status_info = response.json()

    print(str(datetime.now()), "Current status:", (status_info.get("state", {}).get("config_update", False)))

final_status = str(status_info.get("state", {}).get("ready"))
if final_status == 'READY':
    print(f"Endpoint is {final_status} for inference.")
else:
    print(status_info)
    # Raise a real exception; raising a bare string is itself a TypeError.
    raise RuntimeError("Endpoint is not ready for inference")

In [0]:
import requests
import pandas as pd
from datetime import datetime

def upcoming_months(start_month, count=3):
    """Return ``count`` consecutive calendar months beginning at ``start_month``.

    Months are numbered 1-12 and wrap around after December, e.g.
    ``upcoming_months(12)`` is ``[12, 1, 2]``.
    """
    # Shift to 0-based before the modulo so that start_month itself is the
    # first element and December (12) is not mapped to 1.
    return [(start_month - 1 + offset) % 12 + 1 for offset in range(count)]


# Example: Prepare a batch of input data (replace with your actual data)
base_batch = pd.DataFrame([
    {"warehouse_id": 1, "category_id": 9, "item_name": "Gaming Controller"},
    {"warehouse_id": 4, "category_id": 5, "item_name": "Bestselling Novel"}
])

# Add month feature for the current month and the next two months
current_month = datetime.now().month
months = upcoming_months(current_month)

# One row per (item, month) combination, items in their original order.
batch_data = pd.DataFrame([
    {**row, "month": m}
    for _, row in base_batch.iterrows()
    for m in months
])

# Ensure input types match model signature (float for ids/month)
for col in ["warehouse_id", "category_id", "month"]:
    batch_data[col] = batch_data[col].astype(float)

# Serialise the raw features as a "dataframe_split" request body: column
# names once, followed by the rows as plain lists.
payload = {
    "dataframe_split": {
        "columns": list(batch_data.columns),
        "data": batch_data.values.tolist(),
    }
}

# Debugging aid: uncomment to inspect the request body.
# print("Payload being sent to endpoint:")
# print(payload)

# Send the batch to the serving endpoint.
scoring_url = f"https://{workspace_url}/serving-endpoints/{endpoint_name}/invocations"
response = requests.post(scoring_url, headers=headers, json=payload)

# print("Raw response from endpoint:")
# print(response.text)

# Convert the predictions to a pandas DataFrame and include input columns
try:
    # Fail on HTTP error responses instead of tabulating the error payload
    # as if it were a set of predictions.
    response.raise_for_status()
    result = response.json()
    if isinstance(result, list):
        predictions_df = pd.DataFrame(result, columns=["prediction"])
    elif isinstance(result, dict) and "predictions" in result:
        predictions_df = pd.DataFrame(result["predictions"], columns=["prediction"])
    else:
        # Unrecognised shape: keep whatever came back as a single row.
        predictions_df = pd.DataFrame([result])
    if "prediction" in predictions_df.columns:
        # Quantities are reported as whole units.
        predictions_df["prediction"] = predictions_df["prediction"].round().astype(int)
    # Align positionally: both frames get a fresh 0..n-1 index before concatenation.
    output_df = pd.concat([batch_data.reset_index(drop=True), predictions_df.reset_index(drop=True)], axis=1)
    print("Batch inference response as DataFrame (with input columns):")
    display(output_df)
except Exception:
    print("Failed to parse response as DataFrame. Raw response:")
    print(response.text)
    # Bare raise re-raises the active exception with its original traceback.
    raise