In [1]:
import numpy as np
import random
import time
import pprint

from pynq import PL
from pynq import Overlay
from pynq import Clocks
from pynq import allocate


# Call PL.reset() before the overlay is loaded.
PL.reset()

# Load the design bitstream; `ol` is the handle every later cell uses to reach the IP blocks.
BITSTREAM_PATH = './design_1_wrapper.bit'
ol = Overlay(BITSTREAM_PATH)

# List the IP block names exposed by the overlay, one per line.
for ip_name in ol.ip_dict.keys():
    print(ip_name)

# Request 100 for pl_clk0_mhz and echo the value read back from the clock driver.
TARGET_PL_CLK0_MHZ = 100
Clocks.pl_clk0_mhz = TARGET_PL_CLK0_MHZ
print(f"pl_clk0_mhz = {Clocks.pl_clk0_mhz}")

rx_dma
udp_engine_100g_ip
tx_dma
cmac_usplus_0
zynq_ultra_ps_e_0
pl_clk0_mhz = 100


In [2]:
# Handle to the CMAC IP block exposed by the overlay (reused by the final stats cell).
cmac = ol.cmac_usplus_0

# Optional CMAC internal loopback, left disabled. Per the original author, no external
# hardware needs to be plugged in to run this notebook.
# NOTE(review): presumably because the UDP engine's own RX internal loopback is enabled
# in the configuration cell below — confirm.
# cmac.internal_loopback = 1
# print(cmac.internal_loopback)

# Start the CMAC, then show its tx/rx counters as the cell output (baseline before the test).
cmac.start()
cmac.getStats()

{'tx': {'packets': 0,
  'good_packets': 0,
  'bytes': 0,
  'good_bytes': 0,
  'packets_large': 0,
  'packets_small': 0,
  'bad_fcs': 0,
  'pause': 0,
  'user_pause': 0},
 'rx': {'packets': 0,
  'good_packets': 0,
  'bytes': 0,
  'good_bytes': 0,
  'packets_large': 0,
  'packets_small': 0,
  'packets_undersize': 0,
  'packets_fragmented': 0,
  'packets_oversize': 0,
  'packets_toolong': 0,
  'packets_jabber': 0,
  'bad_fcs': 0,
  'packets_bad_fcs': 0,
  'stomped_fcs': 0,
  'pause': 0,
  'user_pause': 0},
 'cycle_count': 101256848}

In [3]:
from udp_engine_control import *

############################################################################################################
# Configuring FPGA Connection
############################################################################################################
# Every address below (and the later bind/unbind picks) comes from Python's `random`
# module. Seed it with a freshly drawn, logged value so a failing run can be replayed:
# to reproduce a run, replace the right-hand side with the seed that run printed.
# NOTE(review): this only covers the `random` module; if udp_engine_control draws from
# a different RNG internally, that one needs seeding too — confirm.
RANDOM_SEED = random.SystemRandom().getrandbits(32)
random.seed(RANDOM_SEED)
print(f"INFO:     random seed = {RANDOM_SEED}")

# Software-side connection manager plus the controller that drives the UDP engine IP.
my_connection_manager_sw = connection_manager_sw()
my_udp_engine_controller = udp_engine_controller(ol.udp_engine_100g_ip, my_connection_manager_sw)

# Random 48-bit MACs, 32-bit IP and 16-bit port (draw order kept stable for replay).
MY_DST_CONFIG_MAC   =   random.getrandbits(48)
MY_SRC_CONFIG_MAC   =   random.getrandbits(48)
MY_SRC_CONFIG_IP    =   random.getrandbits(32)
MY_SRC_CONFIG_PORT  =   random.getrandbits(16)


my_udp_engine_controller.configure(MY_SRC_CONFIG_MAC, MY_SRC_CONFIG_IP, MY_SRC_CONFIG_PORT, MY_DST_CONFIG_MAC)
print("INFO:     Configured UDP Engine with:")
print(f"MY_DST_CONFIG_MAC   = {hex(MY_DST_CONFIG_MAC)}")
print(f"MY_SRC_CONFIG_MAC   = {hex(MY_SRC_CONFIG_MAC)}")
print(f"MY_SRC_CONFIG_IP    = {hex(MY_SRC_CONFIG_IP)}")
print(f"MY_SRC_CONFIG_PORT  = {hex(MY_SRC_CONFIG_PORT)}")

