# Módulo de Funções: Conexão e Operações com o Banco de Dados

## 1. O Alicerce do Projeto: O Módulo de Banco de Dados

Diferente dos notebooks anteriores, este não executa um processo de ponta a ponta. Ele é, na verdade, a documentação de um **módulo de funções reutilizáveis** (`banco_de_dados.py`), que serve como a fundação para todas as interações com nosso banco de dados PostgreSQL.

### O Desafio
Interagir com um banco de dados em múltiplos scripts pode ser repetitivo e arriscado. É preciso garantir que as conexões sejam abertas e fechadas corretamente, que as transações sejam confirmadas (`commit`) ou revertidas (`rollback`) em caso de erro, e que as operações sejam executadas da forma mais eficiente possível. Escrever esse código repetidamente aumenta a chance de erros.

### A Solução: Uma Biblioteca Centralizada
A solução é centralizar toda a lógica de comunicação com o banco de dados em um único local. Este módulo cria uma "biblioteca" de funções prontas para:
* Abrir e fechar conexões de forma segura.
* Ler tabelas inteiras para o Pandas.
* Inserir grandes volumes de dados de forma ultraeficiente.
* Excluir e atualizar registros em lote.

Ao usar este módulo, nossos outros notebooks ficam mais limpos, legíveis e seguros, pois eles simplesmente chamam essas funções sem precisar se preocupar com os detalhes da implementação do banco de dados.

## 2. As Ferramentas para Falar com o Banco

Para construir nosso módulo, precisamos de algumas bibliotecas chave:
* **psycopg2**: É o "tradutor" (driver) principal que permite que o Python "converse" com o banco de dados PostgreSQL.
* **pandas**: Usado para receber os dados que vêm do banco em formato de tabela e para preparar os dados que serão enviados para o banco.
* **io.StringIO**: Uma ferramenta inteligente que nos permite tratar um texto em memória como se fosse um arquivo, essencial para a nossa função de inserção em massa.
* **logging**: Para criar um registro (log) de todas as operações, o que é fundamental para depurar problemas e monitorar a saúde do nosso pipeline.

In [None]:
# Importa a biblioteca principal para conexão com o PostgreSQL.
import psycopg2
# Importa o submódulo 'extras' que contém funções otimizadas, como o 'execute_batch'.
import psycopg2.extras
# Importa a biblioteca pandas para manipulação de DataFrames.
import pandas as pd
# Importa a classe StringIO para criar um "arquivo em memória".
from io import StringIO
# Importa a biblioteca de logging para registrar eventos.
import logging

# Configura o sistema de logging para exibir mensagens de nível INFO ou superior.
# O formato inclui a data/hora, o nível da mensagem (INFO, ERROR, etc.) e a própria mensagem.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

## 3. As Funções Essenciais

Esta seção detalha cada uma das funções que compõem nosso módulo.

### 3.1. Abrindo e Fechando as Portas: Gestão da Conexão

As duas funções mais básicas e talvez as mais importantes. `abre_conexao_banco_de_dados` contém as credenciais e a lógica para estabelecer a comunicação com o servidor PostgreSQL. `fecha_conexao_banco_de_dados` garante que essa comunicação seja encerrada corretamente, liberando recursos tanto no nosso script quanto no servidor do banco.

In [None]:
def abre_conexao_banco_de_dados():
    """
    Estabelece uma conexão com o banco de dados PostgreSQL e retorna os objetos de conexão e cursor.
    """
    # Bloco try...except para tratar possíveis erros de conexão (ex: senha errada, servidor offline).
    try:
        # Usa a função connect do psycopg2 com as credenciais do banco.
        conn = psycopg2.connect(
            host="localhost",
            database="planb",
            user="usuario",
            password="senha",
            port=5436
        )
        # Cria um objeto 'cursor', que é usado para executar comandos SQL no banco.
        cursor = conn.cursor()
        # Registra uma mensagem de sucesso no log.
        logging.info("Conexão com o banco de dados estabelecida com sucesso.")
        # Retorna os dois objetos para serem usados por outras funções.
        return conn, cursor
    except psycopg2.Error as e:
        # Se ocorrer um erro na conexão, registra o erro no log.
        logging.error(f"Erro ao conectar ao banco de dados: {e}")
        # Imprime o erro na tela para o usuário.
        print(f"Erro ao conectar ao banco de dados: {e}")
        # Retorna 'None' para indicar que a conexão falhou.
        return None, None


