In [None]:
# Import the required libraries
import os # Configure which GPU
if os.getenv("CUDA_VISIBLE_DEVICES") is None:
    gpu_num = 0 # Use "" to use the CPU
    os.environ["CUDA_VISIBLE_DEVICES"] = f"{gpu_num}"

import sys
import tensorflow as tf
import sionna as sn
import tf2onnx

print(sys.executable)
print("TF:", tf.__version__)
print("Sionna:", sn.__version__)
print("tf2onnx:", tf2onnx.__version__)

In [None]:
# Set the TF log level to only show errors
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

In [None]:
import numpy as np
import tensorflow as tf

In [None]:
# Import Sionna
import sionna as sn

In [None]:
# For plotting
%matplotlib inline
import matplotlib.pyplot as plt

In [None]:
# For saving complex Python data structures efficiently
import pickle

In [None]:
# For the implementation of the neural receiver
from tensorflow.keras import Model
from tensorflow.keras.layers import Dense

In [None]:
# For ONNX / TensorRT export
# Optional export imports (handled in separate env)
try:
    import tf2onnx
    import onnx
except ImportError:
    tf2onnx = None
    onnx = None


In [None]:
# Set seed for reproducible results
sn.phy.config.seed = 42

In [None]:
# Let's visualize all possible symbols for a 16-QAM
NUM_BITS_PER_SYMBOL = 4
mapper = sn.phy.mapping.Mapper(num_bits_per_symbol=NUM_BITS_PER_SYMBOL,
                           constellation_type="qam")
demapper = sn.phy.mapping.Demapper(demapping_method="app",
                               num_bits_per_symbol=NUM_BITS_PER_SYMBOL,
                               constellation_type="qam")
mapper.constellation.show();

In [None]:
# Binary source to generate uniform i.i.d. bits
binary_source = sn.phy.mapping.BinarySource()

# AWGN channel
awgn_channel = sn.phy.channel.AWGN()

BATCH_SIZE = 128 # How many examples are processed by Sionna in parallel
EBN0_DB = 17.0 # Eb/N0 in dB

no = sn.phy.utils.ebnodb2no(ebno_db=EBN0_DB,
                            num_bits_per_symbol=NUM_BITS_PER_SYMBOL,
                            coderate=1.0) # Coderate set to 1 as we do uncoded transmission here

bits = binary_source([BATCH_SIZE, 1200]) # Blocklength
x = mapper(bits)
y = awgn_channel(x, no)

In [None]:
# Let's visualize the received symbols
plt.figure(figsize=(8,8))
plt.axes().set_aspect(1.0)
plt.grid(True)
plt.scatter(tf.math.real(y), tf.math.imag(y), label='Received')
plt.scatter(tf.math.real(x), tf.math.imag(x), label='Transmitted')
plt.legend(fontsize=20);

In [None]:
bits = np.array([[0,0,0,1]])
print("Original bits:", bits)
x = mapper(bits)
print("Complex-valued symbol:", x.numpy())
# Add some noise
no = 0.05 # Noise variance
y = awgn_channel(x, no)
print("Received noisy symbol: ", y.numpy())
llr = demapper(y, no)
print("LLRs after demapping:", llr.numpy())

In [None]:
import sys
sys.path.append('../../../')

MAX_LINES = 10000000 # Stop after this many imported lines

# Depends on config that was used for data capture
fn_input = '../../../data_acquisition/logs/demapper_in.txt'
fn_output = '../../../data_acquisition/logs/demapper_out.txt'

import os
print(os.path.exists(fn_input))
print(os.getcwd())

# Read training data from data dump
def read_training_data(in_file, element_shape):
    with open(in_file) as f:
        lines = f.readlines()
    result = None
    i = 0
    while i < len(lines):
        try:
            i = lines.index('QAM16\n', i)
        except ValueError as e:
            break
        num = int(lines[i+1])
        #if num > 30:
        #    print(num)
        data = np.fromstring(' '.join(lines[i+2:i+2+num]), sep=' ', dtype=np.int16).reshape(num, *element_shape)
        if result is None:
            result = data
        else:
            result = np.concatenate((result, data))
        i += 2 + num
        if i>MAX_LINES:
            break
    return result

