In [10]:
import numpy as np
import struct
import time
import os
from tqdm import trange
from pynq import Overlay, allocate
print(os.getcwd())

NUM_TESTS = 10000

# Static allocation of network parameters and outputs
images = np.zeros((NUM_TESTS, 28, 28), dtype=np.uint8)
labels = np.zeros(NUM_TESTS, dtype=np.uint8)

image = np.zeros((1, 32, 32))
conv1_weights = np.zeros((6, 1, 5, 5))
conv1_bias = np.zeros(6)
conv1_output = np.zeros((6, 28, 28))

pool2_output = np.zeros((6, 14, 14))

conv3_weights = np.zeros((16, 6, 5, 5))
conv3_bias = np.zeros(16)
conv3_output = np.zeros((16, 10, 10))

pool4_output = np.zeros((16, 5, 5))

conv5_weights = np.zeros((120, 16, 5, 5))
conv5_bias = np.zeros(120)
conv5_output = np.zeros((120, 1, 1))

fc6_weights = np.zeros((10, 120, 1, 1))
fc6_bias = np.zeros(10)
fc6_output = np.zeros(10)

def relu(input):
    return np.maximum(0, input)

def convolution1(input, weights, bias, output):
    overlay = Overlay("lenet_conv1.bit")
    ip = overlay.convolution1_hls_0

    input_buffer = allocate(shape=(1, 32, 32), dtype=np.float32)
    weights_buffer = allocate(shape=(6, 1, 5, 5), dtype=np.float32)
    bias_buffer = allocate(shape=(6,), dtype=np.float32)
    output_buffer = allocate(shape=(6, 28, 28), dtype=np.float32)
    print("1")
  

    np.copyto(input_buffer, input)
    np.copyto(weights_buffer, weights)
    np.copyto(bias_buffer, bias)

    ip.write(0x10, input_buffer.device_address)
    ip.write(0x14, weights_buffer.device_address)
    ip.write(0x18, bias_buffer.device_address)
    ip.write(0x1C, output_buffer.device_address)
    ip.write(0x00, 1)  # Start the IP
    print("2")
    while (ip.read(0x00) & 0x2) == 0:
        pass  # Wait for the IP to finish
    print("3")
    np.copyto(output, output_buffer)

    input_buffer.freebuffer()
    weights_buffer.freebuffer()
    bias_buffer.freebuffer()
    output_buffer.freebuffer()

def max_pooling2(input, output):
    for c in range(6):
        for h in range(14):
            for w in range(14):
                region = input[c, h*2:h*2+2, w*2:w*2+2]
                output[c, h, w] = np.max(region)

def convolution3(input, weights, bias, output):
    for co in range(16):
        for h in range(10):
            for w in range(10):
                region = input[:, h:h+5, w:w+5]
                output[co, h, w] = np.sum(weights[co] * region) + bias[co]

def max_pooling4(input, output):
    for c in range(16):
        for h in range(5):
            for w in range(5):
                region = input[c, h*2:h*2+2, w*2:w*2+2]
                output[c, h, w] = np.max(region)

def convolution5(input, weights, bias, output):
    for co in range(120):
        region = input
        output[co, 0, 0] = np.sum(weights[co] * region) + bias[co]

def fc6(input, weights, bias, output):
    for n in range(10):
        output[n] = np.sum(weights[n, :, 0, 0] * input[:, 0, 0]) + bias[n]

def parse_mnist_images(filename, images):
    with open(filename, 'rb') as f:
        f.read(16)  # Skip header
        data = f.read(NUM_TESTS * 28 * 28)
        images[:] = np.frombuffer(data, dtype=np.uint8).reshape(NUM_TESTS, 28, 28)

def parse_mnist_labels(filename, labels):
    with open(filename, 'rb') as f:
        f.read(8)  # Skip header
        data = f.read(NUM_TESTS)
        labels[:] = np.frombuffer(data, dtype=np.uint8)

def parse_parameters(filename):
    with open(filename, 'rb') as f:
        conv1_weights[:] = np.frombuffer(f.read(150 * 4), dtype=np.float32).reshape(6, 1, 5, 5)
        conv1_bias[:] = np.frombuffer(f.read(6 * 4), dtype=np.float32)
        conv3_weights[:] = np.frombuffer(f.read(2400 * 4), dtype=np.float32).reshape(16, 6, 5, 5)
        conv3_bias[:] = np.frombuffer(f.read(16 * 4), dtype=np.float32)
        conv5_weights[:] = np.frombuffer(f.read(48000 * 4), dtype=np.float32).reshape(120, 16, 5, 5)
        conv5_bias[:] = np.frombuffer(f.read(120 * 4), dtype=np.float32)
        fc6_weights[:] = np.frombuffer(f.read(1200 * 4), dtype=np.float32).reshape(10, 120, 1, 1)
        fc6_bias[:] = np.frombuffer(f.read(10 * 4), dtype=np.float32)

def get_image(images, idx, image):
    for i in range(32):
        for j in range(32):
            if i < 2 or i > 29 or j < 2 or j > 29:
                image[0, i, j] = -1.0
            else:
                image[0, i, j] = images[idx, i-2, j-2] / 255.0 * 2.0 - 1.0

if __name__ == "__main__":
    print("Starting LeNet")

    print("Parsing MNIST images")
    parse_mnist_images("../../lenet/images.bin", images)

    print("Parsing MNIST labels")
    parse_mnist_labels("../../lenet/labels.bin", labels)

    print("Parsing parameters")
    parse_parameters("../../lenet/params.bin")

    print("Running inference")
    num_correct = 0
    start_time = time.time()

    for k in trange(NUM_TESTS, desc="infer"):
        get_image(images, k, image)

        convolution1(image, conv1_weights, conv1_bias, conv1_output)
        conv1_output[:] = relu(conv1_output)

        max_pooling2(conv1_output, pool2_output)
        pool2_output[:] = relu(pool2_output)

        convolution3(pool2_output, conv3_weights, conv3_bias, conv3_output)
        conv3_output[:] = relu(conv3_output)

        max_pooling4(conv3_output, pool4_output)
        pool4_output[:] = relu(pool4_output)

        convolution5(pool4_output, conv5_weights, conv5_bias, conv5_output)
        conv5_output[:] = relu(conv5_output)

        fc6(conv5_output, fc6_weights, fc6_bias, fc6_output)

        result = np.argmax(fc6_output)
        if result == labels[k]:
            num_correct += 1

    end_time = time.time()
    print(f"Accuracy = {num_correct / NUM_TESTS * 100.0}%")
    print(f"Execution Time: {end_time - start_time} seconds")


/root/jupyter_notebooks/getting_started/Accelerator/conv1
Starting LeNet
Parsing MNIST images
Parsing MNIST labels
Parsing parameters
Running inference


infer:   0%|          | 0/10000 [00:00<?, ?it/s]

1
2


infer:   0%|          | 0/10000 [00:38<?, ?it/s]


KeyboardInterrupt: 