In [None]:
####################################################################################
# Weather Prediction Using DFT, XGBoost, and SHAP                                 #
####################################################################################

# Author: Garrett Frere

# Description:
# Predicts hourly temperatures using XGBoost and interprets predictions with SHAP. 
# Normalizes temperature via DFT and processes data partitioned by `LOCATION`.

# Key Features:
# 1. **Time-Series Preprocessing**:
#    - Applies DFT to `TEMP` for frequency-domain normalization.

# 2. **Model Training and Prediction**:
#    - Trains XGBoost to predict `TEMP` using features like `TEMP_FFT` and `HOUR_OF_DAY`.

# 3. **SHAP Analysis**:
#    - Computes global SHAP values for feature importance.
#    - Supports local SHAP analysis per partition (e.g., `LOCATION`).

# Use Cases:
# - Forecasting hourly temperatures.
# - Understanding feature influence using SHAP.

# Requirements:
# - Python 3.8+ (3.11 preferred for performance), pandas, numpy, scipy, xgboost, shap.

####################################################################################

In [None]:
# Snowpark for Python
from snowflake.snowpark import Session
from snowflake.snowpark.version import VERSION
from snowflake.snowpark.functions import udtf
from snowflake.snowpark.context import get_active_session
import snowflake.snowpark.functions as F

# Snowpark ML
from snowflake.ml.registry import Registry
from snowflake.ml.model import custom_model
from snowflake.ml.model import model_signature

# data science libs
from scipy.fft import fft
import pandas as pd
import numpy as np
import xgboost as xgb
import shap
import matplotlib.pyplot as plt

# misc
import json

# We can also use Snowpark for our analyses!
from snowflake.snowpark.context import get_active_session

session = get_active_session()

In [None]:
# Resize output width
pd.set_option('display.width', 1000)

In [None]:
def generate_temperature_data(start_date, num_days, locations):
    """
    Generates synthetic weather data for multiple locations, including temperature 
    and an unrelated feature for demonstration purposes.
    
    Args:
        start_date (str): The starting date in 'YYYY-MM-DD' format.
        num_days (int): Number of days to generate data for.
        locations (list): List of location names.

    Returns:
        pd.DataFrame: Synthetic weather data.
    """
    # Generate hourly timestamps
    hours_per_day = 24
    total_hours = num_days * hours_per_day
    date_range = pd.date_range(start=start_date, periods=total_hours, freq='h')

    # Initialize synthetic data
    data = []

    for location in locations:
        for hour in range(total_hours):
            # Generate temperature with daily sinusoidal pattern
            time_of_day = hour % 24
            base_temp = 15 + 10 * np.sin(2 * np.pi * time_of_day / 24)  # Sinusoidal pattern
            temp_variation = np.random.normal(scale=2)  # Add random noise
            temp = base_temp + temp_variation

            # Add random unrelated feature
            random_noise = np.random.uniform(0, 100)  # Unrelated random values

            data.append({
                "DATE": date_range[hour],
                "TEMP": temp,
                "LOCATION": location,
                "HOUR_OF_DAY": time_of_day,
                "RANDOM_NOISE": random_noise
            })

    return pd.DataFrame(data)

# Generate data for multiple locations
locations = ['New York', 'Los Angeles', 'Chicago']
df = generate_temperature_data(start_date='2023-01-01', num_days=3, locations=locations)

# Plot temperature data for all locations on a single plot
plt.figure(figsize=(12, 6))
for location in locations:
    location_data = df[df['LOCATION'] == location]
    plt.plot(
        location_data['DATE'], 
        location_data['TEMP'], 
        label=location
    )

# Customize the plot
plt.title('Simulated Temperature Data for Multiple Locations')
plt.xlabel('Time')
plt.ylabel('Temperature (°C)')
plt.legend(title='Location')
plt.grid()
plt.xticks(rotation=45)
plt.tight_layout()

# Display the plot
plt.show()


In [None]:
# Generate data for multiple locations
locations = ['New York', 'Los Angeles', 'Chicago', 'Tokyo']
df = generate_temperature_data(start_date='2023-01-01', num_days=5, locations=locations)

for location in locations:
    print(f"Head for {location}:")
    print(df[df['LOCATION'] == location].head(), "\n")


