# Building a Quantitative Model
Snowflake has been the trusted data platform for our financial services customers, and we are excited to showcase our latest AI functionality to further supercharge your workflow. In this notebook, we will walk through how to:

1. Leverage Snowflake marketplace to access market data
2. Leverage Snowflake Cortex AISQL to process the unstructured data
3. Leverage Snowflake Data Science Agent to accelerate ML model building

In [None]:
-- Preview the raw earnings-call transcripts used by the sentiment pipeline below.
SELECT *
FROM ACCELERATE_AI_IN_FSI.DEFAULT_SCHEMA.UNIQUE_TRANSCRIPTS;

In [None]:
-- Builds one row per transcript with an LLM-derived analyst sentiment score.
-- Pipeline: parse transcript JSON -> map speaker ids to names/roles ->
-- render a readable transcript -> ask the model for a JSON verdict ->
-- extract and parse that JSON into typed columns.
CREATE OR REPLACE TABLE DEFAULT_SCHEMA.ai_transcripts_analysts_sentiments AS
{% raw %}
WITH
-- Parse the raw transcript payload once so later steps can address it as JSON.
parsed_transcripts AS (
    SELECT
        primary_ticker,
        event_timestamp,
        event_type,
        created_at,
        PARSE_JSON(transcript) AS transcript_json
    FROM DEFAULT_SCHEMA.unique_transcripts
),
-- One object per transcript: speaker id -> {name, role}.
speaker_lookup AS (
    SELECT
        primary_ticker,
        event_timestamp,
        OBJECT_AGG(
            speaker_data.value:speaker::STRING,
            OBJECT_CONSTRUCT(
                'name', speaker_data.value:speaker_data.name::STRING,
                'role', speaker_data.value:speaker_data.role::STRING
            )
        ) AS speakers
    FROM parsed_transcripts
    CROSS JOIN LATERAL FLATTEN(input => transcript_json:speaker_mapping) speaker_data
    GROUP BY primary_ticker, event_timestamp
),
-- Render each transcript as "Name (Role): text" paragraphs in spoken order.
-- The "(Role)" label is what the prompt below relies on to spot analysts.
formatted_transcripts AS (
    SELECT
        p.primary_ticker,
        p.event_timestamp,
        p.event_type,
        p.created_at,
        LISTAGG(
            s.speakers[parsed_entry.value:speaker::STRING]:name::STRING ||
            CASE
                WHEN s.speakers[parsed_entry.value:speaker::STRING]:role::STRING IS NOT NULL
                THEN ' (' || s.speakers[parsed_entry.value:speaker::STRING]:role::STRING || '): '
                ELSE ': '
            END ||
            parsed_entry.value:text::STRING,
            '\n\n'
        ) WITHIN GROUP (ORDER BY parsed_entry.index) AS transcript_text
    FROM parsed_transcripts p
    INNER JOIN speaker_lookup s
        ON p.primary_ticker = s.primary_ticker
        AND p.event_timestamp = s.event_timestamp
    CROSS JOIN LATERAL FLATTEN(input => p.transcript_json:parsed_transcript) parsed_entry
    GROUP BY p.primary_ticker, p.event_timestamp, p.event_type, p.created_at
),
-- Ask the model for a JSON-only sentiment verdict per transcript.
ai_analysis AS (
    SELECT
        primary_ticker,
        event_timestamp,
        event_type,
        created_at,
        ai_complete(
            'claude-4-sonnet',
            'Analyze analyst sentiment in this earnings call. Speakers with "(Analyst)" in their label are analysts. ' ||
            'Rate sentiment 1-10 based on analyst questions and tone. ' ||
            'Return ONLY valid JSON with no extra text: {"score": <1-10>, "emoji": "<emoji>", "reason": "<brief text>", "analyst_count": <number>}' ||
            '\n\nTranscript:\n\n' ||
            transcript_text
        ) AS ai_response
    FROM formatted_transcripts
),
-- Pull the first {...} object out of the response (the model may wrap it in a
-- code fence or extra text) and parse it once. The previous CASE applied this
-- same extraction in every branch, so a single expression is equivalent.
-- Backslashes are doubled because this is a single-quoted string constant.
-- Limitation: the pattern stops at the first closing brace, so a reason text
-- that itself contains a closing brace will not parse and yields NULL columns.
parsed_responses AS (
    SELECT
        primary_ticker,
        event_timestamp,
        event_type,
        created_at,
        TRY_PARSE_JSON(REGEXP_SUBSTR(ai_response, '\\{[^}]*\\}')) AS response_json
    FROM ai_analysis
)
SELECT
    primary_ticker,
    event_timestamp,
    event_type,
    created_at,
    response_json:score::INT AS sentiment_score,
    response_json:emoji::TEXT AS emoji,
    response_json:analyst_count::INT AS unique_analyst_count,
    response_json:reason::STRING AS sentiment_reason
FROM parsed_responses
ORDER BY primary_ticker, event_timestamp;
{% endraw %}

