In [1]:
from pynq import Overlay
from pynq import allocate
import numpy as np
import asyncio
import time

Load overlay via XSA file.

In [2]:
# Load the overlay from the XSA file, then report its load status and the
# names of the IP blocks it exposes.
ol = Overlay("./dma_timer_regfile.xsa")
overlay_loaded = ol.is_loaded()
print(overlay_loaded)
ip_names = list(ol.ip_dict)
print(ip_names)

True
['axi_gpio_0', 'led_blinker_axion_reg_0', 'axi4lite_slave_0', 'axi_intc_0', 'axi_dma_0', 'axi_timer_0', 'zynq_ultra_ps_e_0']


In [3]:
# Print every interrupt pin whose name mentions "dma" (case-insensitive).
print("All interrupts:")
dma_pins = {
    name: info
    for name, info in ol.interrupt_pins.items()
    if 'dma' in name.lower()
}
for name, info in dma_pins.items():
    print(f"  {name}: {info}")

All interrupts:
  axi_dma_0/mm2s_introut: {'controller': 'axi_intc_0', 'index': 0, 'fullpath': 'axi_dma_0/mm2s_introut'}
  axi_dma_0/s2mm_introut: {'controller': 'axi_intc_0', 'index': 1, 'fullpath': 'axi_dma_0/s2mm_introut'}


In [4]:
# Keep short handles to the DMA engine and to each of its two channels.
dma = ol.axi_dma_0
dma_send = dma.sendchannel
dma_recv = dma.recvchannel

Create the input and output data buffers.

In [5]:
# Number of 32-bit words in each buffer; both buffers share the same shape.
data_size = 1000
buffer_shape = (data_size,)
input_buffer = allocate(shape=buffer_shape, dtype=np.uint32)
output_buffer = allocate(shape=buffer_shape, dtype=np.uint32)

In [6]:
# Fill the input buffer with a recognisable ramp: 0xcafe0000, 0xcafe0001, ...
# A single vectorized assignment replaces the per-element Python loop; the
# arithmetic stays in uint32 so it matches the buffer's dtype exactly.
input_buffer[:] = np.arange(data_size, dtype=np.uint32) + np.uint32(0xcafe0000)

In [7]:
# Show the first ten words of the output buffer in hexadecimal.
print("output_buffer:")
for idx in range(10):
    word = output_buffer[idx]
    print(f'  0x{word:02x}')

output_buffer:
  0x00
  0x00
  0x00
  0x00
  0x00
  0x00
  0x00
  0x00
  0x00
  0x00


In [8]:
# Show the first ten words of the input buffer in hexadecimal.
print("input_buffer:")
for idx in range(10):
    print('  ' + hex(input_buffer[idx]))

input_buffer:
  0xcafe0000
  0xcafe0001
  0xcafe0002
  0xcafe0003
  0xcafe0004
  0xcafe0005
  0xcafe0006
  0xcafe0007
  0xcafe0008
  0xcafe0009


Do an MM2S transfer.

In [9]:
# Start streaming input_buffer out through the DMA send (MM2S) channel.
dma_send.transfer(input_buffer)

Do an S2MM transfer.

In [10]:
# Start the S2MM transfer into output_buffer, then block until both channels
# have finished. Without the waits, the following cells could read
# output_buffer (or start another transfer) while the DMA is still busy.
# The send channel is only waited on after the receive side has been started.
dma_recv.transfer(output_buffer)
dma_send.wait()
dma_recv.wait()

In [11]:
# Show the first ten received words in hexadecimal.
print("output_buffer:")
for idx in range(10):
    word = output_buffer[idx]
    print(f'  0x{word:02x}')

output_buffer:
  0xcafe0000
  0xcafe0001
  0xcafe0002
  0xcafe0003
  0xcafe0004
  0xcafe0005
  0xcafe0006
  0xcafe0007
  0xcafe0008
  0xcafe0009


Verify data content.

In [12]:
# Compare the full contents of the sent and received buffers.
buffers_match = np.array_equal(input_buffer, output_buffer)
print(f"Arrays are equal: {buffers_match}")

Arrays are equal: True


Interrupts

In [13]:
# Check system interrupt count before
!cat /proc/interrupts | grep -E "(fabric|dma)"  
  
# Run DMA transfer  
dma.sendchannel.transfer(input_buffer)  
dma.recvchannel.transfer(output_buffer)  
dma.sendchannel.wait()  
dma.recvchannel.wait()  
  
