<a href="https://colab.research.google.com/github/mhdelta/distributed-gmm/blob/master/Readme.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Distributed Gaussian Mixture Model
 
This repository provides a distributed adaptation of Tobias Schlagenhauf's [code](https://www.python-course.eu/expectation_maximization_and_gaussian_mixture_models)
for the unsupervised clustering algorithm [**Gaussian Mixture Models**](https://scikit-learn.org/stable/modules/mixture.html).
 
This approach uses the ZeroMQ push/pull model:
 
![ZeroMQ push/pull pattern](https://learning-0mq-with-pyzmq.readthedocs.io/en/latest/_images/pushpull.png "ZeroMQ push/pull pattern")
 
Unlike the plain pipeline, however, the process does not end at the result collector: it starts again with new parameters and repeats until the model has converged.
The basic idea goes like this:
1. The producer generates random initial parameters $\bf {\mu, \Sigma}$.
2. The producer sends pieces of the dataset $\bf{X}$, together with the parameters, to the consumers.
---
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;**E-STEP**
3. Consumers calculate the following ![alt text](https://latex.codecogs.com/gif.latex?r_%7Bic%7D%20%3D%20%5Cfrac%7B%5Cpi_c%20N%28%5Cboldsymbol%7Bx_i%7D%20%5C%20%7C%20%5C%20%5Cboldsymbol%7B%5Cmu_c%7D%2C%5Cboldsymbol%7B%5CSigma_c%7D%29%7D%7B%5CSigma_%7Bk%3D1%7D%5EK%20%5Cpi_k%20N%28%5Cboldsymbol%7Bx_i%20%5C%20%7C%20%5C%20%5Cboldsymbol%7B%5Cmu_k%7D%2C%5Cboldsymbol%7B%5CSigma_k%7D%7D%29%7D)
which is their piece of the array $r_{ic}$ (`ric` in the code), where each entry is the ratio
![alt text](https://latex.codecogs.com/gif.latex?%5Cfrac%7BProbability%20%5C%20that%20%5C%20x_i%20%5C%20belongs%20%5C%20to%20%5C%20class%20%5C%20c%7D%7BProbability%20%5C%20of%20%5C%20x_i%20%5C%20over%20%5C%20all%20%5C%20classes%7D)
 
4. Consumers report the piece of `ric` that they calculated to the result collector.
---
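The E-step formula above can be evaluated independently on each piece of $\bf{X}$, because the normalization runs over the clusters of a single sample and never across samples. The following is a minimal NumPy sketch of that computation, not the code in worker.py: the function names, the hand-written Gaussian density, the explicit `pi` argument and the toy data are all assumptions made for illustration.

```python
import numpy as np

def gaussian_pdf(X, mu, cov):
    """Density of N(mu, cov) evaluated at every row of X."""
    d = X.shape[1]
    diff = X - mu
    inv = np.linalg.inv(cov)
    norm = np.sqrt((2 * np.pi) ** d * np.linalg.det(cov))
    # Squared Mahalanobis distance of each row to mu
    maha = np.einsum("ij,jk,ik->i", diff, inv, diff)
    return np.exp(-0.5 * maha) / norm

def e_step(X_piece, pi, mu, cov):
    """Responsibilities r_ic for one piece of X, shape (n_piece, K)."""
    weighted = np.column_stack([
        pi_c * gaussian_pdf(X_piece, mu_c, cov_c)
        for pi_c, mu_c, cov_c in zip(pi, mu, cov)
    ])
    # Divide each row by its sum over all clusters
    return weighted / weighted.sum(axis=1, keepdims=True)

# Toy input (made up): three 2-D points and two well-separated clusters
X_piece = np.array([[0.0, 0.1], [0.2, -0.1], [5.0, 5.1]])
pi = np.array([0.5, 0.5])
mu = np.array([[0.0, 0.0], [5.0, 5.0]])
cov = np.array([np.eye(2), np.eye(2)])

ric_piece = e_step(X_piece, pi, mu, cov)
```

Each row of `ric_piece` sums to 1, so pieces computed by different consumers can simply be stacked by the result collector.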
 
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;**M-STEP**
5. The result collector, also known as the sink, calculates the new parameters $\bf {\mu, \Sigma}$ and sends them back to the producer so the process can start again.
---
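As a rough illustration of the M-step, the sketch below applies the usual GMM update rules to the full `X` and the stacked `ric`. Whether sink.py uses exactly these updates (for example, with or without a regularization term on the covariances) is an assumption; the function name `m_step` and the toy data are hypothetical.

```python
import numpy as np

def m_step(X, ric):
    """Standard GMM parameter updates. X is (N, d), ric is (N, K)."""
    n_samples = X.shape[0]
    m_c = ric.sum(axis=0)                # effective number of points per cluster
    pi = m_c / n_samples                 # mixing weights
    mu = (ric.T @ X) / m_c[:, None]      # weighted means, shape (K, d)
    cov = []
    for c in range(ric.shape[1]):
        diff = X - mu[c]
        # Weighted sum of outer products, normalized by the cluster mass
        cov.append((ric[:, c, None] * diff).T @ diff / m_c[c])
    return pi, mu, np.array(cov)

# Toy input (made up): hard assignments of four points to two clusters
X = np.array([[0.0, 0.0], [2.0, 0.0], [10.0, 10.0], [12.0, 10.0]])
ric = np.array([[1.0, 0.0], [1.0, 0.0], [0.0, 1.0], [0.0, 1.0]])

pi, mu, cov = m_step(X, ric)
```

With these hard assignments the means land on the midpoint of each pair of points, which makes the result easy to check by hand.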
 
In essence, what **producer/vent.py** should be doing is:
```python
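import numpy as np
from scipy.stats import multivariate_normal

X = initializeDataSet()
vent.setRandomVariables()  # random initial parameters
log_likelihoods = []
converged = False
while not converged:
    vent.sendToMappers()  # sends a piece of X, mu and sigma to each worker
    vent.receiveParams(vent.sinkSocket.recv_json())  # new parameters from the sink
    # Log-likelihood value for the current parameters
    log_likelihoods.append(np.log(np.sum([
        pi_c * multivariate_normal(mu_c, cov_c).pdf(X)
        for pi_c, mu_c, cov_c in zip(vent.pi, vent.mu, vent.cov)
    ])))
    # Stop once the log-likelihood no longer improves
    if len(log_likelihoods) > 1:
        if (log_likelihoods[-1] - log_likelihoods[-2]) < 0.000000000000000000009:
            converged = True
vent.plotState()
print(vent.predict(Xtest[0]))  # Xtest: held-out samples, defined elsewhere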
```
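
For reference, the textbook GMM log-likelihood sums the log of the mixture density over the samples: $\ln p(X) = \sum_i \ln \sum_c \pi_c N(x_i \mid \mu_c, \Sigma_c)$. The sketch below computes that quantity and applies a tolerance-based stopping test. It is an illustration only: the helper names, the hand-written Gaussian density (used so the block depends only on NumPy) and the default `tol` are assumptions, not values taken from vent.py.

```python
import numpy as np

def gaussian_pdf(X, mu, cov):
    """Density of N(mu, cov) evaluated at every row of X."""
    d = X.shape[1]
    diff = X - mu
    inv = np.linalg.inv(cov)
    norm = np.sqrt((2 * np.pi) ** d * np.linalg.det(cov))
    maha = np.einsum("ij,jk,ik->i", diff, inv, diff)
    return np.exp(-0.5 * maha) / norm

def log_likelihood(X, pi, mu, cov):
    """Sum over samples of the log of the mixture density."""
    mixture = sum(
        pi_c * gaussian_pdf(X, mu_c, cov_c)
        for pi_c, mu_c, cov_c in zip(pi, mu, cov)
    )
    return float(np.sum(np.log(mixture)))

def has_converged(log_likelihoods, tol=1e-6):
    """True once the last two log-likelihood values differ by less than tol."""
    if len(log_likelihoods) < 2:
        return False
    return abs(log_likelihoods[-1] - log_likelihoods[-2]) < tol

# Toy data (made up): parameters centred on the points score higher
X = np.array([[0.0, 0.0], [5.0, 5.0]])
pi = np.array([0.5, 0.5])
cov = np.array([np.eye(2), np.eye(2)])
ll_good = log_likelihood(X, pi, np.array([[0.0, 0.0], [5.0, 5.0]]), cov)
ll_bad = log_likelihood(X, pi, np.array([[10.0, 10.0], [20.0, 20.0]]), cov)
```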
---
**consumer/worker.py**:
```python
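while True:
    # ventSocket is an assumed name for the socket that receives work from the vent
    msg = ventSocket.recv_json()
    worker = Worker(
        msg['x'],
        msg['mu'],
        msg['cov'],
    )
    ric = worker.Estep()  # responsibilities for this piece of X
    worker.sendToSink(ric)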
```
---
and finally **result collector/sink.py**:
```python
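import numpy as np

samples_received = 0  # messages collected in the current iteration
while True:
    msg = workerSocket.recv_json()
    samples_received += 1
    if samples_received == 1:
        # First piece of this iteration: start fresh buffers
        X = np.asarray(msg['X'])
        ric = np.asarray(msg['ric'])
    else:
        X = np.concatenate((X, msg['X']))
        ric = np.concatenate((ric, msg['ric']))
    if samples_received == n_samples:  # n_samples: messages expected per iteration
        # Everything has arrived: run the M-step and send the new parameters to the vent
        sink = Sink(X, ric)
        sink.sendToVent(sink.Mstep())
        samples_received = 0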
```
 
The model is complemented by inner parallelism in each worker using threads; the idea is to be able to compute and classify large datasets.
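
A minimal sketch of what thread-level parallelism inside a worker could look like is shown here, assuming the worker splits its piece of `X` into sub-chunks and computes the responsibilities of each one in a `ThreadPoolExecutor`. The function names, the `n_threads` parameter and the random toy data are hypothetical and not taken from worker.py. Threads can help in this setting because NumPy releases the GIL during many array operations, although the actual speed-up depends on the chunk sizes.

```python
from concurrent.futures import ThreadPoolExecutor

import numpy as np

def gaussian_pdf(X, mu, cov):
    """Density of N(mu, cov) evaluated at every row of X."""
    d = X.shape[1]
    diff = X - mu
    inv = np.linalg.inv(cov)
    norm = np.sqrt((2 * np.pi) ** d * np.linalg.det(cov))
    maha = np.einsum("ij,jk,ik->i", diff, inv, diff)
    return np.exp(-0.5 * maha) / norm

def responsibilities(X_part, pi, mu, cov):
    """E-step for one sub-chunk, shape (n_part, K)."""
    weighted = np.column_stack([
        pi_c * gaussian_pdf(X_part, mu_c, cov_c)
        for pi_c, mu_c, cov_c in zip(pi, mu, cov)
    ])
    return weighted / weighted.sum(axis=1, keepdims=True)

def threaded_e_step(X_piece, pi, mu, cov, n_threads=4):
    """Split the piece into sub-chunks and process them in a thread pool."""
    parts = [p for p in np.array_split(X_piece, n_threads) if len(p)]
    with ThreadPoolExecutor(max_workers=n_threads) as pool:
        # pool.map keeps the input order, so rows stay aligned with X_piece
        results = list(pool.map(lambda p: responsibilities(p, pi, mu, cov), parts))
    return np.concatenate(results)

# Toy data (made up): ten random 2-D points and two clusters
rng = np.random.default_rng(0)
X_piece = rng.normal(size=(10, 2))
pi = np.array([0.4, 0.6])
mu = np.array([[0.0, 0.0], [1.0, 1.0]])
cov = np.array([np.eye(2), 2.0 * np.eye(2)])

ric_threaded = threaded_e_step(X_piece, pi, mu, cov, n_threads=3)
ric_single = responsibilities(X_piece, pi, mu, cov)
```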
Results are shown with blobs, as in the original article by Tobias Schlagenhauf. The output did not change, only the process of obtaining it, whose objective is to take advantage of several computers and their processors.


![Initial state](https://github.com/mhdelta/distributed-gmm/blob/master/img/ini_state.png?raw=true)

Log-likelihood through the iterations:
![Log through iterations](https://github.com/mhdelta/distributed-gmm/blob/master/img/logl.png?raw=true)

![End state](https://github.com/mhdelta/distributed-gmm/blob/master/img/end_state.png?raw=true)





 


