Skip to content

Commit

Permalink
tests: Test RSS traffic
Browse files Browse the repository at this point in the history
Add a test that sends raw traffic to an RSS QP.

Signed-off-by: Shachar Kagan <skagan@nvidia.com>
Signed-off-by: Edward Srouji <edwards@nvidia.com>
  • Loading branch information
ShacharKagan authored and EdwardSro committed Feb 23, 2022
1 parent 64a8525 commit 9459b91
Show file tree
Hide file tree
Showing 4 changed files with 208 additions and 4 deletions.
1 change: 1 addition & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ rdma_python_test(tests
test_qpex.py
test_rdmacm.py
test_relaxed_ordering.py
test_rss_traffic.py
test_shared_pd.py
utils.py
)
Expand Down
5 changes: 4 additions & 1 deletion tests/test_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,16 @@ def create_flow(self, specs=[]):
for spec in specs:
flow_attr.specs.append(spec)
try:
flow = Flow(self.qp, flow_attr)
flow = self._create_flow(flow_attr)
except PyverbsRDMAError as ex:
if ex.error_code == errno.EOPNOTSUPP:
raise unittest.SkipTest('Flow creation is not supported')
raise ex
return flow

def _create_flow(self, flow_attr):
return Flow(self.qp, flow_attr)


