In [None]:
from __future__ import print_function

import sys
import numpy as np
from time import time
import matplotlib.pyplot as plt 

sys.path.append('/home/xilinx')
from pynq import Overlay
from pynq import allocate
from pynq import MMIO

# Size in bytes of the firmware image; npROM below holds ROM_SIZE >> 2 32-bit words
ROM_SIZE = 0x2000 #8K

# Byte offsets of the register regions reached through `mmio` in later cells.
# SOC_* regions are touched in the "PL_Caravel Side" cells, PL_* regions in the
# "PL_FSIC Side" cells; SOC_UP is also the base used by the TPU helper functions.
SOC_UP = 0x0000
SOC_LA = 0x1000
PL_AA_MB = 0x2000   # mailbox registers
PL_AA = 0x2100
SOC_IS = 0x3000
SOC_AS = 0x4000
SOC_CC = 0x5000
PL_AS = 0x6000
PL_IS = 0x7000
PL_DMA = 0x8000     # LADMA control registers

In [None]:
# Instantiate the PYNQ overlay from the caravel_fpga bitstream; `ol` exposes the IP cores used below.
# NOTE(review): the absolute path is specific to this board image.
ol = Overlay("/home/xilinx/jupyter_notebooks/PS/caravel_fpga.bit")
#ol.ip_dict

In [None]:
# Handles to the IP cores exposed by the overlay
ipOUTPIN = ol.output_pin_0  # outpin_ctrl register, written later to release the Caravel reset
ipPS = ol.caravel_ps_0  # ps_mprj_in / ps_mprj_out / ps_mprj_en registers
# ipReadROMCODE = ol.read_romcode_0

#Add for SPI
ip_QSPI = ol.axi_quad_spi_0  # SPI controller used to write and read back the firmware

In [None]:
# ============================================
# AXI QuadSPI Control
# ============================================
# Register offsets of the AXI Quad SPI core (relative to the IP base address)
XSP_DGIER_OFFSET = 0x1C  # global interrupt enable (cnfg writes 0 to disable it)
XSP_IISR_OFFSET = 0x20
XSP_IIER_OFFSET = 0x28   # interrupt enable
XSP_SRR_OFFSET = 0x40    # software reset
XSP_CR_OFFSET = 0x60     # control register
XSP_SR_OFFSET = 0x64     # status register
XSP_DTR_OFFSET = 0x68    # data written here is queued for transmission
XSP_DRR_OFFSET = 0x6C    # received data is read from here
XSP_SSR_OFFSET = 0x70    # slave select
XSP_TFO_OFFSET = 0x74    # printed by later cells after filling the Tx FIFO
XSP_RFO_OFFSET = 0x78
# All offsets above collected in one list (not referenced by the functions in this cell)
XSP_REGISTERS = [0x40, 0x60, 0x64, 0x68, 0x6c, 0x70, 0x74, 0x78, 0x1c, 0x20, 0x28]

# Value written to XSP_SRR_OFFSET to reset the core
XSP_SRR_RESET_MASK = 0x0A
# Status register bits
XSP_SR_TX_EMPTY_MASK = 0x04
XSP_SR_TX_FULL_MASK = 0x08
# Control register bits
XSP_CR_TRANS_INHIBIT_MASK = 0x100
XSP_CR_LOOPBACK_MASK = 0x01
XSP_CR_ENABLE_MASK = 0x02
XSP_CR_MASTER_MODE_MASK = 0x04
XSP_CR_CLK_POLARITY_MASK = 0x08
XSP_CR_CLK_PHASE_MASK = 0x10
XSP_CR_TXFIFO_RESET_MASK = 0x20
XSP_CR_RXFIFO_RESET_MASK = 0x40
XSP_CR_MANUAL_SS_MASK = 0x80

# Slave-select value with every bit set, used to deselect the slave
SLAVE_NO_SELECTION = 0xFFFFFFFF

def cnfg(AxiQspi, clk_phase=0, clk_pol=0):
    """Reset the SPI core and configure it as an enabled master with manual slave select.

    Parameters:
    AxiQspi: object with read(offset) / write(offset, value) register accessors.
    clk_phase (int): 1 sets the clock-phase control bit, anything else leaves it cleared.
    clk_pol (int): 1 sets the clock-polarity control bit, anything else leaves it cleared.

    Returns:
    int: always 0.
    """
    print("Configure device")

    # Written in this order: reset the core, enable the transmit-empty
    # interrupt, disable the global interrupt, deselect the slave.
    setup_writes = (
        (XSP_SRR_OFFSET, XSP_SRR_RESET_MASK),
        (XSP_IIER_OFFSET, XSP_SR_TX_EMPTY_MASK),
        (XSP_DGIER_OFFSET, 0),
        (XSP_SSR_OFFSET, SLAVE_NO_SELECTION),
    )
    for offset, value in setup_writes:
        AxiQspi.write(offset, value)

    # Master mode, manual slave select, core enabled, both FIFOs reset
    mode_bits = (XSP_CR_MASTER_MODE_MASK | XSP_CR_MANUAL_SS_MASK | XSP_CR_ENABLE_MASK
                 | XSP_CR_TXFIFO_RESET_MASK | XSP_CR_RXFIFO_RESET_MASK)
    AxiQspi.write(XSP_CR_OFFSET, AxiQspi.read(XSP_CR_OFFSET) | mode_bits)

    # Re-read the control register, clear both clock bits, then set the requested ones
    ctrl = AxiQspi.read(XSP_CR_OFFSET) & ~(XSP_CR_CLK_PHASE_MASK | XSP_CR_CLK_POLARITY_MASK)
    if clk_phase == 1:
        ctrl |= XSP_CR_CLK_PHASE_MASK
    if clk_pol == 1:
        ctrl |= XSP_CR_CLK_POLARITY_MASK
    AxiQspi.write(XSP_CR_OFFSET, ctrl)

    return 0

def write_tx_fifo(AxiQspi):
    """Send whatever is queued in the Tx FIFO and block until the FIFO is empty.

    Clears the transaction-inhibit bit to start the transfer, polls the status
    register until the Tx-empty bit is set, then sets the inhibit bit again.
    There is no timeout on the poll.
    """
    # Allow the master to start shifting data out
    AxiQspi.write(XSP_CR_OFFSET, AxiQspi.read(XSP_CR_OFFSET) & ~XSP_CR_TRANS_INHIBIT_MASK)

    # Busy-wait until the Tx FIFO reports empty
    while not (AxiQspi.read(XSP_SR_OFFSET) & XSP_SR_TX_EMPTY_MASK):
        pass

    # Inhibit further transfers until the next call
    AxiQspi.write(XSP_CR_OFFSET, AxiQspi.read(XSP_CR_OFFSET) | XSP_CR_TRANS_INHIBIT_MASK)


def read_rx_fifo(bypass_length, AxiQspi):
    """Drain the Rx FIFO and return the words read after the first `bypass_length`.

    Parameters:
    bypass_length (int): number of leading words to discard (the words clocked
        in while the command and address were being sent).
    AxiQspi: object with a read(offset) register accessor.

    Returns:
    list: the remaining words, in the order they were read.
    """
    resp = []
    words_read = 0

    # Bit 0 of the status register is treated as "Rx FIFO empty"
    while (AxiQspi.read(XSP_SR_OFFSET) & 0x01) == 0:
        word = AxiQspi.read(XSP_DRR_OFFSET)
        words_read += 1
        if words_read > bypass_length:
            resp.append(word)

    return resp

