In [None]:
from typing import Tuple
import numpy as np
from scipy.special import logsumexp
from common import GaussianMixture


In [None]:
def estep(X: np.ndarray, mixture: GaussianMixture) -> Tuple[np.ndarray, float]:
    """E-step: Softly assigns each datapoint to a gaussian component

    Entries of X equal to 0 are treated as missing: every row is scored only
    on its non-zero coordinates. The computation stays in the log domain and
    is normalised with logsumexp so small densities do not underflow.

    Args:
        X: (n, d) array holding the data, with incomplete entries (set to 0)
        mixture: the current gaussian mixture, unpacked as (mu, var, pi)

    Returns:
        np.ndarray: (n, K) array holding the soft counts
            for all components for all examples
        float: log-likelihood of the assignment
    """
    mu, var, pi = mixture

    # 1 where an entry is present (non-zero), 0 where it is missing.
    observed = X.astype(bool).astype(int)
    n_observed = np.sum(observed, axis=1).reshape(-1, 1)

    # -(|C_u| / 2) * log(2 * pi * var_j) for every (row, component) pair.
    log_norm = np.dot(-n_observed / 2.0, np.log(2 * np.pi * var).reshape(1, -1))

    # ||x_u - mu_j||^2 restricted to the observed coordinates, expanded as
    # x.x + mu.mu - 2 x.mu; the mask keeps mu's missing coordinates out of
    # the mu.mu term, and the zeros in X already drop them from the others.
    sq_dist = (np.sum(X ** 2, axis=1)[:, None]
               + np.dot(observed, mu.T ** 2)
               - 2 * np.dot(X, mu.T))

    # Small offset keeps log() finite when a mixing weight is exactly 0.
    log_joint = log_norm - sq_dist / (2 * var) + np.log(pi + 1e-16)

    # Per-row log-likelihood, computed once and reused for both outputs.
    row_log_lik = logsumexp(log_joint, axis=1).reshape(-1, 1)
    post = np.exp(log_joint - row_log_lik)

    return post, np.sum(row_log_lik, axis=0).item()

    

In [2]:
def mstep(X: np.ndarray, post: np.ndarray, mixture: GaussianMixture,
          min_variance: float = .25) -> GaussianMixture:
    """M-step placeholder: re-estimates the mixture from the soft counts.

    This definition is an unimplemented stub; calling it always raises.

    Args:
        X: (n, d) data array in which incomplete entries are stored as 0
        post: (n, K) soft counts, one row per example and one column
            per component
        mixture: the gaussian mixture from the previous iteration
        min_variance: lower bound applied to each component's variance

    Returns:
        GaussianMixture: the updated gaussian mixture

    Raises:
        NotImplementedError: always, since the body is not implemented.
    """
    raise NotImplementedError

NameError: name 'np' is not defined

In [None]:
def mstep(X: np.ndarray, post: np.ndarray) -> GaussianMixture:
    """M-step for fully observed data: refits means, variances and weights.

    Args:
        X: (n, d) array holding the data
        post: (n, K) array holding the soft counts
            for all components for all examples

    Returns:
        GaussianMixture: the new gaussian mixture
    """
    num_points, dim = X.shape

    # Total soft count assigned to each component.
    soft_counts = np.sum(post, axis=0)

    # Posterior-weighted average of the points, one row per component.
    means = (post.T @ X) / (soft_counts.reshape(-1, 1))

    # Squared euclidean distance from every point to every mean: (n, K).
    offsets = X[:, None] - means
    sq_dist = np.linalg.norm(offsets, ord=2, axis=2) ** 2

    # One shared (spherical) variance per component, averaged over dimensions.
    variances = np.sum(post * sq_dist, axis=0) / (soft_counts * dim)

    weights = soft_counts / num_points

    return GaussianMixture(means, variances, weights)

In [None]:
def mstep(X: np.ndarray, post: np.ndarray, mixture: GaussianMixture,
          min_variance: float = .25) -> GaussianMixture:
    """M-step: Updates the gaussian mixture by maximizing the log-likelihood
    of the weighted dataset

    Entries of X equal to 0 are treated as missing. A mean coordinate is only
    re-estimated when the posterior mass of the points that observe it is at
    least 1; otherwise the previous value is kept. The input mixture is not
    modified: the means are updated on a copy.

    Args:
        X: (n, d) array holding the data, with incomplete entries (set to 0)
        post: (n, K) array holding the soft counts
            for all components for all examples
        mixture: the current gaussian mixture
        min_variance: the minimum variance for each gaussian

    Returns:
        GaussianMixture: the new gaussian mixture
    """
    n = X.shape[0]
    old_mu, _, _ = mixture

    # 1 where an entry is present (non-zero), 0 where it is missing.
    observed = X.astype(bool).astype(int)

    pi_new = np.sum(post, axis=0) / n

    # Posterior mass and posterior-weighted sum per (component, coordinate).
    mass = np.dot(post.T, observed)
    weighted_sum = np.dot(post.T, X)

    # Work on a float copy so the caller's mixture.mu is left untouched and
    # fractional means are never truncated by an integer dtype.
    mu_new = old_mu.astype(float)
    supported = mass >= 1
    mu_new[supported] = weighted_sum[supported] / mass[supported]

    # ||x_u - mu_j||^2 over observed coordinates only (x.x + mu.mu - 2 x.mu).
    sq_dist = (np.sum(X ** 2, axis=1)[:, None]
               + np.dot(observed, mu_new.T ** 2)
               - 2 * np.dot(X, mu_new.T))

    # Normalise by the posterior-weighted count of observed entries, then
    # clamp from below so no component collapses to a degenerate variance.
    n_observed = np.sum(observed, axis=1).reshape(-1, 1)
    var_new = np.maximum(
        np.sum(post * sq_dist, axis=0) / np.sum(post * n_observed, axis=0),
        min_variance,
    )

    return GaussianMixture(mu_new, var_new, pi_new)


