## KR260 AWGN DMA Stream


### Plot function for use in this notebook
The first code block below defines a function that we will use for plotting data throughout this notebook. Note that the function has an `n_samples` argument so that we can limit the number of samples to plot. Plotting more than a few thousand samples can be very slow and consume a lot of RAM.

### Requirements
Install the following:

```bash
pip install numpy==1.26.4
pip install ipympl
pip install ipywidgets
pip install jupyter_bokeh
```

# Initialize

#### Bokeh Plot Function

In [None]:
from bokeh.plotting import figure, output_file, show
from bokeh.io import output_notebook, push_notebook, show
from bokeh.models import ColumnDataSource
import numpy as np
import math
import time


def plot_time(in_signal, out_signal=None, n_samples=None, continuous=False, interval_s=1.0):
    """
    Draw an input signal, and optionally an output signal, as a Bokeh line plot.

    A complex `in_signal` is drawn as two traces (real part and imaginary part);
    a real one is drawn as a single trace. `out_signal`, when given, is drawn
    as one extra trace. `n_samples` truncates every plotted signal to its
    first `n_samples` points.

    With `continuous=True` the function does not return immediately: every
    `interval_s` seconds it re-reads `in_signal` / `out_signal`, replaces the
    plotted data and pushes the update to the notebook, until a
    KeyboardInterrupt is raised.

    Returns the notebook handle produced by `show(..., notebook_handle=True)`,
    which can be passed to `push_notebook`.
    """

    def _snapshot(signal):
        # Fresh array copy of the signal, truncated when a limit was requested.
        arr = np.array(signal)
        return arr if n_samples is None else arr[:n_samples]

    def _input_columns(arr):
        # Column layout depends on whether the input is complex or real.
        index = np.arange(len(arr))
        if np.iscomplexobj(arr):
            return {"t": index, "real": np.real(arr), "imag": np.imag(arr)}
        return {"t": index, "y": arr}

    def _output_columns(arr):
        return {"t": np.arange(len(arr)), "y": arr}

    first_input = _snapshot(in_signal)
    input_columns = _input_columns(first_input)

    output_notebook()
    fig = figure(title="Input & Output Signal", x_axis_label="sample", y_axis_label="amplitude")

    # Input trace(s)
    src = ColumnDataSource(data=input_columns)
    if np.iscomplexobj(first_input):
        fig.line("t", "real", source=src, legend_label="Input (real)", line_color="blue", line_width=2)
        fig.line("t", "imag", source=src, legend_label="Input (imag)", line_color="green", line_width=2)
    else:
        fig.line("t", "y", source=src, legend_label="Input", line_color="blue", line_width=2)

    # Output trace, only when an output signal was supplied
    dst = None
    if out_signal is not None:
        dst = ColumnDataSource(data=_output_columns(_snapshot(out_signal)))
        fig.line("t", "y", source=dst, legend_label="Output", line_color="red", line_width=1)

    handle = show(fig, notebook_handle=True)

    if not continuous:
        return handle

    # Live mode: periodically re-read the caller's objects and redraw.
    try:
        while True:
            time.sleep(interval_s)
            src.data = _input_columns(_snapshot(in_signal))
            # dst exists exactly when out_signal was supplied
            if dst is not None:
                dst.data = _output_columns(_snapshot(out_signal))
            push_notebook(handle=handle)
    except KeyboardInterrupt:
        print("Stopped continuous plotting by KeyboardInterrupt")
    return handle

#### Load Overlay

In [None]:
from pynq import Overlay
from pynq import allocate
from pynq import MMIO
import pynq.lib.dma
import time

#import Python library from repo:
import sys
sys.path.append('/root/jupyter_notebooks/puch/')
import fpga.py.puch as puch
import fpga.lib.timestamp.sw.timestamp_regmap as timestamp_regmap
import fpga.lib.led_reg.sw.led_regmap as led_regmap
import fpga.lib.AWGN_GNG.sw.awgn_regmap as awgn_regmap


# Load the overlay bitstream (relative path, so it is resolved against the
# current working directory)
overlay = Overlay('../../overlays/KR260_AWGN_DMA_Stream/output/kr260_awgn_dma_stream.bit')

