In [1]:
from __future__ import print_function

import sys
import numpy as np
from time import time
import matplotlib.pyplot as plt 

sys.path.append('/home/xilinx')
from pynq import Overlay
from pynq import allocate

from uartlite import *

import multiprocessing

# For sharing string variable
from multiprocessing import Process,Manager,Value
from ctypes import c_char_p

import asyncio

ROM_SIZE = 0x2000 #8K

In [2]:
ol = Overlay("caravel_fpga.bit")
#ol.ip_dict

In [3]:
ipOUTPIN = ol.output_pin_0
ipPS = ol.caravel_ps_0
ipReadROMCODE = ol.read_romcode_0
ipUart = ol.axi_uartlite_0

In [4]:
ol.interrupt_pins

{'axi_intc_0/intr': {'controller': 'axi_intc_0',
  'index': 0,
  'fullpath': 'axi_intc_0/intr'},
 'axi_uartlite_0/interrupt': {'controller': 'axi_intc_0',
  'index': 0,
  'fullpath': 'axi_uartlite_0/interrupt'}}

In [5]:
# See what interrupts are in the system
#ol.interrupt_pins

# Each IP instances has a _interrupts dictionary which lists the names of the interrupts
#ipUart._interrupts

# The interrupts object can then be accessed by its name
# The Interrupt class provides a single function wait 
# which is an asyncio coroutine that returns when the interrupt is signalled.
intUart = ipUart.interrupt

In [6]:
# Create np with 8K/4 (4 bytes per index) size and be initiled to 0
rom_size_final = 0

npROM = np.zeros(ROM_SIZE >> 2, dtype=np.uint32)
npROM_index = 0
npROM_offset = 0
fiROM = open("uart.hex", "r+")
#fiROM = open("counter_wb.hex", "r+")

for line in fiROM:
    # offset header
    if line.startswith('@'):
        # Ignore first char @
        npROM_offset = int(line[1:].strip(b'\x00'.decode()), base = 16)
        npROM_offset = npROM_offset >> 2 # 4byte per offset
        #print (npROM_offset)
        npROM_index = 0
        continue
    #print (line)

    # We suppose the data must be 32bit alignment
    buffer = 0
    bytecount = 0
    for line_byte in line.strip(b'\x00'.decode()).split():
        buffer += int(line_byte, base = 16) << (8 * bytecount)
        bytecount += 1
        # Collect 4 bytes, write to npROM
        if(bytecount == 4):
            npROM[npROM_offset + npROM_index] = buffer
            # Clear buffer and bytecount
            buffer = 0
            bytecount = 0
            npROM_index += 1
            #print (npROM_index)
            continue
    # Fill rest data if not alignment 4 bytes
    if (bytecount != 0):
        npROM[npROM_offset + npROM_index] = buffer
        npROM_index += 1
    
fiROM.close()

rom_size_final = npROM_offset + npROM_index
#print (rom_size_final)

#for data in npROM:
#    print (hex(data))


In [7]:
# Allocate dram buffer will assign physical address to ip ipReadROMCODE

#rom_buffer = allocate(shape=(ROM_SIZE >> 2,), dtype=np.uint32)
rom_buffer = allocate(shape=(rom_size_final,), dtype=np.uint32)

# Initial it by npROM
#for index in range (ROM_SIZE >> 2):
for index in range (rom_size_final):
    rom_buffer[index] = npROM[index]
    
#for index in range (ROM_SIZE >> 2):
#    print ("0x{0:08x}".format(rom_buffer[index]))

# Program physical address for the romcode base address


# 0x00 : Control signals
#        bit 0  - ap_start (Read/Write/COH)
#        bit 1  - ap_done (Read/COR)
#        bit 2  - ap_idle (Read)
#        bit 3  - ap_ready (Read)
#        bit 7  - auto_restart (Read/Write)
#        others - reserved
# 0x10 : Data signal of romcode
#        bit 31~0 - romcode[31:0] (Read/Write)
# 0x14 : Data signal of romcode
#        bit 31~0 - romcode[63:32] (Read/Write)
# 0x1c : Data signal of length_r
#        bit 31~0 - length_r[31:0] (Read/Write)

ipReadROMCODE.write(0x10, rom_buffer.device_address)
ipReadROMCODE.write(0x1C, rom_size_final)

ipReadROMCODE.write(0x14, 0)

# ipReadROMCODE start to move the data from rom_buffer to bram
ipReadROMCODE.write(0x00, 1) # IP Start
while (ipReadROMCODE.read(0x00) & 0x04) == 0x00: # wait for done
    continue
    
print("Write to bram done")


Write to bram done


In [8]:
# Initialize AXI UART
uart = UartAXI(ipUart.mmio.base_addr)

# Setup AXI UART register
uart.setupCtrlReg()

# Get current UART status
uart.currentStatus()

{'RX_VALID': 0,
 'RX_FULL': 0,
 'TX_EMPTY': 1,
 'TX_FULL': 0,
 'IS_INTR': 0,
 'OVERRUN_ERR': 0,
 'FRAME_ERR': 0,
 'PARITY_ERR': 0}

