In [1]:
import time
import sys
import numpy as np
import platform
import tqdm
from typing import Tuple, List, Union, Any
import pynq_dpu
import pynq



### This is the final part where we will test our model after quantisation on the target platform.

We define at the beginning the `TimeMeasurement` class. This is exactly the same class that we used in the previous two steps. It will allow us to check the processing time of the data.

In addition, we create the `EvalLoader` class. It will let us read the data stored in `.npz` format that we prepared in the previous section. By default, we set the size of the batch to 1, and the file name is `eval_MNIST.npz`. Only adjust the name if it does not match.

In [2]:
class EvalLoader:
    def __init__(self, 
                 batch_size: int = 1, 
                 npz_path: str = 'eval_MNIST.npz') -> None:
        data = np.load(npz_path)
        self.data = data['data'].astype(np.float32)
        self.targets = data['targets']
        self.batch_size = batch_size
    
    def __getitem__(self, i):
        if i >= len(self):
            raise StopIteration

        beg = min(i * self.batch_size, self.data.shape[0])
        end = min(beg + self.batch_size, self.data.shape[0])

        return self.data[beg:end, ...], self.targets[beg:end]
    
    def __len__(self):
        return self.data.shape[0] // self.batch_size


class TimeMeasurement:
    def __init__(self, context_name: str, frames: int) -> None:
        self.context_name: str = context_name
        self.frames: int = frames
        self.begin: float = None
        self.end: float = None

    def __enter__(self):
        self.begin = time.time()
        return self

    def __exit__(self, *args):
        self.end = time.time()

    @property
    def time(self) -> float:
        if self.begin is None or self.end is None:
            raise RuntimeError()
        return int(self.end - self.begin)

    @property
    def fps(self):
        return self.frames / self.time

    def __str__(self) -> str:
        t = self.time
        h = t // 60
        min = (t - h*60) // 60
        s = int(t - h*60 - min*60)
        ms = int((t - np.floor(t))*1000)

        return f"Execution time: {h}:{min}:{s}:{ms}, processed {self.frames} frames, throughput: {self.fps} fps."

    def __repr__(self) -> str:
        t = self.time
        h = t // 60
        min = (t - h*60) // 60
        s = np.floor(t - h*60 - min*60)
        ms = np.floor((t - np.floor(t))*1000)

        return f'TimeMeasurement(context="{self.context_name}","{h}:{min}:{s}:{ms}", frames={self.frames}, throughput={self.fps})'

We define the `Accuracy` metric. This is the same as in the previous sections, but you can define it yourself. 

From the `y_pred` values, determine the maximum values with the `np.argmax` function. Do this relative to `axis=1`. Then compare the resulting vector with `y_ref`. Enter the result of the comparison into the `cmp` variable. Finally, determine the `score` value, which is equal to the summed value of the `cmp` vector (.sum()) divided by the length of the `cmp` vector (.shape[0]).

In [3]:
class AccuracyMetric:
    
    def __init__(self) -> None:
        pass

    def __call__(self, y_pred: np.ndarray, y_ref: np.ndarray) -> float:
        y_pred_max = np.argmax(y_pred, axis=1)
        cmp = y_pred_max == y_ref
        score = cmp.sum() / cmp.shape[0]

        return score

We are creating a `CrossEntropyLoss` class. This is not required and can return 0 by default. However, if anyone would be interested in an `additional task`, it can be implemented based on the PyTorch documentation or the internet :).

In [4]:
class CrossEntropyLoss:
    def __init__(self) -> None:
        pass
        
    def __call__(self, 
                 y_pred: np.ndarray, 
                 y_ref: np.ndarray
                 ) -> Any:
        
        return 0.0

We initialise the data generator, the metric, the loss function.

In [5]:
loader = 
metric = AccuracyMetric() #TODO
criterion = CrossEntropyLoss() #TODO
tm = TimeMeasurement("Evaluation on KV260", loader.batch_size * len(loader))

Define the `softmax` function. See how it works in the PyTorch documentation or on the web.

In [6]:
def softmax(x: np.ndarray, axis=1):
    return np.exp(x) / np.sum(np.exp(x), axis=axis, keepdims=True)

We create the NetworkDPU class. During initialisation it takes the compiled `MiniResNet_qu.xmodel` and the path to the `dpu.bit` file. The other `dpu` files must be in the same folder and must have the same name!

The `input_float_to_int8` function converts data from the `float` space to `int8`.

The `output_int8_to_float` function performs the reverse operation.

The `process` function performs data processing. Implement it by performing the following operations:
1. convert the input data `x` from `float` space to `int` space,
2. write the converted data to the zero index of the input buffer `buff_in`,
3. call the function `self.dpu.execute_async`, where you specify the input buffer as the first parameter and the output buffer as the second parameter. The function will return an index - write it to the `job_id` variable,
4. wait for the computation thread to execute - use the `self.dpu.wait` function, where the parameter is the index `job_id`.
5. read the first value from the `buff_out` buffer. Assign it to the `y` variable,
6. convert the `y` variable to type `float`,
7. execute the `softmax` function on the `y` variable and return it.