SELECT * FROM DEFAULT_SCHEMA.ai_transcripts_analysts_sentiments;

In [None]:
import streamlit as st
import altair as alt
from snowflake.snowpark.context import get_active_session
# Tickers shown in the explorer and charts, in display order.
CORE_TICKERS = ['SNOW', 'NRNT', 'ICBG', 'QRYQ', 'DFLX', 'STRM', 'VLTA', 'CTLG']
# Chart grid layout: one heading per row, four tickers per row.
TICKERS_PER_ROW = 4
ROW_TITLES = ["Row 1: Market Leaders & Competitors", "Row 2: Platform Providers"]

# --- Title and Introduction ---
st.title("❄️ Live FSI Data from Snowflake for Financial Services")
st.markdown(
    "This app runs directly in Snowflake and uses the data from the **Snowflake Marketplace**."
)

# --- Get Active Session and Query Data ---
# Get the current session object provided by the Streamlit in Snowflake environment
session = get_active_session()

# Build the IN (...) list from CORE_TICKERS so the query and the charts can never
# drift apart. The values are fixed constants defined above, not user input.
ticker_list_sql = ", ".join(f"'{ticker}'" for ticker in CORE_TICKERS)

# This query calculates the offset needed to make the latest date appear as '2025-11-01'
query = f"""
WITH DateOffset AS (
    SELECT
        DATEDIFF(
            day,
            MAX(TO_TIMESTAMP_NTZ(DATE, 9)::DATE),
            '2025-11-01'::DATE
        ) AS days_to_add
    FROM DEFAULT_SCHEMA.FSI_DATA
)
SELECT
    TICKER,
    ASSET_CLASS,
    PRIMARY_EXCHANGE_NAME,
    DATEADD(
        day,
        (SELECT days_to_add FROM DateOffset),
        TO_TIMESTAMP_NTZ(DATE, 9)::DATE
    ) as DATE,
    PRICE,
    RETURN,
    IS_SPLIT,
    Y
FROM DEFAULT_SCHEMA.FSI_DATA
WHERE TICKER IN ({ticker_list_sql})
ORDER BY DATE DESC;
"""

# Execute the query using the Snowpark session and convert to a Pandas DataFrame
data_frame = session.sql(query).to_pandas()


# --- Display the "Nice Looking" Dataframe ---
st.header("FSI Data Explorer")
st.dataframe(
    data_frame,
    use_container_width=True,
    hide_index=True,
    column_config={
        "TICKER": st.column_config.TextColumn(
            "Ticker Symbol",
            help="The stock ticker symbol.",
            width="small"
        ),
        "ASSET_CLASS": st.column_config.TextColumn(
            "Asset Class",
            width="small"
        ),
        "PRIMARY_EXCHANGE_NAME": "Primary Exchange",
        "DATE": st.column_config.DateColumn(
            "Date",
            format="DD MMM, YYYY" # Format for readability
        ),
        "PRICE": st.column_config.NumberColumn(
            "Price (USD)",
            help="Closing price in US Dollars.",
            format="$%.2f"
        ),
        "RETURN": st.column_config.NumberColumn(
            "Daily Return",
            help="The daily percentage return.",
            format="%.2f%%"
        ),
        "IS_SPLIT": st.column_config.CheckboxColumn(
            "Stock Split?",
            help="Indicates if a stock split occurred.",
            width="small"
        ),
        "Y": st.column_config.ProgressColumn(
            "Target Probability (Y)",
            help="Model prediction or target variable.",
            format="%.3f",
            min_value=0,
            max_value=1,
        ),
    }
)


def _render_price_chart(ticker, frame):
    """Draw one ticker's price line chart (or a warning) in the current container.

    Args:
        ticker: Ticker symbol to plot.
        frame: DataFrame with at least TICKER, DATE and PRICE columns.
    """
    st.subheader(ticker)

    # Filter the dataframe for the current ticker
    ticker_data = frame[frame["TICKER"] == ticker]
    if len(ticker_data) == 0:
        st.warning(f"No data available for {ticker}")
        return

    # Pad this ticker's own price range by 5% on each side so the y-axis is
    # scaled per chart instead of starting at zero.
    min_price = ticker_data['PRICE'].min()
    max_price = ticker_data['PRICE'].max()

    # Use Altair to create a chart with a custom y-axis and Snowflake blue line
    chart = alt.Chart(ticker_data).mark_line(color='#00B2EE').encode(
        x=alt.X('DATE', title='Date'),
        y=alt.Y('PRICE', title='Price', scale=alt.Scale(domain=[min_price - (min_price*0.05), max_price + (max_price*0.05)])),
        tooltip=['DATE', 'PRICE']
    ).interactive()

    st.altair_chart(chart, use_container_width=True)


# --- Daily Price Charts ---
st.header("Daily Price Charts - Core 8 Tickers")

# Use the 8 core tickers in the specific order
tickers_to_plot = CORE_TICKERS

