In [39]:
import dask.dataframe as dd
import pandas as pd
import os, json

In [40]:
import dask
dask.config.set(scheduler="processes")

<dask.config.set at 0x1d141306710>

In [41]:
COLUMNS_ORDER = ["Ano","Tipo","Instituição","Cod.IBGE","UF","População","Coluna","Conta","Descrição Conta","Identificador da Conta","Valor"]
YEARS = ["2022","2021","2020","2019","2018","2017","2016","2015","2014","2013"]
DATA_PATH = "data"
FILES_NAMES = os.listdir(os.path.join(DATA_PATH,"2022"))

with open("file_field.json", "r") as f:
    file_field = json.load(f)

In [42]:
to_float = lambda x: x.replace(",", ".")

def get_conta(x:str) -> str:
    sx = x.split(" - ")
    if len(sx) == 1:
        return "Sem Numero"
    return sx[0]

def get_des_conta(x:str) -> str:
    sx = x.split(" - ")
    if len(sx) == 1:
        return x
    return sx[1]

In [43]:
def read_and_format_ddf(year:str, file_name:str) -> dask.dataframe:
    file_path = os.path.join(DATA_PATH,year,file_name)
    ddf = dd.read_csv(file_path, sep=";", encoding="latin-1", skiprows=3)
    ddf["Valor"] = ddf["Valor"].apply(to_float, meta=pd.Series(name="Valor",dtype=str)).astype(float)
    ddf["Descrição Conta"] = ddf["Conta"].apply(get_des_conta, meta=pd.Series(name="Conta",dtype=str))
    ddf["Conta"] = ddf["Conta"].apply(get_conta, meta=pd.Series(name="Conta",dtype=str))
    ddf["Ano"] = year
    ddf["Tipo"] = file_field[file_name].encode("latin-1")
    return ddf[COLUMNS_ORDER]

In [44]:
balanco_patrimonial = dd.concat([read_and_format_ddf(year,FILES_NAMES[0]) for year in YEARS])
depesas_orcamentarias = dd.concat([read_and_format_ddf(year,FILES_NAMES[1]) for year in YEARS])
despesas_por_funcao = dd.concat([read_and_format_ddf(year,FILES_NAMES[2]) for year in YEARS])
execucao_restos_pagar = dd.concat([read_and_format_ddf(year,FILES_NAMES[3]) for year in YEARS])
execucao_restos_pagar_funcao = dd.concat([read_and_format_ddf(year,FILES_NAMES[4]) for year in YEARS])
receitas_orcamentarias = dd.concat([read_and_format_ddf(year,FILES_NAMES[5]) for year in YEARS])
variacoes_patrimoniais = dd.concat([read_and_format_ddf(year,FILES_NAMES[6]) for year in YEARS])

In [45]:
full_ddf = dd.concat([balanco_patrimonial,
                      depesas_orcamentarias,
                      despesas_por_funcao,
                      execucao_restos_pagar,
                      execucao_restos_pagar_funcao,
                      receitas_orcamentarias,
                      variacoes_patrimoniais])

In [46]:
full_ddf.tail()

Unnamed: 0,Ano,Tipo,Instituição,Cod.IBGE,UF,População,Coluna,Conta,Descrição Conta,Identificador da Conta,Valor
278599,2013,Variações Patrimoniais,Prefeitura Municipal de São Sebastião do Umbuz...,2515203,PB,3380,31/12/2013,4.0.0.0.0.00.00,Variação Patrimonial Aumentativa,siconfi-cor_P4.0.0.0.0.00.00,11495462.69
278600,2013,Variações Patrimoniais,Prefeitura Municipal de São Sebastião do Umbuz...,2515203,PB,3380,31/12/2013,4.1.0.0.0.00.00,"Impostos, Taxas e Contribuições de Melhoria",siconfi-cor_P4.1.0.0.0.00.00,11495462.69
278601,2013,Variações Patrimoniais,Prefeitura Municipal de São Sebastião do Umbuz...,2515203,PB,3380,31/12/2013,4.1.1.0.0.00.00,Impostos,siconfi-cor_P4.1.1.0.0.00.00,11495462.69
278602,2013,Variações Patrimoniais,Prefeitura Municipal de São Sebastião do Umbuz...,2515203,PB,3380,31/12/2013,4.1.1.2.0.00.00,Impostos sobre Patrimônio e a Renda,siconfi-cor_P4.1.1.2.0.00.00,11495462.69
278603,2013,Variações Patrimoniais,Prefeitura Municipal de São Sebastião do Umbuz...,2515203,PB,3380,31/12/2013,4.1.1.2.1.00.00,Impostos sobre Patrimônio e a Renda,siconfi-cor_P4.1.1.2.1.00.00,11495462.69
