In [1]:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import pandas as pd
import boto3
import io
import os
import psycopg2

# Variáveis de ambiente para credenciais AWS
account_key = os.getenv('AWS_ACCESS_KEY')
secret_key = os.getenv('AWS_SECRET_KEY')

    
# Função para baixar o arquivo do S3
def download_dataset(**kwargs):
    s3 = boto3.client('s3', aws_access_key_id=account_key, aws_secret_access_key=secret_key)
    bucket_name = 'investimentos-govbr'
    object_key = 'raw/investimentos_2024.csv'
    local_file_path = '/tmp/investimentos_2024.csv'
    try:
        # Faz o download do arquivo do S3
        s3.download_file(bucket_name, object_key, local_file_path)
        print(f"Arquivo {object_key} baixado para {local_file_path}")
    except s3.exceptions.NoSuchKey:
        print(f"Arquivo {object_key} não encontrado no bucket {bucket_name}. Verifique o caminho.")
# Função para fazer o upload do arquivo processado para o S3
def upload_to_s3(local_file_path, s3_bucket, s3_key):
    s3 = boto3.client('s3', aws_access_key_id=account_key, aws_secret_access_key=secret_key)
    try:
        # Faz o upload do arquivo
        s3.upload_file(local_file_path, s3_bucket, s3_key)
        print(f"Arquivo {local_file_path} enviado para {s3_bucket}/{s3_key}")
    except Exception as e:
        print(f"Erro ao enviar o arquivo {local_file_path} para o S3: {str(e)}")

OSError while attempting to symlink the latest log directory


In [2]:
s3 = boto3.client('s3', aws_access_key_id=account_key, aws_secret_access_key=secret_key)
bucket_name = 'investimentos-govbr'
source_key = 'raw/investimentos_2024.csv'
dest_key = 'processed/investimentos_2024_mod.csv'

# Lê o arquivo diretamente do S3
response = s3.get_object(Bucket=bucket_name, Key=source_key)
csv_content = response['Body'].read().decode('latin1')  # Use o encoding correto

# Carregar o CSV no Pandas
df = pd.read_csv(io.StringIO(csv_content), delimiter=";")
df.head()

Unnamed: 0,ano,mes,esfera_orcamentaria,esfera_orcamentaria_desc,orgao_maximo,orgao_maximo_desc,uo,uo_desc,grupo_despesa,grupo_despesa_desc,...,subfuncao_desc,programa,programa_desc,acao,acao_desc,regiao,uf,uf_desc,municipio,movimento_liquido_reais
0,2024,JANEIRO,1,ORCAMENTO FISCAL,1000,CAMARA DOS DEPUTADOS,1101,CAMARA DOS DEPUTADOS,4,INVESTIMENTOS,...,ACAO LEGISLATIVA,34,PROGRAMA DE GESTAO E MANUTENCAO DO PODER LEGIS...,4061,"PROCESSO LEGISLATIVO, FISCALIZACAO E REPRESENT...",CENTRO-OESTE,DF,DISTRITO FEDERAL,BRASILIA,"1.024.768,40"
1,2024,JANEIRO,1,ORCAMENTO FISCAL,1000,CAMARA DOS DEPUTADOS,1101,CAMARA DOS DEPUTADOS,4,INVESTIMENTOS,...,ADMINISTRACAO GERAL,34,PROGRAMA DE GESTAO E MANUTENCAO DO PODER LEGIS...,12F2,REFORMA DOS IMOVEIS FUNCIONAIS DESTINADOS A MO...,CENTRO-OESTE,DF,DISTRITO FEDERAL,BRASILIA,8934
2,2024,JANEIRO,1,ORCAMENTO FISCAL,2000,SENADO FEDERAL,2101,SENADO FEDERAL,4,INVESTIMENTOS,...,ACAO LEGISLATIVA,34,PROGRAMA DE GESTAO E MANUTENCAO DO PODER LEGIS...,4061,"PROCESSO LEGISLATIVO, FISCALIZACAO E REPRESENT...",CENTRO-OESTE,DF,DISTRITO FEDERAL,BRASILIA,"1.213.065,74"
3,2024,JANEIRO,1,ORCAMENTO FISCAL,3000,TRIBUNAL DE CONTAS DA UNIAO,3101,TRIBUNAL DE CONTAS DA UNIAO,4,INVESTIMENTOS,...,CONTROLE EXTERNO,34,PROGRAMA DE GESTAO E MANUTENCAO DO PODER LEGIS...,4018,FISCALIZACAO DA APLICACAO DOS RECURSOS PUBLICO...,NACIONAL,'-8,SEM INFORMACAO,SEM INFORMACAO,"484.128,40"
4,2024,JANEIRO,1,ORCAMENTO FISCAL,10000,SUPREMO TRIBUNAL FEDERAL,10101,SUPREMO TRIBUNAL FEDERAL,4,INVESTIMENTOS,...,ACAO JUDICIARIA,33,PROGRAMA DE GESTAO E MANUTENCAO DO PODER JUDIC...,6359,APRECIACAO E JULGAMENTO DE CAUSAS NO SUPREMO T...,CENTRO-OESTE,DF,DISTRITO FEDERAL,BRASILIA,"302.517,30"


