In [57]:
import numpy as np
np.seterr(divide='ignore', invalid='ignore')
from scipy.stats import multivariate_normal 
import matplotlib.pyplot as plt
from PIL import Image
import scipy.stats

In [119]:
class GMM:
    def __init__(self, k, max_iter=5):
        self.k = k
        self.max_iter = int(max_iter) 

    def initialize(self, X):
        # returns the (r,c) value of the numpy array of X
        self.shape = X.shape 
        # n has the number of rows while m has the number of columns of dataset X
        self.n, self.m = self.shape 
        

        # initial weights given to each cluster are stored in phi or P(Ci=j)
        self.phi = np.full(shape=self.k, fill_value=1/self.k) 

        # initial weights given to each data point wrt to each cluster or P(Xi/Ci=j)
        self.weights = np.full(shape=self.shape, fill_value=1/self.k)
        
        # dataset is divided randomly into k parts of unequal sizes
        random_row = np.random.randint(low=0, high=self.n, size=self.k)

        # initial value of mean of k Gaussians
        self.mu = [  X[row_index,:] for row_index in random_row ] 

        # initial value of covariance matrix of k Gaussians
        self.sigma = [ np.cov(X.T) for _ in range(self.k) ] 
        # theta =(mu1,sigma1,mu2,simga2......muk,sigmak)

    # E-Step: update weights and phi holding mu and sigma constant
    def e_step(self, X):
        # updated weights or P(Xi/Ci=j)
        self.weights = self.predict_proba(X)
        # mean of sum of probability of all data points wrt to one cluster is new updated probability of cluster k or (phi)k
        self.phi = self.weights.mean(axis=0)

    # M-Step: update meu and sigma holding phi and weights constant
    def m_step(self, X):
        for i in range(self.k):
            weight = self.weights[:, [i]]
            total_weight = weight.sum()

            self.mu[i] = (X * weight).sum(axis=0) / total_weight
            self.sigma[i] = np.cov(X.T,aweights=(weight/total_weight).flatten(), bias=True)

    # responsible for clustering the data points correctly
    def fit(self, X):
        # initialise parameters like weights, phi, meu, sigma of all Gaussians in dataset X
        self.initialize(X)
        plt.figure(figsize=(16, 25))
        for iteration in range(self.max_iter):
            # iterate to update the value of P(Xi/Ci=j) and (phi)k
            self.e_step(X)
            # iterate to update the value of meu and sigma as the clusters shift
            self.m_step(X)
        return(self.mu, self.sigma)

            

    # predicts probability of each data point wrt each cluster
    def predict_proba(self, X):
        # Creates a n*k matrix denoting probability of each point wrt each cluster 
        likelihood = np.zeros( (self.n, self.k) ) 
        for i in range(self.k):
            distribution = multivariate_normal(mean=self.mu[i],cov=self.sigma[i])
            # pdf : probability denisty function
            likelihood[:,i] = distribution.pdf(X) 

        numerator = likelihood * self.phi
        denominator = numerator.sum(axis=1)[:, np.newaxis]
        weights = numerator / denominator
        return weights
    
    # predict function 
    def predict(self, X):
        weights = self.predict_proba(X)
        # datapoint belongs to cluster with maximum probability
        # returns this value
        return np.argmax(weights, axis=1)

In [137]:
data = np.array(Image.open(r"dataset3\testGrayImage_high.jpg").convert("L")).reshape(-1,1)
gm = GMM(4,19)
res = gm.fit(data)

<Figure size 1152x1800 with 0 Axes>

In [138]:
print(res)

([array([248.27679038]), array([108.0850869]), array([183.50250967]), array([12.84229714])], [array(46.95502929), array(1918.91840819), array(696.87867406), array(270.22411158)])


