In [1]:
%%html
<style type='text/css'>
/* Code editor font. Consolas is not installed on every system, so a generic
   monospace family is listed as a fallback to keep code aligned. */
.CodeMirror {
    font-size: 16px;
    font-family: Consolas, monospace;
}

/* Use the same font settings for plain-text cell output. */
div.output_area pre {
    font-size: 16px;
    font-family: Consolas, monospace;
}
</style>

## Import Packages

In [2]:
from pynq import Overlay
from pynq import allocate
from pynq import DefaultIP
import cv2
import time

## Data Loader

In [3]:
import numpy as np
import struct
from array import array
from os.path import join

class MnistDataloader(object):
    """Load MNIST-style IDX image/label files from disk.

    The four file paths are stored at construction time and read lazily by
    :meth:`load_data`.
    """

    def __init__(self, training_images_filepath,training_labels_filepath,
                 test_images_filepath, test_labels_filepath):
        self.training_images_filepath = training_images_filepath
        self.training_labels_filepath = training_labels_filepath
        self.test_images_filepath = test_images_filepath
        self.test_labels_filepath = test_labels_filepath

    def read_images_labels(self, images_filepath, labels_filepath):
        """Read one IDX image file and its matching IDX label file.

        Args:
            images_filepath: Path to an IDX image file (magic number 2051).
            labels_filepath: Path to an IDX label file (magic number 2049).

        Returns:
            ``(images, labels)`` where ``images`` is a list with one entry
            per image, each entry being a list of ``rows`` one-dimensional
            NumPy arrays of length ``cols``, and ``labels`` is an
            ``array('B')`` holding every byte after the label header.

        Raises:
            ValueError: If either magic number is wrong, or the image file
                holds fewer pixel bytes than its header announces.
        """
        with open(labels_filepath, 'rb') as file:
            # Header: big-endian magic number and item count.
            magic, size = struct.unpack(">II", file.read(8))
            if magic != 2049:
                raise ValueError('Magic number mismatch, expected 2049, got {}'.format(magic))
            labels = array("B", file.read())

        with open(images_filepath, 'rb') as file:
            # Header: big-endian magic number, image count, rows, columns.
            magic, size, rows, cols = struct.unpack(">IIII", file.read(16))
            if magic != 2051:
                raise ValueError('Magic number mismatch, expected 2051, got {}'.format(magic))
            image_data = array("B", file.read())

        pixels_per_image = rows * cols
        expected_bytes = size * pixels_per_image
        if len(image_data) < expected_bytes:
            raise ValueError(
                'Image data truncated, expected {} bytes, got {}'.format(
                    expected_bytes, len(image_data)))

        images = []
        for i in range(size):
            start = i * pixels_per_image
            img = np.array(image_data[start:start + pixels_per_image])
            # Use the dimensions from the file header rather than assuming
            # 28x28, so IDX files with other image sizes load correctly.
            img = img.reshape(rows, cols)
            # Stored as a list of row arrays to keep the structure callers
            # already receive from this method.
            images.append(list(img))

        return images, labels

    def load_data(self):
        """Read the training and test sets.

        Returns:
            ``((x_train, y_train), (x_test, y_test))``, each pair as returned
            by :meth:`read_images_labels`.
        """
        x_train, y_train = self.read_images_labels(self.training_images_filepath, self.training_labels_filepath)
        x_test, y_test = self.read_images_labels(self.test_images_filepath, self.test_labels_filepath)
        return (x_train, y_train),(x_test, y_test)

In [4]:
%matplotlib inline
import random
import matplotlib.pyplot as plt

# Directory that holds the four MNIST IDX files.
input_path = './'

# Build every dataset path relative to input_path in one pass.
(training_images_filepath,
 training_labels_filepath,
 test_images_filepath,
 test_labels_filepath) = (
    join(input_path, filename)
    for filename in (
        'train-images.idx3-ubyte',
        'train-labels.idx1-ubyte',
        't10k-images.idx3-ubyte',
        't10k-labels.idx1-ubyte',
    )
)

def show_images(images, title_texts):
    """Plot images in a five-column grid, each with an optional title.

    Args:
        images: Sequence of images, each passed to ``plt.imshow``.
        title_texts: Sequence of titles paired with ``images`` by position;
            an empty string leaves that subplot untitled.

    Images and titles are paired with ``zip``, so surplus entries in the
    longer sequence are ignored.
    """
    cols = 5
    # Ceiling division: just enough rows for every image, without the spare
    # empty row the grid would get when len(images) is a multiple of cols.
    rows = (len(images) + cols - 1) // cols
    plt.figure(figsize=(30,20))
    # Subplot indices are 1-based.
    for index, (image, title_text) in enumerate(zip(images, title_texts), start=1):
        plt.subplot(rows, cols, index)
        plt.imshow(image, cmap=plt.cm.gray)
        if title_text != '':
            plt.title(title_text, fontsize = 15)

# Read both dataset splits from the IDX files configured above.
mnist_dataloader = MnistDataloader(
    training_images_filepath=training_images_filepath,
    training_labels_filepath=training_labels_filepath,
    test_images_filepath=test_images_filepath,
    test_labels_filepath=test_labels_filepath,
)
(x_train, y_train), (x_test, y_test) = mnist_dataloader.load_data()

## Upsample to 32x32

In [5]:
# Note: despite its name, this list holds the slice x_test[5000:10000], with
# every image converted to uint8 and resized to 32x32.
images_1000 = [
    cv2.resize(
        np.array(sample, dtype='u1'),
        dsize=(32, 32),
        interpolation=cv2.INTER_LINEAR_EXACT,
    )
    for sample in x_test[5000:10000]
]

