## Detector de Tramas

### Marco teorico - Dechirping

El dechirping es la técnica central que permite que LoRa realice la detección de símbolos con complejidad reducida, transformando chirps en tonos y usando la FFT para identificar el símbolo correcto.
En el contexto de la trama, este proceso se aplica exclusivamente sobre el payload, después de que el preámbulo y el SFD cumplen su función de sincronización y delimitación de inicio.

In [48]:
import numpy as np
from scipy import signal as sp_signal

In [None]:
def waveform_former(s, SF, T, Bw):
    M = 2 ** SF
    sobremuestreo = 1/(Bw*T)
    k = np.arange(M*sobremuestreo)
    waveform = np.zeros((len(s), len(k)), dtype=complex)

    for i, simbolo in enumerate(s):
        fase = ((simbolo + k/sobremuestreo)) * (k*T*Bw) / M
        chirp = (1 / np.sqrt(M)) * np.exp(1j * 2 * np.pi * fase)
        waveform[i] = chirp

    return waveform

In [50]:
def upchirp(SF, T, Bw):
    M_local = 2**SF  
    sobremuestreo = 1/(Bw*T)
    k = np.arange(M_local*sobremuestreo)
    fase = (k/sobremuestreo) * (k*T*Bw) / M_local
    chirp = (1 / np.sqrt(M_local)) * np.exp(1j * 2 * np.pi * fase)
    return chirp

In [51]:
def downchirp(SF, T, Bw):
    return np.conj(upchirp(SF, T, Bw))

In [52]:
def n_tuple_former(chirps_recibidos, SF, T, Bw):
    producto = chirps_recibidos * downchirp(SF, T, Bw)  # Multiplicación punto a punto
    fft_producto = np.fft.fft(producto, axis=1)
    simbolos_estimados = np.argmax(np.abs(fft_producto), axis=1)  # La función np.argmax() de la biblioteca NumPy devuelve el índice del valor máximo dentro de un array. Se busca el impulso mas grande en el espectro de frecuencia y nos quedamos con el indice.

    return simbolos_estimados

