In [4]:
# Imports & Basics

# Import TensorFlow and NumPy
import tensorflow as tf
import numpy as np

# Import Sionna
try:
    import sionna as sn
except ImportError as e:
    # Install Sionna if package is not already installed
    import os
    os.system("pip install sionna")
    import sionna as sn

# For plotting
%matplotlib inline
# also try %matplotlib widget

import matplotlib.pyplot as plt

# for performance measurements
import time

# For the implementation of the Keras models
from tensorflow.keras import Model

from sionna.mapping import Mapper, Constellation

In [7]:
def plot_ser(snr_db,
             ser,
             legend="",
             ylabel="SER",
             title="Symbol Error Rate",
             ebno=True,
             #is_bler=None,
             xlim=None,
             ylim=None,
             save_fig=False,
             path=""):
    """Plot error-rates.

    Input
    -----
    snr_db: ndarray
        Array of floats defining the simulated SNR points.
        Can be also a list of multiple arrays.

    ser: ndarray
        Array of floats defining the SER per SNR point.
        Can be also a list of multiple arrays.

    legend: str
        Defaults to "". Defining the legend entries. Can be
        either a string or a list of strings.

    ylabel: str
        Defaults to "SER". Defining the y-label.

    title: str
        Defaults to "Symbol Error Rate". Defining the title of the figure.

    ebno: bool
        Defaults to True. If True, the x-label is set to
        "EbNo [dB]" instead of "EsNo [dB]".

    #is_bler: bool
        #Defaults to False. If True, the corresponding curve is dashed.

    xlim: tuple of floats
        Defaults to None. A tuple of two floats defining x-axis limits.

    ylim: tuple of floats
        Defaults to None. A tuple of two floats defining y-axis limits.

    save_fig: bool
        Defaults to False. If True, the figure is saved as `.png`.

    path: str
        Defaults to "". Defining the path to save the figure
        (iff ``save_fig`` is True).

    Output
    ------
        (fig, ax) :
            Tuple:

        fig : matplotlib.figure.Figure
            A matplotlib figure handle.

        ax : matplotlib.axes.Axes
            A matplotlib axes object.
    """

    # legend must be a list or string
    if not isinstance(legend, list):
        assert isinstance(legend, str)
        legend = [legend]

    assert isinstance(title, str), "title must be str."

    # broadcast snr if ber is list
    if isinstance(ser, list):
        if not isinstance(snr_db, list):
            snr_db = [snr_db]*len(ser)

#    # check that is_bler is list of same size and contains only bools
#    if is_bler is None:
#        if isinstance(ber, list):
#            is_bler = [False] * len(ber) # init is_bler as list with False
#        else:
#            is_bler = False
#    else:
#        if isinstance(is_bler, list):
#            assert (len(is_bler) == len(ber)), "is_bler has invalid size."
#        else:
#            assert isinstance(is_bler, bool), \
#                "is_bler must be bool or list of bool."
#            is_bler = [is_bler] # change to list

    # tile snr_db if not list, but ser is list

    fig, ax = plt.subplots(figsize=(16,10))

    plt.xticks(fontsize=18)
    plt.yticks(fontsize=18)

    if xlim is not None:
        plt.xlim(xlim)
    if ylim is not None:
        plt.ylim(ylim)

    plt.title(title, fontsize=25)
    # return figure handle
    if isinstance(ser, list):
        for idx, b in enumerate(ser):
#            if is_bler[idx]:
#                line_style = "--"
#            else:
            line_style = ""
            plt.semilogy(snr_db[idx], b, line_style, linewidth=2)
    else:
#        if is_bler:
#            line_style = "--"
#        else:
        line_style = ""
        plt.semilogy(snr_db, ber, line_style, linewidth=2)

    plt.grid(which="both")
    if ebno:
        plt.xlabel(r"$E_b/N_0$ (dB)", fontsize=25)
    else:
        plt.xlabel(r"$E_s/N_0$ (dB)", fontsize=25)
    plt.ylabel(ylabel, fontsize=25)
    plt.legend(legend, fontsize=20)
    if save_fig:
        plt.savefig(path)
        plt.close(fig)
    else:
        #plt.close(fig)
        pass
    return fig, ax

