In [26]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Conv2D, MaxPool2D, Input, ReLU, GlobalAveragePooling2D, Flatten, Dense, Activation
from tensorflow.keras import Model

### CMSIS NN quantization
https://community.arm.com/developer/ip-products/processors/b/processors-ip-blog/posts/deploying-convolutional-neural-network-on-cortex-m-with-cmsis-nn

In [17]:
"""
quantize numpy array
return:
q_x: quantized array
fp_x: reverse quantized array to float
"""
def quantize_array(x, bit_depth=8):
    min_x = x.min() 
    max_x = x.max()

    #find number of integer bits to represent this range
    int_bits = int(np.ceil(np.log2(max(abs(min_x),abs(max_x)))))
    
    if int_bits < 0: int_bits = 0

    frac_bits = bit_depth - 1 - int_bits #remaining bits are fractional bits (1-bit for sign)

    #floating point weights are scaled and rounded to [-2^(bit-1),2^(bit-1)-1], which are used in 
    #the fixed-point operations on the actual hardware (i.e., microcontroller)
    q_x = np.round(x*(2**frac_bits))

    #To quantify the impact of quantized weights, scale them back to
    # original range to run inference using quantized weights
    fp_x = q_x/(2**frac_bits)
    
    return q_x, fp_x, int_bits, frac_bits


"""
compute frac_bits based on max and min list
used for activation(feature map) quantization
"""
def min_max_quantize(max_list, min_list, bit_depth=8):
    frac_list = []
    for i in range(len(max_list)):
        int_bits = int(np.ceil(np.log2(max(abs(min_list[i]),abs(max_list[i])))))
        if int_bits < 0: int_bits = 0
        frac_bits = bit_depth - 1 - int_bits #remaining bits are fractional bits (1-bit for sign)
        frac_list.append(frac_bits)
    return frac_list

### Original floating-point model

In [10]:
def Net():
    x_in = Input(shape=(4096, 1, 1))
    
    x = Conv2D(filters=16, kernel_size=(9, 1), strides=1, padding='valid')(x_in)
    x = ReLU()(x)
    x = Conv2D(filters=16, kernel_size=(9, 1), strides=1, padding='valid')(x)
    x = ReLU()(x)
    x = MaxPool2D(pool_size=(16, 1))(x)
    
    x = Conv2D(filters=32, kernel_size=(3, 1), strides=1, padding='valid')(x)
    x = ReLU()(x)
    x = Conv2D(filters=32, kernel_size=(3, 1), strides=1, padding='valid')(x)
    x = ReLU()(x)
    x = MaxPool2D(pool_size=(4, 1))(x)
    
    x = Conv2D(filters=32, kernel_size=(3, 1), strides=1, padding='valid')(x)
    x = ReLU()(x)
    x = Conv2D(filters=32, kernel_size=(3, 1), strides=1, padding='valid')(x)
    x = ReLU()(x)
    x = MaxPool2D(pool_size=(4, 1))(x)
    
    x = Conv2D(filters=64, kernel_size=(3, 1), strides=1, padding='valid')(x)
    x = ReLU()(x)
    x = Conv2D(filters=64, kernel_size=(3, 1), strides=1, padding='valid')(x)
    x = ReLU()(x)
    x = MaxPool2D(pool_size=(4, 1))(x)
    
    x = GlobalAveragePooling2D()(x)
    x = Flatten()(x)
    
    x = Dense(64)(x)
    x = ReLU()(x)
    x = Dense(512)(x)
    x = ReLU()(x)
    x = Dense(1)(x)
    x_out = Activation('sigmoid')(x)
    return Model(x_in, x_out)


model = Net()

# Load the dataset
x = np.load("./data/input.npy")
y = np.load("./data/target.npy")
x = np.expand_dims(np.expand_dims(np.squeeze(x), -1), -1)

print("Input shape: ", x.shape)
print("Output shape: ", y.shape)

# Load pre-trained weight
model.load_weights("./weights/tf.h5")

# Evaluation (float 32)
length = y.shape[0]
positives = 0
for i in range(length):
    pred = model(x[i:i+1])
    pred = (pred > 0.5)
    positives += np.sum(y[i]==pred)

print("Accuracy: ", positives / length)

