In [1]:
!pip install numdifftools



In [2]:
import numpy as np 
from scipy.stats import gamma, multivariate_normal
from scipy import optimize
import numdifftools as nd 
from scipy.special import expit as logistic

class SupervisedPoissonFactorization:
    def __init__(self, n, p, d, k, a, a_prime, b_prime, c, c_prime, d_prime, sigma_upsilon, sigma_gamma):
        
        for param, name in zip([n, p, d, k, a, a_prime, b_prime, c, c_prime, d_prime, sigma_upsilon, sigma_gamma],
                               ['n', 'p', 'd', 'k', 'a', 'a_prime', 'b_prime', 'c', 'c_prime', 'd_prime', 'sigma_upsilon', 'sigma_gamma']):
            if param <= 0:
                raise ValueError(f"Parameter {name} must be positive.")

        self.n = n #Number of samples
        self.p = p #Number of genes
        self.d = d #Number of factors
        self.k = k #Number of phenotypes
        #Hyperparameters:
        self.a = a 
        self.a_prime = a_prime 
        self.b_prime = b_prime 
        self.c = c
        self.c_prime = c_prime 
        self.d_prime = d_prime
        self.sigma_upsilon = sigma_upsilon
        self.sigma_gamma = sigma_gamma
        #Parameters
        self.xi = np.random.gamma(a_prime, b_prime/a_prime, size=n)
        self.eta = np.random.gamma(c_prime, d_prime/c_prime, size=p)
        self.theta = np.random.gamma(a, 1/self.xi[:, np.newaxis], size=(n, d))
        self.beta = np.random.gamma(c, 1/self.eta[:, np.newaxis], size=(p, d))
        self.upsilon = np.random.normal(0, sigma_upsilon, size=(k, d))
        self.gamma = np.random.normal(0, sigma_gamma, size=(k, 10)) #the 2 is for the number of aux features, should be updated acc to data

    def log_posterior_theta(self, theta_i, x_i, y_i, beta, xi_i, upsilon, gamma, x_aux_i):
        logistic = lambda x: 1 / (1 + np.exp(-x))
        log_likelihood = np.sum(x_i * np.log(theta_i @ beta.T) - (theta_i @ beta.T))
        log_prior = np.sum((self.a - 1) * np.log(theta_i) - xi_i * theta_i)
        log_y = np.sum(y_i * np.log(logistic(theta_i @ upsilon.T + x_aux_i @ gamma.T)) + 
                       (1 - y_i) * np.log(1 - logistic(theta_i @ upsilon.T + x_aux_i @ gamma.T)))
        return log_likelihood + log_prior + log_y
    
    def log_posterior_upsilon(self, upsilon_k, y_k, theta, gamma_k, x_aux):
        logistic = lambda x: 1 / (1 + np.exp(-x))
        log_likelihood = np.sum(y_k * np.log(logistic(theta @ upsilon_k + x_aux @ gamma_k)) + 
                                (1 - y_k) * np.log(1 - logistic(theta @ upsilon_k + x_aux @ gamma_k)))
        log_prior = -0.5 * np.sum(upsilon_k**2) / self.sigma_upsilon**2
        return log_likelihood + log_prior

    def log_posterior_gamma(self, gamma_k, y_k, theta, upsilon_k, x_aux):
        logistic = lambda x: 1 / (1 + np.exp(-x))
        log_likelihood = np.sum(y_k * np.log(logistic(theta @ upsilon_k + x_aux @ gamma_k)) + 
                                (1 - y_k) * np.log(1 - logistic(theta @ upsilon_k + x_aux @ gamma_k)))
        log_prior = -0.5 * np.sum(gamma_k**2) / self.sigma_gamma**2
        return log_likelihood + log_prior

    def sample_xi(self):
        shape = self.a * self.d + self.a_prime 
        rate = (self.a_prime / self.b_prime) + np.sum(self.theta, axis=1)
        self.xi = np.random.gamma(shape, 1/rate)

    def sample_eta(self):
        shape = self.c * self.d + self.c_prime 
        rate = (self.c_prime / self.d_prime) + np.sum(self.beta, axis=1)
        self.eta = np.random.gamma(shape, 1/rate)

    def sample_beta(self, X):
        shape = self.c + np.sum(X, axis=0)[:, np.newaxis]
        rate = self.eta[:, np.newaxis] + np.sum(self.theta, axis=0)
        self.beta = np.random.gamma(shape, 1/rate)

    def sample_theta_laplace(self, X, y, x_aux, beta, xi, upsilon, gamma):
        theta = np.zeros((self.n, self.d))
        for i in range(self.n):
            neg_log_posterior = lambda t: -self.log_posterior_theta(t, X[i], y[i], beta, xi[i], upsilon, gamma, x_aux[i])
            initial_guess = self.theta[i]
            result = optimize.minimize(neg_log_posterior, initial_guess, method='BFGS')
            if not result.success:
                raise RuntimeError(f"Optimization for theta_{i} failed: {result.message}")
            mode = result.x
            hessian = nd.Hessian(neg_log_posterior)(mode)
            cov = np.linalg.inv(hessian)
            theta[i] = multivariate_normal.rvs(mean=mode, cov=cov)
        self.theta = theta

    def sample_upsilon_laplace(self, y, theta, gamma, x_aux):
        upsilon = np.zeros((self.k, self.d))
        for k in range(self.k):
            neg_log_posterior = lambda u: -self.log_posterior_upsilon(u, y[:, k], theta, gamma[k], x_aux)
            initial_guess = self.upsilon[k]
            result = optimize.minimize(neg_log_posterior, initial_guess, method='BFGS')
            if not result.success:
                raise RuntimeError(f"Optimization for upsilon_{k} failed: {result.message}")
            mode = result.x
            hessian = nd.Hessian(neg_log_posterior)(mode)
            cov = np.linalg.inv(hessian)
            upsilon[k] = multivariate_normal.rvs(mean=mode, cov=cov)
        self.upsilon = upsilon

    def sample_gamma_laplace(self, y, theta, upsilon, x_aux):
        gamma = np.zeros((self.k, x_aux.shape[1]))
        for k in range(self.k):
            neg_log_posterior = lambda g: -self.log_posterior_gamma(g, y[:, k], theta, upsilon[k], x_aux)
            initial_guess = self.gamma[k]
            result = optimize.minimize(neg_log_posterior, initial_guess, method='BFGS')
            if not result.success:
                raise RuntimeError(f"Optimization for gamma_{k} failed: {result.message}")
            mode = result.x
            hessian = nd.Hessian(neg_log_posterior)(mode)
            cov = np.linalg.inv(hessian)
            gamma[k] = multivariate_normal.rvs(mean=mode, cov=cov)
        self.gamma = gamma

    def gibbs_step(self, X, y, x_aux):
        self.sample_xi()
        self.sample_eta()
        self.sample_beta(X)
        self.sample_theta_laplace(X, y, x_aux, self.beta, self.xi, self.upsilon, self.gamma)
        self.sample_upsilon_laplace(y, self.theta, self.gamma, x_aux)
        self.sample_gamma_laplace(y, self.theta, self.upsilon, x_aux)
    
    def run_gibbs(self, X, y, x_aux, num_iterations, burn_in):
        self._check_input_dimensions(X, y, x_aux)
        
        if burn_in >= num_iterations:
            raise ValueError("burn_in must be less than num_iterations")

        samples = []
        for i in range(num_iterations):
            self.gibbs_step(X, y, x_aux)
            if i >= burn_in:
                samples.append((self.xi.copy(), self.eta.copy(), self.beta.copy(), 
                                self.theta.copy(), self.upsilon.copy(), self.gamma.copy()))
        
        # Compute means of samples
        mean_xi = np.mean([s[0] for s in samples], axis=0)
        mean_eta = np.mean([s[1] for s in samples], axis=0)
        mean_beta = np.mean([s[2] for s in samples], axis=0)
        mean_theta = np.mean([s[3] for s in samples], axis=0)
        mean_upsilon = np.mean([s[4] for s in samples], axis=0)
        mean_gamma = np.mean([s[5] for s in samples], axis=0)
        
        return mean_xi, mean_eta, mean_beta, mean_theta, mean_upsilon, mean_gamma