# Concorrência Distribuída

# Local

A concorrência distribuída refere-se à execução de tarefas em paralelo em várias máquinas ou nós em uma rede. Uma das bibliotecas Python mais populares para concorrência distribuída é o `Dask`. O `Dask` permite que você paralelize cálculos usando APIs familiares do Python e distribua esses cálculos em clusters.

Vamos criar um exemplo simples usando `Dask` para demonstrar a concorrência distribuída no contexto de MLOps. Neste exemplo, vamos treinar vários modelos de regressão linear em diferentes subconjuntos de dados distribuídos em um cluster `Dask`.

Primeiro, você precisará instalar o `Dask` e o `dask-ml`:

```python
!pip install dask[complete] dask-ml
```

Agora, vamos ao código:

```python
import dask.array as da
from dask.distributed import Client, LocalCluster
from dask_ml.linear_model import LinearRegression
from dask_ml.datasets import make_regression

# Criando um cluster local e um cliente para interagir com ele
cluster = LocalCluster(n_workers=4, threads_per_worker=1)
client = Client(cluster)

# Gerando dados distribuídos
n_samples = 100000
n_features = 10
X, y = make_regression(n_samples=n_samples, n_features=n_features, chunks=n_samples//4)

# Função para treinar um modelo de regressão linear
def train_model(X, y):
    model = LinearRegression()
    model.fit(X, y)
    return model.coef_

# Treinando modelos em paralelo nos diferentes subconjuntos de dados
futures = client.map(train_model, X.to_delayed(), y.to_delayed())
results = client.gather(futures)

# Mostrando os coeficientes dos modelos treinados
for i, coef in enumerate(results):
    print(f"Model {i} coefficients: {coef}")

# Fechando o cliente e o cluster
client.close()
cluster.close()
```

Neste exemplo:

- Criamos um cluster `Dask` local com 4 workers.
- Geramos dados distribuídos usando `dask_ml.datasets.make_regression`.
- Treinamos modelos de regressão linear em paralelo nos diferentes subconjuntos de dados usando `client.map`.
- Coletamos e exibimos os resultados.

Este é um exemplo básico de concorrência distribuída usando `Dask`. Em cenários reais de MLOps, você pode ter um cluster `Dask` distribuído em várias máquinas ou na nuvem, permitindo treinar modelos ou processar dados em grande escala de forma distribuída e paralela.

# Em Rede

Para usar o `Dask` em um cluster distribuído com workers em diferentes máquinas, você precisa configurar um `Dask Scheduler` em uma das máquinas e `Dask Workers` nas outras. Aqui está um guia passo a passo:

### 1. Instalação:

Em todas as máquinas (incluindo a máquina que atuará como scheduler e as máquinas worker), instale o Dask:

```bash
pip install dask[complete]
```

### 2. Iniciar o Scheduler:

Escolha uma das máquinas para ser o scheduler. Execute o seguinte comando:

```bash
dask-scheduler
```

Isso iniciará o scheduler e você verá uma saída indicando o endereço do scheduler, algo como `tcp://192.168.1.100:8786`. Anote esse endereço, pois você precisará dele para conectar os workers.

### 3. Iniciar os Workers:

Nas outras máquinas (que você deseja usar como workers), execute o seguinte comando:

```bash
dask-worker tcp://192.168.1.100:8786
```

Substitua `tcp://192.168.1.100:8786` pelo endereço do scheduler que você anotou anteriormente.

### 4. Código Python:

Agora, no seu código Python (que pode estar em qualquer máquina, incluindo uma das máquinas worker ou uma máquina separada), você se conectará ao scheduler:

```python
from dask.distributed import Client
import dask.array as da
from dask_ml.linear_model import LinearRegression
from dask_ml.datasets import make_regression

# Conectando ao scheduler
client = Client('tcp://192.168.1.100:8786')  # Use o endereço do seu scheduler

# Gerando dados distribuídos
n_samples = 100000
n_features = 10
X, y = make_regression(n_samples=n_samples, n_features=n_features, chunks=n_samples//4)

# Função para treinar um modelo de regressão linear
def train_model(X, y):
    model = LinearRegression()
    model.fit(X, y)
    return model.coef_

# Treinando modelos em paralelo nos diferentes subconjuntos de dados
futures = client.map(train_model, X.to_delayed(), y.to_delayed())
results = client.gather(futures)

# Mostrando os coeficientes dos modelos treinados
for i, coef in enumerate(results):
    print(f"Model {i} coefficients: {coef}")

# Fechando o cliente
client.close()
```

Lembre-se de substituir `'tcp://192.168.1.100:8786'` pelo endereço do seu scheduler.

### Notas:

- Certifique-se de que as portas necessárias (por padrão, 8786 para o scheduler e 8787 para o dashboard) estejam abertas e acessíveis entre as máquinas.
- Se você tiver problemas de conectividade, verifique as configurações de firewall e rede.
- Para cenários de produção ou clusters maiores, você pode querer usar algo como o [Dask Kubernetes](https://kubernetes.dask.org/en/latest/) ou [Dask Yarn](https://yarn.dask.org/en/latest/) para gerenciar e escalar seu cluster Dask.