# Only create the columns and charts if we found any tickers
if tickers_to_plot:
    for row_number, row_title in enumerate(ROW_TITLES):
        row_start = row_number * TICKERS_PER_ROW
        row_tickers = tickers_to_plot[row_start:row_start + TICKERS_PER_ROW]

        st.subheader(row_title)
        row_columns = st.columns(TICKERS_PER_ROW)

        # zip stops at the shorter sequence, so a short row leaves columns empty
        for column, ticker in zip(row_columns, row_tickers):
            with column:
                _render_price_chart(ticker, data_frame)
else:
    st.info("No tickers found in this data sample to plot.")

# Data Science Agent: Financial ML Model
Soon, you will have the ability to use a **Data Science Agent**, which will plan and create models for you. The Python scripts below were generated by the agent, which is currently in Private Preview. They were created using the prompts described in the markdown above each code cell.

# Prompt 1 - Feature Engineering

Help me construct features with returns, eg. the last 1 day return using close price, return from **t-4** to **t-1**, return from **t-9** to **t-5**, return from **t-20** to **t-11**, and return from **t-62** to **t-21**. I also want to construct my predictive variable, which should be the future return from **t+2** to **t+6**. Please also help take the log across all return variables, including the previously constructed feature and this predictive variable. Please keep as panel data where ticker is a column.

In [None]:
# Generated by Snowflake Copilot
from snowflake.snowpark.context import get_active_session
import pandas as pd
import numpy as np

# Active Snowpark session supplied by the notebook runtime
session = get_active_session()

# Source columns: trailing-window returns (features) and daily lead returns
trailing_return_cols = ['R_1', 'R_5_1', 'R_10_5', 'R_21_10', 'R_63_21']
lead_return_cols = ['RETURN_LEAD_2', 'RETURN_LEAD_3', 'RETURN_LEAD_4', 'RETURN_LEAD_5']

# Pull only the columns needed into pandas
data = (
    session.table('DEFAULT_SCHEMA.FSI_DATA')
    .select(['TICKER', 'DATE'] + trailing_return_cols + lead_return_cols)
    .to_pandas()
)

# Compound the daily lead returns (t+2 .. t+5) into a single forward return
growth = 1 + data[lead_return_cols[0]]
for lead_col in lead_return_cols[1:]:
    growth = growth * (1 + data[lead_col])
data['FUTURE_RETURN_2_5'] = growth - 1

# Add a LOG_<name> column holding log(1 + r) for every return column
return_cols = trailing_return_cols + ['FUTURE_RETURN_2_5']
data = data.assign(**{f'LOG_{name}': np.log(1 + data[name]) for name in return_cols})

# Keep the panel keys plus the log columns, ordered by ticker then date
final_columns = ['TICKER', 'DATE'] + [f'LOG_{name}' for name in return_cols]
final_data = data.loc[:, final_columns].sort_values(['TICKER', 'DATE'])

print(f"Final dataset shape: {final_data.shape}")
print("\nFirst few rows of the processed data:")
print(final_data.head())

# Prompt 2 - Train a Predictive Model

With those features, I want to train a predictive **lightGBM** model with **L2 metric**. Please do a walk forward training on a quarterly basis. For each test quarter:
- **Train** on all quarters \< (Q-2)
- **Validate** on (Q-2, Q-1)
- **Test** on **Q**
- Enforce strict cutoffs so rows needing returns beyond the split end are dropped (no look-ahead).


### The Result

In [None]:
# Generated by Snowflake Copilot
import pandas as pd
import numpy as np
from snowflake.snowpark.context import get_active_session
import lightgbm as lgb
from sklearn.metrics import mean_squared_error

# Get session and data
session = get_active_session()
data = session.table('DEFAULT_SCHEMA.FSI_DATA') \
    .select(['TICKER', 'DATE', 'R_1', 'R_5_1', 'R_10_5', 'R_21_10', 'R_63_21',
             'RETURN_LEAD_2', 'RETURN_LEAD_3', 'RETURN_LEAD_4', 'RETURN_LEAD_5']) \
    .to_pandas()

# Compound the daily lead returns (t+2 .. t+5) into a single forward return
data['FUTURE_RETURN_2_5'] = (1 + data['RETURN_LEAD_2']) * (1 + data['RETURN_LEAD_3']) * \
                           (1 + data['RETURN_LEAD_4']) * (1 + data['RETURN_LEAD_5']) - 1

# Log transform all return columns
return_cols = ['R_1', 'R_5_1', 'R_10_5', 'R_21_10', 'R_63_21', 'FUTURE_RETURN_2_5']
for col in return_cols:
    data[f'LOG_{col}'] = np.log(1 + data[col])

# Create quarter information
data['DATE'] = pd.to_datetime(data['DATE'])
data['YEAR_QUARTER'] = data['DATE'].dt.to_period('Q')

# Prepare features and target
feature_cols = ['LOG_R_1', 'LOG_R_5_1', 'LOG_R_10_5', 'LOG_R_21_10', 'LOG_R_63_21']
target_col = 'LOG_FUTURE_RETURN_2_5'
model_cols = feature_cols + [target_col]

