In [1]:
import sys
import os

# Resolve the directory one level above the current working directory (treated
# as the project root) and put it at the front of the import path, unless it
# is already listed there.
project_root = os.path.abspath(os.path.join(os.getcwd(), os.pardir))
if sys.path.count(project_root) == 0:
    sys.path.insert(0, project_root)

from dotenv import load_dotenv

# Populate the process environment through python-dotenv. This runs before the
# CUDA_VISIBLE_DEVICES check and the project imports in the following cells.
load_dotenv()

In [2]:
import os

# Respect a device selection made outside the notebook; otherwise default to
# GPU 0. (Set gpu_num to "" to use the CPU.)
if "CUDA_VISIBLE_DEVICES" not in os.environ:
    gpu_num = 0
    os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu_num)
# Set TensorFlow's C++ minimum log level to 3.
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"

# Import Sionna, installing it on first use if it is missing.
try:
    import sionna
except ImportError:
    import importlib
    import subprocess
    import sys

    # Run pip through the interpreter that executes this kernel so the package
    # lands in the environment we are importing from. check_call raises
    # CalledProcessError if the installation fails, instead of continuing to a
    # second, less informative ImportError.
    subprocess.check_call([sys.executable, "-m", "pip", "install", "sionna"])
    # Make the import system notice the freshly installed package.
    importlib.invalidate_caches()
    import sionna

# Let TensorFlow allocate GPU memory on demand on the first visible GPU instead
# of reserving it all up front.
# Background: https://www.tensorflow.org/guide/gpu
import tensorflow as tf

gpus = tf.config.list_physical_devices("GPU")
if len(gpus) > 0:
    try:
        tf.config.experimental.set_memory_growth(device=gpus[0], enable=True)
    except RuntimeError as growth_error:
        # Report the problem and carry on rather than aborting the notebook.
        print(growth_error)
# Only show TensorFlow Python-logger messages of level ERROR and above.
tf.get_logger().setLevel("ERROR")

# Fixed seed so repeated runs are reproducible.
sionna.config.seed = 42

In [3]:
from src.data.response import response_time_domain, response_freqency_domain
from src.channels.channel_est.ls_channel import ChannelEstimator
from src.data.binary_sources import binary_sources
from src.data.qam_mapper import qam_mapper
from src.ldpc.ldpc_encoder import ldpc_encoder
from src.ofdm.ofdm_demodulation import ofdm_demodulation
from src.ofdm.ofdm_modulation import ofdm_modulation
from src.ofdm.ofdm_resource_grids import resource_grid_mapper
from src.channels.cdl_channel import (
    channel_time_domain,
    channel_frequency_domain,
    sampling_channel_freq,
)
from src.settings.config import (
    batch_size,
    bits_per_symbol,
    num_streams_per_tx,
    num_tx,
    number_of_bits,
)

# receiver-side processing (equalizer, demapper, decoder), evaluation and plotting helpers
from src.channels.lmmse_equalizer import lmmse_equalizer
from src.data.qam_demapper import qam_demapper
from src.ldpc.ldpc_decoder import ldpc_decoder
from src.evals.ber import ber
from src.utils.plots import plot_symbols, plot_channel_frequency_domain

In [4]:
# Time-domain link simulation; the channel estimator below is created with
# interpolation_factor="nn".

# generate binary values
# (the list is built from batch_size, num_tx, num_streams_per_tx and
# number_of_bits — presumably the shape of the generated bit tensor; confirm)
binary_values = binary_sources([batch_size, num_tx, num_streams_per_tx, number_of_bits])

# encode binary values with LDPC
encoded_binary_values = ldpc_encoder(binary_values)

# map the encoded bits to QAM symbols
qam_symbols = qam_mapper(encoded_binary_values)

# map QAM symbols onto the resource grid
mapped_qam_symbol = resource_grid_mapper(qam_symbols)

# modulate the resource grid from frequency domain to time domain
modulated_qam_symbols = ofdm_modulation(mapped_qam_symbol)