# Check interrupt count after
print()
!cat /proc/interrupts | grep -E "(fabric|dma)"

 25:          0          0          0          0     GICv2 156 Level     zynqmp-dma
 26:          0          0          0          0     GICv2 157 Level     zynqmp-dma
 27:          0          0          0          0     GICv2 158 Level     zynqmp-dma
 28:          0          0          0          0     GICv2 159 Level     zynqmp-dma
 29:          0          0          0          0     GICv2 160 Level     zynqmp-dma
 30:          0          0          0          0     GICv2 161 Level     zynqmp-dma
 31:          0          0          0          0     GICv2 162 Level     zynqmp-dma
 32:          0          0          0          0     GICv2 163 Level     zynqmp-dma
 33:          0          0          0          0     GICv2 109 Level     zynqmp-dma
 34:          0          0          0          0     GICv2 110 Level     zynqmp-dma
 35:          0          0          0          0     GICv2 111 Level     zynqmp-dma
 36:          0          0          0          0     GICv2 112 Level     zyn

Check DMA Status

In [14]:
# Inspect the error flag of the receive (S2MM) channel.
dma_recv.error

False

In [15]:
# Inspect the idle flag of the receive (S2MM) channel.
dma_recv.idle

True

In [16]:
# Inspect the running flag of the receive (S2MM) channel.
dma_recv.running

True

Check DMA Register Map

In [17]:
# Display the DMA engine's registers together with their decoded bit fields.
dma.register_map

RegisterMap {
  MM2S_DMACR = Register(RS=1, Reset=0, Keyhole=0, Cyclic_BD_Enable=0, IOC_IrqEn=1, Dly_IrqEn=0, Err_IrqEn=0, IRQThreshold=1, IRQDelay=0),
  MM2S_DMASR = Register(Halted=0, Idle=1, SGIncld=0, DMAIntErr=0, DMASlvErr=0, DMADecErr=0, SGIntErr=0, SGSlvErr=0, SGDecErr=0, IOC_Irq=1, Dly_Irq=0, Err_Irq=0, IRQThresholdSts=0, IRQDelaySts=0),
  MM2S_CURDESC = Register(Current_Descriptor_Pointer=0),
  MM2S_CURDESC_MSB = Register(Current_Descriptor_Pointer=0),
  MM2S_TAILDESC = Register(Tail_Descriptor_Pointer=0),
  MM2S_TAILDESC_MSB = Register(Tail_Descriptor_Pointer=0),
  MM2S_SA = Register(Source_Address=2015428608),
  MM2S_SA_MSB = Register(Source_Address=0),
  MM2S_LENGTH = Register(Length=4000),
  SG_CTL = Register(SG_CACHE=0, SG_USER=0),
  S2MM_DMACR = Register(RS=1, Reset=0, Keyhole=0, Cyclic_BD_Enable=0, IOC_IrqEn=1, Dly_IrqEn=0, Err_IrqEn=0, IRQThreshold=1, IRQDelay=0),
  S2MM_DMASR = Register(Halted=0, Idle=1, SGIncld=0, DMAIntErr=0, DMASlvErr=0, DMADecErr=0, SGIntErr=0, SG

In [18]:
# Print the buffers' physical addresses next to the addresses currently held
# in the DMA source/destination registers so they can be compared by eye.
regs = dma.register_map
print("Input buffer address   :", hex(input_buffer.physical_address))
print("Output buffer address  :", hex(output_buffer.physical_address))
print("---")
print("DMA Source address     :", hex(regs.MM2S_SA.Source_Address))
print("DMA Destination address:", hex(regs.S2MM_DA.Destination_Address))

Input buffer address   : 0x78210000
Output buffer address  : 0x78212000
---
DMA Source address     : 0x78210000
DMA Destination address: 0x78212000


Async DMA Test

In [19]:
async def test_dma_interrupts():
    """Run one DMA round trip and await its completion asynchronously.

    Allocates two fresh 1024-word uint32 buffers, fills the input with
    0..1023, starts the send and receive channels, awaits ``wait_async()``
    on each, and then checks that the received data equals the data sent.

    Raises:
        RuntimeError: if the received data differs from the data sent.
    """
    num_words = 1024

    # Allocate buffers
    input_buffer = allocate(shape=(num_words,), dtype=np.uint32)
    output_buffer = allocate(shape=(num_words,), dtype=np.uint32)

    # Fill input
    input_buffer[:] = np.arange(num_words, dtype=np.uint32)

    # Start transfers
    dma.sendchannel.transfer(input_buffer)
    dma.recvchannel.transfer(output_buffer)

    # Wait asynchronously
    await dma.sendchannel.wait_async()
    await dma.recvchannel.wait_async()

    # Only report success once the received data has actually been checked
    if not np.array_equal(input_buffer, output_buffer):
        raise RuntimeError("Async DMA transfer completed but data does not match")

    print("Async DMA transfer completed successfully!")

asyncio.run(test_dma_interrupts())

Async DMA transfer completed successfully!


Clean up

In [20]:
# Clean up: remove the notebook-level names for both DMA buffers.
del input_buffer
del output_buffer