In [None]:
# Check MPRJ_IO input/out/en
# Prints the six caravel_ps registers described below.
# 0x10 : Data signal of ps_mprj_in
#        bit 31~0 - ps_mprj_in[31:0] (Read/Write)
# 0x14 : Data signal of ps_mprj_in
#        bit 5~0 - ps_mprj_in[37:32] (Read/Write)
#        others  - reserved
# 0x1c : Data signal of ps_mprj_out
#        bit 31~0 - ps_mprj_out[31:0] (Read)
# 0x20 : Data signal of ps_mprj_out
#        bit 5~0 - ps_mprj_out[37:32] (Read)
#        others  - reserved
# 0x34 : Data signal of ps_mprj_en
#        bit 31~0 - ps_mprj_en[31:0] (Read)
# 0x38 : Data signal of ps_mprj_en
#        bit 5~0 - ps_mprj_en[37:32] (Read)
#        others  - reserved

print ("0x10 = ", hex(ipPS.read(0x10)))
print ("0x14 = ", hex(ipPS.read(0x14)))
print ("0x1c = ", hex(ipPS.read(0x1c)))
print ("0x20 = ", hex(ipPS.read(0x20)))
print ("0x34 = ", hex(ipPS.read(0x34)))
print ("0x38 = ", hex(ipPS.read(0x38)))


In [None]:
# ============================================
# Release Reset First before pass-through mode
# ============================================
# Release Caravel reset
# 0x10 : Data signal of outpin_ctrl
#        bit 0  - outpin_ctrl[0] (Read/Write)
#        others - reserved
# The register is printed before and after the write so the change is visible.
print (ipOUTPIN.read(0x10))
ipOUTPIN.write(0x10, 1)
print (ipOUTPIN.read(0x10))

In [None]:
# ============================================
# Load firmware (fsic.hex) to memory npROM
# ============================================

# Word image of the ROM: ROM_SIZE bytes / 4 bytes per uint32, initialised to 0
npROM = np.zeros(ROM_SIZE >> 2, dtype=np.uint32)

npROM_index = 0
npROM_offset = 0
# The file is only read, so open it read-only; `with` closes it even if a
# malformed line makes the parsing below raise.
with open("/home/xilinx/jupyter_notebooks/PS/fsic.hex", "r") as fiROM:
    for line in fiROM:
        # offset header: "@<hex byte address>"
        if line.startswith('@'):
            # Ignore first char @ (and any NUL padding)
            npROM_offset = int(line[1:].strip('\x00'), base = 16)
            npROM_offset = npROM_offset >> 2 # 4byte per offset
            #print (npROM_offset)
            npROM_index = 0
            continue
        #print (line)

        # Data line: whitespace-separated hex bytes, packed little-endian into
        # 32-bit words. We suppose the data must be 32bit alignment.
        buffer = 0
        bytecount = 0
        for line_byte in line.strip('\x00').split():
            buffer += int(line_byte, base = 16) << (8 * bytecount)
            bytecount += 1
            # Collect 4 bytes, write to npROM
            if bytecount == 4:
                npROM[npROM_offset + npROM_index] = buffer
                # Clear buffer and bytecount
                buffer = 0
                bytecount = 0
                npROM_index += 1
                #print (npROM_index)
        # Store the partial word if the line is not a multiple of 4 bytes
        if bytecount != 0:
            npROM[npROM_offset + npROM_index] = buffer
            npROM_index += 1

In [None]:
# ============================================
# Enabling pass-through mode
# ============================================
cnfg(ip_QSPI)
# Pass-through mode - Write command
# Queue the 5-byte header (pass-through opcode, write command, 3 address bytes)
ip_QSPI.write(XSP_DTR_OFFSET, 0xC4) # Pass-Through (management)
ip_QSPI.write(XSP_DTR_OFFSET, 0x02) # Command: Write data to memory
ip_QSPI.write(XSP_DTR_OFFSET, 0x00) # Address_byte0
ip_QSPI.write(XSP_DTR_OFFSET, 0x00) # Address_byte1
ip_QSPI.write(XSP_DTR_OFFSET, 0x00) # Address_byte2

print('XSP_TFO_OFFSET  : 0x{0:08x}'.format(ip_QSPI.read(XSP_TFO_OFFSET)))

# Select slave 0 (bit 0 cleared), then send the queued header. The slave stays
# selected until the "Exit pass-through mode" cell deselects it.
ip_QSPI.write(XSP_SSR_OFFSET, 0xFFFFFFFE)
write_tx_fifo(ip_QSPI)

print('XSP_TFO_OFFSET  : 0x{0:08x}'.format(ip_QSPI.read(XSP_TFO_OFFSET)))

In [None]:
# ============================================
# Writing FW into SPIROM
# ============================================
# Queue the image one byte at a time (least-significant byte of each word
# first) and start a transfer every time 16 bytes have been queued.
# Assumes the Tx FIFO holds 16 entries, as the original "TX_FIFO = 16" note
# states — confirm against the overlay's AXI Quad SPI configuration.
for index in range (ROM_SIZE >> 2):
    # 4 bytes alignment in npROM
    for byte_shift in range(4):
        tmp = int((npROM[index] >> (byte_shift * 8)) & 0xFF)
        ip_QSPI.write(XSP_DTR_OFFSET, tmp) # Write_data
    # TX_FIFO = 16, 4 * 4 = 16: flush after every 4th word.
    # (The previous test `(index % 3) == 3` could never be true.)
    if (index % 4) == 3:
        write_tx_fifo(ip_QSPI)

# If the word count is not a multiple of 4, the Tx FIFO still holds data: flush it once
StatusReg = ip_QSPI.read(XSP_SR_OFFSET)
if (StatusReg & XSP_SR_TX_EMPTY_MASK) == 0:
    write_tx_fifo(ip_QSPI)

In [None]:
# ============================================
# Read SPIROM for testing
# ============================================
# Re-run the SPI configuration (core reset, slave deselected) before the read-back test
cnfg(ip_QSPI)

In [None]:
# Test pass-through mode - Read command
# Queue the 5-byte header followed by dummy bytes that clock the data back in
ip_QSPI.write(XSP_DTR_OFFSET, 0xC4) # Pass-Through (management)
ip_QSPI.write(XSP_DTR_OFFSET, 0x03) # Command: Read data from memory
ip_QSPI.write(XSP_DTR_OFFSET, 0x00) # Address_byte0
ip_QSPI.write(XSP_DTR_OFFSET, 0x00) # Address_byte1
ip_QSPI.write(XSP_DTR_OFFSET, 0x00) # Address_byte2
# Write dummy data
data_length = 0x8
for index in range(data_length):
    ip_QSPI.write(XSP_DTR_OFFSET, 0x00) # Dummy data

print('XSP_TFO_OFFSET  : 0x{0:08x}'.format(ip_QSPI.read(XSP_TFO_OFFSET)))
# Select slave 0 (bit 0 cleared); the transfer itself starts in the next cell
ip_QSPI.write(XSP_SSR_OFFSET, 0xFFFFFFFE)

In [None]:
# Issue SPI master cycle
write_tx_fifo(ip_QSPI)

# Read the Rx data
# The first 5 received words correspond to the 5 header bytes and are skipped
rx_final = read_rx_fifo(5, ip_QSPI)
for data in rx_final:
    print (hex(data))