# obtain the time-domain channel
h_time = channel_time_domain()
# received signal in time domain via CDL channel model + AWGN noise
response_symbols = response_time_domain(modulated_qam_symbols, h_time)

# demodulate the received signal from time domain back to frequency domain
demodulated_response_symbols = ofdm_demodulation(response_symbols)

# channel estimation; estimate() returns the channel estimate and a second
# value that is forwarded to the equalizer as err_var
channel_estimator = ChannelEstimator(interpolation_factor="nn")
h_est, err_var = channel_estimator.estimate(demodulated_response_symbols)
# Optional visual check of the estimate (disabled):
# plot_channel_frequency_domain(
#    h_est[0, 0, 0, 0, 0].numpy().real.T,
# )


# equalize the demodulated grid using the estimated channel
mapped_qam_symbol_hat, no_eff = lmmse_equalizer(
    demodulated_response_symbols, h_est, err_var
)

# demap the equalized symbols; no_eff from the equalizer is passed on to the demapper
binary_values_hat = qam_demapper(mapped_qam_symbol_hat, bits_per_symbol, no_eff)

# LDPC-decode the demapper output
decoded_binary_values_hat = ldpc_decoder(binary_values_hat)

# compare the transmitted bits with the decoded bits
error_rate = ber(binary_values, decoded_binary_values_hat)

print(f"BER: {error_rate}")

BER: 0.0


In [5]:
# Time-domain link simulation; the channel estimator below is created with
# interpolation_factor="lin".

# generate binary values
# (the list is built from batch_size, num_tx, num_streams_per_tx and
# number_of_bits — presumably the shape of the generated bit tensor; confirm)
binary_values = binary_sources([batch_size, num_tx, num_streams_per_tx, number_of_bits])

# encode binary values with LDPC
encoded_binary_values = ldpc_encoder(binary_values)

# map the encoded bits to QAM symbols
qam_symbols = qam_mapper(encoded_binary_values)

# map QAM symbols onto the resource grid
mapped_qam_symbol = resource_grid_mapper(qam_symbols)

# modulate the resource grid from frequency domain to time domain
modulated_qam_symbols = ofdm_modulation(mapped_qam_symbol)

# obtain the time-domain channel
h_time = channel_time_domain()
# received signal in time domain via CDL channel model + AWGN noise
response_symbols = response_time_domain(modulated_qam_symbols, h_time)

# demodulate the received signal from time domain back to frequency domain
demodulated_response_symbols = ofdm_demodulation(response_symbols)

# channel estimation; estimate() returns the channel estimate and a second
# value that is forwarded to the equalizer as err_var
channel_estimator = ChannelEstimator(interpolation_factor="lin")
h_est, err_var = channel_estimator.estimate(demodulated_response_symbols)
# Optional visual check of the estimate (disabled):
# plot_channel_frequency_domain(
#    h_est[0, 0, 0, 0, 0].numpy().real.T,
# )


# equalize the demodulated grid using the estimated channel
mapped_qam_symbol_hat, no_eff = lmmse_equalizer(
    demodulated_response_symbols, h_est, err_var
)

# demap the equalized symbols; no_eff from the equalizer is passed on to the demapper
binary_values_hat = qam_demapper(mapped_qam_symbol_hat, bits_per_symbol, no_eff)

# LDPC-decode the demapper output
decoded_binary_values_hat = ldpc_decoder(binary_values_hat)

# compare the transmitted bits with the decoded bits
error_rate = ber(binary_values, decoded_binary_values_hat)

print(f"BER: {error_rate}")

BER: 0.0


In [6]:
# Time-domain link simulation; the channel estimator below is created with
# interpolation_factor="lmmse".

# generate binary values
# (the list is built from batch_size, num_tx, num_streams_per_tx and
# number_of_bits — presumably the shape of the generated bit tensor; confirm)
binary_values = binary_sources([batch_size, num_tx, num_streams_per_tx, number_of_bits])

# encode binary values with LDPC
encoded_binary_values = ldpc_encoder(binary_values)

