In [1]:
from pynq import Overlay
import pynq.lib.dma

# Build the overlay object from the bitstream file (path is relative to the
# notebook's working directory).
# NOTE(review): PYNQ presumably programs the FPGA fabric at this point - confirm.
OL = Overlay("./design_1.bit")
# Show the IP names the overlay exposes, to confirm the attribute used below.
print(OL.ip_dict.keys())
# Handle to the AXI DMA instance; later cells call read()/write() on it directly.
dma = OL.axi_dma_0

dict_keys(['axi_dma_0', 'processing_system7_0'])


In [2]:
import numpy as np
from pynq import allocate
# Source buffer: 107 int16 samples forming a ramp from -53 to 53.
data_src = allocate((107,), dtype=np.int16)
# Vectorised fill sized from the buffer itself, so the element count is stated
# only once; produces the same values as assigning i - 53 for every index i.
data_src[:] = np.arange(data_src.size, dtype=data_src.dtype) - 53

# Destination buffer: 307 int16 samples; its physical address is handed to the
# DMA's S2MM channel in the transfer cell.
data_dst = allocate((307,), dtype=np.int16)

print("size of data_src :", data_src.nbytes, "Byte")
print("size of data_dst :", data_dst.nbytes, "Byte")

size of data_src : 214 Byte
size of data_dst : 614 Byte


In [3]:
"""Register Map"""

# Both channels use the same four-register layout relative to their own base;
# the absolute offset of each register is repeated in its trailing comment.
_MM2S_BASE = 0x00
_S2MM_BASE = 0x30

MM2S_DMACR = _MM2S_BASE + 0x00   # 0x00: MM2S DMA control
MM2S_DMASR = _MM2S_BASE + 0x04   # 0x04: MM2S DMA status
MM2S_SA = _MM2S_BASE + 0x18      # 0x18: MM2S source address
MM2S_LENGTH = _MM2S_BASE + 0x28  # 0x28: MM2S data length

S2MM_DMACR = _S2MM_BASE + 0x00   # 0x30: S2MM DMA control
S2MM_DMASR = _S2MM_BASE + 0x04   # 0x34: S2MM DMA status
S2MM_DA = _S2MM_BASE + 0x18      # 0x48: S2MM destination address
S2MM_LENGTH = _S2MM_BASE + 0x28  # 0x58: S2MM data length

In [4]:
def stream_status(ip, reg):
    """Read one DMA status register and print its decoded flags on one line.

    Args:
        ip: Object whose ``read(offset)`` returns the register value.
        reg: Register offset; must equal ``MM2S_DMASR`` or ``S2MM_DMASR``.

    Returns:
        0 when ``reg`` is neither status register (after printing
        "Invalid reg" and without reading from ``ip``); otherwise None.
    """
    channel_prefixes = (
        (MM2S_DMASR, "MM2S status : "),
        (S2MM_DMASR, "S2MM status : "),
    )
    for known_reg, prefix in channel_prefixes:
        if reg == known_reg:
            print(prefix, end='')
            break
    else:
        # Not a status register: report it and bail out before touching ip.
        print("Invalid reg")
        return 0

    value = ip.read(reg)

    # Bit 0 is the only flag that prints a label in both of its states.
    pieces = ["halted, " if (value & 0x01) == 0x01 else "running, "]

    # Remaining flags print their label only when set, in ascending bit order.
    flag_labels = (
        (0x02, "idle, "),
        (0x08, "SGIncld, "),
        (0x10, "DMAIntErr, "),
        (0x20, "DMASlvErr, "),
        (0x40, "DMADecErr, "),
        (0x100, "SGIntErr, "),
        (0x200, "SGSlvErr, "),
        (0x400, "SGDecErr, "),
        (0x1000, "IOC_Irq, "),
        (0x2000, "Dly_Irq, "),
        (0x4000, "Err_Irq, "),
    )
    pieces.extend(label for mask, label in flag_labels if (value & mask) == mask)

    print("".join(pieces), end='')
    print("")

In [5]:
import time
import traceback

# Status-register masks; bit positions match the flags decoded by stream_status().
DMASR_IDLE = 1 << 1
DMASR_IOC_IRQ = 1 << 12
DMASR_DONE = DMASR_IOC_IRQ | DMASR_IDLE
# DMAIntErr | DMASlvErr | DMADecErr | SGIntErr | SGSlvErr | SGDecErr
DMASR_ERRORS = 0x10 | 0x20 | 0x40 | 0x100 | 0x200 | 0x400
# Upper bound for one polling loop, so a stalled channel cannot hang the kernel.
DMA_TIMEOUT_S = 5.0


def _dump_dma_state(ip):
    """Print both control registers, then both decoded status registers."""
    print(ip.read(MM2S_DMACR))
    print(ip.read(S2MM_DMACR))
    stream_status(ip, MM2S_DMASR)
    stream_status(ip, S2MM_DMASR)