Input shape:  (14544, 4096, 1, 1)
Output shape:  (14544,)
Accuracy:  0.8620737073707371


### Redefine the model

1. define layer names

2. add bitwise shifting

In [16]:
def QNet(bit=32, shift_list=[]):
    
    shift = False
    if bit != 32:
        shift = True
        
    x_in = Input(shape=(4096, 1, 1), name="input")
    
    x = Conv2D(filters=16, kernel_size=(9, 1), strides=1, padding='valid', name="conv1")(x_in)
    if shift:
        x = x / 2**shift_list[0]
        x = tf.floor(x)
        x = tf.clip_by_value(x, -2**(bit-1), 2**(bit-1)-1)
    x = ReLU()(x)
    
    x = Conv2D(filters=16, kernel_size=(9, 1), strides=1, padding='valid', name="conv2")(x)
    if shift:
        x = x / 2**shift_list[1]
        x = tf.floor(x)
        x = tf.clip_by_value(x, -2**(bit-1), 2**(bit-1)-1)
    x = ReLU()(x)
    
    x = MaxPool2D(pool_size=(16, 1))(x)
    
    x = Conv2D(filters=32, kernel_size=(3, 1), strides=1, padding='valid', name="conv3")(x)
    if shift:
        x = x / 2**shift_list[2]
        x = tf.floor(x)
        x = tf.clip_by_value(x, -2**(bit-1), 2**(bit-1)-1)
    x = ReLU()(x)
    
    x = Conv2D(filters=32, kernel_size=(3, 1), strides=1, padding='valid', name="conv4")(x)
    if shift:
        x = x / 2**shift_list[3]
        x = tf.floor(x)
        x = tf.clip_by_value(x, -2**(bit-1), 2**(bit-1)-1)
    x = ReLU()(x)
    
    x = MaxPool2D(pool_size=(4, 1))(x)
    
    x = Conv2D(filters=32, kernel_size=(3, 1), strides=1, padding='valid', name="conv5")(x)
    if shift:
        x = x / 2**shift_list[4]
        x = tf.floor(x)
        x = tf.clip_by_value(x, -2**(bit-1), 2**(bit-1)-1)
    x = ReLU()(x)
    
    x = Conv2D(filters=32, kernel_size=(3, 1), strides=1, padding='valid', name="conv6")(x)
    if shift:
        x = x / 2**shift_list[5]
        x = tf.floor(x)
        x = tf.clip_by_value(x, -2**(bit-1), 2**(bit-1)-1)
    x = ReLU()(x)
    
    x = MaxPool2D(pool_size=(4, 1))(x)
    
    x = Conv2D(filters=64, kernel_size=(3, 1), strides=1, padding='valid', name="conv7")(x)
    if shift:
        x = x / 2**shift_list[6]
        x = tf.floor(x)
        x = tf.clip_by_value(x, -2**(bit-1), 2**(bit-1)-1)
    x = ReLU()(x)
    
    x = Conv2D(filters=64, kernel_size=(3, 1), strides=1, padding='valid', name="conv8")(x)
    if shift:
        x = x / 2**shift_list[7]
        x = tf.floor(x)
        x = tf.clip_by_value(x, -2**(bit-1), 2**(bit-1)-1)
    x = ReLU()(x)
    
    x = MaxPool2D(pool_size=(4, 1), name="max_pool")(x)
    
    x = GlobalAveragePooling2D(name="pool")(x)
    if shift:
        x = tf.floor(x)
    
    x = Dense(64, name="fc1")(x)
    if shift:
        x = x / 2**shift_list[8]
        x = tf.floor(x)
        x = tf.clip_by_value(x, -2**(bit-1), 2**(bit-1)-1)
    x = ReLU()(x)
    
    x = Dense(512, name="fc2")(x)
    if shift:
        x = x / 2**shift_list[9]
        x = tf.floor(x)
        x = tf.clip_by_value(x, -2**(bit-1), 2**(bit-1)-1)
    x = ReLU()(x)
    
    x = Dense(1, name="fc3")(x)
    if shift:
        x = x / 2**shift_list[10]
        x = tf.floor(x)
        x = tf.clip_by_value(x, -2**(bit-1), 2**(bit-1)-1)
    if not shift:
        x_out = Activation('sigmoid')(x)
    else:
        x_out = x
    
    return Model(x_in, x_out)

