# Sales Forecasting Model with Snowpark ML

This notebook trains an XGBoost regression model to predict sales amounts based on:
- Date features (month, day of week, year)
- Region
- Product category
- Units sold

The model is registered in Snowflake's Model Registry and can be called as a tool from the Snowflake Intelligence Agent.

## 1. Setup and Imports

In [None]:
# Snowpark and ML imports
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col, month, dayofweek, year
from snowflake.ml.modeling.xgboost import XGBRegressor
from snowflake.ml.modeling.preprocessing import OrdinalEncoder
from snowflake.ml.registry import Registry
import pandas as pd
import numpy as np

# For evaluation
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

In [None]:
# Grab the Snowpark session that is already active (as when running in Snowsight)
from snowflake.snowpark.context import get_active_session
session = get_active_session()

# IMPORTANT: set the values below to match your lab path
#   Manual path (si_*):        DATABASE = 'SI_DB',   MODEL_NAME = 'si_sales_forecast'
#   Cortex Code path (coco_*): DATABASE = 'COCO_DB', MODEL_NAME = 'coco_sales_forecast'
DATABASE, SCHEMA = 'SI_DB', 'RETAIL'  # DATABASE becomes 'COCO_DB' for the Cortex Code path
MODEL_NAME = 'si_sales_forecast'      # becomes 'coco_sales_forecast' for the Cortex Code path

# Echo the active configuration so a wrong lab path is noticed before any work is done
print(
    f"Using database: {DATABASE}",
    f"Model will be registered as: {MODEL_NAME}",
    sep="\n",
)

## 2. Load and Prepare Data

In [None]:
# Source tables: sales rows and the product table that carries CATEGORY
sales_df = session.table(f"{DATABASE}.{SCHEMA}.SALES")
products_df = session.table(f"{DATABASE}.{SCHEMA}.PRODUCTS")

# Left join on PRODUCT_ID so every sales row is kept, then keep only the
# columns needed downstream
product_match = sales_df["PRODUCT_ID"] == products_df["PRODUCT_ID"]
df = sales_df.join(products_df, on=product_match, how="left").select(
    sales_df["DATE"],
    sales_df["REGION"],
    products_df["CATEGORY"],
    sales_df["UNITS_SOLD"],
    sales_df["SALES_AMOUNT"],
)

# Quick sanity check: row count plus a small preview
total_records = df.count()
print(f"Total records: {total_records}")
df.show(5)

In [None]:
# Derive calendar features (month, day of week, year) from the DATE column
df_features = (
    df.with_column("MONTH", month(col("DATE")))
      .with_column("DAY_OF_WEEK", dayofweek(col("DATE")))
      .with_column("YEAR", year(col("DATE")))
)

# Columns carried into modeling: categorical inputs, date parts, units, and the target
model_columns = [
    "REGION",
    "CATEGORY",
    "MONTH",
    "DAY_OF_WEEK",
    "YEAR",
    "UNITS_SOLD",
    "SALES_AMOUNT",
]

# Rows with a null in any selected column are dropped
df_model = df_features.select(model_columns).dropna()

modeled_records = df_model.count()
print(f"Records after feature engineering: {modeled_records}")
df_model.show(5)

## 3. Encode Categorical Features

In [None]:
# Categorical inputs and the names of the columns that will hold their ordinal codes
categorical_cols = ["REGION", "CATEGORY"]
output_cols = [f"{name}_ENCODED" for name in categorical_cols]

# Fit the ordinal encoder on the modeling frame, then append the encoded columns
encoder = OrdinalEncoder(input_cols=categorical_cols, output_cols=output_cols)
encoder.fit(df_model)
df_encoded = encoder.transform(df_model)

# Preview the encoded frame
df_encoded.show(5)

## 4. Train/Test Split

In [None]:
# Target the model learns to predict
target_col = "SALES_AMOUNT"