# The target is built from RETURN_LEAD_2..RETURN_LEAD_5, so a row's label reaches
# up to 5 rows ahead. NOTE(review): assumes each distinct DATE is one trading day
# and RETURN_LEAD_k is the return k trading days ahead - confirm against FSI_DATA.
TARGET_HORIZON_DAYS = 5


def drop_lookahead_rows(split, horizon=TARGET_HORIZON_DAYS):
    """Drop rows dated on the last `horizon` distinct dates of a split.

    Those rows have labels that depend on returns realised after the split
    ends, which would leak the next split's period into this one.

    Args:
        split: DataFrame with a DATE column.
        horizon: Number of trailing distinct dates to remove.

    Returns:
        The filtered DataFrame (empty if the split has too few dates).
    """
    split_dates = split['DATE'].drop_duplicates().sort_values()
    if len(split_dates) <= horizon:
        return split.iloc[0:0]
    last_safe_date = split_dates.iloc[-(horizon + 1)]
    return split[split['DATE'] <= last_safe_date]


# Get all quarters from 2020Q1 onwards
all_quarters = sorted([q for q in data['YEAR_QUARTER'].unique() if q >= pd.Period('2020Q1', freq='Q')])

# Initialize results storage
results = []
model = None  # last successfully trained model; stays None if every quarter is skipped

# Walk-forward testing: train on quarters < Q-2, validate on (Q-2, Q-1), test on Q
for test_quarter in all_quarters:
    # Define validation quarters (Q-2, Q-1)
    val_quarters = [test_quarter - 2, test_quarter - 1]

    # Split the data. Train/validation are purged of look-ahead rows; every split
    # drops rows with missing features or labels (e.g. the tail of the sample
    # where lead returns do not exist yet), which would otherwise reach fit()
    # and make mean_squared_error raise.
    train_data = drop_lookahead_rows(data[data['YEAR_QUARTER'] < val_quarters[0]]).dropna(subset=model_cols)
    val_data = drop_lookahead_rows(data[data['YEAR_QUARTER'].isin(val_quarters)]).dropna(subset=model_cols)
    test_data = data[data['YEAR_QUARTER'] == test_quarter].dropna(subset=model_cols)

    # Early quarters may have no history to train or validate on
    if train_data.empty or val_data.empty or test_data.empty:
        print(f"Skipping {test_quarter}: usable rows train={len(train_data)}, "
              f"val={len(val_data)}, test={len(test_data)}")
        continue

    # Create datasets
    X_train = train_data[feature_cols]
    y_train = train_data[target_col]
    X_val = val_data[feature_cols]
    y_val = val_data[target_col]
    X_test = test_data[feature_cols]
    y_test = test_data[target_col]

    # Train model with improved parameters
    model = lgb.LGBMRegressor(
        objective='regression',
        metric='l2',
        n_estimators=200,
        learning_rate=0.05,
        num_leaves=31,
        min_child_samples=20,
        subsample=0.8,
        colsample_bytree=0.8,
        random_state=42,
        verbose=-1
    )

    # Early stopping makes the validation set actually influence the model;
    # passing eval_set alone only logs the metric. Same settings as the backtest cell.
    model.fit(
        X_train,
        y_train,
        eval_set=[(X_val, y_val)],
        callbacks=[lgb.early_stopping(50), lgb.log_evaluation(0)]
    )

    # Make predictions
    test_pred = model.predict(X_test)

    # Calculate metrics
    mse = mean_squared_error(y_test, test_pred)
    r2 = 1 - np.sum((y_test - test_pred) ** 2) / np.sum((y_test - np.mean(y_test)) ** 2)

    # Store results
    results.append({
        'quarter': test_quarter,
        'train_size': len(X_train),
        'val_size': len(X_val),
        'test_size': len(X_test),
        'mse': mse,
        'r2': r2
    })

if model is None:
    print("No quarter had enough usable data to train a model.")
else:
    # Print results
    print("Walk-forward testing results:")
    for r in results:
        print(f"\nQuarter: {r['quarter']}")
        print(f"Train size: {r['train_size']}, Val size: {r['val_size']}, Test size: {r['test_size']}")
        print(f"MSE: {r['mse']:.6f}")
        print(f"R2: {r['r2']:.6f}")

    # Calculate and print average metrics
    avg_mse = np.mean([r['mse'] for r in results])
    avg_r2 = np.mean([r['r2'] for r in results])
    print(f"\nAverage MSE across all quarters: {avg_mse:.6f}")
    print(f"Average R2 across all quarters: {avg_r2:.6f}")

    # Score the most recent quarter with the last trained model. Rows whose label
    # is not known yet are kept on purpose: they are the newest forecasts.
    last_quarter = all_quarters[-1]
    last_quarter_data = data[data['YEAR_QUARTER'] == last_quarter].copy()
    X_last = last_quarter_data[feature_cols]
    last_quarter_data['predicted_return'] = model.predict(X_last)
    final_predictions = last_quarter_data[['TICKER', 'DATE', target_col, 'predicted_return']]
    print("\nSample of final predictions for the last quarter:")
    print(final_predictions.head())

