## Dask

Dask é uma biblioteca em Python para computação paralela e distribuída, projetada para escalar facilmente de um notebook local até clusters de grande porte. Ele permite processar grandes volumes de dados que não cabem na memória RAM, utilizando uma interface familiar ao ecossistema do Python (como pandas, NumPy e scikit-learn).

Principais características do Dask:
✅ Compatível com Pandas e NumPy: Dask oferece dask.dataframe e dask.array, que imitam as APIs de pandas e NumPy, mas funcionam de forma preguiçosa (lazy), processando os dados em partes (chamadas de partições).

🚀 Execução paralela: Usa múltiplos núcleos da CPU para acelerar tarefas pesadas.

🧠 Execução distribuída: Pode ser usado com clusters (como Kubernetes, Yarn, ou com Dask.distributed) para rodar código em várias máquinas.

📦 Integração com machine learning: Funciona bem com scikit-learn e outras bibliotecas para treinar modelos em conjuntos de dados muito grandes.

### Cluster Local
Um cluster local do Dask é um ambiente de execução paralelo criado na sua própria máquina. Ele simula um "mini cluster", mas tudo roda localmente — é uma forma prática de aproveitar todos os núcleos da sua CPU para acelerar tarefas.


In [1]:
from dask.distributed import LocalCluster, Client
from dask.diagnostics import ProgressBar
import dask.dataframe as dd

In [2]:
path = 'C:\\Users\\mateu\\Documents\\MEGA\\Projetos\\HELPS\\MiniBook\\DataAnalise\\dataset\\dados.csv'

## 🔧 `LocalCluster` – Parâmetros principais

```python
from dask.distributed import LocalCluster

cluster = LocalCluster(
    n_workers=4,
    threads_per_worker=2,
    memory_limit='2GB',
    processes=True,
    scheduler_port=0,
    dashboard_address=':8787',
    silence_logs='error',
    local_directory=None,
    name="meu_cluster",
)
```


### 📌 Explicando cada parâmetro:

| Parâmetro               | Tipo        | Descrição |
|-------------------------|-------------|-----------|
| **`n_workers`**         | `int`       | Número de *workers* (processos ou threads independentes). Cada worker executa tarefas em paralelo. |
| **`threads_per_worker`** | `int`      | Número de **threads por worker**. Se for 1, cada worker executa uma tarefa por vez. |
| **`memory_limit`**      | `str` ou `float` | Limite de memória por worker. Pode ser em GB (`'2GB'`), MB (`'512MB'`) ou um número em bytes. |
| **`processes`**         | `bool`      | Define se os workers devem ser **processos** separados (`True`, padrão) ou **threads** (`False`). |
| **`scheduler_port`**    | `int`       | Porta do scheduler. Se `0`, ele escolhe uma automaticamente. |
| **`dashboard_address`** | `str`       | Endereço do dashboard. Ex: `':8787'` (localhost:8787), ou `'127.0.0.1:0'` (porta aleatória). |
| **`silence_logs`**      | `str`       | Nível de log para silenciar mensagens. Pode ser `'error'`, `'warning'`, `'info'`, etc. |
| **`local_directory`**   | `str`       | Caminho local para armazenar arquivos temporários dos workers (como cache). |
| **`name`**              | `str`       | Nome do cluster (aparece no dashboard). |


Criando o cluster local

In [None]:
cluster = LocalCluster(
    n_workers         =6,
    threads_per_worker=2,
    memory_limit='1GB',
    name        ="Cluster Local",
    local_directory="/tmp/dask",
)

client = Client(cluster)

2025-04-05 18:56:51,039 - distributed.dashboard.components.scheduler - ERROR - 'read-_to_string_dtype-753e69c234518b5026fa74097c19801c'
Traceback (most recent call last):
  File "c:\Users\mateu\AppData\Local\Programs\Python\Python312\Lib\site-packages\distributed\utils.py", line 811, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\mateu\AppData\Local\Programs\Python\Python312\Lib\site-packages\distributed\dashboard\components\scheduler.py", line 2630, in update_layout
    x = max(xs[dep] for dep in dependencies[tg]) + 1
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\mateu\AppData\Local\Programs\Python\Python312\Lib\site-packages\distributed\dashboard\components\scheduler.py", line 2630, in <genexpr>
    x = max(xs[dep] for dep in dependencies[tg]) + 1
            ~~^^^^^
