In [1]:
#!/usr/bin/env python3
"""
fraud_analysis.py

Script para análise de risco de fraude em reembolsos.
"""
import argparse
import logging
import sys
from datetime import timedelta
from collections import defaultdict

import pandas as pd

# --- Constantes de negócio ---
VALOR_CONTRATO_ALTO = 3000
VALOR_REEMBOLSO_ALTO = 3000
VALOR_REEMBOLSO_MEDIO = 500
VALOR_REEMBOLSO_MUITO_ALTO = 1000
TEMPO_APROVACAO_MAXIMO = timedelta(minutes=1)
HORARIO_NOTURNO = 21
VALOR_REEMBOLSO_NOTURNO = 300
PERCENTUAL_REEMBOLSOS_ALTO = 0.7
VARIACAO_LIMIAR = 0.3
REEMBOLSOS_POR_DIA_SUSPEITOS = 3
TRANSACOES_ALTAS_POR_DIA = 2
TAXA_REEMBOLSO_ALTA_CONTRATO = 0.2


def preparar_dados(caminho_csv: str) -> pd.DataFrame:
    """
    Carrega e prepara os dados de reembolso.
    - Converte tipos
    - Renomeia colunas
    - Remove linhas faltantes
    - Extrai data, hora e simula tempo de aprovação
    """
    df = pd.read_csv(caminho_csv)

    # Conversões
    df['updated_at'] = pd.to_datetime(df['updated_at'], errors='coerce')
    df['valor_reembolso'] = pd.to_numeric(df.get('Valor do reembolso', df.get('valor_reembolso')), errors='coerce')
    df['valor_contrato'] = pd.to_numeric(df.get('Valor do contrato', df.get('valor_contrato')), errors='coerce')
    df['dias_agenciados'] = pd.to_numeric(df.get('contract_days', df.get('dias_agenciados')), errors='coerce')

    # Renomear para consistência
    df.rename(columns={
        'company_name': 'transportadora',
        'driver_name': 'motorista'
    }, inplace=True)

    # Remover nulos críticos
    df.dropna(subset=['updated_at', 'valor_reembolso', 'valor_contrato', 'dias_agenciados'], inplace=True)

    # Extrair data e hora
    df['data_transacao'] = df['updated_at'].dt.date
    df['hora_transacao'] = df['updated_at'].dt.time

    # Simulação de tempo de aprovação (substituir por dado real se disponível)
    df['tempo_aprovacao_minutos'] = [i * 0.1 for i in range(len(df))]

    return df


def identificar_transacoes_suspeitas(df: pd.DataFrame) -> pd.DataFrame:
    """
    Marca cada transação como suspeita conforme regras de negócio.
    """
    def is_suspeita(row):
        # 1. 50% do contrato alto
        if row['valor_contrato'] >= VALOR_CONTRATO_ALTO and row['valor_reembolso'] >= 0.5 * row['valor_contrato']:
            return True
        # 2. Valor muito alto
        if row['valor_reembolso'] > VALOR_REEMBOLSO_ALTO:
            return True
        # 3. Aprovação rápida e valor médio
        if row['tempo_aprovacao_minutos'] < TEMPO_APROVACAO_MAXIMO.total_seconds() / 60 and row['valor_reembolso'] > VALOR_REEMBOLSO_MEDIO:
            return True
        # 4. Reembolso noturno
        if row['hora_transacao'].hour >= HORARIO_NOTURNO and row['valor_reembolso'] > VALOR_REEMBOLSO_NOTURNO:
            return True
        return False

    df['suspeita'] = df.apply(is_suspeita, axis=1)
    return df


def analisar_comportamento(df: pd.DataFrame) -> dict:
    """
    Para cada transportadora/motorista, calcula métricas acumuladas e dias suspeitos.
    """
    resultados = defaultdict(lambda: {
        'total_reembolsos': 0,
        'num_reembolsos': 0,
        'dias_agenciados': 1,
        'percentual_reembolsos': 0,
        'variacao_reembolsos': 0,
        'variacao_valor_reembolsos': 0,
        'dias_suspeitos': []
    })

    # Sumário de contratos
    soma_contratos = df.groupby(['transportadora', 'motorista'])['valor_contrato'].sum()
    media_embolsos_por_dia = df.groupby(['transportadora', 'motorista']).apply(
        lambda x: len(x) / x['dias_agenciados'].mean() if x['dias_agenciados'].mean() > 0 else 0
    )
    media_valor_por_dia = df.groupby(['transportadora', 'motorista'])['valor_reembolso'].sum() / \
        df.groupby(['transportadora', 'motorista'])['dias_agenciados'].mean().replace(0, 1)

    for (empresa, motorista), grupo in df.groupby(['transportadora', 'motorista']):
        total = grupo['valor_reembolso'].sum()
        count = len(grupo)
        dias = grupo['dias_agenciados'].mean() or 1
        contrato = soma_contratos.get((empresa, motorista), 1)

        pct = total / contrato if contrato > 0 else 0
        reemb_dia = count / dias
        valor_dia = total / dias

        var_count = abs((reemb_dia - media_embolsos_por_dia.loc[(empresa, motorista)]) / media_embolsos_por_dia.loc[(empresa, motorista)]) \
            if media_embolsos_por_dia.loc[(empresa, motorista)] else 0
        var_valor = abs((valor_dia - media_valor_por_dia.loc[(empresa, motorista)]) / media_valor_por_dia.loc[(empresa, motorista)]) \
            if media_valor_por_dia.loc[(empresa, motorista)] else 0

        # Dias com múltiplas transações
        dias_sus = []
        for d, g in grupo.groupby('data_transacao'):
            if len(g[g['valor_reembolso'] > VALOR_REEMBOLSO_MEDIO]) >= REEMBOLSOS_POR_DIA_SUSPEITOS \
               or len(g[g['valor_reembolso'] > VALOR_REEMBOLSO_MUITO_ALTO]) >= TRANSACOES_ALTAS_POR_DIA:
                dias_sus.append(d)

        resultados[(empresa, motorista)] = {
            'total_reembolsos': total,
            'num_reembolsos': count,
            'dias_agenciados': dias,
            'percentual_reembolsos': pct,
            'variacao_reembolsos': var_count,
            'variacao_valor_reembolsos': var_valor,
            'dias_suspeitos': dias_sus
        }

    return resultados