# Enable the engine's RX internal loopback for the TX-RX test further down.
my_udp_engine_controller.rx_internal_loopback_enable()

INFO:     Configured UDP Engine with:
MY_DST_CONFIG_MAC   = 0x37b4af1d7f75
MY_SRC_CONFIG_MAC   = 0xafc04a8b3308
MY_SRC_CONFIG_IP    = 0x4b3fa36e
MY_SRC_CONFIG_PORT  = 0x6242


In [4]:
############################################################################################################
# Filling Connection Manager
############################################################################################################
# Materialise the generated entries once so they can be sampled repeatedly below.
COLLISION_POOL = list(my_connection_manager_sw.generate_collision_entries())
print("SUCCESS:    Generated COLLISION_POOL")

NUM_WRITES     = 10
for _ in range(NUM_WRITES):
    # Same RNG draw order as before: first the entry, then the bind/unbind decision.
    entry = random.choice(COLLISION_POOL)
    do_bind = random.random() < 0.9     # roughly 90% binds, the rest unbinds
    ip_addr, port = entry['ip'], entry['port']

    # Pick the controller operation for this write; only a truthy result is reported.
    operation = (my_udp_engine_controller.bind_connection if do_bind
                 else my_udp_engine_controller.unbind_connection)
    if not operation(ip_addr, port):
        continue

    if do_bind:
        print(f"SUCCESS:  (bind)   ipAddr = {hex(ip_addr)}, port = {hex(port)}")
    else:
        print(f"SUCCESS:  (unbind) ipAddr = {hex(ip_addr)}, port = {hex(port)}")



SUCCESS:    Generated COLLISION_POOL
SUCCESS:  (bind)   ipAddr = 0x7e566d06, port = 0x1664
SUCCESS:  (bind)   ipAddr = 0xf2e3c170, port = 0x2b5
SUCCESS:  (bind)   ipAddr = 0x5c726638, port = 0xc044
SUCCESS:  (unbind) ipAddr = 0x6e3f1a07, port = 0xd7c0
SUCCESS:  (bind)   ipAddr = 0x10f4d730, port = 0x7100
SUCCESS:  (bind)   ipAddr = 0xdcef46a3, port = 0xe05f
SUCCESS:  (bind)   ipAddr = 0x9a4d14a3, port = 0x2433
SUCCESS:  (bind)   ipAddr = 0x7e566d06, port = 0x1664
SUCCESS:  (bind)   ipAddr = 0x89d32d57, port = 0xe077
SUCCESS:  (bind)   ipAddr = 0x69ca57a5, port = 0x9a65


In [7]:
############################################################################################################
# TX-RX Test
############################################################################################################
# For each generated test unit: push the payload through tx_dma and, unless the model
# marks the unit as dropped, read the resulting packet back through rx_dma and compare
# it with the packet the software model expects.

my_udp_model = udp_model(my_connection_manager_sw,
                            MY_DST_CONFIG_MAC,
                            MY_SRC_CONFIG_MAC,
                            MY_SRC_CONFIG_IP,
                            MY_SRC_CONFIG_PORT)

tx_dma = ol.tx_dma
rx_dma = ol.rx_dma

