In [1]:
%pip install pandas psycopg2

Note: you may need to restart the kernel to use updated packages.


In [2]:
import sys
sys.path.append('../')

In [3]:
import util.arquivo as HelperArquivo
import pandas as pd
import util.database as HelperDatabase
from util.postgresql import PostgreSQL
import util.general as HelperGeneral
from datetime import datetime
from util.config import postgre_config
from consultas.cartoes.transacoes import TransacaoDDL

In [4]:
HelperPostgre = PostgreSQL(postgre_config)
TABLE_NAME = TransacaoDDL.table_name

In [5]:
def criar_stg():
    print('             ' + datetime.now().strftime("%H:%M:%S") + ' - ' + 'Criando tabela de staging')
    HelperPostgre.create_staging_table(table_name=TABLE_NAME, ddl=TransacaoDDL.ddl_stg)

In [6]:
def drop_stg():
    print('             ' + datetime.now().strftime("%H:%M:%S") + ' - ' + 'Dropando tabela de staging')
    HelperPostgre.drop_table(table_name=TABLE_NAME, table_owner='stg')

In [7]:
def truncar_stg():
    print('             ' + datetime.now().strftime("%H:%M:%S") + ' - ' + 'Truncando tabela')
    HelperPostgre.truncate_table(table_name=TABLE_NAME, table_owner='stg')

In [8]:
def deleta_existing_prd():
    print('             ' + datetime.now().strftime("%H:%M:%S") + ' - ' + 'Removendo registros existentes')
    HelperPostgre.execute(str_consulta=TransacaoDDL.delete_prd_exists_stg)

In [9]:
def merge():
    print('             ' + datetime.now().strftime("%H:%M:%S") + ' - ' + 'Merge de dados')
    df = HelperPostgre.return_as_dataframe(str_consulta=TransacaoDDL.select_stg_arquivos, columns=['nom_arquivo'])
    for _, r in df.iterrows():
        HelperPostgre.execute(
            TransacaoDDL.insert_prd.replace('@NOM_ARQUIVO', r['nom_arquivo'])
        )

In [10]:
def importar_arquivos_stg():

    truncar_stg() 

    print('             ' + datetime.now().strftime("%H:%M:%S") + ' - ' + 'Inserindo novos registros')

    lst_files = []
    str_file = '../csv/db_cartoes.transacoes*.csv'

    lst_files += HelperArquivo.glob_to_list(str_file)

    for arq in lst_files:
        str_nome_arquivo = arq[0]
        path = arq[1]

        df = HelperArquivo.import_csv_to_dataframe(path)
        df = df.rename(columns=df.iloc[0]).drop(df.index[0])
        df.insert(0, 'nom_arquivo', str_nome_arquivo, True)
        df['dat_transacao'] = pd.to_datetime(df['dat_transacao'], format='%d/%m/%Y %H:%M:%S')
        df['vlr_transacao'] = (df['vlr_transacao'].replace('\.','', regex=True)
                                                .replace(',','.', regex=True)
                                                .astype(float))

        HelperPostgre.insert_list(
            str_consulta = HelperDatabase.criar_insert(TABLE_NAME, 'stg', len(df.columns)),
            obj_list = df.values.tolist()
        )

In [11]:
if __name__ == '__main__':
    print(datetime.today().strftime("%d/%m/%Y %H:%M:%S") + " - Importação de dados de {} - Início".format(TABLE_NAME))

    criar_stg()

    importar_arquivos_stg()

    deleta_existing_prd()

    merge()

    drop_stg()

    print(datetime.today().strftime("%d/%m/%Y %H:%M:%S") + " - Importação de dados de {} - Fim".format(TABLE_NAME))

02/11/2023 14:40:50 - Importação de dados de transacoes - Início
             14:40:50 - Criando tabela de staging
             14:40:50 - Truncando tabela
             14:40:50 - Inserindo novos registros


             14:40:52 - Removendo registros existentes
             14:40:52 - Merge de dados
             14:40:52 - Dropando tabela de staging
02/11/2023 14:40:52 - Importação de dados de transacoes - Fim
