In [None]:
# Importa los modulos Python necesarios
# OJO: En Google Colab ejecutar dos veces

# Configura GPU
import os
if os.getenv("CUDA_VISIBLE_DEVICES") is None:
    gpu_num = 0 # Reemplaza por "" para usar CPU
    os.environ["CUDA_VISIBLE_DEVICES"] = f"{gpu_num}"
os.environ['TF_FORCE_GPU_ALLOW_GROWTH'] = 'true'
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'

# Importa Sionna; si ejecuta localmente, no olvide instalar Sionna con "pip install sionna"
try:
    import sionna
except ImportError as e:
    import sys
    if 'google.colab' in sys.modules:
       # Instala Sionna en Google Colab
       print("Installing Sionna and restarting the runtime. Please run the cell again.")
       os.system("pip install sionna")
       os.kill(os.getpid(), 5)
    else:
       raise e

# Importa TensorFlow
import tensorflow as tf
gpus = tf.config.list_physical_devices('GPU')
if gpus:
    try:
        tf.config.experimental.set_memory_growth(gpus[0], True)
    except RuntimeError as e:
        print(e)
tf.get_logger().setLevel('ERROR')

# Importa otros modulos necesarios
import numpy as np
import pandas as pd

# Configura Matplotlib para graficos en linea
%matplotlib inline
import matplotlib.pyplot as plt

In [None]:
# Parametros generales de la simulacion

# Parametros basicos
sionna.phy.config.seed = 0 # semilla aleatoria
N = 256 # longitud del código (potencia de 2)
K = 128 # número de bits de información (0 < K < N)
bitsxsimb = 1 # bits por símbolo (BPSK)
if bitsxsimb == 1:
    modulacion = "pam" # +1+0j y -1+0j para BPSK
else:
    modulacion = "qam" # QAM para más bits por símbolo
bloq_inter = 64 # Numero de bloques de interleaving (1 = sin interleaving)
list_scl = 8 # tamaño de lista para decodificador SCL
LLR_clip = 1000 # valor de recorte para LLRs
canal = "awgn" # posibles valores: "awgn", "rayleigh_plano", "rayleigh_simbolo"

# Clases comunes
datos = sionna.phy.mapping.BinarySource() # generador de datos aleatorios
constelacion = sionna.phy.mapping.Constellation(modulacion, bitsxsimb) # define la constelacion
mapeador = sionna.phy.mapping.Mapper(constellation=constelacion) # bits a simbolos
demapeador = sionna.phy.mapping.Demapper("app", constellation=constelacion) # inverso del mapeador
interleaver = sionna.phy.fec.interleaving.RandomInterleaver(seed=0, axis=1) # interleaver a nivel de bloq_inter
deinterleaver = sionna.phy.fec.interleaving.Deinterleaver(interleaver) # inverso del interleaver

# Parametros Monte Carlo
lote_mc = 2000 // bloq_inter # Numero de muestras a procesar en paralelo (emplea VRAM)
num_iter = 5000 # Numero de muestras a procesar en secuencia (emplea tiempo)
# DATO: Cada bit-canal se simula lote_mc * bloq_inter * num_iter veces
snr_simulacion = 12.0 # Valor sugerido 4 dB para AWGN, 12 dB para Rayleigh

# Parametros de ploteo
EBNO_DB_MIN = 0 # Eb/N0 mínimo en dB
EBNO_DB_MAX = 20 # Eb/N0 máximo en dB
EBNO_RANGO = EBNO_DB_MAX - EBNO_DB_MIN + 1 # rango de Eb/N0
lote_berplot = 50 # tamaño de lote para ploteo de curvas

In [None]:
# Modelos de canal

