# Demodulación usando redes neuronales

## Librerías

In [3]:
import sofa
import polars as pl
import numpy as np
import scipy as sp
import matplotlib.pyplot as plt
import tensorflow as tf
import os

# Scikit-Learn
from sklearn import metrics
from sklearn.cluster import KMeans
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.model_selection import cross_validate
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
from sklearn.model_selection import train_test_split, GridSearchCV

# SciPy
from scipy.io import loadmat

# Tensorflow
from tensorflow.keras import models, regularizers, utils
from tensorflow.keras.layers import Dense
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.wrappers.scikit_learn import KerasClassifier

# `datetime.datetime` is a class, not a module, so `now` cannot be imported
# from it directly; import the class and bind its `now` constructor instead.
from datetime import datetime

now = datetime.now

%store -r

ModuleNotFoundError: No module named 'sofa'

In [None]:
# Candidate `n_neighbors` values handed to `sofa.find_best_params` when tuning
# the KNN classifier.
PARAM_GRID_KNN = {"n_neighbors": [3, 5, 7, 9, 11, 13, 15]}
# Figure size constant, presumably (width, height) for matplotlib; it is not
# referenced in the part of the notebook visible here.
FIGSIZE = (16, 8)


# Helper that reads all the received data stored with the studied folder layout
def read_data(folder_rx):
    """Load the received-constellation ``.mat`` files found under ``folder_rx``.

    Only sub-folders whose name ends in ``"spacing"`` are visited, and inside
    them only the files whose name contains ``"consY"`` are loaded.

    Args:
        folder_rx: Path of the directory that contains the ``*spacing``
            sub-folders.

    Returns:
        A nested dict ``{folder_name: {data_name: mat_contents}}`` where
        ``data_name`` is the third ``"_"``-separated token of the file name
        and ``mat_contents`` is the dict returned by ``scipy.io.loadmat``.
        When two files share a ``data_name`` the one listed last wins.

    Raises:
        IndexError: If a matching file name has fewer than three
            ``"_"``-separated tokens.
    """
    data = {}

    for folder in os.listdir(folder_rx):
        if not folder.endswith("spacing"):
            continue

        folder_path = os.path.join(folder_rx, folder)
        folder_data = {}
        data[folder] = folder_data

        for file in os.listdir(folder_path):
            if "consY" not in file:
                continue
            data_name = file.split("_")[2]
            folder_data[data_name] = loadmat(os.path.join(folder_path, file))

    return data


def classifier_model(layers_props_lst, loss_fn):
    """Build and compile a fully connected classifier.

    The network is a ``tf.keras.Sequential`` stack of one ``Dense`` layer per
    entry of ``layers_props_lst`` followed by a 16-unit softmax output layer,
    compiled with the Adam optimizer.

    Args:
        layers_props_lst: Sequence of dicts; each dict is unpacked as the
            keyword arguments of one hidden ``Dense`` layer (for example
            ``{"units": 64, "activation": "relu"}``).
        loss_fn: Loss forwarded unchanged to ``model.compile``.

    Returns:
        The compiled ``tf.keras.Sequential`` model.
    """
    model = tf.keras.Sequential()

    # Hidden layers
    for i, layer_props in enumerate(layers_props_lst):
        # Only the first layer declares the input size: 2 features (I, Q)
        input_spec = {"input_dim": 2} if i == 0 else {}
        model.add(tf.keras.layers.Dense(**layer_props, **input_spec))

    # Classifier head: one softmax output per class
    model.add(tf.keras.layers.Dense(units=16, activation="softmax"))

    # Compile the model
    model.compile(loss=loss_fn, optimizer="adam")

    return model


def demodulate_neural(X, y, layer_props_lst, loss_fn, train_size):
    """Train a dense classifier on part of the samples and predict all of them.

    Args:
        X: Received samples. Assumed to be a 1-D array of complex values
            (one constellation point per sample) — TODO confirm with callers.
        y: Target label of each sample in ``X``.
        layer_props_lst: Hidden-layer descriptions forwarded to
            ``classifier_model``.
        loss_fn: Loss forwarded to ``classifier_model``.
        train_size: Training fraction forwarded to ``train_test_split``.

    Returns:
        The output of ``model.predict`` for every sample in ``X``. With the
        model built by ``classifier_model`` this is the softmax output of the
        last layer (16 values per sample), not a class index.
        NOTE(review): callers that need symbol indices presumably have to
        take the argmax over the last axis — confirm.
    """
    # Split every complex sample into two real features: in-phase (I) and
    # quadrature (Q), matching the 2-input first layer of the classifier.
    samples = np.asarray(X)
    features = np.column_stack((samples.real, samples.imag))

    X_train, _, y_train, _ = train_test_split(features, y, train_size=train_size)

    # Classifier model
    model = classifier_model(layer_props_lst, loss_fn)
    # Stop once the training loss has not improved for 300 epochs and keep
    # the best weights seen so far.
    callback = EarlyStopping(
        monitor="loss", patience=300, mode="min", restore_best_weights=True
    )
    model.fit(
        X_train, y_train, epochs=5000, batch_size=64, verbose=0, callbacks=[callback]
    )

    return model.predict(features)