# map the encoded bits to QAM symbols
qam_symbols = qam_mapper(encoded_binary_values)

# map QAM symbols onto the resource grid
mapped_qam_symbol = resource_grid_mapper(qam_symbols)

# modulate the resource grid from frequency domain to time domain
modulated_qam_symbols = ofdm_modulation(mapped_qam_symbol)

# obtain the time-domain channel
h_time = channel_time_domain()
# received signal in time domain via CDL channel model + AWGN noise
response_symbols = response_time_domain(modulated_qam_symbols, h_time)

# demodulate the received signal from time domain back to frequency domain
demodulated_response_symbols = ofdm_demodulation(response_symbols)

# channel estimation; estimate() returns the channel estimate and a second
# value that is forwarded to the equalizer as err_var
channel_estimator = ChannelEstimator(interpolation_factor="lmmse")
h_est, err_var = channel_estimator.estimate(demodulated_response_symbols)
# Optional visual check of the estimate (disabled):
# plot_channel_frequency_domain(
#    h_est[0, 0, 0, 0, 0].numpy().real.T,
# )


# equalize the demodulated grid using the estimated channel
mapped_qam_symbol_hat, no_eff = lmmse_equalizer(
    demodulated_response_symbols, h_est, err_var
)

# demap the equalized symbols; no_eff from the equalizer is passed on to the demapper
binary_values_hat = qam_demapper(mapped_qam_symbol_hat, bits_per_symbol, no_eff)

# LDPC-decode the demapper output
decoded_binary_values_hat = ldpc_decoder(binary_values_hat)

# compare the transmitted bits with the decoded bits
error_rate = ber(binary_values, decoded_binary_values_hat)

print(f"BER: {error_rate}")

I0000 00:00:1744682955.641268       1 device_compiler.h:186] Compiled cluster using XLA!  This line is logged at most once for the lifetime of the process.


BER: 0.0


In [7]:
# Frequency-domain link simulation: the channel is applied directly to the
# resource grid (this cell has no OFDM modulation/demodulation step). The
# channel estimator is created with interpolation_factor="nn".

# generate binary values
binary_values = binary_sources([batch_size, num_tx, num_streams_per_tx, number_of_bits])

# encode binary values with LDPC
encoded_binary_values = ldpc_encoder(binary_values)

# map the encoded bits to QAM symbols
qam_symbols = qam_mapper(encoded_binary_values)

# map QAM symbols onto the resource grid
mapped_qam_symbol = resource_grid_mapper(qam_symbols)

# Channel frequency domain
# NOTE(review): the return value of sampling_channel_freq() is deliberately not
# bound — only channel_frequency_domain() supplies h_freq. The call is kept
# because it may advance random state or have other side effects; confirm
# against src.channels.cdl_channel and remove it if it has none.
sampling_channel_freq()
h_freq = channel_frequency_domain()

# received signal in frequency domain via CDL channel model + AWGN noise
response_symbols = response_freqency_domain(mapped_qam_symbol, h_freq)


# channel estimation; estimate() returns the channel estimate and a second
# value that is forwarded to the equalizer as err_var
channel_estimator = ChannelEstimator(interpolation_factor="nn")
h_est, err_var = channel_estimator.estimate(response_symbols)
# Optional visual check of the estimate (disabled):
# plot_channel_frequency_domain(
#     h_est[0, 0, 0, 0, 0].numpy().real.T,
# )

# equalize the received grid using the estimated channel
mapped_qam_symbol_hat, no_eff = lmmse_equalizer(response_symbols, h_est, err_var)

# demap the equalized symbols; no_eff from the equalizer is passed on to the demapper
binary_values_hat = qam_demapper(mapped_qam_symbol_hat, bits_per_symbol, no_eff)

# LDPC-decode the demapper output
decoded_binary_values_hat = ldpc_decoder(binary_values_hat)

# compare the transmitted bits with the decoded bits
error_rate = ber(binary_values, decoded_binary_values_hat)