class FlowTest(RDMATestCase):
"""
Expand Down
148 changes: 148 additions & 0 deletions tests/test_rss_traffic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
import unittest
import random
import errno

from pyverbs.wq import WQInitAttr, WQAttr, WQ, RwqIndTableInitAttr, RwqIndTable, RxHashConf
from tests.utils import requires_root_on_eth, PacketConsts
from tests.base import RDMATestCase, PyverbsRDMAError, MLNX_VENDOR_ID, \
CX3_MLNX_PART_ID, CX3Pro_MLNX_PART_ID
from pyverbs.qp import QPInitAttrEx, QPEx
from tests.test_flow import FlowRes
from pyverbs.flow import Flow
from pyverbs.cq import CQ
import pyverbs.enums as e
import tests.utils as u


WRS_PER_ROUND = 512
CQS_NUM = 2
TOEPLITZ_KEY_LEN = 40
HASH_KEY = [0x2c, 0xc6, 0x81, 0xd1, 0x5b, 0xdb, 0xf4, 0xf7,
0xfc, 0xa2, 0x83, 0x19, 0xdb, 0x1a, 0x3e, 0x94,
0x6b, 0x9e, 0x38, 0xd9, 0x2c, 0x9c, 0x03, 0xd1,
0xad, 0x99, 0x44, 0xa7, 0xd9, 0x56, 0x3d, 0x59,
0x06, 0x3c, 0x25, 0xf3, 0xfc, 0x1f, 0xdc, 0x2a]


def requires_indirection_table_support(func):
def wrapper(instance):
dev_attrs = instance.ctx.query_device()
vendor_id = dev_attrs.vendor_id
vendor_pid = dev_attrs.vendor_part_id
if vendor_id == MLNX_VENDOR_ID and vendor_pid in [CX3_MLNX_PART_ID,
CX3Pro_MLNX_PART_ID]:
raise unittest.SkipTest('WQN must be aligned with the Indirection Table size in CX3')
return func(instance)
return wrapper


class RssRes(FlowRes):
def __init__(self, dev_name, ib_port, gid_index, log_ind_tbl_size=3):
"""
Initialize rss resources based on Flow resources that include RSS Raw QP.
:param dev_name: Device name to be used
:param ib_port: IB port of the device to use
:param gid_index: Which GID index to use
"""
self.log_ind_tbl_size = log_ind_tbl_size
self.wqs = []
self.cqs = []
self.ind_table = None
super().__init__(dev_name=dev_name, ib_port=ib_port,
gid_index=gid_index)

def create_cq(self):
self.cqs = [CQ(self.ctx, WRS_PER_ROUND) for _ in range(CQS_NUM)]

@requires_root_on_eth()
def create_qps(self):
"""
Initializes self.qps with RSS QPs.
:return: None
"""
qp_init_attr = self.create_qp_init_attr()
for _ in range(self.qp_count):
try:
qp = QPEx(self.ctx, qp_init_attr)
self.qps.append(qp)
self.qps_num.append(qp.qp_num)
self.psns.append(random.getrandbits(24))
except PyverbsRDMAError as ex:
if ex.error_code == errno.EOPNOTSUPP:
raise unittest.SkipTest(f'Create QPEx type {qp_init_attr.qp_type} is not '
'supported')
raise ex

def create_qp_init_attr(self):
self.create_ind_table()
mask = e.IBV_QP_INIT_ATTR_CREATE_FLAGS | e.IBV_QP_INIT_ATTR_PD | \
e.IBV_QP_INIT_ATTR_RX_HASH | e.IBV_QP_INIT_ATTR_IND_TABLE
return QPInitAttrEx(qp_type=e.IBV_QPT_RAW_PACKET, comp_mask=mask, pd=self.pd,
hash_conf=self.hash_conf, ind_table=self.ind_tbl)

@requires_indirection_table_support
def create_ind_table(self):
self.ind_tbl = RwqIndTable(self.ctx, self.initiate_table_attr())
self.hash_conf = self.init_rx_hash_config()

def initiate_table_attr(self):
self.create_wqs()
return RwqIndTableInitAttr(self.log_ind_tbl_size, self.wqs)

def create_wqs(self):
wqias = [self.initiate_wq_attr(cq) for cq in self.cqs]
for i in range(1 << self.log_ind_tbl_size):
wq = WQ(self.ctx, wqias[i % CQS_NUM])
wq.modify(WQAttr(attr_mask=e.IBV_WQ_ATTR_STATE, wq_state=e.IBV_WQS_RDY))
self.wqs.append(wq)
return self.wqs

def initiate_wq_attr(self, cq):
return WQInitAttr(wq_context=None, wq_pd=self.pd, wq_cq=cq, wq_type=e.IBV_WQT_RQ,
max_wr=WRS_PER_ROUND, max_sge=self.ctx.query_device().max_sge,
comp_mask=0, create_flags=0)

def init_rx_hash_config(self):
return RxHashConf(rx_hash_function=e.IBV_RX_HASH_FUNC_TOEPLITZ,
rx_hash_key_len=len(HASH_KEY),
rx_hash_key=HASH_KEY,
rx_hash_fields_mask=e.IBV_RX_HASH_DST_IPV4 | e.IBV_RX_HASH_SRC_IPV4)

def _create_flow(self, flow_attr):
return [Flow(qp, flow_attr) for qp in self.qps]


class RSSTrafficTest(RDMATestCase):
"""
Test various functionalities of the RSS QPs.
"""
def setUp(self):
super().setUp()
self.iters = 1
self.server = None
self.client = None

def create_players(self):
"""
Init RSS tests resources.
RSS-QP can recive traffic only, so client will be based on Flow tests resources.
"""
self.client = FlowRes(**self.dev_info)
self.server = RssRes(**self.dev_info)

def flow_traffic(self, specs, l3=PacketConsts.IP_V4,
l4=PacketConsts.UDP_PROTO):
"""
Execute raw ethernet traffic with given specs flow.
:param specs: List of flow specs to match on the QP
:param l3: Packet layer 3 type: 4 for IPv4 or 6 for IPv6
:param l4: Packet layer 4 type: 'tcp' or 'udp'
:return: None
"""
self.flows = self.server.create_flow(specs)
u.raw_rss_traffic(self.client, self.server, self.iters, l3, l4,
num_packets=32)

def test_rss_traffic(self):
self.create_players()
self.flow_traffic([self.server.create_eth_spec()])
58 changes: 55 additions & 3 deletions tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from tests.base import XRCResources, DCT_KEY
from tests.efa_base import SRDResources
from pyverbs.wr import SGE, SendWR, RecvWR
from pyverbs.qp import QPCap, QPInitAttr, QPInitAttrEx, QPAttr
from pyverbs.qp import QPCap, QPInitAttr, QPInitAttrEx, QPAttr, QPEx
from tests.mlx5_base import Mlx5DcResources, Mlx5DcStreamsRes
from pyverbs.base import PyverbsRDMAErrno
from pyverbs.cq import PollCqAttr, CQEX
Expand Down Expand Up @@ -526,7 +526,11 @@ def post_recv(agr_obj, recv_wr, qp_idx=0 ,num_wqes=1):
"""
receive_queue = agr_obj.srq if agr_obj.srq else agr_obj.qps[qp_idx]
for _ in range(num_wqes):
receive_queue.post_recv(recv_wr, None)
if isinstance(receive_queue, QPEx) and receive_queue.ind_table:
for wq in receive_queue.ind_table.wqs:
wq.post_recv(recv_wr, None)
else:
receive_queue.post_recv(recv_wr, None)


