# FIT5201 Assessment 4: Scalable Machine Learning

**Mark Joseph Jose**
<br/>**28066049**

## Objectives
This assignment consists of two parts and covers Module 6-scalable machine learning. Total marks for this assessment is 100.

## Part A. Soft EM for GMM with Map-Reduce
In this part, you implement a Map-Reduce version of soft EM for GMM in Spark (using Python 2).

### Question 1 [EM for GMM with Map-Reduce, 50 Marks]
1. Load __Task4A.csv__ file,
2. Implement soft Expectation Maximisation for Gaussian Mixture Model with Map-Reduce, as discussed in Chapter 5 of Module 6. Submit your well-documented Python code (in IPython Notebook format).
3. Set the number of clusters to 3 and run your implementation on the provided data. In a table, report the learnt parameters. Each row of the table belongs to one cluster. Please order the rows based on the cluster size (the smallest at the first row).

In [1]:
from __future__ import print_function
import numpy as np
from pyspark import SparkContext
from operator import add # for adding in reduce and reduceByKey 
import matplotlib.pyplot as plt
import pandas as pd
from scipy.stats import multivariate_normal as mvnorm
from scipy.misc import logsumexp

In [2]:
!head ./Task4A.csv

"x1","x2","x3","x4"
5.1,3.5,1.4,0.2
4.9,3,1.4,0.2
4.7,3.2,1.3,0.2
4.6,3.1,1.5,0.2
5,3.6,1.4,0.2
5.4,3.9,1.7,0.4
4.6,3.4,1.4,0.3
5,3.4,1.5,0.2
4.4,2.9,1.4,0.2


In [3]:
# terminate spark object in background
sc.stop()
sc = SparkContext(appName="EM")
# initiate Spark Context object
file = "./Task4A.csv"
lines = sc.textFile(file)
# remove header
header = lines.first()
lines = lines.filter(lambda line:line!=header)
lines.count()

150

## The Map-Reduce Implentation for Soft Expectation Maximization Algorithm:

## Algorithm:

1. Initialize the GMM parameters (cluster means and cluster covariances)
    - K cluster means will take random uniform numbers from 0 to 1 multiplied by 15
    - K cluster covariance matrices will take the identity matrix of D rows and K columns where D is the number of features
2. While log likelihood is not less than threshold (i.e. convergence is not met):
    - Perform Expectation by using **MAP**. Map each observation to each cluster posterior probability
    - Perform Maximisation by:
        - using **REDUCE** to calculate each cluster prior probabilities
        - using **MAP-REDUCE** to calculate each cluster means
        - using **MAP-REDUCE** to calculate each cluster covariance matrices
    - Calculate the objective (log likelihood) function

In [4]:
########################### AUXILIARY FUNCTIONS ################################
##
##    Note: This implementation was based on Johann Faouzi & Hicham Janati's
##          Implementation of the Gaussian Mixture Models' EM Algorithm
##
##    Source: https://github.com/hichamjanati/EM-Spark
##
################################################################################

# the log-likelihood function for convergence
def objective(rdd, pis, mus, sigmas):
    error = rdd.map(lambda x: logsumexp(np.array([density(x, k, pis, mus, sigmas) for k in range(K)]))).reduce(lambda x,y: x+y)
    return error

# the gaussian density function
def density(x, k, pis, mus, sigmas):
    return np.log(pis[k]) + mvnorm.logpdf(x[0],mean=mus[k],cov=sigmas[k],allow_singular=True)

# the expectation step 
def expectation(x):
    log_gammas = np.array([density(x, k, pis, mus, sigmas) for k in range(K)])
    log_gammas = log_gammas - logsumexp(log_gammas)
    return [x[0], np.exp(log_gammas)]

# the maximization step
def maximization(rdd):
    # Calculate n_ks via reduce
    # reduce by summing all the posterior probabilities per cluster k
    # x[1][k] is the posterior probability for cluster k
    sizes = np.array(rdd.reduce(lambda x1,x2: [0, [x1[1][k]+x2[1][k] for k in range(K)]])[1])
    sizes = np.clip(sizes,1e-10,np.max(sizes))
    pis = sizes / N

    # Calculate means via map-reduce
    # map each cluster posterior probability to each observation
    # note:
    # x[0] is the observation
    # x[1][k] is the posterior probability for cluster k
    mapped_means = rdd.map(lambda x: x + [[x[0] * x[1][k] for k in range(K)]])
    # reduce by summing all the mapped posterior probability * observation
    # x[2][k] is the mapped posterior * observation for cluster k
    reduced_means = mapped_means.reduce(lambda x1,x2: [0,0,[x1[2][k] + x2[2][k] for k in range(K)]])[2]
    mus = [reduced_means[k]/sizes[k] for k in range(K)]

    # Calculate sigmas via map-reduce
    # map each cluster posterior probability to each observation and cluster mus
    # x[0] is the observation
    # mus[k] is the gaussian mean for cluster k
    # x[1][k] is the posterior probability
    # x[3][k] is the mapped posterior * (obs-means) * transpose(obs-means) for cluster k
    mapped_sigmas = mapped_means.map(lambda x: x + [[x[1][k] * np.outer(x[0]-mus[k],x[0]-mus[k]) for k in range(K)]])
    reduced_sigmas = mapped_sigmas.reduce(lambda x1,x2: [0,0,0,[x1[3][k] + x2[3][k] for k in range(K)]])[3]
    sigmas = [reduced_sigmas[k]/sizes[k] for k in range(K)]
    
    return [pis, mus, sigmas]