print(f"BER: {error_rate}")

BER: 0.0


In [8]:
# Frequency-domain link simulation: the channel is applied directly to the
# resource grid (this cell has no OFDM modulation/demodulation step). The
# channel estimator is created with interpolation_factor="lin".

# generate binary values
binary_values = binary_sources([batch_size, num_tx, num_streams_per_tx, number_of_bits])

# encode binary values with LDPC
encoded_binary_values = ldpc_encoder(binary_values)

# map the encoded bits to QAM symbols
qam_symbols = qam_mapper(encoded_binary_values)

# map QAM symbols onto the resource grid
mapped_qam_symbol = resource_grid_mapper(qam_symbols)

# Channel frequency domain
# NOTE(review): the return value of sampling_channel_freq() is deliberately not
# bound — only channel_frequency_domain() supplies h_freq. The call is kept
# because it may advance random state or have other side effects; confirm
# against src.channels.cdl_channel and remove it if it has none.
sampling_channel_freq()
h_freq = channel_frequency_domain()

# received signal in frequency domain via CDL channel model + AWGN noise
response_symbols = response_freqency_domain(mapped_qam_symbol, h_freq)


# channel estimation; estimate() returns the channel estimate and a second
# value that is forwarded to the equalizer as err_var
channel_estimator = ChannelEstimator(interpolation_factor="lin")
h_est, err_var = channel_estimator.estimate(response_symbols)
# Optional visual check of the estimate (disabled):
# plot_channel_frequency_domain(
#     h_est[0, 0, 0, 0, 0].numpy().real.T,
# )

# equalize the received grid using the estimated channel
mapped_qam_symbol_hat, no_eff = lmmse_equalizer(response_symbols, h_est, err_var)

# demap the equalized symbols; no_eff from the equalizer is passed on to the demapper
binary_values_hat = qam_demapper(mapped_qam_symbol_hat, bits_per_symbol, no_eff)

# LDPC-decode the demapper output
decoded_binary_values_hat = ldpc_decoder(binary_values_hat)

# compare the transmitted bits with the decoded bits
error_rate = ber(binary_values, decoded_binary_values_hat)

print(f"BER: {error_rate}")

BER: 0.0


In [9]:
# Frequency-domain link simulation: the channel is applied directly to the
# resource grid (this cell has no OFDM modulation/demodulation step). The
# channel estimator is created with interpolation_factor="lmmse".

# generate binary values
binary_values = binary_sources([batch_size, num_tx, num_streams_per_tx, number_of_bits])

# encode binary values with LDPC
encoded_binary_values = ldpc_encoder(binary_values)

# map the encoded bits to QAM symbols
qam_symbols = qam_mapper(encoded_binary_values)

# map QAM symbols onto the resource grid
mapped_qam_symbol = resource_grid_mapper(qam_symbols)

# Channel frequency domain
# NOTE(review): the return value of sampling_channel_freq() is deliberately not
# bound — only channel_frequency_domain() supplies h_freq. The call is kept
# because it may advance random state or have other side effects; confirm
# against src.channels.cdl_channel and remove it if it has none.
sampling_channel_freq()
h_freq = channel_frequency_domain()

# received signal in frequency domain via CDL channel model + AWGN noise
response_symbols = response_freqency_domain(mapped_qam_symbol, h_freq)


# channel estimation; estimate() returns the channel estimate and a second
# value that is forwarded to the equalizer as err_var
channel_estimator = ChannelEstimator(interpolation_factor="lmmse")
h_est, err_var = channel_estimator.estimate(response_symbols)
# Optional visual check of the estimate (disabled):
# plot_channel_frequency_domain(
#     h_est[0, 0, 0, 0, 0].numpy().real.T,
# )

# equalize the received grid using the estimated channel
mapped_qam_symbol_hat, no_eff = lmmse_equalizer(response_symbols, h_est, err_var)

# demap the equalized symbols; no_eff from the equalizer is passed on to the demapper
binary_values_hat = qam_demapper(mapped_qam_symbol_hat, bits_per_symbol, no_eff)

