In [1]:
import numpy as np
from tqdm import tqdm, trange
from sklearn.linear_model import Ridge, LinearRegression, RidgeCV
from sklearn.preprocessing import scale
from scipy.linalg import pinv, svd
from scipy.stats import ttest_1samp
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA

%matplotlib inline

In [2]:
def synthethic_data_inverse():
    n = 10000
    Cx = np.array([[1, -.8], [-.8, 1.]])
    X = np.random.multivariate_normal(np.zeros(2), Cx, n)
    E = np.eye(2)
    N = np.random.randn(n, 2) * 0
    F = np.array([[1.], [3]])
    Y = (X @ E + N) @ F
    return scale(X), scale(Y), E


def syntethic_data_high_dim(seed=0):
    np.random.seed(seed)

    n = 1000  # number of samples
    dim_x = 100  # dimensionality of X
    dim_y = 101
    selected = .10  # percentage of features selected in E

    Cx = np.random.randn(dim_x, dim_x)
    Cx = Cx.dot(Cx.T) / dim_x  # sym pos-semidefin

    X = np.random.multivariate_normal(np.zeros(dim_x), Cx, n)
    N = np.random.randn(n, dim_x)
    F = np.random.randn(dim_y, dim_x)
    E = np.eye(dim_x)
    E = np.diag(np.random.rand(dim_x) < selected)
    Y = (X @ E + N) @ F.T

    return scale(X), scale(Y), E


def ridge_cv(X, Y, alphas, independent_alphas=False):
    """
    Similar to sklearn RidgeCV but
    (1) can optimize a different alpha for each column of Y (independent_alphas=True)
    (2) return leave-one-out Y_hat
    """
    if isinstance(alphas, (float, int)):
        alphas = np.array([alphas, ], np.float64)
    n, n_x = X.shape
    n, n_y = Y.shape
    # Decompose X
    U, s, _ = svd(X, full_matrices=0)
    v = s**2
    UY = U.T @ Y

    # For each alpha, solve leave-one-out error coefs
    cv_duals = np.zeros((len(alphas), n, n_y))
    cv_errors = np.zeros((len(alphas), n, n_y))
    for alpha_idx, alpha in enumerate(alphas):
        # Solve
        w = ((v + alpha) ** -1) - alpha ** -1
        c = U @ np.diag(w) @ UY + alpha ** -1 * Y
        cv_duals[alpha_idx] = c

        # compute diagonal of the matrix: dot(Q, dot(diag(v_prime), Q^T))
        G_diag = (w * U ** 2).sum(axis=-1) + alpha ** -1
        error = c / G_diag[:, np.newaxis]
        cv_errors[alpha_idx] = error

    # identify best alpha for each column of Y independently
    if independent_alphas:
        best_alphas = (cv_errors ** 2).mean(axis=1).argmin(axis=0)
        duals = np.transpose([cv_duals[b, :, i]
                              for i, b in enumerate(best_alphas)])
        cv_errors = np.transpose([cv_errors[b, :, i]
                                  for i, b in enumerate(best_alphas)])
    else:
        _cv_errors = cv_errors.reshape(len(alphas), -1)
        best_alphas = (_cv_errors ** 2).mean(axis=1).argmin(axis=0)
        duals = cv_duals[best_alphas]
        cv_errors = cv_errors[best_alphas]

    coefs = duals.T @ X
    Y_hat = Y - cv_errors
    return coefs, best_alphas, Y_hat


def trunkated_ols(X, Y):
    pca = PCA('mle').fit(X)
    Xt = pca.inverse_transform(pca.transform(X))
    coef = pinv(Xt.T @ Xt) @ Xt.T @ Y
    return coef


def jrr(X, Y, alphas=np.logspace(-5, 5, 20)):
    """
    G = ls(Y, X)
    H = ls(X, YG)
    E = diag(H)
    """
    G, best_alphas, X_hat = ridge_cv(Y, X, alphas, independent_alphas=True)
    H, best_alphas, X_hat = ridge_cv(X, X_hat, alphas)
    E_hat = np.diag(H)
    return E_hat


def jrr_pca(X, Y, alphas=np.logspace(-5, 5, 20)):
    G, best_alphas, X_hat = ridge_cv(Y, X, alphas, independent_alphas=True)
    H = trunkated_ols(X, X_hat)
    E_hat = np.diag(H)
    return E_hat


