In [1]:
from pynq import Overlay, allocate
import numpy as np
import time

In [2]:
class Matmul_hw_32x8(Overlay):
    """Overlay wrapper around the float32 x int8 matrix-multiply design.

    The design's three DMA blocks (``axi_dma_0``..``axi_dma_2``) are
    re-exported under names that say what they carry: the two operand
    streams and the result stream.
    """

    def __init__(self, bitfile, **kwargs):
        """Forward ``bitfile`` and ``kwargs`` to ``Overlay`` and alias the DMAs."""
        super().__init__(bitfile, **kwargs)

        # Left operand stream.
        self.matmul_in1 = self.axi_dma_0
        # Right operand stream.
        self.matmul_in2 = self.axi_dma_1
        # Result stream.
        self.matmul_out = self.axi_dma_2

    def matmul(self, in1, in2):
        """Send ``in1`` and ``in2`` to the overlay and return the result buffer.

        Args:
            in1: Array indexed by two axes; copied into a float32 buffer.
            in2: Array indexed by two axes; copied into an int8 buffer.

        Returns:
            The float32 buffer produced by ``allocate`` with shape
            ``(in1.shape[0], in2.shape[1])`` that the receive channel wrote to.
        """
        lhs_rows, lhs_cols = in1.shape[0], in1.shape[1]
        rhs_rows, rhs_cols = in2.shape[0], in2.shape[1]

        # Buffers come from pynq.allocate so the DMA engines can use them.
        lhs = allocate(shape=(lhs_rows, lhs_cols), dtype="float32")
        rhs = allocate(shape=(rhs_rows, rhs_cols), dtype="int8")
        product = allocate(shape=(lhs_rows, rhs_cols), dtype="float32")

        np.copyto(lhs, in1)
        np.copyto(rhs, in2)

        # (channel, buffer) pairs, kept in the order they must be started.
        jobs = (
            (self.matmul_in1.sendchannel, lhs),
            (self.matmul_in2.sendchannel, rhs),
            (self.matmul_out.recvchannel, product),
        )
        for channel, buffer in jobs:
            channel.transfer(buffer)
        # Block until every transfer has finished, in the same order.
        for channel, _ in jobs:
            channel.wait()

        return product

In [None]:
# Overlay instance used by the weight-quantized experiments below.
# Assumes matmul_32x8.bit is reachable from the notebook's working directory.
hw_32x8 = Matmul_hw_32x8(bitfile="matmul_32x8.bit")

In [4]:
# The .npy file wraps a pickled dict, which is why allow_pickle is required.
# Unpickling can execute arbitrary code, so only load files you trust.
mnist = np.load("mnist-original.npy", allow_pickle= True)
mnist_dict = mnist.item()

# Transpose the stored "data" array and scale it by 1/255.
x = mnist_dict.get("data").T / 255
# Labels are stored with a leading axis; keep the first row only.
y = mnist_dict.get("label")[0]

Below, only the weights are quantized (word lengths 1 to 8 bits); the activations stay in float32.

In [5]:
print("<weight quantized model, wl=1~8>")
def feed_forward_hw(X0):
    """Run the bias-free three-layer network on one batch.

    The first and last products are computed with NumPy; the middle one is
    sent to the ``hw_32x8`` overlay. Weights are read from the module-level
    names ``fc1w``, ``fc2w`` and ``fc3w`` at call time, so they must be set
    before calling.

    Args:
        X0: Input batch (the calling loop passes a slice of ``x``).

    Returns:
        The output of the last layer, with no activation applied.
    """
    hidden1 = np.tanh(np.matmul(X0, fc1w.T))

    # Second layer: the matrix product comes from the FPGA overlay.
    hidden2 = np.tanh(hw_32x8.matmul(hidden1, fc2w.T))

    return np.matmul(hidden2, fc3w.T)