def apply_advanced_rules(resultados: dict) -> None:
    """
    Adiciona flag de "alto volume" segundo taxa em contrato.
    """
    for key, val in resultados.items():
        val['regra_volume_alto'] = val['percentual_reembolsos'] > TAXA_REEMBOLSO_ALTA_CONTRATO


def calcular_indicador_risco(resultados: dict) -> dict:
    """
    Computa um score de risco (número de alertas) por transportadora/motorista.
    """
    scores = {}
    for key, val in resultados.items():
        count = 0
        if val['percentual_reembolsos'] >= PERCENTUAL_REEMBOLSOS_ALTO: count += 1
        if val['variacao_reembolsos'] >= VARIACAO_LIMIAR: count += 1
        if val['variacao_valor_reembolsos'] >= VARIACAO_LIMIAR: count += 1
        if val['dias_suspeitos']: count += 1
        if val.get('regra_volume_alto'): count += 1
        scores[key] = count
    return scores


def gerar_relatorio(df: pd.DataFrame, resultados: dict, scores: dict) -> str:
    """
    Monta texto de relatório com detalhes e casos de alto risco.
    """
    rel = ["Análise de Risco de Fraude em Reembolsos:\n"]
    alto = [(k, s) for k, s in scores.items() if s >= 2]
    if not alto:
        rel.append("Nenhum caso de alto risco detectado.\n")
    else:
        rel.append("Transportadoras/Motoristas de Alto Risco:\n")
        for (empresa, motorista), s in sorted(alto, key=lambda x: -x[1]):
            rel.append(f"- {empresa} / {motorista}: alertas = {s}\n")
    return ''.join(rel)


# def main():
#     parser = argparse.ArgumentParser(description="Análise de risco de fraude em reembolsos.")
#     parser.add_argument('input_csv', help='Caminho para o arquivo de dados CSV')
#     parser.add_argument('--report', help='Arquivo de saída do relatório (texto)', default=None)
#     args = parser.parse_args()

#     logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s')

#     try:
#         df = preparar_dados(args.input_csv)
#         df = identificar_transacoes_suspeitas(df)
#         resultados = analisar_comportamento(df)
#         apply_advanced_rules(resultados)
#         scores = calcular_indicador_risco(resultados)
#         rel_text = gerar_relatorio(df, resultados, scores)

#         if args.report:
#             with open(args.report, 'w', encoding='utf-8') as f:
#                 f.write(rel_text)
#             logging.info(f"Relatório salvo em {args.report}")
#         else:
#             sys.stdout.write(rel_text)
#     except Exception as e:
#         logging.error(str(e))
#         sys.exit(1)


# if __name__ == '__main__':
#     main()


In [2]:

# 2. Carregue e prepare os dados
df = preparar_dados("./src/data/data.csv")

# 3. Identifique transações suspeitas
df = identificar_transacoes_suspeitas(df)

# 4. Analise o comportamento agregado
resultados = analisar_comportamento(df)

# 5. Aplique regras avançadas (flag de alto volume)
apply_advanced_rules(resultados)

# 6. Calcule o score de risco por transportadora/motorista
scores = calcular_indicador_risco(resultados)

# 7. Gere o relatório final
relatorio = gerar_relatorio(df, resultados, scores)

# 8. Exiba ou salve
print(relatorio)
# ou
with open("relatorio.txt", "w", encoding="utf-8") as f:
    f.write(relatorio)

  media_embolsos_por_dia = df.groupby(['transportadora', 'motorista']).apply(


Análise de Risco de Fraude em Reembolsos:
Transportadoras/Motoristas de Alto Risco:
- ABC CARGAS LTDA / ANDRE LUIZ DOS SANTOS DE SOUZA: alertas = 3
- ABC CARGAS LTDA / CARLOS ALMIR PAIXAO DA SILVA: alertas = 3
- ABC CARGAS LTDA / CLAUDINEI BAPTISTA DOS SANTOS: alertas = 3
- ABC CARGAS LTDA / DANTE BARBOSA DIAS CARNEIRO: alertas = 3
- ABC CARGAS LTDA / DAVID CICERO DA SILVA ALMEIDA: alertas = 3
- ABC CARGAS LTDA / HAILTON PAIVA BARBOSA: alertas = 3
- ABC CARGAS LTDA / ISAIAS DE LIMA BATISTA: alertas = 3
- ABC CARGAS LTDA / JOSE MARTINS BARBOSA DE OLIVEIRA: alertas = 3
- ABC CARGAS LTDA / LUIZ CARLOS DE REZENDE: alertas = 3
- ABC CARGAS LTDA / MANOEL ROSA DOS SANTOS: alertas = 3
- ABC CARGAS LTDA / NORTON LUIS DE ALMEIDA: alertas = 3
- ABC CARGAS LTDA / ROSIVALDO MANOEL DA SILVA: alertas = 3
- ABC CARGAS LTDA / anonymous: alertas = 3
- ATHIVALOG - CUIABA / UGLEIDSON FERREIRA DE BARROS: alertas = 3
- AXON TRANSPORTES S/A / JOHN PASCH: alertas = 3
- Atmosfera - Jundiaí Industrial / VALDENI