In [8]:
def simulate_ser(self,
                 mc_fun,
                 ebno_dbs,
                 batch_size,
                 max_mc_iter,
                 legend="",
                 add_ser=True,
#                 add_bler=False,
                 soft_estimates=False,
                 num_target_symbol_errors=None,
#                 num_target_block_errors=None,
                 early_stop=True,
                 graph_mode=None,
                 add_results=True,
                 forward_keyboard_interrupt=True,
                 show_fig=True,
                 verbose=True):
        """Simulate SER curves for given Keras model and saves the results.

        Internally calls :class:`sionna.utils.sim_ser`.

        Input
        -----
        mc_fun:
            Callable that yields the transmitted bits `b` and the
            receiver's estimate `b_hat` for a given ``batch_size`` and
            ``ebno_db``. If ``soft_estimates`` is True, b_hat interpreted as
            logit.

        ebno_dbs: ndarray of floats
            SNR points to be evaluated.

        batch_size: tf.int32
            Batch-size for evaluation.

        max_mc_iter: int
            Max. number of Monte-Carlo iterations per SNR point.

        legend: str
            Name to appear in legend.

        add_ser: bool
            Defaults to True. Indicate if SER should be added to plot.

#        add_bler: bool
#            Defaults to False. Indicate if BLER should be added
#            to plot.

        soft_estimates: bool
            A boolean, defaults to False. If True, ``s_hat``
            is interpreted as logit and additional hard-decision is applied
            internally.

        num_target_symbol_errors: int
            Target number of symbol errors per SNR point until the simulation
            stops.

#        num_target_block_errors: int
#            Target number of block errors per SNR point until the simulation
#            stops.

        early_stop: bool
            A boolean defaults to True. If True, the simulation stops after the
            first error-free SNR point (i.e., no error occurred after
            ``max_mc_iter`` Monte-Carlo iterations).

        graph_mode: One of ["graph", "xla"], str
            A string describing the execution mode of ``mc_fun``.
            Defaults to `None`. In this case, ``mc_fun`` is executed as is.

        add_results: bool
            Defaults to True. If True, the simulation results will be appended
            to the internal list of results.

        show_fig: bool
            Defaults to True. If True, a BER figure will be plotted.

        verbose: bool
            A boolean defaults to True. If True, the current progress will be
            printed.

        forward_keyboard_interrupt: bool
            A boolean defaults to True. If False, `KeyboardInterrupts` will be
            catched internally and not forwarded (e.g., will not stop outer
            loops). If False, the simulation ends and returns the intermediate
            simulation results.

        Output
        ------
        (ser):
            Tuple:

        ser: float
            The simulated bit-error rate.

#        bler: float
#            The simulated block-error rate.
        """

        ser = sim_ser(mc_fun,
                    ebno_dbs,
                    batch_size,
                    soft_estimates=soft_estimates,
                    max_mc_iter=max_mc_iter,
                    num_target_symbol_errors=num_target_symbol_errors,
#                    num_target_block_errors=num_target_block_errors,
                    early_stop=early_stop,
                    graph_mode=graph_mode,
                    verbose=verbose,
                forward_keyboard_interrupt=forward_keyboard_interrupt)

        if add_ser:
            self._sers += [ser]
            self._snrs +=  [ebno_dbs]
            self._legends += [legend]
#            self._is_bler += [False]

#        if add_bler:
#            self._bers += [bler]
#            self._snrs +=  [ebno_dbs]
#            self._legends += [legend + " (BLER)"]
#            self._is_bler += [True]

        if show_fig:
            self()

        # remove current curve if add_results=False
        if add_results is False:
#            if add_bler:
#                self.remove(-1)
            if add_ser:
                self.remove(-1)

        return ser

