# Leitura e particionamento de dados com Dask

Exemplo de leitura de arquivo `csv` e particionamento usando `Dask`, seja no formato `Parquet` ou em `csv`. Usei este exemplo para adequar melhor à [Base dos Dados](https://basedosdados.org)

Originalmente este arquivo foi criado para refatoração dos dados do [Comex Stats](http://comexstat.mdic.gov.br/pt/home)

In [None]:
import pandas as pd
import dask.dataframe as dd

from pathlib import Path

In [None]:
df_dd = dd.read_csv('nome_do_arquivo.csv', sep=";")

In [None]:
df_dd.npartitions

In [None]:
df_dd._meta_nonempty

In [None]:
df_dd._meta.dtypes

## Manipulação do arquivo

In [None]:
# o arquivo utilizado aqui era muito grande, mas era necessário usá-lo. Posteriormente inserirei um arquivo menor e mais útil para este exemplo
rename_ncm = {
    "CO_ANO": "ano",
    "CO_MES": "mes",
    "CO_NCM": "id_ncm",
    "CO_UNID": "id_unidade",
    "CO_PAIS": "id_pais",
    "SG_UF_NCM": "sigla_uf",
    "CO_VIA": "id_via",
    "CO_URF": "id_urf",
    "QT_ESTAT": "quantidade_estatistica",
    "KG_LIQUIDO": "peso_liquido_kg",
    "VL_FOB": "valor_fob_dolar",
    "VL_FRETE": "valor_frete",
    "VL_SEGURO": "valor_seguro",
}

In [None]:
# No Dask o rename não possui a flag inplace
df_dd = df_dd.rename(columns=rename_ncm)

## Parquet

Os arquivos parquet são particionados conforme uma função de nome dos mesmos. Estes precisam ter nomes únicos. Importante ressaltar que dentro das partições manuais o Dask particionará conforme sua configuração. Isso ainda precisa ser melhorado aqui.

In [None]:
# para nomear as partições
def name_func(x):
    return f"nome_particao{x}.parquet"

In [None]:
# exemplos de partição baseados na BD
out_path = Path('data_pq')
df_dd.to_parquet(out_path, partition_on=["ano", "mes", "sigla_uf"], name_function=name_func)

## CSV

Todo o controle de particionamento cabe ao usuário. Aqui usei uma função para controlar e salvar os arquivos particionados (`write_file`). O resultado é de um `group_by` com `apply`. Também precisa ser melhorado.

In [None]:
data_dir = Path("data_csv")
if not data_dir.is_dir():
    data_dir.mkdir(parents=True, exist_ok=False)

def write_file(grp):
    out_path = Path(f"{data_dir}/ano={grp['ano'].unique()[0]}/mes={grp['mes'].unique()[0]}/sigla_uf={grp['sigla_uf'].unique()[0]}")
    if not out_path.is_dir():
        out_path.mkdir(parents=True, exist_ok=True)
    grp.to_csv(out_path / "arquivo.csv",
               header=True,
               index=False)
    return None

In [None]:
df_dd.groupby(
    ["ano", "mes", "sigla_uf"]
).apply(
    write_file, meta={'ano': int, 'mes': int}
).compute()