# Monthly current account forecasting with Spark data

This notebook shows how to train and score the monthly balance forecaster on
production tables read through Spark. Unlike the synthetic demo, it relies on
live extracts loaded by `MonthlyBalanceDataLoader` and reuses the helpers in
`core/models/current_accounts/monthly_balance/prediction.py`.

## Prerequisites

* Access to Spark with the tables referenced by `MonthlyBalanceConfig`.
* A CSV with historical key-rate data (update `key_rate_path` if needed).
* The utilities defined in `prediction.py` available on the Python path.

In [None]:
import pandas as pd
import polars as pl
from dateutil.relativedelta import relativedelta
from pyspark.sql import SparkSession

from core.models.current_accounts.monthly_balance.prediction import (
    MonthlyBalanceConfig,
    MonthlyBalanceDataLoader,
    MonthlyBalanceModelTrainer,
)

## Connect to Spark

Initialise or reuse a Spark session so the loader can query the source tables.

In [None]:
spark = SparkSession.builder.getOrCreate()

spark

## Discover the available training window

The loader inspects the account aggregate table to determine the earliest and
latest report dates we can use for modelling.  Adjust `key_rate_path` if the
default CSV location differs in your environment.

In [None]:
config = MonthlyBalanceConfig(
    key_rate_path='/path/to/key_rate.csv'  # update with the actual file location
)
loader = MonthlyBalanceDataLoader(config)
trainer = MonthlyBalanceModelTrainer(config)

train_start, train_end = loader.get_maximum_train_range(spark)
train_start, train_end
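
Before fitting, it can be worth checking how many monthly observations the window actually spans. The sketch below is a minimal, standalone illustration: `months_between` is a hypothetical helper (not part of `prediction.py`), and the example dates are made up, standing in for the values returned by `get_maximum_train_range`. It assumes both bounds are `datetime.date`-like objects.

```python
from datetime import date


def months_between(start: date, end: date) -> int:
    """Count the calendar months from start to end, both months included."""
    return (end.year - start.year) * 12 + (end.month - start.month) + 1


# Hypothetical window; in the notebook these would be train_start and train_end.
example_start, example_end = date(2021, 1, 31), date(2023, 12, 31)
n_months = months_between(example_start, example_end)
print(n_months)  # 36
```

A very short window (for example, fewer months than `config.horizon`) may be a sign that the source table is incomplete rather than a usable training range.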

## Fit the forecaster on the historical window

Here we retrain the linear baseline on the full range.  Persist the fitted
model with `trainer.save_trained_model` if you want to reuse it later.

In [None]:
model = trainer.get_trained_model(spark=spark, end_date=train_end, start_date=train_start)

## Load the latest aggregates for scoring

The prediction payload reuses the same extracts as the trainer.  We keep the
frames in Polars so they can be passed directly to `MonthlyBalanceForecaster`.

In [None]:
prediction_frames = loader.get_prediction_data(spark, start_date=train_start, end_date=train_end)

current_accounts = pl.DataFrame(prediction_frames['current_accounts']).with_columns(
    pl.col('report_dt').cast(pl.Date)
)
ftp_rates = pl.DataFrame(prediction_frames['ftp_rates']).with_columns(
    pl.col('report_dt').cast(pl.Date)
)
market_rates = pl.DataFrame(prediction_frames['market_rates']).with_columns(
    pl.col('report_dt').cast(pl.Date)
)

## Build a baseline scenario

The forecaster needs future paths for FTP and key rates.  Replace the block
below with your scenario feeds (for example, corporate planning files or risk
scenarios).  As a placeholder we keep the last observed values constant over
the forecast horizon.

In [None]:
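# Month-end report dates for the forecast horizon, starting the month after train_end.
# Note: pandas 2.2+ deprecates the 'M' alias in favour of 'ME'; use 'ME' there.
forecast_dates = pd.date_range(
    train_end + relativedelta(months=1), periods=config.horizon, freq='M'
)

# Sort by date so the last row is the most recent observation.
ftp_history = ftp_rates.to_pandas().sort_values('report_dt')
market_history = market_rates.to_pandas().sort_values('report_dt')

last_ftp = ftp_history.iloc[-1]
last_market = market_history.iloc[-1]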

scenario_pdf = pd.DataFrame({
    'report_dt': forecast_dates,
    'VTB_90d_ftp_rate': last_ftp['VTB_90d_ftp_rate'],
    'VTB_365d_ftp_rate': last_ftp['VTB_365d_ftp_rate'],
    'key_rate': last_market['key_rate'],
})

ftp_scenario = pl.DataFrame(scenario_pdf[['report_dt', 'VTB_90d_ftp_rate', 'VTB_365d_ftp_rate']]).with_columns(
    pl.col('report_dt').cast(pl.Date)
)
market_scenario = pl.DataFrame(scenario_pdf[['report_dt', 'key_rate']]).with_columns(
    pl.col('report_dt').cast(pl.Date)
)
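
One simple step beyond the constant placeholder is a parallel shift: every rate is held at its last observed value plus a fixed offset. The sketch below illustrates the idea with pandas only. `flat_scenario`, the example dates, and the rate values are all hypothetical, and the shift is assumed to be expressed in the same units as the rates in your tables.

```python
import pandas as pd


def flat_scenario(last_values: dict, dates, shift: float = 0.0) -> pd.DataFrame:
    """Hold each rate at its last observed value plus a constant shift."""
    scenario = pd.DataFrame({'report_dt': pd.to_datetime(list(dates))})
    for name, value in last_values.items():
        # A scalar assignment is broadcast to every forecast date.
        scenario[name] = value + shift
    return scenario


# Hypothetical inputs; in the notebook these would come from
# forecast_dates, last_ftp and last_market.
dates = pd.to_datetime(['2024-01-31', '2024-02-29', '2024-03-31'])
last_values = {'VTB_90d_ftp_rate': 15.0, 'key_rate': 16.0}

stressed = flat_scenario(last_values, dates, shift=1.0)
print(stressed)
```

The resulting frame can be split into FTP and market parts and converted to Polars in the same way as `scenario_pdf` above.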

## Run the forecast

Invoke the fitted forecaster with the assembled scenario to obtain the balance
projection for each segment.

In [None]:
forecast_start = forecast_dates[0].date()

forecast_df = model.predict(
    current_accounts=current_accounts,
    ftp_rates=ftp_rates,
    ftp_rates_scenario=ftp_scenario,
    market_rates=market_rates,
    market_rates_scenario=market_scenario,
    forecast_start=forecast_start,
    horizon=config.horizon,
)

forecast_df.head()
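
A small pre-flight check on the scenario frames can catch gaps before the call above is made. The sketch below assumes the forecaster expects exactly one scenario row per forecast month. That is an assumption, not something confirmed by `prediction.py`, and `check_scenario_coverage` is a hypothetical helper.

```python
from datetime import date


def check_scenario_coverage(scenario_dates, forecast_start: date, horizon: int) -> None:
    """Raise ValueError unless there is exactly one scenario date per forecast month."""
    expected = []
    year, month = forecast_start.year, forecast_start.month
    for _ in range(horizon):
        expected.append((year, month))
        month += 1
        if month > 12:
            year, month = year + 1, 1

    actual = sorted((d.year, d.month) for d in scenario_dates)
    if actual != expected:
        missing = sorted(set(expected) - set(actual))
        raise ValueError(f'scenario does not match the forecast months; missing: {missing}')


# Hypothetical example: three month-end dates covering a three-month horizon.
check_scenario_coverage(
    [date(2024, 1, 31), date(2024, 2, 29), date(2024, 3, 31)],
    forecast_start=date(2024, 1, 31),
    horizon=3,
)
```

In the notebook, the dates could be taken from the scenario frame, for example `ftp_scenario['report_dt'].to_list()`.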

## Persist the results (optional)

Export the forecast to a warehouse or a CSV as required by your workflow.

In [None]:
forecast_df.to_csv('monthly_balance_forecast.csv', index=False)
forecast_df.shape
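
The cell above uses `to_csv`, which is the pandas method. If the forecaster returns a Polars frame instead, the equivalent call is `write_csv`. Since the return type of `predict` is not shown here, the sketch below picks the method by duck typing. `export_forecast` is a hypothetical helper, not part of `prediction.py`.

```python
def export_forecast(frame, path: str) -> str:
    """Write a forecast frame to CSV and return the name of the method used."""
    if hasattr(frame, 'write_csv'):
        # Polars DataFrame API
        frame.write_csv(path)
        return 'write_csv'
    if hasattr(frame, 'to_csv'):
        # pandas DataFrame API; drop the index to keep the file tidy
        frame.to_csv(path, index=False)
        return 'to_csv'
    raise TypeError(f'unsupported frame type: {type(frame).__name__}')


# Usage in the notebook (commented out because forecast_df is defined above):
# export_forecast(forecast_df, 'monthly_balance_forecast.csv')
```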