In [9]:
async def uart_rxtx():
    # Reset FIFOs, enable interrupts
    ipUart.write(CTRL_REG, 1<<RST_TX | 1<<RST_RX | 1<<INTR_EN)
    print("Waitting for interrupt")
    tx_str = "hello\n"
    ipUart.write(TX_FIFO, ord(tx_str[0]))
    i = 1
    while(True):
        await intUart.wait()
        buf = ""
        # Read FIFO until valid bit is clear
        while ((ipUart.read(STAT_REG) & (1<<RX_VALID))):
            buf += chr(ipUart.read(RX_FIFO))
            if i<len(tx_str):
                ipUart.write(TX_FIFO, ord(tx_str[i]))
                i=i+1
        print(buf, end='')
        
############################## Added by us ##############################
async def uart_rxtx_for_only_one_char():
    # Reset FIFOs, enable interrupts
    ipUart.write(CTRL_REG, 1<<RST_TX | 1<<RST_RX | 1<<INTR_EN)
    print("Waitting for interrupt")
    tx_str = "s"
    buf = ""
    
    latency_timer_start=time()
    
    ipUart.write(TX_FIFO, ord(tx_str[0]))
    await intUart.wait()
    buf = chr(ipUart.read(RX_FIFO))
    
    latency_timer_end=time()
    
    
    print("Notebook received"+buf+", and the latency for one character loop-back using UART = ", latency_timer_end-latency_timer_start)
        
#########################################################################
        
async def caravel_start():
    ipOUTPIN.write(0x10, 0)
    print("Start Caravel Soc")
    ipOUTPIN.write(0x10, 1)
    ############################## Added by us ##############################
    #check_number=0
    ###for n in range(1, 100):
    ###    print ("0x1c = ", hex(ipPS.read(0x1c)))
    
    while ((ipPS.read(0x1c)>>16) != 0xab40):
        continue
    # Because print() function takes a lot of time to execute (resulting in missing mprj_io code), we cannot print the information here, and we will print all the information together at last.
    ###print("Info: Start matrix multiplication test...") 
    
#    while ((ipPS.read(0x1c)>>16) != 0x003e):
#        continue
    ###print("Call function matmul() in User Project BRAM (mprjram, 0x38000000) return value passed, 0x003e, which is 62 in decimal")
    #check_number=1
    
    while ((ipPS.read(0x1c)>>16) != 0x0044):
        continue
    ###print("Call function matmul() in User Project BRAM (mprjram, 0x38000000) return value passed, 0x0044, which is 68 in decimal")
    #check_number=2
    
#    while ((ipPS.read(0x1c)>>16) != 0x004a):
#        continue
    ###print("Call function matmul() in User Project BRAM (mprjram, 0x38000000) return value passed, 0x004a, which is 74 in decimal")
    #check_number=3
    
    while ((ipPS.read(0x1c)>>16) != 0x0050):
        continue
    ###print("Call function matmul() in User Project BRAM (mprjram, 0x38000000) return value passed, 0x0050, which is 80 in decimal")
    #check_number=4
    
    while ((ipPS.read(0x1c)>>16) != 0xab51):
        continue
    ###print("Success: Matrix multiplication test passed ♪")
    
    ###print("Info: Start Q sort test...")
    #print ("0x1c = ", (ipPS.read(0x1c)>>16) == 43857)
    # Using decimal to do value comparison is too slow (?)
#    while ((ipPS.read(0x1c)>>16) != 40):
#    while ((ipPS.read(0x1c)>>16) != 0x0028):
#        continue
    ###print("Call function qsort() in User Project BRAM (mprjram, 0x38000000) return value passed, 0x0028, which is 40 in decimal")
    #check_number=5
    
###    while ((ipPS.read(0x1c)>>16) != 893):
#    while ((ipPS.read(0x1c)>>16) != 0x037d):
###        continue
    ###print("Call function qsort() in User Project BRAM (mprjram, 0x38000000) return value passed, 0x037d, which is 893 in decimal")
    #check_number=6
    
    while ((ipPS.read(0x1c)>>16) != 2541):
#    while ((ipPS.read(0x1c)>>16) != 0x09ed):
        continue
    ###print("Call function qsort() in User Project BRAM (mprjram, 0x38000000) return value passed, 0x09ed, which is 2541 in decimal")
    #check_number=7
    
#    while ((ipPS.read(0x1c)>>16) != 2669):
#    while ((ipPS.read(0x1c)>>16) != 0x0a6d):
#        continue
    ###print("Call function qsort() in User Project BRAM (mprjram, 0x38000000) return value passed, 0x0a6d, which is 2669 in decimal")
    #check_number=8
    
    while ((ipPS.read(0x1c)>>16) != 6023):
        continue
    ###print("Call function qsort() in User Project BRAM (mprjram, 0x38000000) return value passed, 0x1787, which is 6023 in decimal")
    
    while ((ipPS.read(0x1c)>>16) != 0xab52):
        continue
    ###print("Success: Q sort test passed ♪")
    
    ###print("Info: Start FIR test...")