def fecha_conexao_banco_de_dados(conn, cursor):
    """
    Fecha o cursor e a conexão com o banco de dados.
    """
    # Bloco try...except para tratar possíveis erros ao fechar.
    try:
        # Se o objeto cursor existir, fecha-o.
        if cursor:
            cursor.close()
        # Se o objeto de conexão existir, fecha-o.
        if conn:
            conn.close()
        # Registra a mensagem de sucesso no log.
        logging.info("Conexão com o banco de dados fechada com sucesso.")
    except psycopg2.Error as e:
        # Em caso de erro, registra no log e imprime na tela.
        logging.error(f"Erro ao fechar a conexão com o banco de dados: {e}")
        print(f"Erro ao fechar a conexão com o banco de dados: {e}")

### 3.2. Lendo Dados: A Função `retorna_tabela`

Esta função abstrai a tarefa de buscar todos os dados de uma tabela. Ela abre a conexão, executa um `SELECT *`, carrega os resultados em um DataFrame do Pandas (incluindo os nomes das colunas) e, por fim, garante o fechamento da conexão.

In [None]:
def retorna_tabela(nome_tabela, nome_schema="public"):
    """
    Conecta ao banco, lê uma tabela inteira e a retorna como um DataFrame do Pandas.
    """
    # Chama a função para abrir a conexão e obter os objetos de conexão e cursor.
    conn, cursor = abre_conexao_banco_de_dados()

    # Verifica se a conexão foi estabelecida com sucesso.
    if conn and cursor:
        try:
            # Monta a string da query SQL para selecionar todos os dados da tabela especificada.
            sql = f'SELECT * FROM "{nome_schema}"."{nome_tabela}"'
            # Imprime a query que será executada para fins de debug.
            print(f"Executando query: {sql}")
            logging.info(f"Executando query: {sql}")

            # Executa a query no banco.
            cursor.execute(sql)
            # Busca todos os resultados da query executada.
            dados = cursor.fetchall()
            # Extrai os nomes das colunas a partir da descrição do cursor.
            colunas = [desc[0] for desc in cursor.description]
            # Cria o DataFrame do Pandas com os dados e os nomes das colunas.
            df = pd.DataFrame(dados, columns=colunas)
            
            logging.info(f"Tabela '{nome_tabela}' lida com sucesso, {len(df)} linhas retornadas.")
            # Retorna o DataFrame criado.
            return df
        except Exception as e:
            # Em caso de erro na leitura, registra e imprime a falha.
            logging.error(f"Erro ao ler a tabela '{nome_tabela}': {e}")
            print(f"Erro ao ler a tabela: {e}")
            return None
        finally:
            # A cláusula 'finally' garante que este bloco de código seja executado sempre, independentemente de ter havido erro ou não.
            # É crucial para garantir que a conexão com o banco seja sempre fechada.
            fecha_conexao_banco_de_dados(conn, cursor)
    else:
        # Se a conexão inicial falhou.
        print("Não foi possível conectar ao banco de dados.")
        logging.error("Falha na leitura da tabela pois não foi possível conectar ao banco de dados.")
        return None

### 3.3. Inserção em Massa: A Função `insere_dados_no_banco`

Esta é a função mais importante para a performance do nosso ETL. Inserir milhões de linhas em um banco, uma por uma (`INSERT`), é extremamente lento. A solução é usar o comando `COPY` do PostgreSQL, que é otimizado para cargas de dados em massa.

Esta função automatiza o processo:
1.  Recebe um DataFrame do Pandas.
2.  Converte o DataFrame em um formato CSV em memória (usando `StringIO`), sem precisar criar um arquivo físico.
3.  Usa o método `copy_expert` do `psycopg2` para enviar esse "arquivo em memória" diretamente para o banco de dados através do comando `COPY`.
4.  Gerencia a transação, confirmando (`commit`) em caso de sucesso ou revertendo (`rollback`) em caso de erro.

