# CARGA DOS DADOS DE DESPESA DO PORTAL DADOS MG

Script notebook para carga inicial de dados do portal [dados.mg](https://dados.mg.gov.br/dataset/despesa) para criação do spreadmart financeiro da Assessoria de Dados da SCPO/SEPLAG
  
  Desenvolvedor: Andrey Morais Labanca  
  Contato: moraislabanca@gmail.com  

## TO DO
1. Clausulas SQL para somente criar ou deletar tabelas caso não já exisitam
1. Tratamento de erros e exceções para quando não for possível criar ou conectar na base de dados, criar ou deletar tabelas e etc.
1. Atualmente usando pandas para concatenação de tabelas. Ineficiente e utiliza muita memória. Necessário trocar para algo relativo a SQL, DuckDB ou que vá concantenando parcialmente em disco sem carregar tudo para memória.
1. ERRO: "The kernel appears to have died. It will restart automatically". Recriar o env e ver se resolve. Se não testar bibliotecas separadamente e ver se acha um erro executando alguma função delas

## Imports, constants e setup

In [2]:
import pandas as pd
import glob
import time
import duckdb
import os

DB_NAME = 'database/dadosmg.db'
CSV_PATH = 'datasets/'

# obtem lista de paths para arquivos CSV localizados no caminho CSV_PATH
file_paths = [i.replace('\\', '/') for i in list(glob.iglob(f'{CSV_PATH}*.csv'))]

# paths de bases csv que sao separadas por anos
file_paths_desp = [x for x in file_paths if "dm_empenho_desp_" in x] 
file_paths_ft = [x for x in file_paths if "ft_despesa_" in x] 

# paths de bases csv que não são separadas por anos
file_paths = list(set(file_paths) - set(file_paths_desp) - set(file_paths_ft))


con = duckdb.connect(DB_NAME) #Cria se não existe e se conecta à base de dados

## Excluir tabelas existentes
### Exclui tabelas existentes caso DROP_TABLES = True

In [4]:
DROP_TABLES = True

if DROP_TABLES:
    if con.execute("""SHOW TABLES""").fetchall():
        for table_name in con.execute("SHOW TABLES").fetchall():
            con.execute(f"""DROP TABLE {table_name[0]} """)
            print(f"Tabela {table_name} apagada.")
    else:
        print(f"Não há tabelas na database {DB_NAME}")



Tabela ('dm_acao',) apagada.
Tabela ('dm_categ_econ',) apagada.
Tabela ('dm_elemento_desp',) apagada.
Tabela ('dm_favorecido',) apagada.
Tabela ('dm_fonte',) apagada.
Tabela ('dm_funcao_desp',) apagada.
Tabela ('dm_grupo_desp',) apagada.
Tabela ('dm_item_desp',) apagada.
Tabela ('dm_procedencia',) apagada.
Tabela ('dm_programa',) apagada.
Tabela ('dm_situacao_op_desp',) apagada.
Tabela ('dm_tipo_documento',) apagada.


## Importa dados de despesa e cria tabelas

In [None]:
#Importa CSV das bases não separadas por anos e cria respectivas tabelas
for file in file_paths:
    _ , tail = os.path.split(file)
    name , file_extension = os.path.splitext(tail)
    con.execute(f"""CREATE TABLE '{name}' AS SELECT * FROM read_csv_auto('{file}')""")
    print(f"Arquivo {file} carregado para tabela {name}")

Arquivo datasets/dm_item_desp.csv carregado para tabela dm_item_desp
Arquivo datasets/dm_acao.csv carregado para tabela dm_acao
Arquivo datasets/dm_elemento_desp.csv carregado para tabela dm_elemento_desp
Arquivo datasets/dm_categ_econ.csv carregado para tabela dm_categ_econ
Arquivo datasets/dm_tipo_documento.csv carregado para tabela dm_tipo_documento
Arquivo datasets/dm_fonte.csv carregado para tabela dm_fonte


## Descreve tabelas criadas

In [3]:
con.sql("""DESCRIBE""")

┌───────────────────┬──────────────────────────────────────────────┬───────────────────────────────────┬───────────┐
│    table_name     │                 column_names                 │           column_types            │ temporary │
│      varchar      │                  varchar[]                   │             varchar[]             │  boolean  │
├───────────────────┼──────────────────────────────────────────────┼───────────────────────────────────┼───────────┤
│ dm_acao           │ [id_acao, ano_exercicio, cd_acao, nome]      │ [BIGINT, BIGINT, BIGINT, VARCHAR] │ false     │
│ dm_categ_econ     │ [id_categ_econ, cd_categ_econ, nome]         │ [BIGINT, BIGINT, VARCHAR]         │ false     │
│ dm_elemento_desp  │ [id_elemento, cd_elemento, nome]             │ [BIGINT, BIGINT, VARCHAR]         │ false     │
│ dm_fonte          │ [id_fonte, cd_fonte, nome]                   │ [BIGINT, BIGINT, VARCHAR]         │ false     │
│ dm_item_desp      │ [id_item, cd_item, nome]                  

## Agrega arquivos de empenho anuais e cria tabela

In [None]:
tbl_agg_name = 'dm_empenho_desp'
df_agg = pd.DataFrame()
num_linhas = 0

for file in file_paths_desp:
    print(f'Lendo:', file)
    df1 = con.execute(f"""SELECT * FROM '{file}' """).df()
    df_agg = pd.concat([df_agg, df1])
    num_linhas += len(df1)
      
con.execute(f"""CREATE TABLE '{tbl_agg_name}' AS SELECT * FROM df_agg """) 
print(f'Tabela {tbl_agg_name} criada')

print('Total de linhas tabelas lidas:', num_linhas)
print('Total de linhas do dataframe concatenado:', len(df_agg))
print('Num linhas da tabela agregada:')
print(con.sql(f"""SELECT COUNT(*) FROM {tbl_agg_name}"""))

con.table(tbl_agg_name).show()

## Agrega arquivos de faturamento anuais e cria tabela

In [None]:
tbl_agg_name = 'ft_despesa'
df_agg = pd.DataFrame()
num_linhas = 0

temp_csv = con.sql(f"""SELECT * FROM '{CSV_PATH}ft_despesa_2002.csv' LIMIT 10 """)

# cria lista contendo nomes e tipos das colunas lidas em temp_csv 
table_columns = [str(temp_csv.columns[i] +' '+ temp_csv.dtypes[i]) for i in range(len(temp_csv.columns))]

# Concatena lista de strings em uma string somente para uso na criação da tabela nova.
table_columns =  ', '.join(table_columns)

# Cria tabela para agregar arquivos CSV de faturamento
#con.execute(f"""CREATE TABLE '{tbl_agg_name}'({table_columns}) """)
print(f'Tabela {tbl_agg_name} criada')


for file in file_paths_ft:
    print(f'Lendo:', file)
    df1 = con.execute(f"""SELECT * FROM '{file}' """).df()
    con.execute(f"""INSERT INTO '{tbl_agg_name}' SELECT * FROM df1""")
    print(f'Arquivo {file} concatenado na tabela {tbl_agg_name}')
    num_linhas += len(df1)
      
print('Total de linhas tabelas lidas:', num_linhas)
print('Num linhas da tabela agregada:')
print(con.sql(f"""SELECT COUNT(*) FROM {tbl_agg_name}"""))

con.table(tbl_agg_name).show()

In [None]:
tbl_agg_name = 'ft_despesa'
temp_csv = con.sql(f"""SELECT * FROM '{CSV_PATH}ft_despesa_2002.csv' LIMIT 10 """)

# cria lista contendo nomes e tipos das colunas lidas em temp_csv
table_columns = [str(temp_csv.columns[i] +' '+ temp_csv.dtypes[i]) for i in range(len(temp_csv.columns))]

# Concatena lista de strings em uma string somente para uso na criação da tabela nova.
table_columns =  ', '.join(table_columns)

# Cria tabela para agregar arquivos CSV de faturamento
con.execute(f"""CREATE TABLE '{tbl_agg_name}'({table_columns}) """)
print(f'Tabela {tbl_agg_name} criada')

In [None]:
con.sql(f"""SELECT * FROM '{tbl_agg_name}'""")

In [None]:

file = f'{CSV_PATH}ft_despesa_2002.csv'

con.execute(f"""COPY {tbl_agg_name} FROM '{file}' ( DELIMITER ';', HEADER )""")
con.sql(f"""SELECT * FROM '{file}' LIMIT 10 """)

#con.execute(f"""INSERT INTO '{tbl_agg_name}' SELECT * FROM read_csv_auto({file})""")