In [1]:
import numpy as np
import random
from scipy.stats import multivariate_normal
from sklearn.datasets import make_spd_matrix
from sklearn.mixture import GaussianMixture

In [2]:
file1 = open("points.dat.txt")
lines = file1.readlines()
x = np.array([float(line.split()[0]) for line in lines])
y = np.array([float(line.split()[1]) for line in lines])
X = np.vstack((x,y)).T
print("Load data from points.dat.txt...")
X

Load data from points.dat.txt...


array([[-6.3670924 , -0.35240556],
       [-0.08979119,  1.5166165 ],
       [ 0.22860334, -3.4007959 ],
       ...,
       [-2.5545522 , -4.7181746 ],
       [-0.20151563,  2.9328022 ],
       [-0.36624411, -3.0872141 ]])

In [3]:
class GMM:
    def __init__(self, data, k, nIter) -> None:
        # provided data
        self.X = data               # the points as n x f numpy matrix
        self.n = data.shape[0]      # num of training points
        self.f = data.shape[1]      # num of features
        self.k = k                  # num of gaussians 
        self.nIter = nIter          # iterations to run

        # initialize pi randomly - generate k random numbers then normalize for sum = 1
        self.pi = [random.random() for _ in range(self.k)]
        self.pi = np.array([r / sum(self.pi) for r in self.pi])
        # self.pi = self.pi

        # initialize sig randomly - cov = t(A) * A
        self.sig = make_spd_matrix(self.f)
        self.sig = [self.sig.tolist()]
            # had to convert to list and back because numpy is trash
        for i in range(1, self.k):
            temp = make_spd_matrix(self.f)
            temp = temp.tolist()
            self.sig.append(temp)

            # transpose at end to meet homework guidelines 
        self.sig = np.array(self.sig)
        self.sig = self.sig.T

        # initialize mu randomly
        self.mu = np.random.rand(self.k, self.f)

    # returns n x k matrix with gaussian membership weights for each point
    def Expectation(self):
        #       gaussian 1, gaussian 2, ....
        # pnt1:  w11          w12       ....
        # pnt2:  w21          w22       ....
        # pnt3:  w31          w32       ....
        #  .      .            .        ....
        
        expectation = []
        # loop through gaussians
        for i in range(self.k):
            dist_i = multivariate_normal(cov=self.sig[:,:,i], mean=self.mu[i])
            gauss_i = dist_i.pdf(self.X)    # N() for each mu_i and cov_i
            expectation.append(self.pi[i] * gauss_i)       # pi_i * N()
        
        expectation = np.array(expectation).T

        sums = np.array([sum(expectation[i]) for i in range(expectation.shape[0])])
        inv_sums = 1/sums

        return (expectation.T * inv_sums).T
    
    def MaximizeMean(self, r):
        N_k = sum(r)
        # multiple X by Rnk for each k
        # divide by sum(Rnk) for all n in each k
        return np.array([(sum((self.X.T * r[:,i]).T)/N_k[i]).tolist() for i in range(self.k)])
    
    def MaximizeCovariance(self, r):
        N_k = sum(r)
        # loop through all n training examples
        temp_sig = []
        for i in range(self.k):
            temp_sig_i = 0
            for j in range(self.n):
                temp = self.X[j] - self.mu[i]
                temp_sig_i += np.dot(temp.reshape(2,1), temp.reshape(1,2)) * r[j,i]
            temp_sig_i = temp_sig_i/N_k[i]
            temp_sig.append(temp_sig_i.tolist())
        return np.array(temp_sig).T
    
    def MaximizeMixtures(self,r):
        return sum(r)/self.X.shape[0]

    def EM(self):
        print("Randomly initialized...")
        print("Mu: \n", self.mu)
        print("Sig: \n", self.sig)
        print('Pi: \n', self.pi)

        for i in range(self.nIter):
            r = self.Expectation()
            mu_temp = self.MaximizeMean(r)
            sig_temp = self.MaximizeCovariance(r)
            pi_temp = self.MaximizeMixtures(r)
            self.mu = mu_temp
            self.sig = sig_temp
            self.pi = pi_temp
        
        print("\nAfter all iterations...")
        print("Mu: \n", self.mu)
        print("Sig: \n", self.sig)
        print('Pi: \n', self.pi)

In [4]:
gmm = GMM(X, 3, 500)
gmm.EM()

Randomly initialized...
Mu: 
 [[0.67863661 0.39765557]
 [0.75315171 0.45592   ]
 [0.86917405 0.8220177 ]]
Sig: 
 [[[ 1.88588102  2.58222575  2.08695213]
  [ 0.76753777  0.30631524 -0.66683713]]

 [[ 0.76753777  0.30631524 -0.66683713]
  [ 0.58082929  0.45185375  0.56892128]]]
Pi: 
 [0.27401242 0.44711084 0.27887673]

After all iterations...
Mu: 
 [[-0.26471486  1.71363052]
 [-2.15200362 -2.26427454]
 [ 4.20230106 -1.66352455]]
Sig: 
 [[[ 1.45993918  8.00462651  0.78897359]
  [ 0.4572851  -2.91724599  0.02863309]]

 [[ 0.4572851  -2.91724599  0.02863309]
  [ 1.16340632  3.72368701  2.3848341 ]]]
Pi: 
 [0.40958024 0.50251419 0.08790557]


In [5]:
gm = GaussianMixture(n_components=3, random_state=0).fit(X)
print("Means: \n", gm.means_)
print("Cov: \n", gm.covariances_)
print("Weight: \n", gm.weights_)

Means: 
 [[-0.03695683  1.92055016]
 [-4.1037477  -0.061991  ]
 [ 0.20533581 -2.62807614]]
Cov: 
 [[[0.99702042 0.22925469]
  [0.22925469 1.09244853]]

 [[5.03384384 2.1148836 ]
  [2.1148836  1.06949241]]

 [[7.58526272 0.09953442]
  [0.09953442 3.62490655]]]
Weight: 
 [0.32582258 0.21972655 0.45445088]