In [None]:
def run(X: np.ndarray, mixture: GaussianMixture,
        post: np.ndarray) -> Tuple[GaussianMixture, np.ndarray, float]:
    """Runs EM until the log-likelihood stops improving

    Alternates estep and mstep. Iteration stops once the gain in
    log-likelihood is no longer greater than 1e-6 times the absolute value
    of the current log-likelihood.

    Args:
        X: (n, d) array holding the data
        mixture: the initial gaussian mixture
        post: (n, K) array holding the soft counts; it is overwritten by
            the first E-step before being read

    Returns:
        GaussianMixture: the new gaussian mixture
        np.ndarray: (n, K) array holding the soft counts
            for all components for all examples
        float: log-likelihood of the current assignment
    """
    prev_cost = None
    cost = None

    # Strict '>' so a gain of exactly zero always terminates the loop.
    while prev_cost is None or cost - prev_cost > 1e-6 * np.abs(cost):
        prev_cost = cost
        post, cost = estep(X, mixture)
        # mstep needs the current mixture to carry over unsupported means.
        mixture = mstep(X, post, mixture)

    return mixture, post, cost

In [None]:
def run(X: np.ndarray, mixture: GaussianMixture,
        post: np.ndarray) -> Tuple[GaussianMixture, np.ndarray, float]:
    """Iterates E- and M-steps until the log-likelihood converges

    Args:
        X: (n, d) array holding the data
        mixture: the gaussian mixture to start from
        post: (n, K) array holding the soft counts
            for all components for all examples

    Returns:
        GaussianMixture: the new gaussian mixture
        np.ndarray: (n, K) array holding the soft counts
            for all components for all examples
        float: log-likelihood of the current assignment
    """
    previous_ll = None
    current_ll = None

    while True:
        # Keep going until two likelihoods exist and the latest gain is no
        # longer above the relative tolerance.
        keep_going = previous_ll is None or (
            current_ll - previous_ll > 1e-6 * np.abs(current_ll)
        )
        if not keep_going:
            break

        previous_ll = current_ll

        # E-step: soft assignments and log-likelihood under the current model
        post, current_ll = estep(X, mixture)

        # M-step: refit the model to those soft assignments
        mixture = mstep(X, post, mixture)

    return mixture, post, current_ll



In [None]:
def fill_matrix(X: np.ndarray, mixture: GaussianMixture) -> np.ndarray:
    """Fills an incomplete matrix according to a mixture model

    Entries of X equal to 0 are treated as missing. Each one is replaced by
    the posterior-weighted average of the component means for that
    coordinate; non-zero entries are returned unchanged. X itself is not
    modified, and the result is promoted to a floating dtype so predictions
    are not truncated when X holds integers.

    Args:
        X: (n, d) array of incomplete data (incomplete entries set to 0)
        mixture: a gaussian mixture, unpacked as (mu, var, pi)

    Returns:
        np.ndarray: (n, d) array with the missing entries filled in
    """
    mu, var, pi = mixture

    # 1 where an entry is present (non-zero), 0 where it is missing.
    observed = X.astype(bool).astype(int)
    n_observed = np.sum(observed, axis=1).reshape(-1, 1)

    # Log of (mixing weight * gaussian density over observed coordinates).
    log_norm = np.dot(-n_observed / 2.0, np.log(2 * np.pi * var).reshape(1, -1))
    sq_dist = (np.sum(X ** 2, axis=1)[:, None]
               + np.dot(observed, mu.T ** 2)
               - 2 * np.dot(X, mu.T))
    log_joint = log_norm - sq_dist / (2 * var) + np.log(pi + 1e-16)

    # Soft assignment of every row to every component.
    post = np.exp(log_joint - logsumexp(log_joint, axis=1).reshape(-1, 1))

    # Expected value of each coordinate under the posterior: (n, d).
    expected = np.dot(post, mu)

    return np.where(X == 0, expected, X)