## Prompt 3 - Backtest the model

Let's test if the strategy can work, starting 2021, for each portfolio construction, generate forecasts on **Tuesdays** in the test quarter.
- At **Wednesday close**, go **long top-5** and **short bottom-5** by predicted return (equal weight).
- Hold through **Thu → next Wed** (exactly the `t+2..t+6` window used for training).
- **Transaction cost**: 3.0 bps one-way applied via weekly turnover.
Show Strategy metrics:
- Information Ratio (before/after costs)
- Max drawdown
- At the end plot the equity curve on before & after cost.

In [None]:
# Generated by Snowflake Copilot
import pandas as pd
import numpy as np
from snowflake.snowpark.context import get_active_session
import lightgbm as lgb
from sklearn.metrics import mean_squared_error
import matplotlib.pyplot as plt

# Get session and data
session = get_active_session()
data = session.table('DEFAULT_SCHEMA.FSI_DATA') \
    .select(['TICKER', 'DATE', 'R_1', 'R_5_1', 'R_10_5', 'R_21_10', 'R_63_21',
             'RETURN_LEAD_2', 'RETURN_LEAD_3', 'RETURN_LEAD_4', 'RETURN_LEAD_5']) \
    .to_pandas()

# Compound the daily lead returns (t+2 .. t+5) into a single forward return
data['FUTURE_RETURN_2_5'] = (1 + data['RETURN_LEAD_2']) * (1 + data['RETURN_LEAD_3']) * \
                           (1 + data['RETURN_LEAD_4']) * (1 + data['RETURN_LEAD_5']) - 1

return_cols = ['R_1', 'R_5_1', 'R_10_5', 'R_21_10', 'R_63_21', 'FUTURE_RETURN_2_5']
for col in return_cols:
    data[f'LOG_{col}'] = np.log(1 + data[col])

# Create quarter and day information
data['DATE'] = pd.to_datetime(data['DATE'])
data['YEAR_QUARTER'] = data['DATE'].dt.to_period('Q')
data['DAY_OF_WEEK'] = data['DATE'].dt.day_name()

# Prepare features and target
feature_cols = ['LOG_R_1', 'LOG_R_5_1', 'LOG_R_10_5', 'LOG_R_21_10', 'LOG_R_63_21']
target_col = 'LOG_FUTURE_RETURN_2_5'
model_cols = feature_cols + [target_col]

# The target is built from RETURN_LEAD_2..RETURN_LEAD_5, so a row's label reaches
# up to 5 rows ahead. NOTE(review): assumes each distinct DATE is one trading day
# and RETURN_LEAD_k is the return k trading days ahead - confirm against FSI_DATA.
TARGET_HORIZON_DAYS = 5


def drop_lookahead_rows(split, horizon=TARGET_HORIZON_DAYS):
    """Drop rows dated on the last `horizon` distinct dates of a split.

    Those rows have labels that depend on returns realised after the split
    ends, which would leak the next split's period into this one.

    Args:
        split: DataFrame with a DATE column.
        horizon: Number of trailing distinct dates to remove.

    Returns:
        The filtered DataFrame (empty if the split has too few dates).
    """
    split_dates = split['DATE'].drop_duplicates().sort_values()
    if len(split_dates) <= horizon:
        return split.iloc[0:0]
    last_safe_date = split_dates.iloc[-(horizon + 1)]
    return split[split['DATE'] <= last_safe_date]


# Get quarters from 2021Q1 onwards for strategy testing
strategy_quarters = sorted([q for q in data['YEAR_QUARTER'].unique() if q >= pd.Period('2021Q1', freq='Q')])

# Initialize storage for all predictions
all_predictions = []

# Walk-forward testing starting from 2021Q1
for test_quarter in strategy_quarters:
    # Define validation quarters (Q-2, Q-1)
    val_quarters = [test_quarter - 2, test_quarter - 1]

    # Train on all quarters < (Q-2), validate on (Q-2, Q-1). Both splits are
    # purged of look-ahead rows and of rows with missing features OR labels;
    # filtering on features alone lets NaN labels through to the trainer.
    train_data = drop_lookahead_rows(data[data['YEAR_QUARTER'] < val_quarters[0]]).dropna(subset=model_cols)
    val_data = drop_lookahead_rows(data[data['YEAR_QUARTER'].isin(val_quarters)]).dropna(subset=model_cols)
    test_data = data[data['YEAR_QUARTER'] == test_quarter]

    # Skip if insufficient training data, or nothing to early-stop against
    if len(train_data) < 1000 or val_data.empty:
        continue

    # Prepare training data
    X_train = train_data[feature_cols]
    y_train = train_data[target_col]
    X_val = val_data[feature_cols]
    y_val = val_data[target_col]

    # Train LightGBM model
    model = lgb.LGBMRegressor(
        objective='regression',
        metric='l2',
        n_estimators=200,
        learning_rate=0.05,
        num_leaves=31,
        min_child_samples=20,
        subsample=0.8,
        colsample_bytree=0.8,
        random_state=42,
        verbose=-1
    )

    model.fit(X_train, y_train, eval_set=[(X_val, y_val)], callbacks=[lgb.early_stopping(50), lgb.log_evaluation(0)])

    # Get Tuesdays in the test quarter for prediction
    test_tuesdays = test_data[test_data['DAY_OF_WEEK'] == 'Tuesday'].copy()

    if len(test_tuesdays) > 0:
        # Generate predictions for Tuesdays that have a complete feature row.
        # Labels may still be missing here (most recent weeks); that is fine for
        # forecasting and is dealt with when returns are evaluated.
        X_test_tuesday = test_tuesdays[feature_cols].dropna()
        if len(X_test_tuesday) > 0:
            predictions = model.predict(X_test_tuesday)
            test_tuesdays_clean = test_tuesdays.loc[X_test_tuesday.index].copy()
            test_tuesdays_clean['predicted_return'] = predictions
            test_tuesdays_clean['test_quarter'] = test_quarter
            all_predictions.append(test_tuesdays_clean)

