In [1]:
# Packages
import ast
import pickle
from pathlib import Path

import numpy as np
import pandas as pd
# Register numpy.core.numeric under the alias "numpy._core.numeric" in sys.modules,
# so that any import (or unpickling lookup) of that dotted name resolves to it.
# NOTE(review): presumably a compatibility shim so forecast pickles that reference
# the newer NumPy module path can be loaded with an older NumPy — confirm.
import sys, numpy.core.numeric as numeric
sys.modules['numpy._core.numeric'] = numeric

from utils.evaluation import (
    compute_kl_divergence,
    dataframe_to_latex
)

from utils.plotting import (
    plot_temperature_vs_kl,
    plot_temperature_vs_vol
)

In [2]:
# Configuration
# Only forecast runs whose model_name appears in this list are kept for analysis.
selected_model_names = [
    f"chronos_model_{size}" for size in ("tiny", "small", "base")
]

In [3]:
# Paths and Setup
results_dir = Path("results_q5_temperature")

# One output folder per (artifact kind, metric) pair, all below results_dir.
tables_dir_kl, tables_dir_vol = (
    results_dir / f"tables_temperature_{metric}" for metric in ("kl", "vol")
)
plots_dir_kl, plots_dir_vol = (
    results_dir / f"plots_temperature_{metric}" for metric in ("kl", "vol")
)

# Create the output folders up front; existing folders are left untouched.
for d in (tables_dir_kl, tables_dir_vol, plots_dir_kl, plots_dir_vol):
    d.mkdir(parents=True, exist_ok=True)

# Input locations: pickled forecasts, run configuration files, DGP sample arrays.
forecast_dir, run_dir, datasets_dir = map(Path, ("forecasts", "runfiles", "datasets"))

# Column indices, context lengths and DGP names used by the analysis cells.
selected_days = [0, 10, 20]
context_lengths = [22, 66, 252]
dgp_types_kl = ["gbm_high_vol", "t_garch", "mixture_normal"]

In [4]:
# Load Forecasts (only temperature-tuning runs)
def _parse_run_config(run_text):
    """Parse the ``key = value`` lines of a runfile into a dict.

    Lines without ``=`` are ignored. Each value is parsed as a Python literal
    (number, string, list, bool, None, ...); anything that is not a valid
    literal is stored as the raw text with surrounding quotes stripped.

    ``ast.literal_eval`` is used instead of ``eval`` so that runfile content
    can never execute code or accidentally resolve to a notebook variable
    (e.g. an unquoted value that happens to match a defined name).
    """
    run_config = {}
    for line in run_text.splitlines():
        if "=" not in line:
            continue
        key, value = (part.strip() for part in line.split("=", 1))
        try:
            run_config[key] = ast.literal_eval(value)
        except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
            # Not a Python literal: keep the text itself.
            run_config[key] = value.strip("\"'")
    return run_config


# Index the available forecast pickles by file stem so runfiles can be matched by name.
forecast_files = {f.stem: f for f in forecast_dir.glob("forecast_*.pkl")}
results = []

missing_forecasts = 0
failed_forecasts = 0
loaded_forecasts = 0

for run_file in run_dir.glob("forecast_*.txt"):
    run_text = run_file.read_text()

    # Only runs whose configuration mentions a temperature are of interest here.
    if "temperature" not in run_text:
        continue

    run_name = run_file.stem
    forecast_file = forecast_files.get(run_name)
    if forecast_file is None:
        missing_forecasts += 1
        continue

    run_config = _parse_run_config(run_text)

    try:
        # NOTE: pickle.load can execute arbitrary code; only use forecast files
        # produced by this project, never files from an untrusted source.
        with open(forecast_file, "rb") as f:
            forecast_result = pickle.load(f)
            low, median, high, samples, base_price = forecast_result
    except Exception as exc:
        # Best effort: an unreadable or malformed forecast is skipped, but reported.
        failed_forecasts += 1
        print(f"[WARN] Could not load {forecast_file.name}: {exc!r}")
        continue

    results.append({
        "run_name": run_name,
        "model_name": run_config.get("model_name"),
        "dgp_type": run_config.get("dataset_name"),
        "target_type": run_config.get("target_type"),
        "context_length": run_config.get("context_length"),
        # Runs without an explicit temperature entry are treated as temperature 1.0.
        "temperature": float(run_config.get("temperature", 1.0)),
        "samples": samples,
        "low": low,
        "median": median,
        "high": high,
        "base_price": base_price
    })
    loaded_forecasts += 1

print(f"[INFO] Loaded {loaded_forecasts} temperature runs.")
print(f"[WARN] {missing_forecasts} runfiles with temperature found but no forecast file.")
if failed_forecasts:
    print(f"[WARN] {failed_forecasts} forecast files could not be loaded or unpacked.")

# Filter to Chronos models
results = [r for r in results if r["model_name"] in selected_model_names]
price_results = [r for r in results if r["target_type"] == "prices"]
return_results = [r for r in results if r["target_type"] == "returns"]

[INFO] Loaded 324 temperature runs.
[WARN] 0 runfiles with temperature found but no forecast file.