# Short alias for the AXI DMA block; later cells use its sendchannel/recvchannel
dma          = overlay.axi_dma_0

# Wrap each IP block of the overlay in its (Consair) RegMap class through
# puch.PynqInterface; later cells read and write registers as attributes of
# these objects (e.g. awgn.awgn_noise_gain, led_module.user_leds)
led_module   = led_regmap.RegMap(puch.PynqInterface(overlay.led_reg_0))
awgn         = awgn_regmap.RegMap(puch.PynqInterface(overlay.gng_top_0))
timestamp    = timestamp_regmap.RegMap(puch.PynqInterface(overlay.Timestamp_0))

##### Print out the register map for DMA and HLS Gain Blocks

##### Display Time Stamp Register
The time stamp is burned into the FPGA during the build process.

In [None]:
# Show the build timestamp string produced by puch for the timestamp block
print(f"FPGA Build Timestamp:  {puch.get_timestamp_str(timestamp)}")

##### Toggle USER_LED[1:0] on/off

In [None]:
# Step the LED register through the values 0, 1, 2, 3 four times,
# holding each value for one second.
for i in range(16):
    led_module.user_leds = i & 0b11
    time.sleep(1)

##### Display and Set AWGN Block Parameters

In [None]:
# Noise gain: show the current register value, write 4, then read it back
print("Noise Gain =", awgn.awgn_noise_gain)
awgn.awgn_noise_gain = 4
print("Noise Gain =", awgn.awgn_noise_gain)

# Enable: show the current register value, write 1, then read it back
print("AWGN Enable =", awgn.awgn_enable)
awgn.awgn_enable = 1
print("AWGN Enable =", awgn.awgn_enable)

In [None]:
# Each format is printed as (total, fractional), read from the block's bit-fields
print(f"AWGN F_in Format: ({awgn.f_in_bf.f_in_total!s}, {awgn.f_in_bf.f_in_fractional!s})")
print(f"AWGN F_out Format: ({awgn.f_out_bf.f_out_total!s}, {awgn.f_out_bf.f_out_fractional!s})")
print(f"AWGN F_awgn Format: ({awgn.f_awgn_bf.f_awgn_total!s}, {awgn.f_awgn_bf.f_awgn_fractional!s})")

### DMA Transfer the Sample buffer from ARM Processor to the AWGN FPGA Module, and plot

In [53]:
awgn_gain = 0.2

# Fixed-point formats (total bits, fractional bits) reported by the AWGN block
fin_tBits, fin_fBits = awgn.f_in_bf.f_in_total, awgn.f_in_bf.f_in_fractional
fout_tBits, fout_fBits = awgn.f_out_bf.f_out_total, awgn.f_out_bf.f_out_fractional
awgn_tBits, awgn_fBits = awgn.f_awgn_bf.f_awgn_total, awgn.f_awgn_bf.f_awgn_fractional

# Create a sine-wave test signal
T = 0.00002                                # signal duration in seconds
fs = 100e6                                 # sampling frequency in Hz
n = int(T * fs)                            # number of samples
t = np.linspace(0, T, n, endpoint=False)   # time vector in seconds
samples = np.sin(0.2e6 * 2 * np.pi * t)    # 0.2 MHz sine
print('Number of samples: ', len(samples))
# Scale to the input fixed-point format and convert to 32-bit integers
samples = samples * 2**(fin_fBits)
samples = samples.astype(np.int32)

# Write the noise gain register (gain scaled to the AWGN fixed-point format)
awgn.awgn_noise_gain = int(awgn_gain * (2**awgn_fBits))

print("AWGN Gain           : " + str(awgn.awgn_noise_gain / (2**awgn_fBits)))
print("AWGN Enable         : " + str(awgn.awgn_enable_bf.awgn_enable))
print("DMA Transfer Size   : " + str(len(samples)))
print("AWGN I Sat          : " + str(awgn.awgn_enable_bf.sat_i_ch))
print("AWGN Q Sat          : " + str(awgn.awgn_enable_bf.sat_q_ch))