# Fail with an actionable message instead of pandas' "No objects to concatenate"
if not all_predictions:
    raise ValueError(
        "No Tuesday predictions were generated; check that DEFAULT_SCHEMA.FSI_DATA "
        "covers 2021Q1 onwards and has enough earlier history to train on."
    )

# Combine all predictions
predictions_df = pd.concat(all_predictions, ignore_index=True)

# Strategy implementation
def calculate_strategy_returns(predictions_df):
    """Compute equal-weight long/short returns for each prediction date.

    For every DATE with at least 10 names, the 5 highest predicted returns are
    held long and the 5 lowest are held short.

    Portfolio returns are averaged in simple-return space: the stored
    LOG_FUTURE_RETURN_2_5 values are converted back with expm1 first. Averaging
    log returns does not give the return of an equal-weight portfolio, and the
    caller compounds the result as a simple return.

    Dates where any selected position has no realized return yet are skipped, so
    a NaN cannot enter the equity curve. Selection itself never looks at the
    realized return.

    Args:
        predictions_df: DataFrame with DATE, TICKER, predicted_return and
            LOG_FUTURE_RETURN_2_5 columns.

    Returns:
        DataFrame with one row per traded date and columns date, long_return,
        short_return, strategy_return, num_stocks, long_tickers, short_tickers.
        The columns are present even when no date qualifies.
    """
    result_columns = ['date', 'long_return', 'short_return', 'strategy_return',
                      'num_stocks', 'long_tickers', 'short_tickers']
    strategy_results = []

    # Group by date (Tuesday predictions)
    for date, group in predictions_df.groupby('DATE'):
        # Need 5 distinct longs and 5 distinct shorts
        if len(group) < 10:
            continue

        # Rank by predicted return
        group_sorted = group.sort_values('predicted_return', ascending=False)
        long_positions = group_sorted.head(5)
        short_positions = group_sorted.tail(5)

        # Realized simple returns of the selected names
        long_realized = np.expm1(long_positions['LOG_FUTURE_RETURN_2_5'])
        short_realized = np.expm1(short_positions['LOG_FUTURE_RETURN_2_5'])
        if long_realized.isna().any() or short_realized.isna().any():
            continue

        # Equal-weight leg returns and the long-short spread
        long_return = long_realized.mean()
        short_return = short_realized.mean()

        strategy_results.append({
            'date': date,
            'long_return': long_return,
            'short_return': short_return,
            'strategy_return': long_return - short_return,
            'num_stocks': len(group_sorted),
            'long_tickers': list(long_positions['TICKER']),
            'short_tickers': list(short_positions['TICKER'])
        })

    return pd.DataFrame(strategy_results, columns=result_columns)

# Build the per-date long/short return table from the Tuesday predictions
strategy_df = calculate_strategy_returns(predictions_df)

# Calculate transaction costs based on turnover
def calculate_turnover_and_costs(strategy_df):
    """Add weekly turnover and transaction-cost columns to the strategy table.

    Turnover for a row is the number of names that entered or left the long
    and short books since the previous row, divided by the 10 positions held.
    Cost is that turnover times 3.0 bps. The first row (by position, whatever
    the index labels are) carries no turnover or cost.

    Args:
        strategy_df: DataFrame with long_tickers and short_tickers columns,
            each holding a list of tickers, ordered by rebalance date.

    Returns:
        A copy of strategy_df with float columns turnover and transaction_cost.
        The input is not modified.
    """
    total_positions = 10  # 5 long + 5 short
    cost_per_unit_turnover = 0.0003  # 3.0 bps one-way

    strategy_df = strategy_df.copy()
    turnovers = []
    costs = []

    prev_long = set()
    prev_short = set()

    # An empty table may have no ticker columns at all; there is nothing to walk
    if not strategy_df.empty:
        books = zip(strategy_df['long_tickers'], strategy_df['short_tickers'])
        for position, (long_tickers, short_tickers) in enumerate(books):
            current_long = set(long_tickers)
            current_short = set(short_tickers)

            if position == 0:
                turnover = 0.0
            else:
                # Names added plus names dropped, across both books
                long_changes = len(current_long.symmetric_difference(prev_long))
                short_changes = len(current_short.symmetric_difference(prev_short))
                turnover = (long_changes + short_changes) / total_positions

            turnovers.append(turnover)
            costs.append(turnover * cost_per_unit_turnover)

            prev_long = current_long
            prev_short = current_short

    # Explicit dtype keeps the columns numeric even for an empty table
    strategy_df['turnover'] = pd.Series(turnovers, index=strategy_df.index, dtype=float)
    strategy_df['transaction_cost'] = pd.Series(costs, index=strategy_df.index, dtype=float)

    return strategy_df