# Canal Rayleigh con desvanecimiento plano
@tf.function(jit_compile=True)
def canalRayleighPlano(ebno_db, bitsxsimb, coderate, x):
    batch_size = tf.shape(x)[0] # extrae el tamaño del batch
    bloq_inter = tf.shape(x)[1] # extrae el número de bloques de interleaving
    no = sionna.phy.utils.ebnodb2no(ebno_db, bitsxsimb, coderate=coderate)
    awgn_real = sionna.phy.config.tf_rng.normal(tf.shape(x), mean=0.0 ,stddev=tf.sqrt(no/2.0))
    awgn_imag = sionna.phy.config.tf_rng.normal(tf.shape(x), mean=0.0 ,stddev=tf.sqrt(no/2.0))
    awgn = tf.complex(awgn_real, awgn_imag) # ruido blanco aditivo complejo
    # "h" se mantiene constante en la dimension de N
    h_real = sionna.phy.config.tf_rng.normal([batch_size, bloq_inter], stddev=tf.sqrt(0.5), dtype=tf.float32)
    h_imag = sionna.phy.config.tf_rng.normal([batch_size, bloq_inter], stddev=tf.sqrt(0.5), dtype=tf.float32)
    h = tf.expand_dims(tf.complex(h_real, h_imag), -1) # ganancia y fase aleatorias
    y = h * x + awgn
    return no, h, y

# Canal Rayleigh con desvanecimiento por simbolo
@tf.function(jit_compile=True)
def canalRayleighSimbolo(ebno_db, bitsxsimb, coderate, x):
    no = sionna.phy.utils.ebnodb2no(ebno_db, bitsxsimb, coderate=coderate)
    awgn_real = sionna.phy.config.tf_rng.normal(tf.shape(x), mean=0.0 ,stddev=tf.sqrt(no/2.0))
    awgn_imag = sionna.phy.config.tf_rng.normal(tf.shape(x), mean=0.0 ,stddev=tf.sqrt(no/2.0))
    awgn = tf.complex(awgn_real, awgn_imag) # ruido blanco aditivo complejo
    # "h" se mantiene constante en la dimension de N
    h_real = sionna.phy.config.tf_rng.normal(tf.shape(x), stddev=tf.sqrt(0.5), dtype=tf.float32)
    h_imag = sionna.phy.config.tf_rng.normal(tf.shape(x), stddev=tf.sqrt(0.5), dtype=tf.float32)
    h = tf.complex(h_real, h_imag) # ganancia y fase aleatorias
    y = h * x + awgn
    return no, h, y

# Canal AWGN
@tf.function(jit_compile=True)
def canalAWGN(ebno_db, bitsxsimb, coderate, x):
    no = sionna.phy.utils.ebnodb2no(ebno_db, bitsxsimb, coderate=coderate)
    awgn_real = sionna.phy.config.tf_rng.normal(tf.shape(x), mean=0.0 ,stddev=tf.sqrt(no/2.0))
    awgn_imag = sionna.phy.config.tf_rng.normal(tf.shape(x), mean=0.0 ,stddev=tf.sqrt(no/2.0))
    awgn = tf.complex(awgn_real, awgn_imag) # ruido blanco aditivo complejo
    y = x + awgn
    return no, y

In [None]:
# Bloques de recepcion

# Recepcion para canal AWGN
@tf.function(jit_compile=True)
def rx_awgn(y, no):
    llr = demapeador(y, no)
    return llr

# Recepcion para otros canales, no include deinterleaving
@tf.function(jit_compile=True)
def rx_mmse(h, y, no):
    mmse = tf.math.conj(h) / tf.cast(tf.math.square(tf.abs(h)) + no, tf.complex64)
    no_eff = no * tf.math.square(tf.abs(mmse))
    y_eq = mmse * y
    llr_pre = tf.clip_by_value(demapeador(y_eq, no_eff), -LLR_clip, LLR_clip)
    return llr_pre

In [None]:
# Define la clase del sistema sin código de canal