In [5]:
# convert each line of the text data file into a NumPy Array of float numbers
data = lines.map(lambda line: np.array([float(l) for l in line.split(',')]))
data.count()

# Initialisation
K = 3   # number of clusters
D = data.first().size # number of dimensions
N = lines.count() # number of observations

print(K, D, N)

# Initialization of parameters
data_list = data.map(lambda x: [x])

# initiate prior probabilities of cluster k
pis = np.ones(K)/K

np.random.seed(1)
# initiate cluster means
mus = np.array([np.random.uniform(0, 1, D) * 15 for i in range(K)])
# initiate cluster covariance matrices
sigmas = np.concatenate([np.identity(D) for i in range(K)], axis = 0).reshape(K,D,D)

# DEBUG
# print(mus)
# print(sigmas)

max_iter = 100 # maximum iteration times
threshold = .15  # threshold of convergence
 
i = 1
old_obj = new_obj = 1000
while (i < max_iter or abs(old_obj - new_obj) > threshold):
    
    # DEBUG
    # print("Iteration ", i)
    
    # save the old log likelihood function 
    old_obj = new_obj
    
    # E step - calculate the posterior probability of x belonging to cluster k
    gammas = data_list.map(expectation)
    
    # M step - update the sufficient parameters (pis - prior probability, mus - cluster means, sigmas - cluster covariances) 
    pis, mus, sigmas = maximization(gammas)    
    
    # DEBUG
    # print("\nPis:")
    # print(pis)
    # print("\nMus:")
    # print(mus)
    
    # compute the new log likelihood function
    new_obj = objective(data_list, pis, mus, sigmas)
    i = i + 1
    
print("Done!")
pd.set_option('max_colwidth',150)
d = {'priors': np.round(pis, 3), 'means': [np.round(x, 3) for x in mus], 'covariances': [np.round(x, 2) for x in sigmas]}
df = pd.DataFrame(data=d)
df = df[['priors', 'means', 'covariances']]
df

3 4 150
Done!


Unnamed: 0,priors,means,covariances
0,0.299,"[5.915, 2.778, 4.202, 1.297]","[[0.28, 0.1, 0.18, 0.05], [0.1, 0.09, 0.09, 0.04], [0.18, 0.09, 0.2, 0.06], [0.05, 0.04, 0.06, 0.03]]"
1,0.333,"[5.006, 3.428, 1.462, 0.246]","[[0.12, 0.1, 0.02, 0.01], [0.1, 0.14, 0.01, 0.01], [0.02, 0.01, 0.03, 0.01], [0.01, 0.01, 0.01, 0.01]]"
2,0.367,"[6.545, 2.949, 5.48, 1.985]","[[0.39, 0.09, 0.3, 0.06], [0.09, 0.11, 0.08, 0.06], [0.3, 0.08, 0.33, 0.07], [0.06, 0.06, 0.07, 0.09]]"


## Verify the GMMs produced by calling pyspark's GaussianMixture.train() method

The library `pyspark` has an in-house method that performs the EM algorithm using Spark via the `GaussianMixture.train()` method. We can use this to verify the resulting gaussian mixture models from the algorithm above.

In [6]:
from pyspark.mllib.clustering import GaussianMixture, GaussianMixtureModel

# Build the model using a simple function call
gmm = GaussianMixture.train(data, K)

for i in range(K):
    print("weight = ", gmm.weights[i], "mu = ", gmm.gaussians[i].mu,
          "sigma = ", gmm.gaussians[i].sigma.toArray())

weight =  0.340755948329 mu =  [6.24406944519,2.9711637503,5.06509921261,1.8887674108] sigma =  [[ 0.28487553  0.07050134  0.22350543  0.12127464]
 [ 0.07050134  0.06589001  0.06882737  0.04711748]
 [ 0.22350543  0.06882737  0.32373693  0.19536735]
 [ 0.12127464  0.04711748  0.19536735  0.15231708]]
weight =  0.326050449953 mu =  [6.27998510364,2.76812086244,4.73818033973,1.45304713707] sigma =  [[ 0.59228716  0.17774862  0.69283908  0.22065863]
 [ 0.17774862  0.13433944  0.18407464  0.06787678]
 [ 0.69283908  0.18407464  0.99228468  0.30960916]
 [ 0.22065863  0.06787678  0.30960916  0.10983144]]
weight =  0.333193601718 mu =  [5.00621125606,3.42847087502,1.4620673392,0.245976903003] sigma =  [[ 0.12170809  0.09703441  0.01600047  0.01013964]
 [ 0.09703441  0.14034373  0.0113925   0.00914124]
 [ 0.01600047  0.0113925   0.02955738  0.00595405]
 [ 0.01013964  0.00914124  0.00595405  0.01088716]]


