## GMM classification from scratch on non-linearly separable data (CASE #1)

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.mixture import GaussianMixture
import numpy as np
from sklearn.preprocessing import StandardScaler

# File input


def _read_points(path):
    """Read comma-separated ``x,y`` pairs from a text file.

    Each non-blank line must hold at least two comma-separated numeric
    fields; only the first two are used.

    Returns a pair of lists ``(xs, ys)`` with one float per data line.
    """
    xs, ys = [], []
    with open(path) as handle:
        for line in handle:
            # Skip blank/trailing lines instead of crashing in float("").
            if not line.strip():
                continue
            fields = line.split(",")
            xs.append(float(fields[0]))
            ys.append(float(fields[1]))
    return xs, ys


# Forward slashes are accepted by open() on Windows as well as POSIX,
# unlike the backslash-separated paths used previously.
x1, y1 = _read_points("data_1/Class1.txt")
print(len(x1), len(y1))

x2, y2 = _read_points("data_1/Class2.txt")
print(len(x2), len(y2))

# Dataframe construction

# Ground-truth labels: every Class1 sample is tagged 1, every Class2 sample 2,
# in the same order as the concatenated coordinates below.
Actual_classes = [1] * len(x1) + [2] * len(x2)
X = x1 + x2
Y = y1 + y2
df = pd.DataFrame({'X': X, 'Y': Y})

# to understand the data

# plt.scatter(df['X'],df['Y'],c = Actual_classes)
# plt.show()
def standardisation(dataframe):
    """Return the values of *dataframe* standardised column by column.

    The frame's underlying array (``dataframe.values``) is passed through a
    freshly fitted ``StandardScaler`` and the transformed array is returned.
    """
    # Fixed the misspelt class name (was ``StandardScsaler``), which raised
    # NameError on every call.
    return StandardScaler().fit_transform(dataframe.values)


def matrix_construct(X, Y):
    """Split the first two columns of *X* into per-class coordinate lists.

    *X* is indexed positionally through ``X.iloc`` and *Y* through ``Y[i]``.
    Rows labelled 1 go to the first result, rows labelled 2 to the second,
    and rows with any other label are ignored.

    Returns ``(d1, d2)`` where each is ``[first_column_values,
    second_column_values]`` for the corresponding class.
    """
    first_x, first_y = [], []
    second_x, second_y = [], []

    for position in range(len(Y)):
        row = list(X.iloc[position])
        label = Y[position]

        if label == 1:
            target_x, target_y = first_x, first_y
        elif label == 2:
            target_x, target_y = second_x, second_y
        else:
            continue

        target_x.append(row[0])
        target_y.append(row[1])

    return [first_x, first_y], [second_x, second_y]