# Allocate DMA buffers for the input and output signals
in_buffer = allocate(shape=(len(samples),), dtype=np.int32)
out_buffer = allocate(shape=(len(samples),), dtype=np.int32)
try:
    # Copy the samples to the in_buffer
    np.copyto(in_buffer, samples)

    # Trigger the DMA transfer in both directions and wait for completion
    start_time = time.perf_counter()
    dma.sendchannel.transfer(in_buffer)
    dma.recvchannel.transfer(out_buffer)
    dma.sendchannel.wait()
    dma.recvchannel.wait()
    stop_time = time.perf_counter()
    hw_exec_time = stop_time - start_time

    # Copy the result into ordinary memory BEFORE the DMA buffer is released;
    # the buffer must not be read after close().
    out_words = np.array(out_buffer, dtype=np.int64)
finally:
    # Free the DMA buffers even if the transfer failed
    in_buffer.close()
    out_buffer.close()

print('DMA Transfer Execution Time  :', hw_exec_time, ' sec')

# Convert the received words to floating point.
# The low fout_tBits bits of each word are taken as a two's-complement value
# (presumably the I channel -- verify against the block's output packing).
out_bits = int(fout_tBits)
out_raw = out_words & ((1 << out_bits) - 1)
# Codes with the sign bit set (>= 2**(bits-1)) are negative, including the
# most-negative code itself.
out_signed = np.where(out_raw >= (1 << (out_bits - 1)), out_raw - (1 << out_bits), out_raw)
out_fp = out_signed / (2**fout_fBits)

# The input was scaled by the input format, the output by the output format
plot_time(samples / (2**fin_fBits), out_fp)

# Display Status
print("AWGN I Sat          : " + str(awgn.awgn_enable_bf.sat_i_ch))
print("AWGN Q Sat          : " + str(awgn.awgn_enable_bf.sat_q_ch))

# Capture the stream counters, print them, then clear and print again
awgn.cnt_ctrl_bf.capture_cnt = 1
print("TVALID Count        : " + str(awgn.tvalid_cnt))
print("TLAST Count         : " + str(awgn.tlast_cnt))

awgn.cnt_ctrl_bf.clear_cnt = 1
print("TVALID Count        : " + str(awgn.tvalid_cnt))
print("TLAST Count         : " + str(awgn.tlast_cnt))


Number of samples:  2000
AWGN Gain           : 0.19970703125
AWGN Enable         : 1
DMA Transfer Size   : 2000
AWGN I Sat          : 0
AWGN Q Sat          : 0
DMA Transfer Execution Time  : 0.001735687255859375  sec


AWGN I Sat          : 0
AWGN Q Sat          : 0
AWGN I Sat          : 0
AWGN Q Sat          : 0
TVALID Count        : 2000
TLAST Count         : 1
TVALID Count        : 0
TLAST Count         : 0


In [None]:
# Show the installed numpy version (the Requirements section pins numpy==1.26.4)
print(np.__version__)


### Read TVALID & TLAST Counters

In [None]:
# Set the capture bit, then print both counters
awgn.cnt_ctrl_bf.capture_cnt = 1
print("TVALID Count    :", awgn.tvalid_cnt)
print("TLAST Count     :", awgn.tlast_cnt)

# Set the clear bit, then print both counters again
awgn.cnt_ctrl_bf.clear_cnt = 1
print("TVALID Count    :", awgn.tvalid_cnt)
print("TLAST Count     :", awgn.tlast_cnt)

### Continuous DMA stream and live plotting

The next cell provides a reusable `DMAStreamer` class that:
- continuously writes an input buffer to the DMA Tx channel,
- continuously reads back from the DMA Rx channel into a queue,
- runs a separate plotting thread that consumes the queue and updates a Bokeh plot in the notebook.

Usage: Create a DMAStreamer(dma, samples, chunk_size) then call `start()`; call `stop()` to end.

In [51]:
# Continuous DMA streamer with background plotting
import threading
import queue
import numpy as np
import time
from bokeh.plotting import figure, output_notebook, show
from bokeh.models import ColumnDataSource
from bokeh.io import push_notebook