for wl in range(1, 9):
    # One checkpoint per word length. The file wraps a pickled dict, hence
    # allow_pickle; only load checkpoints you trust.
    file_name = f'quantized_model_wl={wl}.npy'
    weights = np.load(file_name, allow_pickle=True)
    params = weights.item()

    # Only the weight matrices are read; bias terms are not used.
    fc1w = params.get('fc1w')
    # The overlay's second operand buffer is int8, so cast up front.
    fc2w = params.get('fc2w').astype("int8")
    fc3w = params.get('fc3w')

    batch_size = 64

    prediction = []

    start_time = time.time()
    # Integer division: a trailing partial batch is not evaluated.
    for idx in range(len(x) // batch_size):
        lo = batch_size * idx
        hi = lo + batch_size
        xs = x[lo:hi]
        ys = y[lo:hi]
        outputs = feed_forward_hw(xs)
        # One boolean per sample: does the highest-scoring class match the label?
        prediction.extend(
            np.argmax(output) == yk for output, yk in zip(outputs, ys)
        )

    end_time = time.time()

    # Fraction of correct predictions, as a percentage.
    score = np.mean(prediction) * 100

    print("wl={}".format(wl))
    print(f"Execution time: {end_time - start_time:.2f} seconds")
    print("prediction accuracy is {:.2f}".format(score))
    print(' ')

<weight quantized model, wl=1~8>
wl=1
Execution time: 122.88 seconds
prediction accuracy is 13.14
 
wl=2
Execution time: 124.55 seconds
prediction accuracy is 85.75
 
wl=3
Execution time: 122.17 seconds
prediction accuracy is 96.61
 
wl=4
Execution time: 122.34 seconds
prediction accuracy is 97.11
 
wl=5
Execution time: 120.73 seconds
prediction accuracy is 97.11
 
wl=6
Execution time: 122.41 seconds
prediction accuracy is 97.11
 
wl=7
Execution time: 120.79 seconds
prediction accuracy is 97.11
 
wl=8
Execution time: 122.32 seconds
prediction accuracy is 97.11
 


Below, no parameters are quantized: both the weights and the activations are float32.

In [6]:

class Matmul_hw_32x32(Overlay):
    """Overlay wrapper around the float32 x float32 matrix-multiply design.

    The design's three DMA blocks (``axi_dma_0``..``axi_dma_2``) are
    re-exported under names that say what they carry: the two operand
    streams and the result stream.
    """

    def __init__(self, bitfile, **kwargs):
        """Forward ``bitfile`` and ``kwargs`` to ``Overlay`` and alias the DMAs."""
        super().__init__(bitfile, **kwargs)

        # Left operand stream.
        self.matmul_in1 = self.axi_dma_0
        # Right operand stream.
        self.matmul_in2 = self.axi_dma_1
        # Result stream.
        self.matmul_out = self.axi_dma_2

    def matmul(self, in1, in2):
        """Send ``in1`` and ``in2`` to the overlay and return the result buffer.

        Args:
            in1: Array indexed by two axes; copied into a float32 buffer.
            in2: Array indexed by two axes; copied into a float32 buffer.

        Returns:
            The float32 buffer produced by ``allocate`` with shape
            ``(in1.shape[0], in2.shape[1])`` that the receive channel wrote to.
        """
        lhs_rows, lhs_cols = in1.shape[0], in1.shape[1]
        rhs_rows, rhs_cols = in2.shape[0], in2.shape[1]

        # Buffers come from pynq.allocate so the DMA engines can use them.
        lhs = allocate(shape=(lhs_rows, lhs_cols), dtype="float32")
        rhs = allocate(shape=(rhs_rows, rhs_cols), dtype="float32")
        product = allocate(shape=(lhs_rows, rhs_cols), dtype="float32")

        np.copyto(lhs, in1)
        np.copyto(rhs, in2)

        # (channel, buffer) pairs, kept in the order they must be started.
        jobs = (
            (self.matmul_in1.sendchannel, lhs),
            (self.matmul_in2.sendchannel, rhs),
            (self.matmul_out.recvchannel, product),
        )
        for channel, buffer in jobs:
            channel.transfer(buffer)
        # Block until every transfer has finished, in the same order.
        for channel, _ in jobs:
            channel.wait()

        return product

In [None]:
# Overlay instance used by the unquantized (float32) experiment below.
# Assumes matmul_32x32.bit is reachable from the notebook's working directory.
hw_32x32 = Matmul_hw_32x32(bitfile="matmul_32x32.bit")

In [8]:
def feed_forward_hw_32x32(X0):
    """Run the bias-free three-layer network with a float32 hardware matmul.

    The first and last products are computed with NumPy; the middle one is
    sent to the ``hw_32x32`` overlay. Weights are read from the module-level
    names ``fc1w``, ``fc2w`` and ``fc3w`` at call time.

    Args:
        X0: Input batch (the calling loop passes a slice of ``x``).

    Returns:
        The output of the last layer, with no activation applied.
    """
    hidden1 = np.tanh(np.matmul(X0, fc1w.T))

    # Second layer: the matrix product comes from the FPGA overlay.
    hidden2 = np.tanh(hw_32x32.matmul(hidden1, fc2w.T))

    return np.matmul(hidden2, fc3w.T)

# The checkpoint wraps a pickled dict, hence allow_pickle; only load files you trust.
weights = np.load('float32_model.npy', allow_pickle=True)
params = weights.item()

# Only the three weight matrices are read; biases are not used.
fc1w = params.get('fc1w')
fc2w = params.get('fc2w')
fc3w = params.get('fc3w')

batch_size = 64

prediction = []

start_time = time.time()
# Integer division: a trailing partial batch is not evaluated.
for idx in range(len(x) // batch_size):
    lo = batch_size * idx
    hi = lo + batch_size
    xs = x[lo:hi]
    ys = y[lo:hi]
    outputs = feed_forward_hw_32x32(xs)
    # One boolean per sample: does the highest-scoring class match the label?
    prediction.extend(
        np.argmax(output) == yk for output, yk in zip(outputs, ys)
    )

end_time = time.time()
print("<no_qunatized_model>")
print(f"Execution time: {end_time - start_time:.2f} seconds")

# Fraction of correct predictions, as a percentage.
score = np.mean(prediction) * 100

print("Prediction accuracy is {}".format(score))
print(' ')

<no_qunatized_model>
Execution time: 142.97 seconds
Prediction accuracy is 97.58691674290942
 


Below, both the activations and the weights are quantized to int8.

In [9]:

class Matmul_hw_8x8(Overlay):
    """Overlay wrapper around the int8 x int8 matrix-multiply design.

    The design's three DMA blocks (``axi_dma_0``..``axi_dma_2``) are
    re-exported under names that say what they carry: the two operand
    streams and the result stream.
    """

    def __init__(self, bitfile, **kwargs):
        """Forward ``bitfile`` and ``kwargs`` to ``Overlay`` and alias the DMAs."""
        super().__init__(bitfile, **kwargs)

        # Left operand stream.
        self.matmul_in1 = self.axi_dma_0
        # Right operand stream.
        self.matmul_in2 = self.axi_dma_1
        # Result stream.
        self.matmul_out = self.axi_dma_2

    def matmul(self, in1, in2):
        """Send ``in1`` and ``in2`` to the overlay and return the result buffer.

        Args:
            in1: Array indexed by two axes; copied into an int8 buffer.
            in2: Array indexed by two axes; copied into an int8 buffer.

        Returns:
            The int8 buffer produced by ``allocate`` with shape
            ``(in1.shape[0], in2.shape[1])`` that the receive channel wrote to.
        """
        lhs_rows, lhs_cols = in1.shape[0], in1.shape[1]
        rhs_rows, rhs_cols = in2.shape[0], in2.shape[1]

        # Buffers come from pynq.allocate so the DMA engines can use them.
        lhs = allocate(shape=(lhs_rows, lhs_cols), dtype="int8")
        rhs = allocate(shape=(rhs_rows, rhs_cols), dtype="int8")
        # NOTE(review): the result is also int8; how the design scales or
        # saturates the accumulated products is not visible here - confirm.
        product = allocate(shape=(lhs_rows, rhs_cols), dtype="int8")

        np.copyto(lhs, in1)
        np.copyto(rhs, in2)

        # (channel, buffer) pairs, kept in the order they must be started.
        jobs = (
            (self.matmul_in1.sendchannel, lhs),
            (self.matmul_in2.sendchannel, rhs),
            (self.matmul_out.recvchannel, product),
        )
        for channel, buffer in jobs:
            channel.transfer(buffer)
        # Block until every transfer has finished, in the same order.
        for channel, _ in jobs:
            channel.wait()

        return product

In [None]:
# Overlay instance used by the fully int8 experiment below.
# Assumes matmul_8x8.bit is reachable from the notebook's working directory.
hw_8x8 = Matmul_hw_8x8(bitfile="matmul_8x8.bit")

In [None]:
print("<both quantized model : 8bits active & weigth>")

# Reuse the 8-bit weight checkpoint. The file wraps a pickled dict, hence
# allow_pickle; only load checkpoints you trust.
weights = np.load('quantized_model_wl=8.npy',allow_pickle=True)
params = weights.item()

fc1w = params.get('fc1w')
# The overlay's second operand buffer is int8, so cast up front.
fc2w = params.get('fc2w').astype("int8")
fc3w = params.get('fc3w')

batch_size = 64

def fixed_point_quantize(x, wl, fl, clamp=True, symmetric=True):
    """Quantize ``x`` to signed fixed-point integer codes.

    A code ``c`` stands for the real value ``c * 2**-fl``. With ``wl`` total
    bits the two's-complement range is ``[-2**(wl-fl-1), 2**(wl-fl-1) - 2**-fl]``.

    Args:
        x: NumPy array (or NumPy scalar) of real values.
        wl: Word length, i.e. total number of bits including the sign.
        fl: Fractional length, i.e. number of bits after the binary point.
        clamp: If True, saturate ``x`` to the representable range first.
            If False, out-of-range values are cast as-is and may wrap.
        symmetric: If True, the lower bound is the negative of the upper
            bound (the most negative code is not used). If False, the full
            two's-complement range is used.

    Returns:
        Integer codes with the same shape as ``x``. The dtype is the
        narrowest of int8/int16/int32/int64 that holds ``wl`` bits, so
        ``wl <= 8`` still yields int8.
    """
    scale = 2.0 ** (-fl)
    limit = 2.0 ** (wl - fl - 1)

    max_val = limit - scale
    # Symmetric means min == -max; the branches used to be swapped, so
    # symmetric=True produced the asymmetric range and vice versa.
    min_val = -max_val if symmetric else -limit

    if clamp:
        x = np.clip(x, min_val, max_val)

    # Pick a container wide enough for wl bits instead of always int8,
    # which silently wrapped for wl > 8.
    bits = next((width for width in (8, 16, 32) if wl <= width), 64)

    # np.round rounds halves to the nearest even code.
    return np.round(x / scale).astype("int{}".format(bits))

def feed_forward_hw_8x8(X0):
    """Run the bias-free three-layer network with an int8 x int8 hardware matmul.

    The first-layer activations are quantized to 8-bit fixed point before
    being sent to the ``hw_8x8`` overlay together with the int8 ``fc2w``.
    Weights are read from the module-level names ``fc1w``, ``fc2w`` and
    ``fc3w`` at call time.

    Args:
        X0: Input batch (the calling loop passes a slice of ``x``).

    Returns:
        The output of the last layer, with no activation applied.
    """
    hidden1 = np.tanh(np.matmul(X0, fc1w.T))

    # fl=4 was picked arbitrarily by the author; it is not tuned.
    hidden1_q = fixed_point_quantize(hidden1, wl=8, fl=4)

    # int8 x int8 product computed by the FPGA overlay.
    hidden2 = np.tanh(hw_8x8.matmul(hidden1_q, fc2w.T))

    return np.matmul(hidden2, fc3w.T)

prediction = []

start_time = time.time()

# Integer division: a trailing partial batch is not evaluated.
for idx in range(len(x) // batch_size):
    lo = batch_size * idx
    hi = lo + batch_size
    xs = x[lo:hi]
    ys = y[lo:hi]
    outputs = feed_forward_hw_8x8(xs)
    # One boolean per sample: does the highest-scoring class match the label?
    prediction.extend(
        np.argmax(output) == yk for output, yk in zip(outputs, ys)
    )

end_time = time.time()

# Fraction of correct predictions, as a percentage.
score = np.mean(prediction) * 100
print(f"Execution time: {end_time - start_time:.2f} seconds")

print("activation quantized prediction accuracy is {:.2f}".format(score))

<both quantized model : 8bits active & weigth>
Execution time: 109.99 seconds
activation quantized prediction accuracy is 97.02
