# Testing an IP that adds 1 to a stream

This notebook tests an IP core written in Vivado HLS that adds 1 to every element of a streamed buffer. The HP ports **must** be configured as 64-bit, not 32-bit.

In [8]:
from pynq import Overlay
import pynq.lib.dma
from pynq import allocate
import numpy as np
from pynq import DefaultIP
from pynq import DefaultHierarchy

We need to define our own driver class **before** instantiating the overlay, so that it is bound automatically to the matching IP. A driver for the accelerator can be written as follows:

In [9]:
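class AdderDriver(DefaultIP):
    # VLNV of the HLS core; PYNQ binds this driver to any IP that matches it.
    bindto = ["xilinx.com:hls:hls_adder:1.0"]

    def __init__(self, description):
        super().__init__(description=description)

    def start_accel(self):
        # Write 1 to the control register at offset 0x0 to start the core.
        self.write(0x0, 1)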

    def set_state(self, state):
        self.write(0x0, state)
        return self.read(0x0)

    def get_state(self):
        return self.read(0x0)

    @property
    def stream_size(self):
        return self.read(0x10)

    @stream_size.setter
    def stream_size(self, size):
        self.write(0x10, size)
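
The value returned by `get_state()` can be decoded into individual flags. The sketch below assumes that the control register at offset `0x0` follows the usual Vivado HLS `ap_ctrl_hs` layout (bit 0 `ap_start`, bit 1 `ap_done`, bit 2 `ap_idle`, bit 3 `ap_ready`); the notebook does not confirm this for `hls_adder`, and `decode_ap_ctrl` is a hypothetical helper, not part of PYNQ.

```python
# Assumed ap_ctrl_hs bit positions; check the driver header generated for your core.
AP_CTRL_BITS = {"ap_start": 0, "ap_done": 1, "ap_idle": 2, "ap_ready": 3}


def decode_ap_ctrl(value):
    """Split a raw control-register value into named boolean flags."""
    return {name: bool((value >> bit) & 1) for name, bit in AP_CTRL_BITS.items()}


# Example with a hand-written value: 0x4 has only bit 2 set.
flags = decode_ap_ctrl(0x4)
print(flags)
```

On the board, the same helper could be fed the live register value, for instance `decode_ap_ctrl(overlay.adder.hls_adder.get_state())`, provided the layout assumption holds.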

It is more convenient, however, to wrap the DMA and the accelerator in a hierarchy driver:

In [10]:
class StreamAdderDriver(DefaultHierarchy):
    def __init__(self, description):
        super().__init__(description)

    def stream_add(self, stream):
        in_buffer = allocate(shape=(len(stream),), dtype=np.float32)
        out_buffer = allocate(shape=(len(stream),), dtype=np.float32)
        in_buffer[:] = stream  # copy the input into the DMA-visible buffer
        # NOTE: for managing the HLS accelerator, we exploit
        # the driver that we defined above.
        self.hls_adder.stream_size = len(stream)
        self.hls_adder.start_accel()  # NOTE: start the accelerator before launching the DMA transfers
        self.dma.sendchannel.transfer(in_buffer)
        self.dma.recvchannel.transfer(out_buffer)
        self.dma.sendchannel.wait()
        self.dma.recvchannel.wait()
        result = out_buffer.view(dtype=np.float32).copy()
        del in_buffer, out_buffer
        return result

    @staticmethod
    def checkhierarchy(description):
        """
        A hierarchy that meets these requirements is
        automatically bound to this driver.
        """
        if "dma" in description["ip"] and "hls_adder" in description["ip"]:
            return True
        return False
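
The matching rule in `checkhierarchy` can be tried without a board. The sketch below copies the same test into a standalone function and applies it to hand-written dictionaries. The function name and the dictionaries are hypothetical stand-ins; the descriptions that PYNQ builds from a real design carry many more keys.

```python
def has_dma_and_adder(description):
    """Same test as StreamAdderDriver.checkhierarchy, as a plain function."""
    return "dma" in description["ip"] and "hls_adder" in description["ip"]


# Hypothetical hierarchy descriptions; only the keys under "ip" matter here.
matching = {"ip": {"dma": {}, "hls_adder": {}}}
missing_adder = {"ip": {"dma": {}}}

print(has_dma_and_adder(matching))       # True
print(has_dma_and_adder(missing_adder))  # False
```

A hierarchy is therefore picked up only if it contains both an IP named `dma` and one named `hls_adder`; renaming either block in the design would stop the driver from binding.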

Finally, we can instantiate the overlay, so that the drivers above are registered automatically.

In [11]:
overlay = Overlay("overlay/streamed_add_hier.bit", download=False)
# overlay.download()
# overlay?

### Width of Buffer Length Register
This integer value specifies the number of valid bits used for the Control field buffer length and Status field bytes transferred in the Scatter/Gather descriptors. It also specifies the number of valid bits in the RX Length of the Status Stream App4 field when Use Rxlength is enabled. For Direct Register Mode, it specifies the number of valid bits in the MM2S_LENGTH and S2MM_LENGTH registers.

The length width therefore bounds the number of bytes that can be specified in a Scatter/Gather descriptor, in App4.RxLength, or in MM2S_LENGTH and S2MM_LENGTH: the largest byte count is 2^(Length Width) - 1. For example, a Length Width of 26 gives a maximum byte count of 67,108,863 bytes. This value should be set to 23 for Multichannel mode. The transfer in the next cell sends 1024 `float32` values (4096 bytes), so the length width must be at least 13 bits.
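
The relation between the length width and the transfer size is easy to check before sizing a buffer. This is a minimal sketch in plain Python; `max_transfer_bytes` and `min_length_width` are hypothetical helper names, and the sketch only covers the arithmetic, not any other limit the DMA configuration may impose.

```python
def max_transfer_bytes(length_width):
    """Largest byte count that fits in a length register of the given width."""
    return (1 << length_width) - 1


def min_length_width(n_bytes):
    """Smallest register width whose maximum byte count covers n_bytes."""
    return max(1, n_bytes.bit_length())


n_bytes = 1024 * 4  # 1024 float32 values, 4 bytes each
print(max_transfer_bytes(26))     # 67108863
print(min_length_width(n_bytes))  # 13
```

A width of 12 bits tops out at 4095 bytes, one byte short of the 4096-byte buffer used in this notebook, which is why 13 is the minimum here.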

In [25]:
stream = [i * np.pi for i in range(1024)]
# print(stream)
out_stream = overlay.adder.stream_add(stream)
print(out_stream)
print(np.isclose(np.array(stream) + 1, out_stream))
print(np.abs((np.array(stream) - (out_stream - 1))).mean())

# # NOTE: The following is a neat way of printing the np.floats in HEX format. 
# for orig, f32, u32 in zip(np.array(stream, dtype=np.float32).view(dtype=np.uint32), out_stream, out_stream.view(dtype=np.uint32)):
#     print("{:x}\t{:03.3}\t{:x}".format(orig, f32, u32))

[  1.00000000e+00   4.14159298e+00   7.28318548e+00 ...,   3.20856616e+03
   3.21170776e+03   3.21484937e+03]
[ True  True  True ...,  True  True  True]
3.469756501941687e-05
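
The small nonzero mean error is what single-precision rounding alone would produce, so it does not by itself indicate a fault in the IP. A minimal software sketch, assuming the accelerator performs an ordinary IEEE 754 `float32` addition (the notebook does not state this), runs the same checks with NumPy and no hardware:

```python
import numpy as np

stream = np.array([i * np.pi for i in range(1024)])  # float64 reference

# Emulate the accelerator: round the input to float32, then add 1 in float32.
emulated = stream.astype(np.float32) + np.float32(1)

# Same checks as in the cell above, applied to the emulated output.
all_close = bool(np.isclose(stream + 1, emulated).all())
mean_err = np.abs(stream - (emulated - 1)).mean()

# Spacing between adjacent float32 values near the largest element.
spacing = np.spacing(np.float32(stream[-1] + 1))
print(all_close, mean_err, spacing)
```

For values up to about 3215, adjacent `float32` numbers are 2^-12 (roughly 2.4e-4) apart, so a mean error of a few 1e-5 is consistent with rounding.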
