In [None]:
import pandas as pd
import numpy as np
import time
import os

# ------------------ Funções de limpeza ------------------ #

def padronizar_colunas(df):
    df.columns = df.columns.str.strip().str.lower().str.replace(' ', '_')
    return df

def remover_duplicatas(df):
    return df.drop_duplicates()

def tratar_nulos(df, metodo='drop', valor_preencher=0):
    if metodo == 'drop':
        return df.dropna()
    elif metodo == 'fill':
        return df.fillna(valor_preencher)
    else:
        return df

def normalizar_categorias(df, coluna, substituicoes):
    if coluna in df.columns:
        df[coluna] = df[coluna].replace(substituicoes)
    return df

def remover_outliers(df, colunas_numericas, z_thresh=3):
    for col in colunas_numericas:
        if col in df.columns and pd.api.types.is_numeric_dtype(df[col]):
            col_zscore = (df[col] - df[col].mean()) / df[col].std()
            df = df[np.abs(col_zscore) < z_thresh]
    return df

# ------------------ Limpeza de novos dados ------------------ #

def limpar_novos_dados(df, col_cat=None, subs_cat=None, col_num=None):
    df = padronizar_colunas(df)
    df = remover_duplicatas(df)
    df = tratar_nulos(df, metodo='fill', valor_preencher='desconhecido')
    if col_cat and subs_cat:
        df = normalizar_categorias(df, col_cat, subs_cat)
    if col_num:
        df = remover_outliers(df, col_num)
    return df

# ------------------ Monitoramento contínuo (streaming simulado) ------------------ #

def monitorar_csv_em_streaming(caminho_csv, intervalo=2, col_cat=None, subs_cat=None, col_num=None, salvar_em=None):
    ultima_linha = 0
    dados_processados = pd.DataFrame()

    print("⏳ Iniciando monitoramento do CSV em streaming...\n")
    while True:
        try:
            if not os.path.exists(caminho_csv):
                print("Arquivo ainda não disponível...")
                time.sleep(intervalo)
                continue

            df = pd.read_csv(caminho_csv)
            novos_dados = df.iloc[ultima_linha:]
            if not novos_dados.empty:
                print(f"🔄 Novas linhas detectadas: {len(novos_dados)}")
                dados_limpos = limpar_novos_dados(novos_dados, col_cat, subs_cat, col_num)
                dados_processados = pd.concat([dados_processados, dados_limpos], ignore_index=True)
                ultima_linha = len(df)
                print(f"✅ Total de linhas processadas até agora: {len(dados_processados)}\n")

                if salvar_em:
                    dados_processados.to_csv(salvar_em, index=False)

            else:
                print("⏸️ Nenhuma nova linha. Aguardando...\n")

            time.sleep(intervalo)

        except Exception as e:
            print(f"⚠️ Erro: {e}")
            time.sleep(intervalo)

# ------------------ Exemplo de uso ------------------ #

# Descomente e edite o caminho para usar:
# monitorar_csv_em_streaming(
#     caminho_csv='dados_fluxo.csv',
#     intervalo=2,
#     col_cat='categoria',
#     subs_cat={'escola ': 'Escola', 'escola': 'Escola'},
#     col_num=['latitude', 'longitude'],
#     salvar_em='dados_limpos.csv'
# )