In [1]:
from __future__ import print_function

import sys
import numpy as np
from time import time
import matplotlib.pyplot as plt 

sys.path.append('/home/xilinx')
from pynq import Overlay
from pynq import allocate

from uartlite import *

import multiprocessing

# For sharing string variable
from multiprocessing import Process,Manager,Value
from ctypes import c_char_p

import asyncio

ROM_SIZE = 0x2000  # ROM image capacity in bytes (0x2000 = 8 KiB)

In [2]:
# Open the overlay described by caravel_fpga.bit; `ol` is the handle through
# which every IP block below is reached.
ol = Overlay("caravel_fpga.bit")
# Uncomment to inspect the IP blocks exposed by the overlay:
#ol.ip_dict

In [3]:
# Handles to the overlay IP blocks used by the rest of the notebook
ipOUTPIN = ol.output_pin_0          # Output pin IP (used for the CPU reset)
ipPS = ol.caravel_ps_0              # Provides AXI-Lite access to Caravel
ipReadROMCODE = ol.read_romcode_0   # Reads the ROM code
ipUart = ol.axi_uartlite_0          # AXI-Lite interface of the UART

In [4]:
# Display the interrupt pins known to the overlay (controller, index and full path of each)
ol.interrupt_pins

{'axi_uartlite_0/interrupt': {'controller': 'axi_intc_0',
  'index': 0,
  'fullpath': 'axi_uartlite_0/interrupt'},
 'axi_intc_0/intr': {'controller': 'axi_intc_0',
  'index': 0,
  'fullpath': 'axi_intc_0/intr'}}

In [5]:
# To see which interrupts exist in the system, inspect:
#ol.interrupt_pins

# Each IP instance has an _interrupts dictionary that lists the names of its interrupts:
#ipUart._interrupts

# An interrupt object is accessed by its name on the IP instance.
# The Interrupt class provides a single method, wait(), which is an asyncio
# coroutine that returns when the interrupt is signalled.
intUart = ipUart.interrupt

In [6]:
# Parse the firmware hex image into npROM, an array of 32-bit words covering
# the whole ROM (ROM_SIZE bytes / 4 bytes per word), initialised to 0.
#
# File format handled here:
#   "@<hex address>"   starts a new section at that byte address
#   "<hex byte> ..."   data bytes, packed little-endian into 32-bit words
rom_size_final = 0

npROM = np.zeros(ROM_SIZE >> 2, dtype=np.uint32)
npROM_index = 0    # words written so far in the current section
npROM_offset = 0   # word address at which the current section starts

# The image is only read, so open it read-only (the previous "r+" mode also
# demanded write permission) and let the context manager close it even when
# parsing fails.
# Alternative firmware image: "counter_wb.hex"
with open("combined.hex", "r") as fiROM:
    for line in fiROM:
        # Section header: '@' followed by the byte address in hex
        if line.startswith('@'):
            # Remember how far the section that just ended reached, so the
            # final size covers every section, not only the last one.
            rom_size_final = max(rom_size_final, npROM_offset + npROM_index)
            # Byte address -> word address (4 bytes per word)
            npROM_offset = int(line[1:].strip('\x00'), base=16) >> 2
            npROM_index = 0
            continue

        # Data line: the bytes are assumed to be 32-bit aligned. A trailing
        # group of fewer than 4 bytes is still stored as one word, with the
        # missing upper bytes left at 0.
        line_bytes = [int(token, base=16) for token in line.strip('\x00').split()]
        for first in range(0, len(line_bytes), 4):
            word = 0
            for position, value in enumerate(line_bytes[first:first + 4]):
                word += value << (8 * position)

            target = npROM_offset + npROM_index
            if target >= npROM.size:
                raise ValueError(
                    f"combined.hex: word address {target:#x} does not fit in "
                    f"the {ROM_SIZE:#x}-byte ROM"
                )
            npROM[target] = word
            npROM_index += 1

# Number of 32-bit words to transfer: one past the highest word reached by any section
rom_size_final = max(rom_size_final, npROM_offset + npROM_index)
#print (rom_size_final)

#for data in npROM:
#    print (hex(data))


In [7]:
# Stage the parsed ROM image in a buffer obtained from pynq.allocate, so that
# its device address can be handed to the read_romcode IP.
# Only the rom_size_final words that were populated are allocated and copied.
rom_buffer = allocate(shape=(rom_size_final,), dtype=np.uint32)
rom_buffer[:rom_size_final] = npROM[:rom_size_final]