# Attach turnover and trading-cost columns to the weekly results
strategy_df = calculate_turnover_and_costs(strategy_df)

# Weekly return once trading costs are deducted
strategy_df['net_return'] = strategy_df['strategy_return'].sub(strategy_df['transaction_cost'])

# Equity curves: compound the weekly returns, gross and net
for curve_col, return_col in (('cumulative_gross', 'strategy_return'),
                              ('cumulative_net', 'net_return')):
    strategy_df[curve_col] = (1 + strategy_df[return_col]).cumprod()

# Annualize weekly statistics
weeks_per_year = 52


def _annualized_stats(weekly_returns):
    """Return (annual return, annual volatility, information ratio) for a weekly series."""
    annual_return = weekly_returns.mean() * weeks_per_year
    annual_vol = weekly_returns.std() * np.sqrt(weeks_per_year)
    return annual_return, annual_vol, annual_return / annual_vol


gross_annual_return, gross_annual_vol, ir_gross = _annualized_stats(strategy_df['strategy_return'])
net_annual_return, net_annual_vol, ir_net = _annualized_stats(strategy_df['net_return'])

# Calculate Maximum Drawdown
def calculate_max_drawdown(cumulative_returns, base_value=1.0):
    """Return the largest peak-to-trough decline of an equity curve.

    The running peak is floored at base_value, the capital the curve starts
    from. Without this floor, a curve that falls from its first period reports
    zero drawdown because its first point is treated as the peak.

    Args:
        cumulative_returns: Series of cumulative wealth values (base = 1).
        base_value: Starting capital used as the initial peak. Pass None to
            measure drawdown from the first observed value only.

    Returns:
        The minimum drawdown as a non-positive fraction (e.g. -0.10 for -10%).
    """
    peak = cumulative_returns.expanding().max().clip(lower=base_value)
    drawdown = (cumulative_returns - peak) / peak
    return drawdown.min()

# Worst peak-to-trough decline of each equity curve
max_dd_gross = calculate_max_drawdown(strategy_df['cumulative_gross'])
max_dd_net = calculate_max_drawdown(strategy_df['cumulative_net'])

# Print strategy metrics
print("=== STRATEGY PERFORMANCE METRICS ===")
print("\nGross Performance (Before Costs):")
print(f"  Annual Return: {gross_annual_return:.2%}")
print(f"  Annual Volatility: {gross_annual_vol:.2%}")
print(f"  Information Ratio: {ir_gross:.3f}")
print(f"  Maximum Drawdown: {max_dd_gross:.2%}")

print("\nNet Performance (After Costs):")
print(f"  Annual Return: {net_annual_return:.2%}")
print(f"  Annual Volatility: {net_annual_vol:.2%}")
print(f"  Information Ratio: {ir_net:.3f}")
print(f"  Maximum Drawdown: {max_dd_net:.2%}")

print("\nTransaction Cost Impact:")
print(f"  Cost Drag: {(gross_annual_return - net_annual_return):.2%} annually")
print(f"  Average Weekly Turnover: {strategy_df['turnover'].mean():.1%}")

# Final performance summary: last point of each equity curve (base = 1)
final_gross = strategy_df['cumulative_gross'].iloc[-1]
final_net = strategy_df['cumulative_net'].iloc[-1]
total_gross_return = final_gross - 1
total_net_return = final_net - 1
print(f"\nTotal Strategy Performance ({strategy_df['date'].min().strftime('%Y-%m-%d')} to {strategy_df['date'].max().strftime('%Y-%m-%d')}):")
print(f"  Gross Total Return: {total_gross_return:.2%}")
print(f"  Net Total Return: {total_net_return:.2%}")

# Create equity curve plot
plt.figure(figsize=(12, 8))
plt.plot(strategy_df['date'], strategy_df['cumulative_gross'], label='Gross Returns (Before Costs)', linewidth=2)
plt.plot(strategy_df['date'], strategy_df['cumulative_net'], label='Net Returns (After Costs)', linewidth=2)
plt.axhline(y=1, color='black', linestyle='--', alpha=0.5, label='Breakeven')