In [12]:
def sim_ser(mc_fun,
            ebno_dbs,
            batch_size,
            max_mc_iter,
            soft_estimates=False,
            num_target_symbol_errors=None,
#            num_target_block_errors=None,
            early_stop=True,
            graph_mode=None,
            verbose=True,
            forward_keyboard_interrupt=True,
            dtype=tf.complex64):
    """Simulates until target number of errors is reached and returns SER.

    The simulation continues with the next SNR point if
    ``num_target_symbol_errors`` symbol errors
    is achieved. Further, it continues with the next SNR point after
    ``max_mc_iter`` batches of size ``batch_size`` have been simulated.

    Input
    -----
    mc_fun:
        Callable that yields the transmitted bits `b` and the
        receiver's estimate `b_hat` for a given ``batch_size`` and
        ``ebno_db``. If ``soft_estimates`` is True, b_hat is interpreted as
        logit.

    ebno_dbs: tf.float32
        A tensor containing SNR points to be evaluated.

    batch_size: tf.int32
        Batch-size for evaluation.

    max_mc_iter: tf.int32
        Max. number of Monte-Carlo iterations per SNR point.

    soft_estimates: bool
        A boolean, defaults to False. If True, `s_hat``
        is interpreted as logit and an additional hard-decision is applied
        internally.

    num_target_symbol_errors: tf.int32
        Defaults to None. Target number of symbol errors per SNR point until
        the simulation continues to next SNR point.

#    num_target_block_errors: tf.int32
#        Defaults to None. Target number of block errors per SNR point
#        until the simulation continues

    early_stop: bool
        A boolean defaults to True. If True, the simulation stops after the
        first error-free SNR point (i.e., no error occurred after
        ``max_mc_iter`` Monte-Carlo iterations).

    graph_mode: One of ["graph", "xla"], str
        A string describing the execution mode of ``mc_fun``.
        Defaults to `None`. In this case, ``mc_fun`` is executed as is.

    verbose: bool
        A boolean defaults to True. If True, the current progress will be
        printed.

    forward_keyboard_interrupt: bool
        A boolean defaults to True. If False, KeyboardInterrupts will be
        catched internally and not forwarded (e.g., will not stop outer loops).
        If False, the simulation ends and returns the intermediate simulation
        results.

    dtype: tf.complex64
        Datatype of the model / function to be used (``mc_fun``).

    Output
    ------
    (ser) :
        Tuple:

    ser: tf.float32
        The symbol-error rate.

#    bler: tf.float32
#        The block-error rate.

    Raises
    ------
    AssertionError
        If ``soft_estimates`` is not bool.

    AssertionError
        If ``dtype`` is not `tf.complex`.

    Note
    ----
    This function is implemented based on tensors to allow
    full compatibility with tf.function(). However, to run simulations
    in graph mode, the provided ``mc_fun`` must use the `@tf.function()`
    decorator.

    """

    # utility function to print progress
    def _print_progress(is_final, rt, idx_snr, idx_it, header_text=None):
        """Print summary of current simulation progress.

        Input
        -----
        is_final: bool
            A boolean. If True, the progress is printed into a new line.
        rt: float
            The runtime of the current SNR point in seconds.
        idx_snr: int
            Index of current SNR point.
        idx_it: int
            Current iteration index.
        header_text: list of str
            Elements will be printed instead of current progress, iff not None.
            Can be used to generate table header.
        """
        # set carriage return if not final step
        if is_final:
            end_str = "\n"
        else:
            end_str = "\r"

        # prepare to print table header
        if header_text is not None:
            row_text = header_text
            end_str = "\n"
        else:
            # calculate intermediate ber / bler
            ser_np = (tf.cast(symbol_errors[idx_snr], tf.float64)
                        / tf.cast(nb_symbols[idx_snr], tf.float64)).numpy()
            ser_np = np.nan_to_num(ser_np) # avoid nan for first point