In [None]:
# Write dummy data
# Queue another 8 dummy bytes; no new header is queued in this cell
data_length = 0x8
for index in range(data_length):
    ip_QSPI.write(XSP_DTR_OFFSET, 0x00) # Dummy data

print('XSP_TFO_OFFSET  : 0x{0:08x}'.format(ip_QSPI.read(XSP_TFO_OFFSET)))

In [None]:
# Issue SPI master cycle
write_tx_fifo(ip_QSPI)

# Read the Rx data
# Nothing is skipped here because no header bytes were queued for this transfer
rx_final = read_rx_fifo(0, ip_QSPI)
for data in rx_final:
    print (hex(data))

In [None]:
# ============================================
# Exit pass-through mode, FW will be fetched
# ============================================
# Deselect the slave (all select bits set)
ip_QSPI.write(XSP_SSR_OFFSET, SLAVE_NO_SELECTION)

In [None]:
# Check MPRJ_IO input/out/en
# Same register dump as before the firmware load, repeated after leaving pass-through mode.
# 0x10 : Data signal of ps_mprj_in
#        bit 31~0 - ps_mprj_in[31:0] (Read/Write)
# 0x14 : Data signal of ps_mprj_in
#        bit 5~0 - ps_mprj_in[37:32] (Read/Write)
#        others  - reserved
# 0x1c : Data signal of ps_mprj_out
#        bit 31~0 - ps_mprj_out[31:0] (Read)
# 0x20 : Data signal of ps_mprj_out
#        bit 5~0 - ps_mprj_out[37:32] (Read)
#        others  - reserved
# 0x34 : Data signal of ps_mprj_en
#        bit 31~0 - ps_mprj_en[31:0] (Read)
# 0x38 : Data signal of ps_mprj_en
#        bit 5~0 - ps_mprj_en[37:32] (Read)
#        others  - reserved

print ("0x10 = ", hex(ipPS.read(0x10)))
print ("0x14 = ", hex(ipPS.read(0x14)))
print ("0x1c = ", hex(ipPS.read(0x1c)))
print ("0x20 = ", hex(ipPS.read(0x20)))
print ("0x34 = ", hex(ipPS.read(0x34)))
print ("0x38 = ", hex(ipPS.read(0x38)))

In [None]:
# Map a 0x9000-byte register window at 0x60000000; the SOC_* / PL_* offsets
# used in the following cells are relative to this base.
IP_BASE_ADDRESS = 0x60000000
ADDRESS_RANGE = 0x9000
mmio = MMIO(IP_BASE_ADDRESS, ADDRESS_RANGE)

In [None]:
# ====================================================================================== #
# ====================================================================================== #
# PL_FSIC Side Configuration
# ====================================================================================== #
# ====================================================================================== #

In [None]:
# PL_IS Config
# Read the register at PL_IS before the following cells write to it
ADDRESS_OFFSET = PL_IS #0x7000
print("mmio.read(ADDRESS_OFFSET): ", hex(mmio.read(ADDRESS_OFFSET)))

In [None]:
# Relies on ADDRESS_OFFSET still being PL_IS from the previous cell; write, then read back
mmio.write(ADDRESS_OFFSET, 0x12345671)
print("mmio.read(ADDRESS_OFFSET): ", hex(mmio.read(ADDRESS_OFFSET)))

In [None]:
# Relies on ADDRESS_OFFSET still being PL_IS; write a second value, then read back
mmio.write(ADDRESS_OFFSET, 0x12345673)
print("mmio.read(ADDRESS_OFFSET): ", hex(mmio.read(ADDRESS_OFFSET)))

In [None]:
# PL_DMA Config
# Dump the LADMA registers (control, done status, clear, length, trigger,
# buffer address low/high) before they are configured
ADDRESS_OFFSET = PL_DMA # 0x8000
print("mmio.read(ADDRESS_OFFSET): ", hex(mmio.read(ADDRESS_OFFSET)))
print("mmio.read(ADDRESS_OFFSET+0x10): ", hex(mmio.read(ADDRESS_OFFSET+0x10)))
print("mmio.read(ADDRESS_OFFSET+0x20): ", hex(mmio.read(ADDRESS_OFFSET+0x20)))
print("mmio.read(ADDRESS_OFFSET+0x28): ", hex(mmio.read(ADDRESS_OFFSET+0x28)))
print("mmio.read(ADDRESS_OFFSET+0x30): ", hex(mmio.read(ADDRESS_OFFSET+0x30)))
print("mmio.read(ADDRESS_OFFSET+0x38): ", hex(mmio.read(ADDRESS_OFFSET+0x38)))
print("mmio.read(ADDRESS_OFFSET+0x3C): ", hex(mmio.read(ADDRESS_OFFSET+0x3C)))

In [None]:
# PL_AS Config
# Read the register at PL_AS, write a test value, and read it back
ADDRESS_OFFSET = PL_AS # 0x6000
print("mmio.read(ADDRESS_OFFSET): ", hex(mmio.read(ADDRESS_OFFSET)))
mmio.write(ADDRESS_OFFSET, 0x12345676)
print("mmio.read(ADDRESS_OFFSET): ", hex(mmio.read(ADDRESS_OFFSET)))

In [None]:
# PL_AA Config
# Read the two registers at PL_AA+0x00 and PL_AA+0x04
ADDRESS_OFFSET = PL_AA # 0x2100
print("mmio.read(ADDRESS_OFFSET+0x00): ", hex(mmio.read(ADDRESS_OFFSET+0x00)))
print("mmio.read(ADDRESS_OFFSET+0x04): ", hex(mmio.read(ADDRESS_OFFSET+0x04)))

In [None]:
# Relies on ADDRESS_OFFSET still being PL_AA from the previous cell
mmio.write(ADDRESS_OFFSET+0x00, 0x11111111)
mmio.write(ADDRESS_OFFSET+0x04, 0x22222222)

In [None]:
# PL_AA_MB Mailbox
# Read the eight consecutive 32-bit mailbox words at offsets 0x00..0x1C
ADDRESS_OFFSET = PL_AA_MB # 0x2000
print("mmio.read(ADDRESS_OFFSET): ", hex(mmio.read(ADDRESS_OFFSET)))
print("mmio.read(ADDRESS_OFFSET+0x04): ", hex(mmio.read(ADDRESS_OFFSET+0x04)))
print("mmio.read(ADDRESS_OFFSET+0x08): ", hex(mmio.read(ADDRESS_OFFSET+0x08)))
print("mmio.read(ADDRESS_OFFSET+0x0C): ", hex(mmio.read(ADDRESS_OFFSET+0x0C)))
print("mmio.read(ADDRESS_OFFSET+0x10): ", hex(mmio.read(ADDRESS_OFFSET+0x10)))
print("mmio.read(ADDRESS_OFFSET+0x14): ", hex(mmio.read(ADDRESS_OFFSET+0x14)))
print("mmio.read(ADDRESS_OFFSET+0x18): ", hex(mmio.read(ADDRESS_OFFSET+0x18)))
print("mmio.read(ADDRESS_OFFSET+0x1C): ", hex(mmio.read(ADDRESS_OFFSET+0x1C)))