# Model inputs: encoded categoricals, date parts, and units sold
feature_cols = [
    "REGION_ENCODED",
    "CATEGORY_ENCODED",
    "MONTH",
    "DAY_OF_WEEK",
    "YEAR",
    "UNITS_SOLD",
]

# 80/20 random split; the fixed seed keeps the partition repeatable
split_weights = [0.8, 0.2]
train_df, test_df = df_encoded.random_split(split_weights, seed=42)

train_count = train_df.count()
print(f"Training records: {train_count}")
test_count = test_df.count()
print(f"Test records: {test_count}")

## 5. Train XGBoost Model

In [None]:
# XGBoost hyperparameters, kept together so they are easy to tune
xgb_params = dict(
    n_estimators=100,
    max_depth=6,
    learning_rate=0.1,
    random_state=42,
)

# Regressor reads feature_cols, learns target_col, and writes PREDICTED_SALES
model = XGBRegressor(
    input_cols=feature_cols,
    label_cols=[target_col],
    output_cols=["PREDICTED_SALES"],
    **xgb_params,
)

# Fit on the training split only
print("Training XGBoost model...")
model.fit(train_df)
print("Training complete!")

## 6. Evaluate Model

In [None]:
# Score the held-out split; the prediction lands in PREDICTED_SALES
predictions_df = model.predict(test_df)

# Pull only actual vs. predicted into pandas for the sklearn metrics
results_pd = predictions_df.select(target_col, "PREDICTED_SALES").to_pandas()
y_true, y_pred = results_pd[target_col], results_pd["PREDICTED_SALES"]

# Error metrics: RMSE (root of the MSE), MAE, and R2
mse = mean_squared_error(y_true, y_pred)
rmse = np.sqrt(mse)
mae = mean_absolute_error(y_true, y_pred)
r2 = r2_score(y_true, y_pred)

print(
    "\n=== Model Performance ===",
    f"RMSE: ${rmse:,.2f}",
    f"MAE:  ${mae:,.2f}",
    f"R2:   {r2:.3f}",
    sep="\n",
)

In [None]:
# Side-by-side view of actual and predicted sales for a few test rows
sample_columns = ["REGION", "CATEGORY", "MONTH", target_col, "PREDICTED_SALES"]

print("\nSample Predictions:")
predictions_df.select(sample_columns).show(10)

## 7. Register Model in Snowflake Model Registry

In [None]:
# Registry handle scoped to the configured database and schema
registry = Registry(session=session, database_name=DATABASE, schema_name=SCHEMA)

# The evaluation metrics are recorded in the comment stored with the model version
model_comment = f"XGBoost sales forecasting model. RMSE: ${rmse:,.2f}, R2: {r2:.3f}"

# NOTE(review): the version name is fixed at "v1"; re-running this cell presumably
# fails once that version already exists in the registry — confirm before a re-run.
model_version = registry.log_model(
    model=model,
    model_name=MODEL_NAME,
    version_name="v1",
    comment=model_comment,
)

print(
    f"Model registered: {MODEL_NAME} v1",
    f"Full path: {DATABASE}.{SCHEMA}.{MODEL_NAME}",
    sep="\n",
)

In [None]:
# Print the name of every model the registry returns
print("\nModels in registry:")
for registered_model in registry.models():
    print(f"  - {registered_model.name}")

## 8. Test Model Inference on Sample Data

In [None]:
# Smoke-test the in-memory model on a single hand-built row.
# Note: After registration, you can call the model via SQL like:
# SELECT model_name!PREDICT(...) FROM ...

# One-row query whose column names match feature_cols (already-encoded values)
test_query = """
WITH sample_data AS (
    SELECT 
        1 AS REGION_ENCODED,  -- e.g., 'West'
        2 AS CATEGORY_ENCODED, -- e.g., 'Electronics'
        7 AS MONTH,
        3 AS DAY_OF_WEEK,
        2025 AS YEAR,
        50 AS UNITS_SOLD
)
SELECT * FROM sample_data
"""