#            bler_np = (tf.cast(block_errors[idx_snr], tf.float64)
#                        / tf.cast(nb_blocks[idx_snr], tf.float64)).numpy()
#            bler_np = np.nan_to_num(bler_np) # avoid nan for first point

            # load statuslevel
            # print current iter if simulation is still running
            if status[idx_snr]==0:
                status_txt = f"iter: {idx_it:.0f}/{max_mc_iter:.0f}"
            else:
                status_txt = status_levels[int(status[idx_snr])]

            # generate list with all elements to be printed
            row_text = [str(np.round(ebno_dbs[idx_snr].numpy(), 3)),
                        f"{ser_np:.4e}",
#                        f"{bler_np:.4e}",
                        np.round(symbol_errors[idx_snr].numpy(), 0),
                        np.round(nb_symbols[idx_snr].numpy(), 0),
#                        np.round(block_errors[idx_snr].numpy(), 0),
#                        np.round(nb_blocks[idx_snr].numpy(), 0),
                        np.round(rt, 1),
                        status_txt]

        # pylint: disable=line-too-long, consider-using-f-string
        print("{: >9} |{: >11} |{: >12} |{: >12} |{: >12} |{: >10}".format(*row_text), end=end_str)


     # init table headers
    header_text = ["EbNo [dB]", "SER", "symbol errors",
                   "num bits",
                   "runtime [s]", "status"]

    # replace status by text
    status_levels = ["not simulated", # status=0
            "reached max iter       ", # status=1; spacing for impr. layout
            "no errors - early stop", # status=2
            "reached target symbol errors"] # status=3
#            "reached target block errors"] # status=4

    # check inputs for consistency
    assert isinstance(early_stop, bool), "early_stop must be bool."
    assert isinstance(soft_estimates, bool), "soft_estimates must be bool."
    assert dtype.is_complex, "dtype must be a complex type."
    assert isinstance(verbose, bool), "verbose must be bool."

    if graph_mode is None:
        graph_mode="default" # applies default graph mode
    assert isinstance(graph_mode, str), "graph_mode must be str."

    if graph_mode=="default":
        pass # nothing to do
    elif graph_mode=="graph":
        # avoid retracing -> check if mc_fun is already a function
        if not isinstance(mc_fun, tf.types.experimental.GenericFunction):
            mc_fun = tf.function(mc_fun,
                                 jit_compile=False,
                                 experimental_follow_type_hints=True)
    elif graph_mode=="xla":
        # avoid retracing -> check if mc_fun is already a function
        if not isinstance(mc_fun, tf.types.experimental.GenericFunction) or \
           not mc_fun.function_spec.jit_compile:
            mc_fun = tf.function(mc_fun,
                                 jit_compile=True,
                                 experimental_follow_type_hints=True)
    else:
        raise TypeError("Unknown graph_mode selected.")

    ebno_dbs = tf.cast(ebno_dbs, dtype.real_dtype)
    batch_size = tf.cast(batch_size, tf.int32)
    num_points = tf.shape(ebno_dbs)[0]
    symbol_errors = tf.Variable(   tf.zeros([num_points], dtype=tf.int64),
                            dtype=tf.int64)
#    block_errors = tf.Variable(  tf.zeros([num_points], dtype=tf.int64),
#                            dtype=tf.int64)
    nb_symbols = tf.Variable(  tf.zeros([num_points], dtype=tf.int64),
                            dtype=tf.int64)
#    nb_blocks = tf.Variable(  tf.zeros([num_points], dtype=tf.int64),
#                            dtype=tf.int64)

    # track status of simulation (early termination etc.)
    status = np.zeros(num_points)

    # measure runtime per SNR point
    runtime = np.zeros(num_points)

    # ensure num_target_errors is a tensor
    if num_target_symbol_errors is not None:
        num_target_symbol_errors = tf.cast(num_target_symbol_errors, tf.int64)
#    if num_target_block_errors is not None:
#        num_target_block_errors = tf.cast(num_target_block_errors, tf.int64)

    try:
        # simulate until a target number of errors is reached
        for i in tf.range(num_points):
            runtime[i] = time.perf_counter() # save start time
            iter_count = -1 # for print in verbose mode
            for ii in tf.range(max_mc_iter):

                iter_count += 1

                outputs = mc_fun(batch_size=batch_size, ebno_db=ebno_dbs[i])

                # assume first and second return value is b and b_hat
                # other returns are ignored
                b = outputs[0]
                b_hat = outputs[1]

                if soft_estimates:
                    b_hat = hard_decisions(b_hat)

                # count symbol errors
		            s = Mapper(b)
		            s_hat = Mapper(b_hat)
                symbol_e = count_errors(s, s_hat)