class DMAStreamer:
    """Continuously stream `samples` to `dma`, read the results back and plot them.

    A writer thread sends `chunk_size` samples at a time (wrapping around the
    end of `samples`) through `dma.sendchannel`, receives the same number of
    words on `dma.recvchannel`, and queues a copy of each received chunk.
    A plotter thread takes chunks from the queue, converts them from fixed
    point to float, and updates a Bokeh plot in the notebook.

    Parameters
    ----------
    dma : object with `sendchannel` / `recvchannel` (each offering
        `transfer(buf)` and `wait()`).
    samples : array-like of input samples; converted to `dtype`. Must not be empty.
    chunk_size : number of samples per DMA transfer; must be positive.
    dtype : element type of the DMA buffers.
    plot_n : maximum number of points of each chunk to plot.
    interval_s : how long the plotter waits for a chunk before re-checking
        whether it has been asked to stop.
    out_total_bits, out_frac_bits : fixed-point format of the received words.
        When left as None, the module-level names `fout_tBits` / `fout_fBits`
        are looked up at plot time.

    Raises
    ------
    ValueError
        If `chunk_size` is not positive or `samples` is empty.
    """

    def __init__(self, dma, samples, chunk_size=2048, dtype=np.int32, plot_n=1024,
                 interval_s=0.5, out_total_bits=None, out_frac_bits=None):
        self.dma = dma
        self.samples = np.array(samples, dtype=dtype)
        self.chunk_size = int(chunk_size)
        self.dtype = dtype
        self.plot_n = int(plot_n)
        self.interval_s = float(interval_s)
        self.out_total_bits = out_total_bits
        self.out_frac_bits = out_frac_bits

        # Fail early instead of inside a background thread
        if self.chunk_size <= 0:
            raise ValueError("chunk_size must be positive")
        if len(self.samples) == 0:
            raise ValueError("samples must not be empty")

        # threads and coordination
        self._stop_event = threading.Event()
        self._write_thread = None
        self._read_thread = None
        self._plot_thread = None
        # Holds plain numpy copies of received chunks (not DMA buffers)
        self._q = queue.Queue(maxsize=64)

        # bokeh plot setup (in notebook)
        output_notebook()
        self._fig = figure(title="DMA Readback", x_axis_label="sample", y_axis_label="value", y_range=(-3, 3))
        self._src = ColumnDataSource(data={"t": np.arange(self.plot_n), "y": np.zeros(self.plot_n)})
        self._fig.line("t", "y", source=self._src, line_width=2)
        self._handle = show(self._fig, notebook_handle=True)

    def start(self):
        "Start write/read/plot threads."
        self._stop_event.clear()
        self._write_thread = threading.Thread(target=self._writer, daemon=True)
        self._read_thread = threading.Thread(target=self._reader, daemon=True)
        self._plot_thread = threading.Thread(target=self._plotter, daemon=True)
        self._write_thread.start()
        self._read_thread.start()
        self._plot_thread.start()

    def stop(self, timeout=2.0):
        "Signal threads to stop, join them (each for at most `timeout` seconds) and discard queued chunks."
        self._stop_event.set()
        for t in (self._write_thread, self._read_thread, self._plot_thread):
            if t is not None:
                t.join(timeout)
        # Drop whatever the plotter did not consume
        while True:
            try:
                self._q.get_nowait()
            except queue.Empty:
                break

    @staticmethod
    def _fixed_to_float(raw, total_bits, frac_bits):
        """Decode the low `total_bits` bits of each word as two's-complement fixed point.

        Returns a float array: the signed integer value divided by 2**frac_bits.
        """
        total_bits = int(total_bits)
        words = np.asarray(raw).astype(np.int64)
        field = words & ((1 << total_bits) - 1)
        # Sign bit set (>= 2**(total_bits-1)) means negative, including the
        # most-negative code itself.
        signed = np.where(field >= (1 << (total_bits - 1)), field - (1 << total_bits), field)
        return signed / (2.0 ** int(frac_bits))

    def _output_format(self):
        "Return (total_bits, frac_bits), falling back to the module-level fout_tBits / fout_fBits."
        total = self.out_total_bits if self.out_total_bits is not None else fout_tBits
        frac = self.out_frac_bits if self.out_frac_bits is not None else fout_fBits
        return int(total), int(frac)

    def _chunk_at(self, idx):
        "Return `chunk_size` samples starting at `idx`, wrapping around the end as often as needed."
        wrapped = (idx + np.arange(self.chunk_size)) % len(self.samples)
        return self.samples[wrapped]

    def _enqueue(self, data):
        "Queue `data`; when the queue is full, drop the oldest entry to make room."
        try:
            self._q.put_nowait(data)
            return
        except queue.Full:
            pass
        try:
            self._q.get_nowait()
        except queue.Empty:
            pass
        try:
            self._q.put_nowait(data)
        except queue.Full:
            # Still full: give up on this chunk rather than block the DMA loop
            pass

    def _writer(self):
        # One send and one receive DMA buffer are allocated once, reused for
        # every chunk, and always released when the thread exits.
        from pynq import allocate
        n_total = len(self.samples)
        send_buf = None
        recv_buf = None
        try:
            send_buf = allocate(shape=(self.chunk_size,), dtype=self.dtype)
            recv_buf = allocate(shape=(self.chunk_size,), dtype=self.dtype)
            idx = 0
            while not self._stop_event.is_set():
                np.copyto(send_buf, self._chunk_at(idx))
                try:
                    # submit send and corresponding recv (same length), then wait
                    self.dma.sendchannel.transfer(send_buf)
                    self.dma.recvchannel.transfer(recv_buf)
                    self.dma.sendchannel.wait()
                    self.dma.recvchannel.wait()
                except Exception as e:
                    print("DMA writer exception:", e)
                else:
                    # Queue a copy so recv_buf can be reused for the next transfer
                    self._enqueue(np.array(recv_buf, dtype=self.dtype))
                # advance index
                idx = (idx + self.chunk_size) % n_total
                # small sleep to avoid tight loop if needed
                time.sleep(0.0001)
        finally:
            for buf in (send_buf, recv_buf):
                if buf is not None:
                    buf.close()

    def _reader(self):
        # The writer performs the receive itself and queues the result;
        # this thread is kept for symmetry and future extension.
        while not self._stop_event.is_set():
            time.sleep(0.1)

    def _plotter(self):
        "Consume received chunks from the queue and update the Bokeh plot."
        while not self._stop_event.is_set():
            try:
                data = self._q.get(timeout=self.interval_s)
            except queue.Empty:
                continue
            try:
                total_bits, frac_bits = self._output_format()
                y = self._fixed_to_float(data[:self.plot_n], total_bits, frac_bits)
                self._src.data = {"t": np.arange(len(y)), "y": y}
                push_notebook(handle=self._handle)
            except Exception as e:
                print("Plotter exception:", e)
        print("Plot thread exiting")