NUM_TESTS    = 100
num_passed   = 0
num_failed   = 0
num_dropped  = 0
for i in range(NUM_TESTS):
    print("---------------------------------------------------------------")
    print(f"INFO:     Test Idx = {i}:\n")
    print("---------------------------------------------------------------")

    # generate test
    test_unit = my_udp_model.generate_end_to_end_test()
    print("INFO:     Generated test unit with:")
    print(f"            connection_id = {test_unit['connection_id']}")
    print(f"            input_payload_bytes_length = {test_unit['input_payload_bytes_length']}")
    print(f"            output_packet_bytes_length = {test_unit['rx_output_packet_bytes_length']}")
    print(f"            dropped = {test_unit['dropped']}")

    expect_response = (test_unit['dropped'] == 0)

    # Arm the receive channel BEFORE sending, so the returned packet can stream straight
    # into memory instead of having to be held upstream until the send has completed.
    dma_buf_out = None
    if expect_response:
        dma_buf_out = allocate(test_unit['rx_output_packet_bytes_length'], dtype=np.uint8)
        rx_dma.recvchannel.transfer(dma_buf_out)

    #send through dma
    dma_buf_in = allocate(test_unit['input_payload_bytes_length'], dtype=np.uint8)
    dma_buf_in[:] = np.array(test_unit['input_payload_bytes'], dtype=np.uint8)
    tx_dma.sendchannel.transfer(dma_buf_in)
    tx_dma.sendchannel.wait()
    print("INFO:     packet sent through tx_dma")
    # Physically contiguous memory is a limited resource: release the buffer as soon as
    # the transfer is done rather than relying on garbage collection. This is done on the
    # normal path only, so a buffer is never freed while a DMA transfer may still use it.
    dma_buf_in.freebuffer()

    #receive response if expecting any
    if expect_response:
        rx_dma.recvchannel.wait()
        print("INFO:     packet received through rx_dma")

        # validate
        if np.array_equal(dma_buf_out, test_unit['rx_output_packet_bytes']):
            num_passed += 1
            print(f"SUCCESS: test {i}/{NUM_TESTS} passed")
        else:
            num_failed += 1
            print(f"FAIL:    test {i}/{NUM_TESTS} failed")
            print(f"expected = {test_unit['rx_output_packet_bytes']}")
            print(f"actual = {dma_buf_out}")

        # Free only after the contents have been compared / printed.
        dma_buf_out.freebuffer()
    else:
        # The model expects this packet to be dropped, so there is nothing to read back.
        num_dropped += 1

# One-line tally so failures are not lost in the per-test log output.
print("---------------------------------------------------------------")
print(f"INFO:     Summary: {num_passed} passed, {num_failed} failed, "
      f"{num_dropped} expected drops (not read back), {NUM_TESTS} total")


---------------------------------------------------------------
INFO:     Test Idx = 0:

---------------------------------------------------------------
INFO:     Generated test unit with:
            connection_id = 42528
            input_payload_bytes_length = 90
            output_packet_bytes_length = 98
            dropped = 0
INFO:     packet sent through tx_dma
INFO:     packet received through rx_dma
SUCCESS: test 0/100 passed
---------------------------------------------------------------
INFO:     Test Idx = 1:

---------------------------------------------------------------
INFO:     Generated test unit with:
            connection_id = 173600
            input_payload_bytes_length = 154
            output_packet_bytes_length = 162
            dropped = 0
INFO:     packet sent through tx_dma
INFO:     packet received through rx_dma
SUCCESS: test 1/100 passed
---------------------------------------------------------------
INFO:     Test Idx = 2:

----------------------------

In [8]:
# Show the CMAC tx/rx counters again, after the TX-RX test has run.
# NOTE(review): in the recorded output below, bad_fcs equals packets and good_packets
# is 0 on both tx and rx — confirm whether FCS insertion/checking is configured as intended.
cmac.getStats()

{'tx': {'packets': 89,
  'good_packets': 0,
  'bytes': 15449,
  'good_bytes': 0,
  'packets_large': 0,
  'packets_small': 0,
  'bad_fcs': 89,
  'pause': 0,
  'user_pause': 0},
 'rx': {'packets': 89,
  'good_packets': 0,
  'bytes': 15449,
  'good_bytes': 0,
  'packets_large': 0,
  'packets_small': 0,
  'packets_undersize': 0,
  'packets_fragmented': 0,
  'packets_oversize': 0,
  'packets_toolong': 0,
  'packets_jabber': 0,
  'bad_fcs': 89,
  'packets_bad_fcs': 89,
  'stomped_fcs': 0,
  'pause': 0,
  'user_pause': 0},
 'cycle_count': 10674099493}