# Materialise the row as a Snowpark DataFrame and score it
sample_df = session.sql(test_query)
prediction = model.predict(sample_df)
prediction.show()

## 9. Create Scoring UDF (Optional)

This creates a user-friendly UDF that wraps the model for easier use as an agent tool.

In [None]:
# Create a scoring UDF for the agent to use.
#
# The label -> ordinal-index lookups are generated from the encoder fitted earlier
# in this notebook, so the UDF encodes REGION and CATEGORY exactly as the training
# data was encoded instead of relying on a hand-maintained mapping that can drift.
region_lookup = {
    str(label): index for index, label in enumerate(encoder.categories_["REGION"])
}
category_lookup = {
    str(label): index for index, label in enumerate(encoder.categories_["CATEGORY"])
}

# NOTE(review): the handler below calls get_active_session() and runs Snowpark
# queries. Python UDF handlers are not normally given a Snowpark session (stored
# procedures are), so this is expected to fail when the function is invoked —
# confirm, and if so move the logic into a stored procedure or call the
# registered model's SQL method instead.
# NOTE(review): DAY_OF_WEEK (3) and YEAR (2025) are fixed inside the handler, so
# they do not vary with the caller's input.
udf_sql = f"""
CREATE OR REPLACE FUNCTION {DATABASE}.{SCHEMA}.PREDICT_SALES(
    region VARCHAR,
    category VARCHAR,
    prediction_month INT,
    units_estimate INT
)
RETURNS FLOAT
LANGUAGE PYTHON
RUNTIME_VERSION = '3.10'
PACKAGES = ('snowflake-snowpark-python', 'snowflake-ml-python')
HANDLER = 'predict'
AS
$$
def predict(region, category, prediction_month, units_estimate):
    from snowflake.snowpark.context import get_active_session
    from snowflake.ml.registry import Registry

    session = get_active_session()
    registry = Registry(session=session, database_name='{DATABASE}', schema_name='{SCHEMA}')

    # Get the model
    model = registry.get_model('{MODEL_NAME}').version('v1')

    # Encoding lookups generated from the fitted OrdinalEncoder when this UDF was created
    region_map = {region_lookup!r}
    category_map = {category_lookup!r}

    # Labels missing from the lookups fall back to index 0
    region_enc = region_map.get(region, 0)
    category_enc = category_map.get(category, 0)

    # Create input dataframe
    input_df = session.create_dataframe(
        [[region_enc, category_enc, prediction_month, 3, 2025, units_estimate]],
        schema=['REGION_ENCODED', 'CATEGORY_ENCODED', 'MONTH', 'DAY_OF_WEEK', 'YEAR', 'UNITS_SOLD']
    )

    # Get prediction
    result = model.run(input_df, function_name='predict')
    return float(result.collect()[0]['PREDICTED_SALES'])
$$;
"""

session.sql(udf_sql).collect()
print(f"Created UDF: {DATABASE}.{SCHEMA}.PREDICT_SALES")

In [None]:
# Call the UDF once through SQL with a known region/category pair
test_udf_query = f"""
SELECT {DATABASE}.{SCHEMA}.PREDICT_SALES('West', 'Electronics', 8, 100) AS predicted_sales
"""

result = session.sql(test_udf_query).collect()

# The query returns a single row; read its PREDICTED_SALES column
predicted_sales = result[0]['PREDICTED_SALES']
print(f"\nPredicted sales for West region, Electronics, August, 100 units: ${predicted_sales:,.2f}")

## Done!

The model is now registered and the UDF is ready to be added as a tool to your Snowflake Intelligence Agent.

**Next Steps:**
1. Go to Snowflake Intelligence
2. Edit your agent
3. Add a new tool of type "Function"
4. Select the `PREDICT_SALES` UDF
5. Test with: "Predict sales for West region, Fitness Wear category, for September with 75 units"