#                block_e = count_block_errors(b, b_hat)

                # count total number of bits
                symbol_n = tf.size(s)
#                block_n = tf.size(b[...,-1])

                # update variables
                symbol_errors = tf.tensor_scatter_nd_add(  symbol_errors, [[i]],
                                                    tf.cast([symbol_e], tf.int64))
#                block_errors = tf.tensor_scatter_nd_add(  block_errors, [[i]],
#                                                tf.cast([block_e], tf.int64))
                nb_symbols = tf.tensor_scatter_nd_add( nb_symbols, [[i]],
                                                    tf.cast([symbol_n], tf.int64))
#                nb_blocks = tf.tensor_scatter_nd_add( nb_blocks, [[i]],
#                                                tf.cast([block_n], tf.int64))

                # print progress summary
                if verbose:
                    # print summary header during first iteration
                    if i==0 and iter_count==0:
                        _print_progress(is_final=True,
                                        rt=0,
                                        idx_snr=0,
                                        idx_it=0,
                                        header_text=header_text)
                        # print seperator after headline
                        print('-' * 135)

                    # evaluate current runtime
                    rt = time.perf_counter() - runtime[i]
                    # print current progress
                    _print_progress(is_final=False, idx_snr=i, idx_it=ii, rt=rt)


                # symbol-error based stopping cond.
                if num_target_symbol_errors is not None:
                    if tf.greater_equal(symbol_errors[i], num_target_symbol_errors):
                        status[i] = 3 # change internal status for summary
                        # stop runtime timer
                        runtime[i] = time.perf_counter() - runtime[i]
                        break # enough errors for SNR point have been simulated

#                # block-error based stopping cond.
#                if num_target_block_errors is not None:
#                    if tf.greater_equal(block_errors[i],
#                                        num_target_block_errors):
#                        # stop runtime timer
#                        runtime[i] = time.perf_counter() - runtime[i]
#                        status[i] = 4 # change internal status for summary
#                        break # enough errors for SNR point have been simulated

                # max iter have been reached -> continue with next SNR point
                if iter_count==max_mc_iter-1: # all iterations are done
                    # stop runtime timer
                    runtime[i] = time.perf_counter() - runtime[i]
                    status[i] = 1 # change internal status for summary

            # print results again AFTER last iteration / early stop (new status)
            if verbose:
                _print_progress(is_final=True,
                                idx_snr=i,
                                idx_it=iter_count,
                                rt=runtime[i])

            # early stop if no error occurred
            if early_stop: # only if early stop is active
                if block_errors[i]==0:
                    status[i] = 2 # change internal status for summary
                    if verbose:
                        print("\nSimulation stopped as no error occurred " \
                              f"@ EbNo = {ebno_dbs[i].numpy():.1f} dB.\n")
                    break

    # Stop if KeyboardInterrupt is detected and set remaining SNR points to -1
    except KeyboardInterrupt as e:

        # Raise Interrupt again to stop outer loops
        if forward_keyboard_interrupt:
            raise e

        print("\nSimulation stopped by the user " \
              f"@ EbNo = {ebno_dbs[i].numpy()} dB")
        # overwrite remaining SER positions with -1
        for idx in range(i+1, num_points):
            symbol_errors = tf.tensor_scatter_nd_update( symbol_errors, [[idx]],
                                                    tf.cast([-1], tf.int64))
#            block_errors = tf.tensor_scatter_nd_update( block_errors, [[idx]],
#                                                    tf.cast([-1], tf.int64))
            nb_symbols = tf.tensor_scatter_nd_update( nb_symbols, [[idx]],
                                                    tf.cast([1], tf.int64))
#            nb_blocks = tf.tensor_scatter_nd_update( nb_blocks, [[idx]],
#                                                    tf.cast([1], tf.int64))

    # calculate SER
    ser = tf.cast(symbol_errors, tf.float64) / tf.cast(nb_symbols, tf.float64)
#    bler = tf.cast(block_errors, tf.float64) / tf.cast(nb_blocks, tf.float64)

    # replace nans (from early stop)
    ser = tf.where(tf.math.is_nan(ser), tf.zeros_like(ser), ser)