def int16_to_float16(symbols_i):
    return np.ldexp(symbols_i.astype(np.float32), -8).astype(np.float16)

def float16_to_int16(llrs_h):
    return np.rint(np.ldexp(llrs_h.astype(np.float32), 8)).astype(np.int16)

def norm_int16_to_float16(symbols_i, magnitudes):
    args = symbols_i.astype(np.float32)
    if magnitudes is not None:
        args = args / magnitudes.astype(np.float32)
    return np.ldexp(args, 7).astype(np.float16)

data_in = read_training_data(fn_input, (2, 2))
data_out = read_training_data(fn_output, (4,))
assert(data_in.shape[0] == data_out.shape[0])

print("data_in.shape: ", data_in.shape)
print("data_out.shape: ", data_out.shape)

print("First 5 input symbols:", data_in[0:5,:])
print("First 5 output symbols:", data_out[0:5,:])

In [None]:
x = data_in[:10000, ...]

# --- remove divide-by-zero ---
H_MIN = 1e-3
mask_h = (np.abs(x[:,1,0]) > H_MIN) & (np.abs(x[:,1,1]) > H_MIN)
x = x[mask_h]

# OAI uses h as decision region between inner and outer bits
# We scale the received symbols y back such that y_n = y / h
y_n = x[:,0,0]/x[:,1,0] + 1.j*x[:,0,1]/x[:,1,1]

# Normalize to Sionna QAM-16 scale
s = 0.9486833 - 0.3162278
y = y_n * s

# --- remove outliers (minimal, magnitude-based) ---
R_MAX = 1.5
mask_r = np.abs(y) < R_MAX
y = y[mask_r]

# Average power check
avg_power = np.mean(np.abs(y)**2)
print("Avg. power after norm:", avg_power)

# Plot
plt.scatter(y.real, y.imag, label="OAI normalized")
plt.title("Received QAM symbols (OAI)")
plt.ylabel("imaginary part")
plt.xlabel("real part")
plt.gca().set_aspect('equal', adjustable='box')

# Overlay with Sionna constellation
c = sn.phy.mapping.Constellation("qam", 4).points.numpy()
plt.scatter(c.real, c.imag, label="Sionna QAM-16")
plt.legend()




In [None]:
demapper = sn.phy.mapping.Demapper("app", "qam", 4)

y_tf = tf.constant(y, tf.complex64)
# avg_power is signal power ps + no where ps = 1
ps = 1. # Signal power
no = tf.constant(avg_power-ps, tf.float32)

llr = -1* demapper(y_tf, no) # Flip sign due to Sionna's logit definition

# Scale llrs
llr_ = llr.numpy()

llr_ref = np.reshape(llr_.astype(int), (-1,4))

llr_oai = data_out[:llr_ref.shape[0],...]

# And plot distributions
r = np.arange(-100, 100, 1)

for bit_idx in range(4):
    plt.figure()
    plt.hist(llr_ref[:,bit_idx], bins=r, label="Sionna APP")

    plt.hist(llr_oai[:,bit_idx], bins=r, alpha=0.3, label="OAI demapper")
    plt.legend();
    plt.title(f"LLR distribution bit index {bit_idx}")

In [None]:
class NeuralDemapper(Model): # Inherits from Keras Layer

    def __init__(self, num_bits_per_symbol,):
        """Neural demapper using a simple 3-layer MLP

        Inputs
        ------
        [y_i, y_q] or [y_i, y_q, h_i, h_q]
            list of tensors:

        y_i: [batch_size, 1, 1], tf.float32
            Real part of the received IQ symbols.

        y_q: [batch_size, 1, 1], tf.float32
            Imaginary part of the received IQ symbols.

        optional:
        h_i: [batch_size, 1, 1], tf.float32
            Real part of the estimated channel magnitude.

        h_q: [batch_size, 1, 1], tf.float32
            Real part of the estimated channel magnitude.

        Outputs
        -------
        llr: [batch_size, num_bits_per_symbol], tf.float32
            LLRs for each bit of a symbol.

        """
        super().__init__()

        # We are using three layers, this is not optimized but also not needed for this project
        self.dense_1 = Dense(32, 'relu', name='dense_1')
        self.dense_2 = Dense(32, 'relu', input_shape=(32,), name='dense_2')

        # The last layer has no activation and therefore outputs logits, i.e., LLRs
        self.dense_3 = Dense(num_bits_per_symbol, None,
                             input_shape=(32,), name='dense_3')

    def call(self, inputs, training=None):

        # Supports either a list of inputs or already concatenated version
        nn_input = tf.concat(inputs, axis=-1) if isinstance(inputs, list) else inputs
        z = self.dense_1(nn_input, training=training)
        z = self.dense_2(z, training=training)
        llr = self.dense_3(z, training=training) # [batch size, 1, num_bits_per_symbol]

        return llr