def poll_cq(cq, count=1, data=None):
Expand Down Expand Up @@ -760,6 +764,8 @@ def gen_packet(msg_size, l3=PacketConsts.IP_V4, l4=PacketConsts.UDP_PROTO, with_
:param kwargs: Arguments:
* *src_mac*
Source MAC address to use in the packet.
* *src_ipv4*
Source IPv4 address to use in the packet.
:return: packet
"""
l3_header_size = getattr(PacketConsts, f'IPV{str(l3)}_HEADER_SIZE')
Expand All @@ -786,11 +792,12 @@ def gen_packet(msg_size, l3=PacketConsts.IP_V4, l4=PacketConsts.UDP_PROTO, with_

if l3 == PacketConsts.IP_V4:
# IPv4 header
src_ipv4 = kwargs.get('src_ipv4', PacketConsts.SRC_IP)
packet += struct.pack('!2B3H2BH4s4s', (PacketConsts.IP_V4 << 4) +
PacketConsts.IHL, 0, ip_total_len, 0,
PacketConsts.IP_V4_FLAGS << 13,
PacketConsts.TTL_HOP_LIMIT, next_hdr, 0,
socket.inet_aton(PacketConsts.SRC_IP),
socket.inet_aton(src_ipv4),
socket.inet_aton(PacketConsts.DST_IP))
else:
# IPv6 header
Expand Down Expand Up @@ -879,6 +886,51 @@ def raw_traffic(client, server, iters, l3=PacketConsts.IP_V4,
expected_packet if expected_packet else msg, skip_idxs)


def raw_rss_traffic(client, server, iters, l3=PacketConsts.IP_V4,
l4=PacketConsts.UDP_PROTO, with_vlan=False, num_packets=1):
"""
Runs raw ethernet rss traffic between two sides.
:param client: client side, clients base class is BaseTraffic
:param server: server side, servers base class is BaseTraffic
:param iters: number of traffic iterations
:param l3: Packet layer 3 type: 4 for IPv4 or 6 for IPv6
:param l4: Packet layer 4 type: 'tcp' or 'udp'
:param with_vlan: if True add VLAN header to the packet
:param num_packets: Number of packets to send with different ipv4 src
address in each iteration.
:return: None
"""
s_recv_wr = get_recv_wr(server)
for qp_idx in range(server.qp_count):
# prepare the receive queue with RecvWR
post_recv(server, s_recv_wr, qp_idx=qp_idx, num_wqes=num_packets)
for _ in range(iters):
for qp_idx in range(server.qp_count):
for i in range(num_packets):
c_send_wr, c_sg, msg = get_send_elements_raw_qp(
client, l3, l4, with_vlan,
src_ipv4='.'.join([str(num) for num in range(i, i + 4)]))
send(client, c_send_wr, e.IBV_WR_SEND, False, qp_idx)
poll_cq(client.cq)
completions = 0
start_poll_t = time.perf_counter()
while completions < num_packets and \
(time.perf_counter() - start_poll_t < POLL_CQ_TIMEOUT):
for cq in server.cqs:
n, wcs = cq.poll()
if n > 0:
if wcs[0].status != e.IBV_WC_SUCCESS:
raise PyverbsRDMAError(
f'Completion status is {wc_status_to_str(wcs[0].status)}',
wcs[0].status)
completions += 1
if completions >= num_packets:
break
if completions < num_packets:
raise PyverbsError(f'Expected {num_packets} completions - got {completions}')
post_recv(server, s_recv_wr, qp_idx=qp_idx, num_wqes=num_packets)


def rdma_traffic(client, server, iters, gid_idx, port, new_send=False,
send_op=None):
"""
Expand Down

0 comments on commit 9459b91

Please sign in to comment.