In [53]:
def build_tx_frame(simbolos_data, SF, preamble_len=8):
    M  = 2**SF
    up = upchirp(SF, 1, 1)          # up-chirp s=0 (forma discreta típica)
    down = downchirp(SF,1 ,1)

    # preámbulo: Np up-chirps
    pre = np.tile(up, preamble_len)

    # SFD: 2 + 1/4 down-chirps (2.25)
    sfd = np.concatenate([np.tile(down, 2), down[:M//4]])

    # payload (matriz símbolos → vector)
    payload = waveform_former(simbolos_data, SF, 1, 1).flatten()

    trama = np.concatenate([pre, sfd, payload])

    return trama

In [54]:
def dechirp(sig, start_idx, SF, up=True, zero_padding_ratio=10):
    """Dechirping de una ventana comenzando en start_idx"""
    M = 2**SF
    if up:
        ref = downchirp(SF, 1, 1)
    else:
        ref = upchirp(SF, 1, 1)
    fft_len = M * zero_padding_ratio
    chirp_segment = sig[start_idx:start_idx+M]
    ft = np.fft.fft(chirp_segment * ref, fft_len)
    ft_mag = np.abs(ft[:fft_len//2])
    peak_bin = np.argmax(ft_mag)
    peak_val = ft_mag[peak_bin]
    return peak_val, peak_bin

In [55]:
def detect_preamble(sig, SF, preamble_len=8, zero_padding_ratio=10):
    
    M = 2**SF
    sample_num = M
    bin_num = M * zero_padding_ratio
    ii = 0
    pk_bin_list = []
    while ii < len(sig) - sample_num * preamble_len:
        if len(pk_bin_list) >= preamble_len - 1:
            # Preambulo detectado
            x = ii - round((pk_bin_list[-1]) / zero_padding_ratio * 2)
            return x
        pk_val, pk_bin = dechirp(sig, ii, SF, up=True, zero_padding_ratio=zero_padding_ratio)
        if pk_bin_list:
            bin_diff = (pk_bin_list[-1] - pk_bin) % bin_num
            if bin_diff > bin_num/2:
                bin_diff = bin_num - bin_diff
            if bin_diff <= zero_padding_ratio:
                pk_bin_list.append(pk_bin)
            else:
                pk_bin_list = [pk_bin]
        else:
            pk_bin_list = [pk_bin]
        ii += sample_num
    return -1  # no detectado

In [68]:
SF = 7
simbolos_tx = np.array([5, 23, 85, 100])
trama_tx = build_tx_frame(simbolos_tx, SF)

# Añadir ruido, desplazamiento o offset si querés simular canal
trama_rx = trama_tx.copy()

# Detectar preámbulo
x = detect_preamble(trama_rx, SF)
print("Preambulo detectado en índice:", x)

Preambulo detectado en índice: -1


In [57]:
def process_frame(trama_rx, SF, preamble_len=8):
    """
    Procesa una trama LoRa recibida:
      - descarta preámbulo y SFD
      - aplica dechirping (n_tuple_former) al payload
      - devuelve símbolos estimados
    """
    M = 2**SF

    # Longitudes
    pre_len = preamble_len*M               # preámbulo = upchirps de longitud 2M
    sfd_len = 2*M + (M//4)                    # SFD = 2.25 downchirps de 2M
    start_payload = pre_len + sfd_len

    # Cortar payload de la trama
    payload_rx = trama_rx[start_payload:]

    # Reorganizar en ventanas de tamaño 2M
    n_sym = len(payload_rx) // (M)
    chirps_recibidos = payload_rx[:n_sym*M].reshape((n_sym, M))


    # Pasar al detector
    simbolos_estimados = n_tuple_former(chirps_recibidos, SF, 1, 1)

    return simbolos_estimados


In [58]:
SF = 7
simbolos_tx = np.array([5, 23, 90, 100, 100])   # Ejemplo de payload

# Construir trama Tx
trama_tx = build_tx_frame(simbolos_tx, SF, 8)

# Procesar trama
simbolos_hat = process_frame(trama_tx, SF, 8)

print("Símbolos transmitidos :", simbolos_tx)
print("Símbolos detectados   :", simbolos_hat)


Símbolos transmitidos : [  5  23  90 100 100]
Símbolos detectados   : [  5  23  90 100 100]


In [59]:

# Funciones específicas para oversampling 2x (para el detector)
def upchirp_2x(SF):
    """
    Genera up-chirp con oversampling 2x para detección.
    Según el paper (pág. 7): señal muestreada a 2*Bw
    Fase: k²/(2M) para k=0..2M-1
    """
    M_local = 2**SF
    sample_num = 2 * M_local  # Oversampling 2x
    k = np.arange(sample_num)
    fase = (k**2) / (2 * M_local)
    chirp = (1 / np.sqrt(M_local)) * np.exp(1j * 2 * np.pi * fase)
    return chirp

def downchirp_2x(SF):
    """Genera down-chirp con oversampling 2x (conjugado del up-chirp)"""
    return np.conj(upchirp_2x(SF))

## Detector de Tramas V2 - Implementación Completa

### Basado en LoRaPHY.m

Esta sección implementa el detector de tramas completo siguiendo el paper "From Demodulation to Decoding: Toward Complete LoRa PHY Understanding and Implementation" y el código de referencia LoRaPHY.m.

**Componentes principales:**
1. **dechirp_cpa**: Dechirping con oversampling 2x y método CPA (Coarse Phase Alignment)
2. **detect_preamble_v2**: Detección robusta de preámbulo buscando 7 up-chirps consecutivos
3. **sync_frame**: Sincronización fina detectando SFD y calculando CFO
4. **demodulate_frame_complete**: Flujo completo de recepción

In [60]:
def dechirp_cpa(sig, start_idx, SF, T, Bw, is_up=True, zero_padding_ratio=10):

    M_local = 2**SF
    sample_num = 2 * M_local  # Oversampling 2x: señal muestreada a 2*Bw
    
    # CORREGIDO: Usar funciones upchirp_2x/downchirp_2x en lugar de generación inline
    up_ref = upchirp_2x(SF)
    down_ref = downchirp_2x(SF)

    if is_up:   # Para detectar up-chirps, multiplicamos por down-chirp      
        ref = down_ref
    else:       # Para detectar down-chirps, multiplicamos por up-chirp      
        ref = up_ref

    if start_idx + sample_num > len(sig):   # Extraer segmento de señal
        return (0, 0)  # Fuera de rango

    chirp_segment = sig[start_idx:start_idx+sample_num]   
    dechirped = chirp_segment * ref     # Dechirping: multiplicar por chirp de referencia   
    fft_len = sample_num * zero_padding_ratio       # FFT con zero-padding
    ft = np.fft.fft(dechirped, fft_len)

    ft_mag = np.abs(ft)  # Usar solo la magnitud de la FFT completa (sin CPA)

    peak_bin = np.argmax(ft_mag)    # Encontrar pico máximo
    peak_value = ft_mag[peak_bin]

    return (peak_value, peak_bin)

In [61]:
def detect_preamble_v2(sig, SF, T, Bw, preamble_len=8, zero_padding_ratio=10):
    
    M = 2**SF
    sample_num = 2 * M  # Oversampling 2x
    bin_num = M * zero_padding_ratio  
    ii = 0
    pk_bin_list = []

    while (ii < len(sig) - sample_num * preamble_len):  # Si encontramos preamble_len chirps consecutivos, tenemos un preámbulo    
        if len(pk_bin_list) >= preamble_len:  # CORREGIDO: Detectar los 8 chirps completos
            x = ii - round(pk_bin_list[-1] / zero_padding_ratio * 2)
            return x       
      
        pk_val, pk_bin = dechirp_cpa(sig, ii, SF, T, Bw, is_up=True, zero_padding_ratio=zero_padding_ratio) # Aplicar dechirping en la ventana actual
        
        if pk_bin_list: # Verificar consistencia con el bin anterior        
            bin_diff = (pk_bin_list[-1] - pk_bin) % bin_num
            if bin_diff > bin_num / 2:
                bin_diff = bin_num - bin_diff           
            if bin_diff <= zero_padding_ratio:              
                pk_bin_list.append(pk_bin)   # Bins consistentes → agregar a la lista
            else:              
                pk_bin_list = [pk_bin]  # Bins inconsistentes → reiniciar búsqueda
        else:          
            pk_bin_list = [pk_bin]  # Primera detección
        
        ii += sample_num    # Avanzar ventana
    
    return -1  # No se detectó preámbulo

In [62]:
def sync_frame(sig, x_coarse, SF, T, Bw, zero_padding_ratio=10):
   
    M = 2**SF
    sample_num = 2 * M
    bin_num = M * zero_padding_ratio  
    x = x_coarse    
    found = False
    
    while (x < len(sig) - sample_num):    # Paso 1: Encontrar el SFD (transición a down-chirps)
        up_peak = dechirp_cpa(sig, x, SF, T, Bw, is_up=True, zero_padding_ratio=zero_padding_ratio)
        down_peak = dechirp_cpa(sig, x, SF, T, Bw, is_up=False, zero_padding_ratio=zero_padding_ratio)

        if (abs(down_peak[0]) > abs(up_peak[0])):
            # Down-chirp detectado → encontramos el SFD
            found = True
        x = x + sample_num
        if (found):
            break

    if (not found): # No se encontró SFD, retornar valores por defecto     
        return x_coarse, 0, 0.0   
    
    pkd = dechirp_cpa(sig, x, SF, T, Bw, is_up=False, zero_padding_ratio=zero_padding_ratio)    # Paso 2: Up-Down Alignment (alineación fina)

    if (pkd[1] > bin_num / 2):    # CORREGIDO: Ajustar cálculo sin el -1 porque peak_bin ahora empieza desde 0
        to = round((pkd[1] - bin_num) / zero_padding_ratio)
    else:
        to = round(pkd[1] / zero_padding_ratio)
    
    x = x + to 

    # Paso 3: Establecer preamble_bin de referencia
    pku = dechirp_cpa(sig, x - 4*sample_num, SF, T, Bw, is_up=True, zero_padding_ratio=zero_padding_ratio) # Mirar hacia atrás 4 símbolos (estamos después del preámbulo)
    preamble_bin = pku[1]

    if (preamble_bin > bin_num / 2):  # Paso 4: Estimar CFO, CORREGIDO: Ajustar cálculo sin el -1
        cfo = (preamble_bin - bin_num) * Bw / bin_num
    else:
        cfo = preamble_bin * Bw / bin_num

    # Paso 5: Determinar si estamos en el 1er o 2do down-chirp del SFD
    pku_prev = dechirp_cpa(sig, x - sample_num, SF, T, Bw, is_up=True, zero_padding_ratio=zero_padding_ratio)
    pkd_prev = dechirp_cpa(sig, x - sample_num, SF, T, Bw, is_up=False, zero_padding_ratio=zero_padding_ratio)

    if (abs(pku_prev[0]) > abs(pkd_prev[0])):
        x_sync = x + round(2.25 * sample_num)   # El símbolo anterior es up-chirp → estamos en el 1er down-chirp
    else:
        x_sync = x + round(1.25 * sample_num)   # El símbolo anterior es down-chirp → estamos en el 2do down-chirp
    
    return x_sync, preamble_bin, cfo

In [63]:
def demodulate_frame_complete(trama_rx, SF, T, Bw, preamble_len=8, zero_padding_ratio=10):
     
    # Paso 1: Resamplear a 2*Bw (oversampling)
    # Usamos resample de scipy para interpolación precisa
    
    
    # La señal de entrada está a fs = Bw (por defecto en build_tx_frame)
    # Necesitamos resamplear a 2*Bw
    num_samples_original = len(trama_rx)
    num_samples_target = num_samples_original * 2
    sig_resampled = sp_signal.resample(trama_rx, num_samples_target)
    
    # Paso 2: Detectar preámbulo (alineación gruesa)
    x_coarse = detect_preamble_v2(sig_resampled, SF, T, Bw, preamble_len, zero_padding_ratio)
    
    if (x_coarse == -1):
        print("No se detectó preámbulo")
        return np.array([]), -1, 0.0, {'status': 'no_preamble'}
    
    print(f"Preámbulo detectado en índice: {x_coarse} (señal oversampleada)")
    
    # Paso 3: Sincronización fina con SFD
    x_sync, preamble_bin, cfo = sync_frame(sig_resampled, x_coarse, SF, T, Bw, zero_padding_ratio)
    
    print(f"Sincronización fina completada")
    print(f"- Inicio payload: {x_sync}")
    print(f"- Preamble bin: {preamble_bin}")
    print(f"- CFO estimado: {cfo:.6f} Hz")

    # Paso 4: Extraer y demodular payload
    M = 2**SF
    sample_num = 2 * M  # Oversampling 2x
    
    # Extraer payload (resto de la señal después de x_sync)
    payload_signal = sig_resampled[x_sync:]
    
    # Calcular número de símbolos disponibles
    n_symbols = len(payload_signal) // sample_num
    
    if (n_symbols == 0):
        print("No hay suficientes muestras para payload")
        return np.array([]), x_sync, cfo, {'status': 'no_payload'}
    
    # Reorganizar en matriz de chirps (n_symbols x sample_num)
    payload_chirps = payload_signal[:n_symbols*sample_num].reshape((n_symbols, sample_num))
    
    # CORREGIDO: Usar función downchirp_2x() en lugar de generación inline
    down_ref = downchirp_2x(SF)
    
    # Demodular: multiplicar por downchirp y aplicar FFT
    producto = payload_chirps * down_ref
    fft_producto = np.fft.fft(producto, axis=1)
    
    # Encontrar bins de máxima magnitud
    simbolos_raw = np.argmax(np.abs(fft_producto), axis=1)
    
    # CORREGIDO: Compensar CFO usando preamble_bin como referencia
    # Ajustar símbolos para compensar el offset de frecuencia
    # Como la señal tiene oversampling 2x, el bin corresponde a sample_num bins totales
    # Normalizar al rango [0, 2^SF)
    simbolos_ajustados = (simbolos_raw - preamble_bin) % sample_num
    simbolos_rx = (simbolos_ajustados * M // sample_num).astype(int) % (2**SF)
    
    info = {
        'status': 'success',
        'x_coarse': x_coarse,
        'x_sync': x_sync,
        'preamble_bin': preamble_bin,
        'cfo': cfo,
        'n_symbols': n_symbols
    }
    
    return simbolos_rx, x_sync, cfo, info

In [65]:
def ser(s_tx, s_rx):
    return np.mean(s_rx != s_tx)

### Prueba del Detector Completo

In [66]:
# PRUEBA DEL DETECTOR DE TRAMAS COMPLETO

print("="*60)
print("PRUEBA DEL DETECTOR DE TRAMAS LoRa V2")
print("="*60)

# Parámetros de prueba
SF = 7
Bw = 1
T = 1/Bw
M = 2**SF

# Generar símbolos de prueba
simbolos_tx = np.array([5, 23, 90, 100, 100])
print(f"Símbolos TX: {simbolos_tx}")

# Construir trama completa
trama_tx = build_tx_frame(simbolos_tx, SF, preamble_len=8)
print(f"Trama construida: {len(trama_tx)} muestras")
print(f"- Preámbulo: {8*M} muestras")
print(f"- SFD: {int(2.25*M)} muestras")
print(f"- Payload: {len(simbolos_tx)*M} muestras")

# Simular canal (sin ruido para validación inicial)
print("Canal: Sin ruido (validación ideal)")
trama_rx = trama_tx.copy()

# Aplicar detector completo
print("Aplicando detector completo...")
print("-"*60) 
simbolos_rx, x_payload, cfo, info = demodulate_frame_complete(
    trama_rx, SF, T, Bw, preamble_len=8, zero_padding_ratio=10
)
print("-"*60)

# Mostrar resultados
if info['status'] == 'success':
    print(f"DETECCIÓN EXITOSA!")
    print(f"Símbolos RX: {simbolos_rx}")
    print(f"Comparación:")
    print(f"TX: {simbolos_tx}")
    print(f"RX: {simbolos_rx}")
    
    # Calcular métricas
    if len(simbolos_rx) >= len(simbolos_tx):
        simbolos_rx_comparar = simbolos_rx[:len(simbolos_tx)]
        error_simbolos = np.sum(simbolos_tx != simbolos_rx_comparar)
        ser_result = ser(simbolos_tx, simbolos_rx_comparar)
        
        print(f"Métricas:")
        print(f"- SER: {ser_result:.6f}")
        print(f"- Símbolos erróneos: {error_simbolos}/{len(simbolos_tx)}")
        
        if ser_result == 0:
            print(f"¡PERFECTO! SER = 0 (canal ideal)")
        else:
            print(f"Hay errores de símbolo")
    else:
        print(f"Advertencia: Se demodularon menos símbolos de los esperados")
        print(f"Esperados: {len(simbolos_tx)}, Recibidos: {len(simbolos_rx)}")
else:
    print(f"DETECCIÓN FALLIDA: {info['status']}")


PRUEBA DEL DETECTOR DE TRAMAS LoRa V2
Símbolos TX: [  5  23  90 100 100]
Trama construida: 1952 muestras
- Preámbulo: 1024 muestras
- SFD: 288 muestras
- Payload: 640 muestras
Canal: Sin ruido (validación ideal)
Aplicando detector completo...
------------------------------------------------------------
No se detectó preámbulo
------------------------------------------------------------
DETECCIÓN FALLIDA: no_preamble
