## Soft EM for GMM with Map-Reduce
Implementation a Map-Reduce version of soft EM for GMM in Spark (using Python 3.6.5).

### 1. Load data_file.csv file

In [1]:
import numpy as np
import pandas as pd
# to compute covariance matrix
from scipy.stats import multivariate_normal as mvn 

#libray pyspark to work with spark
from pyspark import SparkContext 

from __future__ import print_function 
from __future__ import division # interger division vs float division


In [4]:
# stop all spark context then creat new
sc.stop() 
sc = SparkContext(appName = "EMMapReduce") # create a new SparkContext object
#read data
lines = sc.textFile("./data_file.csv") 

!head ./data_file.csv # view first few line of data

"x1","x2","x3","x4"
5.1,3.5,1.4,0.2
4.9,3,1.4,0.2
4.7,3.2,1.3,0.2
4.6,3.1,1.5,0.2
5,3.6,1.4,0.2
5.4,3.9,1.7,0.4
4.6,3.4,1.4,0.3
5,3.4,1.5,0.2
4.4,2.9,1.4,0.2


In [3]:
# remove the header
header = lines.first()
data = lines.filter(lambda row: row != header)

In [4]:
# converet each line of the text data file into a NumPy Array of float numbers
data = data.map(lambda line: np.array([float(l) for l in line.split(',')])).cache()

In [5]:
#get number of data points
N = data.count() # 
# number of dimenstions
D = data.collect()[0].shape[0]

data.collect()[1:5] 

[array([ 4.9,  3. ,  1.4,  0.2]),
 array([ 4.7,  3.2,  1.3,  0.2]),
 array([ 4.6,  3.1,  1.5,  0.2]),
 array([ 5. ,  3.6,  1.4,  0.2])]

### 2. Implement soft Expectation Maximisation for Gaussian Mixture Model with Map-Reduce

In [6]:
# clusterFunc  for each x to calculate all sigma, mu and posterior for that x and K
def clusterFunc(x, K_input, muHat, sigmaHat, NkHat):
    K = K_input
    post_x = list(range(K))
    sigma_hat_x = sigmaHat.copy()
    mu_hat_x = muHat.copy()
    
    tempt_muHat = muHat
    
    #find the posterior of x in cluster k
    for k in range(K):
        post_x[k] = mvn.pdf(
            x, 
            mean = np.squeeze(np.array(muHat[k])),  
            cov=np.reshape(sigmaHat[k], (D,D))) * NkHat[k]
    
    #normalization
    post_x = post_x / np.sum(post_x)
        
    #for each cluster k, calculate the sigma and mu for that x
    for k in range(K):
        sigma_hat_x[k] = np.dot(np.matrix(x).T, post_x[k]*np.matrix(x)).reshape(1,16)
        mu_hat_x[k] = (post_x[k]*np.matrix(x))
    
    return np.matrix(post_x), mu_hat_x, sigma_hat_x

#### Map reduce function

In [7]:
def emMapReduceFunc(K_input):
    ############### set initial##############
    #set number of cluster
    K = K_input 
    # set maximum number of iterations
    max_i = 100 
    
    # condition to stop the loop
    prgrs_min = float(0.001) 
    prgrs = [float('+inf')]

    phiHat = [1/K]*3  # assume all clusters have the same size (we will update this later on)
    
    #initial the number of data points in each cluster
    NkHat = [N/K] * K 

    # randomly select cluster centers from the data set
    muHat = np.matrix(data.takeSample(withReplacement=False, num=K, seed=150))

    # create empty covariance matrices
    sigmaHat = np.matrix(np.zeros((K, D*D))) 
    # initialize the covariance matrix with identity matrix
    for k in range(K):
        sigmaHat[k] = np.identity(D).flatten()
        
    ######################start for EM map reduce#########################
    for i in range(max_i):
        #store the previous cluster centres
        previous_mu_h = muHat.copy() 
        prgrs.append(float('+inf'))

        #map each element x to calculate the sigma, mu and Nk 
        indx_sum_one = data.map(lambda x: (clusterFunc(x, K, muHat, sigmaHat, NkHat)[0], clusterFunc(x, K, muHat, sigmaHat, NkHat)[1], clusterFunc(x, K, muHat, sigmaHat, NkHat)[2] ))
        #reduce, sum all the sigma, mu and nk for all K 
        reduced_results = indx_sum_one.reduce(lambda x1, x2: (x1[0] + x2[0], x1[1] + x2[1],x1[2] + x2[2]))

        #now from the reduced_results, calculate and update NkHat, muHat and sigmaHat
        for k in range(K):
        # print(reduced_results[0][0, k])   
            NkHat[k] = reduced_results[0][0, k]
            muHat[k] = reduced_results[1][k] / NkHat[k]
            sigmaHat[k] = (reduced_results[2][k] / NkHat[k]) - (np.dot(np.matrix(muHat[k]).T, np.matrix(muHat[k]))).flatten()
            phiHat[k] = NkHat[k]/N
            
     # check termination threshold
        prgrs[i] = 0
        # calculate sum of distances between the cureent location of the clusters and their previous locations
        for k in range(K):
            prgrs[i] += np.sum(previous_mu_h[k] - muHat[k])**2
    #   print(prgrs[i])

        if prgrs[i] <= prgrs_min: break
            
    return muHat, NkHat, phiHat, sigmaHat         
            

### Now, we will set the number of clusters to 3 and run with the data file above. 

In [8]:
model = emMapReduceFunc(3)
muHat = model[0]
NkHat = model[1]
phiHat = model[2]
sigmaHat = model[3] 

In [9]:
#get the cluster position
clusterCenters = pd.DataFrame(muHat)
clusterCenters = clusterCenters.round(2)
print("Cluster centres:")
clusterCenters

Cluster centres:


Unnamed: 0,0,1,2,3
0,5.94,2.77,4.26,1.33
1,6.57,2.97,5.53,2.01
2,5.01,3.43,1.46,0.25


In [10]:
# get the size for each cluster
clusterSizes = pd.DataFrame(NkHat)
clusterSizes = clusterSizes.round(2)
print("Number of data points in each cluster:")
clusterSizes

Number of data points in each cluster:


Unnamed: 0,0
0,49.09
1,50.91
2,50.0


In [11]:
clusterProbability = pd.DataFrame(phiHat)
clusterProbability = clusterProbability.round(2)
print("cluster probability:")
clusterProbability

cluster probability:


Unnamed: 0,0
0,0.33
1,0.34
2,0.33


In [12]:
result = pd.concat([clusterCenters, clusterSizes, clusterProbability], axis=1)
result.columns = ['X1mu','X2mu','X2mu','X2mu','Size', "ClusterProbability"]
result.index.name = "K"
result = result.reset_index()
result["K"] += 1

In [13]:
print("Learnt parameters of the model:")
sort_results = result.sort_values(by = ['Size'], ascending = True)
print()
print(sort_results.to_string(index = False))

Learnt parameters of the model:

K  X1mu  X2mu  X2mu  X2mu   Size  ClusterProbability
1  5.94  2.77  4.26  1.33  49.09                0.33
3  5.01  3.43  1.46  0.25  50.00                0.33
2  6.57  2.97  5.53  2.01  50.91                0.34