# Demodulate with the traditional, KNN and neural-network methods
def demodulation(X_rx, X_tx, train_size):
    """Compute the BER of every demodulation method for each entry of ``X_rx``.

    For each entry the received constellation is rotated by 0, 90, 180 and
    270 degrees. The BER of each method is evaluated for every rotation and
    the minimum over the four rotations is kept.

    Args:
        X_rx: Dict whose values are ``.mat`` contents holding a ``"const_Y"``
            array; its keys are reused as keys of the result.
        X_tx: Transmitted constellation, passed to ``sofa.sync_signals``.
        train_size: Training fraction forwarded to ``sofa.demodulate_knn``
            and ``sofa.demodulate_neural``.

    Returns:
        A dict ``{key: {"trad": ber, "knn": ber, "neural": ber}}`` with the
        minimum BER of each method for every key of ``X_rx``.
    """
    n_rotations = 4

    # The network configuration does not depend on the data, so it is built
    # once instead of on every rotation.
    max_neurons = 64
    activations = ["relu", "sigmoid"]
    layer_props = [
        {"units": max_neurons // (2**depth), "activation": activation}
        for depth, activation in enumerate(activations)
    ]
    # Keras expects a loss instance (or a name/function), not the class itself
    loss_fn = tf.keras.losses.SparseCategoricalCrossentropy()

    ber = {}

    for snr in X_rx:
        # Extract the received constellation and rescale it with the factor
        # returned by sofa.mod_norm
        X_ch_norm = X_rx[snr].get("const_Y").flatten()
        X_ch = sofa.mod_norm(X_ch_norm, 10) * X_ch_norm

        # BER of each algorithm, one slot per rotation
        trad_ber = np.empty(n_rotations)
        knn_ber = np.empty(n_rotations)
        neural_ber = np.empty(n_rotations)

        for ph in range(n_rotations):
            # Rotate the constellation by ph * 90 degrees
            rotated_X = X_ch * np.exp(ph * 1j * np.pi / 2)

            # Synchronize the transmitted signal with the received one
            synced_X_tx = sofa.sync_signals(X_tx, rotated_X)

            # Demodulate the transmitted signal to obtain the reference
            y = sofa.demodulate(synced_X_tx, sofa.MOD_DICT)

            # Best hyperparameters for KNN
            best_params_knn = sofa.find_best_params(
                KNeighborsClassifier, PARAM_GRID_KNN, rotated_X, y
            )
            k = best_params_knn["n_neighbors"]

            # Demodulation. The last argument of demodulate_neural is the
            # training fraction, so both learned demodulators are trained on
            # the same share of the samples.
            trad = sofa.demodulate(rotated_X, sofa.MOD_DICT)
            knn = sofa.demodulate_knn(rotated_X, y, k=k, train_size=train_size)
            neural = sofa.demodulate_neural(
                rotated_X, y, layer_props, loss_fn, train_size
            )

            # BER of each demodulation against the synchronized transmitted
            # symbols. Index 0 of the return value is the BER; the rest (the
            # error count) is ignored.
            trad_ber[ph] = sofa.bit_error_rate(trad, y)[0]
            knn_ber[ph] = sofa.bit_error_rate(knn, y)[0]
            neural_ber[ph] = sofa.bit_error_rate(neural, y)[0]

        ber[snr] = {
            "trad": np.amin(trad_ber),
            "knn": np.amin(knn_ber),
            "neural": np.amin(neural_ber),
        }

        # Progress message
        print(f"SNR {snr[5:]} terminado.")
    return ber


def curve_fit(f, x, y):
    """Fit ``f`` to the points ``(x, y)`` and return the optimal parameters.

    Thin wrapper around ``scipy.optimize.curve_fit`` that drops the
    covariance estimate and keeps only the fitted parameter array.
    """
    fit_result = sp.optimize.curve_fit(f, x, y)
    return fit_result[0]


def spacing_ber_eval(ber, spacing):
    """Plot log10(BER) against OSNR for the three demodulation methods.

    For each method the measured points are drawn as a scatter plot together
    with a dashed quadratic fit obtained through ``curve_fit``.

    Args:
        ber: Dict keyed by ``"consY<osnr>dB"`` whose values are dicts with the
            BER of each method under the keys ``"trad"``, ``"knn"`` and
            ``"neural"``.
        spacing: Channel spacing in GHz, used in the printed message and in
            the plot title.
    """
    print(
        f"Evaluación de la red neuronal con respecto a KNN para espaciamiento de {spacing} GHz"
    )

    # OSNR labels as strings ("consY20dB" -> "20"), taken from the keys of
    # `ber` itself and ordered numerically: a plain string sort would place
    # "9" after "10".
    SNR = sorted((key[5:-2] for key in ber), key=float)

    # OSNR values as floats, plus a dense grid for drawing the fitted curves
    dSNR = np.array(SNR, dtype=np.float64)
    xSNR = np.linspace(dSNR[0], dSNR[-1], 1000)

    def log_ber(algorithm):
        """Return log10 of the BER of ``algorithm`` for every OSNR, in order."""
        values = [ber[f"consY{snr_i}dB"][algorithm] for snr_i in SNR]
        # A BER of exactly 0 maps to -inf; it is filtered out before plotting
        with np.errstate(divide="ignore"):
            return np.log10(np.asarray(values, dtype=np.float64))

    def quadratic(x, a, b, c):
        """Second-order polynomial used as the fitting model."""
        return a * x**2 + b * x + c

    # (key in `ber`, marker, color, legend label)
    series = (
        ("trad", "^", "brown", "Tradicional"),
        ("knn", "x", "green", "KNN"),
        ("neural", "o", "orange", "Neural Network"),
    )

    plt.rcParams["text.usetex"] = True
    plt.figure(figsize=(10, 6))
    for algorithm, marker, color, label in series:
        values = log_ber(algorithm)
        finite = np.isfinite(values)
        plt.scatter(dSNR[finite], values[finite], marker=marker, c=color, label=label)
        # A quadratic has three parameters, so at least three points are
        # needed for the fit to be defined.
        if np.count_nonzero(finite) >= 3:
            popt = curve_fit(quadratic, dSNR[finite], values[finite])
            plt.plot(
                xSNR,
                quadratic(xSNR, *popt),
                c=color,
                ls="dashed",
                label=f"{label} (fit)",
            )

    plt.title(f"{spacing} GHz spacing")
    plt.xlabel("OSNR (dB)")
    # The y axis holds log10(BER): keep the tick positions and relabel them
    # as powers of ten. The limits are restored because setting ticks may
    # widen the view.
    ylim = plt.ylim()
    locs, _ = plt.yticks()
    plt.yticks(locs, [f"$10^{{{tick:g}}}$" for tick in locs])
    plt.ylim(ylim)
    plt.ylabel("BER")
    plt.legend(loc="upper right")
    plt.grid()
    plt.show()


def calc_once(varname, fn, args):
    """Return the global named ``varname`` if it exists, else ``fn(**args)``.

    The computed value is only returned, not stored: the caller is expected
    to assign it to the variable called ``varname`` so later calls reuse it.

    Args:
        varname: Name of the module-level variable to look for.
        fn: Callable used to compute the value when the variable is missing.
        args: Dict of keyword arguments passed to ``fn``.

    Returns:
        The existing global value, or the freshly computed result.
    """
    module_globals = globals()
    # Direct lookup instead of eval(): no string is evaluated as code and the
    # local names of this function cannot shadow the global being requested.
    if varname in module_globals:
        return module_globals[varname]
    return fn(**args)


## Datos experimentales

In [4]:
# Path of the .mat file with the transmitted constellation, and root folder
# that holds the received data.
file_tx = "Datos/2x16QAM_16GBd.mat"
folder_rx = "Datos/"

# Transmitted data: take the "Constellation" entry of the .mat file and unwrap
# it down to a flat array (the [0][0] indexing presumably undoes the nesting
# of a MATLAB struct/cell — TODO confirm against the file).
X_tx_norm = loadmat(file_tx)
X_tx_norm = X_tx_norm.get("Constellation").flatten()[0][0].flatten()
# Rescale the constellation by the factor returned by sofa.mod_norm
# (NOTE(review): the meaning of the argument 10 is not visible here).
X_tx = sofa.mod_norm(X_tx_norm, 10)*X_tx_norm

# Read the received data
data = read_data(folder_rx)

In [5]:
# Print the time at which the run starts
print(f"Inicio: {now()}")

Inicio: 2023-03-17 16:00:29.378404


## Espaciamiento de 18 GHz

In [None]:
# Channel spacing in GHz, kept as a string because it is interpolated into
# messages and the plot title by spacing_ber_eval.
spacing = "18"

# NOTE(review): `calc_ber_once` is not defined in the visible part of the
# notebook (only `calc_once` is) — confirm it is provided elsewhere.
neural_vs_knn_ber18 = calc_ber_once(spacing)

# Persist the result with the IPython %store magic so that a later session
# can restore it through `%store -r`.
%store neural_vs_knn_ber18

# Plot the BER comparison for this spacing
spacing_ber_eval(neural_vs_knn_ber18, spacing)