In [1]:
import time
from pynq import Overlay
import pynq.lib.dma
from pynq import Xlnk
import numpy as np
import random

# Hardware Baseline Implementation of 3 layer SNN on ZYNQ SoC of PYNQ

## Port SNN onto ZYNQ

In [2]:
# Hardware Implementation
ol = Overlay('lif3layer_final.bit') # check your path
ol.download() # it downloads your bit to FPGA
dma = ol.streamLIF.lif_dma # creating a dma instance
xlnk = Xlnk()

## Send the Input Spike Train to ZYNQ via DMA

## Input Image - Digit 1

In [3]:
length = 23520
in_buffer = xlnk.cma_array(shape=(length,), dtype=np.float32) # input buffer
out_buffer = xlnk.cma_array(shape=(10,), dtype=np.float32) # output buffer
with open('st3.txt','r') as fp:
    read_data = fp.read()
read_data = read_data[:-1]
read_data = read_data.replace('\n', ' ')
read_data_float = [float(each) for each in read_data.split(' ')]
read_data_final = read_data_float[:23520]

np.copyto(in_buffer, read_data_final) # copy samples to inout buffer

In [4]:
t_start = time.time()
dma.sendchannel.transfer(in_buffer)
dma.recvchannel.transfer(out_buffer)
dma.sendchannel.wait() # wait for send channel
print('Input Spike Train Sent to Programmable Logic!')
dma.recvchannel.wait() # wait for recv channel
print('Final Spikes Received from Programmable Logic!')
t_stop=time.time()
in_buffer.close()
out_buffer.close()
                         
spike_count_list = out_buffer.tolist()
max_value = max(spike_count_list)
winner = spike_count_list.index(max_value)

Input Spike Train Sent to Programmable Logic!
Final Spikes Received from Programmable Logic!


In [5]:
print('Obtained Spike Counts for Output Layer: '+ f"{out_buffer.tolist()}")
print('Winner is: ' + f"{winner}")
print('Hardware Execution Time on Zynq SoC: ' + f"{t_stop - t_start}" + ' seconds')

Obtained Spike Counts for Output Layer: [0.0, 24.0, 18.0, 13.0, 0.0, 0.0, 9.0, 10.0, 13.0, 5.0]
Winner is: 1
Hardware Execution Time on Zynq SoC: 0.19977831840515137 seconds


# Optimized Implementation of 3 layer SNN on ZYNQ SoC of PYNQ

## Port SNN onto ZYNQ

In [None]:
# Hardware Implementation
ol = Overlay('lif_3layer_optimized.bit') # check your path
ol.download() # it downloads your bit to FPGA
dma = ol.streamLIF.lif_dma # creating a dma instance
xlnk = Xlnk()

## Send the Input Spike Train to ZYNQ via DMA

## Input Image - Digit 1

In [None]:
length = 23520
in_buffer = xlnk.cma_array(shape=(length,), dtype=np.float32) # input buffer
out_buffer = xlnk.cma_array(shape=(10,), dtype=np.float32) # output buffer
with open('st3.txt','r') as fp:
    read_data = fp.read()
read_data = read_data[:-1]
read_data = read_data.replace('\n', ' ')
read_data_float = [float(each) for each in read_data.split(' ')]
read_data_final = read_data_float[:23520]

np.copyto(in_buffer, read_data_final) # copy samples to inout buffer

In [None]:
t_start = time.time()
dma.sendchannel.transfer(in_buffer)
dma.recvchannel.transfer(out_buffer)
dma.sendchannel.wait() # wait for send channel
print('Input Spike Train Sent to Programmable Logic!')
dma.recvchannel.wait() # wait for recv channel
print('Final Spikes Received from Programmable Logic!')
t_stop=time.time()
in_buffer.close()
out_buffer.close()
                         
spike_count_list = out_buffer.tolist()
max_value = max(spike_count_list)
winner = spike_count_list.index(max_value)

In [None]:
print('Obtained Spike Counts for Output Layer: '+ f"{out_buffer.tolist()}")
print('Winner is: ' + f"{winner}")
print('Hardware Execution Time on Zynq SoC: ' + f"{t_stop - t_start}" + ' seconds')

In [None]:
length = 23520
in_buffer = xlnk.cma_array(shape=(length,), dtype=np.float32) # input buffer
out_buffer = xlnk.cma_array(shape=(10,), dtype=np.float32) # output buffer
with open('st2.txt','r') as fp:
    read_data = fp.read()
read_data = read_data[:-1]
read_data = read_data.replace('\n', ' ')
read_data_float = [float(each) for each in read_data.split(' ')]
read_data_final = read_data_float[:23520]

np.copyto(in_buffer, read_data_final) # copy samples to inout buffer

In [None]:
t_start = time.time()
print(t_start)
dma.sendchannel.transfer(in_buffer)
dma.recvchannel.transfer(out_buffer)
dma.sendchannel.wait() # wait for send channel
print('Input Spike Train Sent to Programmable Logic!')
dma.recvchannel.wait() # wait for recv channel
print('Final Spikes Received from Programmable Logic!')
t_stop=time.time()
in_buffer.close()
out_buffer.close()
                         
print('Hardware execution time: ', t_stop-t_start)

In [None]:
print(out_buffer)

In [None]:
length = 23520
in_buffer = xlnk.cma_array(shape=(length,), dtype=np.float32) # input buffer
out_buffer = xlnk.cma_array(shape=(10,), dtype=np.float32) # output buffer
with open('st3.txt','r') as fp:
    read_data = fp.read()
read_data = read_data[:-1]
read_data = read_data.replace('\n', ' ')
read_data_float = [float(each) for each in read_data.split(' ')]
read_data_final = read_data_float[:23520]

np.copyto(in_buffer, read_data_final) # copy samples to inout buffer

In [None]:
t_start = time.time()
print(t_start)
dma.sendchannel.transfer(in_buffer)
dma.recvchannel.transfer(out_buffer)
dma.sendchannel.wait() # wait for send channel
print('Input Spike Train Sent to Programmable Logic!')
dma.recvchannel.wait() # wait for recv channel
print('Final Spikes Received from Programmable Logic!')
t_stop=time.time()
in_buffer.close()
out_buffer.close()
                         
print('Hardware execution time: ', t_stop-t_start)

In [None]:
print(out_buffer)

In [None]:
length = 23520
in_buffer = xlnk.cma_array(shape=(length,), dtype=np.float32) # input buffer
out_buffer = xlnk.cma_array(shape=(10,), dtype=np.float32) # output buffer
with open('st4.txt','r') as fp:
    read_data = fp.read()
read_data = read_data[:-1]
read_data = read_data.replace('\n', ' ')
read_data_float = [float(each) for each in read_data.split(' ')]
read_data_final = read_data_float[:23520]

np.copyto(in_buffer, read_data_final) # copy samples to inout buffer

In [None]:
t_start = time.time()
print(t_start)
dma.sendchannel.transfer(in_buffer)
dma.recvchannel.transfer(out_buffer)
dma.sendchannel.wait() # wait for send channel
print('Input Spike Train Sent to Programmable Logic!')
dma.recvchannel.wait() # wait for recv channel
print('Final Spikes Received from Programmable Logic!')
t_stop=time.time()
in_buffer.close()
out_buffer.close()
                         
print('Hardware execution time: ', t_stop-t_start)

In [None]:
print(out_buffer)