In [None]:
NUM_BITS_PER_SYMBOL = 4 # 16-QAM

# Init components
neural_demapper_synthetic = NeuralDemapper(NUM_BITS_PER_SYMBOL)

# Binary classification problem -> train on BCE loss
bce = tf.keras.losses.BinaryCrossentropy(from_logits=True)

# Use ADAM optimizer
optimizer = tf.keras.optimizers.Adam(1e-2)

# Dummy run to init the Keras model
neural_demapper_synthetic([tf.ones((1,1)),tf.ones((1,1))])

neural_demapper_synthetic.summary()

In [None]:
BATCH_SIZE = 1024
NUM_TRAINING_ITERATIONS = 10000

# Scaling factor for QAM symbols to be compatible with the OAI demapper
qam16_threshold_mag = 2 * 0.3162278

@tf.function(jit_compile=False)
def training_step():
    # we train with a randomized noise variance
    no = tf.random.uniform([BATCH_SIZE,1], minval=0., maxval=1.,
                           dtype=tf.float32)
    # Draw random bits
    bits = binary_source([BATCH_SIZE, NUM_BITS_PER_SYMBOL])

    # Map to QAM symbols
    x = mapper(bits)

    # Transmit over Gaussian channel
    y = awgn_channel(x, no)

    # Split real and imaginary part and scale back to OAI
    qxr = tf.math.real(y) * (1.0 / qam16_threshold_mag)
    qxi = tf.math.imag(y) * (1.0 / qam16_threshold_mag)

    llr = neural_demapper_synthetic([qxr, qxi], training=True)
    loss = bce(bits, -llr) # Note: OAI uses flipped logits definition
    return loss

# Training loop
for i in range(NUM_TRAINING_ITERATIONS):
    with tf.GradientTape() as tape:
        loss = training_step()
    gradient = tape.gradient(loss, tape.watched_variables())
    optimizer.apply_gradients(zip(gradient, tape.watched_variables()));
    # Print progress
    if i % 500 == 0:
        print(f"{i}/{NUM_TRAINING_ITERATIONS}  Loss: {loss:.2E}")

In [None]:
bits = np.array([[0,1,1,1]])
print("Original bits:", bits)
x = mapper(bits)
print("Complex-valued symbol:", x.numpy())
# Add some noise
no = tf.constant([[0.4,]]) # Noise variance
y = awgn_channel(x, no)
print("Received noisy symbol: ", y.numpy())
print(y)
llr_ref = demapper(y, no)
llr_nn = -neural_demapper_synthetic([tf.math.real(y) / qam16_threshold_mag,
                                     tf.math.imag(y) / qam16_threshold_mag])
print("LLRs Reference:", llr_ref)
print("LLRs NRX :", llr_nn)

In [None]:
# Save weights for synthetic demapper
fn = "../../models/neural_demapper_weights_y2"

weights = neural_demapper_synthetic.get_weights()
with open(fn, 'wb') as f:
    pickle.dump(weights, f)

In [None]:
###########################################################################
# The following code is only relevant if training is done on captured data!
###########################################################################

# Depends on config that was used for data capture
fn_input = '../../../data_acquisition/logs/demapper_in.txt'
fn_output = '../../../data_acquisition/logs/demapper_out.txt'

# Requires data dump from the previous tutorial
demapper_in_data = read_training_data(fn_input, (2, 2))
demapper_out_data = read_training_data(fn_output, (4,))
assert(demapper_in_data.shape[0] == demapper_out_data.shape[0])

print("Training data size: ", demapper_out_data.shape[0])

In [None]:
# Regression loss for training on captured data
rge = tf.keras.losses.MeanSquaredError()

# Use ADAM optimizer
optimizer = tf.keras.optimizers.Adam(1e-2)

