<br>

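# Introdução

Exporta para Excel os dados brutos do SISAGUA (cadastro, controle e vigilância) de um município, identificado pelo código IBGE, a partir dos arquivos parquet particionados por `Uf` e `Código Ibge`.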

In [None]:
#!pip3 install dask --upgrade
#!pip3 install dask-labextension  --upgrade
#!pip3 install pyarrow --upgrade        # Necessário para usar o parquet
#!pip3 install traquitanas --upgrade
#!jupyter labextension install dask-labextension

In [1]:
import ctypes
import datetime
import glob
import os
import sys
import time

import numpy as np
import pandas as pd
import pyarrow.parquet as pq

In [2]:
import dask.dataframe as dd
from dask import compute
from dask.delayed import delayed
from dask.distributed import Client, LocalCluster
from dask.distributed import wait, progress
from dask.diagnostics import ProgressBar

In [3]:
import dask
import dask.distributed  # populate config with distributed defaults
# Display the client-side section of the distributed config (heartbeat,
# scheduler-info interval, preload, ...) as a quick sanity check.
dask.config.get('distributed.client')

{'heartbeat': '5s',
 'scheduler-info-interval': '2s',
 'security-loader': None,
 'preload': [],
 'preload-argv': []}

In [4]:
# Make the project's `src` directory (resolved relative to the current working
# directory) importable. Guarded so re-running this cell does not keep
# appending duplicate entries to sys.path.
mod_path = os.path.abspath(os.path.join(os.getcwd(), '..', 'src'))
if mod_path not in sys.path:
    sys.path.append(mod_path)
# NOTE(review): star import; `find_states` / `adjust_id_ibge` used later
# presumably come from here — consider importing them explicitly.
from sisagua.ibge import *

In [5]:
from paths import *

<br>

## Client

In [6]:
import multiprocessing as mp
from dask.distributed import Client, LocalCluster

In [7]:
#import close_process
#close_process.process()

In [8]:
# Local multi-process Dask cluster with roughly 90% of the logical CPUs as
# workers. max(1, ...) guarantees at least one worker: on a 1-CPU machine
# int(0.9 * 1) == 0 and the cluster would have no workers to run tasks.
# NOTE(review): each worker also runs 4 threads, so total threads exceed the
# CPU count (7 workers -> 28 threads in the recorded output) — intentional?
cluster = LocalCluster(
    n_workers=max(1, int(0.9 * mp.cpu_count())),
    threads_per_worker=4,
    memory_limit='4GB',  # per worker
    processes=True,
    # Low glibc malloc trim threshold so workers hand freed memory back to
    # the OS (setting suggested in Dask's worker-memory documentation).
    env={'MALLOC_TRIM_THRESHOLD_': '65536'}
)

client = Client(cluster)
client

Perhaps you already have a cluster running?
Hosting the HTTP server on port 45025 instead


0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:45025/status,

0,1
Dashboard: http://127.0.0.1:45025/status,Workers: 7
Total threads: 28,Total memory: 26.08 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:35705,Workers: 7
Dashboard: http://127.0.0.1:45025/status,Total threads: 28
Started: Just now,Total memory: 26.08 GiB

0,1
Comm: tcp://127.0.0.1:38715,Total threads: 4
Dashboard: http://127.0.0.1:35385/status,Memory: 3.73 GiB
Nanny: tcp://127.0.0.1:40947,
Local directory: /home/michel/Codes/open_geodata/br_sisagua/test/dask-worker-space/worker-b226sbfq,Local directory: /home/michel/Codes/open_geodata/br_sisagua/test/dask-worker-space/worker-b226sbfq

0,1
Comm: tcp://127.0.0.1:43685,Total threads: 4
Dashboard: http://127.0.0.1:34491/status,Memory: 3.73 GiB
Nanny: tcp://127.0.0.1:41785,
Local directory: /home/michel/Codes/open_geodata/br_sisagua/test/dask-worker-space/worker-82ky_8_z,Local directory: /home/michel/Codes/open_geodata/br_sisagua/test/dask-worker-space/worker-82ky_8_z

0,1
Comm: tcp://127.0.0.1:39657,Total threads: 4
Dashboard: http://127.0.0.1:39047/status,Memory: 3.73 GiB
Nanny: tcp://127.0.0.1:38829,
Local directory: /home/michel/Codes/open_geodata/br_sisagua/test/dask-worker-space/worker-7s2im609,Local directory: /home/michel/Codes/open_geodata/br_sisagua/test/dask-worker-space/worker-7s2im609

0,1
Comm: tcp://127.0.0.1:41919,Total threads: 4
Dashboard: http://127.0.0.1:38651/status,Memory: 3.73 GiB
Nanny: tcp://127.0.0.1:32859,
Local directory: /home/michel/Codes/open_geodata/br_sisagua/test/dask-worker-space/worker-eyzzekpo,Local directory: /home/michel/Codes/open_geodata/br_sisagua/test/dask-worker-space/worker-eyzzekpo

0,1
Comm: tcp://127.0.0.1:46111,Total threads: 4
Dashboard: http://127.0.0.1:37685/status,Memory: 3.73 GiB
Nanny: tcp://127.0.0.1:45297,
Local directory: /home/michel/Codes/open_geodata/br_sisagua/test/dask-worker-space/worker-_breq3_2,Local directory: /home/michel/Codes/open_geodata/br_sisagua/test/dask-worker-space/worker-_breq3_2

0,1
Comm: tcp://127.0.0.1:36849,Total threads: 4
Dashboard: http://127.0.0.1:44245/status,Memory: 3.73 GiB
Nanny: tcp://127.0.0.1:33533,
Local directory: /home/michel/Codes/open_geodata/br_sisagua/test/dask-worker-space/worker-lo7jfz3q,Local directory: /home/michel/Codes/open_geodata/br_sisagua/test/dask-worker-space/worker-lo7jfz3q

0,1
Comm: tcp://127.0.0.1:45593,Total threads: 4
Dashboard: http://127.0.0.1:45447/status,Memory: 3.73 GiB
Nanny: tcp://127.0.0.1:42089,
Local directory: /home/michel/Codes/open_geodata/br_sisagua/test/dask-worker-space/worker-ur6xp2_b,Local directory: /home/michel/Codes/open_geodata/br_sisagua/test/dask-worker-space/worker-ur6xp2_b


<br>

## Parameters

In [9]:
# Parameters: 7-digit IBGE code of the municipality to export.
# Keep exactly one assignment uncommented — the active line selects the city.
#cod_ibge = '3505203' # Bariri
#cod_ibge = '3501608' # Americana
#cod_ibge = '3548906' # São Carlos
cod_ibge = '3526902' # Limeira
#cod_ibge = '3550308' # São Paulo

In [10]:
# Resolve the state (UF) of the municipality; its 'sigla' is used below as the
# `Uf=` partition value when locating the parquet files.
estado_d = find_states(cod_ibge)
estado = estado_d['sigla']

# Normalise the IBGE code to the form used in the `Código Ibge=` partitions.
# In the recorded run the 7-digit '3526902' was printed back as '352690'.
cod_ibge_ajustado = adjust_id_ibge(cod_ibge)

Padrão IBGE novo, com código de 7 dígitos.
Correções necessárias aplicadas!
Código IBGE: 352690


<br>

## Dask Parquet Files

In [11]:
# Parameters
# Partitioned parquet datasets to export, grouped by their sub-folder under
# `input_path_parquet_partitioned`. Names ending in `_*` are glob patterns
# (presumably matching several dataset folders at once).
filenames = [
    os.path.join(input_path_parquet_partitioned, group, dataset)
    for group, datasets in (
        ('cadastro', (
            'cadastro_pontos_captacao',
            'cadastro_populacao_abastecida',
            'cadastro_tratamento_de_agua',
        )),
        ('controle', (
            'controle_mensal_amostras_fora_padrao',
            'controle_mensal_demais_parametros',
            'controle_mensal_infraestrutura_operacionais',
            'controle_mensal_parametros_basicos_*',
            'controle_semestral_*',
        )),
        ('vigilancia', (
            'vigilancia_cianobacterias_cianotoxinas',
            'vigilancia_demais_parametros',
            'vigilancia_parametros_basicos_*',
        )),
    )
    for dataset in datasets
]

In [12]:
# Export each dataset, restricted to the selected state/municipality, to
# <output_path>/<cod_ibge>/dados brutos/<subdir>/<basename>.xlsx
for filename in filenames:
    # Dataset name (glob suffix `_*` stripped) and its group sub-folder.
    basename = os.path.basename(filename).replace('_*', '')
    subdir = os.path.basename(os.path.dirname(filename))
    print('Na pasta "{}", processando arquivo "{}"'.format(subdir, basename))

    # Set and create output paths
    output_path_city = os.path.join(
        output_path,
        '{}'.format(cod_ibge),
        'dados brutos',
        subdir,
    )
    os.makedirs(output_path_city, exist_ok=True)

    # Point straight at the hive partition for this state / municipality so
    # only its parquet files are listed.
    filter_path = os.path.join('Uf={}'.format(estado), 'Código Ibge={}/*.parquet'.format(cod_ibge_ajustado))
    filename = os.path.join(filename, filter_path)

    # A municipality may have no rows in some dataset, in which case its
    # partition directory does not exist. read_parquet is expected to raise on
    # a glob that matches nothing, which would abort all remaining exports, so
    # skip the dataset instead.
    if not glob.glob(filename):
        print('  Nenhum arquivo encontrado em "{}"; pulando.'.format(filename))
        continue

    # Same partition restriction expressed as row filters; both use '=='.
    filters = [[('Uf', '==', estado), ('Código Ibge', '==', int(cod_ibge_ajustado))]]

    # Read Dataframes
    df = dd.read_parquet(
        filename,
        filters=filters,
    )

    # Materialise into an in-memory pandas DataFrame for the Excel writer.
    df = df.compute()

    # Write to Excel.
    # NOTE: an Excel sheet holds at most 1,048,576 rows; very large
    # municipalities may exceed this for some datasets.
    df.to_excel(
        os.path.join(output_path_city, '{}.xlsx'.format(basename)),
        index=False,
    )

Na pasta "cadastro", processando arquivo "cadastro_pontos_captacao"
Na pasta "cadastro", processando arquivo "cadastro_populacao_abastecida"
Na pasta "cadastro", processando arquivo "cadastro_tratamento_de_agua"
Na pasta "controle", processando arquivo "controle_mensal_amostras_fora_padrao"
Na pasta "controle", processando arquivo "controle_mensal_demais_parametros"
Na pasta "controle", processando arquivo "controle_mensal_infraestrutura_operacionais"
Na pasta "controle", processando arquivo "controle_mensal_parametros_basicos"
Na pasta "controle", processando arquivo "controle_semestral"
Na pasta "vigilancia", processando arquivo "vigilancia_cianobacterias_cianotoxinas"
Na pasta "vigilancia", processando arquivo "vigilancia_demais_parametros"
Na pasta "vigilancia", processando arquivo "vigilancia_parametros_basicos"


<br>

## End

In [None]:
# Shut down in reverse order of creation: disconnect the client first, then
# stop the cluster it is connected to. Closing the cluster first leaves the
# client trying to reach a scheduler that no longer exists.
client.close()
cluster.close()