model = QNet()
model.load_weights("./weights/tf.h5")

# get layer names
layer_names = []
for layer in model.layers:
    if len(layer.get_weights()) != 0:
        layer_names.append(layer.name)
print("Layer names: ", layer_names)
layer_num = len(layer_names)

Layer names:  ['conv1', 'conv2', 'conv3', 'conv4', 'conv5', 'conv6', 'conv7', 'conv8', 'fc1', 'fc2', 'fc3']


### Get intermediate feature maps to compute:

1. the max and min value of each layer's output
2. the quantization format

In [19]:
intermediate_output = [model.get_layer("input").output]
for i in range(layer_num):
    intermediate_output.append(model.get_layer(layer_names[i]).output)
intermediate_layer_model = Model(inputs = model.input, outputs = intermediate_output)


x = np.load("./data/input.npy")
x = np.expand_dims(np.expand_dims(np.squeeze(x), -1), -1)
length = x.shape[0]

frac_bits_output_list = []
max_list = []
min_list = []
for i in range(length):
    pred = intermediate_layer_model(x[i: i+1])
    for i in range(len(pred)):
        pred_np = pred[i].numpy()
        max_np = pred_np.max()
        min_np = pred_np.min()
        if len(max_list) != len(pred):
            max_list.append(max_np)
            min_list.append(min_np)
        else:
            if max_np > max_list[i]:
                max_list[i] = max_np
            if min_np < min_list[i]:
                min_list[i] = min_np
                

bit_depth = 16
frac_bits_activation_list = min_max_quantize(max_list, min_list, bit_depth=bit_depth)

print("input: ", frac_bits_activation_list[0])
for i in range(layer_num):
    print(i+1, "  ", layer_names[i], "  ", frac_bits_activation_list[i+1])

input:  15
1    conv1    14
2    conv2    12
3    conv3    11
4    conv4    11
5    conv5    10
6    conv6    10
7    conv7    9
8    conv8    7
9    fc1    7
10    fc2    8
11    fc3    9


### Quantize weight and bias

In [20]:
bit_depth = 16
frac_bits_weight_list = []
frac_bits_bias_list = []
q_weight_list = []
q_bias_list = []

for i in range(layer_num):
    weight, bias = model.get_layer(layer_names[i]).get_weights()
    q_weight, f_weight, int_bits_weight, frac_bits_weight = quantize_array(weight, bit_depth=bit_depth)
    q_bias, f_bias, int_bits_bias, frac_bits_bias = quantize_array(bias, bit_depth=bit_depth)
    
    q_weight_list.append(q_weight)
    q_bias_list.append(q_bias)
    frac_bits_weight_list.append(frac_bits_weight)
    frac_bits_bias_list.append(frac_bits_bias)
    print("**********")
    print(layer_names[i] + " - - weight - - Q"+str(int_bits_weight)+"."+str(frac_bits_weight))
    print(layer_names[i] + " - - bias - - Q"+str(int_bits_bias)+"."+str(frac_bits_bias))

**********
conv1 - - weight - - Q0.15
conv1 - - bias - - Q0.15
**********
conv2 - - weight - - Q0.15
conv2 - - bias - - Q0.15
**********
conv3 - - weight - - Q1.14
conv3 - - bias - - Q0.15
**********
conv4 - - weight - - Q0.15
conv4 - - bias - - Q0.15
**********
conv5 - - weight - - Q0.15
conv5 - - bias - - Q0.15
**********
conv6 - - weight - - Q0.15
conv6 - - bias - - Q0.15
**********
conv7 - - weight - - Q0.15
conv7 - - bias - - Q0.15
**********
conv8 - - weight - - Q0.15
conv8 - - bias - - Q0.15
**********
fc1 - - weight - - Q0.15
fc1 - - bias - - Q0.15
**********
fc2 - - weight - - Q0.15
fc2 - - bias - - Q0.15
**********
fc3 - - weight - - Q0.15
fc3 - - bias - - Q0.15


### Calculate bias shifting and output shifting