In [6]:
import struct
# Read the test labels straight from the IDX file: an 8-byte big-endian
# header (magic number, item count), then every remaining byte as one label.
with open("t10k-labels.idx1-ubyte", "rb") as label_file:
    magic, num_items = struct.unpack(">II", label_file.read(8))
    labels = [value for value in label_file.read()]

# Quick sanity check of the first few labels.
print(labels[:20])

[7, 2, 1, 0, 4, 1, 4, 9, 5, 9, 0, 6, 9, 0, 1, 5, 9, 7, 3, 4]


## IP Driver

In [7]:
class LeNetDriver(DefaultIP):
    """PYNQ driver bound to the ``xilinx.com:hls:lenet:1.0`` IP.

    The driver owns two 32x32 uint8 buffers. ``predict`` uses the first one
    only; ``batch_predict`` alternates between both so the next image can be
    copied while the IP is still processing the current one.
    """
    bindto = ["xilinx.com:hls:lenet:1.0"]

    def __init__(self, description):
        super().__init__(description=description)
        # Register offsets used with self.read / self.write.
        self.img_in = 0x18     # receives the physical address of the input buffer
        self.ap_ctrl = 0x00    # control register: 0x01 is written to start a run
        self.ap_return = 0x10  # read after a run to fetch the prediction
        self.img_size = 32
        self.img_in_buf_1 = allocate(shape=(self.img_size, self.img_size), dtype='u1')
        self.img_in_buf_2 = allocate(shape=(self.img_size, self.img_size), dtype='u1')

    def _load_input(self, buf, img):
        """Point the IP at ``buf`` and copy ``img`` (cast to uint8) into it."""
        self.write(self.img_in, buf.physical_address)
        np.copyto(buf, np.uint8(img))

    def _launch(self):
        """Write 0x01 to the control register to start one run."""
        self.write(self.ap_ctrl, 0x01)

    def _await_result(self):
        """Busy-wait until the control register stops reading 0x01, then
        return the value of the result register."""
        while self.read(self.ap_ctrl) == 0x01:
            pass
        return self.read(self.ap_return)

    def predict(self, img_in):
        """Run the IP on a single image and return the value it reports.

        Args:
            img_in: Image data that can be cast to uint8 and copied into a
                ``(img_size, img_size)`` buffer.

        Returns:
            The value read from the result register once the run finishes.
        """
        self._load_input(self.img_in_buf_1, img_in)
        self._launch()
        return self._await_result()

    # double buffer implementation for batch prediction
    def batch_predict(self, img_in):
        """Run the IP on a sequence of images, overlapping copy and compute.

        While the IP processes image ``i - 1``, image ``i`` is copied into
        the other buffer and the input-address register is updated.
        NOTE(review): this assumes the IP captures the input address when a
        run starts, so rewriting the register mid-run does not affect the
        run in progress -- confirm against the HLS design.

        Args:
            img_in: Sequence of images, each acceptable to ``predict``.

        Returns:
            Integer NumPy array with one prediction per input image, in
            input order. An empty input yields an empty array without
            touching the hardware.
        """
        count = len(img_in)
        output = np.zeros(count, dtype=int)
        if count == 0:
            # Nothing to run; avoids indexing img_in[0] on an empty batch.
            return output

        buffers = (self.img_in_buf_1, self.img_in_buf_2)
        self._load_input(buffers[0], img_in[0])

        for i in range(1, count):
            # Start the run for image i - 1, which is already in its buffer.
            self._launch()
            # Odd i uses buffer 2, even i uses buffer 1, so the image being
            # staged never shares a buffer with the one being processed.
            self._load_input(buffers[i % 2], img_in[i])
            output[i - 1] = self._await_result()

        # The last image has been staged but not yet processed.
        self._launch()
        output[count - 1] = self._await_result()

        return output

# Call IP

In [31]:
import pynq
# Alternative bitstream, left disabled:
#overlay = Overlay("design_lenet_acc.bit")
overlay = Overlay("design_lenet.bit")

lenet = overlay.lenet_0

# Choose the power rail to sample from the rail names the board exposes.
rails = pynq.get_rails()
if 'VSYS' in rails:
    print("Recording Ultra96 v1 power...")
    rail_name = 'VSYS'
elif 'INT' in rails:
    print("Recording Ultra96 v2 power...")
    rail_name = 'INT'
else:
    raise RuntimeError("Cannot determine Ultra96 board version.")
recorder = pynq.DataRecorder(rails[rail_name].power)
recorder.reset()
# Sample the rail every 0.2 s while running the whole batch ten times.
with recorder.record(0.2):
    recorder.mark()
    st = time.time()
    for _ in range(10):
        output = lenet.batch_predict(images_1000)
    et = time.time()
    recorder.mark()

power_data = recorder.frame
# Per-column mean through the DataFrame API. np.mean(frame) reduces to a
# single scalar on newer pandas releases, which would break the indexing
# used for the energy estimate.
working_power = power_data.mean()

# Position 1 is the sensor column (position 0 is the mark counter); .iloc
# makes the positional lookup explicit. Mean power times elapsed time gives
# the energy estimate in joules.
print("Predict Power consumption: %.2f J" % (working_power.iloc[1] * (et - st)))

# Count predictions that agree with the ground truth. output[k] is compared
# with labels[5000 + k], matching the x_test[5000:10000] slice that was fed
# to the accelerator.
coincide = sum(
    1
    for label_index in range(5000, 10000)
    if labels[label_index] == output[label_index - 5000]
)
accuracy = coincide / 5000
print("accuracy = %.2f %%" % (accuracy * 100))

Recording Ultra96 v2 power...
Predict Power consumption: 60.91 J
accuracy = 99.38 %