def jrr2(X, Y, alphas=np.logspace(-5, 5, 20)):
    """
    for i in dim_x:
        g_i = ls([X_j, Y], X_i), where j is all columns but i
        x_hat_i = [X_j, Y] @ g_i
    H = ls(X, X_hat)
    E = diag(H)
    """

    dim_x = len(X.T)

    X_hat = np.zeros_like(X)
    for i in trange(dim_x):
        not_i = [j for j in range(dim_x) if j != i]
        XY = np.c_[X[:, not_i], Y]
        _, best, X_hat[:, [i, ]] = ridge_cv(XY, X[:, [i, ]], alphas)
    H, _, _ = ridge_cv(X, X_hat, alphas)
    E_hat = np.diag(H)
    return E_hat


def jrr2_pca(X, Y, alphas=np.logspace(-5, 5, 20)):
    dim_x = len(X.T)

    X_hat = np.zeros_like(X)
    for i in trange(dim_x):
        not_i = [j for j in range(dim_x) if j != i]
        XY = np.c_[X[:, not_i], Y]
        _, best, X_hat[:, [i, ]] = ridge_cv(XY, X[:, [i, ]], alphas)
    H = trunkated_ols(X, X_alphas)
    E_hat = np.diag(H)
    return E_hat


def jrr3(X, Y, alphas=np.logspace(-5, 5, 20)):
    dim_x = len(X.T)
    X_hat = np.zeros_like(X)
    for i in trange(dim_x):
        not_i = [j for j in range(dim_x) if j != i]
        XY = np.c_[X[:, not_i], Y]
        X_hat[:, [i, ]] = ridge_cv(XY, X[:, [i, ]], alphas)[2]
        X_hat[:, [i, ]] += ridge_cv(Y, X[:, [i, ]], alphas)[2]
        X_hat[:, [i, ]] -= ridge_cv(X[:, not_i], X[:, [i, ]], alphas)[2]
    H, _, _ = ridge_cv(X, X_hat, alphas)
    E_hat = np.diag(H)
    return E_hat


def jrr3_pca(X, Y, alphas=np.logspace(-5, 5, 20)):
    X_hat = np.zeros_like(X)
    for i in trange(dim_x):
        not_i = [j for j in range(dim_x) if j != i]
        XY = np.c_[X[:, not_i], Y]
        X_hat[:, [i, ]] = ridge_cv(XY, X[:, [i, ]], alphas)[2]
        X_hat[:, [i, ]] += ridge_cv(Y, X[:, [i, ]], alphas)[2]
        X_hat[:, [i, ]] -= ridge_cv(X[:, not_i], X[:, [i, ]], alphas)[2]
    H = trunkated_ols(X, X_hat)
    E_hat = np.diag(H)
    return E_hat

In [3]:
X, Y, E = synthethic_data_inverse()

print('jrr <0', jrr(X, Y, alphas=1e-4))  # negative coeff issue
print('jrr2 not<0', jrr2(X, Y, alphas=1e-4))  # great
print('jrr3 not<0', jrr3(X, Y, alphas=1e-4))  # great, although weird

100%|██████████| 2/2 [00:00<00:00, 181.56it/s]
100%|██████████| 2/2 [00:00<00:00, 190.25it/s]

jrr <0 [-0.27287689  1.27277789]
jrr2 not<0 [0.99999963 0.99999995]
jrr3 not<0 [0.72722458 2.27287653]





In [None]:
X, Y, E = syntethic_data_high_dim()

alphas = np.logspace(-4, 4, 20)
plt.plot(np.diag(E), label='E')
plt.plot(jrr(X, Y, alphas=alphas), label='jrr')  # ok
plt.plot(jrr2(X, Y, alphas=alphas), label='jrr2')  # completely fails
plt.plot(jrr3(X, Y, alphas=alphas), label='jrr3')  # ok
plt.axhline(0, color='k', label='chance')
plt.legend()
plt.ylim(-.05, .07)

# notice that jrr and jrr3 work but have a slight positive bias due to second regularization

100%|██████████| 100/100 [00:09<00:00, 10.41it/s]
 37%|███▋      | 37/100 [00:06<00:10,  6.01it/s]

In [None]:
# The svd trunkation for the last regression seems to be a good way not to have a bias.

# it s unsurprisingly a bit less good than with the bias
alphas = np.logspace(-4, 4, 20)
plt.plot(np.diag(E), label='E')
plt.plot(jrr_pca(X, Y, alphas=alphas), label='jrr')
plt.plot(jrr3_pca(X, Y, alphas=alphas), label='jrr3')
plt.axhline(0, color='k', label='chance')
plt.legend()