# Register map of the read_romcode IP:
#
# 0x00 : Control signals
#        bit 0  - ap_start (Read/Write/COH)
#        bit 1  - ap_done (Read/COR)
#        bit 2  - ap_idle (Read)
#        bit 3  - ap_ready (Read)
#        bit 7  - auto_restart (Read/Write)
#        others - reserved
# 0x10 : Data signal of romcode
#        bit 31~0 - romcode[31:0] (Read/Write)
# 0x14 : Data signal of romcode
#        bit 31~0 - romcode[63:32] (Read/Write)
# 0x1c : Data signal of length_r
#        bit 31~0 - length_r[31:0] (Read/Write)

# Source address (low word), transfer length, then source address (high word = 0)
ipReadROMCODE.write(0x10, rom_buffer.device_address)
ipReadROMCODE.write(0x1C, rom_size_final)

ipReadROMCODE.write(0x14, 0)

# Set ap_start so the IP moves the data from rom_buffer to BRAM, then poll
# bit 2 of the control register (ap_idle in the map above) until it is set.
ipReadROMCODE.write(0x00, 1)
while not (ipReadROMCODE.read(0x00) & 0x04):
    pass

print("Write to bram done")


Write to bram done


In [8]:
# Initialize AXI UART
uart = UartAXI(ipUart.mmio.base_addr)

# Setup AXI UART register
uart.setupCtrlReg()

# Get current UART status
uart.currentStatus()

{'RX_VALID': 0,
 'RX_FULL': 0,
 'TX_EMPTY': 1,
 'TX_FULL': 0,
 'IS_INTR': 0,
 'OVERRUN_ERR': 0,
 'FRAME_ERR': 0,
 'PARITY_ERR': 0}

In [9]:
async def uart_rxtx():
    """Interrupt-driven UART loop: send "hello" plus a newline and print what comes back.

    Resets both UART FIFOs and enables the UART interrupt, then pushes the
    first character of the message into the TX FIFO. After every interrupt
    the RX FIFO is drained; each received character is collected for printing
    and, while unsent characters remain, triggers the transmission of the next
    one. The coroutine never returns on its own; it is meant to be cancelled.
    """
    ctrl_bits = (1 << RST_TX) | (1 << RST_RX) | (1 << INTR_EN)
    ipUart.write(CTRL_REG, ctrl_bits)
    print("Waitting for interrupt")

    tx_str = "hello\n"
    ipUart.write(TX_FIFO, ord(tx_str[0]))
    # Characters still to be sent, one per received character
    unsent = iter(tx_str[1:])

    while True:
        await intUart.wait()
        received = []
        # Drain the RX FIFO until the RX_VALID status bit clears
        while ipUart.read(STAT_REG) & (1 << RX_VALID):
            received.append(chr(ipUart.read(RX_FIFO)))
            next_char = next(unsent, None)
            if next_char is not None:
                ipUart.write(TX_FIFO, ord(next_char))
        print("".join(received), end='')
        
async def caravel_start():
    """Write 0 and then 1 to offset 0x10 of the output-pin IP to start the Caravel SoC."""
    pin_reg = 0x10
    ipOUTPIN.write(pin_reg, 0)
    print("Start Caravel Soc")
    ipOUTPIN.write(pin_reg, 1)

import time    
async def show():
    """Debug helper: continuously print the register at ipPS offset 0x1c with a timestamp.

    Each line shows the register value in hex and the time elapsed since the
    coroutine started, in microseconds. The coroutine runs until it is
    cancelled.
    """
    # Import locally under a private name: this notebook binds `time` both to
    # the function (`from time import time`) and to the module (`import time`),
    # so which one is visible depends on the order the cells were run in.
    import time as _time

    start = _time.perf_counter()

    while True:
        reg_mprj_datal = hex(ipPS.read(0x1c))
        # seconds -> microseconds is a factor of 1e6 (the previous 10e6 is 1e7)
        elapsed_us = (_time.perf_counter() - start) * 1e6
        print(f"mprj: {reg_mprj_datal}, time: {elapsed_us}us")
        # Yield to the event loop on every iteration; without an await this
        # loop would starve every other task and could never be cancelled.
        await asyncio.sleep(0)
    
async def _wait_for_mprj_tag(tag, mask=0xffff0000):
    """Poll the register at ipPS offset 0x1c until (value & mask) == tag.

    Control is handed back to the event loop between polls so that other
    tasks (UART servicing, timers, cancellation) keep running while waiting.
    """
    while (ipPS.read(0x1c) & mask) != tag:
        await asyncio.sleep(0)