Since the resulting gaussians with priors (0.34, 0.33, 0.33) is quite close to the earlier  result (0.30, 0.33, 0.37) then we can say that the algorithm implemented for task 1 is an accurate implementation of EM using Spark map-reduce.

## Part B. Document Clustering with Map-Reduce
In this part, you are asked to write down the steps of Map-Reduce for Document Clustering. In particular, you should specify the parameters to be learnt, as well as the Map and Reduce steps that one should take to implement a distributed document clustering algorithm.

__Note:__ __You do not need to implement your document clustering algorithm.__ Providing the algorithm and high-level explanation (using mathematical convention) in your report is sufficient. Indeed, we ask you to provide the steps of such algorithm (Map-Reduce for document clustering) similar to what we have provided for EM for GMMs using Map-Reduce in Chapter 5 of Module 6.

### Question 2 [Document Clustering with Map-Reduce, 50 Marks]
1. Assume we want to distribute EM for Document Clustering (described in Chapter 4 of Module 4) using Map-Reduce technique in Spark. Answer the following questions:
   1. Which quantities should be computed in each mapper?
   2. How the reducer updates the quantities that are sent by themappers? 
   3. Write down the steps of document clustering using Map-Reduce.
   
## Expectation-Maximisation Steps

<u>Initialisation:</u>

### 1) Initialise the parameters 
$$
\theta^{old}=\ (\varphi^{old},\ \mu_1,\ \mu_2,...,\mu_k)
$$

Where 
<br/>all elements in $\varphi^{old}$ are the **cluster prior probabilities** 1/K where K = number of clusters, and 
<br/> µ are the **cluster means** and a matrix of random uniform numbers whose row sums up to 1.

### WHILE CONVERGENCE IS NOT MET: 

<u>Expectation:</u>
 
### 2) For each document n, find the log posterior probability $ln(\gamma\left(z_{n,k}\right))$ per cluster k

**MAP** : Map each document *n* to cluster *k* by computing its log posterior probability $\gamma\left(z_{n,k}\right)$ for cluster k

$$
\ln{\gamma\left(z_{n,k}\right)}=\Sigma_{n=1}^N\Sigma_{k=1}^K\left(\ln{\varphi_k+\ \Sigma_{w\in\mathcal{A}}c\left(w,n\right)ln}\mu_{k,w}\right)
$$

Where 
<br/>$\gamma\left(z_{n,k}\right)$ is the posterior probability of document n belonging to cluster k, 
<br/>$\varphi_k$ is the prior probability or the mixing components of cluster k, 
<br/>$c\left(w,n\right)$  is the frequency of the word w in document n and 
<br/>$\mu_{k,w}$ is the word proportion of the word w for each cluster k.

<u>Maximisation:</u>

### 3) For each cluster k, calculate the following:

<u>** THE PRIOR PROBABILITIES $\varphi_k$**</u>

**REDUCE**: reduce each prior probability by summing the current posterior probabilities per cluster

$$
\varphi_k = \frac{\Sigma_{m=1}^M\sum_{n=1}^{N}\gamma\left(z_{n,k}\right)}{N}
$$

for all machines m = 1 ... M, documents n = 1 ... N, and clusters k = 1 ... K

<u>** THE CLUSTER MEANS $\mu_k$ **</u>

For the cluster means, we need a map-reduce operation for the mean numerator and another map-reduce operation for the mean denominator.

### CLUSTER MEANS' NUMERATOR 
**MAP**: map each word *w* to cluster *k* by mapping the posterior probability of cluster k, $\gamma(z_{n,k})$  to each occurence of word *w* in the documents $c(w,n)$

$$
\tilde{\mu}_{k,w} = \tilde{\mu}_{k,w} + \gamma(z_{n,k}) * c(w, n)
$$

for all occurences of word *w* in cluster *k*

**REDUCE**: sum all the mapped posterior probabilities of cluster *k* to the word *w* $\gamma(z_{n,k}) * c(w, n)$

$$
\mu_{num} = \Sigma_{m=1}^M\tilde{\mu}_{k,w}
$$

for all machines m = 1 ... M

### CLUSTER MEANS' DENOMINATOR

**MAP**: map <u>all</u> words *w'* to each cluster *k* by mapping each cluster's posterior probability to <u>all</u> words in the Dictionary $w'\ \in\mathcal{A}$

$$
\tilde{\mu}_{k, w'} = \tilde{\mu}_{k, w'} + \gamma(z_{n,k})* c(w',n)
$$

for all w' in the dictionary

**REDUCE**: sum all the mapped posterior probabilities and words w' in cluster k 

$$
\mu_{denom} = \Sigma_{m=1}^M\tilde{\mu}_{k, w'}
$$

for all machines m = 1 ... M

Then, to get the cluster means of word *w* $\mu_{k,w}$, divide the map-reduced $\mu_\text{num}$ by $\mu_\text{denom}$

$$
\mu_{k,w} = \frac{\mu_\text{num}}{\mu_\text{denom}}
$$