class SinCodigo(sionna.phy.Block):
    def __init__(self, canal, bloq_inter, K, bitsxsimb):
        super().__init__()
        self.canal = canal
        self.bloq_inter = bloq_inter
        self.K = K
        self.bitsxsimb = bitsxsimb
        
    def call(self, batch_size, ebno_db):
        # Bloques de transmision
        b = datos([batch_size, self.bloq_inter, self.K])
        b_int = interleaver(b)
        x = mapeador(b_int)
        
        # Canal Rayleigh
        if self.canal == "awgn":
            no, y = canalAWGN(ebno_db, self.bitsxsimb, coderate=1.0, x=x)
        elif self.canal == "rayleigh_plano":
            no, h, y = canalRayleighPlano(ebno_db, self.bitsxsimb, coderate=1.0, x=x)
        elif self.canal == "rayleigh_simbolo":
            no, h, y = canalRayleighSimbolo(ebno_db, self.bitsxsimb, coderate=1.0, x=x)
        else:
            raise Exception("Debe seleccionar un canal valido")

        # Bloque de recepcion
        if self.canal == "awgn":
            llr_pre = rx_awgn(y, no)
            llr = deinterleaver(llr_pre)
        else:
            llr_pre = rx_mmse(h, y, no)
            llr = deinterleaver(llr_pre)

        return b, llr

In [None]:
# Se inicializa la clase con parametros generales
sin_codigo = SinCodigo(canal, bloq_inter, K, bitsxsimb)

In [None]:
# Simulación y ploteo de BER

ber_plots = sionna.phy.utils.PlotBER("Comparacion canal")
ber_plots.simulate(sin_codigo,
                   ebno_dbs=np.linspace(EBNO_DB_MIN, EBNO_DB_MAX, EBNO_RANGO),
                   batch_size=lote_berplot,
                   num_target_block_errors=100,
                   legend="Sin codigo",
                   soft_estimates=True,
                   max_mc_iter=100,
                   show_fig=True);

In [None]:
# Generador Monte Carlo de bits congelados con XLA acceleration

# Función compilada XLA para simulacion de batch_size
@tf.function(jit_compile=True)
def simulacion_batch_size(canal, snr_simulacion, bitsxsimb, batch_size, bloq_inter, codificador, decodificador):
    # Bloques de transmision
    bits = datos([batch_size, bloq_inter, 1]) # solo 1 bit de datos por bit-canal
    bits_codif = codificador(bits) # codifica a N bits
    bits_int = interleaver(bits_codif)
    x = mapeador(bits_int)

    # Canal a simular (solo 1 bit de datos por bit-canal)
    if canal == "awgn":
        no, y = canalAWGN(snr_simulacion, bitsxsimb, coderate=1.0/N, x=x)
    elif canal == "rayleigh_plano":
        no, h, y = canalRayleighPlano(snr_simulacion, bitsxsimb, coderate=1.0/N, x=x)
    elif canal == "rayleigh_simbolo":
        no, h, y = canalRayleighSimbolo(snr_simulacion, bitsxsimb, coderate=1.0/N, x=x)
    else:
        raise Exception("Debe seleccionar un canal valido")

    # Bloques de recepcion
    if canal == "awgn":
        llr_pre = rx_awgn(y, no)
        llr = deinterleaver(llr_pre)
    else:
        llr_pre = rx_mmse(h, y, no)
        llr = deinterleaver(llr_pre)
    
    bits_decodif = decodificador(llr)

    # Cálculo de errores y borrado de variables intermedias
    err = tf.math.count_nonzero(tf.not_equal(bits, bits_decodif), dtype=tf.int32)
    del bits, bits_codif, bits_int, x, y, llr_pre, llr, bits_decodif
    
    return tf.cast(err, tf.int64), tf.constant(batch_size, dtype=tf.int64)


# Estimador de bits congelados
BER_bits = np.zeros(N)
for i in range(N):
    print(f"Procesando bit-canal {i+1}/{N}", end="", flush=True)

    # Se setean los bits congelados y reinicia conteo errores
    bits_congelados = np.array([j for j in range(N) if j != i]) # se analiza cada bit-canal
    codificador = sionna.phy.fec.polar.encoding.PolarEncoder(bits_congelados, N)
    decodificador = sionna.phy.fec.polar.decoding.PolarSCLDecoder(bits_congelados, N, list_size=list_scl)
    errores_totales = 0
    bits_totales = 0

    # Se itera num_iter veces para acumular errores
    for b in range(num_iter):
        err, total = simulacion_batch_size(canal, snr_simulacion, bitsxsimb, lote_mc, bloq_inter, codificador, decodificador)
        errores_totales += int(err.numpy())
        bits_totales += int(total.numpy())

    print()
    BER_bits[i] = errores_totales / bits_totales