In [None]:
# Relies on ADDRESS_OFFSET still being PL_AA_MB from the previous cell;
# writes a distinct test pattern to each of the eight mailbox words
mmio.write(ADDRESS_OFFSET, 0x11111112)
mmio.write(ADDRESS_OFFSET+0x04, 0x22222223)
mmio.write(ADDRESS_OFFSET+0x08, 0x33333334)
mmio.write(ADDRESS_OFFSET+0x0C, 0x44444445)
mmio.write(ADDRESS_OFFSET+0x10, 0x55555556)
mmio.write(ADDRESS_OFFSET+0x14, 0x66666667)
mmio.write(ADDRESS_OFFSET+0x18, 0x77777778)
mmio.write(ADDRESS_OFFSET+0x1C, 0x88888889)

In [None]:
# ====================================================================================== #
# ====================================================================================== #
# PL_Caravel Side Configuration
# ====================================================================================== #
# ====================================================================================== #

In [None]:
# Caravel-IS Config
# Read the current value of the register at SOC_IS
ADDRESS_OFFSET = SOC_IS # 0x3000
print("mmio.read(ADDRESS_OFFSET): ", hex(mmio.read(ADDRESS_OFFSET)))

In [None]:
# Caravel-IS Config
# Use the named offset (same value, 0x3000) like the other SOC_* cells
ADDRESS_OFFSET = SOC_IS # 0x3000
mmio.write(ADDRESS_OFFSET, 0x00000003)

In [None]:
# Caravel-AS Config
# Read the current value of the register at SOC_AS
ADDRESS_OFFSET = SOC_AS # 0x4000
print("mmio.read(ADDRESS_OFFSET): ", hex(mmio.read(ADDRESS_OFFSET)))

In [None]:
# Caravel-AS Config
# Write 0x6 to the register at SOC_AS
ADDRESS_OFFSET = SOC_AS # 0x4000
mmio.write(ADDRESS_OFFSET, 0x00000006)

In [None]:
# Caravel-CC Config
# Read the current value of the register at SOC_CC
ADDRESS_OFFSET = SOC_CC # 0x5000
print("mmio.read(ADDRESS_OFFSET): ", hex(mmio.read(ADDRESS_OFFSET)))

In [None]:
# Caravel-CC Config
# Write 0 to SOC_CC (later cells label this value "select fake UP")
ADDRESS_OFFSET = SOC_CC # 0x5000
mmio.write(ADDRESS_OFFSET, 0x00000000)

In [None]:
# Caravel-LA Config
# Read the current value of the register at SOC_LA
ADDRESS_OFFSET = SOC_LA # 0x1000
print("mmio.read(ADDRESS_OFFSET): ", hex(mmio.read(ADDRESS_OFFSET)))

In [None]:
# Caravel-UP Config
ADDRESS_OFFSET = SOC_UP # 0x0000
# The register read is at +0x04, so the printed label says so
print("mmio.read(ADDRESS_OFFSET+0x04): ", hex(mmio.read(ADDRESS_OFFSET+0x04)))

In [None]:
# ====================================================================================== #
# ====================================================================================== #
# LADMA Verification
# ====================================================================================== #
# ====================================================================================== #

In [None]:
# Allocation memory
# 1024 x uint32 buffer that the LADMA writes its capture into
ladma_buf = allocate(shape=(1024,), dtype=np.uint32)
print("ladma_buf.device_address: ", hex(ladma_buf.device_address))

# Second MMIO window over the buffer itself, used later to dump the capture.
# Note: this reuses (overwrites) IP_BASE_ADDRESS / ADDRESS_RANGE; `mmio` keeps
# its original mapping.
IP_BASE_ADDRESS = ladma_buf.device_address
ADDRESS_RANGE = 0x1000
buf_mmio = MMIO(IP_BASE_ADDRESS, ADDRESS_RANGE)
# 0x00 : Control signals
#        bit 0 - ap_start (R/W/COH)
#        bit 1 - ap_done (R/COR)
#        bit 2 - ap_idle (R)
#        bit 3 - ap_ready (R/COR)
# 0x10 : Buffer transfer done status register
#        bit 0 - buffer transfer done status (R)
# 0x20 : Buffer transfer done status clear register
#        bit 0 - clear buffer transfer done status (R/W)
# 0x28 : Buffer Length
#        bit 31~0 - set buffer length (must 1024)
# 0x30 : Triggered condition
#        bit 23~0 - set triggered condition (R/W)
#        others  - reserved
# 0x38 : Buffer Lower base address
#        bit 31~0 - (R/W)
# 0x3C : Buffer High base address
#        bit 31~0 - (R/W)
# NOTE(review): the two buffer-address offsets follow the accesses made in this
# notebook (0x38 / 0x3C); confirm against the IP's generated register map.
# ladma Configuration
ADDRESS_OFFSET = PL_DMA # 0x8000
# exit clear operation
mmio.write(ADDRESS_OFFSET + 0x20, 0x00000000)
# set buffer length
mmio.write(ADDRESS_OFFSET + 0x28, 0x00000400)
# set trigger condition
mmio.write(ADDRESS_OFFSET + 0x30, 0x00000000)
# set buffer low: lower 32 bits of the physical address
mmio.write(ADDRESS_OFFSET + 0x38, ladma_buf.device_address & 0xFFFFFFFF)
# set buffer high: upper 32 bits (0 whenever the address fits in 32 bits)
mmio.write(ADDRESS_OFFSET + 0x3C, ladma_buf.device_address >> 32)

In [None]:
# Read back the LADMA registers to check the configuration written above
ADDRESS_OFFSET = PL_DMA # 0x8000
print("mmio.read(ADDRESS_OFFSET+0x00): ", hex(mmio.read(ADDRESS_OFFSET+0x00)))
print("mmio.read(ADDRESS_OFFSET+0x10): ", hex(mmio.read(ADDRESS_OFFSET+0x10)))
print("mmio.read(ADDRESS_OFFSET+0x20): ", hex(mmio.read(ADDRESS_OFFSET+0x20)))
print("mmio.read(ADDRESS_OFFSET+0x28): ", hex(mmio.read(ADDRESS_OFFSET+0x28)))
print("mmio.read(ADDRESS_OFFSET+0x30): ", hex(mmio.read(ADDRESS_OFFSET+0x30)))
print("mmio.read(ADDRESS_OFFSET+0x38): ", hex(mmio.read(ADDRESS_OFFSET+0x38)))
print("mmio.read(ADDRESS_OFFSET+0x3C): ", hex(mmio.read(ADDRESS_OFFSET+0x3C)))

In [None]:
# ladma Configuration
ADDRESS_OFFSET = PL_DMA # 0x8000
# set ap_start (bit 0 of the control register at +0x00)
mmio.write(ADDRESS_OFFSET + 0x00, 0x00000001)

In [None]:
# enable la 0xFFFFFF
# Set the low 24 bits of the register at SOC_LA
ADDRESS_OFFSET = SOC_LA # 0x1000
mmio.write(ADDRESS_OFFSET, 0x00FFFFFF)
#print("mmio.read(ADDRESS_OFFSET): ", hex(mmio.read(ADDRESS_OFFSET)))
# select target UP
ADDRESS_OFFSET = SOC_CC # 0x5000
mmio.write(ADDRESS_OFFSET, 0x00000003)
#print("mmio.read(ADDRESS_OFFSET): ", hex(mmio.read(ADDRESS_OFFSET)))