awgn_gain = 0.3
awgn.awgn_noise_gain = int(awgn_gain*(2**awgn_fBits))


# Stream in the background for ten seconds, then stop.
# stop() is in a finally clause so the background threads are also stopped
# when the cell is interrupted during the sleep.
streamer = DMAStreamer(dma, samples, chunk_size=1024, dtype=np.int32, plot_n=1024, interval_s=1.0)
try:
    streamer.start()  # runs in background
    time.sleep(10)    # let it run
finally:
    streamer.stop()   # stop threads

# Capture the stream counters, print them, then clear and print again
awgn.cnt_ctrl_bf.capture_cnt = 1
print("TVALID Count    : " + str(awgn.tvalid_cnt))
print("TLAST Count     : " + str(awgn.tlast_cnt))

awgn.cnt_ctrl_bf.clear_cnt = 1
print("TVALID Count    : " + str(awgn.tvalid_cnt))
print("TLAST Count     : " + str(awgn.tlast_cnt))

print("AWGN Gain       : " + str(awgn.awgn_noise_gain/(2**awgn_fBits)))
print("AWGN I Sat      : " + str(awgn.awgn_enable_bf.sat_i_ch))
print("AWGN Q Sat      : " + str(awgn.awgn_enable_bf.sat_q_ch))

Plot thread exiting
TVALID Count    : 203776
TLAST Count     : 199
TVALID Count    : 0
TLAST Count     : 0
AWGN Gain       : 0.2998046875
AWGN I Sat      : 0
AWGN Q Sat      : 0