# Resultados Monte Carlo
indices_ordenados = np.argsort(BER_bits)
bits_congelados = indices_ordenados[-(N-K):]

In [None]:
# Bloque de guardado de variables de simulacion Monte Carlo

output_dir = "resultados"
os.makedirs(output_dir, exist_ok=True)
df = pd.DataFrame({"BER bits": BER_bits})
# OJO: Modificar abajo segun nombre de archivo deseado
filename = os.path.join(output_dir,f"prueba_awgn_BER_bits_{N}_{K}_{num_iter}.csv")
# OJO: Modificar arriba segun nombre de archivo deseado

df.to_csv(filename, index=True)

In [None]:
# Bloque de recupero de variables de simulacion Monte Carlo

output_dir = "resultados"
# OJO: Modificar abajo segun nombre de archivo deseado
filename = os.path.join(output_dir,f"prueba_awgn_simbolo_BER_bits_{N}_{K}_{num_iter}.csv")
# OJO: Modificar arriba segun nombre de archivo deseado

df = pd.read_csv(filename)
BER_bits = df["BER bits"].to_numpy()
indices_ordenados = np.argsort(BER_bits)
bits_congelados = indices_ordenados[-(N-K):]

In [None]:
# Define la clase del sistema con código polar

class CodigoPolar(sionna.phy.Block):
    def __init__(self, canal, N, K, bloq_inter, bits_congelados, lista_SCL, bitsxsimb):
        super().__init__()
        self.canal = canal
        self.N = N
        self.K = K
        self.bloq_inter = bloq_inter
        self.bits_congelados = bits_congelados
        self.lista_SCL = lista_SCL
        self.bitsxsimb = bitsxsimb
        
        # Inicializa los bloques necesarios
        self.polar_codif = sionna.phy.fec.polar.encoding.PolarEncoder(self.bits_congelados, self.N)
        self.polar_decodif = sionna.phy.fec.polar.decoding.PolarSCLDecoder(self.bits_congelados, self.N, list_size=self.lista_SCL)
        
    def call(self, batch_size, ebno_db):
        # Bloques de transmision
        b = datos([batch_size, self.bloq_inter, self.K])
        bits_codif = self.polar_codif(b)
        bits_int = interleaver(bits_codif)
        x = mapeador(bits_int)

        # Canal Rayleigh
        if self.canal == "awgn":
            no, y = canalAWGN(ebno_db, self.bitsxsimb, coderate=self.K/self.N, x=x)
        elif self.canal == "rayleigh_plano":
            no, h, y = canalRayleighPlano(ebno_db, self.bitsxsimb, coderate=self.K/self.N, x=x)
        elif self.canal == "rayleigh_simbolo":
            no, h, y = canalRayleighSimbolo(ebno_db, self.bitsxsimb, coderate=self.K/self.N, x=x)
        else:
            raise Exception("Debe seleccionar un canal valido")

        # Bloques de recepcion
        if self.canal == "awgn":
            llr_pre = rx_awgn(y, no)
            llr = deinterleaver(llr_pre)
        else:
            llr_pre = rx_mmse(h, y, no)
            llr = deinterleaver(llr_pre)
        b_hat = self.polar_decodif(llr)

        return b, b_hat

In [None]:
# Se inicializa la clase con los bits congelados obtenidos por Monte Carlo y parametros generales

codigo_polar = CodigoPolar(canal, N, K, bloq_inter, bits_congelados, list_scl, bitsxsimb)

In [None]:
# Ploteo de la tasa de error para Codigo Polar
# En caso de error de memoria: tf.keras.backend.clear_session()
ber_plots.simulate(codigo_polar,
                   ebno_dbs=np.linspace(EBNO_DB_MIN, EBNO_DB_MAX, EBNO_RANGO),
                   batch_size=lote_berplot,
                   num_target_block_errors=400,
                   legend="Codigo Polar",
                   soft_estimates=False,
                   max_mc_iter=25,
                   show_fig=True,
                   forward_keyboard_interrupt=False);