In [None]:
class WeatherPredictor:
    """Weather Prediction using XGBoost with hourly features and SHAP."""

    def prepare_data(self, input: pd.DataFrame) -> pd.DataFrame:
        """
        Normalizes weather data using FFT, excluding the DC component.
        
        Args:
            input (pd.DataFrame): Raw DataFrame containing 'TEMP' and 'HOUR_OF_DAY'.
        
        Returns:
            pd.DataFrame: DataFrame with an added 'TEMP_FFT' column.
        """
        # Validate required columns
        required_columns = {'TEMP', 'HOUR_OF_DAY'}
        if not required_columns.issubset(input.columns):
            missing = required_columns - set(input.columns)
            raise KeyError(f"Missing required columns: {missing}")

        # Ensure 'TEMP' is numeric
        if not pd.api.types.is_numeric_dtype(input['TEMP']):
            raise ValueError("'TEMP' column must be numeric.")

        # Perform FFT and exclude DC component
        input = input.copy()
        temp_array = input['TEMP'].to_numpy()
        fft_values = fft(temp_array)
        input = input.iloc[1:].copy()  # Drop the first row
        input['TEMP_FFT'] = np.abs(fft_values[1:])  # Exclude the first value (DC component)

        return input
        
    @custom_model.partitioned_inference_api
    def predict(self, input: pd.DataFrame) -> pd.DataFrame:
        """
        Prepares data using FFT and trains an XGBoost model to predict temperature.
        
        Args:
            input (pd.DataFrame): Raw DataFrame with 'TEMP' and 'HOUR_OF_DAY'.
        
        Returns:
            pd.DataFrame: DataFrame with predictions added as a 'PREDICTED_TEMP' column.
        """
        # Prepare data with FFT normalization
        prepared_data = self.prepare_data(input)

        # Extract features (X) and target (y)
        X = prepared_data.drop(columns=['TEMP', 'DATE'])  # Drop non-feature columns
        y = prepared_data['TEMP']
        
        # Convert LOCATION to categorical dtype
        if 'LOCATION' in X.columns:
            X['LOCATION'] = X['LOCATION'].astype('category')

        # Train the XGBoost model
        xgb_model = xgb.XGBRegressor(objective='reg:squarederror', n_estimators=100, enable_categorical=True)
        xgb_model.fit(X, y)

        # Make predictions
        prepared_data['PREDICTED_TEMP'] = xgb_model.predict(X)

        # Save model and features for further use (e.g., SHAP)
        self.xgb_model = xgb_model
        self.X = X

        return prepared_data[['DATE', 'LOCATION', 'HOUR_OF_DAY', 'TEMP', 'PREDICTED_TEMP']]

    def calculate_global_shap(self) -> pd.DataFrame:
        """Calculates global SHAP values for the trained XGBoost model."""
        if not hasattr(self, 'xgb_model'):
            raise ValueError("Model not trained. Run 'predict' first.")

        # Calculate SHAP values
        explainer = shap.Explainer(self.xgb_model)
        shap_values = explainer(self.X)

        # Aggregate global SHAP values
        global_shap = np.abs(shap_values.values).mean(axis=0)
        feature_importance = pd.DataFrame({
            'FEATURE': self.X.columns,
            'GLOBAL_SHAP': global_shap
        }).sort_values(by='GLOBAL_SHAP', ascending=False)

        return feature_importance

In [None]:
# Generate synthetic data
locations = ['New York', 'Los Angeles', 'Chicago', 'Tokyo']
synthetic_data = generate_temperature_data(start_date='2023-01-01', num_days=3, locations=locations)

# Filter data for one location
ny_data = synthetic_data[synthetic_data['LOCATION'] == 'New York']

# Instantiate and run the predictor
predictor = WeatherPredictor()
predictions = predictor.predict(ny_data)

# Display predictions
print("Predictions:")
print(predictions.head())

# Calculate global SHAP values
global_shap = predictor.calculate_global_shap()
print("\nGlobal SHAP Values:")
print(global_shap)

In [None]:
table_name = "synth_weather_data"

session.use_database('GFRERE_DB')
session.use_schema('SHAP')

# Write Pandas DataFrame to Snowflake
snow_synth_weather_df = session.write_pandas(
    synthetic_data,
    table_name,
    auto_create_table=True,
    overwrite=True,  # Added missing comma
    # schema=schema
)

# snow_synth_weather_df.show()

In [None]:
# Create a model registry connection using the Snowpark session object, we will use the current database and schema for storing the model.
snowml_registry = Registry(session)

# Infer input-output schema for model tracking
predict_signature = model_signature.infer_signature(input_data=snow_synth_weather_df, output_data=predictions)

custom_mv = snowml_registry.log_model(
    my_sim,
    model_name="WeatherPredictor",
    version_name="version",
    conda_dependencies=["pandas==2.2.2", "numpy==1.26.4"], #TODO add from scipy.fft import fft, XGBOOST, SHAP
    signatures={"predict": predict_signature},
    comment = 'Testing SHAP on location partions for XGB',
    options={"relax_version": True, "function_type": "TABLE_FUNCTION"}
)

In [None]:
# Partitioned Custom Model can be invoked with a partition_column arg
# Runs the registered model
# You need to run the synthetic weather data generator and
# Save it to a view and create a Snow DF called 'snow_raw_df'
ouptut_snow_df = custom_mv.run(
  snow_raw_df,
  function_name="PREDICT",
  partition_column="LOCATION"
)

ouptut_snow_df.show()
# session.write(ouptut_snow_df, 'incident_tbl')