In [1]:
from pynq import Overlay, allocate, PL
import struct
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.image as mpimg

PL.reset()
overlay = Overlay('cnn_fpga.bit')

In [2]:
print('IP blocks :', list(overlay.ip_dict.keys()))

IP blocks : ['MaxPooling2D_0', 'conv2D_3x3_0', 'ReLU_0', 'Linear_0', 'axi_dma_conv2D', 'axi_dma_relu', 'axi_dma_maxpool2D', 'axi_dma_linear', 'processing_system7_0']


In [3]:
def float_to_hex(f):
    return struct.unpack('<I', struct.pack('<f', f))[0]

def hex_to_float(f):
    return struct.unpack('f', struct.pack('I', f))[0]

In [4]:
class Module():
    def __init__(self, overlay):
        self.overlay = overlay
        self.output_buffer = None
        self.weight_reg = None
        self.layer_ip = None
        self.ip_dict = None
        self.dma_send = None
        self.dma_recv = None
        
    def __call__(self, x):
        return self.forward(x)
    
    def forward(self, x):
        pass
    
    #def get_register_offset(self, ip, parameter):
    #    #print(self.overlay.ip_dict[ip]['registers'])
    #    return self.overlay.ip_dict[ip]['registers'][parameter]['address_offset']
        
    def read_param_float(self, param):
        address = self.ip_dict['registers'][param]['address_offset']
        data = self.layer_ip.read(address)
        return hex_to_float(data)
        
    def read_param_hex(self, param):
        address = self.ip_dict['registers'][param]['address_offset']
        data = self.layer_ip.read(address)
        return data

    def write_param_float(self, param, value):
        address = self.ip_dict['registers'][param]['address_offset']
        self.layer_ip.write(address, float_to_hex(value))
        
    def write_param_hex(self, param, value):
        address = self.ip_dict['registers'][param]['address_offset']
        self.layer_ip.write(address, value)
        
    def process_ip(self, in_buffer, out_buffer):
        self.layer_ip.write(0x0, 0x01)
        self.dma_send.transfer(in_buffer)
        self.dma_recv.transfer(out_buffer)
        print(type(self).__name__, " sending")
        self.dma_send.wait()
        print(type(self).__name__, " recieving")
        self.dma_recv.wait()
    

class Linear(Module):
    def __init__(self, overlay, in_size, out_size):
        Module.__init__(self, overlay)
        
        self.in_size = in_size
        self.out_size = out_size
        self.weights = np.zeros(self.out_size)
        self.output_buffer = []
        self.output_buffer.append(allocate(shape=(out_size,), dtype=np.float32))
    
        self.layer_ip = overlay.Linear_0
        self.ip_dict = overlay.ip_dict['Linear_0']

        self.dma_send = overlay.axi_dma_linear.sendchannel
        self.dma_recv = overlay.axi_dma_linear.recvchannel
            
    def forward(self, x):
        
        assert len(x.shape) == 1 and x.shape[0] == self.in_size
        
        #self.write_param_float('Memory_Kernel', self.weights)
        self.write_param_hex('in_size', self.in_size)
        self.write_param_hex('out_size', self.out_size)
        
        self.process_ip(x[0], self.output_buffer[0])
        
        return self.output_buffer
        

class Conv2d(Module):
    def __init__(self, overlay, in_height, in_width, in_channels, out_channels, kernel_size=3, stride=1):
        Module.__init__(self, overlay)
        
        assert kernel_size == 3
        assert stride == 1
        
        self.layer_ip = overlay.conv2D_3x3_0
        self.ip_dict = overlay.ip_dict['conv2D_3x3_0']
        self.dma_send = self.overlay.axi_dma_conv2D.sendchannel
        self.dma_recv = self.overlay.axi_dma_conv2D.recvchannel
        
        self.in_width = in_width
        self.in_height = in_height
        self.in_channels = in_channels
        self.out_channels = out_channels
        
        self.output_buffer = []
        self.weights = []
        for channel in range(self.out_channels):
            buffer = allocate(shape=(in_height*in_width, ), dtype=np.float32)
            self.output_buffer.append(buffer)
            weight = np.zeros((kernel_size, kernel_size), dtype=np.float32)
            self.weights.append(weight)
        
        self.aux_buffer = allocate(shape=(in_height, in_width), dtype=np.float32)
    
    def forward(self, x):
        
        assert len(x) == self.in_channels and x[0].shape[0] == self.in_width*self.in_height
        
        self.write_param_hex('in_width', self.in_width)
        self.write_param_hex('in_height', self.in_height)
        
        for out_channel in range(self.out_channels):      
            for in_channel in range(self.in_channels):   
                 
                #self.write_param_float('Memory_kernel', self.weights[out_channel])                
                self.process_ip(x[in_channel], self.aux_buffer)
                
                if in_channel == 0:
                    self.output_buffer[out_channel] = self.aux_buffer
                else:
                    self.output_buffer[out_channel] += self.aux_buffer
            
        return self.output_buffer        