#    bler = tf.where(tf.math.is_nan(bler), tf.zeros_like(bler), bler)

    return ser

TabError: ignored

In [5]:
class UncodedSystemAWGN(Model): # Inherits from Keras Model
    def __init__(self, num_bits_per_symbol, block_length):
        """
        A keras model of an uncoded transmission over the AWGN channel.

        Parameters
        ----------
        num_bits_per_symbol: int
            The number of bits per constellation symbol, e.g., 4 for QAM16.

        block_length: int
            The number of bits per transmitted message block (will be the codeword length later).

        Input
        -----
        batch_size: int
            The batch_size of the Monte-Carlo simulation.

        ebno_db: float
            The `Eb/No` value (=rate-adjusted SNR) in dB.

        Output
        ------
        (bits, llr):
            Tuple:

        bits: tf.float32
            A tensor of shape `[batch_size, block_length] of 0s and 1s
            containing the transmitted information bits.

        llr: tf.float32
            A tensor of shape `[batch_size, block_length] containing the
            received log-likelihood-ratio (LLR) values.
        """

        super().__init__() # Must call the Keras model initializer

        self.num_bits_per_symbol = num_bits_per_symbol
        self.block_length = block_length
        self.constellation = sn.mapping.Constellation("qam", self.num_bits_per_symbol)
        self.mapper = sn.mapping.Mapper(constellation=self.constellation)
        self.demapper = sn.mapping.Demapper("app", constellation=self.constellation)
        self.binary_source = sn.utils.BinarySource()
        self.awgn_channel = sn.channel.AWGN()

    # @tf.function # Enable graph execution to speed things up
    def __call__(self, batch_size, ebno_db):

        # no channel coding used; we set coderate=1.0
        no = sn.utils.ebnodb2no(ebno_db,
                                num_bits_per_symbol=self.num_bits_per_symbol,
                                coderate=1.0)

        bits = self.binary_source([batch_size, self.block_length]) # Blocklength set to 1024 bits
        #print("bits=",bits)
        x = self.mapper(bits)
        #print("x=",x)
        y = self.awgn_channel([x, no])
        #print("y=",y)
        llr = self.demapper([y,no])
        #print("llr=",llr)
        return bits, llr

In [7]:
# Constellation

NUM_BITS_PER_SYMBOL = 4 # QPSK
#constellation = sn.mapping.Constellation("qam", NUM_BITS_PER_SYMBOL)

#constellation.show(figsize=(7,7));
model_uncoded_awgn = UncodedSystemAWGN(num_bits_per_symbol=NUM_BITS_PER_SYMBOL, block_length=1024)
EBN0_DB_MIN = -3.0 # Minimum value of Eb/N0 [dB] for simulations
EBN0_DB_MAX = 5.0 # Maximum value of Eb/N0 [dB] for simulations
BATCH_SIZE = 2000 # How many examples are processed by Sionna in parallel

#model_uncoded_awgn(batch_size=BATCH_SIZE, ebno_db=-3)

ber_plots = sn.utils.PlotBER("AWGN")
# ber_plots.simulate(model_uncoded_awgn,
#                   ebno_dbs=np.linspace(EBN0_DB_MIN, EBN0_DB_MAX, 20),
#                   batch_size=BATCH_SIZE,
#                   num_target_block_errors=100, # simulate until 100 block errors occured
#                   legend="Uncoded",
#                   soft_estimates=True,
#                   max_mc_iter=100, # run 100 Monte-Carlo simulations (each with batch_size samples)
#                   show_fig=True);

<sionna.utils.plotting.PlotBER object at 0x7f1322134e80>


In [23]:
import numpy as np

# define two arrays with the same shape
arr1 = np.array([[1, 2], [3, 4]])
arr2 = np.array([[1, 2], [2, 4]])

# compare the two arrays
comparison = arr1 == arr2

# count the number of different elements
different_elements = np.count_nonzero(comparison == False)

# print the result
print("The number of different elements is:", different_elements)


The number of different elements is: 1