In [None]:
# ladma Configuration
ADDRESS_OFFSET = PL_DMA # 0x8000
# Busy-wait until the "buffer transfer done" status (+0x10) reads exactly 1.
# NOTE(review): there is no timeout; the cell hangs if the transfer never completes.
while True:
    if mmio.read(ADDRESS_OFFSET+0x10) == 0x01:
        break
print("mmio.read(ADDRESS_OFFSET+0x10): ", hex(mmio.read(ADDRESS_OFFSET+0x10)))

# disable la 0x000000
ADDRESS_OFFSET = SOC_LA # 0x1000
mmio.write(ADDRESS_OFFSET, 0x00000000)
# select fake UP
ADDRESS_OFFSET = SOC_CC # 0x5000
mmio.write(ADDRESS_OFFSET, 0x00000000)

ADDRESS_OFFSET = PL_DMA # 0x8000
# clear buffer transfer done operation
mmio.write(ADDRESS_OFFSET + 0x20, 0x00000001)
# set ap_start
mmio.write(ADDRESS_OFFSET + 0x00, 0x00000001)
# Busy-wait (again without timeout) until the done status is no longer 1
while True:
    if mmio.read(ADDRESS_OFFSET+0x10) != 0x01:
        break
print("mmio.read(ADDRESS_OFFSET+0x10): ", hex(mmio.read(ADDRESS_OFFSET+0x10)))

In [None]:
# Dump the LADMA registers again after the done status has been cleared
ADDRESS_OFFSET = PL_DMA # 0x8000
print("mmio.read(ADDRESS_OFFSET+0x00): ", hex(mmio.read(ADDRESS_OFFSET+0x00)))
print("mmio.read(ADDRESS_OFFSET+0x10): ", hex(mmio.read(ADDRESS_OFFSET+0x10)))
print("mmio.read(ADDRESS_OFFSET+0x20): ", hex(mmio.read(ADDRESS_OFFSET+0x20)))
print("mmio.read(ADDRESS_OFFSET+0x28): ", hex(mmio.read(ADDRESS_OFFSET+0x28)))
print("mmio.read(ADDRESS_OFFSET+0x30): ", hex(mmio.read(ADDRESS_OFFSET+0x30)))
print("mmio.read(ADDRESS_OFFSET+0x38): ", hex(mmio.read(ADDRESS_OFFSET+0x38)))
print("mmio.read(ADDRESS_OFFSET+0x3C): ", hex(mmio.read(ADDRESS_OFFSET+0x3C)))

In [None]:
#dump la log to file
# One 8-digit hex word per line for byte offsets 0x000..0xFFC (1024 words).
# `with` closes the file even if a buffer read raises part-way through.
with open("simulate.log", "w") as file:
    for i in range(0,0xFFF,4):
        file.write('{:08x}'.format(buf_mmio.read(i))+"\n")

In [None]:
# ladma Configuration
ADDRESS_OFFSET = PL_DMA # 0x8000
# exit clear operation
mmio.write(ADDRESS_OFFSET + 0x20, 0x00000000)
# set ap_start again
mmio.write(ADDRESS_OFFSET + 0x00, 0x00000001)

# Confirming SOC_CC & SOC_LA reading still works.
# The second read previously repeated SOC_CC; it now reads SOC_LA, as stated above.
ADDRESS_OFFSET = SOC_CC # 0x5000
print("mmio.read(ADDRESS_OFFSET): ", hex(mmio.read(ADDRESS_OFFSET)))
ADDRESS_OFFSET = SOC_LA # 0x1000
print("mmio.read(ADDRESS_OFFSET): ", hex(mmio.read(ADDRESS_OFFSET)))

In [None]:
# Translate simulate.log to output_new.vcd, which can be opened with GTKWave
%run read_la_data.py

## TPU application

In [None]:
# select target UP
# Writes 1 to the register at SOC_CC (the LADMA section used 3 and 0 for the same register)
ADDRESS_OFFSET = SOC_CC # 0x5000
mmio.write(ADDRESS_OFFSET, 0x00000001)
#print("mmio.read(ADDRESS_OFFSET): ", hex(mmio.read(ADDRESS_OFFSET)))

### Address def

In [None]:
# TPU register offsets, added to SOC_UP by matrix_mul.
# Control register: matrix_mul writes 0x01 to start, polls bit 2 before
# starting and bit 1 for completion.
TPU_CTRL_OFFSET = 0x00

# Matrix dimension registers
TPU_M_OFFSET = 0x10
TPU_K_OFFSET = 0x14
TPU_N_OFFSET = 0x18

# Buffer access: write an index to *_ADDR, then write (A, B) or read (C) the data word(s)
TPU_BUFF_A_ADDR_OFFSET = 0x20
TPU_BUFF_A_DIN_OFFSET = 0x24
TPU_BUFF_B_ADDR_OFFSET = 0x30
TPU_BUFF_B_DIN_OFFSET = 0x34
TPU_BUFF_C_ADDR_OFFSET = 0x40
# Each C entry is read back as four 32-bit words
TPU_BUFF_C_DOUT_0_OFFSET = 0x44
TPU_BUFF_C_DOUT_1_OFFSET = 0x48
TPU_BUFF_C_DOUT_2_OFFSET = 0x4c
TPU_BUFF_C_DOUT_3_OFFSET = 0x50

### Function def

In [None]:
def matrix_mul_sw(A, B):
    """
    Software reference: matrix product of A and B computed with NumPy.

    Parameters:
    A (numpy.ndarray): Left operand of size (m, k).
    B (numpy.ndarray): Right operand of size (k, n).

    Returns:
    numpy.ndarray: Product of size (m, n).

    Raises:
    ValueError: if the inner dimensions of A and B differ.
    """
    cols_a, rows_b = A.shape[1], B.shape[0]
    if cols_a != rows_b:
        raise ValueError("The number of columns in A must be equal to the number of rows in B")

    return np.dot(A, B)

def matrix_mul_with_tiling_sw(A, B, t):
    """
    Software reference for the tiled product: accumulates t-by-t tile products with NumPy.

    Parameters:
    A (numpy.ndarray): Left operand of size (m, k).
    B (numpy.ndarray): Right operand of size (k, n).
    t (int): Tile edge length; edge tiles are clipped to the matrix bounds.

    Returns:
    numpy.ndarray: float64 product of size (m, n).

    Raises:
    ValueError: if the inner dimensions of A and B differ.
    """
    m, k1 = A.shape
    k2, n = B.shape

    if k1 != k2:
        raise ValueError("The number of columns in A must be equal to the number of rows in B")

    C = np.zeros((m, n))

    for row0 in range(0, m, t):
        rows = slice(row0, min(row0 + t, m))
        for col0 in range(0, n, t):
            cols = slice(col0, min(col0 + t, n))
            for inner0 in range(0, k1, t):
                inner = slice(inner0, min(inner0 + t, k1))
                # Add this tile pair's contribution to the output tile
                C[rows, cols] += np.dot(A[rows, inner], B[inner, cols])

    return C

In [None]:
# When True, matrix_mul prints the four raw words read back for every result-buffer entry
DEBUG = False

import struct