class ReLU(Module):
    def __init__(self, overlay, in_width, in_height, in_channels):
        Module.__init__(self, overlay)
    
        #fpga specific
        self.layer_ip = overlay.ReLU_0
        self.ip_dict = overlay.ip_dict['ReLU_0']
        self.dma_send = overlay.axi_dma_relu.sendchannel
        self.dma_recv = overlay.axi_dma_relu.recvchannel
        
        self.in_width = in_width
        self.in_height = in_height
        self.in_channels = in_channels
                
        self.output_buffer = []
        for channel in range(in_channels):
            buffer = allocate(shape=(in_width*in_height,), dtype=np.float32)
            self.output_buffer.append(buffer)
        
    def forward(self, x):
        
        assert len(x) == self.in_channels and x[0].shape[0] == self.in_width*self.in_height
        
        self.write_param_hex('data_size', self.in_width*self.in_height)
        for channel in range(self.in_channels):
            self.process_ip(x[channel], self.output_buffer[channel])
            
        return self.output_buffer


class MaxPooling2D(Module):
    def __init__(self, overlay, in_height, in_width, in_channels):
        Module.__init__(self, overlay)
    
        self.layer_ip = overlay.MaxPooling2D_0
        self.ip_dict = overlay.ip_dict['MaxPooling2D_0']
        self.dma_send = overlay.axi_dma_maxpool2D.sendchannel
        self.dma_recv = overlay.axi_dma_maxpool2D.recvchannel
        
        self.in_height = in_height
        self.in_width = in_width
        self.in_channels = in_channels
                
        self.output_buffer = []
        for channel in range(in_channels):
            self.output_buffer.append(allocate(shape=(in_height*in_width,), dtype=np.float32))
        
    def forward(self, x):
        
        assert len(x) == self.in_channels and x[0].shape[0] == self.in_height*self.in_width
        
        self.write_param_hex('in_width', self.in_width)
        self.write_param_hex('in_height', self.in_height)

        for channel in range(self.in_channels):
            self.process_ip(x[channel], self.output_buffer[channel])
        
        return self.output_buffer

    
class Net(Module):
    def __init__(self, overlay):
        #super(Net, self).__init__()
        Module.__init__(self, overlay)
        self.conv1 = Conv2d(overlay, 64, 64, 1, 32, 3, 1)
        #self.relu1 = ReLU(overlay, 64, 64, 1)
        #self.conv2 = Conv2d(overlay, 64, 64, 32, 64, 3, 1)
        #self.relu2 = ReLU(overlay, 64, 64, 64)
        #self.max_pool2d = MaxPooling2D(overlay, 64, 64, 1)
                
        #self.dropout1 = Dropout(0.25)
        #self.dropout2 = Dropout(0.5)
        #self.fc1 = Linear(overlay, 65536, 128)
        #self.relu3 = ReLU(overlay, 128, 1, 1)
        #self.fc2 = Linear(overlay, 128, 10)
        
    def forward(self, x):
        x = self.conv1(x)
        #x = self.relu1(x)
        #x = self.conv2(x)
        #x = self.relu2(x)
        #x = self.max_pool2d(x)
        #x = self.dropout1(x)
        #x = torch.flatten(x, 1)
        #x = self.fc1(x)
        #x = self.relu3(x)
        #x = self.dropout2(x)
        #x = self.fc2(x)
        output = x #self.log_softmax(x, dim=1)
        return output

In [5]:
model = Net(overlay)

input_buffer = [allocate(shape=(64*64,), dtype=np.float32)]
image = model(input_buffer)

KeyboardInterrupt: 

In [None]:
%matplotlib inline


#image = 
#imgplot = plt.imshow(image)
#plt.show()