In [1]:
import os, sys

sys.path.append(os.path.abspath(os.path.join(os.getcwd(), os.pardir)))
import numpy as np
import pandas as pd
from ppi_py.datasets import load_dataset
from ppi_py import ppi_ols_ci, classical_ols_ci,ppi_ols_pointestimate
from statsmodels.regression.linear_model import OLS
from scipy.optimize import brentq  
from sklearn.linear_model import Ridge
from tqdm import tqdm
from utils import *

In [139]:
def generate_data(
    n_total: int,
    p: int,
    N_model: int,
    std: float = 1.0,
    theta: np.ndarray = None,
    random_state: int = None
):
    rng = np.random.RandomState(random_state)

    # 1) Generate theta if not provided
    if theta is None:
        theta = rng.uniform(low=-1.0, high=1.0, size=p) # * 100
        # theta = theta / np.linalg.norm(theta)
    
    # 2) Generate total data
    X_total = rng.normal(size=(n_total, p))
    noise_eval = std * rng.normal(size=n_total)

    Y_total = X_total @ theta + noise_eval

    # 3) Generate predictions
    X_model = rng.normal(size=(N_model, p))
    noise_model = std * rng.normal(size=N_model)
    Y_model = X_model @ theta + noise_model

    model = OLS(Y_model, X_model).fit()
    Y_hat = model.predict(X_total)

    return X_total, Y_total, Y_hat, theta #, X_total @ theta

In [140]:
# Set global parameters
ratios = [0.1, 0.2, 0.5, 0.8, 1.0, 2.0, 5.0]
std_list =  [0.1, 1.0, 5.0, 10.0]
ns = [10, 20, 50, 100] 
ps = [20, 50, 100, 200]
rng = np.random.RandomState()

num_trials = 10
alpha = 0.05

In [72]:
def _ols(X, Y, return_se=False):
    """OLS using pseudoinverse with HC0 robust standard errors."""
    n, p = X.shape
    theta = np.linalg.pinv(X) @ Y
    return theta

In [115]:
def ols_svd(X, Y, rcond=1e-10):
    """OLS estimate using SVD-based pseudoinverse (safe for any n, p)."""
    U, s, Vt = np.linalg.svd(X, full_matrices=False)
    print(f"min singular value: {s[-1]:.2e}, condition number: {s[0]/s[-1]:.2e}")

    s_inv = np.where(s > rcond, 1.0 / s, 0.0)  # Drop small singular values
    X_pinv = Vt.T @ np.diag(s_inv) @ U.T
    theta = X_pinv @ Y
    return theta

In [86]:
from sklearn.linear_model import LinearRegression

def ols_sklearn(X, Y):
    model = LinearRegression(fit_intercept=False)
    model.fit(X, Y)
    return model.coef_

In [128]:
def ols_truncated_svd(X, Y, tol=1e-6):
    """OLS with truncated SVD — discard unstable directions."""
    U, s, Vt = np.linalg.svd(X, full_matrices=False)
    s_inv = np.array([1/si if si > tol else 0.0 for si in s])  # zero-out unstable modes
    theta = Vt.T @ (s_inv * (U.T @ Y))
    return theta

In [160]:
n_total = 5000
n = p = 10

X_total, Y_total, Y_hat, theta = generate_data(
    n_total=n_total,
    p=p,
    N_model=1000,
    std=1.0,
    random_state=9
)
theta

array([-0.97925169,  0.00374918, -0.00845341, -0.73234094, -0.71577783,
       -0.56288265, -0.16298364, -0.50379766, -0.8318807 , -0.30900272])

In [175]:
n = 10

for i in range(1):
    idx = np.random.permutation(n_total)
    X_labeled, X_unlabeled = X_total[idx[:n]], X_total[idx[n:]]
    Y_labeled, Y_unlabeled = Y_total[idx[:n]], Y_total[idx[n:]]
    print(ols_truncated_svd(X_labeled, Y_labeled))

[ -5.48747686 -22.35407723  -3.56084267  -6.93856331   9.60865023
  -8.92238234 -13.05724445  -5.17172935  10.21825904   0.59419668]


In [None]:
n = 20

for i in range(10):
    idx = np.random.permutation(n_total)
    X_labeled, X_unlabeled = X_total[idx[:n]], X_total[idx[n:]]
    Y_labeled, Y_unlabeled = Y_total[idx[:n]], Y_total[idx[n:]]
    print(np.linalg.norm(ols_truncated_svd(X_labeled, Y_labeled)))

2.0237022948032246
1.8236296662717526
2.2993872137902343
1.6413790108947195
2.2454583616470303
2.0661108940778434
1.852647817101187
1.9078757283244567
1.8502222487957936
2.026491048325313