In [5]:
# Compute KL Divergence
def compute_kl_temperature(results_subset):
    """Compute per-day KL divergence between DGP samples and model forecast samples.

    For every run in ``results_subset`` whose ``dgp_type`` is listed in
    ``dgp_types_kl``, the DGP array ``<dgp_type>_returns_paths.npy`` is read
    from ``datasets_dir`` and, for each index in ``selected_days``, column
    ``day_index`` of the DGP array and of the model samples are passed to
    ``compute_kl_divergence(p, q)`` (DGP first, model second).

    Runs with ``target_type == "prices"`` are converted to simple returns
    first (``price[t+1] / price[t] - 1``), which shortens the sample array by
    one column. NOTE(review): samples are assumed to be 2-D with time along
    axis 1; whether column k of the converted returns lines up with column k
    of the DGP array should be confirmed.

    Parameters
    ----------
    results_subset : iterable of dict
        Run records with keys ``dgp_type``, ``target_type``, ``samples``,
        ``context_length``, ``model_name`` and ``temperature``.

    Returns
    -------
    pandas.DataFrame
        Columns ``context_length``, ``dgp_type``, ``model_name``,
        ``temperature``, ``day`` (``"Day <day_index + 2>"``) and
        ``kl_divergence``, rounded to 4 decimals. The columns are present
        even when no row could be computed, so downstream group-bys do not
        fail on an empty result.
    """
    columns = [
        "context_length", "dgp_type", "model_name",
        "temperature", "day", "kl_divergence",
    ]
    dgp_cache = {}  # dgp_type -> loaded array, or None when the file is missing
    kl_rows = []
    skipped = 0

    for item in results_subset:
        dgp_type = item["dgp_type"]
        if dgp_type not in dgp_types_kl:
            continue

        # Load each DGP file once instead of once per run.
        if dgp_type not in dgp_cache:
            dgp_path = datasets_dir / f"{dgp_type}_returns_paths.npy"
            dgp_cache[dgp_type] = np.load(dgp_path) if dgp_path.exists() else None
        dgp_returns = dgp_cache[dgp_type]
        if dgp_returns is None:
            continue

        model_returns = item["samples"]
        if item["target_type"] == "prices":
            model_returns = model_returns[:, 1:] / model_returns[:, :-1] - 1

        for day_index in selected_days:
            try:
                p = dgp_returns[:, day_index]
                q = model_returns[:, day_index]
                kl = compute_kl_divergence(p, q)
            except Exception:
                # Best effort (e.g. day_index beyond the forecast horizon):
                # skip this day, but keep count so the loss is visible.
                skipped += 1
                continue
            kl_rows.append({
                "context_length": item["context_length"],
                "dgp_type": dgp_type,
                "model_name": item["model_name"],
                "temperature": item["temperature"],
                "day": f"Day {day_index + 2}",
                "kl_divergence": kl
            })

    if skipped:
        print(f"[WARN] compute_kl_temperature: skipped {skipped} (run, day) pairs due to errors.")
    return pd.DataFrame(kl_rows, columns=columns).round(4)

# KL tables for price-target runs and return-target runs, computed in that order.
df_kl_prices, df_kl_returns = map(
    compute_kl_temperature, (price_results, return_results)
)

In [6]:
# Compute Model Volatility by Temperature (annualized)
def compute_volatility_temperature(results_subset, last_day_index=20):
    """Compute the annualized volatility of model samples at one forecast column.

    For every run whose ``dgp_type`` is listed in ``dgp_types_kl``, takes the
    standard deviation (NumPy default, ``ddof=0``) of column
    ``last_day_index`` of the sample array and multiplies it by ``sqrt(252)``.

    Runs with ``target_type == "prices"`` are converted to simple returns
    first (``price[t+1] / price[t] - 1``), which shortens the sample array by
    one column, so ``last_day_index`` may fall outside the array for those
    runs. Such runs are skipped and reported in a single warning rather than
    dropped silently.

    Parameters
    ----------
    results_subset : iterable of dict
        Run records with keys ``dgp_type``, ``target_type``, ``samples``,
        ``context_length``, ``model_name`` and ``temperature``.
    last_day_index : int, default 20
        Column of the (return) sample array used for the volatility estimate.

    Returns
    -------
    pandas.DataFrame
        Columns ``context_length``, ``dgp_type``, ``model_name``,
        ``temperature`` and ``annualized_vol``, rounded to 4 decimals. The
        columns are present even when no row could be computed.
    """
    columns = ["context_length", "dgp_type", "model_name", "temperature", "annualized_vol"]
    vol_rows = []
    skipped = 0

    for item in results_subset:
        if item["dgp_type"] not in dgp_types_kl:
            continue

        model_returns = item["samples"]
        if item["target_type"] == "prices":
            model_returns = model_returns[:, 1:] / model_returns[:, :-1] - 1

        try:
            last_day_vol = np.std(model_returns[:, last_day_index])
        except Exception:
            # Best effort (e.g. last_day_index beyond the horizon): skip, but count.
            skipped += 1
            continue

        vol_rows.append({
            "context_length": item["context_length"],
            "dgp_type": item["dgp_type"],
            "model_name": item["model_name"],
            "temperature": item["temperature"],
            # Rounding is applied once, on the finished frame.
            "annualized_vol": last_day_vol * np.sqrt(252)
        })

    if skipped:
        print(f"[WARN] compute_volatility_temperature: skipped {skipped} runs "
              f"(column {last_day_index} unavailable or invalid samples).")
    return pd.DataFrame(vol_rows, columns=columns).round(4)