# Init new receiver
neural_demapper_capture = NeuralDemapper(NUM_BITS_PER_SYMBOL)

# Dummy run to init the Keras model
# Note that we now have 4 inputs as we do not scale automatically
neural_demapper_capture([tf.ones((1,4))])

neural_demapper_capture.summary()

In [None]:
@tf.function(jit_compile=False)
def training_step():
    # Train on captured data
    if True:
        i = np.random.randint(0, demapper_in_data.shape[0] - BATCH_SIZE)
        llr_ref = demapper_out_data[i:i+BATCH_SIZE]
        y_no = demapper_in_data[i:i+BATCH_SIZE]

        llr_ref = int16_to_float16(llr_ref)
        y_no = int16_to_float16(y_no)

        llr = neural_demapper_capture(y_no.reshape(-1, 4), training=True)
        loss = rge(llr_ref, llr)
        return loss

# Training loop
for i in range(NUM_TRAINING_ITERATIONS):
    with tf.GradientTape() as tape:
        loss = training_step()
    gradient = tape.gradient(loss, tape.watched_variables())
    optimizer.apply_gradients(zip(gradient, tape.watched_variables()));
    # Print progress
    if i % 500 == 0:
        print(f"{i}/{NUM_TRAINING_ITERATIONS}  Loss: {loss:.2E}")

In [None]:
# Save weights of captured demapper
fn = "../../models/neural_demapper_weights_y4"

weights = neural_demapper_capture.get_weights()
with open(fn, 'wb') as f:
    pickle.dump(weights, f)

In [None]:
synthetic = True

if synthetic:
    num_inputs = 2  # 2xfloat16
else:
    num_inputs = 4  # 4xfloat16

fn = f"../../models/neural_demapper_weights_y{num_inputs}"

# Load weights via pickle
with open(fn, 'rb') as f:
    weights = pickle.load(f)
neural_demapper = NeuralDemapper(NUM_BITS_PER_SYMBOL)

# Dummy run to init layers
neural_demapper([tf.ones((1,num_inputs))])

# And load weights
neural_demapper.set_weights(weights)

In [None]:
bs_max = 512 # max batch size for inference
import onnx

@tf.function(input_signature=input_signature)
#def demapper_func(y):
#    return neural_demapper(y)

def demapper_func(y):
    out = neural_demapper(y)
    if isinstance(out, (tuple, list)):
        out = out[0]
    return tf.identity(out, name="output_1")


# Convert FUNCTION directly (bypasses keras.model issues)
onnx_model, _ = tf2onnx.convert.from_function(
    demapper_func, 
    input_signature=input_signature
)
onnx.checker.check_model(onnx_model)

# And save the ONNX model
onnx.save(onnx_model, f"../../models/neural_demapper.{num_inputs}xfloat16.onnx")

m = onnx.load(f"../../models/neural_demapper.{num_inputs}xfloat16.onnx")

#m.graph.output[0].name = "output_1"

# Rename graph output
old_name = m.graph.output[0].name
m.graph.output[0].name = "output_1"

# Rename any value_info / node outputs that reference it
for node in m.graph.node:
    for i, out in enumerate(node.output):
        if out == old_name:
            node.output[i] = "output_1"

onnx.save(m, f"../../models/neural_demapper.{num_inputs}xfloat16.onnx")



m = onnx.load(f"../../models/neural_demapper.{num_inputs}xfloat16.onnx")

print("=== ONNX OUTPUT TENSORS ===")
for o in m.graph.output:
    print("Output name:", o.name)


# And build trtengine
trt_command = f'/usr/src/tensorrt/bin/trtexec --fp16'
trt_command += f' --onnx=../../models/neural_demapper.{num_inputs}xfloat16.onnx'
trt_command += f' --saveEngine=../../models/neural_demapper.{num_inputs}xfloat16.plan'
trt_command += f' --preview=+profileSharing0806'
trt_command += f' --inputIOFormats=fp16:chw'
trt_command += f' --outputIOFormats=fp16:chw'

trt_command += f" --minShapes=y:1x{num_inputs}"
trt_command += f" --optShapes=y:{64}x{num_inputs}"
trt_command += f" --maxShapes=y:{bs_max}x{num_inputs}"

# And run the command
os.system(trt_command)