In [7]:
class NetworkDPU:
    
    def __init__(self, xmodel_path: str = 'MiniResNet_qu.xmodel', dpu_path: str = 'dpu.bit'):
        self.ov: pynq_dpu.DpuOverlay = pynq_dpu.DpuOverlay(dpu_path, download=True)
        self.ov.load_model(xmodel_path)
        self.dpu = self.ov.runner
        print(self.ov.runner)
        inputTensors = self.dpu.get_input_tensors()
        outputTensors = self.dpu.get_output_tensors()
        # get list of shapes
        shapeIn = np.array([it.dims for it in inputTensors])
        shapeOut = np.array([ot.dims for ot in outputTensors])
        self.shapeIn = shapeIn
        self.shapeOut = shapeOut
        self.buff_in = [np.zeros(sh, np.int8, order='C') for sh in shapeIn]
        self.buff_out = [np.zeros(sh, np.int8, order='C') for sh in shapeOut]
        
        self.input_repr = [(it.get_attr('bit_width'), it.get_attr('fix_point')) for it in inputTensors]
        self.output_repr = [(ot.get_attr('bit_width'), ot.get_attr('fix_point')) for ot in outputTensors]
    
    def input_float_to_int8(self, x: np.ndarray) -> np.ndarray:
        BIT_WIDTH, PRECISION_BITS = self.input_repr[0]
        x = x * (2**PRECISION_BITS)
        x = np.floor(x)
        x = np.clip(x,-128, 127)
        return x.astype(np.int8)
    
    def output_int8_to_float(self, y: np.ndarray):
        BIT_WIDTH, PRECISION_BITS = self.output_repr[0]
        PRECISION = 1 / 2**PRECISION_BITS
        y = y * PRECISION
        return y.astype(np.float32)
    
    def process(self, x: np.ndarray):
        # convert the input data `x` from `float` space to `int` space,
        x = self.input_float_to_int8(x)
        # write the converted data to the zero index of the input buffer `buff_in`
        self.buff_in[0] = x
        # start DPU thread
        # call the function `self.dpu.execute_async`, where you specify the input buffer as the first parameter and the output buffer as the second parameter. 
        # The function will return an index - write it to the `job_id` variable
        job_id = self.dpu.execute_async(self.buff_in, self.buff_out)
        self.dpu.wait(job_id)
        # read the first value from the `buff_out` buffer. Assign it to the `y` variable
        y = self.buff_out[0]
        y = self.output_int8_to_float(y)
        y = softmax(y)
        return y
    
    def __call__(self, x: np.ndarray) -> Any:
        return self.process(x)

Initialise the DPU network model, providing paths to the model and the `dpu.bit` file.

In [8]:
net = NetworkDPU(xmodel_path='MiniResNet_qu.xmodel', 
                 dpu_path='dpu.bit')

vart::Runner@0x39a31f50


We are creating a function to evaluate the model. If someone has implemented Cross Entropy function, the loss value will be taken into account. Otherwise it will return 0 and we will not pay attention to it.

In [9]:
def evaluation(model: NetworkDPU,
               data_loader: EvalLoader,
               criterion: CrossEntropyLoss,
               metric: AccuracyMetric,
               ) -> Tuple[float, float]:

    print(f"Running on platform: {platform.platform()}, "
          f"machine: {platform.machine()}, "
          f"python_version: {platform.python_version()}, "
          f"processor: {platform.processor()}, "
          f"system: {platform.system()}, "
          )
    total_loss: float = 0.0
    total_accuracy: float = 0.0
    samples_num: int = 0
    
    for i, (X, y_ref) in tqdm.tqdm(enumerate(data_loader),):
        y_pred = model(X)
        
        # calculate loss
        loss = criterion(y_pred, y_ref)
        
        # calculate accuracy
        accuracy = metric(y_pred, y_ref)

        total_loss += loss * y_pred.shape[0]
        total_accuracy += accuracy * y_pred.shape[0]
        samples_num += y_pred.shape[0]

    if samples_num == 0:
        return 0.0, 0.0

    return total_loss / samples_num, total_accuracy / samples_num


Start the evaluation. Compare the results obtained when evaluating the floating point model.

An increase in the amount of data processed per second is expected, with minimal or zero loss of accuracy.

In [10]:
with tm:
    loss, acc = evaluation(net, loader, criterion, metric)
    
print(str(tm))
print("Loss: ", loss)
print("Accuracy: ", acc)

Running on platform: Linux-5.4.0-1021-xilinx-zynqmp-aarch64-with-glibc2.29, machine: aarch64, python_version: 3.8.10, processor: aarch64, system: Linux, 


10000it [00:07, 1412.94it/s]

Execution time: 0:0:7:0, processed 10000 frames, throughput: 1428.5714285714287 fps.
Loss:  0.0
Accuracy:  0.982





## You have successfully completed all the tasks! Congratulations :)