In [21]:
bias_shift_list = np.array(frac_bits_weight_list) + np.array(frac_bits_activation_list[0:-1]) - np.array(frac_bits_bias_list)
output_shift_list = np.array(frac_bits_weight_list) + np.array(frac_bits_activation_list[0:-1]) - np.array(frac_bits_activation_list[1:])
print(bias_shift_list)
print(output_shift_list)

[15 14 11 11 11 10 10  9  7  7  8]
[16 17 15 15 16 15 16 17 15 14 14]


### save the quantized weight and bias in 'weight.h'

In [22]:
# reorder the weight tensor
reordered_weight_list = []
for i in range(len(q_weight_list)):
    if len(q_weight_list[i].shape) == 4:  # CONV layer
        reordered_weight_list.append(np.moveaxis(q_weight_list[i], 2, 0))
    else:  # FC layer
        reordered_weight_list.append(np.moveaxis(q_weight_list[i], 1, 0))
        
# flatten the weight tensor
reordered_weight_flatten_list = []
for i in range(len(q_weight_list)):
    if len(reordered_weight_list[i].shape) == 4: # CONV layer
        reordered_weight_flatten_list.append(reordered_weight_list[i].flatten('F'))
    else: # FC layer
        reordered_weight_flatten_list.append(reordered_weight_list[i].flatten())

# save
f = open("./weight.h", "w")
for i in range(len(q_weight_list)):
    print("********" + layer_names[i] + "********")
    print("Bias Shift: ", bias_shift_list[i])
    print("Output Shift:  ", output_shift_list[i])
    print("Bias:  ", q_bias_list[i].astype(np.int16))
    print("Weight:  ", reordered_weight_flatten_list[i].astype(np.int16))
    
    f.write("#define " + layer_names[i].upper()+ "_WT {" + str(reordered_weight_flatten_list[i].astype(np.int16).tolist())[1:-1] + "}\n")
    f.write("#define " + layer_names[i].upper() + "_BIAS {" + str(q_bias_list[i].astype(np.int16).tolist())[1:-1] + "}\n")
    f.write("#define " + layer_names[i].upper() + "_BIAS_LSHIFT " + str(bias_shift_list[i]) + "\n")
    f.write("#define " + layer_names[i].upper() + "_OUT_RSHIFT " + str(output_shift_list[i]) + "\n\n")
    
f.close()

### test the quantized model in python

In [27]:
# define the model with output shifting
q_model = QNet(bit=16, shift_list=output_shift_list)

# set quantized weight and bias. (bias is shfited)
for i in range(layer_num):
    q_model.get_layer(layer_names[i]).set_weights([q_weight_list[i], q_bias_list[i]*(2**bias_shift_list[i])])

# evaluation
length = y.shape[0]
positives = 0
for i in range(length):
    pred = q_model(np.round(x[i: i+1]*(2**frac_bits_activation_list[0])))  # quantize the input
    pred = (pred > 0)
    positives += np.sum(y[i]==pred)

print("Accuracy after quantization: ", positives / length)

### generate sample input and output, so we can verify the results in C++

In [29]:
intermediate_output = []
intermediate_output.append(q_model.get_layer('input').output)

for i in range(layer_num):
    intermediate_output.append(q_model.get_layer(layer_names[i]).output)

q_intermediate_layer_model = Model(inputs = q_model.input, outputs = intermediate_output)

sample_output = q_intermediate_layer_model.predict(np.round(x[0: 1]*(2**frac_bits_activation_list[0])))

# save
f = open("./sample_input_output.h", "w")
temp = np.squeeze(sample_output[0], axis=0)
temp = temp.transpose(2, 0, 1)
temp = temp.flatten("F")
f.write("#define " + "INPUT_DATA {" + str(temp.astype(np.int16).tolist())[1:-1] + "}\n")

for i in range(layer_num):
    temp = np.squeeze(sample_output[i+1], axis=0)
    print(temp.shape)
    if len(temp.shape) == 3:
        temp = temp.transpose(2, 0, 1)
        temp = temp.flatten("F")
    
    temp = temp.astype(np.int32) >> output_shift_list[i]
    f.write("#define " + layer_names[i].upper()+ " {" + str(temp.tolist())[1:-1] + "}\n") 

f.close()