plt.title('Long-Short Strategy Equity Curve (2021-2025)', fontsize=14, fontweight='bold')
plt.xlabel('Date', fontsize=12)
plt.ylabel('Cumulative Return (Base = 1)', fontsize=12)
plt.legend(fontsize=11)
plt.grid(True, alpha=0.3)
plt.xticks(rotation=45)
plt.tight_layout()

# Add performance annotations. The '+' format flag prints the real sign, so a
# losing strategy shows "-5.0%" rather than "+-5.0%".
plt.text(0.02, 0.98, f'Final Gross: {final_gross:.3f} ({(final_gross-1)*100:+.1f}%)',
         transform=plt.gca().transAxes, fontsize=10, verticalalignment='top',
         bbox=dict(boxstyle='round', facecolor='lightblue', alpha=0.8))
plt.text(0.02, 0.90, f'Final Net: {final_net:.3f} ({(final_net-1)*100:+.1f}%)',
         transform=plt.gca().transAxes, fontsize=10, verticalalignment='top',
         bbox=dict(boxstyle='round', facecolor='lightcoral', alpha=0.8))

plt.show()

print("Strategy implementation completed successfully!")
print(f"Total trading weeks: {len(strategy_df)}")
print(f"Strategy period: {strategy_df['date'].min().strftime('%Y-%m-%d')} to {strategy_df['date'].max().strftime('%Y-%m-%d')}")

## Prompt 4 - Register the Model

Register only the final model in Snowflake Registry with the following options:

- model name "STOCK_RETURN_PREDICTOR_GBM"
- sample input of 100 rows
- target_platforms=["WAREHOUSE"]
- options={
    "relax_version": False,
    "target_methods": ["predict"],
    "method_options": {
        "predict": {"case_sensitive": True}
    }
}

In [None]:
# Generated by Snowflake Copilot
from snowflake.snowpark.context import get_active_session
import pandas as pd
import numpy as np
import lightgbm as lgb
from snowflake.ml.registry import Registry

# Active Snowpark session supplied by the notebook runtime
session = get_active_session()

# Source columns: trailing-window returns (features) and daily lead returns
trailing_return_cols = ['R_1', 'R_5_1', 'R_10_5', 'R_21_10', 'R_63_21']
lead_return_cols = ['RETURN_LEAD_2', 'RETURN_LEAD_3', 'RETURN_LEAD_4', 'RETURN_LEAD_5']

data = (
    session.table('DEFAULT_SCHEMA.FSI_DATA')
    .select(['TICKER', 'DATE'] + trailing_return_cols + lead_return_cols)
    .to_pandas()
)

# Compound the daily lead returns (t+2 .. t+5) into a single forward return
growth = 1 + data[lead_return_cols[0]]
for lead_col in lead_return_cols[1:]:
    growth = growth * (1 + data[lead_col])
data['FUTURE_RETURN_2_5'] = growth - 1

# Add a LOG_<name> column holding log(1 + r) for every return column
return_cols = trailing_return_cols + ['FUTURE_RETURN_2_5']
data = data.assign(**{f'LOG_{name}': np.log(1 + data[name]) for name in return_cols})

data['DATE'] = pd.to_datetime(data['DATE'])
data['YEAR_QUARTER'] = data['DATE'].dt.to_period('Q')

feature_cols = [f'LOG_{name}' for name in trailing_return_cols]
target_col = 'LOG_FUTURE_RETURN_2_5'

# Final model trains on every quarter before 2025Q4, complete rows only
cutoff_quarter = pd.Period('2025Q4', freq='Q')
training_data = data.loc[data['YEAR_QUARTER'] < cutoff_quarter].copy()
training_data_clean = training_data.dropna(subset=feature_cols + [target_col])

X_train = training_data_clean[feature_cols]
y_train = training_data_clean[target_col]

# Same hyperparameters as the walk-forward models
lgbm_params = {
    'objective': 'regression',
    'metric': 'l2',
    'n_estimators': 200,
    'learning_rate': 0.05,
    'num_leaves': 31,
    'min_child_samples': 20,
    'subsample': 0.8,
    'colsample_bytree': 0.8,
    'random_state': 42,
    'verbose': -1,
}
final_model = lgb.LGBMRegressor(**lgbm_params)
final_model.fit(X_train, y_train)

# First 100 training rows are passed to the registry as sample input
sample_input = X_train.head(100)

registry = Registry(session=session)

# Registration options as specified in the prompt
registration_options = {
    "relax_version": False,
    "target_methods": ["predict"],
    "method_options": {
        "predict": {"case_sensitive": True}
    }
}

model_ref = registry.log_model(
    model=final_model,
    model_name="STOCK_RETURN_PREDICTOR_GBM",
    sample_input_data=sample_input,
    target_platforms=["WAREHOUSE"],
    options=registration_options
)

print("Model registered successfully!")
for summary_line in (
    f"Model name: {model_ref.model_name}",
    f"Model version: {model_ref.version_name}",
    f"Sample input shape: {sample_input.shape}",
    f"Feature columns used: {list(sample_input.columns)}",
):
    print(summary_line)

The registered model can now be used as an agent tool for Snowflake Intelligence.