<a href="https://colab.research.google.com/github/zDaniloBernardi/Prime-Numbers/blob/Valida%C3%A7%C3%A3o-de-Parametros/Valida%C3%A7%C3%A3o_de_Par%C3%A2metros_te%C3%B3ricos_V2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

"""
Análise Preditiva de Gaps entre Números Primos

Este script realiza uma análise dos gaps entre números primos consecutivos
com o objetivo de prever características do próximo gap.
Ele inclui:
- Geração de primos e gaps.
- Análise de estacionariedade.
- Diferenciação (inteira e fracionária - ARFIMA).
- Engenharia de features (estatísticas robustas em janela, FFT, event window).
- Modelagem preditiva com XGBoost.
- Validação cruzada temporal e avaliação de performance.
- Testes estatísticos para comparação de modelos.
- Análise de importância de features.
"""

In [5]:
pip install fracdiff

[31mERROR: Ignored the following versions that require a different python version: 0.9.0 Requires-Python >=3.7.12,<3.10[0m[31m
[0m[31mERROR: Could not find a version that satisfies the requirement fracdiff (from versions: none)[0m[31m
[0m[31mERROR: No matching distribution found for fracdiff[0m[31m
[0m

In [3]:
# --------------------------------------------------------------------------
# Importações de Bibliotecas
# --------------------------------------------------------------------------
import numpy as np
import pandas as pd
from sympy import primerange, ntheory # ntheory para estimar o n-ésimo primo
import math
from scipy.stats import median_abs_deviation
from scipy.signal import detrend, get_window
from statsmodels.tsa.stattools import adfuller, kpss
from fracdiff import Fracdiff # Para diferenciação fracionária: pip install fracdiff
from sklearn.model_selection import TimeSeriesSplit
from sklearn.metrics import mean_absolute_error, mean_squared_error
from sklearn.preprocessing import StandardScaler
from sklearn.inspection import permutation_importance
import xgboost as xgb
from scipy.stats import wilcoxon # Para teste de Wilcoxon
from sklearn.utils import resample # Para Bootstrap CI

# --------------------------------------------------------------------------
# ⚙️ CONFIGURAÇÕES GLOBAIS
# --------------------------------------------------------------------------
# --- Configurações de Dados ---
N_PRIMES = 1_000_000  # Número de primos a serem gerados
# Estimativa para o limite superior da geração de primos (PNT: p_n ~ n log n)
# Usaremos uma estimativa mais folgada para garantir que temos N_PRIMES
PRIME_UPPER_BOUND_ESTIMATE = int(N_PRIMES * (math.log(N_PRIMES) + math.log(math.log(N_PRIMES)) * 1.5)) if N_PRIMES > 1000 else 15_485_864 # (p_1M é ~1.5e7)

# --- Configurações de Diferenciação e ARFIMA ---
# Se True, aplica diferenciação. 'integer' ou 'fractional'.
APPLY_DIFFERENCING = True
DIFFERENCING_TYPE = 'fractional' # 'integer' ou 'fractional'
INTEGER_DIFF_ORDER = 1 # Ordem para diferenciação inteira (d=1)
# Parâmetro 'd' para diferenciação fracionária (baseado no roadmap R3.2, d≈0.215)
# Este valor pode ser ajustado/otimizado.
FRACTIONAL_DIFF_D = 0.215

# --- Configurações de Features em Janela Deslizante ---
WINDOW_SIZE_STATS = 20 # Para Median_Gap, MAD_Gap (Roadmap R2.1)
STATS_STEP = 1 # Passo para janelas de estatísticas (pode ser ajustado para reduzir correlação ou computação)

WINDOW_SIZE_FFT = 20 # Janela para FFT (Roadmap R4.1)
FFT_STEP = WINDOW_SIZE_FFT // 2 # Overlap de 50% para FFT (Roadmap R4.1)
FFT_TOP_K = 3 # Número de componentes FFT a extrair (Roadmap R4.1)
FFT_WINDOW_TYPE = 'hann' # Tipo de janela para FFT (ex: 'hann', 'hamming')
FFT_APPLY_DETREND = True # Se True, aplica detrend local antes da FFT

# --- Configurações da Feature "Event Window" ---
EVENT_WINDOW_SIGMA_MULTIPLIER = 3.0 # Multiplicador de sigma para definir um pico (Roadmap R8.1)
# Usa os mesmos WINDOW_SIZE_FFT e FFT_STEP para alinhar com features FFT

# --- Configurações do Modelo Preditivo (XGBoost) ---
# Baseado no Roadmap R5.2
XGB_PARAMS = {
    'n_estimators': 300,
    'max_depth': 3,
    'learning_rate': 0.01,
    'subsample': 0.6,
    'colsample_bytree': 0.6,
    'objective': 'reg:squarederror', # Para regressão
    'random_state': 42,
    'n_jobs': -1, # Usar todos os cores disponíveis
    'verbosity': 0 # xgb verbosity (0: silent, 1: warning, 2: info, 3: debug)
}

# --- Configurações de Validação ---
CV_N_SPLITS = 5 # Número de folds para TimeSeriesSplit (Roadmap R5.2)
TEST_SET_SIZE = 0.1 # Proporção dos dados para o conjunto de hold-out final (Roadmap R10.1)

# --- Configurações para Testes Estatísticos de Comparação de Modelos ---
# Para Bootstrap CI na diferença de MAE (Roadmap R6.2, R7.2)
BOOTSTRAP_N_REPLICAS = 1000
BOOTSTRAP_CI_PERCENTILES = [2.5, 97.5]
# Nível de significância para testes estatísticos (Wilcoxon, etc.)
ALPHA_SIGNIFICANCE = 0.05

# --------------------------------------------------------------------------
# 🔬 FUNÇÕES AUXILIARES
# --------------------------------------------------------------------------

# === Bloco R1: Geração de Dados Básicos ===
def generate_primes_gaps(n_primes_target: int, upper_bound_estimate: int) -> tuple[np.ndarray, np.ndarray]:
    """
    Gera os primeiros `n_primes_target` números primos e calcula os gaps entre eles.

    Args:
        n_primes_target: Número desejado de primos.
        upper_bound_estimate: Uma estimativa superior para o n_primes_target-ésimo primo.

    Returns:
        Tuple contendo array de primos e array de gaps.
    """
    print(f"Gerando aproximadamente {n_primes_target:,} primos até o limite superior {upper_bound_estimate:,}...")
    try:
        primes_list = list(primerange(2, upper_bound_estimate))
    except TypeError: # Em algumas versões antigas de sympy, pode haver problemas
        print("Aviso: Houve um TypeError com sympy.primerange. Tentando converter limite para int.")
        primes_list = list(primerange(2, int(upper_bound_estimate)))


    if len(primes_list) >= n_primes_target:
        primes = np.array(primes_list[:n_primes_target], dtype=np.int64)
    else:
        print(f"Aviso: A estimativa do limite superior ({upper_bound_estimate:,}) "
              f"gerou apenas {len(primes_list):,} primos. "
              f"Usando todos os primos gerados ou considere aumentar o limite.")
        primes = np.array(primes_list, dtype=np.int64)
        if not primes.size:
             raise ValueError("Nenhum primo gerado. Verifique o upper_bound_estimate.")


    gaps = np.diff(primes)
    print(f"Gerados {len(primes):,} primos, resultando em {len(gaps):,} gaps.")
    print(f"Último primo gerado: {primes[-1]:,}")
    print(f"Estatísticas básicas dos gaps: Média={np.mean(gaps):.2f}, Mediana={np.median(gaps):.2f}, StdDev={np.std(gaps):.2f}, "
          f"Min={np.min(gaps)}, Max={np.max(gaps)}")
    return primes, gaps


ModuleNotFoundError: No module named 'fracdiff'

In [None]:
# === Bloco R3: Estacionariedade e Diferenciação (ARFIMA) ===
def test_stationarity(timeseries: np.ndarray, series_name: str = "Série Temporal"):
    """
    Realiza testes de estacionariedade ADF e KPSS.

    Args:
        timeseries: Série temporal como array numpy.
        series_name: Nome da série para exibição.
    """
    print(f"\n--- Testes de Estacionariedade para: {series_name} ---")
    # Teste ADF (Augmented Dickey-Fuller)
    # H0: A série possui raiz unitária (não é estacionária).
    # p-value <= 0.05: Rejeita H0, série é estacionária.
    adf_result = adfuller(timeseries)
    print(f"ADF Statistic: {adf_result[0]:.4f}")
    print(f"ADF p-value: {adf_result[1]:.4f}")
    print("ADF Critical Values:")
    for key, value in adf_result[4].items():
        print(f"\t{key}: {value:.4f}")
    if adf_result[1] <= ALPHA_SIGNIFICANCE:
        print("Resultado ADF: Série provavelmente estacionária.")
    else:
        print("Resultado ADF: Série provavelmente não estacionária.")

    # Teste KPSS (Kwiatkowski-Phillips-Schmidt-Shin)
    # H0: A série é estacionária em torno de uma tendência determinística (ou nível).
    # p-value <= 0.05: Rejeita H0, série não é estacionária.
    # Usar 'c' para testar estacionariedade em torno de uma média constante.
    # Usar 'ct' para testar estacionariedade em torno de uma tendência.
    try:
        kpss_result = kpss(timeseries, regression='c', nlags="auto")
        print(f"\nKPSS Statistic: {kpss_result[0]:.4f}")
        print(f"KPSS p-value: {kpss_result[1]:.4f} (Atenção: p-valor pode ser truncado em 0.01 ou 0.1)")
        print("KPSS Critical Values:")
        for key, value in kpss_result[3].items():
            print(f"\t{key}: {value:.4f}")
        if kpss_result[1] > ALPHA_SIGNIFICANCE: # Se p-valor > alpha, não rejeita H0
            print("Resultado KPSS: Série provavelmente estacionária.")
        else:
            print("Resultado KPSS: Série provavelmente não estacionária.")
    except Exception as e:
        print(f"Erro ao executar KPSS: {e}. Pode ser devido a poucos dados ou variância zero.")


def difference_series(series: np.ndarray, order: int = 1) -> np.ndarray:
    """Aplica diferenciação inteira a uma série."""
    if order < 1:
        return series
    return np.diff(series, n=order)

def fractional_difference_series(series: pd.Series, d: float, threshold: float = 1e-4) -> pd.Series:
    """
    Aplica diferenciação fracionária usando a biblioteca fracdiff.

    Args:
        series: Série temporal como pandas Series.
        d: Ordem da diferenciação fracionária.
        threshold: Limiar para os pesos da diferenciação.
                   Os pesos w são calculados até que abs(w) < threshold.
                   Um valor menor significa mais termos, potencialmente mais preciso
                   mas mais lento e pode requerer mais dados históricos.
                   O default da lib `fracdiff` é 0.0001.

    Returns:
        Série diferenciada fracionariamente como pandas Series.
        Pode conter NaNs no início.
    """
    print(f"Aplicando diferenciação fracionária com d={d:.3f} e threshold={threshold}...")
    # Fracdiff espera um DataFrame ou Series. Retorna um DataFrame.
    fdiff_model = Fracdiff(d, threshold=threshold, window=len(series)) # window=len(series) para usar todos os dados disponíveis para cálculo
    transformed_series = fdiff_model.fit_transform(series.to_frame())[series.name]
    return transformed_series.dropna()


In [None]:
# === Bloco R2 & R4 & R8: Engenharia de Features ===
def get_rolling_stats_features(gaps_data: np.ndarray, window_size: int, step: int) -> pd.DataFrame:
    """
    Calcula features de estatísticas robustas em janela deslizante (Median, MAD)
    e o próximo gap (target).

    Args:
        gaps_data: Array dos gaps.
        window_size: Tamanho da janela deslizante.
        step: Passo da janela deslizante.

    Returns:
        DataFrame com Median_Gap, MAD_Gap, e Next_Gap.
    """
    print(f"Calculando features de estatísticas robustas (Janela={window_size}, Passo={step})...")
    features = []
    num_windows = (len(gaps_data) - window_size - 1) // step + 1 # -1 para ter um Next_Gap

    for i in range(num_windows):
        start_idx = i * step
        end_idx = start_idx + window_size
        window = gaps_data[start_idx:end_idx]

        median_gap = np.median(window)
        # median_abs_deviation requer `scale='normal'` para consistência com std em dados normais
        # ou `scale=1` para MAD puro. Usaremos MAD puro.
        mad_gap = median_abs_deviation(window, scale=1)

        next_gap = gaps_data[end_idx] # O gap imediatamente após a janela atual

        features.append({
            'Median_Gap': median_gap,
            'MAD_Gap': mad_gap,
            'Next_Gap': next_gap,
            'Original_Index': end_idx # Guarda o índice original do Next_Gap para alinhamento futuro
        })

    df_feats = pd.DataFrame(features)
    if df_feats.empty:
        raise ValueError("Nenhuma feature de estatísticas robustas foi gerada. Verifique o tamanho dos dados e da janela.")
    return df_feats.set_index('Original_Index', drop=True)


def get_fft_features(gaps_data: np.ndarray, window_size: int, step: int,
                     top_k: int, fft_window_type: str, apply_detrend: bool) -> pd.DataFrame:
    """
    Extrai features de FFT avançadas de janelas deslizantes de gaps.
    Inclui detrend local, janelamento (Hanning/Hamming), zero-padding,
    e extração de magnitude normalizada, energia relativa e energia absoluta.

    Args:
        gaps_data: Array dos gaps.
        window_size: Tamanho da janela para FFT.
        step: Passo da janela deslizante.
        top_k: Número das 'k' maiores frequências/magnitudes a serem extraídas.
        fft_window_type: Tipo de janela para aplicar (ex: 'hann', 'hamming').
        apply_detrend: Booleano, se True aplica detrend linear local.

    Returns:
        DataFrame com as features de FFT.
    """
    print(f"Calculando features de FFT (Janela={window_size}, Passo={step}, TopK={top_k})...")
    features_fft_list = []

    # Prepara as janelas de forma vetorizada (stride tricks)
    n_gaps = len(gaps_data)
    num_windows = (n_gaps - window_size) // step + 1

    # shape e strides para criar views das janelas sem copiar dados
    shape = (num_windows, window_size)
    strides = (gaps_data.strides[0] * step, gaps_data.strides[0])
    windows_data = np.lib.stride_tricks.as_strided(gaps_data, shape=shape, strides=strides)

    if apply_detrend:
        windows_data = detrend(windows_data, axis=1, type='linear')

    # Aplica janela de Hanning/Hamming etc.
    window_func = get_window(fft_window_type, window_size)
    windows_data = windows_data * window_func

    # Zero-padding para a próxima potência de 2 para melhor resolução de frequência
    nfft = 1 << (window_size - 1).bit_length()

    # Calcula FFT real para todas as janelas de uma vez
    fft_results = np.fft.rfft(windows_data, n=nfft, axis=1)
    magnitudes = np.abs(fft_results)
    frequencies = np.fft.rfftfreq(nfft, d=1.0) # d=1.0 pois os gaps são sequenciais

    # Calcula energia absoluta por janela (sobre os dados janelados e possivelmente detrended)
    energy_abs_per_window = np.sum(windows_data**2, axis=1)

    # Processa cada janela para extrair as top_k features
    for i in range(num_windows):
        mag_window = magnitudes[i, :]

        # Ignora a componente DC (frequência zero) para picos
        mag_window_no_dc = mag_window[1:]
        freq_no_dc = frequencies[1:]

        # Índices das top_k maiores magnitudes (descontando DC)
        # Adiciona 1 de volta aos índices para alinhar com `magnitudes` e `frequencies` originais
        # se `freq_no_dc` foi usado para `argsort`.
        # Se `mag_window_no_dc` é vazio (ex: window_size=1), argsort falha.
        if mag_window_no_dc.size > 0:
            idxs_top_k = np.argsort(mag_window_no_dc)[-top_k:][::-1] + 1
        else: # Caso raro, janela muito pequena
            idxs_top_k = np.array([], dtype=int)

        current_features = {}
        sum_mag_total_window = np.sum(mag_window[1:]) # Soma das magnitudes (excluindo DC)
        sum_energy_total_window = np.sum(mag_window[1:]**2) # Soma das energias (excluindo DC)

        for rank, idx_freq in enumerate(idxs_top_k, start=1):
            freq_val = frequencies[idx_freq]
            mag_val = magnitudes[i, idx_freq]

            current_features[f'FFT{rank}_freq'] = freq_val
            current_features[f'FFT{rank}_mag_norm'] = mag_val / sum_mag_total_window if sum_mag_total_window > 0 else 0.0
            current_features[f'FFT{rank}_energy_rel'] = (mag_val**2) / sum_energy_total_window if sum_energy_total_window > 0 else 0.0

        current_features['FFT_Energy_Abs'] = energy_abs_per_window[i]

        # Adiciona Original_Index para alinhamento
        # A feature FFT descreve a janela que *antecede* o `Next_Gap`.
        # Se uma janela é gaps[k:k+W], o Next_Gap que queremos prever é gaps[k+W].
        # O índice original aqui se refere ao final da janela.
        original_idx_of_window_end = (i * step) + window_size
        current_features['Original_Index'] = original_idx_of_window_end

        features_fft_list.append(current_features)

    df_fft = pd.DataFrame(features_fft_list)
    if df_fft.empty:
        # Não levantar erro, pode ser que não haja janelas suficientes
        print("Aviso: Nenhuma feature de FFT foi gerada. Verifique o tamanho dos dados e da janela.")
        # Retornar DataFrame vazio com colunas esperadas para evitar quebras downstream
        cols = [f'FFT{r+1}_{s}' for r in range(top_k) for s in ['freq', 'mag_norm', 'energy_rel']] + ['FFT_Energy_Abs', 'Original_Index']
        return pd.DataFrame(columns=cols).set_index('Original_Index', drop=True)

    return df_fft.set_index('Original_Index', drop=True)


def get_event_window_feature(gaps_data: np.ndarray, window_size: int, step: int,
                             sigma_multiplier: float) -> pd.DataFrame:
    """
    Cria uma feature binária 'Event_Window' que é 1 se qualquer gap
    dentro da janela exceder k*sigma_global, 0 caso contrário.

    Args:
        gaps_data: Array dos gaps (a série completa para calcular sigma_global).
        window_size: Tamanho da janela.
        step: Passo da janela.
        sigma_multiplier: Multiplicador k para definir o threshold (k * sigma_global).

    Returns:
        DataFrame com a feature 'Event_Window'.
    """
    print(f"Calculando feature 'Event Window' (Sigma_Multiplier={sigma_multiplier})...")
    if len(gaps_data) < window_size : # Não há janelas suficientes
        print("Aviso: Não há dados suficientes para gerar features 'Event Window'.")
        return pd.DataFrame(columns=['Event_Window', 'Original_Index']).set_index('Original_Index', drop=True)

    sigma_global = np.std(gaps_data)
    threshold = sigma_multiplier * sigma_global

    event_flags = []

    # Prepara as janelas de forma vetorizada (stride tricks)
    n_gaps = len(gaps_data)
    num_windows = (n_gaps - window_size) // step + 1 # Garante que a janela caiba nos dados

    shape = (num_windows, window_size)
    strides = (gaps_data.strides[0] * step, gaps_data.strides[0])
    windows_data = np.lib.stride_tricks.as_strided(gaps_data, shape=shape, strides=strides)

    # Verifica se o máximo em cada janela excede o threshold
    max_in_window = np.max(windows_data, axis=1)
    event_flags_vector = (max_in_window > threshold).astype(int)

    original_indices = [(i * step) + window_size for i in range(num_windows)]

    df_event = pd.DataFrame({
        'Event_Window': event_flags_vector,
        'Original_Index': original_indices
    })
    return df_event.set_index('Original_Index', drop=True)


# === Bloco R5, R9, R10: Modelagem e Validação ===
def train_evaluate_model(X_train: pd.DataFrame, y_train: pd.Series,
                         X_test: pd.DataFrame, y_test: pd.Series,
                         model_params: dict, model_name:str = "XGBoost"):
    """
    Treina um modelo XGBoost e o avalia no conjunto de teste.
    """
    print(f"\nTreinando modelo {model_name}...")
    model = xgb.XGBRegressor(**model_params)
    model.fit(X_train, y_train)

    print(f"Avaliando modelo {model_name} no conjunto de teste...")
    predictions = model.predict(X_test)

    mae = mean_absolute_error(y_test, predictions)
    rmse = np.sqrt(mean_squared_error(y_test, predictions))

    print(f"Performance de {model_name} no Teste: MAE = {mae:.3f}, RMSE = {rmse:.3f}")
    return model, mae, rmse, predictions


def cross_validate_model(X: pd.DataFrame, y: pd.Series, model_params: dict,
                         n_splits: int, model_name:str = "XGBoost"):
    """
    Realiza validação cruzada temporal para o modelo XGBoost.
    """
    print(f"\nIniciando Validação Cruzada Temporal para {model_name} ({n_splits} folds)...")
    tscv = TimeSeriesSplit(n_splits=n_splits)

    maes_fold, rmses_fold = [], []

    for fold, (train_idx, val_idx) in enumerate(tscv.split(X)):
        print(f"  Fold {fold+1}/{n_splits}")
        X_train_fold, X_val_fold = X.iloc[train_idx], X.iloc[val_idx]
        y_train_fold, y_val_fold = y.iloc[train_idx], y.iloc[val_idx]

        # (Opcional) Escalonamento de features dentro do fold do CV
        # scaler = StandardScaler()
        # X_train_fold_scaled = scaler.fit_transform(X_train_fold)
        # X_val_fold_scaled = scaler.transform(X_val_fold)
        # model_fold, mae, rmse, _ = train_evaluate_model(
        #     pd.DataFrame(X_train_fold_scaled, columns=X_train_fold.columns, index=X_train_fold.index), y_train_fold,
        #     pd.DataFrame(X_val_fold_scaled, columns=X_val_fold.columns, index=X_val_fold.index), y_val_fold,
        #     model_params, model_name=f"{model_name} Fold {fold+1}"
        # )

        # Sem escalonamento por enquanto, para manter simples. Adicionar se necessário.
        _, mae, rmse, _ = train_evaluate_model(
            X_train_fold, y_train_fold,
            X_val_fold, y_val_fold,
            model_params, model_name=f"{model_name} Fold {fold+1} (CV)"
        )
        maes_fold.append(mae)
        rmses_fold.append(rmse)

    print(f"\nResultados da Validação Cruzada Temporal para {model_name}:")
    print(f"  MAE médio: {np.mean(maes_fold):.3f} ± {np.std(maes_fold):.3f}")
    print(f"  RMSE médio: {np.mean(rmses_fold):.3f} ± {np.std(rmses_fold):.3f}")
    return maes_fold, rmses_fold

# === Bloco R6, R7: Teste Estatístico da Redução de Erro ===
def compare_model_performance_statistically(errors_model1: list, errors_model2: list,
                                            model1_name: str = "Modelo 1",
                                            model2_name: str = "Modelo 2 (Melhorado)",
                                            metric_name: str = "MAE"):
    """
    Compara a performance de dois modelos usando Wilcoxon signed-rank test
    e Bootstrap CI na diferença das métricas de erro (ex: MAE por fold).
    Assume que `errors_model1` são os erros do modelo baseline e
    `errors_model2` são os erros do modelo que se espera ser melhor (menor erro).
    """
    print(f"\n--- Comparação Estatística de Performance: {model1_name} vs {model2_name} ({metric_name}) ---")
    errors_model1 = np.array(errors_model1)
    errors_model2 = np.array(errors_model2)

    if len(errors_model1) != len(errors_model2) or len(errors_model1) < 2:
        print("Erro: As listas de erro devem ter o mesmo tamanho e pelo menos 2 elementos para comparação.")
        return

    differences = errors_model1 - errors_model2 # Positivo se modelo 2 é melhor (menor erro)

    mean_diff = np.mean(differences)
    std_diff = np.std(differences, ddof=1)
    print(f"Diferença média ({metric_name} {model1_name} - {metric_name} {model2_name}): {mean_diff:.4f} ± {std_diff:.4f}")

    # Wilcoxon signed-rank test
    # H0: A mediana das diferenças é zero.
    # H1: A mediana das diferenças não é zero (ou é > 0 se 'alternative' for 'greater').
    # Usamos 'greater' para testar se model1_errors > model2_errors (i.e., diffs > 0)
    try:
        # O teste requer que não haja apenas zeros nas diferenças, e não sejam todos iguais.
        if np.all(differences == differences[0]): # Todos os diffs são iguais
             print("Wilcoxon: Todas as diferenças são idênticas. Teste não aplicável ou informativo.")
             stat_w, p_w = np.nan, np.nan
        elif np.all(differences == 0): # Todos os diffs são zero
            print("Wilcoxon: Todas as diferenças são zero. Modelos idênticos em performance.")
            stat_w, p_w = np.nan, np.nan
        else:
            stat_w, p_w = wilcoxon(differences, alternative='greater')
            print(f"Wilcoxon signed-rank test (H1: {model2_name} é melhor): W = {stat_w:.3f}, p-value = {p_w:.4f}")
            if p_w <= ALPHA_SIGNIFICANCE:
                print(f"  Resultado: Rejeita H0. Evidência estatística de que {model2_name} tem {metric_name} significativamente menor.")
            else:
                print(f"  Resultado: Não rejeita H0. Não há evidência estatística suficiente de que {model2_name} é melhor.")
    except ValueError as e:
        print(f"Erro no teste de Wilcoxon: {e}. Pode ser devido a poucas amostras ou todas as diferenças serem zero.")
        stat_w, p_w = np.nan, np.nan


    # Bootstrap CI para a média da diferença
    boot_means_diff = []
    for _ in range(BOOTSTRAP_N_REPLICAS):
        sample_diffs = resample(differences, replace=True, n_samples=len(differences))
        boot_means_diff.append(np.mean(sample_diffs))

    ci_lower, ci_upper = np.percentile(boot_means_diff, BOOTSTRAP_CI_PERCENTILES)
    print(f"Bootstrap {100 - 2*BOOTSTRAP_CI_PERCENTILES[0]:.0f}% CI para a média da diferença [{metric_name} redução]: [{ci_lower:.4f}, {ci_upper:.4f}]")
    if ci_lower > 0:
        print(f"  Resultado CI: Intervalo de confiança está acima de zero, sugerindo que {model2_name} é melhor.")
    elif ci_upper < 0:
         print(f"  Resultado CI: Intervalo de confiança está abaixo de zero, sugerindo que {model1_name} é melhor.")
    else:
        print(f"  Resultado CI: Intervalo de confiança contém zero, não conclusivo sobre qual modelo é melhor.")


# === Bloco R11: Permutation Importance ===
def get_permutation_feature_importance(model, X_val: pd.DataFrame, y_val: pd.Series,
                                       n_repeats: int = 10):
    """
    Calcula a importância das features por permutação.
    """
    print(f"\nCalculando Permutation Feature Importance (n_repeats={n_repeats})...")
    perm_importance = permutation_importance(model, X_val, y_val, n_repeats=n_repeats, random_state=XGB_PARAMS['random_state'])

    sorted_idx = perm_importance.importances_mean.argsort()[::-1]

    print("Importância das Features por Permutação (queda média no score):")
    for i in sorted_idx:
        print(f"  {X_val.columns[i]:<30}: {perm_importance.importances_mean[i]:.4f} ± {perm_importance.importances_std[i]:.4f}")

    df_perm_importance = pd.DataFrame({
        'feature': X_val.columns[sorted_idx],
        'importance_mean': perm_importance.importances_mean[sorted_idx],
        'importance_std': perm_importance.importances_std[sorted_idx]
    })
    return df_perm_importance

# --------------------------------------------------------------------------
# 🚀 FLUXO PRINCIPAL DE ANÁLISE (WORKFLOW)
# --------------------------------------------------------------------------
def main():
    """
    Função principal para executar o pipeline de análise.
    """
    print("--- Iniciando Pipeline de Análise de Gaps de Primos ---")

    # --- 1. Geração de Dados (R1) ---
    primes, gaps_raw = generate_primes_gaps(N_PRIMES, PRIME_UPPER_BOUND_ESTIMATE)

    # --- 2. Análise de Estacionariedade e Diferenciação (R3 & ARFIMA) ---
    test_stationarity(gaps_raw, "Gaps Brutos")

    gaps_processed = gaps_raw.copy() # Começa com os gaps brutos
    processed_series_name = "Gaps Brutos"

    if APPLY_DIFFERENCING:
        if DIFFERENCING_TYPE == 'integer':
            print(f"\nAplicando diferenciação inteira de ordem {INTEGER_DIFF_ORDER}...")
            gaps_processed = difference_series(gaps_raw, order=INTEGER_DIFF_ORDER)
            processed_series_name = f"Gaps Diferenciados (d={INTEGER_DIFF_ORDER})"
        elif DIFFERENCING_TYPE == 'fractional':
            # Diferenciação fracionária requer pandas Series e remove NaNs,
            # o que encurtará a série. Precisamos lidar com isso no alinhamento.
            print(f"\nAplicando diferenciação fracionária com d={FRACTIONAL_DIFF_D}...")
            # Guardar o índice original antes de converter para Series para `fracdiff`
            # e depois alinhar. Fracdiff não preserva o índice original diretamente.
            # No entanto, as features são em janela, então o importante é o `gaps_processed`
            # ter o comprimento correto para a geração de features.

            # Para `fracdiff`, precisamos de uma série pandas
            gaps_series_for_fd = pd.Series(gaps_raw, name="gaps")
            gaps_frac_diff = fractional_difference_series(gaps_series_for_fd, d=FRACTIONAL_DIFF_D)

            # As features serão calculadas sobre esta série diferenciada.
            # Precisamos manter o controle de quantos pontos perdemos no início devido à diferenciação.
            num_nans_from_fd = len(gaps_raw) - len(gaps_frac_diff)
            print(f"Número de pontos perdidos no início devido à diferenciação fracionária: {num_nans_from_fd}")
            gaps_processed = gaps_frac_diff.values # Converte de volta para numpy array
            processed_series_name = f"Gaps Diferenciados Frac. (d={FRACTIONAL_DIFF_D:.3f})"

        # Testa estacionariedade da série processada
        if len(gaps_processed) > 10 : # Teste requer alguns pontos
             test_stationarity(gaps_processed, processed_series_name)
        else:
            print(f"Aviso: Série processada '{processed_series_name}' é muito curta para teste de estacionariedade.")


    # --- 3. Engenharia de Features (R2, R4, R8) ---
    # Todas as features (rolling stats, fft, event window) devem ser calculadas
    # sobre `gaps_processed`. O `Next_Gap` (target) também será derivado de `gaps_processed`.

    # 3a. Features de Estatísticas Robustas
    # O `target_df` já contém `Next_Gap` e `Original_Index` (do final da janela que o gerou)
    # Esta função agora precisa lidar com o `gaps_processed`
    if len(gaps_processed) < WINDOW_SIZE_STATS + 1:
        raise ValueError(f"Série '{processed_series_name}' muito curta ({len(gaps_processed)} pontos) para gerar features de estatísticas com janela {WINDOW_SIZE_STATS}.")

    df_stats_feats = get_rolling_stats_features(gaps_processed,
                                                window_size=WINDOW_SIZE_STATS,
                                                step=STATS_STEP)
    y_target = df_stats_feats['Next_Gap']
    df_stats_feats_for_X = df_stats_feats.drop(columns=['Next_Gap'])


    # 3b. Features de FFT
    if len(gaps_processed) >= WINDOW_SIZE_FFT:
        df_fft_feats = get_fft_features(gaps_processed,
                                        window_size=WINDOW_SIZE_FFT,
                                        step=FFT_STEP,
                                        top_k=FFT_TOP_K,
                                        fft_window_type=FFT_WINDOW_TYPE,
                                        apply_detrend=FFT_APPLY_DETREND)
    else:
        print(f"Aviso: Série '{processed_series_name}' ({len(gaps_processed)} pontos) muito curta para FFT com janela {WINDOW_SIZE_FFT}. Pulando features FFT.")
        df_fft_feats = pd.DataFrame(index=df_stats_feats_for_X.index) # DataFrame vazio com mesmo índice

    # 3c. Feature "Event Window"
    # Usa WINDOW_SIZE_FFT e FFT_STEP para alinhar com as janelas FFT
    if len(gaps_processed) >= WINDOW_SIZE_FFT: # Mesmo requisito de tamanho de FFT
        df_event_feat = get_event_window_feature(gaps_processed,
                                                 window_size=WINDOW_SIZE_FFT, # Alinhado com FFT
                                                 step=FFT_STEP, # Alinhado com FFT
                                                 sigma_multiplier=EVENT_WINDOW_SIGMA_MULTIPLIER)
    else:
        print(f"Aviso: Série '{processed_series_name}' ({len(gaps_processed)} pontos) muito curta para Event Window com janela {WINDOW_SIZE_FFT}. Pulando feature.")
        df_event_feat = pd.DataFrame(index=df_stats_feats_for_X.index) # DataFrame vazio


    # 3d. Montar DataFrame final de features (X)
    # Alinhamento é crucial aqui. Todas as DFs de features usam 'Original_Index'.
    # O `y_target` já está alinhado com `df_stats_feats_for_X` pelo índice.

    # Começa com as features de estatísticas
    X_all_features = df_stats_feats_for_X.copy()

    # Merge features FFT (se existirem)
    if not df_fft_feats.empty:
        X_all_features = X_all_features.join(df_fft_feats, how='left')

    # Merge feature Event Window (se existir)
    if not df_event_feat.empty:
        X_all_features = X_all_features.join(df_event_feat, how='left')

    # Assegurar que y_target (Next_Gap) está alinhado com o índice final de X_all_features
    # Isso é importante porque diferentes features (stats, fft) podem ter diferentes `step`
    # e, portanto, diferentes números de janelas.
    # A abordagem aqui é que df_stats_feats (com step=1 por default) define o `y_target` e o índice mestre.
    # Outras features (FFT, Event) com steps maiores serão esparsas (NaNs) se juntadas com 'left'.
    # Se os steps são diferentes, precisamos de uma estratégia de alinhamento.
    # Assumindo por agora que o `Original_Index` de todas as features é consistente
    # para os pontos onde são calculados.
    # Se FFT_STEP != STATS_STEP, precisaremos preencher NaNs ou usar um step comum.
    # Para simplificar, vamos assumir STATS_STEP=1 e outros steps podem ser maiores.
    # As colunas de features com steps maiores terão NaNs, que precisam ser tratados (ex: ffill, ou dropna).

    # Neste ponto, X_all_features e y_target compartilham o mesmo índice 'Original_Index'.
    # Vamos remover quaisquer linhas onde o y_target possa ser NaN (não deve acontecer com a lógica atual)
    # ou onde features essenciais são NaN.

    print(f"\nForma de X_all_features antes de tratar NaNs: {X_all_features.shape}")
    print(f"Colunas em X_all_features: {X_all_features.columns.tolist()}")

    # Tratar NaNs que podem surgir do join se os steps forem diferentes
    # Uma estratégia simples é preencher para frente (ffill) e depois para trás (bfill)
    # Isso assume que as features de FFT/Event são "válidas" para as janelas intermediárias de stats.
    # CUIDADO: Esta é uma simplificação. Se FFT_STEP > STATS_STEP, usar ffill significa que
    # múltiplas janelas de stats (com step=1) usarão a mesma feature FFT da janela FFT anterior.
    if FFT_STEP != STATS_STEP or (EVENT_WINDOW_SIGMA_MULTIPLIER and FFT_STEP != STATS_STEP): # Se event window usa FFT_STEP
        print(f"Aplicando ffill e bfill para NaNs devido a diferentes steps de features...")
        X_all_features = X_all_features.ffill().bfill()

    # Remove linhas onde qualquer feature ainda é NaN (ex: no início se bfill não cobrir)
    # E também garante que y_target esteja alinhado e não seja NaN.
    common_index = X_all_features.index.intersection(y_target.index)
    X_all_features = X_all_features.loc[common_index]
    y = y_target.loc[common_index]

    X_all_features = X_all_features.dropna()
    y = y.loc[X_all_features.index] # Realinhar y após dropar NaNs de X

    if X_all_features.empty or y.empty:
        raise ValueError("Nenhum dado de feature/target utilizável após tratamento de NaNs. Verifique os steps e tamanhos de janela.")

    print(f"Forma final de X: {X_all_features.shape}, Forma final de y: {y.shape}")


    # --- 4. Separação de Treino/CV e Hold-out (R10) ---
    # Antes de qualquer CV intensivo, separamos o hold-out.
    # `TimeSeriesSplit` não é usado para esta divisão inicial.
    hold_out_idx_start = math.floor(len(X_all_features) * (1 - TEST_SET_SIZE))

    X_train_cv = X_all_features.iloc[:hold_out_idx_start]
    y_train_cv = y.iloc[:hold_out_idx_start]
    X_holdout = X_all_features.iloc[hold_out_idx_start:]
    y_holdout = y.iloc[hold_out_idx_start:]

    print(f"\nDados divididos em:")
    print(f"  Treino/CV: X_train_cv shape={X_train_cv.shape}, y_train_cv shape={y_train_cv.shape}")
    print(f"  Hold-out:  X_holdout shape={X_holdout.shape}, y_holdout shape={y_holdout.shape}")

    if X_train_cv.empty or X_holdout.empty:
        raise ValueError("Conjunto de treino/CV ou hold-out está vazio. Ajuste TEST_SET_SIZE ou verifique os dados.")

    # --- 5. Validação Preditiva e Comparação de Modelos ---
    # 5a. Modelo Baseline (apenas Median_Gap, MAD_Gap)
    baseline_features = ['Median_Gap', 'MAD_Gap']
    # Verificar se as colunas existem, caso contrário pular baseline
    if all(feat in X_train_cv.columns for feat in baseline_features):
        X_baseline_train_cv = X_train_cv[baseline_features]
        maes_baseline_cv, _ = cross_validate_model(X_baseline_train_cv, y_train_cv,
                                                   XGB_PARAMS, CV_N_SPLITS,
                                                   model_name="Baseline (Stats Only)")
    else:
        print("Aviso: Colunas para modelo baseline não encontradas. Pulando modelo baseline.")
        maes_baseline_cv = None

    # 5b. Modelo Completo (com todas as features selecionadas)
    maes_full_model_cv, _ = cross_validate_model(X_train_cv, y_train_cv,
                                                 XGB_PARAMS, CV_N_SPLITS,
                                                 model_name="Full Model (All Features)")

    # --- 6. Teste Estatístico da Redução de Erro (R6) ---
    if maes_baseline_cv and maes_full_model_cv:
        compare_model_performance_statistically(maes_baseline_cv, maes_full_model_cv,
                                                model1_name="Baseline Model",
                                                model2_name="Full Model",
                                                metric_name="MAE (CV)")

    # --- 7. Treinamento do Modelo Final e Avaliação no Hold-out (R10) ---
    print("\n--- Treinando Modelo Final em todo o conjunto de Treino/CV ---")
    # Opcional: escalar X_train_cv antes de treinar modelo final
    # scaler_final = StandardScaler().fit(X_train_cv)
    # X_train_cv_scaled = scaler_final.transform(X_train_cv)
    # X_holdout_scaled = scaler_final.transform(X_holdout)
    # final_model, mae_ho, rmse_ho, _ = train_evaluate_model(
    #     pd.DataFrame(X_train_cv_scaled, columns=X_train_cv.columns, index=X_train_cv.index), y_train_cv,
    #     pd.DataFrame(X_holdout_scaled, columns=X_holdout.columns, index=X_holdout.index), y_holdout,
    #     XGB_PARAMS, model_name="Final Full Model"
    # )

    final_model, mae_ho, rmse_ho, _ = train_evaluate_model(
        X_train_cv, y_train_cv,
        X_holdout, y_holdout,
        XGB_PARAMS, model_name="Final Full Model"
    )
    print(f"Performance do Modelo Final no Hold-out: MAE = {mae_ho:.3f}, RMSE = {rmse_ho:.3f}")

    # --- 8. Permutation Importance (R11) ---
    # Usar o modelo final treinado e o conjunto de hold-out (ou um conjunto de validação separado do CV)
    # Se o hold-out for muito pequeno, pode-se usar o último fold do CV para isso,
    # mas requer refazer o split ou guardar os dados do último fold.
    # Usaremos o hold-out aqui.
    if not X_holdout.empty and not y_holdout.empty:
        # Se escalou para o modelo final, precisa usar X_holdout_scaled
        # df_perm_importance = get_permutation_feature_importance(final_model, X_holdout_scaled, y_holdout)
        df_perm_importance = get_permutation_feature_importance(final_model, X_holdout, y_holdout)
        print("\nImportância das Features (Permutation) no Hold-out:")
        print(df_perm_importance)
    else:
        print("Aviso: Conjunto de Hold-out vazio, pulando Permutation Importance.")


    # --- 9. Refinamento e Conclusões (R12) ---
    print("\n--- Conclusões e Próximos Passos ---")
    print("O pipeline foi executado.")
    print("Analise os resultados de MAE/RMSE, os testes estatísticos e a importância das features.")
    print("Próximos passos podem incluir:")
    print("  - Otimização de hiperparâmetros do XGBoost.")
    print("  - Exploração de diferentes tamanhos de janela e features.")
    print("  - Investigação mais aprofundada da estimação de 'd' para ARFIMA.")
    print("  - Remoção de features com baixa importância (conforme R12).")
    print("  - Implementação de Monte Carlo CV para resultados mais robustos (conforme R7).")

    print("\n--- Fim do Pipeline ---")

if __name__ == "__main__":
    # Adicionar um try-except para capturar erros de forma mais elegante
    try:
        main()
    except ValueError as ve:
        print(f"Erro de Valor no pipeline: {ve}")
    except Exception as e:
        print(f"Um erro inesperado ocorreu: {e}")
        # Opcional: raise e para ver o traceback completo para debugging
        # raise e