<a href="https://colab.research.google.com/github/ionhedes/CA2020/blob/master/SW/MNIST_BNN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Setup

## Installs

In [7]:
!pip install larq



In [8]:
!pip install netron



## Settings & imports

In [9]:
#===== Reproducibility Settings (Before TensorFlow Import) =====#
import os
# *IMPORTANT*: this line has to run *before* importing tensorflow
# NOTE(review): PYTHONHASHSEED is normally read when an interpreter starts, so
# setting it here presumably only reaches child processes, not this kernel -- confirm
os.environ['PYTHONHASHSEED']=str(1)

In [46]:
from math import frexp
import tensorflow as tf
import larq as lq
import numpy as np
import netron
from google.colab import output, drive
from typing import Dict, List
from json import dump, load
import matplotlib.pyplot as plt
%matplotlib inline

In [11]:
tf.random.set_seed(1)

## Storage and paths

In [12]:
drive.mount('/content/drive')

Mounted at /content/drive


In [13]:
workdir = './drive/MyDrive/'

# Debug

In [14]:
DEBUG = True

In [15]:
def _dbg(msg: str) -> None:
    if 'DEBUG' in globals() and DEBUG == True:
        print(msg)

# Dataset download

In [16]:
# Fetch MNIST as (images, labels) pairs for the train and test splits
(train_x, train_y), (test_x, test_y) = tf.keras.datasets.mnist.load_data()

# Add the single channel axis; -1 lets numpy infer the number of samples instead
# of hard-coding the 60000/10000 split sizes
train_x = train_x.reshape((-1, 28, 28, 1))
test_x = test_x.reshape((-1, 28, 28, 1))

# Center pixel values around 0. They will be quantized to -1, 1 by the network
train_x, test_x = train_x - 127.5, test_x - 127.5

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz


# Model architecture and instantiation

In [17]:
def create_lenet() -> tf.keras.Model:
    """Build and compile the binarized LeNet-style classifier for 28x28x1 inputs.

    Returns:
    A compiled `Sequential` model (Adam optimizer, sparse categorical
    cross-entropy loss, accuracy metric) that ends in a 10-way softmax.
    """
    # Every quantized layer is bias-free, uses ste_sign for input and weights,
    # and has its weights clipped between -1 and 1
    quant_opts = {
        "use_bias": False,
        "input_quantizer": "ste_sign",
        "kernel_quantizer": "ste_sign",
        "kernel_constraint": "weight_clip",
    }

    def batch_norm():
        # center=False lowers accuracy from ~96 to ~95, LayerNorm seems better but also more complex
        return tf.keras.layers.BatchNormalization(scale=False, center=False)

    stack = [
        # 5x5 first layer much better (~95 to ~98) and slightly cheaper (2.7 to 2.6). 16 channel first layer much cheaper (1.1 to 0.6), and slightly worse (~98 to ~97), 8 channel is 0.6 to 0.3 and ~97 to ~94
        lq.layers.QuantConv2D(8, (5, 5), input_shape=(28, 28, 1), **quant_opts),
        # With 5x5 kernel, 3x3 max pooling is only slightly lower than 2x2 (~98 to ~97), and much cheaper (2.6 to 1.1)
        tf.keras.layers.MaxPooling2D((3, 3)),
        batch_norm(),

        # Given above 8, 64 -> 32 is 0.3 to 0.2 and -0.5
        lq.layers.QuantConv2D(32, (3, 3), **quant_opts),
        tf.keras.layers.MaxPooling2D((2, 2)),
        batch_norm(),

        # Optional layer for ~+0
        # lq.layers.QuantConv2D(64, (3, 3), **quant_opts),  # A third conv layer is cheaper (2.7M MACs) and better (~95) than a 256 dense layer (2.8M MACs and ~94)
        # batch_norm(),

        tf.keras.layers.Flatten(),
        lq.layers.QuantDense(64, **quant_opts),
        batch_norm(),
        lq.layers.QuantDense(10, **quant_opts),
        batch_norm(),
        # tf.keras.layers.Activation(lq.quantizers.SteSign()),
        tf.keras.layers.Activation("softmax"),
    ]

    net = tf.keras.models.Sequential(stack)
    net.compile(optimizer='adam',
                loss='sparse_categorical_crossentropy',
                metrics=['accuracy'])

    return net

In [18]:
model = create_lenet()
lq.models.summary(model)

+sequential stats----------------------------------------------------------------------------+
| Layer                  Input prec.          Outputs  # 1-bit  # 32-bit  Memory  1-bit MACs |
|                              (bit)                       x 1       x 1    (kB)             |
+--------------------------------------------------------------------------------------------+
| quant_conv2d                     1  (-1, 24, 24, 8)      200         0    0.02      115200 |
| max_pooling2d                    -    (-1, 8, 8, 8)        0         0       0           0 |
| batch_normalization              -    (-1, 8, 8, 8)        0        16    0.06           0 |
| quant_conv2d_1                   1   (-1, 6, 6, 32)     2304         0    0.28       82944 |
| max_pooling2d_1                  -   (-1, 3, 3, 32)        0         0       0           0 |
| batch_normalization_1            -   (-1, 3, 3, 32)        0        64    0.25           0 |
| flatten                          -        (-1, 2

# Training
***Do not run if you're importing the weights from a previous run!***

In [None]:
model.fit(train_x, train_y, batch_size=64, epochs=10)
test_loss, test_acc = model.evaluate(test_x, test_y)
print(f"Test accuracy {test_acc * 100:.2f} %")

# Integer quantization of non-binary weights
***Don't run the weight import if you run this!***

**What is the objective?**
Get a Keras model with integer and binary weights only.

**What strategies have I thought of?**
1. naively truncate the weights of the current model
2. apply [post-training quantization](https://www.tensorflow.org/lite/performance/post_training_integer_quant) using TFLite

**Which strategy should we use?**

Naively truncating the weights to integers is easier and results in the same evaluation accuracy (in software). I haven't managed to make the 2nd method work yet. Therefore, I propose we use the 1st method, naive truncation (timestamp for future reference: **27.05, 3:59 PM**).

## Naively truncating the weights

**Steps:**
1. Copy the model without truncating the weights. Evaluate. *Why?* To make sure the same accuracy is achievable by naively copying weights from one model to another.
2. Copy the model and truncate the weights. Evaluate.
3. Copy the model (with `larq`'s `quantized_scope` set to `True`) and truncate the rest of the weights. Evaluate. *Why isn't step 2 enough?* Copying weights might not preserve `larq` quantization. The `larq` execution model is still unclear to me.

**Conclusion:**
- copying weights from one model to another is fine in general.
- truncating weights without ensuring `larq`'s quantization scope destroys performance.
- truncating weights while ensuring `larq`'s quantization scope preserves performance. <font color="lightgreen">We can use this strategy to obtain integer weights!</font>

In [None]:
# copy & evaluate without truncation
copy_model = create_lenet()
for copy_layer, layer in zip(copy_model.layers, model.layers):
    copy_layer.set_weights(layer.get_weights())

copy_model.compile(optimizer='adam',
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])
copy_model.evaluate(test_x, test_y)

In [None]:
# copy & evaluate with truncation
int_model = create_lenet()
for int_layer, layer in zip(int_model.layers, model.layers):
    trunc_weights = [np.trunc(v) for v in layer.get_weights()]
    int_layer.set_weights(trunc_weights)

int_model.compile(optimizer='adam',
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])
int_model.evaluate(test_x, test_y)

In [None]:
# copy & evaluate with truncation & larq quantized scope
lq_int_model = create_lenet()
# NOTE(review): presumably get_weights()/set_weights() exchange the quantized
# kernels while this scope is active -- confirm against the larq documentation
with lq.context.quantized_scope(True):
    for lq_int_layer, layer in zip(lq_int_model.layers, model.layers):
        # despite the variable name, values are rounded *up* (np.ceil), not truncated
        trunc_weights = [np.ceil(v) for v in layer.get_weights()]  # better perf than .trunc
        lq_int_layer.set_weights(trunc_weights)

# compiled again with the same optimizer/loss/metric that create_lenet() already sets
lq_int_model.compile(optimizer='adam',
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])
lq_int_model.evaluate(test_x, test_y)

## ~Applying post-training quantization using TFLite~