class GMM:
    """Gaussian mixture model fitted with expectation-maximisation (EM).

    Attributes set by ``__init__``:
        q_value: number of mixture components.
        max_iter: number of EM iterations run by ``fit`` (no early stopping).
        pi: list of mixing weights, initialised uniformly.
        class_tag: labels returned by ``predict``; component ``k`` maps to
            ``k + 1``.

    Attributes set by ``fit``:
        mean_vector: list with one mean vector per component.
        covariance_matrices: list with one covariance matrix per component.
        responsibility: array of shape ``(n_samples, q_value)`` holding the
            posterior component probabilities from the last E-step.
    """

    def __init__(self, n_components, max_iter = 100):
        self.q_value = n_components
        self.max_iter = max_iter
        self.pi = [1 / self.q_value for _ in range(self.q_value)]
        self.class_tag = [index + 1 for index in range(self.q_value)]

    def multivariate_normal(self, X, mean_vector, covariance_matrix):
        """Return the multivariate normal density of one sample.

        *X* is a single sample (1-D vector); its length is used as the
        dimensionality in the normalising constant.
        """
        deviation = X - mean_vector
        mahalanobis_sq = np.dot(
            np.dot(deviation.T, np.linalg.inv(covariance_matrix)), deviation
        )
        normaliser = ((2 * np.pi) ** (-len(X) / 2)) * (
            np.linalg.det(covariance_matrix) ** (-1 / 2)
        )
        return normaliser * np.exp(-mahalanobis_sq / 2)

    def _weighted_densities(self, X):
        """Return the ``(n_samples, q_value)`` array of ``pi[k] * N(x_n | k)``."""
        rows = [
            [
                self.pi[k] * self.multivariate_normal(
                    sample, self.mean_vector[k], self.covariance_matrices[k]
                )
                for k in range(self.q_value)
            ]
            for sample in X
        ]
        # The reshape keeps a 2-D result even when X holds no samples.
        return np.array(rows, dtype=float).reshape(len(X), self.q_value)

    def fit(self, X):
        """Estimate the mixture parameters from *X* with ``max_iter`` EM steps.

        *X* is a 2-D array-like with one sample per row. The parameters are
        initialised by cutting *X* into ``q_value`` consecutive chunks and
        taking each chunk's mean and covariance. Returns ``None``; results
        are stored on the instance.
        """
        X = np.asarray(X, dtype=float)

        # Initial mean vectors and covariance matrices, one per chunk.
        chunks = np.array_split(X, self.q_value)
        self.mean_vector = [np.mean(chunk, axis=0) for chunk in chunks]
        self.covariance_matrices = [np.cov(chunk.T) for chunk in chunks]

        for _ in range(self.max_iter):
            # E-step: posterior probability of each component per sample.
            # The per-sample denominator is computed once, not once per k.
            weighted = self._weighted_densities(X)
            totals = weighted.sum(axis=1, keepdims=True)

            # If every density underflowed to 0 for a sample, fall back to a
            # uniform responsibility instead of producing 0/0 = NaN.
            underflow = totals[:, 0] == 0
            weighted[underflow] = 1.0
            totals[underflow] = self.q_value

            self.responsibility = weighted / totals

            # Effective number of samples assigned to each component.
            N = np.sum(self.responsibility, axis=0)

            # M-step: responsibility-weighted means.
            self.mean_vector = [
                np.dot(self.responsibility[:, k], X) / N[k]
                for k in range(self.q_value)
            ]

            # M-step: responsibility-weighted covariances. np.cov with
            # aweights and ddof=0 already divides by sum(aweights) == N[k],
            # so no further 1/N[k] scaling is applied (the previous extra
            # division shrank every covariance by a factor of N[k]).
            self.covariance_matrices = [
                np.cov(X.T, aweights=self.responsibility[:, k], ddof=0)
                for k in range(self.q_value)
            ]

            # M-step: mixing weights.
            self.pi = [N[k] / len(X) for k in range(self.q_value)]

    def predict(self, X):
        """Return, for each row of *X*, the tag of its most probable component.

        Components are scored by ``pi[k] * N(x | mean_k, cov_k)``, i.e. the
        same (unnormalised) posterior used in the E-step of ``fit``, so the
        mixing weights are taken into account. Ties go to the lowest
        component index. Returns a list of entries from ``class_tag``.
        """
        X = np.asarray(X, dtype=float)
        scores = self._weighted_densities(X)
        return [self.class_tag[int(np.argmax(row))] for row in scores]

def accuracy(val1, val2):
    """Return the percentage of positions where *val1* agrees with *val2*.

    Every index of *val1* is compared against the same index of *val2*;
    the number of disagreements is subtracted from ``len(val2)`` and the
    remainder is expressed as a percentage of ``len(val2)``.
    """
    total = len(val2)
    mismatches = sum(1 for idx in range(len(val1)) if val1[idx] != val2[idx])
    return ((total - mismatches) / total) * 100

    

# Main script: split the data, fit the from-scratch GMM and plot the result.

# Hold out 30% of the samples as a test split; the fixed random_state is
# passed so the split is reproducible between runs.
X_train, X_test, Y_train, Y_test = train_test_split(df, Actual_classes, test_size=0.3, random_state=42)

# Disabled variant: fit on the training split, then predict and plot on the
# held-out test split.
# mdel = GMM(2)
# mdel.fit(X_train.values[:])
# scores = mdel.predict(X_test.values[:])
# # print(scores)
# # print(Y_test[:])
# plt.scatter(X_test['X'][:], X_test['Y'][:], c = Y_test[:])
# plt.show()
# # print(accuracy(scores,Y_test))
# # predicted clusters on the test split
# plt.scatter(X_test['X'][:], X_test['Y'][:], c = scores)
# plt.show()

# Active variant: fit a two-component GMM on the training split and predict
# cluster tags for those same training samples.
model1 = GMM(2)
model1.fit(X_train.values[:])
scores = model1.predict(X_train.values[:])
# print(scores)
# print(Y_test[:])
# First plot: training samples coloured by their ground-truth class.
plt.scatter(X_train['X'][:], X_train['Y'][:], c = Y_train[:])
plt.show()
# Second plot: the same training samples coloured by the predicted cluster tag.
plt.scatter(X_train['X'][:], X_train['Y'][:], c = scores)
plt.show()

# NOTE(review): the disabled print below scores against Y_test, but `scores`
# now holds predictions for X_train - compare with Y_train if re-enabled.
# print("accuracy percentage", round(accuracy(scores,Y_test),3))


2000 2000
2000 2000


