In [1]:
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
import torch
import cvxpy as cp
from cvxpylayers.torch import CvxpyLayer
import yfinance as yf
import random
from dataclasses import dataclass
from typing import Dict, Tuple

### Simulation 1 : Estimation Error in $\hat{V}$

In [2]:
# Settings for Simulation 1 (estimation error in V-hat), kept in one place so
# that downstream cells read every tunable from this mapping.
SIM1_CFG = {
    "sigma": 0.0125,
    "rho_grid": [0.0, 0.25, 0.5, 0.75],
    "d_z": 10,
    "d_x": 10,
    "res_grid": [5, 10, 20],
    "snr_grid": [0.001, 0.002, 0.003, 0.004, 0.005, 0.01, 0.05, 0.10],
    "n_total": 2000,
    "n_train": 1000,
    "n_test": 1000,
    "n_reps": 100,
    "delta": 1.0,
    "cov_structure": "toeplitz_rho_absdiff",
    "x_dist": "normal",
    "theta0_dist": "normal",
    "snr_matching": "scale_tau_per_snr",
    "mvo_case": "equality",            # 1^T z = 1
    "vhat_mode": "single_per_split",   # how the estimation error is injected
    "vhat_estimator": "sample_cov",
}
# s_grid: every entry of res_grid multiplied by d_z.
SIM1_CFG["s_grid"] = [SIM1_CFG["d_z"] * mult for mult in SIM1_CFG["res_grid"]]


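#### Steps (1) ~ (5): data generation (ground-truth coefficients, features, noise, train/test split, $\hat{V}$ estimate)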

In [3]:
def make_V(d, sigma, rho):
    """Build the d x d matrix with entries sigma^2 * rho^|i - j|.

    Parameters
    ----------
    d : int
        Matrix dimension.
    sigma : float
        Scale; every diagonal entry equals sigma ** 2.
    rho : float
        Base that is raised to the absolute index distance |i - j|.

    Returns
    -------
    torch.Tensor
        The (d, d) matrix, created on torch's default device.
    """
    positions = torch.arange(d)
    # |i - j| for every index pair, obtained by broadcasting a column
    # against a row.
    lag = torch.abs(positions.unsqueeze(1) - positions.unsqueeze(0))
    variance = sigma ** 2
    return variance * (rho ** lag)



def compute_tau(signal, V, target_snr):
    """Return the scale tau defined by var(signal) / (tau^2 * v_bar) = target_snr.

    Here var(signal) is the population variance (unbiased=False) over all
    entries of ``signal`` and v_bar is the mean of the diagonal of ``V``.

    Parameters
    ----------
    signal : torch.Tensor
        Signal values; every entry is pooled into a single variance.
    V : torch.Tensor
        Square matrix; only its trace and first dimension are used.
    target_snr : float
        Ratio that appears in the defining equation above.

    Returns
    -------
    torch.Tensor
        Scalar tensor tau.
    """
    n_assets = V.shape[0]
    mean_noise_var = torch.trace(V) / n_assets
    signal_var = signal.var(unbiased = False)
    ratio = signal_var / (target_snr * mean_noise_var)
    return torch.sqrt(ratio)


def generate_sim1(cfg, rho, snr, s_cov, seed = 0, device = 'cpu'):
    """Generate one synthetic dataset for Simulation 1.

    Draws ``theta0`` and ``x`` from standard normals, forms the element-wise
    signal ``x * theta0``, adds correlated noise scaled by ``tau`` and splits
    the rows into a training and a test part. A sample covariance ``Vhat`` is
    then computed from ``s_cov`` randomly selected training rows of ``y``.

    Parameters
    ----------
    cfg : mapping
        Must provide the keys "d_z", "n_total", "n_train" and "sigma".
    rho : float
        Correlation parameter forwarded to ``make_V``.
    snr : float
        Target signal-to-noise ratio forwarded to ``compute_tau``.
    s_cov : int
        Number of training rows used for ``Vhat``; must satisfy
        ``2 <= s_cov <= n_train``.
    seed : int
        Passed to ``torch.manual_seed``; note that this resets torch's
        global random state.
    device : str or torch.device
        Device on which the tensors are created.

    Returns
    -------
    tuple
        ``(V, Vhat, theta0, x_train, y_train, x_test, y_test, tau)``.

    Raises
    ------
    ValueError
        If ``n_train`` exceeds ``n_total`` or ``s_cov`` lies outside
        ``[2, n_train]``.

    Notes
    -----
    NOTE(review): the returned ``V`` is the unscaled matrix from ``make_V``,
    while the noise added to ``y`` is ``tau * eps`` with ``eps ~ N(0, V)``,
    i.e. it has covariance ``tau**2 * V``. ``Vhat`` is computed from ``y``
    itself (signal included). Confirm that downstream consumers expect this
    scaling.
    """
    d = cfg["d_z"]
    n_total = cfg["n_total"]
    n_train = cfg["n_train"]

    # Validate up front: torch.randperm(n_train)[:s_cov] silently truncates
    # when s_cov > n_train, which would leave the (s_cov - 1) normalisation
    # below inconsistent with the number of rows actually used; s_cov < 2
    # makes the unbiased estimator undefined.
    if n_train > n_total:
        raise ValueError(
            f"n_train ({n_train}) must not exceed n_total ({n_total})"
        )
    if not 2 <= s_cov <= n_train:
        raise ValueError(
            f"s_cov must satisfy 2 <= s_cov <= n_train ({n_train}), got {s_cov}"
        )

    torch.manual_seed(seed)

    # (1) Ground Truth Coefficients
    theta0 = torch.randn(d, device = device)

    # (2) x ~ N(0, I)
    x = torch.randn(n_total, d, device = device)
    V = make_V(d, cfg["sigma"], rho).to(device)

    # (3) Compute tau and sample eps ~ N(0, V)
    signal = x * theta0
    tau = compute_tau(signal, V, snr)

    # Rows of z are i.i.d. N(0, I); right-multiplying by L^T gives rows with
    # covariance L L^T = V.
    L = torch.linalg.cholesky(V)
    z = torch.randn(n_total, d, device = device)
    eps = z @ L.T

    y = signal + tau * eps

    # (4) Train-Test Split
    x_train, y_train = x[:n_train], y[:n_train]
    x_test, y_test = x[n_train:], y[n_train:]

    # (5) Generate Estimates of V: unbiased sample covariance of s_cov
    # training rows chosen without replacement.
    idx = torch.randperm(n_train, device = device)[:s_cov]
    Y_s = y_train[idx]
    Yc = Y_s - Y_s.mean(dim = 0, keepdim = True)
    Vhat = (Yc.T @ Yc) / (s_cov - 1)

    return V, Vhat, theta0, x_train, y_train, x_test, y_test, tau

##### 실행 예시

In [4]:
# Example run: draw one Simulation 1 dataset and print basic diagnostics.
cfg = SIM1_CFG

V, Vhat, theta0, x_tr, y_tr, x_te, y_te, tau = generate_sim1(
    cfg = cfg,
    rho = 0.5,
    snr = 0.005,
    s_cov = 100,
    seed = 42
)

# Shapes of the covariance matrices and of the data splits, plus the noise
# scale tau returned by the generator.
print("V_shape: ", V.shape)
print("Vhat shape: ", Vhat.shape)
print("tau: ", tau.item())
print("train: ", x_tr.shape, y_tr.shape, "test: ", x_te.shape, y_te.shape)

# Rebuild the training signal from theta0; what remains of y_tr is the noise.
signal_tr = x_tr * theta0
noise_tr  = y_tr - signal_tr

# Ratio of the population variances (unbiased=False) of signal and noise on
# the training split; the recorded run prints ~0.00495 for a requested snr of
# 0.005.
emp_snr = signal_tr.var(unbiased=False) / noise_tr.var(unbiased=False)
print("Empirical SNR:", emp_snr.item())

V_shape:  torch.Size([10, 10])
Vhat shape:  torch.Size([10, 10])
tau:  956.8330078125
train:  torch.Size([1000, 10]) torch.Size([1000, 10]) test:  torch.Size([1000, 10]) torch.Size([1000, 10])
Empirical SNR: 0.004950998816639185


#### (6) OLS 계수 추정

In [5]:
def fit_ols_univariate(x_train, y_train, eps = 1e-12):
    """Fit an independent no-intercept least-squares slope for every column.

    For column j the estimate is
    ``sum_i x[i, j] * y[i, j] / (sum_i x[i, j]**2 + eps)``.

    Parameters
    ----------
    x_train, y_train : torch.Tensor
        Tensors that are multiplied element-wise and reduced over dim 0.
    eps : float
        Added to the denominator so an all-zero column does not divide by zero.

    Returns
    -------
    torch.Tensor
        One slope per column.
    """
    cross_moment = torch.sum(x_train * y_train, dim = 0)
    second_moment = torch.sum(x_train * x_train, dim = 0)
    return cross_moment / (second_moment + eps)


def predict_univariate(x, theta_hat):
    """Return the element-wise product of ``x`` and ``theta_hat``.

    With ``x`` of shape (n, d) and ``theta_hat`` of shape (d,), broadcasting
    scales column j of ``x`` by ``theta_hat[j]``.
    """
    prediction = x * theta_hat
    return prediction


# Fit the per-column OLS slopes on the training split produced by the example
# run above and show the shape of the coefficient vector.
theta_ols = fit_ols_univariate(x_tr, y_tr)
print(theta_ols.shape)

# In-sample mean squared error of the OLS predictions, averaged over every
# entry of y_tr.
yhat_tr = predict_univariate(x_tr, theta_ols)
mse_tr = ((yhat_tr - y_tr)**2).mean().item()
print("train MSE:", mse_tr)


torch.Size([10])
train MSE: 145.52479553222656


In [8]:
def make_F_and_z0(d, device= 'cpu', dtype = None):
    """Return a null-space basis and a particular solution for 1^T z = 1.

    Every vector ``z = z0 + F @ w`` satisfies ``sum(z) == 1`` because the
    columns of ``F`` are orthogonal to the all-ones vector and ``z0`` sums
    to one.

    The basis is built deterministically from a complete QR factorisation of
    the ones vector, so the function neither depends on nor advances torch's
    global random state.

    Parameters
    ----------
    d : int
        Length of the weight vector z.
    device : str or torch.device
        Device on which the outputs are created.
    dtype : torch.dtype, optional
        dtype of the outputs; ``None`` uses torch's default dtype.

    Returns
    -------
    F : torch.Tensor
        Shape (d, d - 1); orthonormal columns, each orthogonal to the ones
        vector.
    z0 : torch.Tensor
        Shape (d,); the equal-weight vector with entries 1 / d.
    """
    one = torch.ones(d, device = device, dtype = dtype)
    z0 = one / d

    # Complete QR of the (d, 1) ones column: the first column of Q spans the
    # ones direction, the remaining d - 1 columns are an orthonormal basis of
    # its orthogonal complement.
    Q, _ = torch.linalg.qr(one[:, None], mode = "complete")
    F = Q[:, 1:].contiguous()

    return F, z0


def fit_ipo_eq(x_train, y_train, V, Vhat, delta = 1.0, ridge = 1e-10):
    """Closed-form IPO coefficients for the budget-constrained (1^T z = 1) case.

    Predictions are univariate per column, ``y_hat = x * theta``. With
    ``G = F (F^T Vhat F)^{-1} F^T`` and ``a = (I - G Vhat) z0`` the decision
    map used here is ``z(y_hat) = a + G y_hat / delta``, and ``theta`` solves
    the linear system ``Heq theta = deq`` with

        Heq = mean_i diag(x_i) G V G diag(x_i) / delta  (+ ridge * I)
        deq = mean_i diag(x_i) G (y_i - delta * V a) / delta

    NOTE(review): these are the first-order conditions of the average cost
    ``-z^T y + (delta / 2) z^T V z``. That objective convention is inferred
    from the 1/delta scaling already present in this function; confirm it
    against the reference formulation. Under it the offset term must be
    ``delta * V a``; the previous code subtracted ``V a`` unscaled, which is
    identical for ``delta == 1`` but not a stationary point otherwise.

    Parameters
    ----------
    x_train, y_train : torch.Tensor
        Shape (m, d) each, m >= 1.
    V : torch.Tensor
        (d, d) matrix used in the realised cost.
    Vhat : torch.Tensor
        (d, d) matrix used inside the decision map.
    delta : float
        Scaling parameter of the quadratic term (see note above).
    ridge : float
        Multiple of the identity added to Heq before solving.

    Returns
    -------
    torch.Tensor
        Shape (d,), on the device and in the dtype of ``x_train``.

    Raises
    ------
    ValueError
        If ``x_train`` is not 2-D, shapes of ``x_train`` and ``y_train``
        differ, or there are no training rows.
    """
    if x_train.dim() != 2 or x_train.shape != y_train.shape:
        raise ValueError(
            "x_train and y_train must be 2-D tensors of identical shape, got "
            f"{tuple(x_train.shape)} and {tuple(y_train.shape)}"
        )
    m, d = x_train.shape
    if m == 0:
        raise ValueError("fit_ipo_eq requires at least one training sample")

    device, dtype = x_train.device, x_train.dtype
    # Work in the dtype/device of the training data so float64 inputs do not
    # clash with float32 helper tensors.
    y_train = y_train.to(device = device, dtype = dtype)
    V = V.to(device = device, dtype = dtype)
    Vhat = Vhat.to(device = device, dtype = dtype)

    F, z0 = make_F_and_z0(d, device = device, dtype = dtype)

    # G = F (F^T Vhat F)^{-1} F^T, computed with a solve instead of an
    # explicit inverse.
    G = F @ torch.linalg.solve(F.T @ Vhat @ F, F.T)

    eye_d = torch.eye(d, device = device, dtype = dtype)

    A = G @ V @ G
    b = V @ (eye_d - G @ Vhat) @ z0

    # sum_i diag(x_i) A diag(x_i) equals the Hadamard product A * (X^T X).
    Heq = A * (x_train.T @ x_train)
    # Row i of (Y - delta * b) @ G^T is G @ (y_i - delta * b).
    deq = (x_train * ((y_train - delta * b) @ G.T)).sum(dim = 0)

    Heq = Heq / (m * delta) + ridge * eye_d
    deq = deq / (m * delta)

    theta_ipo = torch.linalg.solve(Heq, deq)
    return theta_ipo