In [None]:
def insere_dados_no_banco(dados, tabela_destino, nome_schema="public"):
    """
    Recebe um DataFrame e insere seus dados em uma tabela do banco de dados
    usando o método de alta performance COPY FROM.
    """
    # Validação inicial: se o DataFrame estiver vazio, não faz nada.
    if dados.empty:
        print("DataFrame de entrada está vazio. Nenhuma inserção será realizada.")
        logging.warning(f"Tentativa de inserção na tabela '{tabela_destino}' com DataFrame vazio.")
        return

    # Inicializa as variáveis de conexão como None.
    conn = None
    cursor = None
    
    # Bloco try...except...finally para garantir a gestão correta da conexão e transação.
    try:
        # Abre a conexão com o banco.
        conn, cursor = abre_conexao_banco_de_dados()
        if conn is None or cursor is None:
            # Lança um erro se a conexão falhar.
            raise ConnectionError("Falha ao abrir a conexão com o banco de dados.")

        # Cria um buffer de texto em memória. Funciona como um arquivo temporário que não é salvo no disco.
        buffer = StringIO()
        # Converte o DataFrame para um formato CSV e o escreve no buffer.
        # index=False: não escreve o índice do DataFrame.
        # header=False: não escreve o cabeçalho (nomes das colunas).
        # sep=';': usa ponto e vírgula como delimitador.
        dados.to_csv(buffer, index=False, header=False, sep=';', quotechar='"')
        # "Rebobina" o buffer para o início, para que o comando COPY possa lê-lo do começo.
        buffer.seek(0)

        # Monta a string dos nomes das colunas no formato "(col1, col2, ...)" para o comando SQL.
        colunas_str = ', '.join(f'"{col}"' for col in dados.columns)
        # Monta o comando COPY completo.
        sql_copy_command = (
            f'COPY "{nome_schema}"."{tabela_destino}" ({colunas_str}) '
            f"FROM STDIN WITH (FORMAT CSV, HEADER FALSE, DELIMITER ';', QUOTE '\"')"
        )

        logging.info(f"Executando comando COPY para a tabela \"{nome_schema}\".\"{tabela_destino}\"")
        print(f"Executando comando COPY para a tabela \"{nome_schema}\".\"{tabela_destino}\"")

        # Executa o comando COPY, passando o buffer como a fonte de dados (STDIN).
        cursor.copy_expert(sql=sql_copy_command, file=buffer)
        # Se o comando foi bem-sucedido, confirma a transação, tornando as inserções permanentes.
        conn.commit()

        # Prepara e exibe uma mensagem de sucesso.
        mensagem_sucesso = f"{cursor.rowcount} linhas inseridas com sucesso na tabela \"{nome_schema}\".\"{tabela_destino}\"."
        logging.info(mensagem_sucesso)
        print(mensagem_sucesso)

    except Exception as e:
        # Se qualquer erro ocorrer no bloco 'try', este bloco é executado.
        if conn:
            # Desfaz todas as alterações feitas nesta transação.
            conn.rollback()
        # Monta e exibe uma mensagem de erro.
        mensagem_erro = f"Ocorreu um erro ao inserir dados na tabela \"{nome_schema}\".\"{tabela_destino}\": {e}"
        print(mensagem_erro)
        logging.error(mensagem_erro)
        # Relança a exceção para que o script que chamou a função saiba que algo deu errado.
        raise
    finally:
        # Garante que a conexão seja sempre fechada.
        if conn:
            fecha_conexao_banco_de_dados(conn, cursor)

### 3.4. Excluindo Dados em Lote: A Função `excluir_linhas_por_dataframe`

Esta função utilitária permite excluir registros de uma tabela no banco com base em uma lista de valores de um DataFrame. Por exemplo, podemos usá-la para deletar todos os registros de imóveis que foram removidos do site antes de inserir uma nova carga de dados. Ela usa um `DELETE ... WHERE ... IN`, que é eficiente para exclusões em lote.

In [None]:
def excluir_linhas_por_dataframe(df_referencia, coluna_df, tabela_alvo, coluna_tabela, nome_schema="public"):
    """
    Exclui linhas de uma tabela do banco de dados com base nos valores de uma coluna de um DataFrame.
    """
    print(f"--- Iniciando processo de EXCLUSÃO na tabela '{nome_schema}'.'{tabela_alvo}' ---")

    # Validações iniciais para evitar execuções desnecessárias ou com erro.
    if df_referencia.empty:
        print("O DataFrame de referência está vazio. Nenhuma exclusão será realizada.")
        return
    if coluna_df not in df_referencia.columns:
        raise ValueError(f"A coluna '{coluna_df}' não foi encontrada no DataFrame de referência.")

    # Extrai os valores únicos da coluna do DataFrame que serão usados para a exclusão.
    valores_para_excluir = df_referencia[coluna_df].dropna().unique().tolist()
    if not valores_para_excluir:
        print(f"Nenhum valor válido encontrado na coluna '{coluna_df}' para exclusão.")
        return

    conn, cursor = None, None
    try:
        conn, cursor = abre_conexao_banco_de_dados()
        if conn is None or cursor is None:
            raise ConnectionError("Não foi possível estabelecer conexão com o banco de dados.")

        # Monta a query de exclusão. A cláusula 'IN %s' permite passar uma tupla de valores.
        query = f'DELETE FROM "{nome_schema}"."{tabela_alvo}" WHERE "{coluna_tabela}" IN %s'
        # Converte a lista de valores para uma tupla, que é o formato esperado pelo psycopg2.
        valores_tuple = tuple(valores_para_excluir)

        print(f"Preparando para excluir registros onde '{coluna_tabela}' corresponde a {len(valores_tuple)} valores únicos.")
        # Executa a query, passando a tupla de valores como parâmetro.
        cursor.execute(query, (valores_tuple,))
        # 'cursor.rowcount' retorna o número de linhas afetadas pelo último comando.
        linhas_excluidas = cursor.rowcount
        # Confirma a transação para tornar a exclusão permanente.
        conn.commit()

        print(f"Operação concluída. {linhas_excluidas} linha(s) foram excluídas com sucesso.")
        logging.info(f"{linhas_excluidas} linha(s) excluídas da tabela '{nome_schema}'.'{tabela_alvo}'.")

    except Exception as e:
        print(f"Ocorreu um erro durante a exclusão. A transação será revertida (rollback). Erro: {e}")
        logging.error(f"Erro na exclusão da tabela '{tabela_alvo}': {e}. Transação revertida.")
        if conn:
            conn.rollback()
        raise
    finally:
        if conn:
            fecha_conexao_banco_de_dados(conn, cursor)

