In [19]:
import os
import time
import pandas as pd
import numpy as np
from pynq import Overlay, allocate

In [6]:
# Smoke test: load the bitstream and show what the overlay exposes.
# Any failure (load error or missing 'cnn_top_0' entry) is reported by the
# except branch instead of aborting the notebook.

try:
    ol = Overlay("cnn.bit")
    print("Overlay loaded successfully!")

    # Names of every IP block described by the overlay metadata
    ip_names = list(ol.ip_dict.keys())
    print(f"IP found: {ip_names}\n")

    print("Checking registers:")
    # Register map of the CNN IP (offsets used later to drive the core)
    cnn_registers = ol.ip_dict['cnn_top_0']['registers']
    print(cnn_registers)
except Exception as err:
    print(f"Error loading overlay: {err}")

Overlay loaded successfully!
IP found: ['cnn_top_0', 'zynq_ultra_ps_e_0']

Checking registers:
{'CTRL': {'address_offset': 0, 'size': 32, 'access': 'read-write', 'description': 'Control signals', 'fields': {'AP_START': {'bit_offset': 0, 'bit_width': 1, 'access': 'read-write', 'description': "Control signal Register for 'ap_start'."}, 'AP_DONE': {'bit_offset': 1, 'bit_width': 1, 'access': 'read-only', 'description': "Control signal Register for 'ap_done'."}, 'AP_IDLE': {'bit_offset': 2, 'bit_width': 1, 'access': 'read-only', 'description': "Control signal Register for 'ap_idle'."}, 'AP_READY': {'bit_offset': 3, 'bit_width': 1, 'access': 'read-only', 'description': "Control signal Register for 'ap_ready'."}, 'RESERVED_1': {'bit_offset': 4, 'bit_width': 3, 'access': 'read-only', 'description': 'Reserved.  0s on read.'}, 'AUTO_RESTART': {'bit_offset': 7, 'bit_width': 1, 'access': 'read-write', 'description': "Control signal Register for 'auto_restart'."}, 'RESERVED_2': {'bit_offset': 8, 'b

In [14]:
class CNN:
    """Driver for the `cnn_top_0` IP block of the CNN overlay.

    Loads the bitstream, allocates the input/output buffers shared with the
    accelerator, programs their physical addresses into the IP, and runs
    single inferences by toggling `ap_start` and polling `ap_done`.

    Values are exchanged as 16-bit fixed point with 10 fractional bits
    (1.0 == 2**10 == 1024). That corresponds to ap_fixed<16,6>
    (16 total bits, 6 integer bits) -- NOTE(review): confirm against the
    HLS source, the original comments mentioned both <16,6> and <16,10>.
    """

    # Data dimensions
    IN_CH = 8
    IN_LEN = 25
    NUM_CLASSES = 8

    # Control register (offset 0): bit 0 = ap_start, bit 1 = ap_done
    _CTRL = 0x00
    _AP_START = 0x1
    _AP_DONE = 0x2

    # Buffer address registers (low 32 bits, high 32 bits)
    _IN_ADDR_LO = 0x10
    _IN_ADDR_HI = 0x14
    _OUT_ADDR_LO = 0x1c
    _OUT_ADDR_HI = 0x20

    def __init__(self, bitstream_path):
        """Set up the CNN IP block and its input/output buffers.

        Args:
            bitstream_path: Path of the bitstream to load. It must expose
                an IP named `cnn_top_0`.
        """
        # Keep a reference to the overlay next to the IP handle.
        self.overlay = Overlay(bitstream_path)
        self.cnn = self.overlay.cnn_top_0

        # Allocate memory shared with the accelerator (int16 fixed point)
        self.input_buffer = allocate(shape=(CNN.IN_CH, CNN.IN_LEN), dtype=np.int16)
        self.output_buffer = allocate(shape=(CNN.NUM_CLASSES,), dtype=np.int16)

        # Tell the IP where the data is in physical memory -> see registers
        self._write_address(CNN._IN_ADDR_LO, CNN._IN_ADDR_HI,
                            self.input_buffer.device_address)
        self._write_address(CNN._OUT_ADDR_LO, CNN._OUT_ADDR_HI,
                            self.output_buffer.device_address)

        print("CNN successfully setup")

    def _write_address(self, offset_lo, offset_hi, address):
        """Write a 64-bit physical address as two 32-bit register halves."""
        self.cnn.write(offset_lo, address & 0xFFFFFFFF)
        self.cnn.write(offset_hi, (address >> 32) & 0xFFFFFFFF)

    def to_fixed(self, float_val, frac_bits=10):
        """Convert float value(s) to int16 fixed point.

        Values are scaled by 2**frac_bits, rounded, and saturated to the
        int16 range so out-of-range inputs clamp instead of wrapping around
        to the opposite sign.

        Args:
            float_val: Scalar or array-like of real values.
            frac_bits: Number of fractional bits (default 10, so 1.0 -> 1024).

        Returns:
            int16 value(s) with the same shape as the input.
        """
        scaled = np.round(np.asarray(float_val, dtype=float) * (2 ** frac_bits))
        limits = np.iinfo(np.int16)
        return np.clip(scaled, limits.min, limits.max).astype(np.int16)

    def from_fixed(self, int_val, frac_bits=10):
        """Convert int16 fixed-point value(s) back to float.

        Args:
            int_val: Scalar or array-like of raw fixed-point integers.
            frac_bits: Number of fractional bits used by the encoding.

        Returns:
            Float value(s): int_val / 2**frac_bits.
        """
        return np.asarray(int_val).astype(float) / (2 ** frac_bits)

    def predict(self, data, timeout=5.0):
        """Run one inference on the accelerator.

        Args:
            data: Array-like of shape (IN_CH, IN_LEN) holding real values.
            timeout: Seconds to wait for `ap_done` before giving up;
                None waits forever.

        Returns:
            Tuple (scores, seconds): float scores of shape (NUM_CLASSES,)
            and the wall-clock time from `ap_start` until `ap_done`.

        Raises:
            ValueError: If `data` does not have shape (IN_CH, IN_LEN).
            TimeoutError: If the IP does not report `ap_done` in time.
        """
        samples = np.asarray(data)
        expected_shape = (CNN.IN_CH, CNN.IN_LEN)
        if samples.shape != expected_shape:
            raise ValueError(
                f"expected input of shape {expected_shape}, got {samples.shape}"
            )

        self.input_buffer[:] = self.to_fixed(samples)
        # Push CPU-side writes out to memory before the IP reads them.
        self.input_buffer.flush()

        # Start the clock before ap_start so a fast run is not under-measured.
        start_time = time.perf_counter()
        self.cnn.write(CNN._CTRL, CNN._AP_START)

        # Wait for it to finish (poll ap_done)
        while not (self.cnn.read(CNN._CTRL) & CNN._AP_DONE):
            if timeout is not None and time.perf_counter() - start_time > timeout:
                raise TimeoutError(
                    f"CNN IP did not signal ap_done within {timeout} s"
                )
        end_time = time.perf_counter()

        # Drop stale cache lines so the CPU sees what the IP wrote.
        self.output_buffer.invalidate()
        results = self.from_fixed(np.array(self.output_buffer))
        return results, end_time - start_time

In [36]:
def get_csv_data(path, in_ch=None, in_len=None):
    """Load one input sample from a CSV file as an (in_ch, in_len) array.

    The first CSV column is dropped (it holds the timestamp). A
    sample-major layout of shape (in_len, in_ch) is transposed to the
    channel-major layout.

    Args:
        path: Path of the CSV file to read.
        in_ch: Expected number of channels; defaults to CNN.IN_CH.
        in_len: Expected samples per channel; defaults to CNN.IN_LEN.

    Returns:
        numpy array of shape (in_ch, in_len).

    Raises:
        ValueError: If the data is neither (in_len, in_ch) nor
            (in_ch, in_len) after dropping the first column.
    """
    # Resolve defaults lazily so explicit dimensions work without CNN.
    if in_ch is None:
        in_ch = CNN.IN_CH
    if in_len is None:
        in_len = CNN.IN_LEN

    frame = pd.read_csv(path)

    samples = frame.iloc[:, 1:].values         # remove timestamp
    if samples.shape == (in_len, in_ch):
        samples = samples.T

    # Fail here with a clear message rather than returning a bad shape.
    if samples.shape != (in_ch, in_len):
        raise ValueError(
            f"{path}: expected data of shape {(in_ch, in_len)} or "
            f"{(in_len, in_ch)}, got {samples.shape}"
        )

    return samples
        
# Load the sample used for inference below. get_csv_data reads CNN.IN_CH /
# CNN.IN_LEN, so the cell defining the CNN class must have been run first.
data = get_csv_data("dummy_data_1.csv")

In [38]:
# Predict: run one hardware inference on `data` and report the result

# Loads the overlay and allocates the shared buffers
cnn = CNN("cnn.bit")
# Alternative random input for a quick check without a CSV file:
# dummy_data = np.random.uniform(-1, 1, (8, 25))
# predict returns (scores as floats, elapsed seconds spent polling ap_done)
results, time_taken = cnn.predict(data)

print(f"HW inference completed in {time_taken:.6f} s")
print("Scores:", results)
# The predicted class is the index of the highest score
print("Prediction Class:", np.argmax(results))

CNN successfully setup
HW inference completed in 0.000968 s
Scores: [ 0.06542969 -0.04492188 -0.00390625 -0.12304688  0.06054688  0.02148438
  0.07519531  0.05175781]
Prediction Class: 6