KeyError: 'read-_to_string_dtype-753e69c234518b5026fa74097c19801c'
2025-04-05 18:56:51,050 - distributed.dashboard.components.scheduler - ERROR - 'read-_

In [4]:
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 6
Total threads: 12,Total memory: 5.59 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:62195,Workers: 6
Dashboard: http://127.0.0.1:8787/status,Total threads: 12
Started: Just now,Total memory: 5.59 GiB

0,1
Comm: tcp://127.0.0.1:62269,Total threads: 2
Dashboard: http://127.0.0.1:62278/status,Memory: 0.93 GiB
Nanny: tcp://127.0.0.1:62199,
Local directory: c:\tmp\dask\dask-scratch-space\worker-45l3ir0e,Local directory: c:\tmp\dask\dask-scratch-space\worker-45l3ir0e

0,1
Comm: tcp://127.0.0.1:62281,Total threads: 2
Dashboard: http://127.0.0.1:62292/status,Memory: 0.93 GiB
Nanny: tcp://127.0.0.1:62201,
Local directory: c:\tmp\dask\dask-scratch-space\worker-kocm5hpk,Local directory: c:\tmp\dask\dask-scratch-space\worker-kocm5hpk

0,1
Comm: tcp://127.0.0.1:62271,Total threads: 2
Dashboard: http://127.0.0.1:62283/status,Memory: 0.93 GiB
Nanny: tcp://127.0.0.1:62203,
Local directory: c:\tmp\dask\dask-scratch-space\worker-2f0tto3h,Local directory: c:\tmp\dask\dask-scratch-space\worker-2f0tto3h

0,1
Comm: tcp://127.0.0.1:62270,Total threads: 2
Dashboard: http://127.0.0.1:62280/status,Memory: 0.93 GiB
Nanny: tcp://127.0.0.1:62205,
Local directory: c:\tmp\dask\dask-scratch-space\worker-y_0c6nvp,Local directory: c:\tmp\dask\dask-scratch-space\worker-y_0c6nvp

0,1
Comm: tcp://127.0.0.1:62277,Total threads: 2
Dashboard: http://127.0.0.1:62290/status,Memory: 0.93 GiB
Nanny: tcp://127.0.0.1:62207,
Local directory: c:\tmp\dask\dask-scratch-space\worker-78e1ppk0,Local directory: c:\tmp\dask\dask-scratch-space\worker-78e1ppk0

0,1
Comm: tcp://127.0.0.1:62275,Total threads: 2
Dashboard: http://127.0.0.1:62286/status,Memory: 0.93 GiB
Nanny: tcp://127.0.0.1:62209,
Local directory: c:\tmp\dask\dask-scratch-space\worker-dhmcduiw,Local directory: c:\tmp\dask\dask-scratch-space\worker-dhmcduiw


In [5]:
data = dd.read_csv(path, delimiter=',')

`npartitions` : é definido pelo `n_workers` + `threads_per_worker`

In [6]:
data = data.repartition(npartitions=12)

In [7]:
data.head(2)

Unnamed: 0,id,ano,nome,preço,prime,empresa,desconto,estado
0,9001763654,1973,Evelyn Fogaça,3587,False,nissan,0.05,Alagoas
1,8096557597,2002,Sr. Davi Lucca Rezende,1289,False,nissin,0.0,Pernambuco


In [8]:
with ProgressBar():
    descricao = data.describe().compute()

descricao

Unnamed: 0,id,ano,preço,desconto
count,10000000.0,10000000.0,10000000.0,10000000.0
mean,4999143000.0,1997.131,2999.507,0.04999308
std,2887131000.0,15.95329,1155.006,0.04081858
min,2616.0,1970.0,1000.0,0.0
25%,2510162000.0,1983.0,2002.0,0.0
50%,5006660000.0,1997.0,3004.0,0.05
75%,7507617000.0,2011.0,4004.0,0.1
max,10000000000.0,2025.0,4999.0,0.1


In [9]:
data.groupby(by=['empresa'])['preço'].mean().compute()

empresa
brastemp    2998.400483
nissan      2999.779444
nissin      2999.596394
toyota      3000.252135
Name: preço, dtype: float64

In [10]:
def pow(x, lamb=2):
    return x**lamb

data['preço'].apply(lambda x: x, meta=('preço','int64')).compute()

0          3587
1          1289
2          1343
3          4538
4          2710
           ... 
1000074    3990
1000075    4650
1000076    3471
1000077    1842
1000078    1034
Name: preço, Length: 10000000, dtype: int64