def mat_preproc(matrix):
    """
    Pack an int8 matrix into 32-bit words, four consecutive columns per word.

    The matrix is cast to int8 and zero-padded on the right to a multiple of 4
    columns. Words are emitted column-group by column-group, and within a
    group row by row (word index = group * m + row). Inside a word the first
    of the four columns lands in the most significant byte (native byte order
    is assumed to be little-endian).

    Parameters:
    matrix (numpy.ndarray): 2-D array of size (m, n).

    Returns:
    list of int: ceil(n / 4) * m signed 32-bit values.
    """
    matrix = matrix.astype(np.int8)
    m, n = matrix.shape
    pad = -n % 4
    if pad:
        matrix = np.concatenate((matrix, np.zeros((m, pad), dtype=np.int8)), axis=1)
        n += pad

    # Group the columns in fours and order the groups before the rows. reshape
    # and transpose follow the array's real strides; the previous as_strided
    # call hard-coded a row stride of 8 bytes and so was only valid for n == 8.
    groups = matrix.reshape(m, n // 4, 4).transpose(1, 0, 2).reshape(-1, 4)

    # Reverse the bytes of each group and reinterpret them as unsigned bytes
    packed = np.ascontiguousarray(groups[:, ::-1]).astype(np.uint8)

    # View every 4 bytes as one int32 and hand back native Python ints
    return packed.view(np.int32).ravel().tolist()

def matrix_mul(matA, matB, k, m, n):
    """
    Run one matrix multiplication on the TPU at SOC_UP and read back the result.

    Parameters:
    matA (sequence of int): packed words for buffer A; k * ceil(m / 4) words are written.
    matB (sequence of int): packed words for buffer B; k * ceil(n / 4) words are written.
    k, m, n (int): values written to the K, M and N registers.

    Returns:
    numpy.ndarray: int32 array of size (m, n).

    NOTE(review): the A word count implies four elements along m per word,
    whereas mat_preproc packs four elements along the column axis of the
    matrix it is given — confirm the callers supply the layout the TPU expects.
    """
    def _as_int32(word):
        # mmio.read yields the raw 32-bit register value; reinterpret it as
        # two's complement so negative results fit the int32 result array.
        return ((int(word) & 0xFFFFFFFF) ^ 0x80000000) - 0x80000000

    def _groups_of_four(count):
        # Number of 4-element words needed to hold `count` elements
        return (count + 3) // 4

    # wait ap_idle (bit 2 of the control register); no timeout
    while (mmio.read(SOC_UP + TPU_CTRL_OFFSET) & 0x04) == 0:
        continue
    # write config
    mmio.write(SOC_UP + TPU_M_OFFSET, m)
    mmio.write(SOC_UP + TPU_K_OFFSET, k)
    mmio.write(SOC_UP + TPU_N_OFFSET, n)
    # write matA: select the entry index, then write its data word
    for i in range(k * _groups_of_four(m)):
        mmio.write(SOC_UP + TPU_BUFF_A_ADDR_OFFSET, i)
        mmio.write(SOC_UP + TPU_BUFF_A_DIN_OFFSET, matA[i]) # TODO
    # write matB
    for i in range(k * _groups_of_four(n)):
        mmio.write(SOC_UP + TPU_BUFF_B_ADDR_OFFSET, i)
        mmio.write(SOC_UP + TPU_BUFF_B_DIN_OFFSET, matB[i]) # TODO
    # ap_start
    mmio.write(SOC_UP + TPU_CTRL_OFFSET, 0x01)
    # wait ap_done (bit 1 of the control register); no timeout
    while (mmio.read(SOC_UP + TPU_CTRL_OFFSET) & 0x02) == 0:
        continue

    # Result columns are read in groups of four, so allocate n rounded up to 4
    calign = _groups_of_four(n) * 4
    matC_temp = np.zeros((m, calign), dtype=np.int32)

    for i in range(m * _groups_of_four(n)):
        mmio.write(SOC_UP + TPU_BUFF_C_ADDR_OFFSET, i)
        c_temp0 = mmio.read(SOC_UP + TPU_BUFF_C_DOUT_0_OFFSET) # TODO
        c_temp1 = mmio.read(SOC_UP + TPU_BUFF_C_DOUT_1_OFFSET) # TODO
        c_temp2 = mmio.read(SOC_UP + TPU_BUFF_C_DOUT_2_OFFSET) # TODO
        c_temp3 = mmio.read(SOC_UP + TPU_BUFF_C_DOUT_3_OFFSET) # TODO
        if DEBUG:
            print(f'{hex(c_temp3)},{hex(c_temp2)},{hex(c_temp1)},{hex(c_temp0)}')

        # Entries are ordered column-group by column-group, row by row within a group
        m_index = i % m
        n_index = i // m

        # DOUT_3 holds the first column of the group, DOUT_0 the last
        matC_temp[m_index][n_index*4 + 0] = _as_int32(c_temp3)
        matC_temp[m_index][n_index*4 + 1] = _as_int32(c_temp2)
        matC_temp[m_index][n_index*4 + 2] = _as_int32(c_temp1)
        matC_temp[m_index][n_index*4 + 3] = _as_int32(c_temp0)

    # Drop the padding columns
    matC = matC_temp[:,0:n]

    return matC

def matrix_mul_with_preproc(matA, matB):
    """
    Pack two 2-D matrices with mat_preproc and multiply them with matrix_mul.

    Parameters:
    matA (numpy.ndarray): Left operand of size (m, k).
    matB (numpy.ndarray): Right operand of size (k, n).

    Returns:
    The value returned by matrix_mul for the packed operands.
    """
    # k is taken from matB; matA's column count is unpacked but not used
    m, _ = matA.shape
    k, n = matB.shape
    packed_a = mat_preproc(matA)
    packed_b = mat_preproc(matB)

    return matrix_mul(packed_a, packed_b, k, m, n)

def matrix_mul_with_tiling(A, B, t):
    """
    Tiled matrix product in which every tile product goes through matrix_mul_with_preproc.

    Parameters:
    A (numpy.ndarray): Left operand of size (m, k).
    B (numpy.ndarray): Right operand of size (k, n).
    t (int): Tile edge length; edge tiles are clipped to the matrix bounds.

    Returns:
    numpy.ndarray: float64 product of size (m, n).

    Raises:
    ValueError: if the inner dimensions of A and B differ.
    """
    m, k1 = A.shape
    k2, n = B.shape

    if k1 != k2:
        raise ValueError("The number of columns in A must be equal to the number of rows in B")

    C = np.zeros((m, n))

    for row0 in range(0, m, t):
        rows = slice(row0, min(row0 + t, m))
        for col0 in range(0, n, t):
            cols = slice(col0, min(col0 + t, n))
            for inner0 in range(0, k1, t):
                inner = slice(inner0, min(inner0 + t, k1))
                # Accumulate the partial product of this tile pair
                C[rows, cols] += matrix_mul_with_preproc(A[rows, inner], B[inner, cols])

    return C

### CNN model

#### Layer

In [None]:
class _Layer(object):
    def __init__(self):
        pass
    def forward(self, *input):
        raise NotImplementedError
    def backward(self, *output_grad):
        raise NotImplementedError
        
class Convolution(_Layer):
    """Single-channel 2-D convolution whose forward pass runs on the TPU.

    The layer holds one (kernal_size x kernal_size) kernel and a per-output-pixel
    bias. Input windows are unrolled into rows and multiplied by the flattened
    kernel through matrix_mul_with_tiling, after both are cast to int8.
    """

    def __init__(self, kernal_size, stride, in_size):
        """
        Parameters:
        kernal_size (int): Edge length of the square kernel.
        stride (int): Step between neighbouring windows.
        in_size (int): Edge length of the square input image.
        """
        self.weight = np.random.randn(kernal_size, kernal_size) # * 0.01
        self.kernal_size = kernal_size
        self.stride = stride
        self.in_size = in_size
        # Number of window positions per axis. Equal to the previous
        # (in_size - kernal_size + 1) // stride for stride 1, and no longer
        # drops the last valid window for larger strides.
        self.out_size = (in_size - kernal_size) // stride + 1
        self.bias = np.zeros([self.out_size, self.out_size])
        # For RMSProp
        self.si_weight = np.zeros_like(self.weight)
        self.si_bias = np.zeros_like(self.bias)

    def _im2col(self, image):
        """Return the windows of one image as rows of length kernal_size ** 2.

        Assumes `image` rows are in_size elements apart in memory (true for a
        C-contiguous batch of in_size x in_size images).
        """
        data_stride = image.strides[1]
        windows = np.lib.stride_tricks.as_strided(
            image,
            shape=(self.out_size, self.out_size, self.kernal_size, self.kernal_size),
            strides=(data_stride*self.stride*self.in_size, data_stride*self.stride, data_stride*self.in_size, data_stride)
        )
        return windows.reshape(-1, self.kernal_size*self.kernal_size)

    def forward(self, input):
        """
        Parameters:
        input (numpy.ndarray): Batch of images, indexed as (image, row, column).

        Returns:
        numpy.ndarray: float array of size (image_num, out_size, out_size).
        """
        self.input = input
        image_num = input.shape[0]
        output = np.empty([image_num, self.out_size, self.out_size])
        # The kernel is the same for every image: cast and flatten it once
        kernel_col = self.weight.astype(np.int8).reshape((-1, 1))
        for index in range(image_num):
            cmatrix = self._im2col(input[index])
            # Hardware product in 8x8 tiles; matrix_mul_with_tiling_sw is the
            # software equivalent with the same arguments.
            product = matrix_mul_with_tiling(cmatrix.astype(np.int8), kernel_col, 8)
            output[index] = product.reshape(self.out_size, self.out_size) + self.bias
        return output

    def backward(self, output_grad):
        """Compute per-image weight/bias gradients and refresh the RMSProp accumulators.

        Parameters:
        output_grad (numpy.ndarray): Gradient w.r.t. the layer output; reshaped
            to (image_num, out_size, out_size).

        Nothing is returned: no gradient w.r.t. the input is computed.
        """
        image_num = output_grad.shape[0]
        output_grad = output_grad.reshape([image_num, self.out_size, self.out_size])
        self.weight_grad = np.empty([image_num, self.kernal_size, self.kernal_size])
        self.bias_grad = np.empty([image_num, self.out_size, self.out_size])

        for batch_idx in range(image_num):
            cmatrix = self._im2col(self.input[batch_idx])
            # Weight every window by the gradient of its output pixel, then sum over windows
            tmp = cmatrix * output_grad[batch_idx].reshape(-1, 1)
            self.weight_grad[batch_idx] = tmp.sum(axis = 0).reshape([self.kernal_size, self.kernal_size])
            self.bias_grad[batch_idx] = output_grad[batch_idx]

        self.si_weight = np.sqrt(0.9 * np.square(self.si_weight) + 0.1 * np.square(self.weight_grad) + 1e-8)
        self.si_bias = np.sqrt(0.9 * np.square(self.si_bias) + 0.1 * np.square(self.bias_grad))

class FullyConnected(_Layer):
    """Dense layer computing flatten(input) . weight + bias."""

    def __init__(self, in_features, out_features):
        """Draw small random weights of size (in_features, out_features) and a zero bias row."""
        self.weight = np.random.randn(in_features, out_features) * 0.01
        self.bias = np.zeros([1, out_features])
        # For RMSProp
        self.si_weight = np.zeros_like(self.weight)
        self.si_bias = np.zeros_like(self.bias)

    def forward(self, input):
        """Flatten each sample, apply the affine map, and keep the input for backward."""
        self.forward_pass = input
        flat = input.reshape(input.shape[0], -1)
        return flat.dot(self.weight) + self.bias

    def backward(self, output_grad):
        """
        Compute per-sample gradients and refresh the RMSProp accumulators.

        Parameters:
        output_grad (numpy.ndarray): Gradient w.r.t. the output, one row per sample.

        Returns:
        numpy.ndarray: Gradient w.r.t. the flattened input, size (image_num, in_features).
        """
        image_num = output_grad.shape[0]
        in_features, out_features = self.weight.shape
        input_grad = np.empty([image_num, in_features])
        self.weight_grad = np.empty([image_num, in_features, out_features])
        self.bias_grad = np.empty([image_num, self.bias.shape[0], self.bias.shape[1]])

        for batch_idx, sample_grad in enumerate(output_grad):
            input_grad[batch_idx] = sample_grad.dot(self.weight.T)
            self.weight_grad[batch_idx] = np.outer(self.forward_pass[batch_idx].ravel(), sample_grad)
            self.bias_grad[batch_idx] = sample_grad

        self.si_weight = np.sqrt(0.9 * np.square(self.si_weight) + 0.1 * np.square(self.weight_grad) + 1e-8)
        self.si_bias = np.sqrt(0.9 * np.square(self.si_bias) + 0.1 * np.square(self.bias_grad))
        return input_grad

class Relu(_Layer):
    """Leaky ReLU activation with a fixed negative slope of 0.01."""

    def __init__(self):
        pass

    def forward(self, input):
        """Return input where it is non-negative and 0.01 * input elsewhere."""
        # Keep the pre-activation values: backward needs their sign
        self.input = input
        output = np.where(input<0, 0.01*input, input)
        return output

    def backward(self, output_grad):
        """
        Chain rule for the activation: upstream gradient times the local slope.

        The slope (0.01 or 1) is chosen from the sign of the input seen by the
        last forward call, not from the sign of the gradient, and is reshaped
        to match `output_grad` so the result keeps the gradient's shape.

        Parameters:
        output_grad (numpy.ndarray): Gradient w.r.t. the layer output; must hold
            as many elements as the last forward input.

        Returns:
        numpy.ndarray: Gradient w.r.t. the layer input, same shape as `output_grad`.
        """
        slope = np.where(self.input < 0, 0.01, 1).reshape(output_grad.shape)
        input_grad = output_grad * slope
        return input_grad

class Sigmoid(_Layer):
    """Logistic sigmoid activation."""

    def __init__(self):
        pass

    @staticmethod
    def _sigmoid(values):
        # exp(x) / (1 + exp(x)) is selected for negative x, 1 / (1 + exp(-x)) otherwise
        exp_pos = np.exp(values)
        return np.where(values < 0, exp_pos/(1 + exp_pos), 1/(1 + np.exp(-values)))

    def forward(self, input):
        """Return the sigmoid of every element of `input`."""
        return self._sigmoid(input)

    def backward(self, output_grad):
        """
        Return s * (1 - s), where s is the sigmoid of `output_grad`.

        NOTE(review): this evaluates the sigmoid derivative at the argument
        itself and does not multiply by an upstream gradient — confirm this is
        what callers expect.
        """
        sig = self._sigmoid(output_grad)
        return sig * (1 - sig)

class SoftmaxWithloss(_Layer):
    """Softmax over axis 1 followed by per-sample cross-entropy loss."""

    def __init__(self):
        pass

    def forward(self, input, target):
        """
        Parameters:
        input (numpy.ndarray): Scores, one row per sample.
        target (numpy.ndarray): Target distribution per sample, same shape as `input`.

        Returns:
        tuple: (predict, your_loss) — the softmax probabilities and the
        cross-entropy loss of each sample.
        """
        self.target = target
        # Softmax: subtract the row maximum for numerical stability. A new
        # array is created so the caller's `input` is left untouched.
        shifted = input - input.max(axis = 1).reshape(-1, 1)
        exp_scores = np.exp(shifted)
        predict = exp_scores / exp_scores.sum(axis = 1).reshape(-1, 1)
        # Cross entropy; 1e-15 keeps log() finite when a probability is 0
        your_loss = -(target * np.log(predict + 1e-15)).sum(axis = 1)
        self.predict = predict
        return predict, your_loss

    def backward(self):
        """Return predict - target from the last forward call."""
        input_grad = self.predict - self.target
        return input_grad

#### Network

In [None]:
class Network(object):
    """Small classifier: Convolution -> Relu -> FullyConnected -> SoftmaxWithloss."""

    def __init__(self):
        # Layers are built in pipeline order.
        self.cnn1 = Convolution(3, 1, 28)
        self.act1 = Relu()
        self.fc1 = FullyConnected(26*26, 10)
        self.loss = SoftmaxWithloss()

    def forward(self, input, target):
        """Run one batch through every layer.

        `input` is reshaped to (batch, 28, 28) before the convolution.
        Returns the loss layer's prediction and the mean of its per-sample loss.
        """
        batch_size = input.shape[0]
        images = input.reshape(batch_size, 28, 28)
        activated = self.act1.forward(self.cnn1.forward(images))
        scores = self.fc1.forward(activated)
        pred, per_sample_loss = self.loss.forward(scores, target)
        return pred, per_sample_loss.mean()

    def backward(self):
        """Push the loss gradient back through fc1, act1 and finally cnn1."""
        grad = self.loss.backward()
        for layer in (self.fc1, self.act1):
            grad = layer.backward(grad)
        # The gradient returned by the first layer is not needed.
        self.cnn1.backward(grad)

    def update(self, lr):
        """Apply one RMSProp-style step to the parameters of fc1 and cnn1.

        Each parameter moves by `lr` times the batch-summed gradient divided
        by the batch-summed `si_*` running magnitude.
        """
        for layer in (self.fc1, self.cnn1):
            weight_step = np.sum(layer.weight_grad, axis=0) / np.sum(layer.si_weight, axis=0)
            layer.weight -= lr * weight_step
            bias_step = np.sum(layer.bias_grad, axis=0) / np.sum(layer.si_bias, axis=0)
            layer.bias -= lr * bias_step

#### Train

In [None]:
# Fix the random seed so runs are reproducible
np.random.seed(1)

# Load data: column 0 is used as the label, the remaining columns as pixels
train_load = np.loadtxt('./train.csv', delimiter=',', dtype="int")
train_data = train_load[:, 1:]
train_label = train_load[:, 0]
train_image_num = train_data.shape[0]
print("shape of train_data: {}".format(train_data.shape))
print("shape of train_label: {}".format(train_label.shape))
print("train_image_num  is : {}".format(train_image_num))

# Show the first images on a 10x10 grid (fewer if the file is smaller)
plt.figure(figsize=(20, 20))
for index in range(min(100, train_image_num)):
    image = train_data[index].reshape(28, 28)
    plt.subplot(10, 10, index+1)
    plt.imshow(image)
plt.show()

# Convert the training labels to one hot vector:
# row i gets a single 1 in column train_label[i]
label_temp = np.zeros((train_image_num, 10), dtype=np.float32)
label_temp[np.arange(train_image_num), train_label] = 1
train_label_onehot = np.copy(label_temp)
print("One-hot training labels shape:", train_label_onehot.shape)

In [None]:
# Hyperparameters

# Learning-rate schedule inputs; the training loop derives the per-step
# learning rate from these two values.
warmup_steps = 40000
d_model = 1024

# Data split and batching
Batch_size = 8           # samples per mini-batch
val_image_num = 4800     # samples after the training batches, used for validation

# Optimisation
EPOCH = 10               # passes over the training split
Learning_rate = 1e-5     # starting value; reassigned on every training step

In [None]:
# Training
net = Network()

# Incomplete trailing batches are dropped by the integer division.
train_batch_num = (train_image_num - val_image_num) // Batch_size
val_batch_num = val_image_num // Batch_size
# test_batch_num = test_image_num//Batch_size

# Samples actually visited per epoch; used as the accuracy denominators so
# that samples dropped by the integer division above are not counted.
train_sample_num = train_batch_num * Batch_size
val_sample_num = val_batch_num * Batch_size

print("[   0.000] start training...")
start_time = time()
for epoch in range(1, EPOCH+1):
    train_hit = 0
    val_hit = 0
    total_train_loss = 0
    total_val_loss = 0
    for it in range(train_batch_num):
        batch = slice(it*Batch_size, (it+1)*Batch_size)
        pred, train_loss = net.forward(train_data[batch], train_label_onehot[batch])
        pred_index = np.argmax(pred, axis=1)
        train_hit += (pred_index == train_label[batch]).sum()
        total_train_loss += train_loss

        # Warm-up schedule: the rate grows linearly with `step` until
        # `warmup_steps`, then decays as step**-0.5. `epoch` starts at 1,
        # so `step` is never 0 and step**(-0.5) is always defined.
        step = epoch * train_batch_num + it
        Learning_rate = d_model**(-0.5) * min(step**(-0.5), step * warmup_steps**(-1.5))
        net.backward()
        net.update(0.1*Learning_rate)
        print('[%8.3f]'%(time()-start_time), 'train batch:', '%5d'%it, '/', '%5d'%train_batch_num)

    # Validation uses the samples that follow the training batches; no
    # backward/update is performed here.
    for titt in range(val_batch_num):
        tit = train_batch_num + titt
        batch = slice(tit*Batch_size, (tit+1)*Batch_size)
        pred, val_loss = net.forward(train_data[batch], train_label_onehot[batch])
        pred_index = np.argmax(pred, axis=1)
        val_hit += (pred_index == train_label[batch]).sum()
        total_val_loss += val_loss
        print('[%8.3f]'%(time()-start_time), 'val batch:', '%5d'%titt, '/', '%5d'%val_batch_num)

    run_time = time() - start_time
    print('[%8.3f]'%run_time, 'Epoch:%3d'%epoch, '|Train Loss:%8.4f'%(total_train_loss/train_batch_num), '|Train Acc:%3.4f'%(train_hit/train_sample_num*100.0)
          , '|Val Loss:%8.4f'%(total_val_loss/val_batch_num), '|Val Acc:%3.4f'%(val_hit/val_sample_num*100.0))

### Unit Test

#### Matrix Multiplication

In [None]:
# Random 8x8 operands. `np.random` is a module and cannot be called, so the
# matrices are drawn with np.random.rand (uniform floats in [0, 1)).
# NOTE(review): confirm that float operands are what both multiply routines expect.
mmtestA = np.random.rand(8, 8)
mmtestB = np.random.rand(8, 8)
# The timestamps printed around each call allow the two implementations to
# be compared by subtracting consecutive values.
print(time())
mmtestC = matrix_mul_sw(mmtestA, mmtestB)
print(time())
mmtestC = matrix_mul_with_preproc(mmtestA, mmtestB)
print(time())