# LDPC-decode the demapper output
decoded_binary_values_hat = ldpc_decoder(binary_values_hat)

# compare the transmitted bits with the decoded bits
error_rate = ber(binary_values, decoded_binary_values_hat)

print(f"BER: {error_rate}")

BER: 0.0


In [10]:
from src.ml.utils import compute_mean_std, reshape_data, reversed_reshape_data
from src.channels.channel_est.ml_channel import VAE
from src.settings.config import (
    num_tx,
    num_rx,
    num_bs_ant,
    num_ut_ant,
    num_ofdm_symbols,
    num_effective_subcarriers,
    speed,
    ebno_db,
)
from src.settings.ml import device, number_of_samples
import numpy as np
import torch
from src.ml.gen_data import get_pilot_matrix
from src.ml.transform import MinMaxScaler4D


# Shared scaler instance: set_up_model() calls fit_transform on it with the
# stored channel data, and later cells reuse it for the pilot matrix and for
# inverse_transform on the model output.
scaler = MinMaxScaler4D()
# NOTE(review): with an empty root_dir the paths built in set_up_model() begin
# with "/" (e.g. "/data/..."), i.e. they resolve from the filesystem root —
# confirm this is intended for the environment the notebook runs in.
root_dir = ""


def set_up_model():
    """Build the VAE channel estimator and restore its saved weights.

    The stored channel frequency responses for the configured antenna, speed
    and sample-count combination are loaded, passed through ``reshape_data``
    and then through ``fit_transform`` of the module-level ``scaler`` (so this
    call has a side effect on ``scaler``). The mean and standard deviation
    returned by ``compute_mean_std`` for the scaled data are moved to
    ``device`` and handed to the ``VAE`` constructor together with the
    per-sample shape. Finally the ``"state_dict"`` entry of the
    ``vae_fold_1_best.pth`` checkpoint is loaded into the model.

    Returns:
        The ``VAE`` instance, moved to ``device`` and switched to eval mode.
    """
    # The directory name always ends in "ebno_0", independent of any Eb/N0 setting.
    src_dir = f"txant_{num_ut_ant}_rxant_{num_bs_ant}_speed_{speed}_samples_{number_of_samples}_ebno_0"

    # Load -> reshape -> scale in one expression; intermediates are not kept.
    h_freqs = scaler.fit_transform(
        reshape_data(np.load(f"{root_dir}/data/{src_dir}/h_freqs.npy"))
    )

    # Both statistics are moved to the target device before the model is built.
    mu_h, std_h = (stat.to(device) for stat in compute_mean_std(h_freqs))

    vae = VAE(h_freqs.shape[1:], mu_h, std_h)

    # NOTE(review): torch.load unpickles the file — only load checkpoints that
    # come from a trusted source.
    checkpoint = torch.load(
        f"{root_dir}/results/checkpoints/{src_dir}/vae_fold_1_best.pth",
        map_location=device,
    )
    vae.load_state_dict(checkpoint["state_dict"])

    vae = vae.to(device)
    vae.eval()
    return vae

In [11]:
# Frequency-domain link simulation with the VAE channel estimator: the channel
# is applied directly to the resource grid (this cell has no OFDM
# modulation/demodulation step).

# generate binary values
binary_values = binary_sources([batch_size, num_tx, num_streams_per_tx, number_of_bits])

# encode binary values with LDPC
encoded_binary_values = ldpc_encoder(binary_values)

# map the encoded bits to QAM symbols
qam_symbols = qam_mapper(encoded_binary_values)

# map QAM symbols onto the resource grid
mapped_qam_symbol = resource_grid_mapper(qam_symbols)

# Channel frequency domain
h_freq = channel_frequency_domain()

# received signal in frequency domain via CDL channel model + AWGN noise
response_symbols = response_freqency_domain(mapped_qam_symbol, h_freq)