# Volatility tables for price-target runs and return-target runs, computed in that order.
df_vol_prices, df_vol_returns = map(
    compute_volatility_temperature, (price_results, return_results)
)

In [7]:
# Average KL and Volatility by Temperature (for tables and plotting)
def save_avg_metric_by_temperature(df_metric, value_column, label, value_label, output_dir):
    """Average a metric per run configuration and write one LaTeX table per context length.

    The metric in ``value_column`` is averaged over each
    (context_length, dgp_type, model_name, temperature) group. For every entry
    of the module-level ``context_lengths`` the averages are pivoted so that
    temperatures become columns (named ``"Temp = <temperature>"``), written to
    ``output_dir / q5_avg_<value_column>_temperature_<label>_context<context>.tex``
    via ``dataframe_to_latex``, and the written file is then rewritten without
    any line containing ``"& Value"``.

    Parameters
    ----------
    df_metric : pandas.DataFrame
        Must contain the four grouping columns and ``value_column``.
    value_column : str
        Name of the metric column to average; also used in the file name.
    label : str
        Free-form tag inserted in the file name.
    value_label : str
        Forwarded to ``dataframe_to_latex`` as ``value_label``.
    output_dir : pathlib.Path
        Directory receiving the ``.tex`` files.

    Returns
    -------
    pandas.DataFrame
        The grouped averages in long format, with the mean stored in
        ``avg_metric``.
    """
    group_keys = ["context_length", "dgp_type", "model_name", "temperature"]
    grouped_mean = df_metric.groupby(group_keys)[value_column].mean()
    df_avg = grouped_mean.reset_index(name="avg_metric")

    for context in context_lengths:
        # Rows of the current context, ordered the way the table should read.
        rows = (
            df_avg.loc[df_avg["context_length"] == context]
            .copy()
            .sort_values(["dgp_type", "model_name", "temperature"])
        )

        # Wide layout: one row per (context, DGP, model), one column per temperature.
        wide = rows.pivot_table(
            index=["context_length", "dgp_type", "model_name"],
            columns="temperature",
            values="avg_metric"
        )
        wide = wide.sort_index(level=["dgp_type", "model_name"])
        wide.columns = [f"Temp = {temp}" for temp in wide.columns]
        wide.columns.name = None

        tex_path = output_dir / f"q5_avg_{value_column}_temperature_{label}_context{context}.tex"
        dataframe_to_latex(wide, tex_path, preserve_index_order=True, value_label=value_label)

        # Post-process the written file: drop every line containing "& Value".
        with open(tex_path, "r") as handle:
            kept_lines = [tex_line for tex_line in handle if "& Value" not in tex_line]
        with open(tex_path, "w") as handle:
            handle.writelines(kept_lines)

    return df_avg

In [8]:
# KL Divergence tables
# The averaged column is renamed from "avg_metric" to "avg_kl" for plotting compatibility.
df_avg_prices = save_avg_metric_by_temperature(
    df_kl_prices,
    value_column="kl_divergence",
    label="prices",
    value_label="KL Divergence",
    output_dir=tables_dir_kl,
).rename(columns={"avg_metric": "avg_kl"})
df_avg_returns = save_avg_metric_by_temperature(
    df_kl_returns,
    value_column="kl_divergence",
    label="returns",
    value_label="KL Divergence",
    output_dir=tables_dir_kl,
).rename(columns={"avg_metric": "avg_kl"})

# Volatility tables
df_avg_vol_prices = save_avg_metric_by_temperature(
    df_vol_prices,
    value_column="annualized_vol",
    label="prices",
    value_label="Annualized Volatility",
    output_dir=tables_dir_vol,
)
df_avg_vol_returns = save_avg_metric_by_temperature(
    df_vol_returns,
    value_column="annualized_vol",
    label="returns",
    value_label="Annualized Volatility",
    output_dir=tables_dir_vol,
)

In [9]:
# Plots: KL Divergence vs Temperature
# One figure per (model, target type), drawn from the averaged KL frames.
kl_frames_by_target = (("prices", df_avg_prices), ("returns", df_avg_returns))
for model in selected_model_names:
    for target_type, kl_frame in kl_frames_by_target:
        plot_temperature_vs_kl(kl_frame, plots_dir_kl, model, target_type)

# Plots: Volatility vs Temperature
# These use the per-run volatility frames (df_vol_*), not the averaged ones.
vol_frames_by_target = (("prices", df_vol_prices), ("returns", df_vol_returns))
for model in selected_model_names:
    for target_type, vol_frame in vol_frames_by_target:
        plot_temperature_vs_vol(vol_frame, plots_dir_vol, model, target_type)