### 3.5. Atualizando Registros em Lote: A Função `atualizar_dados_no_banco`

Similar à exclusão, esta função é projetada para atualizar múltiplos registros de uma vez. Ela recebe um DataFrame contendo uma coluna "chave" (para o `WHERE`) e as colunas com os novos valores. Para otimizar a performance, ela utiliza o método `execute_batch` do psycopg2, que é projetado especificamente para executar o mesmo comando `UPDATE` várias vezes com dados diferentes.

In [None]:
def atualizar_dados_no_banco(df_atualizacao, tabela_alvo, colunas_para_atualizar, coluna_chave, nome_schema="public"):
    """
    Atualiza registros em uma tabela do banco de dados com base em um DataFrame de entrada.
    """
    print(f"--- Iniciando processo de ATUALIZAÇÃO na tabela '{nome_schema}'.'{tabela_alvo}' ---")

    # Validações iniciais.
    if df_atualizacao.empty:
        print("O DataFrame de atualização está vazio. Nenhuma operação será realizada.")
        return
    
    # Verifica se todas as colunas necessárias para a atualização existem no DataFrame.
    colunas_necessarias = colunas_para_atualizar + [coluna_chave]
    for col in colunas_necessarias:
        if col not in df_atualizacao.columns:
            raise ValueError(f"A coluna '{col}' necessária para a atualização não foi encontrada no DataFrame.")

    conn, cursor = None, None
    try:
        conn, cursor = abre_conexao_banco_de_dados()
        if conn is None or cursor is None:
            raise ConnectionError("Não foi possível estabelecer conexão com o banco de dados.")

        # Monta a cláusula SET da query de forma dinâmica (ex: "col1" = %s, "col2" = %s).
        set_clause = ", ".join([f'"{col}" = %s' for col in colunas_para_atualizar])
        # Monta a query UPDATE completa.
        query = f'UPDATE "{nome_schema}"."{tabela_alvo}" SET {set_clause} WHERE "{coluna_chave}" = %s'

        # Prepara os dados para a atualização, criando uma lista de tuplas.
        # A ordem das colunas na tupla deve corresponder exatamente aos '%s' na query.
        colunas_ordenadas = colunas_para_atualizar + [coluna_chave]
        dados_para_atualizar = [tuple(row) for row in df_atualizacao[colunas_ordenadas].itertuples(index=False)]

        print(f"Preparando para atualizar {len(dados_para_atualizar)} registros.")

        # Usa execute_batch, que é otimizado para executar um comando (UPDATE) várias vezes com diferentes parâmetros.
        psycopg2.extras.execute_batch(cursor, query, dados_para_atualizar)

        # Recupera o número de linhas atualizadas.
        linhas_atualizadas = cursor.rowcount
        # Confirma a transação.
        conn.commit()

        print(f"Operação concluída. {linhas_atualizadas} linha(s) foram atualizadas na tabela '{tabela_alvo}'.")
        logging.info(f"{linhas_atualizadas} linha(s) atualizadas na tabela '{nome_schema}'.'{tabela_alvo}'.")

    except Exception as e:
        print(f"Ocorreu um erro durante a atualização. A transação será revertida (rollback). Erro: {e}")
        logging.error(f"Erro na atualização da tabela '{tabela_alvo}': {e}. Transação revertida.")
        if conn:
            conn.rollback()
        raise
    finally:
        if conn:
            fecha_conexao_banco_de_dados(conn, cursor)