async def test():
    """Follow the FIR / MM / QS test flow through the checkpoint tags read at ipPS offset 0x1c.

    The upper 16 bits of the register are compared against these tags, in order:
        0xab51 - FIR test started
        0xab61 - FIR test finished
        0xab71 - MM test finished
        0xab81 - QS test finished
    A banner is printed as each checkpoint is reached. Each wait only ends
    when the register holds the expected tag at the moment it is sampled.
    """
    #============== FIR ===============#
    await _wait_for_mprj_tag(0xab510000)
    print("--------- FIR test started ---------")
    await _wait_for_mprj_tag(0xab610000)
    print("-------- FIR test finished ---------")
    print("------------------------------------")

    #============== MM ================#
    print("--------- MM Test started ----------")
    # Disabled: checks for the intermediate matmul() return values (upper
    # halfword 0x003E, 0x0044, 0x004A, 0x0050) were previously polled here.
    await _wait_for_mprj_tag(0xab710000)
    print("MM Test finished")
    print("------------------------------------")

    #=============== QS ================#
    print("-------- QS Test started -----------")
    # Disabled: checks for the intermediate qsort() return values (40, 893,
    # 2541, 2669). As written they compared against the value masked with
    # 0xffff0000, which can never equal a number below 0x10000.
    await _wait_for_mprj_tag(0xab810000)
    print("QS Test finished")

    print("=====================================")
    print("All FIR/MM/QS and UART are finished!!")
    print("=====================================")
    
# Python 3.5+
#tasks = [ # Create a task list
#    asyncio.ensure_future(example1()),
#    asyncio.ensure_future(example2()),
#]
# To test this we need to use the asyncio library to schedule our new coroutine. 
# asyncio uses event loops to execute coroutines. 
# When python starts it will create a default event loop 
# which is what the PYNQ interrupt subsystem uses to handle interrupts

#loop = asyncio.get_event_loop()
#loop.run_until_complete(asyncio.wait(tasks))

# Python 3.7+
async def async_main(run_seconds=10):
    """Start the SoC, service the UART and follow the firmware test flow.

    Schedules caravel_start(), uart_rxtx() and test() as concurrent tasks,
    keeps them running for at least `run_seconds` seconds, then waits for the
    start and test tasks to finish before cancelling the UART task.

    Args:
        run_seconds: minimum time, in seconds, to keep the UART task running
            (default 10, the value that used to be hard-coded).

    Raises:
        Any exception raised by caravel_start() or test(); previously those
        tasks were never awaited, so their failures went unreported.
    """
    # Requires Python 3.7+ (asyncio.create_task). Creation order is kept:
    # start the SoC first, then the UART handler, then the checkpoint monitor.
    start_task = asyncio.create_task(caravel_start())
    uart_task = asyncio.create_task(uart_rxtx())
    test_task = asyncio.create_task(test())
    #show_task = asyncio.create_task(show())
    try:
        await asyncio.sleep(run_seconds)
        # Awaiting these surfaces their exceptions and keeps a still-running
        # test() from being cancelled when the event loop shuts down.
        await start_task
        await test_task
    finally:
        # uart_rxtx() never returns on its own, so it is always cancelled here.
        uart_task.cancel()
        #show_task.cancel()
        try:
            await uart_task
        except asyncio.CancelledError:
            print('main(): uart_rx is cancelled now')
    

In [None]:
# Run the top-level coroutine (SoC start, UART handling, test monitoring) to completion
asyncio.run(async_main()) 

Start Caravel Soc
Waitting for interrupt
mprj: 0xab400040, time: 722.4082946777344us
mprj: 0xab400040, time: 21808.147430419922us
mprj: 0xab400040, time: 41952.13317871094us
mprj: 0xab400040, time: 65231.3232421875us
mprj: 0xab400040, time: 69279.67071533203us
mprj: 0xab400040, time: 72281.3606262207us
mprj: 0xab400040, time: 103063.58337402344us
mprj: 0xab400040, time: 120706.55822753906us
mprj: 0xab400040, time: 124409.19876098633us
mprj: 0xab400040, time: 127282.14263916016us
mprj: 0xab400040, time: 130050.1823425293us
mprj: 0xab400040, time: 164904.59442138672us
mprj: 0xab400040, time: 167946.81549072266us
mprj: 0xab400040, time: 170745.849609375us
mprj: 0xab400040, time: 173480.51071166992us
mprj: 0xab400040, time: 176222.3243713379us
mprj: 0xab400040, time: 178954.6012878418us
mprj: 0xab400040, time: 181682.10983276367us
mprj: 0xab400040, time: 184397.69744873047us
mprj: 0xab400040, time: 187144.27947998047us
mprj: 0xab400040, time: 189960.00289916992us
mprj: 0xab400040, time: 19

In [None]:
# Dump a fixed set of ipPS registers, one "<offset> =  <value>" line each, in hex
for reg_offset in (0x10, 0x14, 0x1c, 0x20, 0x34, 0x38):
    print(f"0x{reg_offset:02x} = ", hex(ipPS.read(reg_offset)))