In [None]:
# importar bibliotecas
import pandas as pd
import sqlite3
import re
import os

# definir esquema completo de colunas por tabela com base nas planilhas fornecidas
esquemas = {
    'clientes': {
        'id_cliente': int,
        'nome': str,
        'email': str,
        'cpf': str
    },
    'despesas': {
        'id_despesa': int,
        'id_empresa': int,
        'id_cliente': int,
        'categoria': str,
        'valor': float,
        'data': 'date',
        'descricao': str
    },
    'empresas': {
        'id_empresa': int,
        'nome_fantasia': str,
        'cnpj': str,
        'data_fundacao': 'date'
    },
    'orcamentos': {
        'id_orcamento': int,
        'id_empresa': int,
        'ano': int,
        'mes': int,
        'tipo': str,
        'valor_estimado': float
    },
    'receitas': {
        'id_receita': int,
        'id_empresa': int,
        'id_cliente': int,
        'categoria': str,
        'valor': float,
        'data': 'date',
        'descricao': str
    },
    'transferencias': {
        'id_transferencia': int,
        'id_empresa_origem': int,
        'id_empresa_destino': int,
        'tipo': str,
        'valor': float,
        'data': 'date',
        'descricao': str
    }
}

# função para listar arquivos csv automaticamente
def listar_arquivos_csv(pasta='.'):
    """
    Lista automaticamente os arquivos .csv presentes em uma pasta.

    Parâmetros:
    -----------
    pasta : str
        Caminho da pasta onde os arquivos serão listados. Por padrão, é a pasta atual ('.').

    Retorna:
    --------
    dict
        Um dicionário no formato {nome_arquivo_sem_extensao: caminho_completo_para_arquivo_csv}
    """
    return {
        os.path.splitext(f)[0]: os.path.join(pasta, f)
        for f in os.listdir(pasta)
        if f.endswith('.csv')
    }

# padronizar nomes de colunas para snake_case
def snake_case(texto: str):
    """
    Converte uma string para o formato snake_case.

    Parâmetros:
    -----------
    texto : str
        String a ser convertida.

    Retorna:
    --------
    str
        String convertida para snake_case.
    """
    texto = re.sub(r'([a-z0-9])([A-Z])', r'\1_\2', texto)
    texto = re.sub(r'[\s\-]+', '_', texto)
    return texto.lower()

# formatar colunas monetárias para exibição
def formatar_valores(dados: pd.DataFrame):
    """
    Formata colunas com a palavra 'valor' para o padrão monetário brasileiro (BRL).

    Parâmetros:
    -----------
    dados : pd.DataFrame
        DataFrame com colunas a serem formatadas.

    Retorna:
    --------
    pd.DataFrame
        DataFrame com os valores monetários formatados.
    """
    for coluna in dados.columns:
        if 'valor' in coluna:
            dados[coluna] = dados[coluna].apply(
                lambda x: f'R$ {x:,.2f}'.replace('.', '#').replace(',', '.').replace('#', ',')
                if pd.notnull(x) else x
            )
    return dados

