In [None]:
import numpy as np

In [None]:
def normal_density(x, mu, std):
    return np.log((np.pi * std) * np.exp(-0.5*((x- mu) / std)**2))

In [None]:
class SuSiE():
    def __init__(self, X, y, L, prior_weights, p_tol=1e-9, max_iter=100):
        n, p = X.shape

        self.n, self.p = n, p

        self.X = X
        self.get_colstats()

        self.meanY = np.mean(y)
        self.varY = np.var(y)
        self.y = y - self.meanY

        self.L = min(L, p)
        self.p_tol = p_tol
        self.alpha = np.full((L, p), 1 / p)
        self.mu = np.full((L, p), 0.)
        self.mu2 = np.full((L, p), 0.)
        self.prior_weights = prior_weights

        if self.prior_weights is None:
            self.prior_weights = np.full(p, 1/p)
        self.pi = prior_weights
        self.max_iter = max_iter
        self.elbo = np.full(max_iter + 1, np.nan)
        self.kl = np.full(L, np.nan)
        self.sigma2 = 1
        self.Xr = np.full(n, 0.)
        self.lbf = np.full(L, 0.)
        self.lbf_variable = np.full((L, p), 0.)

    def iterate(self):
        for i in range(1, self.max_iter + 1):
            self.update_each_effect()

            self.elbo[i] = self.get_objective()
            if (self.elbo[i] - self.elbo[i - 1]) < self.tol:
                break

        self.intercept = np.mean(self.y) - np.sum(self.cm)
        self.fitted = self.Xr + np.mean(self.y)

        sorted_alphas = sorted(self.alphas, key = lambda x: -x)

        cur_p = 0
        res = []
        for alpha in sorted_alphas:
            cur_p += alpha
            res.append(alpha)
            if cur_p >= self.p_tol:
                return res
        return res

    def get_colstats(self):
        self.cm = np.mean(self.X, axis=0)
        self.csd = np.std(self.X, axis=0)
        self.csd[self.csd == 0] = 1
        d = self.n * (self.cm) ** 2 + (n - 1) * self.csd ** 2
        self.d = (d - self.n * self.cm ** 2) / self.csd ** 2

    def single_effect_regression(self):
        Xty = self.X.T @ self.y
        betahat = (1 / self.d) * Xty
        shat2 = self.varY / self.d

        lbf = normal_density(betahat, 0, np.sqrt(self.varY + shat2)) - normal_density(betahat, 0, np.sqrt(shat2))
        max_lbf = max(lbf)

        w = np.exp(lbf - max_lbf)
        w_weighted = w * self.prior_weights
        weighted_sum_w = np.sum(w_weighted)
        alpha = w_weighted / weighted_sum_w
        post_var = (1 / self.varY + self.d / self.varY) ** (-1)
        post_mean = (1 / self.varY) * post_var * Xty
        post_mean2 = post_var + post_mean ** 2

        lbf_model = max_lbf + np.log(weighted_sum_w)
        loglik = lbf_model + np.sum(normal_density(self.y, 0, np.sqrt(self.varY)))

        return alpha, post_mean, post_mean2, lbf, lbf_model, loglik

    def update_each_effect(self):
        for l in range(self.L):
            self.Xr -= self.X @ (self.alpha[l, :] * self.mu[l, :])
            R = self.y - self.Xr

            alpha, post_mean, post_mean2, lbf, lbf_model, loglik = self.single_effect_regression()

            self.mu[l, :] = post_mean
            self.mu2[l, :] = post_mean2
            self.alpha[l, :] = alpha
            self.lbf[l] = lbf_model
            self.lbf_variable[l, :] = lbf
            self.kl[l] = -loglik + self.SER_posterior_e_loglik(alpha * post_mean, alpha * post_mean2)

            self.Xr += self.X @ (self.alpha[l, :] * self.mu[l, :])


    def get_objective(self):
        return self.Eloglik() - sum(self.kl)

    def Eloglik(self):
        return (-(self.n/2) * np.log(2*np.pi*self.sigma2) - (1/(2*self.sigma2)) * self.get_ER2())


    def get_ER2(self):
        Xr_L = (self.alpha * self.mu) @ self.X.T
        postb2 = self.alpha * self.mu2
        a = sum((self.y - self.Xr) ** 2) - sum(Xr_L ** 2)
        b = sum(self.d * postb2.T)
        print(a.shape, b.shape)
        return sum((self.y - self.Xr) ** 2) - sum(Xr_L ** 2) + sum(self.d * postb2.T)

    def SER_posterior_e_loglik(self, Eb, Eb2):
        return(-0.5 * self.n * np.log(2* np.pi * self.varY) - 0.5/self.varY*(sum(self.y * self.y) - 2*sum(self.y * (self.X @ Eb)) + sum(self.d * Eb2)))



In [None]:
n = 50
p = 100

X = np.random.rand(n, p)
coeff = np.array([1.0] + (p - 1) * [0.0])
noise = 0.3
Y_star = X @ coeff + noise * np.random.rand(n)
Y = X @ coeff

s = SuSiE(X, Y, 1, None)
s.iterate()

(50,) (100,)


ValueError: ignored