def _wait_channel_done(ip, status_reg, label, timeout_s=DMA_TIMEOUT_S):
    """Poll ``status_reg`` until IOC_Irq and idle are both set.

    Args:
        ip: Object exposing ``read(offset)``.
        status_reg: Offset of the status register to poll.
        label: Channel name used in error messages.
        timeout_s: Maximum time to keep polling, in seconds.

    Returns:
        The status value that satisfied the completion condition.

    Raises:
        RuntimeError: An error flag is set before completion is observed.
        TimeoutError: Completion is not observed within ``timeout_s``.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        status = ip.read(status_reg)
        # Completion is checked first so the success path matches the plain
        # "IOC_Irq and idle" condition exactly.
        if (status & DMASR_DONE) == DMASR_DONE:
            return status
        if status & DMASR_ERRORS:
            raise RuntimeError(
                "{} channel reported an error, DMASR=0x{:08X}".format(label, status)
            )
        if time.monotonic() >= deadline:
            raise TimeoutError(
                "{} channel not done after {:.1f} s, DMASR=0x{:08X}".format(
                    label, timeout_s, status
                )
            )


try:
    print(data_dst)
    print(data_src)
    print("\n" + "=" * 30 + "\n")

    stream_status(dma, MM2S_DMASR)
    stream_status(dma, S2MM_DMASR)
    _dump_dma_state(dma)

    # Reset DMA IPs
    print("\nReset DMA IPs")
    dma.write(S2MM_DMACR, 0x04)
    dma.write(MM2S_DMACR, 0x04)
    _dump_dma_state(dma)

    # Standby DMA IPs
    print("\nStandby DMA IPs")
    dma.write(S2MM_DMACR, 0x00)
    dma.write(MM2S_DMACR, 0x00)
    _dump_dma_state(dma)

    # Send Memory Address to DMA IPs
    print("\nSend Memory Address to DMA IPs")
    dma.write(S2MM_DA, data_dst.physical_address)
    dma.write(MM2S_SA, data_src.physical_address)
    _dump_dma_state(dma)

    # Run DMA IP
    # NOTE(review): 0xF001 sets bit 0 plus bits 12-15; presumably run/stop and
    # the interrupt enables - confirm against the AXI DMA register description.
    print("\nStart DMA IPs")
    dma.write(S2MM_DMACR, 0xF001)
    dma.write(MM2S_DMACR, 0xF001)
    _dump_dma_state(dma)

    # Push any CPU-cached source samples out to memory before the DMA reads
    # them. flush() is expected to be a no-op for coherent buffers.
    data_src.flush()

    # Send Data Length to DMA IP
    # NOTE(review): writing the length presumably starts the transfer - confirm.
    print("\nSend Data Length to DMA IP")
    dma.write(S2MM_LENGTH, data_dst.nbytes)
    dma.write(MM2S_LENGTH, data_src.nbytes)
    _dump_dma_state(dma)

    # Wait MM2S Done
    print("\nWait MM2S...")
    _wait_channel_done(dma, MM2S_DMASR, "MM2S")
    stream_status(dma, MM2S_DMASR)
    stream_status(dma, S2MM_DMASR)

    # Wait S2MM Done
    print("\nWait S2MM...")
    _wait_channel_done(dma, S2MM_DMASR, "S2MM")
    stream_status(dma, MM2S_DMASR)
    stream_status(dma, S2MM_DMASR)

    # Drop any stale cached copy of the destination (it was printed, and so
    # read by the CPU, at the top of this cell) before showing the result.
    data_dst.invalidate()

    print("\n" + "=" * 30 + "\n")
    print(data_dst)

except BaseException:
    # Deliberately as broad as a bare except: a manual kernel interrupt during
    # the polling loops must also leave the DMA in a reset state.
    traceback.print_exc()

    # Reset DMA IPs
    print("\n\n" + "=" * 30)
    print("Reset DMA IPs")

    # NOTE(review): presumably clears the latched IRQ flags (write-one-to-clear)
    # - confirm against the AXI DMA register description.
    dma.write(S2MM_DMASR, 0xF000)
    dma.write(MM2S_DMASR, 0xF000)

    # Reset DMA IPs
    dma.write(S2MM_DMACR, 0x04)
    dma.write(MM2S_DMACR, 0x04)

    dma.write(S2MM_DMACR, 0x00)
    dma.write(MM2S_DMACR, 0x00)
    print("Reset DMA Done")
    print("=" * 30 + "\n\n")

else:
    print("\nDone")

[0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
 0 0 0 0 0 0 0 0 0 0 0]
[-53 -52 -51 -50 -49 -48 -47 -46 -45 -44 -43 -42 -41 -40 -39 -38 -37 -36
 -35 -34 -33 -32 -31 -30 -29 -28 -27 -26 -25 -24 -23 -22 -21 -20 -19 -18
 -17 -16 -15 -14 -13 -12 -11 -10  -9  -8  -7  -6  -5  -4  -3  -2  -1   0
   1   2   3   4   5   6   7   8   9  10  11  12  13  14  15  16  17  18
  19  20  21  22  23  24  25  26  27  28  29  30  31  32  33  34  35  36
  37  38  3