**Steps:**
1. train model normally (see previous section)
2. quantize to integer weights using TFLite
3. inspect and evaluate TFLite model
4. extract the model weights and move them to a new Keras model (this step might be necessary to check whether TFLite introduces unwanted implicit optimizations)
5. evaluate the new Keras model. Are all the weights integers? Is the accuracy comparable to that of the initial model?

**Why not just use the TFLite model?**
Because it might include additional implicit optimizations that we're not aware of/can't easily reproduce in hardware.

**Conclusion:**
- TFLite introduces additional quantization operations that might be hard to replicate in hardware.
- Stopped at step 4. Until proven inappropriate, the **naive weight integer truncation** method is easier, and the go-to.

### Quantize

See [this](https://www.tensorflow.org/lite/performance/post_training_quantization), and [this](https://www.tensorflow.org/lite/performance/post_training_integer_quant) for possible integer quantization strategies.

Trying to apply **full-integer quantization** (all intermediate operations use integers, no falling back to float operations) to best simulate the hardware environment we're building

See [this](https://www.tensorflow.org/lite/guide/signatures) to learn what **signatures** are (used for creating representative datasets). A good explanation on why they're needed in this context can be found [here](https://www.tensorflow.org/lite/performance/post_training_integer_quant#convert_using_float_fallback_quantization).

In [None]:
REPR_DATASET_SIZE = 100

def representative_data_gen():
    """Yield one-image float32 batches for calibrating the TFLite quantizer.

    The samples are the first `REPR_DATASET_SIZE` entries of the global `train_x`.
    """
    # note: tensor-flavored generators are not encouraged, but I think they're
    # enough for our needs
    calibration_batches = tf.data.Dataset.from_tensor_slices(train_x).batch(1)
    for sample in calibration_batches.take(REPR_DATASET_SIZE):
        yield [np.float32(sample)]

In [None]:
# set up converter
converter = tf.lite.TFLiteConverter.from_keras_model(model)

# convert to tflite unquantized
tflite_model = converter.convert()

# convert to tflite quantized
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.representative_dataset = representative_data_gen
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
converter.inference_input_type = tf.int8  # signed 8-bit integer
converter.inference_output_type = tf.int8  # signed 8-bit integer
tflite_int_quant_model = converter.convert()

In [None]:
# save quantized model
tflite_int_quant_model_name = 'tflite_int_quant_model'
with open(f'{workdir}/{tflite_int_quant_model_name}.tflite', 'wb') as fp:
    fp.write(tflite_int_quant_model)

# save non-quantized model as well
tflite_model_name = 'tflite_std_model'
with open(f'{workdir}/{tflite_model_name}.tflite', 'wb') as fp:
    fp.write(tflite_model)

### Inspect

**27.05 12:40 PM**
- `larq` might not be playing nicely with tflite (see Netron below) -- weights set to $\pm127$
- the `tflite` quantization seems to be introducing extra quantization operations in the model; do we need to move those to hardware?

In [None]:
# inspect using prints (not really useful)
interpreter = tf.lite.Interpreter(model_content=tflite_int_quant_model)
tensor_details = interpreter.get_tensor_details()
for layer_dict in tensor_details:
    print(layer_dict)

In [None]:
# visualize the quantized model using Netron
with output.temporary():
    host, port = netron.start(f'{workdir}/{tflite_int_quant_model_name}.tflite')

output.serve_kernel_port_as_iframe(port, height='800')

### Evaluate

**27.05 1:09 PM**
- `accuracy_tf_std == accuracy_tfl_std`
- `abs(accuracy_tf_std - accuracy_tfl_int_quant) == 0.12%`
- not sure if the tested models preserve the `larq` quantization

#### Helper functions
Copy-pasted and adjusted from [here](https://www.tensorflow.org/lite/performance/post_training_integer_quant#run_the_tensorflow_lite_models)

In [None]:
# Helper function to run inference on a TFLite model
def run_tflite_model(tflite_model, test_images):
    """Run inference with a serialized TFLite model, one image at a time.

    Args:
    `tflite_model` -- the model content (bytes) produced by the TFLite converter
    `test_images` -- iterable of unbatched input images

    Returns:
    1-D integer array with the arg-max class index predicted for each image.
    """
    # Initialize the interpreter
    interpreter = tf.lite.Interpreter(model_content=tflite_model)
    interpreter.allocate_tensors()

    input_details = interpreter.get_input_details()[0]
    output_details = interpreter.get_output_details()[0]
    input_dtype = input_details["dtype"]

    # Any integer input (int8 as well as uint8) is a quantized input and must be
    # mapped through the tensor's scale/zero-point; a zero scale means the
    # tensor carries no quantization parameters, so nothing is rescaled then.
    input_scale, input_zero_point = input_details["quantization"]
    quantized_input = np.issubdtype(input_dtype, np.integer) and input_scale != 0
    if quantized_input:
        dtype_limits = np.iinfo(input_dtype)

    predictions = np.zeros((len(test_images),), dtype=int)
    for i, test_image in enumerate(test_images):

        if quantized_input:
            # q = round(x / scale) + zero_point, clipped so the cast below
            # cannot wrap around the integer range
            test_image = np.rint(test_image / input_scale) + input_zero_point
            test_image = np.clip(test_image, dtype_limits.min, dtype_limits.max)

        test_image = np.expand_dims(test_image, axis=0).astype(input_dtype)
        interpreter.set_tensor(input_details["index"], test_image)
        interpreter.invoke()
        scores = interpreter.get_tensor(output_details["index"])[0]

        predictions[i] = scores.argmax()

    return predictions

In [None]:
# Helper function to evaluate a TFLite model on all images
def evaluate_model(tflite_model, test_images, test_labels, model_type):
    """Print the accuracy (in percent) of a TFLite model over all given images.

    Args:
    `tflite_model` -- the serialized TFLite model
    `test_images` -- the images to classify
    `test_labels` -- the expected class index of each image
    `model_type` -- label used in the printed report
    """
    predicted = run_tflite_model(tflite_model, test_images)

    sample_count = len(test_images)
    hit_count = np.sum(test_labels== predicted)
    accuracy = (hit_count * 100) / sample_count

    report = '%s model accuracy is %.4f%% (Number of test samples=%d)'
    print(report % (model_type, accuracy, sample_count))

#### Actual evaluation

In [None]:
# evaluate integer quantized model (tflite version)
evaluate_model(tflite_int_quant_model, test_x, test_y, model_type='Quantized')

In [None]:
# evaluate non-quantized model (tflite version)
evaluate_model(tflite_model, test_x, test_y, model_type='Float')

### Create new Keras model

In [None]:
# stopped here because I succeeded obtaining an integer-weighted model
# using the other method

# Weight import
***Don't run the weight export and weight quantization steps if you run this!***

## Weight import script and execution

In [19]:
def _import_dense_weights(layer,
                           weights: Dict) -> None:
    """Load the weight tensors of one dense layer from their JSON form.

    Args:
    `layer` -- the dense layer receiving the weights
    `weights` -- mapping of tensor name to nested lists; the values are passed
    to `layer.set_weights` in the mapping's iteration order

    Raises:
    `ValueError` if the mapping holds neither one nor two tensors.
    """
    tensor_count = len(weights)

    if tensor_count == 2:
        # with biases
        # make sure this is indeed the correct order
        _dbg(f"\t\t - weights\n"
             f"\t\t - biases")
    elif tensor_count == 1:
        # without biases
        _dbg(f"\t\t - weights")
    else:
        raise ValueError('Too many weight tensors in this layer.')

    layer.set_weights([np.array(arr) for arr in weights.values()])

In [20]:
def _import_bn_weights(layer,
                        weights: Dict) -> None:
    """Load the statistics of one batch normalization layer from their JSON form.

    Only layers without scaling and centering (two tensors: means, variances)
    are supported.

    Args:
    `layer` -- the batch normalization layer receiving the weights
    `weights` -- mapping of tensor name to nested lists; the values are passed
    to `layer.set_weights` in the mapping's iteration order

    Raises:
    `NotImplementedError` if four tensors are given (betas and gammas present).
    `ValueError` for any other tensor count than two or four.
    """
    tensor_count = len(weights)

    if tensor_count == 2:
        # no betas, gammas
        # make sure this is indeed the correct order
        _dbg(f"\t\t - means\n"
             f"\t\t - variances")
    elif tensor_count == 4:
        # with betas, gammas
        _dbg(f"\t\t - means\n"
             f"\t\t - variances\n"
             f"\t\t - betas\n"
             f"\t\t - gammas")
        raise NotImplementedError('Importing weights is not implemented for batch normalization with scaling and centering.')
    else:
        raise ValueError('Too many weight tensors in this layer.')

    layer.set_weights([np.array(arr) for arr in weights.values()])

In [21]:
def _import_conv_weights(layer,
                          weights: Dict) -> None:
    """Load the kernels (and optional biases) of one convolutional layer.

    Args:
    `layer` -- the convolutional layer receiving the weights
    `weights` -- mapping of tensor name to nested lists; the values are passed
    to `layer.set_weights` in the mapping's iteration order

    Raises:
    `ValueError` if the mapping holds neither one nor two tensors.
    """
    tensor_count = len(weights.keys())
    if tensor_count not in (1, 2):
        raise ValueError('Too many weight tensors in this layer.')

    if tensor_count == 1:
        # without biases
        _dbg("\t\t - kernels")
    else:
        # with biases
        # make sure this is indeed the correct order
        _dbg("\t\t - kernels\n"
             "\t\t - biases")

    tensors = []
    for nested in weights.values():
        tensors.append(np.array(nested))
    layer.set_weights(tensors)

In [22]:
def _dispatch_import_weights_layer_specific(layer,
                                            weights: Dict) -> None:
    """Route one layer's JSON weights to the importer matching the layer type.

    The layer type is recognised from substrings of the layer's configured name.

    Args:
    `layer` -- the layer receiving the weights
    `weights` -- mapping of tensor name to nested lists for this layer

    Raises:
    `ValueError` if the layer name matches no supported layer type.
    """
    weightless_layers = ['pool', 'flatten', 'activation']

    layer_name = layer.get_config()['name']

    if 'conv2d' in layer_name:
        _dbg(f"\t - convolutional layer <{layer_name}>")
        _import_conv_weights(layer, weights)
    elif 'batch_normalization' in layer_name:
        _dbg(f"\t - batch normalization layer <{layer_name}>")
        _import_bn_weights(layer, weights)
    elif 'dense' in layer_name:
        _dbg(f"\t - dense layer <{layer_name}>")
        _import_dense_weights(layer, weights)
    elif any(wl in layer_name for wl in weightless_layers):
        # a supported layer that has no weights: nothing to import
        # (returns None like every other branch, as the signature promises)
        _dbg(f"\t - ignoring weightless layer <{layer_name}>")
    else:
        raise ValueError('This layer type is not supported.')

In [23]:
def import_weights(model: tf.keras.Model,
                    file_path: str,
                    file_name: str) -> None:
    """Imports the weights to the model. Make sure the neural network topology
    is the same.

    The JSON file must hold one entry per model layer, in layer order
    (weightless layers included).

    Args:
    `model` -- neural net model
    `file_path` -- directory of the weight file (with or without a trailing
    separator).
    `file_name` -- name of the weight file, without the `.json` extension.

    Raises:
    `ValueError` if the number of entries in the file differs from the number
    of layers in the model.
    """
    # one path for both the log message and the read, so they cannot disagree
    weights_file = os.path.join(file_path, f'{file_name}.json')

    _dbg(f"Importing weights from {weights_file}")

    with open(weights_file, 'r') as fp:
        # load json dict
        weights = load(fp)

    # zip() would silently stop at the shorter sequence and leave the remaining
    # layers with their freshly initialised weights
    if len(weights) != len(model.layers):
        raise ValueError(f'Weight file has {len(weights)} layer entries, '
                         f'but the model has {len(model.layers)} layers.')

    # iterate through layers
    for layer, layer_weights in zip(model.layers, weights.values()):
        _dispatch_import_weights_layer_specific(layer, layer_weights)

    _dbg("Weight import complete.")

In [24]:
weights_file_path = workdir
weights_file = 'weights_test'

In [25]:
model = create_lenet()
import_weights(model, file_path=weights_file_path, file_name=weights_file)

Importing weights from ./drive/MyDrive//weights_test.json
	 - convolutional layer <quant_conv2d_2>
		 - kernels
	 - ignoring weightless layer <max_pooling2d_2>
	 - batch normalization layer <batch_normalization_4>
		 - means
		 - variances
	 - convolutional layer <quant_conv2d_3>
		 - kernels
	 - ignoring weightless layer <max_pooling2d_3>
	 - batch normalization layer <batch_normalization_5>
		 - means
		 - variances
	 - ignoring weightless layer <flatten_1>
	 - dense layer <quant_dense_2>
		 - weights
	 - batch normalization layer <batch_normalization_6>
		 - means
		 - variances
	 - dense layer <quant_dense_3>
		 - weights
	 - batch normalization layer <batch_normalization_7>
		 - means
		 - variances
	 - ignoring weightless layer <activation_1>
Weight import complete.


## Weight import test
Evaluate the accuracy

In [26]:
test_loss, test_acc = model.evaluate(test_x, test_y)
print(f"Test accuracy {test_acc * 100:.2f} %")

Test accuracy 94.39 %


# Intermediate layer output inspection

In [None]:
def print_intermediate_results(model: tf.keras.Model,
                               inp: np.ndarray,
                               file: str = None,
                               binary: bool = False):
    """Runs an inference on a model, printing all the intermediate results

    Every layer output is truncated towards zero and cast to 32-bit integers
    before it is reported.

    Args:
    - `model` -- the model object
    - `inp` -- the input tensor (unbatched).
    - `file` -- path to a text dump; if set, the results are written there
    and nothing is printed to the standard output (default `None`)
    - `binary` -- if set, output the 32-bit two's complement representation
    of each value instead of its decimal form (default `False`)
    """

    result_extractor = tf.keras.Model(
        inputs=model.inputs,
        outputs=[layer.output for layer in model.layers]
    )

    results = result_extractor(inp[None, :])  # dimension expansion

    def _render(res) -> str:
        # one list entry per element along the leading (batch) axis
        int_res = np.trunc(res.numpy()).astype(np.int32)
        if binary:
            to_bits = np.vectorize(lambda v: np.binary_repr(v, width=32),
                                   otypes=[object])
            rows = to_bits(int_res).tolist()
        else:
            rows = int_res.tolist()
        return str([f"{el}\n" for el in rows])

    # print(..., file=None) falls back to the standard output
    sink = open(file, 'w') if file is not None else None
    try:
        for layer, res in zip(model.layers, results):
            print(f"========================================================\n"
                  f"{layer.name}:\n"
                  f"{_render(res)}\n"
                  f"========================================================",
                  file=sink)
    finally:
        if sink is not None:
            sink.close()

In [None]:
print_intermediate_results(model, test_x[0])

quant_conv2d_2:
['[[[1, -7, 1, -1, -7, 7, -3, 9], [1, -7, 1, -1, -7, 7, -3, 9], [1, -7, 1, -1, -7, 7, -3, 9], [1, -7, 1, -1, -7, 7, -3, 9], [1, -7, 1, -1, -7, 7, -3, 9], [1, -7, 1, -1, -7, 7, -3, 9], [1, -7, 1, -1, -7, 7, -3, 9], [1, -7, 1, -1, -7, 7, -3, 9], [1, -7, 1, -1, -7, 7, -3, 9], [1, -7, 1, -1, -7, 7, -3, 9], [1, -7, 1, -1, -7, 7, -3, 9], [1, -7, 1, -1, -7, 7, -3, 9], [1, -7, 1, -1, -7, 7, -3, 9], [1, -7, 1, -1, -7, 7, -3, 9], [1, -7, 1, -1, -7, 7, -3, 9], [1, -7, 1, -1, -7, 7, -3, 9], [1, -7, 1, -1, -7, 7, -3, 9], [1, -7, 1, -1, -7, 7, -3, 9], [1, -7, 1, -1, -7, 7, -3, 9], [1, -7, 1, -1, -7, 7, -3, 9], [1, -7, 1, -1, -7, 7, -3, 9], [1, -7, 1, -1, -7, 7, -3, 9], [1, -7, 1, -1, -7, 7, -3, 9], [1, -7, 1, -1, -7, 7, -3, 9]], [[1, -7, 1, -1, -7, 7, -3, 9], [1, -7, 1, -1, -7, 7, -3, 9], [1, -7, 1, -1, -7, 7, -3, 9], [1, -7, 1, -1, -7, 7, -3, 9], [1, -7, 1, -1, -7, 7, -3, 9], [1, -7, 1, -1, -7, 7, -3, 9], [1, -7, 1, -1, -7, 7, -3, 9], [1, -7, 1, -1, -7, 7, -3, 9], [1, -7, 1, -1, -7,

# Weight extraction

In [None]:
def _extract_dense_weights(layer_name: str,
                           weight_list: List,
                           dtype: np.dtype | str = np.float32) -> Dict:
    """Convert the tensors of a dense layer into JSON-friendly nested lists.

    Args:
    `layer_name` -- name of the layer (not used by the conversion itself)
    `weight_list` -- one tensor (weights) or two (weights, biases)
    `dtype` -- type the values are cast to before conversion

    Returns:
    `Dict` with a `'weights'` entry and, when present, a `'biases'` entry.

    Raises:
    `ValueError` if `weight_list` holds neither one nor two tensors.
    """
    tensor_count = len(weight_list)
    if tensor_count not in (1, 2):
        raise ValueError('Too many weight tensors in this layer.')

    with_biases = tensor_count == 2
    if with_biases:
        # make sure this is indeed the correct order
        _dbg("\t\t - weights\n"
             "\t\t - biases")
    else:
        _dbg("\t\t - weights")

    extracted = {'weights': weight_list[0].astype(dtype).tolist()}
    if with_biases:
        extracted['biases'] = weight_list[1].astype(dtype).tolist()

    return extracted

In [None]:
def _extract_bn_weights(layer_name: str,
                        weight_list: List,
                        dtype: np.dtype | str = np.float32) -> Dict:
    """Convert the statistics of a batch normalization layer into nested lists.

    Args:
    `layer_name` -- name of the layer (not used by the conversion itself)
    `weight_list` -- two tensors (means, variances)
    `dtype` -- type the values are cast to before conversion

    Returns:
    `Dict` with a `'means'` and a `'vars'` entry.

    Raises:
    `NotImplementedError` if four tensors are given (betas and gammas present).
    `ValueError` for any other tensor count than two or four.
    """
    tensor_count = len(weight_list)

    if tensor_count == 4:
        # with betas, gammas
        _dbg("\t\t - means\n"
             "\t\t - variances\n"
             "\t\t - betas\n"
             "\t\t - gammas")
        raise NotImplementedError('Extracting weights is not implemented for batch normalization with scaling and centering.')

    if tensor_count != 2:
        raise ValueError('Too many weight tensors in this layer.')

    # no betas, gammas
    # make sure this is indeed the correct order
    _dbg("\t\t - means\n"
         "\t\t - variances")
    means, variances = weight_list[0], weight_list[1]
    return {
        'means': means.astype(dtype).tolist(),
        'vars': variances.astype(dtype).tolist(),
    }

In [None]:
def _extract_conv_weights(layer_name: str,
                          weight_list: List,
                          dtype: np.dtype | str = np.float32) -> Dict:
    """Convert the tensors of a convolutional layer into nested lists.

    Args:
    `layer_name` -- name of the layer (not used by the conversion itself)
    `weight_list` -- one tensor (kernels) or two (kernels, biases)
    `dtype` -- type the values are cast to before conversion

    Returns:
    `Dict` with a `'kernels'` entry and, when present, a `'biases'` entry.

    Raises:
    `ValueError` if `weight_list` holds neither one nor two tensors.
    """
    tensor_count = len(weight_list)
    if tensor_count not in (1, 2):
        raise ValueError('Too many weight tensors in this layer.')

    with_biases = tensor_count == 2
    if with_biases:
        # make sure this is indeed the correct order
        _dbg("\t\t - kernels\n"
             "\t\t - biases")
    else:
        _dbg("\t\t - kernels")

    extracted = {'kernels': weight_list[0].astype(dtype).tolist()}
    if with_biases:
        extracted['biases'] = weight_list[1].astype(dtype).tolist()

    return extracted

In [None]:
def _dispatch_extract_weights_layer_specific(layer_name: str,
                                             weight_list: List,
                                             dtype: np.dtype | str = np.float32) -> Dict:
    """Pick the extractor matching the layer type and return its result.

    The layer type is recognised from substrings of `layer_name`; the markers
    are tried in the order conv2d, batch_normalization, dense, weightless.

    Returns:
    `Dict` of extracted tensors (empty for weightless layers).

    Raises:
    `ValueError` if `layer_name` matches no supported layer type.
    """
    # (name marker, label used in the debug trace, extractor)
    extractors = (
        ('conv2d', 'convolutional', _extract_conv_weights),
        ('batch_normalization', 'batch normalization', _extract_bn_weights),
        ('dense', 'dense', _extract_dense_weights),
    )
    for marker, label, extractor in extractors:
        if marker in layer_name:
            _dbg(f"\t - {label} layer <{layer_name}>")
            return extractor(layer_name, weight_list, dtype)

    for marker in ('pool', 'flatten', 'activation'):
        if marker in layer_name:
            # a supported layer that has no weights
            _dbg(f"\t - ignoring weightless layer <{layer_name}>")
            return dict()

    raise ValueError('This layer type is not supported.')

In [None]:
def extract_weights(model: tf.keras.Model,
                    quantized: bool = False,
                    file_path: str = None,
                    file_name: str = None,
                    dtype: np.dtype | str = np.float32) -> Dict:
    """Extracts the (quantized) weights from the model.
    Optionally saves them in a file.

    Layers are keyed by their type name plus a 1-based occurrence index within
    this model (e.g. `quant_conv2d_1`, `quant_conv2d_2`), so the keys do not
    depend on how many models were created earlier in the session.

    Args:
    `model` -- neural net model
    `quantized` -- set to true if you want to export `larq`-quantized weights
    (default `False`)
    `file_path` -- directory of the resulting file. if `None`, weights are not
    saved (default `None`)
    `file_name` -- name of the resulting file, without the `.json` extension
    (default `None`)
    `dtype` -- data type to save the weights as (default `np.float32`)

    Returns:
    `Dict` containing the weights for each layer

    Raises:
    `ValueError` if `file_path` is given without a `file_name`.
    """
    import re  # function-scope import: only needed to normalise layer names

    # one path for both the log message and the write, so they cannot disagree
    out_file = None
    if file_path is not None:
        if file_name is None:
            raise ValueError('`file_name` must be given when `file_path` is set.')
        out_file = os.path.join(file_path, f'{file_name}.json')

    _dbg(f"Extracting weights {f'to {out_file}' if out_file else ''}")

    ret = dict()
    name_idx_dict = dict()

    # build weight dictionary
    with lq.context.quantized_scope(quantized):
        for layer in model.layers:
            # adjust layer name
            ## drop only a trailing "_<number>": that index grows each time a
            ## new model is created in the same session. Names without such an
            ## index (e.g. "quant_conv2d", "flatten") must be kept whole, or
            ## the layer type can no longer be recognised.
            raw_name = re.sub(r'_\d+$', '', layer.get_config()['name'])
            name_idx_dict[raw_name] = name_idx_dict.get(raw_name, 0) + 1
            name = f'{raw_name}_{name_idx_dict[raw_name]}'

            # add (name, (weights)) pair in dictionary
            ret[name] = _dispatch_extract_weights_layer_specific(
                name, layer.get_weights(), dtype)

    # save dictionary to file if requested
    if out_file is not None:
        with open(out_file, 'w') as fp:
            dump(ret, fp)

    _dbg("Weight extraction complete.")

    return ret

In [None]:
weights_file_path = workdir
weights_file = 'weights_test'

In [None]:
lq_int_model_dict = extract_weights(lq_int_model, quantized=True, file_path=weights_file_path, file_name=weights_file, dtype=np.int32)

# BatchNorm compensation

TODO: adjust the weights, so that the batchnorm can be a single comparator in hardware

In [27]:
def add_to_bn(value: int, bn: Dict):
    """Replace every mean `m` of `bn` with `ceil((m + value) / 2)`, in place.

    Args:
    `value` -- the compensation added to each mean before halving
    `bn` -- batch normalization weights; only its `"means"` entry is rewritten
    """
    shifted_means = []
    for mean in bn["means"]:
        shifted_means.append(int(np.ceil((mean + value) / 2)))
    bn["means"] = shifted_means

def add_compensation_to_batchnorm(weights_in: Dict | str,
                           weights_out: str | None) -> None:
    """Adds weight size to batchnorm for compensation.

    The means of the first three batch normalization layers are rewritten via
    `add_to_bn`; the last one is left untouched. A `COMPENSATED` marker is
    stored in the `activation_1` entry so the same weights are never
    compensated twice.

    Note: when `weights_in` is a dict, it is modified in place.

    Args:
    `weights_in` -- object contains weights or path to json
    `weights_out` -- the path of the output `.json` file; if `None`, nothing
    is written
    """
    from_file = isinstance(weights_in, str)

    _dbg(f"Adding compensation to weights from "
         f"{weights_in if from_file else 'pre-loaded model'} "
         f"to {weights_out}")

    # load weights in memory if a file was provided
    if from_file:
        with open(weights_in, 'r') as fp:
            weights_in = load(fp)

    # Sanity check, don't compensate twice. Put key in skipped layer to prevent
    # serialisation issues.
    key = "COMPENSATED"
    if key in weights_in["activation_1"]:
        print("Skipping, input already compensated")
        return

    weights_in["activation_1"][key] = 1

    # Each value is the fan-in of the quantized layer feeding that batch norm:
    # 5x5 kernel over 1 channel, 3x3 kernel over 8 channels, 3*3*32 = 288
    # flattened inputs.
    # NOTE(review): original author asked whether these are correct -- they
    # match the architecture built by create_lenet(); re-check if it changes.
    add_to_bn(5*5*1, weights_in["batch_normalization_1"])
    add_to_bn(3*3*8, weights_in["batch_normalization_2"])
    add_to_bn(288, weights_in["batch_normalization_3"])

    # Don't compensate last batch norm,
    # add_to_bn(64, weights_in["batch_normalization_4"])

    # write the .json file
    if weights_out is not None:
        with open(f'{weights_out}', 'w') as fp:
            dump(weights_in, fp)

weights_out = f'{workdir}/weights_compensated.json'
add_compensation_to_batchnorm(f'{workdir}/weights_test.json', weights_out)


Adding compensation to weights from ./drive/MyDrive//weights_test.json to ./drive/MyDrive//weights_compensated.json


# VHDL weight file generation

**What is the objective?**

Build a script that generates a VHDL file containing the hard-coded weights.

In [114]:
def convert_to_2s_complement(num: int, bits: int = 32) -> str:
    """Converts a number to its 2's complement representation.

    Args:
    `num` -- the integer to convert (Python or NumPy integer). Negative values
    down to `-2**(bits-1)` and non-negative values up to `2**bits - 1` are
    accepted.
    `bits` -- width of the representation (default `32`)

    Returns:
    A string of exactly `bits` characters, most significant bit first.

    Raises:
    `ValueError` if `bits` is not positive or `num` does not fit in `bits` bits.
    `TypeError` if `num` is not an integer.
    """
    from operator import index  # function-scope import; lossless int conversion

    if bits < 1:
        raise ValueError(f"bits must be positive, got {bits}")

    # normalise NumPy integers to a Python int so the arithmetic cannot overflow
    num = index(num)

    if not -(1 << (bits - 1)) <= num < (1 << bits):
        raise ValueError(f"{num} cannot be represented on {bits} bits")

    # masking maps a negative value onto 2**bits + num and leaves others as-is
    return format(num & ((1 << bits) - 1), f"0{bits}b")

In [115]:
def _export_quant_dense_weights_to_vhdl_weights(fp, layer_name: str, weights: np.ndarray):

    num_inputs = weights.shape[0]
    num_outputs = weights.shape[1]

    total_size = num_inputs * num_outputs

    ##convert to n_out x n_in###
    weights = weights.transpose()
    #####

    # write sizes
    fp.write(f"\tconstant {layer_name.upper()}_WEIGHTS_DIM_NUM_IN : integer := {num_inputs};\n")
    fp.write(f"\tconstant {layer_name.upper()}_WEIGHTS_DIM_NUM_OUT : integer := {num_outputs};\n")

    # write weights
    fp.write(f"\tconstant {layer_name.upper()}_WEIGHTS : std_logic_vector(0 to {total_size - 1}) := (")
    for idx, el in enumerate(weights.flatten()):
        repr_el = 0 if el == -1 else 1  # binary representation of el
        fp.write(f"{', ' if idx != 0 else str()}"
                 f"{f'{chr(10)}{chr(9)}{chr(9)}' if idx % 4 == 0 else ''}"
                 f"\'{repr_el}\'")
    fp.write("\n\t);\n\n")


def _export_dense_weights_to_vhdl_weights(fp, layer_name: str, weights: np.ndarray):

    num_inputs = weights.shape[0]
    num_outputs = weights.shape[1]

    total_size = num_inputs * num_outputs

    ##convert to n_out x n_in###
    weights = weights.transpose()
    #####

    # write sizes
    fp.write(f"\tconstant {layer_name.upper()}_WEIGHTS_DIM_NUM_IN : integer := {num_inputs};\n")
    fp.write(f"\tconstant {layer_name.upper()}_WEIGHTS_DIM_NUM_OUT : integer := {num_outputs};\n")

    # write weights
    fp.write(f"\tconstant {layer_name.upper()}_WEIGHTS : t_weight_array(0 to {total_size - 1}) := (")
    for idx, el in enumerate(weights.flatten()):
        el_2sc = (2 ** 32 + el) if el < 0 else el  # 2's compl
        fp.write(f"{', ' if idx != 0 else str()}"
                 f"{f'{chr(10)}{chr(9)}{chr(9)}' if idx % 16 == 0 else ''}"
                 f"\"{el_2sc:032b}\"")
    fp.write("\n\t);\n\n")


def _export_quant_dense_weights_to_vhdl_biases(fp, layer_name: str, biases: np.ndarray):
    """Placeholder for exporting the biases of a quantized dense layer.

    Raises:
    `NotImplementedError` -- always; none of the arguments is used.
    """
    raise NotImplementedError("Quantized dense layers with biases are not supported.")


def _export_dense_weights_to_vhdl_biases(fp, layer_name: str, biases: np.ndarray):
    """Placeholder for exporting the biases of a non-quantized dense layer.

    Raises:
    `NotImplementedError` -- always; none of the arguments is used.
    """
    raise NotImplementedError("Dense layers with biases are not supported.")


def _export_dense_weights_to_vhdl(fp, layer_name: str,
                                  layer_data: Dict,
                                  quantized: bool = False):
    """Write the VHDL constants of one dense layer.

    Args:
    `fp` -- writable text file object
    `layer_name` -- layer name, forwarded to the exporters
    `layer_data` -- mapping with a `'weights'` entry and optionally a
    `'biases'` entry (nested lists)
    `quantized` -- selects the exporters for quantized layers when `True`
    (default `False`)

    Raises:
    `ValueError` if `layer_data` holds neither one nor two entries.
    """
    # set functions according to the quantization flag
    if quantized:
        write_weights = _export_quant_dense_weights_to_vhdl_weights
        write_biases = _export_quant_dense_weights_to_vhdl_biases
    else:
        write_weights = _export_dense_weights_to_vhdl_weights
        write_biases = _export_dense_weights_to_vhdl_biases

    entry_count = len(layer_data.keys())
    if entry_count not in (1, 2):
        raise ValueError('Too many weight tensors in this layer.')

    weights = np.array(layer_data['weights'])

    if entry_count == 1:
        # without biases
        _dbg(f"\t\t - weights (shape {weights.shape})")
        write_weights(fp, layer_name, weights)
        return

    # with biases
    # make sure this is indeed the correct order
    biases = np.array(layer_data['biases'])
    _dbg(f"\t\t - weights (shape {weights.shape})\n"
         f"\t\t - biases (shape {biases.shape})")
    write_weights(fp, layer_name, weights)
    write_biases(fp, layer_name, biases)

In [144]:
def _export_bn_weights_to_vhdl_means(fp, layer_name: str, means: np.ndarray):

    num_means = means.shape[0]

    total_size = num_means

    # write sizes
    fp.write(f"\tconstant {layer_name.upper()}_MEANS_NUM : integer := {num_means};\n")

    # write weights (first as std_logic)
    fp.write(f"\tconstant {layer_name.upper()}_MEANS : t_weight_array(0 to {total_size - 1}) := (")
    for idx, el in enumerate(means.flatten()):
        el_2sc = (2 ** 32 + el) if el < 0 else el  # 2's compl
        fp.write(f"{', ' if idx != 0 else str()}"
                 f"{f'{chr(10)}{chr(9)}{chr(9)}' if idx % 4 == 0 else ''}"
                 f"\"{el_2sc:032b}\"")
    fp.write("\n\t);\n\n")

    # write weights (then as std_logic_vector)
    fp.write(f"\tconstant {layer_name.upper()}_MEANS_VECTOR : std_logic_vector(0 to {total_size * 32 - 1}) := (")
    for idx, el in enumerate(means.flatten()):
        el_2sc = (2 ** 32 + el) if el < 0 else el  # 2's compl
        fp.write(f"{' & ' if idx != 0 else str()}"
                 f"{f'{chr(10)}{chr(9)}{chr(9)}' if idx % 4 == 0 else ''}"
                 f"\"{el_2sc:032b}\"")
    fp.write("\n\t);\n\n")


def _export_bn_weights_to_vhdl_vars(fp, layer_name: str,
                                    vars: np.ndarray,
                                    invert: bool = False,
                                    fixed_pos: int = 10):

    num_vars = vars.shape[0]

    total_size = num_vars

    # write sizes
    fp.write(f"\tconstant {layer_name.upper()}_VARS_NUM : integer := {num_vars};\n")

    # write weights
    fp.write(f"\tconstant {layer_name.upper()}_VARS : t_weight_array(0 to {total_size - 1}) := (")
    for idx, el in enumerate(vars.flatten()):
        el_2sc = (2 ** 32 + el) if el < 0 else el  # 2's compl
        fp.write(f"{', ' if idx != 0 else str()}"
                 f"{f'{chr(10)}{chr(9)}{chr(9)}' if idx % 4 == 0 else ''}"
                 f"\"{el_2sc:032b}\"")
    fp.write("\n\t);\n\n")

    # if you don't want fixed point inverted vars in your file, return here
    if not invert:
        return

    # compute inverted vars
    ## compute floating point inverted vars
    raw_inv_vars = (1 / vars).astype(np.float32)
    ## extract exponents of the floating representation (unbiased)
    inv_exps = np.array([frexp(el)[1] for el in raw_inv_vars])
    ## compute largest exponen
    max_inv_exp = np.max(inv_exps)
    ## compute the number of shifts
    amount_to_shift = np.abs(fixed_pos - max_inv_exp)
    ## adjust and convert to unsigned
    inv_vars = (raw_inv_vars * (2 ** amount_to_shift)).astype(np.uint32)

    _dbg(f"\t\t - shifting inverted vars {amount_to_shift} times to the left")

    # write inverted vars (as std_logic)
    fp.write(f"\tconstant {layer_name.upper()}_INV_VARS : t_weight_array(0 to {total_size - 1}) := (")
    for idx, el in enumerate(inv_vars.flatten()):
        el_2sc = (2 ** 32 + el) if el < 0 else el  # 2's compl
        fp.write(f"{', ' if idx != 0 else str()}"
                 f"{f'{chr(10)}{chr(9)}{chr(9)}' if idx % 4 == 0 else ''}"
                 f"\"{el_2sc:032b}\"")
    fp.write("\n\t);\n\n")


    # write inverted vars (as std_logic_vector)
    fp.write(f"\tconstant {layer_name.upper()}_INV_VARS_VECTOR : std_logic_vector(0 to {total_size * 32 - 1}) := (")
    for idx, el in enumerate(inv_vars.flatten()):
        el_2sc = (2 ** 32 + el) if el < 0 else el  # 2's compl
        fp.write(f"{' & ' if idx != 0 else str()}"
                 f"{f'{chr(10)}{chr(9)}{chr(9)}' if idx % 4 == 0 else ''}"
                 f"\"{el_2sc:032b}\"")
    fp.write("\n\t);\n\n")


def _export_bn_weights_to_vhdl_betas(fp, layer_name: str, betas: np.ndarray):
    raise NotImplementedError("Batch normalization layers with betas are not supported.")


def _export_bn_weights_to_vhdl_gammas(fp, layer_name: str, gammas: np.ndarray):
    raise NotImplementedError("Batch normalization layers with gammas are not supported.")


def _export_bn_weights_to_vhdl(fp, layer_name: str,
                               layer_data: Dict,
                               invert_vars: bool = False,
                               inverted_vars_prec: int = 10):

    if len(layer_data.keys()) == 2:
        # no betas, gammas
        # make sure this is indeed the correct order
        means = np.array(layer_data['means'])
        vars = np.array(layer_data['vars'])
        _dbg(f"\t\t - means (shape {means.shape})\n"
             f"\t\t - variances (shape {vars.shape})")
        _export_bn_weights_to_vhdl_means(fp, layer_name, means)
        _export_bn_weights_to_vhdl_vars(fp, layer_name, vars, invert_vars, inverted_vars_prec)
    elif len(layer_data.keys()) == 4:
        # with betas, gammas
        means = np.array(layer_data['means'])
        vars = np.array(layer_data['vars'])
        betas = np.array(layer_data['betas'])
        gammas = np.array(layer_data['gammas'])
        _dbg(f"\t\t - means (shape {means.shape})\n"
             f"\t\t - variances (shape {vars.shape})\n"
             f"\t\t - betas (shape {betas.shape})\n"
             f"\t\t - gammas (shape {gammas.shape})")
        _export_bn_weights_to_vhdl_means(fp, layer_name, means)
        _export_bn_weights_to_vhdl_vars(fp, layer_name, vars, invert_vars, inverted_vars_prec)
        _export_bn_weights_to_vhdl_betas(fp, layer_name, betas)
        _export_bn_weights_to_vhdl_gammas(fp, layer_name, gammas)
    else:
        raise ValueError('Too many weight tensors in this layer.')

In [117]:
def _export_quant_conv_weights_to_vhdl_kernels(fp, layer_name: str, kernels: np.ndarray):
    kernel_h = kernels.shape[0]
    kernel_w = kernels.shape[1]
    num_c_in = kernels.shape[2]
    num_c_out = kernels.shape[3]

    total_size = kernel_h * kernel_w * num_c_in * num_c_out

    # write sizes
    fp.write(f"\tconstant {layer_name.upper()}_KERNELS_HEIGHT : integer := {kernel_h};\n")
    fp.write(f"\tconstant {layer_name.upper()}_KERNELS_WIDTH : integer := {kernel_w};\n")
    fp.write(f"\tconstant {layer_name.upper()}_KERNELS_C_IN : integer := {num_c_in};\n")
    fp.write(f"\tconstant {layer_name.upper()}_KERNELS_C_OUT : integer := {num_c_out};\n")

    # write weights
    fp.write(f"\tconstant {layer_name.upper()}_KERNELS : std_logic_vector(0 to {total_size - 1}) := (")
    for idx, el in enumerate(kernels.flatten()):
        repr_el = 0 if el == -1 else 1  # binary representation of el
        fp.write(f"{', ' if idx != 0 else str()}"
                 f"{f'{chr(10)}{chr(9)}{chr(9)}' if idx % 16 == 0 else ''}"
                 f"\'{repr_el}\'")
    fp.write("\n\t);\n\n")


def _export_conv_weights_to_vhdl_kernels(fp, layer_name: str, kernels: np.ndarray):
    kernel_h = kernels.shape[0]
    kernel_w = kernels.shape[1]
    num_c_in = kernels.shape[2]
    num_c_out = kernels.shape[3]

    total_size = kernel_h * kernel_w * num_c_in * num_c_out

    # write sizes
    fp.write(f"\tconstant {layer_name.upper()}_KERNELS_HEIGHT : integer := {kernel_h};\n")
    fp.write(f"\tconstant {layer_name.upper()}_KERNELS_WIDTH : integer := {kernel_w};\n")
    fp.write(f"\tconstant {layer_name.upper()}_KERNELS_C_IN : integer := {num_c_in};\n")
    fp.write(f"\tconstant {layer_name.upper()}_KERNELS_C_OUT : integer := {num_c_out};\n")

    # write weights
    fp.write(f"\tconstant {layer_name.upper()}_KERNELS : t_weight_array(0 to {total_size - 1}) := (")
    for idx, el in enumerate(kernels.flatten()):
        el_2sc = (2 ** 32 + el) if el < 0 else el  # 2's compl
        fp.write(f"{', ' if idx != 0 else str()}"
                 f"{f'{chr(10)}{chr(9)}{chr(9)}' if idx % 4 == 0 else ''}"
                 f"\"{el_2sc:032b}\"")
    fp.write("\n\t);\n\n")


def _export_quant_conv_weights_to_vhdl_biases(fp, layer_name: str, biases: np.ndarray):
    raise NotImplementedError("Quantized convolutional layers with biases are not supported.")


def _export_conv_weights_to_vhdl_biases(fp, layer_name: str, biases: np.ndarray):
    raise NotImplementedError("Convolutional layers with biases are not supported.")


def _export_conv_weights_to_vhdl(fp, layer_name: str,
                                 layer_data: Dict,
                                 quantized: bool = False):
    """Exports the tensors of one convolutional layer to the open `.vhd` file.

    The number of entries in `layer_data` decides what is exported:
    one entry means kernels only, two entries mean kernels and biases.

    Args:
    `fp` -- output file object the VHDL text is written to
    `layer_name` -- name of the layer, forwarded to the tensor writers
    `layer_data` -- dict holding `'kernels'` and, optionally, `'biases'`
    `quantized` -- if set, the quantized-layer writers are used
    (default `False`)

    Raises:
    `ValueError` -- if `layer_data` has neither one nor two entries
    """

    # set functions according to the quantization flag
    if quantized:
        export_kernels = _export_quant_conv_weights_to_vhdl_kernels
        export_biases = _export_quant_conv_weights_to_vhdl_biases
    else:
        export_kernels = _export_conv_weights_to_vhdl_kernels
        export_biases = _export_conv_weights_to_vhdl_biases

    if len(layer_data.keys()) == 2:
        # with biases
        kernels = np.array(layer_data['kernels'])
        biases = np.array(layer_data['biases'])
        _dbg(f"\t\t - weights (shape {kernels.shape})\n"
             f"\t\t - biases (shape {biases.shape})")
        export_kernels(fp, layer_name, kernels)
        # hand the bias tensor (not the kernels) to the bias writer
        export_biases(fp, layer_name, biases)
    elif len(layer_data.keys()) == 1:
        # without biases
        kernels = np.array(layer_data['kernels'])
        _dbg(f"\t\t - weights (shape {kernels.shape})")
        export_kernels(fp, layer_name, kernels)
    else:
        raise ValueError('Too many weight tensors in this layer.')

In [118]:
def _dispatch_export_weights_layer_specific(fp, layer_name: str,
                                            layer_data: Dict,
                                            invert_bn_vars: bool = False,
                                            inverted_bn_vars_prec: int = 10) -> None:
    weightless_layers = ['pool', 'flatten', 'activation']

    if 'quant_conv2d' in layer_name:
        _dbg(f"\t - quantized convolutional layer <{layer_name}>")
        fp.write(f"\t--- Layer <{layer_name}>\n")
        _export_conv_weights_to_vhdl(fp, layer_name, layer_data, quantized=True)
    elif 'conv2d' in layer_name:
        _dbg(f"\t - convolutional layer <{layer_name}>")
        # write layer name
        fp.write(f"\t--- Layer <{layer_name}>\n")
        _export_conv_weights_to_vhdl(fp, layer_name, layer_data)
    elif 'batch_normalization' in layer_name:
        _dbg(f"\t - batch normalization layer <{layer_name}>")
        # write layer name
        fp.write(f"\t--- Layer <{layer_name}>\n")
        _export_bn_weights_to_vhdl(fp, layer_name, layer_data, invert_bn_vars, inverted_bn_vars_prec)
    elif 'quant_dense' in layer_name:
        _dbg(f"\t - quantized dense layer <{layer_name}>")
        # write layer name
        fp.write(f"\t--- Layer <{layer_name}>\n")
        _export_dense_weights_to_vhdl(fp, layer_name, layer_data, quantized=True)
    elif 'dense' in layer_name:
        _dbg(f"\t - dense layer <{layer_name}>")
        # write layer name
        fp.write(f"\t--- Layer <{layer_name}>\n")
        _export_dense_weights_to_vhdl(fp, layer_name, layer_data)
    elif any([wl in layer_name for wl in weightless_layers]):
        # a supported layer that has no weights
        _dbg(f"\t - ignoring weightless layer <{layer_name}>")
        pass
    else:
        raise ValueError('This layer type is not supported.')

In [119]:
def export_weights_to_vhdl(model: Dict | str,
                           vhdl_out_path: str,
                           vhdl_out: str,
                           invert_bn_vars: bool = False,
                           inverted_bn_vars_prec: int = -10) -> None:
    """Hardcodes neural net model weights into a `.vhd` file.
    Only supports integer weights for now.

    Args:
    `model` -- object containing the weights. if this is a string,
    then it should point to a `.json` file containing the weights
    `vhdl_out_path` -- the path to the output `.vhd` file
    `vhdl_out` -- the name of the output `.vhd` file (without extension)
    `invert_bn_vars` -- if set, batch normalization layer variances are also
    exported inverted, to make it easier for hardware multiplication
    (default `False`)
    `inverted_bn_vars_prec` -- the fixed decimal precision of the inverted
    variances (default `-10`)

    NOTE(review): the layer-level helpers default this precision to `10`,
    while this entry point defaults to `-10` -- confirm which one is intended.
    """

    # isinstance also accepts str subclasses, unlike an exact type comparison
    model_is_path = isinstance(model, str)
    full_vhdl_out_path = f'{vhdl_out_path}/{vhdl_out}.vhd'

    _dbg(f"Exporting weights from "
         f"{model if model_is_path else 'pre-loaded model'} "
         f"to {full_vhdl_out_path}")

    # load model weights in memory if a file was provided
    if model_is_path:
        with open(model, 'r') as json_fp:
            model = load(json_fp)

    # write the .vhd file
    with open(full_vhdl_out_path, 'w') as fp:

        # write vhdl package header
        fp.write("library IEEE;\n"
                 "use IEEE.STD_LOGIC_1164.ALL;\n\n"
                 "package WeightsPack is\n"
                 "\t-- Weight array type definition\n")

        # write array type declarations
        fp.write("\ttype t_weight_array is array (integer range <>) of std_logic_vector(31 downto 0);\n"
                 "\ttype t_quant_weight_array is array (integer range <>) of std_logic;\n\n")

        # export the weights of each layer based on the layer type
        for layer_name, layer_data in model.items():
            _dispatch_export_weights_layer_specific(fp,
                                                    layer_name,
                                                    layer_data,
                                                    invert_bn_vars,
                                                    inverted_bn_vars_prec)

        # write vhdl package end footer
        fp.write("end package WeightsPack;\n")

    _dbg("Weight export complete.")

In [140]:
# Settings for the weight export below.
# The output file is written to `<vhdl_out_path>/<vhdl_out>.vhd`.
vhdl_out_path = workdir
vhdl_out = 'weights'
# also export the inverted batch normalization variances
invert_bn_vars = True
# passed to `export_weights_to_vhdl` as `inverted_bn_vars_prec`
inverted_bn_vars_prec = 6

In [143]:
# Export the weights stored in `weights_compensated.json` to the `.vhd` file.
# Alternative sources, kept for reference:
# export_weights_to_vhdl(lq_int_model_dict, vhdl_out_path, vhdl_out)
# export_weights_to_vhdl(f'{workdir}/weights_test.json', vhdl_out_path, vhdl_out)
export_weights_to_vhdl(f'{workdir}/weights_compensated.json',
                       vhdl_out_path,
                       vhdl_out,
                       invert_bn_vars,
                       inverted_bn_vars_prec)

Exporting weights from ./drive/MyDrive//weights_compensated.json to ./drive/MyDrive//weights.vhd
	 - quantized convolutional layer <quant_conv2d_1>
		 - weights (shape (5, 5, 1, 8))
	 - ignoring weightless layer <max_pooling2d_1>
	 - batch normalization layer <batch_normalization_1>
		 - means (shape (8,))
		 - variances (shape (8,))
		 - shifting inverted vars 9 times to the left
	 - quantized convolutional layer <quant_conv2d_2>
		 - weights (shape (3, 3, 8, 32))
	 - ignoring weightless layer <max_pooling2d_2>
	 - batch normalization layer <batch_normalization_2>
		 - means (shape (32,))
		 - variances (shape (32,))
		 - shifting inverted vars 12 times to the left
	 - ignoring weightless layer <flatten_1>
	 - quantized dense layer <quant_dense_1>
		 - weights (shape (288, 64))
	 - batch normalization layer <batch_normalization_3>
		 - means (shape (64,))
		 - variances (shape (64,))
		 - shifting inverted vars 14 times to the left
	 - quantized dense layer <quant_dense_2>
		 - weight

# VHDL inference input file generation

## Input image integer quantization

Check whether rounding the pixel values up (ceiling) visibly changes the input image.

In [None]:
img = test_x[0]

# round every pixel up and store the result as 32-bit integers
quant_img = np.empty_like(img, dtype=np.int32)
np.ceil(img, out=quant_img, casting='unsafe')

# binarize: True where the quantized pixel is non-negative
quant_bin_img = (quant_img >= 0)

# show the three variants side by side
fig, axs = plt.subplots(1, 3)
panels = [
    (img, 'Unquantized'),
    (quant_img, 'Quantized'),
    (quant_bin_img, 'Quantized binary'),
]
for ax, (picture, title) in zip(axs, panels):
    ax.imshow(picture, cmap='gray')
    ax.set_title(title)

## Extract the image into a file

In [167]:
def _export_mnist_example_row_to_vhdl(fp,
                                      row: np.ndarray,
                                      last: bool = False,
                                      invert: bool = False) -> None:
    """Writes one image row as a comma-separated list of quoted literals.

    Each pixel is converted with `convert_to_2s_complement` and wrapped in
    double quotes.

    Args:
    `fp` -- output file object the VHDL text is written to
    `row` -- one image row with a leading axis of length 1
    `last` -- if set, no trailing comma is written after the row
    (default `False`)
    `invert` -- accepted but not used by this writer (default `False`)
    """
    pixels = np.squeeze(row, 0)
    literals = [f'"{convert_to_2s_complement(px)}"' for px in pixels]
    terminator = '' if last else ','
    fp.write("\t\t" + ', '.join(literals) + terminator + "\n")


def _export_mnist_example_row_to_vhdl_binary(fp,
                                             row: np.ndarray,
                                             last: bool = False,
                                             invert: bool = False) -> None:
    row = np.squeeze(row, 0)
    binary_row = row >= 0
    str_binary_row = f"{''.join(f'{abs(int(invert) - int(el))}' for el in binary_row)}"
    fp.write(f"\t\t\"{str_binary_row}\"{'' if last else ' &'}\n")

In [168]:
def export_mnist_example_to_vhdl(ex: np.ndarray,
                           vhdl_out_path: str,
                           vhdl_out: str,
                           binary: bool = False,
                           invert: bool = False) -> None:
    """Hardcodes an example from the MNIST dataset into a `.vhd` file.
    Quantizes the values to integers (by rounding up).

    Args:
    `ex` -- array representing the input example, with a trailing channel
    axis of length 1
    `vhdl_out_path` -- the path to the output `.vhd` file
    `vhdl_out` -- the name of the output `.vhd` file (without extension)
    `binary` -- if true, export the example as binary (default `False`)
    `invert` -- if true, will invert the bits in
    the representation (default `False`)

    Raises:
    `ValueError` -- if the example is not two-dimensional once the channel
    axis is removed
    """

    full_vhdl_out_path = f'{vhdl_out_path}/{vhdl_out}.vhd'

    _dbg(f"Exporting example "
         f"to {full_vhdl_out_path}")

    # pick the row writer matching the requested representation
    if binary:
        export_row = _export_mnist_example_row_to_vhdl_binary
    else:
        export_row = _export_mnist_example_row_to_vhdl

    # squeeze the channel dimension, since MNIST has only one channel
    ex = np.squeeze(ex, -1)
    if ex.ndim != 2:
        raise ValueError('The example must be a 2D image with a single channel.')

    # quantize input image
    quant_ex = np.empty_like(ex, dtype=np.int32)
    np.ceil(ex, out=quant_ex, casting='unsafe')

    # take the dimensions from the example itself (28 x 28 for MNIST)
    num_rows, num_cols = quant_ex.shape
    index_range = f"({num_rows} * {num_cols} - 1)"

    # write the .vhd file
    with open(full_vhdl_out_path, 'w') as fp:

        # write vhdl package header
        fp.write("library IEEE;\n"
                 "use IEEE.STD_LOGIC_1164.ALL;\n\n"
                 "package InputExamplePack is\n"
                 "\t-- Input array type definition\n")

        if binary:
            # one bit per pixel
            fp.write(f"\tconstant example_in : std_logic_vector(0 to {index_range}) := (\n")
        else:
            # write array type declaration, then the constant array declaration
            fp.write(f"\ttype t_input_array is array (0 to {index_range}) of std_logic_vector(31 downto 0);\n")
            fp.write("\tconstant example_in : t_input_array := (\n")

        # export the input example row by row; the last row is detected by
        # comparing against the number of rows, not the row width
        for idx in range(num_rows):
            # slicing keeps the leading axis of length 1 the row writers expect
            export_row(fp, quant_ex[idx:idx + 1], last=(idx == num_rows - 1), invert=invert)

        fp.write("\t);\n")

        # write vhdl package end footer
        fp.write("end package InputExamplePack;\n")

    _dbg("Example export complete.")

In [151]:
# Settings for the single-example exports below: the target directory and
# the file names (without extension) of the integer and binary variants.
example_vhdl_out_path = workdir
example_vhdl_out = 'example_test'
example_vhdl_out_binary = 'example_test_binary'

In [159]:
# Export test example #1 in the default (non-binary) representation.
export_mnist_example_to_vhdl(test_x[1], example_vhdl_out_path, example_vhdl_out)

Exporting example to ./drive/MyDrive//example_test.vhd
Example export complete.


In [169]:
# Export the same test example as an inverted bit string.
export_mnist_example_to_vhdl(test_x[1], example_vhdl_out_path, example_vhdl_out_binary, binary=True, invert=True)

Exporting example to ./drive/MyDrive//example_test_binary.vhd
Example export complete.


## Export all test examples


In [178]:
# Create the output directory for the per-example files.
# `exist_ok=True` keeps this cell re-runnable when the directory already exists.
os.makedirs(f"{workdir}/mnist_test_vhd", exist_ok=True)

In [179]:
# Export every test example as an inverted bit string, one `.vhd` file each.
# The file name carries the example index and its label.
# With DEBUG enabled, every call logs two lines.
path = f"{workdir}/mnist_test_vhd/"
for idx, (ex, label) in enumerate(zip(test_x, test_y)):
    filename = f"example_{idx}_label_{label}"
    export_mnist_example_to_vhdl(ex, path, filename, binary=True, invert=True)

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
Exporting example to ./drive/MyDrive//mnist_test_vhd//example_7500_label_8.vhd
Example export complete.
Exporting example to ./drive/MyDrive//mnist_test_vhd//example_7501_label_3.vhd
Example export complete.
Exporting example to ./drive/MyDrive//mnist_test_vhd//example_7502_label_8.vhd
Example export complete.
Exporting example to ./drive/MyDrive//mnist_test_vhd//example_7503_label_6.vhd
Example export complete.
Exporting example to ./drive/MyDrive//mnist_test_vhd//example_7504_label_7.vhd
Example export complete.
Exporting example to ./drive/MyDrive//mnist_test_vhd//example_7505_label_0.vhd
Example export complete.
Exporting example to ./drive/MyDrive//mnist_test_vhd//example_7506_label_0.vhd
Example export complete.
Exporting example to ./drive/MyDrive//mnist_test_vhd//example_7507_label_1.vhd
Example export complete.
Exporting example to ./drive/MyDrive//mnist_test_vhd//example_7508_label_2.vhd
Example export complete.

# Comparison between label and predicted value

Use this to see if the results are consistent with the hardware implementation

In [185]:
dump_file = 'mnist_test_sw_dump.txt'

# Write one line per test example, stating whether the software model's
# prediction matches the label.
with open(f"{workdir}/{dump_file}", 'w') as fp:
    for idx, (ex, label) in enumerate(zip(test_x, test_y)):
        # add a leading axis so the model receives a batch of one example
        pred_label = np.argmax(model(ex[None, :]))
        if label == pred_label:
            verdict = 'CORRECT'
        else:
            verdict = 'INCORRECT'
        fp.write(f"Example #{idx} -- {verdict} actual {label}; predicted {pred_label}\n")

# Playground

Scratch space for ad-hoc experiments.

In [None]:
# Print the weights of every layer while larq's quantized scope is enabled.
# Idea: take the config dict and remove some keys; only keep the name of the
# layer, maybe the kernel size and the weights.
with lq.context.quantized_scope(True):
    for layer in model.layers:
        print(layer.get_weights())