# channel estimation with the VAE; set_up_model() also calls fit_transform on
# the shared scaler with the stored channel data
vae = set_up_model()
pilot_matrix = get_pilot_matrix(response_symbols)
pilot_matrix = reshape_data(pilot_matrix)
# NOTE(review): the second argument (False) presumably tells the scaler not to
# re-fit on the pilot matrix — confirm against MinMaxScaler4D.fit_transform.
pilot_matrix = scaler.fit_transform(pilot_matrix, False).to(device)

# Inference only: disable gradient tracking so no autograd graph is built for
# the forward pass. Only the first output is used below.
with torch.no_grad():
    h_est_hat, *values = vae(pilot_matrix)

# back to CPU, undo the scaling, then undo reshape_data
h_est_hat = scaler.inverse_transform(h_est_hat.cpu().detach())
h_est_hat = reversed_reshape_data(h_est_hat)

# equalize with the VAE estimate; 0 is passed where the LS cells pass err_var
mapped_qam_symbol_hat, no_eff = lmmse_equalizer(response_symbols, h_est_hat, 0)

# demap the equalized symbols; no_eff from the equalizer is passed on to the demapper
binary_values_hat = qam_demapper(mapped_qam_symbol_hat, bits_per_symbol, no_eff)

# LDPC-decode the demapper output
decoded_binary_values_hat = ldpc_decoder(binary_values_hat)

# compare the transmitted bits with the decoded bits
error_rate = ber(binary_values, decoded_binary_values_hat)

print(f"BER: {error_rate}")

BER: 0.0


In [12]:
# Time-domain link simulation with the VAE channel estimator.

# generate binary values
binary_values = binary_sources([batch_size, num_tx, num_streams_per_tx, number_of_bits])

# encode binary values with LDPC
encoded_binary_values = ldpc_encoder(binary_values)

# map the encoded bits to QAM symbols
qam_symbols = qam_mapper(encoded_binary_values)

# map QAM symbols onto the resource grid
mapped_qam_symbol = resource_grid_mapper(qam_symbols)

# modulate the resource grid from frequency domain to time domain
modulated_qam_symbols = ofdm_modulation(mapped_qam_symbol)

# obtain the time-domain channel
h_time = channel_time_domain()
# received signal in time domain via CDL channel model + AWGN noise
response_symbols = response_time_domain(modulated_qam_symbols, h_time)

# demodulate the received signal from time domain back to frequency domain
demodulated_response_symbols = ofdm_demodulation(response_symbols)


# channel estimation with the VAE; set_up_model() also calls fit_transform on
# the shared scaler with the stored channel data
vae = set_up_model()
pilot_matrix = get_pilot_matrix(demodulated_response_symbols)
pilot_matrix = reshape_data(pilot_matrix)
# NOTE(review): the second argument (False) presumably tells the scaler not to
# re-fit on the pilot matrix — confirm against MinMaxScaler4D.fit_transform.
pilot_matrix = scaler.fit_transform(pilot_matrix, False).to(device)

# Inference only: disable gradient tracking so no autograd graph is built for
# the forward pass. Only the first output is used below.
with torch.no_grad():
    h_est_hat, *values = vae(pilot_matrix)

# back to CPU, undo the scaling, then undo reshape_data
h_est_hat = scaler.inverse_transform(h_est_hat.cpu().detach())
h_est_hat = reversed_reshape_data(h_est_hat)

# equalize with the VAE estimate; 0 is passed where the LS cells pass err_var
mapped_qam_symbol_hat, no_eff = lmmse_equalizer(
    demodulated_response_symbols, h_est_hat, 0
)

# demap the equalized symbols; no_eff from the equalizer is passed on to the demapper
binary_values_hat = qam_demapper(mapped_qam_symbol_hat, bits_per_symbol, no_eff)

# LDPC-decode the demapper output
decoded_binary_values_hat = ldpc_decoder(binary_values_hat)

# compare the transmitted bits with the decoded bits
error_rate = ber(binary_values, decoded_binary_values_hat)

print(f"BER: {error_rate}")

BER: 0.0