In [241]:
class EM:
    def __init__(self, K, mus, sigmas, weights, tol_level, image_path):
        self.k = K # number of clusters
        self.mus = mus # means k x 1
        self.sigmas = sigmas # variances K x1
        self. weights = weights # wight of each cluster k x1
        self.tol_level = tol_level # the tolarance level
        self.data = np.array(Image.open(image_path).convert("L")).reshape(-1,1) #data points  n x 1
        # self.data = np.array([[1,1,2,3,4,3,5,7,1,2,3,4]])
        # self.data=  np.array(  [[1],
        #                         [1],
        #                         [2],
        #                         [3],
        #                         [4],
        #                         [3],
        #                         [5],
        #                         [7],
        #                         [1],
        #                         [2],
        #                         [3],
        #                         [4]])
        # print(self.data.shape)
        self.probabilities = np.array([0])# the probability for each class for each point N X K (2)
        

    def e_step(self): # prob of each data point for each cluster N X K and the new classes weights
        # we need the weights of the classes, parameters of the classes, data point
        
        temp = np.zeros( (self.data.shape[0], self.k)) # N X K
        for i in range(self.k):
            pd = scipy.stats.norm(self.mus[i],self.sigmas[i]).pdf(self.data)
            temp[:,i] = pd.reshape(-1)
        numerator = temp * self.weights.reshape(-1) # numerator done N X K (1)
        denominator = numerator.sum(axis=1)[:, np.newaxis]
        self.probabilities = numerator / denominator # the probability for each class for each point N X K (2)
        self.probabilities[np.isnan(self.probabilities)] = 0
        
        self.weights = self.probabilities.mean(axis=0).reshape(-1,1)
        
        
        

    def m_step(self):
        for i in range(self.k):
            weight = self.probabilities[:, [i]]
            total_weight = weight.sum()
            a1 = (self.data * weight).sum(axis=0)
            a2 = total_weight
            if a1 == 0 or a2 == 0:
                self.mus[i] = 0
            else:
                self.mus[i] = a1 / a2
        p1 = np.transpose(np.array([ self.data[:,0] for _ in range(0,self.k)]))
        p2 = np.array([ self.mus.reshape(-1) for _ in range(0,self.data.shape[0])]) # n x k
        p2 = np.square(p1 - p2)
        p2 = p2 * self.probabilities
        p2 = np.sum(p2,axis=0).reshape(-1,1)
        total_weight = np.sum(self.probabilities, axis=0).reshape(-1,1)
        p2 = p2 / total_weight
        p2[np.isnan(p2)] = 0
        self.sigmas = p2.reshape(-1,1)
        
    def q_objecive(self,old_mus, old_sigmas,old_weights):
        temp = np.zeros( (self.data.shape[0], self.k)) # N X K
        for i in range(self.k):
            pd = scipy.stats.norm(self.mus[i],self.sigmas[i]).pdf(self.data)
            temp[:,i] = pd.reshape(-1)
        temp1 = np.log(self.probabilities * temp)
        temp1[np.isnan(temp1)] = 0
        for i in range(self.k):
            pd = scipy.stats.norm(old_mus[i],old_sigmas[i]).pdf(self.data)
            temp[:,i] = pd.reshape(-1)
        
        numerator = temp * old_weights.reshape(-1)
        denominator = numerator.sum(axis=1)[:, np.newaxis]
        temp = numerator / denominator # the probability for each class for each point N X K (previous) 
        temp[np.isnan(temp)] = 0
        temp = temp * temp1
        temp = np.sum(temp, axis=1)
        temp = np.sum(temp, axis=0)
        if temp < self.tol_level:
            return True
        # print(temp)
        return False
        
    def cluster(self):
        stop = False
        self.e_step
        self.m_step
        itrs = 0
        while stop == False:
        # for i in range(0,1):
            itrs+=1
            old_mus = np.array(self.mus)
            old_sigs = np.array(self.sigmas)
            old_weights = np.array(self.weights)
            self.e_step()
            self.m_step()
            print(itrs)
            stop = self.q_objecive(old_mus, old_sigs, old_weights)
        return(self.mus, self.sigmas)
    
        


In [242]:
k = 4
phi = np.full(shape=k, fill_value=1/k) 
mu = np.array([[25],
               [90],
               [160],
               [240]])
sigma = np.array([[1],
               [1],
               [1],
               [1]])
e = 0.01

em1 = EM(k, mu, sigma, phi, e, r"dataset3\testGrayImage.jpg")
# em1.test()
results = em1.cluster()

1


In [243]:
print(results)

(array([[  0],
       [ 85],
       [170],
       [255]]), array([[0.],
       [0.],
       [0.],
       [0.]]))
