In [10]:
%pip install scipy numpy matplotlib pyyaml pandas --quiet

Note: you may need to restart the kernel to use updated packages.


In [11]:
import numpy as np
import matplotlib.pyplot as plt
import yaml
import os

In [12]:
def get_params(config):
    try:
        # Acessa as configura√ß√µes aninhadas
        ds = config['DATASET']
        rf = config['RF_FRONT_END']
        gps = config['GPS_STANDARD']
        acq = config['ACQUISITION']

        # 1. Extra√ß√£o de Vari√°veis Base (Convers√£o de Tipo)
        
        FILENAME = ds['FILENAME']
        # Converte a string 'int8' para o tipo numpy.int8
        DATA_DTYPE = getattr(np, ds['DTYPE']) 
        PRN_ID_TO_SEARCH = ds['PRN_ID_TO_SEARCH']
        
        FS = rf['FS']
        FIF = rf['FIF']
        
        PRN_CHIP_RATE = gps['PRN_CHIP_RATE']
        PRN_LENGTH = gps['PRN_LENGTH']
        
        DOPPLER_RANGE = acq['DOPPLER_RANGE']
        DOPPLER_STEP = acq['DOPPLER_STEP']
        TIME_TO_PROCESS = acq['TIME_TO_PROCESS']

        # 2. C√°lculo de Par√¢metros Derivados
        
        # Amostras por Chip (define o superamostragem da r√©plica do c√≥digo)
        SAMPLES_PER_CHIP = round(FS / PRN_CHIP_RATE)

        # Alguns arquivos de configura√ß√£o podem fornecer PRN_LENGTH j√° em amostras
        # (ex: 16368 = 1023 chips * 16 amostras/chip). Detectamos esse caso e
        # convertemos para n√∫mero de chips para manter consist√™ncia interna.
        raw_prn_length = PRN_LENGTH
        if raw_prn_length > 2000:
            PRN_LENGTH = int(raw_prn_length // SAMPLES_PER_CHIP)

        # Amostras em um Per√≠odo de C√≥digo (em amostras)
        SAMPLES_PER_CODE = PRN_LENGTH * SAMPLES_PER_CHIP

        # N√∫mero de Amostras para Integra√ß√£o Coerente (idealmente 1ms ou m√∫ltiplo)
        N_SAMPLES_COHERENT = int(FS * TIME_TO_PROCESS)

        # Garante que o bloco de amostras seja um m√∫ltiplo exato do per√≠odo do c√≥digo,
        # essencial para a correla√ß√£o c√≠clica (FFT).
        N_SAMPLES_COHERENT = (N_SAMPLES_COHERENT // SAMPLES_PER_CODE) * SAMPLES_PER_CODE

        # Se o bloco calculado for 0 (por exemplo TIME_TO_PROCESS muito curto),
        # fa√ßa fallback para processar ao menos um per√≠odo de c√≥digo (evita FFT de tamanho 0)
        if N_SAMPLES_COHERENT == 0:
            N_SAMPLES_COHERENT = SAMPLES_PER_CODE

        # 3. Retorno das Constantes
        return {
            'FILENAME': FILENAME, 
            'DTYPE': DATA_DTYPE, 
            'PRN_ID_TO_SEARCH': PRN_ID_TO_SEARCH,
            'FS': FS, 
            'FIF': FIF, 
            'PRN_CHIP_RATE': PRN_CHIP_RATE, 
            'PRN_LENGTH': PRN_LENGTH,
            'DOPPLER_RANGE': DOPPLER_RANGE, 
            'DOPPLER_STEP': DOPPLER_STEP, 
            'TIME_TO_PROCESS': TIME_TO_PROCESS,
            'SAMPLES_PER_CHIP': SAMPLES_PER_CHIP,
            'SAMPLES_PER_CODE': SAMPLES_PER_CODE,
            'N_SAMPLES_COHERENT': N_SAMPLES_COHERENT
        }
    
    except KeyError as e:
        # Captura erros se uma chave essencial n√£o estiver no arquivo de configura√ß√£o
        raise KeyError(f"Erro: Chave de configura√ß√£o ausente ou incorreta: {e}")
    except Exception as e:
        # Captura erros gerais, como falha na convers√£o de tipo (ex: 'FS' n√£o √© um n√∫mero)
        raise Exception(f"Erro ao processar constantes: {e}")

def load_config(config_file):
    try:
        with open(config_file, 'r') as f:
            # Usa safe_load para evitar a execu√ß√£o de c√≥digo arbitr√°rio
            config = yaml.safe_load(f)
        return config
    except FileNotFoundError:
        raise FileNotFoundError(f"Erro: Arquivo de configura√ß√£o '{config_file}' n√£o encontrado.")
    except yaml.YAMLError as e:
        raise ValueError(f"Erro ao decodificar YAML: {e}")

CONFIG_CLEAN_PATH = os.path.join("config", "fgi", "config_clean.yaml")
CONFIG_DS2_PATH = os.path.join("config", "fgi", "config_spoofed.yaml")

config_clean_data = load_config(CONFIG_CLEAN_PATH)
config_ds2_data = load_config(CONFIG_DS2_PATH)

PARAMS_CLEAN = get_params(config_clean_data)
PARAMS_DS2 = get_params(config_ds2_data)

print(f"Amostras por chip (CLEAN): {PARAMS_CLEAN['SAMPLES_PER_CHIP']}")
print(f"Amostras por chip (DS2): {PARAMS_DS2['SAMPLES_PER_CHIP']}")

Amostras por chip (CLEAN): 25
Amostras por chip (DS2): 25


In [13]:
def desmodulate(gps_complex, N_samples, Fs, Fif_local):
    """
    Move o sinal para banda base usando a frequ√™ncia local Fif_local.
    """
    t = np.arange(N_samples) / Fs
    local_carrier = np.cos(-2 * np.pi * Fif_local * t)
    # local_carrier = np.exp(-1j * 2 * np.pi * Fif_local * t)
    return gps_complex * local_carrier
# --- 1. Mapeamento dos Taps G2 (Baseado no seu c√≥digo) ---
# O √≠ndice da linha corresponde ao PRN ID (1 a 37).
# Os valores nos vetores s√£o os est√°gios do G2 que s√£o combinados com G1.
PRN_G2_TAPS = [
    [2, 6], [3, 7], [4, 8], [5, 9], [1, 9], [2, 10], [1, 8], [2, 9], 
    [3, 10], [2, 3], [3, 4], [5, 6], [6, 7], [7, 8], [8, 9], [9, 10], 
    [1, 4], [2, 5], [3, 6], [4, 7], [5, 8], [6, 9], [1, 3], [4, 6], 
    [5, 7], [6, 8], [7, 9], [8, 10], [1, 6], [2, 7], [3, 8], [4, 9], 
    [5, 10], [4, 10], [1, 7], [2, 8], [4, 10]
]

def generate_ca_code(prn_id, samples_per_chip, code_length=1023):
    sv_index = prn_id - 1

    if prn_id < 1 or prn_id > len(PRN_G2_TAPS):
        raise ValueError("PRN ID inv√°lido (fora do intervalo 1-37).")

    # Inicializa√ß√£o: G1 e G2 com todos os 1s (usando 0/1 para l√≥gica bin√°ria)
    LFSR_LEN = 10
    g1 = np.ones(LFSR_LEN, dtype=int)
    g2 = np.ones(LFSR_LEN, dtype=int)

    ca_code_chips = np.zeros(code_length, dtype=int)

    # Taps G2 espec√≠ficas do sat√©lite (posi√ß√µes de 1 a 10)
    g2_taps = PRN_G2_TAPS[sv_index]

    for i in range(code_length):
        # Sa√≠da dos registradores
        out_g1 = g1[9]
        out_g2 = g2[g2_taps[0] - 1] ^ g2[g2_taps[1] - 1]

        ca_chip_binary = out_g1 ^ out_g2
        ca_code_chips[i] = ca_chip_binary

        # Feedbacks
        feedback_g1 = g1[2] ^ g1[9]
        feedback_g2 = g2[1] ^ g2[2] ^ g2[5] ^ g2[7] ^ g2[8] ^ g2[9]

        g1 = np.roll(g1, 1)
        g1[0] = feedback_g1

        g2 = np.roll(g2, 1)
        g2[0] = feedback_g2

    # Convers√£o de chips bin√°rios (0, 1) para bipolares (-1, +1)
    bipolar_code_chips = 2 * ca_code_chips - 1

    # superamostragem dos chips
    bipolar_code_chips = np.repeat(bipolar_code_chips, samples_per_chip)

    # Retorna os chips na taxa de chip (sem repeti√ß√£o)
    return bipolar_code_chips

In [14]:
def normalize_correlation_by_noise(correlation_magnitude, samples_per_code, prn_length):
    # A. Acha a posi√ß√£o exata e o valor do pico bruto
    phase = np.argmax(correlation_magnitude)
    peak_value_raw = correlation_magnitude[phase]

    # B. Define a largura de exclus√£o ao redor do pico (1.5 chips)
    # samples_per_code / prn_length √© o mesmo que samples_per_chip.
    samples_per_chip = samples_per_code / prn_length
    exclude_samples = int(1.5 * samples_per_chip)
    
    # Define os limites de exclus√£o
    start = max(0, phase - exclude_samples)
    end = min(len(correlation_magnitude), phase + exclude_samples)
    
    # C. Calcula o N√≠vel de Ru√≠do (Noise Floor)
    
    # Concatena os valores antes e depois da √°rea do pico
    noise_values = np.concatenate([correlation_magnitude[:start], correlation_magnitude[end:]])
    
    # Usa a MEDIANA como medida robusta do ch√£o de ru√≠do (Noise Floor)
    if len(noise_values) == 0:
        # Fallback se o array for muito pequeno
        noise_level = np.median(correlation_magnitude) 
    else:
        noise_level = np.median(noise_values)

    # D. Normaliza√ß√£o
    if noise_level < 1e-9: # Evita divis√£o por zero ou por um n√∫mero muito pequeno
        noise_level = 1e-9

    corr_normalized = correlation_magnitude / noise_level

    # E. Valor do pico normalizado (SNR do Pico)
    peak_normalized = corr_normalized[phase]

    return corr_normalized, peak_normalized, phase

def correlate_signals(band_base_signal, local_code_replica):
    N = len(band_base_signal)
    window = np.hanning(N)
    # 1. Transformar o sinal e a r√©plica do c√≥digo
    fft_signal = np.fft.fft(band_base_signal)
    fft_code_replica = np.fft.fft(local_code_replica)

    # 2. Multiplica√ß√£o no dom√≠nio da frequ√™ncia (Correla√ß√£o)
    fft_product = fft_signal * np.conjugate(fft_code_replica)
    
    # 3. IFFT e Magnitude
    correlation_result = np.fft.ifft(fft_product)
    
    return np.abs(correlation_result)

In [15]:
def plot_fourier_spectra(band_base_signal, local_code_replica, Fs):
    """
    Calcula e plota o espectro de magnitude (FFT) do sinal em banda base 
    e da r√©plica do c√≥digo PRN.
    """
    N = len(band_base_signal)
    
    # Aplicar janela de Hanning para suavizar os picos negativos (zeros espectrais)
    window = np.hanning(N)
    
    # 1. C√°lculo da FFT
    # A FFT do c√≥digo PRN deve ser conjugada no dom√≠nio da frequ√™ncia para a correla√ß√£o, 
    # mas para a plotagem do espectro, usamos a magnitude simples.
    fft_signal = np.fft.fft(band_base_signal)
    fft_code = np.fft.fft(local_code_replica * window)
    
    # 2. Deslocamento de Frequ√™ncia (FFT-shift)
    # Move a frequ√™ncia zero (DC) para o centro do espectro.
    fft_signal_shifted = np.fft.fftshift(fft_signal)
    fft_code_shifted = np.fft.fftshift(fft_code)
    
    # 3. C√°lculo da Pot√™ncia (Magnitude ao Quadrado ou apenas Magnitude em dB)
    # Usamos o logaritmo da magnitude (em dB) para visualizar melhor a faixa din√¢mica.
    spectrum_signal = 10 * np.log10(np.abs(fft_signal_shifted) + 1e-10) # +1e-10 para evitar log(0)
    spectrum_code = 10 * np.log10(np.abs(fft_code_shifted) + 1e-10)
    
    # 4. Gera√ß√£o do Eixo de Frequ√™ncia
    # O eixo de frequ√™ncia vai de -Fs/2 at√© +Fs/2
    freq_axis = np.fft.fftshift(np.fft.fftfreq(N, d=1/Fs))
    
    plt.figure(figsize=(12, 6))

    # Plotagem do Espectro do C√≥digo PRN
    plt.plot(freq_axis / 1e6, spectrum_code, color='orange', label='Espectro da R√©plica do C√≥digo PRN')
    
    # Plotagem do Espectro do Sinal em Banda Base (com Ru√≠do)
    plt.plot(freq_axis / 1e6, spectrum_signal, color='blue', label='Espectro do Sinal GNSS em Banda Base')

    plt.title('Espectro de Frequ√™ncia dos Sinais (Dom√≠nio de Frequ√™ncia)')
    plt.xlabel('Frequ√™ncia (MHz)')
    plt.ylabel('Magnitude (dB)')
    plt.legend()
    plt.grid(True)
    plt.show()

In [16]:
def acquire_gnss_signal(filepath):

    print(f"\nIniciando aquisi√ß√£o GNSS para o arquivo: {filepath}\n")

    filename = os.path.basename(filepath)

    if "clean" in filename:
        cfg = PARAMS_CLEAN
    elif "TGS" in filename:
        cfg = PARAMS_DS2

    # --- Configura√ß√£o de processamento em blocos ---
    BLOCK_TIME = 0.001  # 1 ms por bloco
    TOTAL_TIME = cfg["TIME_TO_PROCESS"]  # Tempo total a processar (ex: 120s)
    
    samples_per_block = int(cfg["FS"] * BLOCK_TIME)
    # Garante que seja m√∫ltiplo do per√≠odo do c√≥digo
    samples_per_block = (samples_per_block // cfg["SAMPLES_PER_CODE"]) * cfg["SAMPLES_PER_CODE"]
    
    total_samples = int(cfg["FS"] * TOTAL_TIME)
    num_blocks = total_samples // samples_per_block
    
    print(f"Processando {num_blocks} blocos de {BLOCK_TIME*1000:.1f} ms cada")
    print(f"Tempo total: {TOTAL_TIME} segundos")
    print(f"Amostras por bloco: {samples_per_block}")

    # --- 1. Leitura do arquivo ---
    try:
        print(f"\nLendo arquivo: {filepath}")
        raw_data = np.fromfile(filepath, dtype=cfg["DTYPE"], count=total_samples)
        
        if len(raw_data) < total_samples:
            print(f"Aviso: Arquivo tem apenas {len(raw_data)} amostras ({len(raw_data)/cfg['FS']:.2f}s)")
            num_blocks = len(raw_data) // samples_per_block
            print(f"Ajustando para {num_blocks} blocos")
        
        # Converter para float32
        raw_data = raw_data.astype(np.float32)
        
    except FileNotFoundError:
        print(f"Erro: Arquivo '{filepath}' n√£o encontrado.")
        return

    # --- 2. Armazenar resultados de cada bloco ---
    results = {
        'block_idx': [],
        'time_s': [],
        'best_prn': [],
        'best_doppler': [],
        'best_tau_chips': [],
        'peak_snr': [],
        'peak_raw': [],
        'in_phase_lag': [],  # Nova m√©trica: atraso de fase (diferen√ßa entre blocos)
        'correlation_width': []  # Largura do pico de correla√ß√£o
    }

    # Gerar c√≥digo PRN uma vez (vamos buscar todos os PRNs)
    prn_codes = {}
    for prn in range(1, 33):
        prn_codes[prn] = generate_ca_code(prn, cfg["SAMPLES_PER_CHIP"], cfg["PRN_LENGTH"])

    doppler_freqs = np.arange(-cfg["DOPPLER_RANGE"], cfg["DOPPLER_RANGE"] + cfg["DOPPLER_STEP"], cfg["DOPPLER_STEP"])

    # --- 3. Processar cada bloco ---
    for block_idx in range(num_blocks):
        start_sample = block_idx * samples_per_block
        end_sample = start_sample + samples_per_block
        
        # Extrair bloco
        block_data = raw_data[start_sample:end_sample]
        gps_complex = block_data + 1j * np.zeros_like(block_data)
        N_block = len(gps_complex)
        
        # Busca 2D para este bloco
        max_correlation_value = 0.0
        best_doppler = 0.0
        best_tau = 0
        best_prn = 0
        best_code_replica = None
        
        for prn in range(1, 33):
            local_code_chips = prn_codes[prn]
            local_code_replica = np.tile(local_code_chips, N_block // cfg["SAMPLES_PER_CODE"])
            
            for doppler in doppler_freqs:
                Fif_local = cfg["FIF"] + doppler
                band_base_signal = desmodulate(gps_complex, N_block, cfg["FS"], Fif_local)
                
                correlation_magnitude = correlate_signals(band_base_signal, local_code_replica)
                current_max = np.max(correlation_magnitude)
                
                if current_max > max_correlation_value:
                    max_correlation_value = current_max
                    best_doppler = doppler
                    best_tau = np.argmax(correlation_magnitude)
                    best_prn = prn
                    best_code_replica = local_code_replica.copy()
        
        # Calcular SNR normalizado
        final_fif_local = cfg["FIF"] + best_doppler
        final_band_base = desmodulate(gps_complex, N_block, cfg["FS"], final_fif_local)
        final_correlation = correlate_signals(final_band_base, best_code_replica)
        _, peak_snr, peak_phase = normalize_correlation_by_noise(final_correlation, cfg['SAMPLES_PER_CODE'], cfg['PRN_LENGTH'])
        
        # ============ NOVAS M√âTRICAS ============
        
        # 1. In-Phase Lag: diferen√ßa de fase entre blocos consecutivos
        # Indica a taxa de varia√ß√£o do atraso de c√≥digo (deve ser suave para sinais leg√≠timos)
        if block_idx > 0:
            prev_tau = results['best_tau_chips'][-1]
            current_tau = best_tau / cfg["SAMPLES_PER_CHIP"]
            # Tratar wrap-around (quando œÑ vai de ~1023 para ~0)
            tau_diff = current_tau - prev_tau
            if abs(tau_diff) > cfg['PRN_LENGTH'] / 2:
                tau_diff = tau_diff - np.sign(tau_diff) * cfg['PRN_LENGTH']
            in_phase_lag = tau_diff
        else:
            in_phase_lag = 0.0
        
        # 2. Correlation Width: largura do pico de correla√ß√£o (a -3dB ou 50% do pico)
        # Picos mais estreitos indicam melhor qualidade do sinal
        peak_value = final_correlation[peak_phase]
        half_peak = peak_value * 0.5
        # Encontrar pontos onde a correla√ß√£o cruza 50% do pico
        above_half = final_correlation > half_peak
        # Contar amostras acima de 50%
        correlation_width = np.sum(above_half) / cfg["SAMPLES_PER_CHIP"]  # Em chips
        
        # Armazenar resultados
        time_s = block_idx * BLOCK_TIME
        tau_chips = best_tau / cfg["SAMPLES_PER_CHIP"];
        
        results['block_idx'].append(block_idx)
        results['time_s'].append(time_s)
        results['best_prn'].append(best_prn)
        results['best_doppler'].append(best_doppler)
        results['best_tau_chips'].append(tau_chips)
        results['peak_snr'].append(peak_snr)
        results['peak_raw'].append(max_correlation_value)
        results['in_phase_lag'].append(in_phase_lag)
        results['correlation_width'].append(correlation_width)
        
        # Progresso a cada 1000 blocos (1 segundo)
        if (block_idx + 1) % 1000 == 0:
            print(f"Bloco {block_idx + 1}/{num_blocks} ({time_s:.2f}s) - PRN {best_prn}, Doppler {best_doppler:.0f} Hz, SNR {peak_snr:.2f}")

    # --- 4. Converter para arrays numpy ---
    for key in results:
        results[key] = np.array(results[key])

    # --- 5. Plotagem dos resultados ao longo do tempo ---
    fig, axes = plt.subplots(4, 1, figsize=(14, 12), sharex=True)
    
    # SNR ao longo do tempo
    axes[0].plot(results['time_s'], results['peak_snr'], 'b-', linewidth=0.5)
    axes[0].set_ylabel('SNR (normalizado)')
    axes[0].set_title(f'Evolu√ß√£o Temporal - {filename}')
    axes[0].grid(True)
    
    # PRN detectado
    axes[1].plot(results['time_s'], results['best_prn'], 'g.', markersize=1)
    axes[1].set_ylabel('PRN Detectado')
    axes[1].set_ylim(0, 33)
    axes[1].grid(True)
    
    # Doppler
    axes[2].plot(results['time_s'], results['best_doppler'], 'r-', linewidth=0.5)
    axes[2].set_ylabel('Doppler (Hz)')
    axes[2].grid(True)
    
    # Atraso de c√≥digo (tau)
    axes[3].plot(results['time_s'], results['best_tau_chips'], 'm-', linewidth=0.5)
    axes[3].set_ylabel('Atraso œÑ (chips)')
    axes[3].set_xlabel('Tempo (s)')
    axes[3].grid(True)
    
    plt.tight_layout()
    plt.show()

    # --- 6. Estat√≠sticas ---
    print("\n--- Estat√≠sticas Gerais ---")
    print(f"Blocos processados: {num_blocks}")
    print(f"SNR m√©dio: {np.mean(results['peak_snr']):.2f}")
    print(f"SNR m√°ximo: {np.max(results['peak_snr']):.2f}")
    print(f"PRN mais frequente: {np.bincount(results['best_prn'].astype(int)).argmax()}")
    print(f"Doppler m√©dio: {np.mean(results['best_doppler']):.1f} Hz")
    
    return results

In [17]:
def calculate_spoofing_metrics(results, cfg):
    """
    Calcula m√©tricas para detec√ß√£o de spoofing GNSS.
    
    M√©tricas implementadas:
    1. SNR Statistics - Sinais spoofed tendem a ter SNR mais alto e consistente
    2. Doppler Consistency - Spoofing pode ter varia√ß√£o Doppler an√¥mala
    3. Code Phase Consistency - Varia√ß√£o do atraso de c√≥digo
    4. PRN Diversity - Quantos PRNs diferentes s√£o detectados
    5. C/N0 Variance - Vari√¢ncia da rela√ß√£o sinal-ru√≠do
    6. Multi-peak Detection - Detec√ß√£o de m√∫ltiplos picos (spoofing vs leg√≠timo)
    7. Power Level Analysis - N√≠vel de pot√™ncia anormalmente alto
    """
    
    metrics = {}
    
    # ============ 1. SNR Statistics ============
    # Sinais spoofed geralmente t√™m SNR mais alto e mais est√°vel
    snr_values = results['peak_snr']
    metrics['snr_mean'] = np.mean(snr_values)
    metrics['snr_std'] = np.std(snr_values)
    metrics['snr_max'] = np.max(snr_values)
    metrics['snr_min'] = np.min(snr_values)
    metrics['snr_cv'] = metrics['snr_std'] / metrics['snr_mean'] if metrics['snr_mean'] > 0 else 0  # Coeficiente de varia√ß√£o
    
    # Threshold: SNR muito alto pode indicar spoofing
    SNR_THRESHOLD_HIGH = 15.0  # Ajustar conforme necess√°rio
    metrics['snr_above_threshold'] = np.sum(snr_values > SNR_THRESHOLD_HIGH) / len(snr_values) * 100
    
    # ============ 2. Doppler Consistency ============
    # Doppler deve variar suavemente para sat√©lites reais
    doppler_values = results['best_doppler']
    metrics['doppler_mean'] = np.mean(doppler_values)
    metrics['doppler_std'] = np.std(doppler_values)
    
    # Taxa de varia√ß√£o do Doppler (deve ser suave para sinais leg√≠timos)
    doppler_diff = np.diff(doppler_values)
    metrics['doppler_rate_mean'] = np.mean(np.abs(doppler_diff))
    metrics['doppler_rate_max'] = np.max(np.abs(doppler_diff))
    
    # Saltos abruptos de Doppler (indicador de spoofing)
    DOPPLER_JUMP_THRESHOLD = 500  # Hz
    metrics['doppler_jumps'] = np.sum(np.abs(doppler_diff) > DOPPLER_JUMP_THRESHOLD)
    metrics['doppler_jump_rate'] = metrics['doppler_jumps'] / len(doppler_diff) * 100
    
    # ============ 3. Code Phase (œÑ) Consistency ============
    # O atraso de c√≥digo deve variar suavemente
    tau_values = results['best_tau_chips']
    metrics['tau_mean'] = np.mean(tau_values)
    metrics['tau_std'] = np.std(tau_values)
    
    # Taxa de varia√ß√£o do œÑ
    tau_diff = np.diff(tau_values)
    # Tratar wrap-around (quando œÑ vai de ~1023 para ~0)
    tau_diff = np.where(np.abs(tau_diff) > cfg['PRN_LENGTH']/2, 
                        tau_diff - np.sign(tau_diff) * cfg['PRN_LENGTH'], 
                        tau_diff)
    metrics['tau_rate_mean'] = np.mean(np.abs(tau_diff))
    metrics['tau_rate_std'] = np.std(tau_diff)
    
    # ============ 4. PRN Diversity ============
    # Sinais leg√≠timos devem mostrar m√∫ltiplos PRNs vis√≠veis
    prn_values = results['best_prn']
    unique_prns = np.unique(prn_values)
    metrics['prn_unique_count'] = len(unique_prns)
    metrics['prn_unique_list'] = unique_prns.tolist()
    
    # PRN dominante e sua frequ√™ncia
    prn_counts = np.bincount(prn_values.astype(int), minlength=33)
    metrics['prn_dominant'] = np.argmax(prn_counts)
    metrics['prn_dominant_ratio'] = prn_counts[metrics['prn_dominant']] / len(prn_values) * 100
    
    # ============ 5. Power Level Analysis ============
    # Pot√™ncia bruta do pico de correla√ß√£o
    power_values = results['peak_raw']
    metrics['power_mean'] = np.mean(power_values)
    metrics['power_std'] = np.std(power_values)
    metrics['power_cv'] = metrics['power_std'] / metrics['power_mean'] if metrics['power_mean'] > 0 else 0
    
    # ============ 6. Temporal Correlation ============
    # Autocorrela√ß√£o do SNR (sinais spoofed podem ter padr√µes artificiais)
    if len(snr_values) > 100:
        snr_normalized = (snr_values - np.mean(snr_values)) / (np.std(snr_values) + 1e-10)
        autocorr = np.correlate(snr_normalized[:1000], snr_normalized[:1000], mode='full')
        autocorr = autocorr[len(autocorr)//2:]  # Pegar apenas lag positivo
        autocorr = autocorr / autocorr[0]  # Normalizar
        metrics['snr_autocorr_lag1'] = autocorr[1] if len(autocorr) > 1 else 0
        metrics['snr_autocorr_lag10'] = autocorr[10] if len(autocorr) > 10 else 0
    else:
        metrics['snr_autocorr_lag1'] = 0
        metrics['snr_autocorr_lag10'] = 0
    
    # ============ 7. In-Phase Lag Analysis ============
    # Varia√ß√£o do atraso de fase entre blocos consecutivos
    in_phase_lag_values = results['in_phase_lag']
    metrics['in_phase_lag_mean'] = np.mean(np.abs(in_phase_lag_values))
    metrics['in_phase_lag_std'] = np.std(in_phase_lag_values)
    metrics['in_phase_lag_max'] = np.max(np.abs(in_phase_lag_values))
    
    # Saltos abruptos de fase (indicador de spoofing)
    PHASE_JUMP_THRESHOLD = 0.5  # chips
    phase_jumps = np.sum(np.abs(in_phase_lag_values) > PHASE_JUMP_THRESHOLD)
    metrics['phase_jumps'] = phase_jumps
    metrics['phase_jump_rate'] = phase_jumps / len(in_phase_lag_values) * 100
    
    # ============ 8. Correlation Width Analysis ============
    # Largura do pico de correla√ß√£o (picos muito estreitos ou largos podem indicar anomalias)
    corr_width_values = results['correlation_width']
    metrics['corr_width_mean'] = np.mean(corr_width_values)
    metrics['corr_width_std'] = np.std(corr_width_values)
    metrics['corr_width_cv'] = metrics['corr_width_std'] / metrics['corr_width_mean'] if metrics['corr_width_mean'] > 0 else 0
    
    # ============ 9. Temporal Correlation ============
    # Autocorrela√ß√£o do SNR (sinais spoofed podem ter padr√µes artificiais)
    if len(snr_values) > 100:
        snr_normalized = (snr_values - np.mean(snr_values)) / (np.std(snr_values) + 1e-10)
        autocorr = np.correlate(snr_normalized[:1000], snr_normalized[:1000], mode='full')
        autocorr = autocorr[len(autocorr)//2:]  # Pegar apenas lag positivo
        autocorr = autocorr / autocorr[0]  # Normalizar
        metrics['snr_autocorr_lag1'] = autocorr[1] if len(autocorr) > 1 else 0
        metrics['snr_autocorr_lag10'] = autocorr[10] if len(autocorr) > 10 else 0
    else:
        metrics['snr_autocorr_lag1'] = 0
        metrics['snr_autocorr_lag10'] = 0
    
    # ============ 10. Spoofing Score (Combinado) ============
    # Score baseado em m√∫ltiplos indicadores (0-100, maior = mais suspeito)
    score = 0
    
    # SNR muito alto e est√°vel √© suspeito
    if metrics['snr_mean'] > 10:
        score += min(20, (metrics['snr_mean'] - 10) * 2)
    if metrics['snr_cv'] < 0.1:  # Muito est√°vel
        score += 10
    
    # Pouca diversidade de PRN √© suspeito
    if metrics['prn_unique_count'] < 4:
        score += 15
    if metrics['prn_dominant_ratio'] > 90:
        score += 10
    
    # Saltos de Doppler s√£o suspeitos
    if metrics['doppler_jump_rate'] > 5:
        score += 10
    
    # Saltos de fase s√£o suspeitos (NOVA M√âTRICA)
    if metrics['phase_jump_rate'] > 5:
        score += 15
    
    # In-phase lag muito baixo ou muito est√°vel pode indicar spoofing
    if metrics['in_phase_lag_std'] < 0.01:  # Varia√ß√£o muito pequena
        score += 10
    
    # Largura de correla√ß√£o an√¥mala
    if metrics['corr_width_cv'] < 0.05:  # Muito consistente (artificial)
        score += 10
    
    # Alta autocorrela√ß√£o no SNR pode indicar padr√£o artificial
    if metrics['snr_autocorr_lag1'] > 0.8:
        score += 10
    
    metrics['spoofing_score'] = min(100, score)
    
    # Classifica√ß√£o
    if metrics['spoofing_score'] < 30:
        metrics['classification'] = 'LIKELY AUTHENTIC'
    elif metrics['spoofing_score'] < 60:
        metrics['classification'] = 'SUSPICIOUS'
    else:
        metrics['classification'] = 'LIKELY SPOOFED'
    
    return metrics


def print_spoofing_report(metrics, filename):
    """
    Imprime um relat√≥rio formatado das m√©tricas de spoofing.
    """
    print("\n" + "="*70)
    print(f"  RELAT√ìRIO DE DETEC√á√ÉO DE SPOOFING - {filename}")
    print("="*70)
    
    print("\nüìä ESTAT√çSTICAS DE SNR:")
    print(f"   M√©dia: {metrics['snr_mean']:.2f}")
    print(f"   Desvio Padr√£o: {metrics['snr_std']:.2f}")
    print(f"   Coef. Varia√ß√£o: {metrics['snr_cv']:.3f}")
    print(f"   M√°x/M√≠n: {metrics['snr_max']:.2f} / {metrics['snr_min']:.2f}")
    print(f"   % acima do threshold: {metrics['snr_above_threshold']:.1f}%")
    
    print("\nüì° AN√ÅLISE DOPPLER:")
    print(f"   M√©dia: {metrics['doppler_mean']:.1f} Hz")
    print(f"   Desvio Padr√£o: {metrics['doppler_std']:.1f} Hz")
    print(f"   Taxa m√©dia de varia√ß√£o: {metrics['doppler_rate_mean']:.2f} Hz/ms")
    print(f"   Saltos abruptos: {metrics['doppler_jumps']} ({metrics['doppler_jump_rate']:.2f}%)")
    
    print("\n‚è±Ô∏è AN√ÅLISE DE ATRASO (œÑ):")
    print(f"   M√©dia: {metrics['tau_mean']:.2f} chips")
    print(f"   Desvio Padr√£o: {metrics['tau_std']:.2f} chips")
    print(f"   Taxa m√©dia de varia√ß√£o: {metrics['tau_rate_mean']:.4f} chips/ms")
    
    print("\nüõ∞Ô∏è DIVERSIDADE DE PRN:")
    print(f"   PRNs √∫nicos detectados: {metrics['prn_unique_count']}")
    print(f"   PRNs: {metrics['prn_unique_list']}")
    print(f"   PRN dominante: {metrics['prn_dominant']} ({metrics['prn_dominant_ratio']:.1f}%)")
    
    print("\nüìà AN√ÅLISE DE POT√äNCIA:")
    print(f"   M√©dia: {metrics['power_mean']:.2f}")
    print(f"   Coef. Varia√ß√£o: {metrics['power_cv']:.3f}")
    
    print("\nüìê AN√ÅLISE DE IN-PHASE LAG:")
    print(f"   M√©dia |lag|: {metrics['in_phase_lag_mean']:.4f} chips")
    print(f"   Desvio Padr√£o: {metrics['in_phase_lag_std']:.4f} chips")
    print(f"   M√°ximo |lag|: {metrics['in_phase_lag_max']:.4f} chips")
    print(f"   Saltos de fase: {metrics['phase_jumps']} ({metrics['phase_jump_rate']:.2f}%)")
    
    print("\nüìè LARGURA DE CORRELA√á√ÉO:")
    print(f"   M√©dia: {metrics['corr_width_mean']:.2f} chips")
    print(f"   Desvio Padr√£o: {metrics['corr_width_std']:.2f} chips")
    print(f"   Coef. Varia√ß√£o: {metrics['corr_width_cv']:.3f}")
    
    print("\nüîó CORRELA√á√ÉO TEMPORAL:")
    print(f"   Autocorrela√ß√£o SNR (lag=1): {metrics['snr_autocorr_lag1']:.3f}")
    print(f"   Autocorrela√ß√£o SNR (lag=10): {metrics['snr_autocorr_lag10']:.3f}")
    
    print("\n" + "="*70)
    print(f"  üéØ SPOOFING SCORE: {metrics['spoofing_score']}/100")
    print(f"  üìã CLASSIFICA√á√ÉO: {metrics['classification']}")
    print("="*70)
    
    return metrics


def plot_spoofing_comparison(results_clean, results_spoofed, metrics_clean, metrics_spoofed):
    """
    Plota compara√ß√£o visual entre sinal limpo e spoofed.
    """
    fig, axes = plt.subplots(3, 2, figsize=(16, 12))
    
    # SNR Distribution
    axes[0, 0].hist(results_clean['peak_snr'], bins=50, alpha=0.7, label='Clean', color='green')
    axes[0, 0].hist(results_spoofed['peak_snr'], bins=50, alpha=0.7, label='Spoofed', color='red')
    axes[0, 0].set_xlabel('SNR')
    axes[0, 0].set_ylabel('Frequ√™ncia')
    axes[0, 0].set_title('Distribui√ß√£o de SNR')
    axes[0, 0].legend()
    axes[0, 0].grid(True)
    
    # Doppler Distribution
    axes[0, 1].hist(results_clean['best_doppler'], bins=50, alpha=0.7, label='Clean', color='green')
    axes[0, 1].hist(results_spoofed['best_doppler'], bins=50, alpha=0.7, label='Spoofed', color='red')
    axes[0, 1].set_xlabel('Doppler (Hz)')
    axes[0, 1].set_ylabel('Frequ√™ncia')
    axes[0, 1].set_title('Distribui√ß√£o de Doppler')
    axes[0, 1].legend()
    axes[0, 1].grid(True)
    
    # PRN Distribution
    prn_clean = np.bincount(results_clean['best_prn'].astype(int), minlength=33)[1:]
    prn_spoofed = np.bincount(results_spoofed['best_prn'].astype(int), minlength=33)[1:]
    x = np.arange(1, 33)
    width = 0.35
    axes[1, 0].bar(x - width/2, prn_clean, width, label='Clean', color='green', alpha=0.7)
    axes[1, 0].bar(x + width/2, prn_spoofed, width, label='Spoofed', color='red', alpha=0.7)
    axes[1, 0].set_xlabel('PRN')
    axes[1, 0].set_ylabel('Contagem')
    axes[1, 0].set_title('Distribui√ß√£o de PRN Detectados')
    axes[1, 0].legend()
    axes[1, 0].grid(True)
    
    # SNR over time
    axes[1, 1].plot(results_clean['time_s'], results_clean['peak_snr'], 'g-', linewidth=0.5, label='Clean', alpha=0.7)
    axes[1, 1].plot(results_spoofed['time_s'], results_spoofed['peak_snr'], 'r-', linewidth=0.5, label='Spoofed', alpha=0.7)
    axes[1, 1].set_xlabel('Tempo (s)')
    axes[1, 1].set_ylabel('SNR')
    axes[1, 1].set_title('SNR ao Longo do Tempo')
    axes[1, 1].legend()
    axes[1, 1].grid(True)
    
    # Spoofing Score Comparison
    categories = ['SNR\nM√©dia', 'SNR\nCV', 'PRN\nDiversidade', 'Doppler\nEstabilidade', 'Score\nTotal']
    clean_scores = [
        min(metrics_clean['snr_mean']/20*100, 100),
        (1 - min(metrics_clean['snr_cv'], 1)) * 100,
        min(metrics_clean['prn_unique_count']/10*100, 100),
        max(0, 100 - metrics_clean['doppler_jump_rate']*10),
        100 - metrics_clean['spoofing_score']
    ]
    spoofed_scores = [
        min(metrics_spoofed['snr_mean']/20*100, 100),
        (1 - min(metrics_spoofed['snr_cv'], 1)) * 100,
        min(metrics_spoofed['prn_unique_count']/10*100, 100),
        max(0, 100 - metrics_spoofed['doppler_jump_rate']*10),
        100 - metrics_spoofed['spoofing_score']
    ]
    
    x = np.arange(len(categories))
    axes[2, 0].bar(x - width/2, clean_scores, width, label='Clean', color='green', alpha=0.7)
    axes[2, 0].bar(x + width/2, spoofed_scores, width, label='Spoofed', color='red', alpha=0.7)
    axes[2, 0].set_ylabel('Score (0-100)')
    axes[2, 0].set_title('Compara√ß√£o de M√©tricas')
    axes[2, 0].set_xticks(x)
    axes[2, 0].set_xticklabels(categories)
    axes[2, 0].legend()
    axes[2, 0].grid(True)
    
    # Tau variation
    axes[2, 1].plot(results_clean['time_s'], results_clean['best_tau_chips'], 'g-', linewidth=0.5, label='Clean', alpha=0.7)
    axes[2, 1].plot(results_spoofed['time_s'], results_spoofed['best_tau_chips'], 'r-', linewidth=0.5, label='Spoofed', alpha=0.7)
    axes[2, 1].set_xlabel('Tempo (s)')
    axes[2, 1].set_ylabel('œÑ (chips)')
    axes[2, 1].set_title('Atraso de C√≥digo ao Longo do Tempo')
    axes[2, 1].legend()
    axes[2, 1].grid(True)
    
    plt.tight_layout()
    plt.show()


print("Fun√ß√µes de detec√ß√£o de spoofing carregadas!")
print("- calculate_spoofing_metrics(results, cfg)")
print("- print_spoofing_report(metrics, filename)")
print("- plot_spoofing_comparison(results_clean, results_spoofed, metrics_clean, metrics_spoofed)")

Fun√ß√µes de detec√ß√£o de spoofing carregadas!
- calculate_spoofing_metrics(results, cfg)
- print_spoofing_report(metrics, filename)
- plot_spoofing_comparison(results_clean, results_spoofed, metrics_clean, metrics_spoofed)


In [None]:
FILENAME_CLEAN = '/home/rc-2d/Downloads/gnss-dataset/OSNMA_cleandata_opensky_460s.dat'
FILENAME_SPOOFED = '/home/rc-2d/Downloads/gnss-dataset/TGS_L1_E1.dat'

# Diret√≥rio para salvar os CSVs
OUTPUT_DIR = '/home/rc-2d/gnss-spoofing-detection/data'
os.makedirs(OUTPUT_DIR, exist_ok=True)

# 1. Processar sinal limpo
print("="*70)
print("  PROCESSANDO SINAL LIMPO (CLEAN)")
print("="*70)
results_clean = acquire_gnss_signal(FILENAME_CLEAN)

# 2. Processar sinal spoofed
print("\n" + "="*70)
print("  PROCESSANDO SINAL SPOOFED")
print("="*70)
results_spoofed = acquire_gnss_signal(FILENAME_SPOOFED)

# 3. Calcular m√©tricas de spoofing
if results_clean is not None and results_spoofed is not None:
    metrics_clean = calculate_spoofing_metrics(results_clean, PARAMS_CLEAN)
    metrics_spoofed = calculate_spoofing_metrics(results_spoofed, PARAMS_DS2)
    
    # 4. Imprimir relat√≥rios
    print_spoofing_report(metrics_clean, "CLEAN DATA")
    print_spoofing_report(metrics_spoofed, "SPOOFED DATA")
    
    # 5. Plotar compara√ß√£o visual
    plot_spoofing_comparison(results_clean, results_spoofed, metrics_clean, metrics_spoofed)
    
    # 6. Salvar dados em CSV
    import pandas as pd
    
    # Criar DataFrame para dados limpos (label = 0)
    df_clean = pd.DataFrame({
        'block_idx': results_clean['block_idx'],
        'time_s': results_clean['time_s'],
        'best_prn': results_clean['best_prn'],
        'best_doppler': results_clean['best_doppler'],
        'best_tau_chips': results_clean['best_tau_chips'],
        'peak_snr': results_clean['peak_snr'],
        'peak_raw': results_clean['peak_raw'],
        'in_phase_lag': results_clean['in_phase_lag'],
        'correlation_width': results_clean['correlation_width'],
        'label': 0,  # 0 = aut√™ntico/limpo
        'label_name': 'authentic'
    })
    
    # Criar DataFrame para dados spoofed (label = 1)
    df_spoofed = pd.DataFrame({
        'block_idx': results_spoofed['block_idx'],
        'time_s': results_spoofed['time_s'],
        'best_prn': results_spoofed['best_prn'],
        'best_doppler': results_spoofed['best_doppler'],
        'best_tau_chips': results_spoofed['best_tau_chips'],
        'peak_snr': results_spoofed['peak_snr'],
        'peak_raw': results_spoofed['peak_raw'],
        'in_phase_lag': results_spoofed['in_phase_lag'],
        'correlation_width': results_spoofed['correlation_width'],
        'label': 1,  # 1 = spoofed
        'label_name': 'spoofed'
    })
    
    # Salvar CSVs individuais
    clean_csv_path = os.path.join(OUTPUT_DIR, 'gnss_clean_samples.csv')
    spoofed_csv_path = os.path.join(OUTPUT_DIR, 'gnss_spoofed_samples.csv')
    
    df_clean.to_csv(clean_csv_path, index=False)
    df_spoofed.to_csv(spoofed_csv_path, index=False)
    
    print(f"\n‚úÖ Dados limpos salvos em: {clean_csv_path}")
    print(f"   Amostras: {len(df_clean)}")
    
    print(f"\n‚úÖ Dados spoofed salvos em: {spoofed_csv_path}")
    print(f"   Amostras: {len(df_spoofed)}")
    
    # Criar dataset combinado para treinamento de ML
    df_combined = pd.concat([df_clean, df_spoofed], ignore_index=True)
    
    # Embaralhar os dados
    df_combined = df_combined.sample(frac=1, random_state=42).reset_index(drop=True)
    
    combined_csv_path = os.path.join(OUTPUT_DIR, 'gnss_dataset_labeled.csv')
    df_combined.to_csv(combined_csv_path, index=False)
    
    print(f"\n‚úÖ Dataset combinado salvo em: {combined_csv_path}")
    print(f"   Total de amostras: {len(df_combined)}")
    print(f"   - Aut√™nticas: {len(df_clean)} ({len(df_clean)/len(df_combined)*100:.1f}%)")
    print(f"   - Spoofed: {len(df_spoofed)} ({len(df_spoofed)/len(df_combined)*100:.1f}%)")
    
    # Salvar m√©tricas agregadas
    metrics_df = pd.DataFrame([
        {
            'source': 'clean',
            'label': 0,
            **{k: v for k, v in metrics_clean.items() if not isinstance(v, list)}
        },
        {
            'source': 'spoofed', 
            'label': 1,
            **{k: v for k, v in metrics_spoofed.items() if not isinstance(v, list)}
        }
    ])
    
    metrics_csv_path = os.path.join(OUTPUT_DIR, 'gnss_metrics_summary.csv')
    metrics_df.to_csv(metrics_csv_path, index=False)
    
    print(f"\n‚úÖ M√©tricas agregadas salvas em: {metrics_csv_path}")
    
    # Mostrar preview dos dados
    print("\n" + "="*70)
    print("  PREVIEW DO DATASET COMBINADO")
    print("="*70)
    print(df_combined.head(10).to_string())
    print("\n" + "="*70)
    print("  ESTAT√çSTICAS DO DATASET")
    print("="*70)
    print(df_combined.describe().to_string())

  PROCESSANDO SINAL LIMPO (CLEAN)

Iniciando aquisi√ß√£o GNSS para o arquivo: /home/rc-2d/Downloads/gnss-dataset/OSNMA_cleandata_opensky_460s.dat

Processando 10 blocos de 1.0 ms cada
Tempo total: 0.01 segundos
Amostras por bloco: 25575

Lendo arquivo: /home/rc-2d/Downloads/gnss-dataset/OSNMA_cleandata_opensky_460s.dat