In [3]:
df.shape

(16505, 27)

In [None]:
def load_data_to_postgres(**kwargs):
    s3 = boto3.client('s3', aws_access_key_id=account_key, aws_secret_access_key=secret_key)
    bucket_name = 'investimentos-govbr'
    file_key = 'processed/investimentos_2024_mod.csv'
    # Lê o arquivo transformado diretamente do S3
    obj = s3.get_object(Bucket=bucket_name, Key=file_key)
    df = pd.read_csv(io.BytesIO(obj['Body'].read()), delimiter=',', header=0, encoding='latin-1')
    
    # Converter valores para Inteiro com tratamento de erros
    col_for_int = [
        'ano', 
        'esfera_orcamentaria', 
        'orgao_maximo', 
        'uo', 
        'grupo_despesa', 
        'aplicacao', 
        'resultado', 
        'funcao', 
        'subfuncao', 
        'programa'
        ]
    
    for col in col_for_int:
        df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0).astype(int)
    # Limpar e converter a coluna 'movimento_liquido_reais'
    df['movimento_liquido_reais'] = (
        df['movimento_liquido_reais']
        .str.replace('.', '', regex=False)  # Remove os separadores de milhar
        .str.replace(',', '.', regex=False)  # Substitui a vírgula decimal por ponto
        .str.replace(r'\((.*?)\)', r'-\1', regex=True)  # Converte (valor) para -valor
        .astype(float)  # Converte para float
    )
    print(df.dtypes)
    # Conexão ao banco PostgreSQL usando gerenciador de contexto
    with psycopg2.connect(
            host='postgres',
            database='airflow',
            user='airflow',
            password='airflow') as conn:
        
        with conn.cursor() as cursor:
            # Criar a tabela (caso ainda não exista)
            cursor.execute("""
            CREATE TABLE IF NOT EXISTS investimentos (
                ano INT,
                mes VARCHAR(10),
                esfera_orcamentaria INT,
                esfera_orcamentaria_desc VARCHAR(255),
                orgao_maximo INT,
                orgao_maximo_desc VARCHAR(255),
                uo INT,
                uo_desc VARCHAR(255),
                grupo_despesa INT,
                grupo_despesa_desc VARCHAR(255),
                aplicacao INT,
                aplicacao_desc VARCHAR(255),
                resultado INT,
                resultado_desc VARCHAR(255),
                funcao INT,
                funcao_desc VARCHAR(255),
                subfuncao INT,
                subfuncao_desc VARCHAR(255),
                programa INT,
                programa_desc VARCHAR(255),
                acao INT,
                acao_desc VARCHAR(255),
                regiao VARCHAR(50),
                uf VARCHAR(2),
                uf_desc VARCHAR(255),
                municipio VARCHAR(255),
                movimento_liquido_reais NUMERIC
            )
            """)
            conn.commit()
            # Insere os dados no PostgreSQL usando executemany para eficiência
            insert_query = """
            INSERT INTO investimentos (
                ano, mes, esfera_orcamentaria, esfera_orcamentaria_desc,
                orgao_maximo, orgao_maximo_desc, uo, uo_desc, grupo_despesa,
                grupo_despesa_desc, aplicacao, aplicacao_desc, resultado,
                resultado_desc, funcao, funcao_desc, subfuncao, subfuncao_desc,
                programa, programa_desc, acao, acao_desc, regiao, uf, uf_desc,
                municipio, movimento_liquido_reais
            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
                      %s, %s, %s, %s, %s, %s, %s, %s,
                      %s, %s, %s, %s, %s, %s,
                      %s)
            """
            
            cursor.executemany(insert_query, df.values.tolist())
    
    print("Dados carregados no banco de dados!")

UnicodeDecodeError: 'utf-8' codec can't decode byte 0xe3 in position 64: invalid continuation byte