#    while ((ipPS.read(0x1c)>>16) != 1098):
    #while ((ipPS.read(0x1c) & 0xffff0000) != 0x044a0000):
#        continue
    ###print("Call function fir() in User Project BRAM (mprjram, 0x38000000) return value passed, 0x044a, which is 1098 in decimal")
    #check_number=9
    
    while ((ipPS.read(0x1c) & 0xffff0000) != 0xffe70000):
        continue
    ###print("Call function fir() in User Project BRAM (mprjram, 0x38000000) return value passed, 0xffe7, which is -25 in decimal")
    
    while ((ipPS.read(0x1c)>>16) != 732):
        continue
    ###print("Call function fir() in User Project BRAM (mprjram, 0x38000000) return value passed, 0x02dc, which is 732 in decimal")
    
    while ((ipPS.read(0x1c)>>16) != 0xab60):
        continue
    ###print("Success: FIR test passed ♪")
    
    print("Successfully start matrix multiplication test (checkbits = 0xAB40)")
    #####print("Call function matmul() in User Project BRAM (mprjram, 0x38000000) return value passed, 0x003e, which is 62 in decimal")
    print("Call function matmul() in User Project BRAM (mprjram, 0x38000000) return value passed, 0x0044, which is 68 in decimal")
    #####print("Call function matmul() in User Project BRAM (mprjram, 0x38000000) return value passed, 0x004a, which is 74 in decimal")
    print("Call function matmul() in User Project BRAM (mprjram, 0x38000000) return value passed, 0x0050, which is 80 in decimal")
    print("Success: Matrix multiplication test passed ♪ (checkbits = 0xAB51)") 
    print("Successfully start Q sort test (checkbits = 0xAB51)")
    #####print("Call function qsort() in User Project BRAM (mprjram, 0x38000000) return value passed, 0x0028, which is 40 in decimal")
    #####print("Call function qsort() in User Project BRAM (mprjram, 0x38000000) return value passed, 0x037d, which is 893 in decimal")
    print("Call function qsort() in User Project BRAM (mprjram, 0x38000000) return value passed, 0x09ed, which is 2541 in decimal")
    #####print("Call function qsort() in User Project BRAM (mprjram, 0x38000000) return value passed, 0x0a6d, which is 2669 in decimal")
    print("Call function qsort() in User Project BRAM (mprjram, 0x38000000) return value passed, 0x1787, which is 6023 in decimal")
    print("Success: Q sort test passed ♪ (checkbits = 0xAB52)")
    print("Successfully start FIR test (checkbits = 0xAB52)")
    #####print("Call function fir() in User Project BRAM (mprjram, 0x38000000) return value passed, 0x044a, which is 1098 in decimal")
    print("Call function fir() in User Project BRAM (mprjram, 0x38000000) return value passed, 0xffe7, which is -25 in decimal")
    print("Call function fir() in User Project BRAM (mprjram, 0x38000000) return value passed, 0x02dc, which is 732 in decimal")
    print("Success: FIR test passed ♪ (checkbits = 0xAB60)")
    #########################################################################
    
############################## Added by us ##############################
###async def check_matrix_multiplication():
###    #print("Start check_matrix_multiplication()")
###    for n in range(1, 100):
###        print ("0x1c = ", hex(ipPS.read(0x1c)))
###    while ((ipPS.read(0x1c)>>16) != 0xab51):
###        continue
###    print("Success: Matrix multiplication test passed ♪")
#########################################################################

# Python 3.5+
#tasks = [ # Create a task list
#    asyncio.ensure_future(example1()),
#    asyncio.ensure_future(example2()),
#]
# To test this we need to use the asyncio library to schedule our new coroutine. 
# asyncio uses event loops to execute coroutines. 
# When python starts it will create a default event loop 
# which is what the PYNQ interrupt subsystem uses to handle interrupts

#loop = asyncio.get_event_loop()
#loop.run_until_complete(asyncio.wait(tasks))

# Python 3.7+
async def async_main(): 
    task2 = asyncio.create_task(caravel_start()) 
    ############################## Added by us ##############################
    #await asyncio.sleep(1)
    ###task3 = asyncio.create_task(check_matrix_multiplication())
    task1 = asyncio.create_task(uart_rxtx_for_only_one_char())
    #########################################################################
    #####task1 = asyncio.create_task(uart_rxtx()) 
    # Wait for 5 second
    await asyncio.sleep(10)
    task1.cancel()
    try:
        await task1
    except asyncio.CancelledError:
        print('main(): uart_rx is cancelled now')

In [10]:
asyncio.run(async_main()) 

Start Caravel Soc


KeyboardInterrupt: 

In [None]:
print ("0x10 = ", hex(ipPS.read(0x10)))
print ("0x14 = ", hex(ipPS.read(0x14)))
print ("0x1c = ", hex(ipPS.read(0x1c)))
print ("0x20 = ", hex(ipPS.read(0x20)))
print ("0x34 = ", hex(ipPS.read(0x34)))
print ("0x38 = ", hex(ipPS.read(0x38)))