# Notebook 4: Forensic Detection and Manipulation Analysis

This notebook explores **advanced forensic** techniques for detecting market manipulation.

## Learning Objectives

1. Implement multiple manipulation detectors
2. Evaluate performance with ROC curves and related metrics
3. Compare the effectiveness of different methods
4. Build a practical detection pipeline

In [None]:
# Imports
import random
import numpy as np
from market_lab.core.market import MarketConfig
from market_lab.core.traders import build_traders
from market_lab.core.sentiment import NoSentiment
from market_lab.core.simulation import SimulationRunner
from market_lab.manipulation.manipulator import Manipulator
from market_lab.manipulation.detection import (
    compute_price_volume_anomaly, 
    curve_imbalance_score,
    attach_anomaly_scores
)
from market_lab.viz.plots import plot_price_series, plot_manipulation_score

import matplotlib.pyplot as plt
from sklearn.metrics import roc_curve, auc, precision_recall_curve, confusion_matrix
import seaborn as sns
%matplotlib inline

## Data Preparation

We will generate two datasets:
1. **Clean market** (no manipulation)
2. **Manipulated market** (with pump-and-dump)

In [None]:
# Shared configuration
config = MarketConfig(
    n_traders=150,
    initial_price=100.0,
    price_volatility=2.0,
    max_daily_volume=10.0,
    wealth_mode="limited",
    price_tick=1.0,
    seed=42
)

# 1. CLEAN market (no manipulation)
traders_clean = build_traders(config, random.Random(42))
runner_clean = SimulationRunner(
    config=config,
    traders=traders_clean,
    sentiment=NoSentiment(),
    manipulator=None
)
states_clean = runner_clean.run(n_days=100)

# 2. MANIPULATED market (with pump-and-dump)
traders_manipulated = build_traders(config, random.Random(42))
manipulator = Manipulator(
    trader_id="manipulator",
    rng=random.Random(123),
    wealth=100_000.0,
    holdings=50.0,
    accumulation_days=30,
    pump_days=10,
    dump_days=15
)
runner_manipulated = SimulationRunner(
    config=config,
    traders=traders_manipulated,
    sentiment=NoSentiment(),
    manipulator=manipulator
)
states_manipulated = runner_manipulated.run(n_days=100)

print("Data prepared:")
print(f"  Clean market: {len(states_clean)} days")
print(f"  Manipulated market: {len(states_manipulated)} days")

## Creating Ground Truth Labels

To evaluate the detectors, we need true labels:
- `0` = normal day
- `1` = manipulation day (pump or dump)
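The labels can also be derived from the phase lengths instead of hard-coded day numbers. The sketch below is a minimal illustration, not part of `market_lab`: the helper name `build_phase_labels` is hypothetical, and it assumes the manipulator's phases run back to back starting at day 0, which this notebook does not verify.

```python
def build_phase_labels(n_days, accumulation_days, pump_days, dump_days):
    """Return one 0/1 label per day; 1 marks the pump and dump phases.

    Assumption: accumulation, pump and dump run back to back from day 0.
    """
    start = accumulation_days
    end = accumulation_days + pump_days + dump_days  # exclusive upper bound
    return [1 if start <= day < end else 0 for day in range(n_days)]


# Same phase lengths as the Manipulator configured in this notebook.
phase_labels = build_phase_labels(
    n_days=100, accumulation_days=30, pump_days=10, dump_days=15
)
print(sum(phase_labels))      # number of days labeled as manipulation
print(phase_labels.index(1))  # first day labeled as manipulation
```

Under that assumption the pump covers days 30-39 and the dump covers days 40-54, so 25 days are labeled `1`.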

In [None]:
# Ground truth for the manipulated market.
# Assumes the manipulator's phases run back to back from day 0:
# accumulation on days 0-29, pump on days 30-39, dump on days 40-54.
manipulation_start = 30  # accumulation_days
manipulation_end = 55    # accumulation_days + pump_days + dump_days (exclusive)
ground_truth = [
    1 if manipulation_start <= day < manipulation_end else 0
    for day in range(100)
]

print("Ground truth created:")
print(f"  Manipulation days: {sum(ground_truth)}")
print(f"  Normal days: {len(ground_truth) - sum(ground_truth)}")

## Detector 1: Price-Volume Anomaly

Combines z-scores of price and volume into a single anomaly score.
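To make the idea concrete, here is a minimal sketch of one way to combine rolling z-scores of returns and volume. It is an illustration only and not the implementation of `compute_price_volume_anomaly`; the function names, the use of absolute z-scores, and the simple sum are all assumptions.

```python
from statistics import mean, pstdev


def rolling_zscore(values, window):
    """Z-score of each value against the preceding `window` observations."""
    scores = []
    for i, value in enumerate(values):
        history = values[max(0, i - window):i]
        if len(history) < 2:
            scores.append(0.0)  # not enough history yet
            continue
        sigma = pstdev(history)
        scores.append((value - mean(history)) / sigma if sigma > 0 else 0.0)
    return scores


def price_volume_score(prices, volumes, window=20):
    """Illustrative combined score: |z(return)| + |z(volume)| per day."""
    returns = [0.0] + [(b - a) / a for a, b in zip(prices, prices[1:])]
    z_ret = rolling_zscore(returns, window)
    z_vol = rolling_zscore(volumes, window)
    return [abs(r) + abs(v) for r, v in zip(z_ret, z_vol)]


# Toy data: a quiet market with a price and volume spike on the last day.
toy_prices = [100.0, 101.0, 100.0, 101.0, 100.0, 101.0, 100.0, 120.0]
toy_volumes = [10.0, 11.0, 10.0, 11.0, 10.0, 11.0, 10.0, 40.0]
toy_scores = price_volume_score(toy_prices, toy_volumes, window=5)
print(toy_scores.index(max(toy_scores)))  # the spike day scores highest
```

Each z-score here uses only earlier days, so the score for a given day does not depend on future data.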

In [None]:
# Apply the detector
scores_pv = compute_price_volume_anomaly(states_manipulated, window=20)

# Visualize
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(14, 8))

# Price
plot_price_series(states_manipulated, ax=ax1)
ax1.set_title('Manipulated Market')
ax1.axvspan(30, 55, alpha=0.2, color='red', label='Manipulation')
ax1.legend()

# Anomaly scores
days = list(range(100))
ax2.plot(days, scores_pv, linewidth=2, color='purple', label='Anomaly Score')
ax2.axhline(y=np.percentile(scores_pv, 90), color='red', linestyle='--', label='90th percentile')
ax2.set_xlabel('Day')
ax2.set_ylabel('Anomaly Score')
ax2.set_title('Detector 1: Price-Volume Anomaly')
ax2.axvspan(30, 55, alpha=0.2, color='red')
ax2.legend()
ax2.grid(alpha=0.3)

plt.tight_layout()
plt.show()

## Detector 2: Curve Imbalance

Measures the imbalance between the supply and demand curves.
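As a rough illustration of what an imbalance measure can look like, the sketch below compares total demanded and offered quantity. It is not the implementation of `curve_imbalance_score`, and the order representation (lists of `(price, quantity)` tuples) is an assumption made for this example; the real `order_curves` object may be structured differently.

```python
def imbalance_score(buy_orders, sell_orders):
    """Normalized absolute imbalance in [0, 1].

    0 means demand and supply quantities match; 1 means the book is one-sided.
    Orders are assumed to be (price, quantity) tuples.
    """
    demand = sum(qty for _, qty in buy_orders)
    supply = sum(qty for _, qty in sell_orders)
    total = demand + supply
    return abs(demand - supply) / total if total > 0 else 0.0


balanced = imbalance_score([(99.0, 5.0), (98.0, 5.0)], [(101.0, 6.0), (102.0, 4.0)])
one_sided = imbalance_score([(99.0, 18.0)], [(101.0, 2.0)])
print(balanced, one_sided)
```

A heavy buy-side book, as might appear during a pump, pushes the score toward 1.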

In [None]:
# Compute imbalance scores
scores_imbalance = [curve_imbalance_score(state.order_curves) 
                    for state in states_manipulated]

# Visualize
plt.figure(figsize=(14, 5))
plt.plot(days, scores_imbalance, linewidth=2, color='orange', label='Imbalance Score')
plt.axhline(y=np.percentile(scores_imbalance, 90), color='red', linestyle='--', 
            label='90th percentile')
plt.xlabel('Day')
plt.ylabel('Imbalance Score')
plt.title('Detector 2: Order Curve Imbalance')
plt.axvspan(30, 55, alpha=0.2, color='red', label='Manipulation')
plt.legend()
plt.grid(alpha=0.3)
plt.tight_layout()
plt.show()

## Detector 3: Rolling Volatility Spike

Detects sudden changes in volatility.

In [None]:
# Compute rolling volatility of daily percentage returns
prices = [s.price for s in states_manipulated]
returns = [(prices[i] - prices[i-1]) / prices[i-1] * 100 for i in range(1, len(prices))]

window = 10
rolling_vol = [0.0]  # First day has no return
for i in range(1, len(returns) + 1):
    start = max(0, i - window)
    window_returns = returns[start:i]
    rolling_vol.append(np.std(window_returns) if len(window_returns) > 1 else 0.0)

# Score based on deviation from the mean volatility
mean_vol = np.mean(rolling_vol)
std_vol = np.std(rolling_vol)
scores_volatility = [(v - mean_vol) / std_vol if std_vol > 0 else 0.0 for v in rolling_vol]

# Visualize
plt.figure(figsize=(14, 5))
plt.plot(days, scores_volatility, linewidth=2, color='green', label='Volatility Z-Score')
plt.axhline(y=2, color='red', linestyle='--', label='Threshold (±2σ)')
plt.axhline(y=-2, color='red', linestyle='--')
plt.axhline(y=0, color='gray', linestyle='-', alpha=0.3)
plt.xlabel('Day')
plt.ylabel('Volatility Z-Score')
plt.title('Detector 3: Rolling Volatility Spike')
plt.axvspan(30, 55, alpha=0.2, color='red', label='Manipulation')
plt.legend()
plt.grid(alpha=0.3)
plt.tight_layout()
plt.show()

## Evaluation: ROC Curves

Receiver Operating Characteristic (ROC) curves show the trade-off between:
- **True Positive Rate** (sensitivity)
- **False Positive Rate** (1 - specificity)
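Each point on a ROC curve comes from one threshold. The sketch below computes a single point by hand on made-up labels and scores, flagging a day when its score is at or above the threshold. The helper `roc_point` and the toy data are illustrative only.

```python
def roc_point(labels, scores, threshold):
    """Return (fpr, tpr) when flagging every day with score >= threshold."""
    tp = sum(1 for y, s in zip(labels, scores) if y == 1 and s >= threshold)
    fp = sum(1 for y, s in zip(labels, scores) if y == 0 and s >= threshold)
    positives = sum(labels)
    negatives = len(labels) - positives
    tpr = tp / positives if positives else 0.0
    fpr = fp / negatives if negatives else 0.0
    return fpr, tpr


roc_labels = [0, 0, 0, 0, 1, 1]
roc_scores = [0.1, 0.4, 0.35, 0.8, 0.7, 0.9]
for roc_threshold in (0.9, 0.7, 0.35):
    print(roc_threshold, roc_point(roc_labels, roc_scores, roc_threshold))
# 0.9  -> (0.0, 0.5)
# 0.7  -> (0.25, 1.0)
# 0.35 -> (0.75, 1.0)
```

Lowering the threshold moves the point up and to the right: more manipulation days are caught, at the price of more false alarms.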

In [None]:
# Compute ROC curves for each detector
fpr_pv, tpr_pv, _ = roc_curve(ground_truth, scores_pv)
fpr_imb, tpr_imb, _ = roc_curve(ground_truth, scores_imbalance)
fpr_vol, tpr_vol, _ = roc_curve(ground_truth, np.abs(scores_volatility))

# Compute AUC (Area Under the Curve)
auc_pv = auc(fpr_pv, tpr_pv)
auc_imb = auc(fpr_imb, tpr_imb)
auc_vol = auc(fpr_vol, tpr_vol)

# Plot ROC curves
plt.figure(figsize=(10, 8))
plt.plot(fpr_pv, tpr_pv, linewidth=2.5, label=f'Price-Volume (AUC={auc_pv:.3f})', color='purple')
plt.plot(fpr_imb, tpr_imb, linewidth=2.5, label=f'Curve Imbalance (AUC={auc_imb:.3f})', color='orange')
plt.plot(fpr_vol, tpr_vol, linewidth=2.5, label=f'Volatility Spike (AUC={auc_vol:.3f})', color='green')
plt.plot([0, 1], [0, 1], 'k--', linewidth=2, label='Random (AUC=0.500)', alpha=0.5)

plt.xlabel('False Positive Rate', fontsize=12)
plt.ylabel('True Positive Rate', fontsize=12)
plt.title('ROC Curves: Detector Comparison', fontsize=14, fontweight='bold')
plt.legend(fontsize=11, loc='lower right')
plt.grid(alpha=0.3)
plt.tight_layout()
plt.show()

print("\nAUC comparison:")
print(f"  Price-Volume Anomaly: {auc_pv:.4f}")
print(f"  Curve Imbalance: {auc_imb:.4f}")
print(f"  Volatility Spike: {auc_vol:.4f}")
auc_by_detector = {'Price-Volume': auc_pv, 'Curve Imbalance': auc_imb, 'Volatility': auc_vol}
print(f"\nBest detector: {max(auc_by_detector, key=auc_by_detector.get)}")

## Evaluation: Precision-Recall Curves

For imbalanced datasets (more normal days than manipulated ones), Precision-Recall curves are more informative than ROC curves.
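A small worked example shows why. The data below is made up: 5 manipulated days out of 100, and a hypothetical detector that catches 4 of them while also flagging 10 normal days. The false positive rate looks small, yet most alerts are wrong, which only precision reveals.

```python
def rates(labels, predictions):
    """Return (precision, recall, false_positive_rate) for 0/1 predictions."""
    tp = sum(1 for y, p in zip(labels, predictions) if y == 1 and p == 1)
    fp = sum(1 for y, p in zip(labels, predictions) if y == 0 and p == 1)
    fn = sum(1 for y, p in zip(labels, predictions) if y == 1 and p == 0)
    tn = sum(1 for y, p in zip(labels, predictions) if y == 0 and p == 0)
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    fpr = fp / (fp + tn) if (fp + tn) else 0.0
    return precision, recall, fpr


# 5 manipulated days followed by 95 normal days (illustrative data).
pr_labels = [1] * 5 + [0] * 95
# Detector catches 4 of 5 manipulated days and flags 10 normal days.
pr_predictions = [1, 1, 1, 1, 0] + [1] * 10 + [0] * 85

pr_precision, pr_recall, pr_fpr = rates(pr_labels, pr_predictions)
print(f"precision={pr_precision:.3f} recall={pr_recall:.3f} fpr={pr_fpr:.3f}")
```

Here recall is 0.8 and the false positive rate is about 0.105, but precision is only about 0.286: roughly 7 in 10 alerts are false alarms.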

In [None]:
# Compute Precision-Recall curves
precision_pv, recall_pv, _ = precision_recall_curve(ground_truth, scores_pv)
precision_imb, recall_imb, _ = precision_recall_curve(ground_truth, scores_imbalance)
precision_vol, recall_vol, _ = precision_recall_curve(ground_truth, np.abs(scores_volatility))

# Plot
plt.figure(figsize=(10, 8))
plt.plot(recall_pv, precision_pv, linewidth=2.5, label='Price-Volume', color='purple')
plt.plot(recall_imb, precision_imb, linewidth=2.5, label='Curve Imbalance', color='orange')
plt.plot(recall_vol, precision_vol, linewidth=2.5, label='Volatility Spike', color='green')

baseline = sum(ground_truth) / len(ground_truth)
plt.axhline(y=baseline, color='k', linestyle='--', linewidth=2, 
            label=f'Baseline ({baseline:.3f})', alpha=0.5)

plt.xlabel('Recall (Sensitivity)', fontsize=12)
plt.ylabel('Precision', fontsize=12)
plt.title('Precision-Recall Curves', fontsize=14, fontweight='bold')
plt.legend(fontsize=11, loc='lower left')
plt.grid(alpha=0.3)
plt.tight_layout()
plt.show()

## Confusion Matrix

Let's pick a threshold and evaluate performance in detail.

In [None]:
# Use the Price-Volume detector (assumed to have the best AUC; confirm against the comparison above)
threshold = np.percentile(scores_pv, 75)  # 75th percentile as the threshold
predictions = [1 if score > threshold else 0 for score in scores_pv]

# Compute the confusion matrix
cm = confusion_matrix(ground_truth, predictions)

# Visualize
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', cbar=True,
            xticklabels=['Normal', 'Manipulation'],
            yticklabels=['Normal', 'Manipulation'])
plt.ylabel('True Label', fontsize=12)
plt.xlabel('Predicted Label', fontsize=12)
plt.title('Confusion Matrix: Price-Volume Detector', fontsize=14, fontweight='bold')
plt.tight_layout()
plt.show()

# Compute metrics
tn, fp, fn, tp = cm.ravel()
accuracy = (tp + tn) / (tp + tn + fp + fn)
precision = tp / (tp + fp) if (tp + fp) > 0 else 0
recall = tp / (tp + fn) if (tp + fn) > 0 else 0
# Named f1 (not f1_score) to avoid clashing with sklearn.metrics.f1_score, imported later
f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0

print(f"\nPerformance metrics (threshold={threshold:.2f}):")
print(f"  Accuracy: {accuracy:.4f}")
print(f"  Precision: {precision:.4f}")
print(f"  Recall: {recall:.4f}")
print(f"  F1-Score: {f1:.4f}")
print("\nConfusion Matrix:")
print(f"  True Negatives: {tn}")
print(f"  False Positives: {fp}")
print(f"  False Negatives: {fn}")
print(f"  True Positives: {tp}")

## Ensemble Detector

Combining multiple detectors can improve performance.

In [None]:
# Normalize scores to [0, 1]
def normalize(scores):
    min_s, max_s = min(scores), max(scores)
    if max_s - min_s == 0:
        return [0.5] * len(scores)
    return [(s - min_s) / (max_s - min_s) for s in scores]

scores_pv_norm = normalize(scores_pv)
scores_imb_norm = normalize(scores_imbalance)
scores_vol_norm = normalize([abs(s) for s in scores_volatility])

# Ensemble: weighted average
weights = [0.5, 0.3, 0.2]  # Price-Volume gets the most weight
scores_ensemble = [
    weights[0] * pv + weights[1] * imb + weights[2] * vol
    for pv, imb, vol in zip(scores_pv_norm, scores_imb_norm, scores_vol_norm)
]

# Evaluate the ensemble
fpr_ens, tpr_ens, _ = roc_curve(ground_truth, scores_ensemble)
auc_ens = auc(fpr_ens, tpr_ens)

# Compare with the individual detectors
plt.figure(figsize=(10, 8))
plt.plot(fpr_pv, tpr_pv, linewidth=2, label=f'Price-Volume (AUC={auc_pv:.3f})', 
         color='purple', alpha=0.6)
plt.plot(fpr_imb, tpr_imb, linewidth=2, label=f'Curve Imbalance (AUC={auc_imb:.3f})', 
         color='orange', alpha=0.6)
plt.plot(fpr_vol, tpr_vol, linewidth=2, label=f'Volatility Spike (AUC={auc_vol:.3f})', 
         color='green', alpha=0.6)
plt.plot(fpr_ens, tpr_ens, linewidth=3, label=f'ENSEMBLE (AUC={auc_ens:.3f})', 
         color='red', linestyle='-')
plt.plot([0, 1], [0, 1], 'k--', linewidth=2, alpha=0.5)

plt.xlabel('False Positive Rate', fontsize=12)
plt.ylabel('True Positive Rate', fontsize=12)
plt.title('ROC Curve: Ensemble vs Individual Detectors', fontsize=14, fontweight='bold')
plt.legend(fontsize=11, loc='lower right')
plt.grid(alpha=0.3)
plt.tight_layout()
plt.show()

print("\nPerformance Comparison:")
print(f"  Ensemble AUC: {auc_ens:.4f}")
best_individual = max(auc_pv, auc_imb, auc_vol)
print(f"  Best Individual: {best_individual:.4f}")
print(f"  Improvement: {((auc_ens - best_individual) / best_individual * 100):.2f}%")

## False Positive / False Negative Analysis

Let's investigate where the detectors go wrong.

In [None]:
# Threshold for the ensemble
threshold_ens = np.percentile(scores_ensemble, 75)
predictions_ens = [1 if score > threshold_ens else 0 for score in scores_ensemble]

# Identify errors
false_positives = [i for i in range(len(ground_truth)) 
                   if ground_truth[i] == 0 and predictions_ens[i] == 1]
false_negatives = [i for i in range(len(ground_truth)) 
                   if ground_truth[i] == 1 and predictions_ens[i] == 0]

# Visualize
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(14, 10))

# Price with errors marked
ax1.plot(days, prices, linewidth=2, color='blue', label='Price')
ax1.scatter([days[i] for i in false_positives], [prices[i] for i in false_positives],
            color='orange', s=100, marker='x', label='False Positives', zorder=5)
ax1.scatter([days[i] for i in false_negatives], [prices[i] for i in false_negatives],
            color='red', s=100, marker='o', label='False Negatives', zorder=5)
ax1.axvspan(30, 55, alpha=0.2, color='gray', label='True Manipulation')
ax1.set_ylabel('Price')
ax1.set_title('Error Analysis: Ensemble Detector', fontweight='bold')
ax1.legend(loc='upper left')
ax1.grid(alpha=0.3)

# Scores with predictions
ax2.plot(days, scores_ensemble, linewidth=2, color='purple', label='Ensemble Score')
ax2.axhline(y=threshold_ens, color='red', linestyle='--', label=f'Threshold={threshold_ens:.2f}')
ax2.scatter([days[i] for i in false_positives], [scores_ensemble[i] for i in false_positives],
            color='orange', s=100, marker='x', zorder=5)
ax2.scatter([days[i] for i in false_negatives], [scores_ensemble[i] for i in false_negatives],
            color='red', s=100, marker='o', zorder=5)
ax2.axvspan(30, 55, alpha=0.2, color='gray')
ax2.set_xlabel('Day')
ax2.set_ylabel('Anomaly Score')
ax2.legend()
ax2.grid(alpha=0.3)

plt.tight_layout()
plt.show()

print("\nError analysis:")
print(f"  False Positives: {len(false_positives)} days - {false_positives[:5]}...")
print(f"  False Negatives: {len(false_negatives)} days - {false_negatives[:5]}...")
print("\nPossible causes:")
print("  - False Positives: high natural volatility")
print("  - False Negatives: subtle manipulation days inside the pump/dump window")

## Practical Detection Pipeline

Let's create a reusable class for detecting manipulation.

In [None]:
class ManipulationDetector:
    """
    End-to-end pipeline for detecting market manipulation.
    """
    
    def __init__(self, threshold_percentile=75):
        self.threshold_percentile = threshold_percentile
        self.threshold = None
    
    def fit(self, states_clean):
        """Calibrate the threshold on a clean (unmanipulated) market."""
        scores = compute_price_volume_anomaly(states_clean, window=20)
        self.threshold = np.percentile(scores, self.threshold_percentile)
        return self
    
    def predict(self, states):
        """Flag days with suspected manipulation."""
        if self.threshold is None:
            raise RuntimeError("Call fit() before predict().")
        scores = compute_price_volume_anomaly(states, window=20)
        predictions = [1 if score > self.threshold else 0 for score in scores]
        return predictions, scores
    
    def report(self, states, ground_truth=None):
        """Print a detection report and return (predictions, scores)."""
        predictions, scores = self.predict(states)
        
        print("\n" + "="*60)
        print("MANIPULATION DETECTION REPORT")
        print("="*60)
        
        days_flagged = [i for i, p in enumerate(predictions) if p == 1]
        print(f"\nDays flagged: {len(days_flagged)} / {len(predictions)}")
        print(f"Suspicious days: {days_flagged[:10]}..." if len(days_flagged) > 10 else f"Suspicious days: {days_flagged}")
        
        # Explicit None check so array-like labels (e.g. NumPy arrays) also work
        if ground_truth is not None:
            cm = confusion_matrix(ground_truth, predictions)
            tn, fp, fn, tp = cm.ravel()
            accuracy = (tp + tn) / (tp + tn + fp + fn)
            precision = tp / (tp + fp) if (tp + fp) > 0 else 0
            recall = tp / (tp + fn) if (tp + fn) > 0 else 0
            
            print("\nPerformance Metrics:")
            print(f"  Accuracy: {accuracy:.4f}")
            print(f"  Precision: {precision:.4f}")
            print(f"  Recall: {recall:.4f}")
            print(f"  True Positives: {tp}")
            print(f"  False Positives: {fp}")
            print(f"  False Negatives: {fn}")
        
        print("="*60 + "\n")
        return predictions, scores

# Use the detector
detector = ManipulationDetector(threshold_percentile=75)
detector.fit(states_clean)
predictions, scores = detector.report(states_manipulated, ground_truth=ground_truth)

## Practical Exercises

### Exercise 1: Optimize the Threshold
Use grid search to find the optimal threshold:
- Test percentiles from 50 to 95
- Maximize the F1-score
- Compare with a fixed threshold

### Exercise 2: Feature Engineering
Create new detectors based on:
- Skewness and kurtosis of returns
- First-order autocorrelation
- Volume/price ratio

### Exercise 3: Real-Time Detection
Implement an online detector that:
- Uses only past data (no look-ahead bias)
- Updates its threshold dynamically
- Emits alerts in real time
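As a possible starting point for this exercise, the sketch below scores each new observation against a trailing window of past values only, so the alert boundary moves as new data arrives. The class name `OnlineZScoreDetector` and its parameters (`window`, `k`, `min_history`) are hypothetical choices for illustration, not part of `market_lab`.

```python
from collections import deque
from statistics import mean, pstdev


class OnlineZScoreDetector:
    """Flags a value that deviates from the trailing window by more than k sigmas."""

    def __init__(self, window=20, k=3.0, min_history=5):
        self.history = deque(maxlen=window)  # oldest values drop out automatically
        self.k = k
        self.min_history = min_history

    def update(self, value):
        """Score `value` against past data only, then add it to the history."""
        alert = False
        if len(self.history) >= self.min_history:
            mu = mean(self.history)
            sigma = pstdev(self.history)
            alert = sigma > 0 and abs(value - mu) > self.k * sigma
        self.history.append(value)
        return alert


online_detector = OnlineZScoreDetector(window=10, k=3.0)
stream = [1.0, 1.2, 0.9, 1.1, 1.0, 1.05, 0.95, 6.0, 1.0]
alerts = [day for day, value in enumerate(stream) if online_detector.update(value)]
print(alerts)  # days on which an alert fired
```

Note that the spike itself enters the history after being scored, which widens the window's standard deviation for the following days; a more careful design might exclude flagged values from the calibration window.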

### Exercise 4: Robustness
Test the robustness of the detector against:
- More subtle manipulation (less capital)
- Different phases (pump only, dump only)
- Multiple simultaneous manipulators

### Exercise 5: Cost of Errors
Consider asymmetric costs:
- A False Negative costs 10x more than a False Positive
- Adjust the threshold to minimize total cost
- Compare the traditional ROC with a cost-sensitive ROC
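One way to approach the threshold adjustment is a direct search over candidate thresholds, scoring each by total misclassification cost. The sketch below uses made-up labels and scores; the helper names and the cost parameters `fn_cost` and `fp_cost` are assumptions for illustration. A day is flagged when its score is strictly above the threshold, matching the rule used elsewhere in this notebook.

```python
def total_cost(labels, scores, threshold, fn_cost=10.0, fp_cost=1.0):
    """Total cost of flagging every day whose score is above `threshold`."""
    fn = sum(1 for y, s in zip(labels, scores) if y == 1 and s <= threshold)
    fp = sum(1 for y, s in zip(labels, scores) if y == 0 and s > threshold)
    return fn_cost * fn + fp_cost * fp


def best_threshold(labels, scores, **costs):
    """Pick the candidate threshold with the lowest total cost."""
    # Candidates: every observed score, plus one value below all of them
    # so that "flag everything" is also considered.
    candidates = [min(scores) - 1.0] + sorted(set(scores))
    return min(candidates, key=lambda t: total_cost(labels, scores, t, **costs))


cost_labels = [0, 0, 0, 1, 0, 0, 0, 1]
cost_scores = [0.1, 0.2, 0.3, 0.35, 0.4, 0.5, 0.6, 0.9]

asymmetric = best_threshold(cost_labels, cost_scores)  # FN costs 10x FP
symmetric = best_threshold(cost_labels, cost_scores, fn_cost=1.0, fp_cost=1.0)
print(asymmetric, symmetric)
```

With the 10x penalty the search settles on a lower threshold (0.3 here) and accepts three false alarms to avoid missing the weakly scored manipulation day; with equal costs it prefers the higher threshold (0.6) and misses that day.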

In [None]:
# Example solution for Exercise 1: Optimize the Threshold
from sklearn.metrics import f1_score

# Grid search over percentiles
percentiles = range(50, 96, 5)
best_f1 = -1.0  # below any valid F1, so best_percentile is always set
best_percentile = None
results = []

for p in percentiles:
    threshold = np.percentile(scores_pv, p)
    predictions = [1 if score > threshold else 0 for score in scores_pv]
    f1 = f1_score(ground_truth, predictions)
    results.append((p, f1, threshold))
    
    if f1 > best_f1:
        best_f1 = f1
        best_percentile = p

# Visualize
plt.figure(figsize=(12, 6))
percentiles_list = [r[0] for r in results]
f1_scores = [r[1] for r in results]
plt.plot(percentiles_list, f1_scores, marker='o', linewidth=2, markersize=8)
plt.axvline(x=best_percentile, color='red', linestyle='--', 
            label=f'Optimal: {best_percentile}th percentile (F1={best_f1:.4f})')
plt.xlabel('Threshold Percentile', fontsize=12)
plt.ylabel('F1-Score', fontsize=12)
plt.title('Threshold Optimization via Grid Search', fontsize=14, fontweight='bold')
plt.legend(fontsize=11)
plt.grid(alpha=0.3)
plt.tight_layout()
plt.show()

print(f"\nOptimal threshold:")
print(f"  Percentile: {best_percentile}")
print(f"  F1-Score: {best_f1:.4f}")
print(f"  Threshold value: {results[percentiles_list.index(best_percentile)][2]:.4f}")

## Conclusions

In this notebook, you learned about:

1. **Multiple detectors**: Different methods capture different aspects of manipulation
2. **Evaluation metrics**: ROC curves, AUC, Precision-Recall, Confusion Matrix
3. **Ensemble methods**: Combining detectors can improve performance, although the gain is not guaranteed
4. **Error analysis**: Understanding false positives and false negatives helps refine detectors
5. **Practical pipeline**: A reusable detector for real-world applications

### Real-World Applications

- **Financial regulators**: Detect manipulation on exchanges
- **Exchanges**: Real-time monitoring
- **Investors**: Avoid manipulated assets
- **Academic research**: Study market efficiency

### Limitations and Challenges

1. **Limited data**: Few labeled real-world cases
2. **Evolving tactics**: Manipulators adapt their strategies
3. **Trade-offs**: Sensitivity vs. specificity
4. **Cost of errors**: False positives cost investigation effort, false negatives let fraud through

### Final Takeaways

You have completed all 4 notebooks of the Market Manipulation Lab! You now understand:
- How markets work (Notebook 1)
- How constraints create dynamics (Notebook 2)
- How manipulation happens (Notebook 3)
- How to detect manipulation (Notebook 4)

**Congratulations!** Keep experimenting and exploring.