# Parallel Gradient Descent
#### A Study of Implementation Approaches
---
**Credits:**
Adapted from the original code developed by [Angad Gill](https://github.com/angadgill) and [Sharath Rao](https://github.com/sharathrao13), based on the paper by Zinkevich, M., Weimer, M., Li, L., & Smola, A. (2010). [Parallelized stochastic gradient descent](http://papers.nips.cc/paper/4006-parallelized-stochastic-gradient-descent).

## Dataset

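Diamonds dataset: ~52,000 observations with 8 features and the corresponding diamond prices, used for linear regression.
The cells below, however, generate a synthetic regression problem with `make_regression` (1,000 samples, 100 features).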

In [1]:
%matplotlib inline
from sklearn.datasets import make_regression
from sklearn.model_selection import ShuffleSplit
import numpy as np
from matplotlib import pyplot as plt

In [2]:
n_samples = 1000
n_features = 100
seed = 1
effective_rank = 100


In [4]:
X, y = make_regression(n_samples=n_samples, n_features=n_features, 
                       random_state=seed, noise=0.0, effective_rank=effective_rank)

Preparing the training and test sets:

In [5]:
rs = ShuffleSplit(n_splits=5, test_size=.2, random_state=0)

In [6]:
# Iterate through the 5 shuffled splits and keep only the last one
for train_index, test_index in rs.split(X):
    pass

In [7]:
X_train = X[train_index]
X_test = X[test_index]
y_train = y[train_index]
y_test = y[test_index]

## HogWild!
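The idea behind Hogwild! is that several workers update one shared weight vector without any locking, accepting that a worker may occasionally read weights another worker is in the middle of changing. The sketch below illustrates that update pattern for least squares with Python threads. It is a hypothetical `hogwild_sgd` helper written for illustration, not the `hogwildsgd` module used in the next cells; in CPython the GIL serializes most of this work, so it shows the lock-free pattern rather than a real speedup.

```python
import threading
import numpy as np

def hogwild_sgd(X, y, n_threads=4, n_epochs=10, lr=0.01, seed=0):
    """Lock-free SGD for least squares: all threads write to the same vector w."""
    w = np.zeros(X.shape[1])                  # shared weights, deliberately unprotected
    n_updates = n_epochs * len(y) // n_threads

    def worker(worker_seed):
        rng = np.random.default_rng(worker_seed)
        # Each thread samples examples uniformly from the full training set
        for i in rng.integers(0, len(y), size=n_updates):
            grad = (X[i] @ w - y[i]) * X[i]   # may read a w another thread is changing
            w[:] -= lr * grad                 # in-place write, no lock (w[:] keeps w shared)

    threads = [threading.Thread(target=worker, args=(seed + k,)) for k in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return w

# Toy usage on noiseless synthetic data (not the notebook's dataset)
rng = np.random.default_rng(1)
X_toy = rng.normal(size=(400, 5))
w_true = np.array([1.0, -2.0, 0.5, 3.0, 0.0])
y_toy = X_toy @ w_true
w_hw = hogwild_sgd(X_toy, y_toy)
print(np.round(w_hw, 3))
```

Writing through `w[:]` matters here: a plain `w -= ...` inside `worker` would make `w` a local name and raise `UnboundLocalError`.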

In [8]:
from hogwildsgd import HogWildRegressor
import scipy.sparse
import numpy as np

In [9]:
n_iter = 5

In [16]:
hw = HogWildRegressor(n_jobs=1,
                      n_epochs=n_iter,
                      batch_size=1,
                      chunk_size=32,
                      learning_rate=.001,
                      generator=None,
                      verbose=2)

In [17]:
%time hw = hw.fit(X_train, y_train)

Epoch: 0


[Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.
[Parallel(n_jobs=1)]: Done   1 out of   1 | elapsed:    0.1s remaining:    0.0s
[Parallel(n_jobs=1)]: Done  25 out of  25 | elapsed:    0.3s finished
[Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.
[Parallel(n_jobs=1)]: Done   1 out of   1 | elapsed:    0.0s remaining:    0.0s
[Parallel(n_jobs=1)]: Done  25 out of  25 | elapsed:    0.2s finished
[Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.
[Parallel(n_jobs=1)]: Done   1 out of   1 | elapsed:    0.0s remaining:    0.0s


Epoch: 1
Epoch: 2


[Parallel(n_jobs=1)]: Done  25 out of  25 | elapsed:    0.2s finished
[Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.
[Parallel(n_jobs=1)]: Done   1 out of   1 | elapsed:    0.0s remaining:    0.0s
[Parallel(n_jobs=1)]: Done  25 out of  25 | elapsed:    0.2s finished
[Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.
[Parallel(n_jobs=1)]: Done   1 out of   1 | elapsed:    0.0s remaining:    0.0s


Epoch: 3
Epoch: 4


[Parallel(n_jobs=1)]: Done  25 out of  25 | elapsed:    0.2s finished


ValueError: cannot reshape array of size 100 into shape (10,1)

In [None]:
y_hat = hw.predict(X)

In [None]:
y = y.reshape((len(y),))

In [None]:
# Mean absolute error; ravel keeps an (n, 1) prediction from broadcasting against y
score = np.mean(np.abs(y - np.ravel(y_hat)))

## Standard SGD (baseline)

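Training the model while recording the test-set score after every iteration. For `SGDRegressor`, `score` returns the coefficient of determination (R²), not an accuracy rate: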
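To make the "score after every iteration" loop concrete, here is a minimal NumPy sketch of single-sample SGD on squared loss that records the test R² after each epoch. It assumes a linear model with intercept and a constant learning rate; `sgd_with_history` and `r2` are hypothetical helpers, and scikit-learn's `SGDRegressor` uses its own defaults (regularization, learning-rate schedule) that are not reproduced here.

```python
import numpy as np

def r2(y_true, y_pred):
    # Coefficient of determination, the metric SGDRegressor.score reports
    ss_res = np.sum((y_true - y_pred) ** 2)
    ss_tot = np.sum((y_true - y_true.mean()) ** 2)
    return 1.0 - ss_res / ss_tot

def sgd_with_history(X_tr, y_tr, X_te, y_te, n_epochs=20, lr=0.01, seed=0):
    """Single-sample SGD on squared loss, recording the test R² after every epoch."""
    rng = np.random.default_rng(seed)
    w, b = np.zeros(X_tr.shape[1]), 0.0
    history = []
    for _ in range(n_epochs):
        for i in rng.permutation(len(y_tr)):
            err = X_tr[i] @ w + b - y_tr[i]   # gradient of 0.5 * err**2 is err * [x, 1]
            w -= lr * err * X_tr[i]
            b -= lr * err
        history.append(r2(y_te, X_te @ w + b))
    return w, b, history

# Toy usage on noiseless synthetic data (not the notebook's dataset)
rng = np.random.default_rng(1)
X_toy = rng.normal(size=(500, 5))
y_toy = X_toy @ np.array([1.0, -2.0, 0.5, 3.0, 0.0]) + 0.5
w, b, history = sgd_with_history(X_toy[:400], y_toy[:400], X_toy[400:], y_toy[400:])
print(len(history), round(history[-1], 4))
```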

In [None]:
from sklearn.linear_model import SGDRegressor

In [None]:
# Maximum number of iterations (passes over the training data):
n_iter = 5000

In [None]:
# scikit-learn regression model
sgd = SGDRegressor(max_iter = n_iter, tol = 0.001)

In [None]:
# Train the model and time it
%time sgd.fit(X_train, y_train)

In [None]:
# R² score on the test set
sgd.score(X_test, y_test)

In [None]:
# Evaluation: each partial_fit call runs one epoch; score the test set after each
scores_base = []
sgd = SGDRegressor(max_iter=1, tol=0.001, warm_start=True)
for i in range(n_iter):
    sgd.partial_fit(X_train, y_train)
    scores_base += [sgd.score(X_test, y_test)]

In [None]:
plt.plot(range(len(scores_base)), scores_base)
plt.xlabel("Epoch")
plt.ylabel("R² (test set)")

In [None]:
# Final R² score, measured on the test set:
scores_base[-1]


# Parallel SGD
Comparison of different parallelization approaches.

In [None]:
from joblib import Parallel, delayed
import utils
import importlib
importlib.reload(utils)

### Scenario 1:
In this scenario the dataset is split into equal parts and each part is assigned to one worker. The final weights (parameters) are computed only once, after all workers have finished.
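This scenario mirrors the SimuParallelSGD scheme from the Zinkevich et al. (2010) paper cited in the credits: randomly partition the examples, let each worker run plain SGD on its own part, and average the resulting weight vectors once at the end. The sketch below simulates that sequentially with NumPy, assuming squared loss and no intercept. `simu_parallel_sgd` and `local_sgd` are hypothetical helpers for illustration; they are not the internals of `utils.sim_parallel_sgd`, which are not shown here.

```python
import numpy as np

def local_sgd(X, y, n_epochs, lr, seed):
    """Plain single-sample SGD on squared loss for one worker's data (no intercept)."""
    rng = np.random.default_rng(seed)
    w = np.zeros(X.shape[1])
    for _ in range(n_epochs):
        for i in rng.permutation(len(y)):
            w -= lr * (X[i] @ w - y[i]) * X[i]
    return w

def simu_parallel_sgd(X, y, n_jobs, n_epochs=10, lr=0.01, seed=0):
    # Randomly partition the examples into n_jobs disjoint, (near) equal shards
    perm = np.random.default_rng(seed).permutation(len(y))
    shards = np.array_split(perm, n_jobs)
    # Workers are independent; here they simply run one after another
    weights = [local_sgd(X[idx], y[idx], n_epochs, lr, seed + k)
               for k, idx in enumerate(shards)]
    # The only synchronization: average the weight vectors once, at the end
    return np.mean(weights, axis=0)

# Toy usage on noiseless synthetic data (not the notebook's dataset)
rng = np.random.default_rng(1)
X_toy = rng.normal(size=(800, 5))
w_true = np.array([1.0, -2.0, 0.5, 3.0, 0.0])
y_toy = X_toy @ w_true
w_avg = simu_parallel_sgd(X_toy, y_toy, n_jobs=4)
print(np.round(w_avg, 3))
```

Because every worker only ever sees `1/n_jobs` of the data, each one does proportionally less work per epoch; the averaging step is what combines their information.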

In [None]:
# Number of workers
n_jobs = 4

# Training
%time scores = utils.sim_parallel_sgd(X_train, y_train, X_test, y_test, n_iter, n_jobs, split_per_job=1/n_jobs, overlap=False)

In [None]:
utils.plot_scores(scores)

### Scenario 2:
In this scenario every worker has access to the whole dataset. The final weights are computed only once, after all workers have finished.
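When every worker sees the full dataset and starts from the same weights, the only thing that differs between workers is the order in which they visit the examples. The sketch below shows that setting with noisy toy data and checks one property that always holds: by the triangle inequality, the distance of the averaged weights to any reference point is at most the mean distance of the individual workers. `full_data_parallel_sgd` is a hypothetical helper assuming squared loss without intercept, not the code in `utils`.

```python
import numpy as np

def local_sgd(X, y, n_epochs, lr, seed):
    """Plain single-sample SGD on squared loss (no intercept)."""
    rng = np.random.default_rng(seed)
    w = np.zeros(X.shape[1])
    for _ in range(n_epochs):
        for i in rng.permutation(len(y)):
            w -= lr * (X[i] @ w - y[i]) * X[i]
    return w

def full_data_parallel_sgd(X, y, n_jobs, n_epochs=5, lr=0.01):
    # Every worker gets the whole training set; only the visiting order (seed) differs
    weights = np.array([local_sgd(X, y, n_epochs, lr, seed=k) for k in range(n_jobs)])
    # Single synchronization at the end: average the weight vectors
    return weights.mean(axis=0), weights

# Toy usage on noisy synthetic data (not the notebook's dataset)
rng = np.random.default_rng(1)
X_toy = rng.normal(size=(400, 5))
w_true = np.array([1.0, -2.0, 0.5, 3.0, 0.0])
y_toy = X_toy @ w_true + rng.normal(scale=0.5, size=400)
w_avg, per_worker = full_data_parallel_sgd(X_toy, y_toy, n_jobs=4)
err_workers = np.linalg.norm(per_worker - w_true, axis=1)
err_avg = np.linalg.norm(w_avg - w_true)
print(err_workers.round(4), round(err_avg, 4))
```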

4 workers:

In [None]:
n_jobs = 4
%time scores_4 = utils.sim_parallel_sgd(X_train, y_train, X_test, y_test, n_iter, n_jobs, split_per_job=1, overlap=True, verbose=True)

In [None]:
utils.plot_scores(scores_4)

20 workers:

In [None]:
n_jobs = 20
%time scores_20 = utils.sim_parallel_sgd(X_train, y_train, X_test, y_test, n_iter, n_jobs, split_per_job=1, overlap=True, verbose=True)

In [None]:
utils.plot_scores(scores_20)

50 workers:

In [None]:
n_jobs = 50
%time scores_50 = utils.sim_parallel_sgd(X_train, y_train, X_test, y_test, n_iter, n_jobs, split_per_job=1, overlap=True, verbose=True)

In [None]:
utils.plot_scores(scores_50)

100 workers:

In [None]:
n_jobs = 100
%time scores_100 = utils.sim_parallel_sgd(X_train, y_train, X_test, y_test, n_iter, n_jobs, split_per_job=1, overlap=True, verbose=True)

In [None]:
utils.plot_scores(scores_100)

In [None]:
plt.figure(1)
for s in [scores_4, scores_20, scores_50, scores_100]:
    s = np.array(s).T
    plt.plot(range(len(s[-1])), s[-1], '--')

### Scenario 3:
In this scenario each worker receives a fixed percentage of the data. The same example may be assigned to more than one worker (overlap). Synchronization happens only at the end.
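The difference between overlapping and disjoint assignment is easiest to see at the level of example indices. In the sketch below, `assign_indices` is a hypothetical helper whose `split_per_job` and `overlap` arguments only mirror the names passed to `utils.sim_parallel_sgd`; it is an assumption about how such a split could work, not that function's actual code. With overlap, each worker samples its own subset independently, so an example reaches at least one of `k` workers with probability `1 - (1 - p)**k` for a fraction `p` per worker.

```python
import numpy as np

def assign_indices(n_samples, n_jobs, split_per_job, overlap, seed=0):
    """Return one index array per worker, each covering split_per_job of the data."""
    rng = np.random.default_rng(seed)
    size = int(round(split_per_job * n_samples))
    if overlap:
        # Each worker draws its own subset independently, so the same example
        # can be assigned to several workers (but never twice to the same one)
        return [rng.choice(n_samples, size=size, replace=False) for _ in range(n_jobs)]
    # Without overlap the shards are disjoint, so together they cannot exceed the data
    if size * n_jobs > n_samples:
        raise ValueError("disjoint shards would need more examples than available")
    perm = rng.permutation(n_samples)
    return [perm[k * size:(k + 1) * size] for k in range(n_jobs)]

# 50% per worker with overlap: compare observed coverage with 1 - (1 - p)**k
parts = assign_indices(800, n_jobs=4, split_per_job=0.5, overlap=True)
coverage = len(np.unique(np.concatenate(parts))) / 800
print([len(p) for p in parts], coverage, 1 - (1 - 0.5) ** 4)
```

With 4 workers at 50% each, roughly 6% of the examples are expected to be seen by no worker at all, which is one cost of independent sampling compared with disjoint shards.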

50% of the data per worker:

In [None]:
n_jobs = 4
%time scores = utils.sim_parallel_sgd(X_train, y_train, X_test, y_test, n_iter, n_jobs, split_per_job=0.5, overlap=True)

In [None]:
utils.plot_scores(scores)

25% of the data per worker:

In [None]:
n_jobs = 4
%time scores = utils.sim_parallel_sgd(X_train, y_train, X_test, y_test, n_iter, n_jobs, split_per_job=0.25, overlap=True)

In [None]:
utils.plot_scores(scores)

### Scenario 4:
In Scenario 4 every worker processes the whole dataset, but the results are synchronized not only at the end but also during training.
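Synchronizing during training can be sketched as periodic model averaging: the epochs are split into `n_sync` rounds, every worker starts each round from the current shared weights, and the round ends by averaging the workers' weights and handing the average back to everyone. `periodic_avg_sgd` below is a hypothetical, sequentially simulated helper assuming squared loss without intercept; how `utils.sim_parallel_sgd` uses its `n_sync` argument is not shown in this notebook.

```python
import numpy as np

def periodic_avg_sgd(X, y, n_jobs, n_epochs, n_sync, lr=0.01, seed=0):
    """Full-data workers that average their weights n_sync times during training."""
    rng = np.random.default_rng(seed)
    w = np.zeros(X.shape[1])
    # Spread the epochs over n_sync rounds; every round ends with a synchronization
    for round_epochs in np.array_split(np.arange(n_epochs), n_sync):
        local = []
        for _worker in range(n_jobs):
            w_k = w.copy()                     # each worker restarts from the shared average
            for _epoch in range(len(round_epochs)):
                for i in rng.permutation(len(y)):
                    w_k -= lr * (X[i] @ w_k - y[i]) * X[i]
            local.append(w_k)
        w = np.mean(local, axis=0)             # synchronization: average, then broadcast
    return w

# Toy usage on noiseless synthetic data (not the notebook's dataset)
rng = np.random.default_rng(1)
X_toy = rng.normal(size=(400, 5))
w_true = np.array([1.0, -2.0, 0.5, 3.0, 0.0])
y_toy = X_toy @ w_true
w_1 = periodic_avg_sgd(X_toy, y_toy, n_jobs=4, n_epochs=10, n_sync=1)
w_5 = periodic_avg_sgd(X_toy, y_toy, n_jobs=4, n_epochs=10, n_sync=5)
print(np.round(w_1, 3), np.round(w_5, 3))
```

With `n_sync=1` this reduces to a single average at the end; larger values trade more communication for workers that drift apart less between averages.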

With 2 synchronizations:

In [None]:
importlib.reload(utils)

In [None]:
n_jobs = 4
n_sync = 2
%time scores_2_sync = utils.sim_parallel_sgd(X_train, y_train, X_test, y_test, n_iter, n_jobs, n_sync=n_sync, split_per_job=1, overlap=True, verbose=True)

In [None]:
utils.plot_scores(scores_2_sync)

With 4 synchronizations:

In [None]:
n_jobs = 4
n_sync = 4
%time scores_4_sync = utils.sim_parallel_sgd(X_train, y_train, X_test, y_test, n_iter, n_jobs, n_sync=n_sync, split_per_job=1, overlap=True, verbose=True)

In [None]:
utils.plot_scores(scores_4_sync)

With 5 synchronizations:

In [None]:
n_jobs = 4
n_sync = 5
%time scores_5_sync = utils.sim_parallel_sgd(X_train, y_train, X_test, y_test, n_iter, n_jobs, n_sync=n_sync, split_per_job=1, overlap=True, verbose=True)

In [None]:
utils.plot_scores(scores_5_sync)

With 10 synchronizations:

In [None]:
n_jobs = 4
n_sync = 10
%time scores_10_sync = utils.sim_parallel_sgd(X_train, y_train, X_test, y_test, n_iter, n_jobs, n_sync=n_sync, split_per_job=1, overlap=True, verbose=True)

In [None]:
utils.plot_scores(scores_10_sync)

Comparison of the runs against the baseline:

In [None]:
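plt.figure(1)
plt.plot(range(len(scores_base)), scores_base, label="baseline SGD")
for label, s in [("2 syncs", scores_2_sync), ("4 syncs", scores_4_sync),
                 ("5 syncs", scores_5_sync), ("10 syncs", scores_10_sync)]:
    s = np.array(s).T
    plt.plot(range(len(s[-1])), s[-1], '--', label=label)
plt.legend()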