# tratar, limpar e validar dados
def tratar_csv(caminho: str, nome_tabela: str):
    """
    Realiza o tratamento e validação de dados de um arquivo CSV conforme esquema definido.

    Parâmetros:
    -----------
    caminho : str
        Caminho para o arquivo CSV a ser tratado.

    nome_tabela : str
        Nome da tabela que será usada como referência de esquema.

    Retorna:
    --------
    pd.DataFrame
        DataFrame tratado e validado.
    """
    tabela = pd.read_csv(caminho)
    tabela.columns = [snake_case(col) for col in tabela.columns]

    for coluna in tabela.columns:
        if 'data' in coluna or 'dt' in coluna:
            tabela[coluna] = pd.to_datetime(tabela[coluna], errors='coerce').dt.date

    tabela = tabela.drop_duplicates()
    tabela = tabela.dropna(how='all')

    for coluna in tabela.columns:
        if tabela[coluna].dtype in ['float64', 'int64']:
            tabela[coluna] = tabela[coluna].fillna(0)
        elif tabela[coluna].dtype == 'object':
            tabela[coluna] = tabela[coluna].fillna('desconhecido')
        elif 'data' in coluna or 'dt' in coluna:
            tabela[coluna] = tabela[coluna].fillna(pd.NaT)

    if nome_tabela in esquemas:
        esquema = esquemas[nome_tabela]
        colunas_esperadas = set(esquema.keys())
        colunas_encontradas = set(tabela.columns)
        faltantes = colunas_esperadas - colunas_encontradas

        if faltantes:
            print(f'Aviso: Tabela "{nome_tabela}" está com colunas faltantes: {faltantes}')

        for coluna, tipo_esperado in esquema.items():
            if coluna in tabela.columns:
                if tipo_esperado == int:
                    tabela[coluna] = pd.to_numeric(tabela[coluna], errors='coerce').fillna(0).astype(int)
                elif tipo_esperado == float:
                    tabela[coluna] = pd.to_numeric(tabela[coluna], errors='coerce').fillna(0.0)
                elif tipo_esperado == 'date':
                    tabela[coluna] = pd.to_datetime(tabela[coluna], errors='coerce').dt.date
                elif tipo_esperado == str:
                    tabela[coluna] = tabela[coluna].astype(str).fillna('desconhecido')

    return tabela

# criar schema em SQL
script_schema = '''
    create table if not exists clientes (
        id_cliente integer primary key,
        nome text,
        email text,
        cpf text
    );

    create table if not exists despesas (
        id_despesa integer primary key,
        id_empresa integer,
        id_cliente integer,
        categoria text,
        valor real,
        data date,
        descricao text
    );

    create table if not exists empresas (
        id_empresa integer primary key,
        nome_fantasia text,
        cnpj text,
        data_fundacao date
    );

    create table if not exists orcamentos (
        id_orcamento integer primary key,
        id_empresa integer,
        ano integer,
        mes integer,
        tipo text,
        valor_estimado real
    );

    create table if not exists receitas (
        id_receita integer primary key,
        id_empresa integer,
        id_cliente integer,
        categoria text,
        valor real,
        data date,
        descricao text
    );

    create table if not exists transferencias (
        id_transferencia integer primary key,
        id_empresa_origem integer,
        id_empresa_destino integer,
        tipo text,
        valor real,
        data date,
        descricao text
    );
'''

# função para carregar dados no SQLite
def carregar_para_sqlite(banco_dados='dados.db', pasta_csv='.'):
    """
    Carrega dados CSV tratados em um banco SQLite com base no schema definido.

    Parâmetros:
    -----------
    banco_dados : str
        Caminho para o arquivo .db do SQLite (será criado se não existir).

    pasta_csv : str
        Caminho da pasta contendo os arquivos .csv.
    """
    arquivos_csv = listar_arquivos_csv(pasta_csv)
    conexao = sqlite3.connect(banco_dados)
    cursor = conexao.cursor()

    cursor.executescript(script_schema)

    for nome_tabela, caminho_arquivo in arquivos_csv.items():
        print(f'carregando {nome_tabela} ...')
        dados = tratar_csv(caminho_arquivo, nome_tabela)

        colunas = ', '.join(dados.columns)
        placeholders = ', '.join(['?' for _ in dados.columns])
        query = f'insert into {nome_tabela} ({colunas}) values ({placeholders})'

        cursor.executemany(query, dados.values.tolist())
        conexao.commit()

    print('dados carregados com sucesso.')
    conexao.close()

# executar carga
if __name__ == '__main__':
    carregar_para_sqlite()
    print('pipeline concluído.')


carregando transferencias ...
carregando receitas ...
carregando clientes ...
carregando empresas ...
carregando orcamentos ...
carregando despesas ...
dados carregados com sucesso.
pipeline concluído.
