diff --git a/pyverbs/mr.pyx b/pyverbs/mr.pyx index da566cbc4..6838e8427 100644 --- a/pyverbs/mr.pyx +++ b/pyverbs/mr.pyx @@ -6,12 +6,13 @@ import logging from posix.mman cimport mmap, munmap, MAP_PRIVATE, PROT_READ, PROT_WRITE, \ MAP_ANONYMOUS, MAP_HUGETLB -from pyverbs.pyverbs_error import PyverbsError, PyverbsRDMAError +from pyverbs.pyverbs_error import PyverbsError, PyverbsRDMAError, \ + PyverbsUserError +from libc.stdint cimport uintptr_t, SIZE_MAX from pyverbs.base import PyverbsRDMAErrno from posix.stdlib cimport posix_memalign from libc.string cimport memcpy, memset cimport pyverbs.libibverbs_enums as e -from libc.stdint cimport uintptr_t from pyverbs.device cimport DM from libc.stdlib cimport free from .pd cimport PD @@ -27,16 +28,18 @@ cdef class MR(PyverbsCM): MR class represents ibv_mr. Buffer allocation in done in the c'tor. Freeing it is done in close(). """ - def __init__(self, PD pd not None, length=0, access=0, address=None, **kwargs): + def __init__(self, PD pd not None, length=0, access=0, address=None, + implicit=False, **kwargs): """ Allocate a user-level buffer of length and register a Memory Region of the given length and access flags. :param pd: A PD object - :param length: Length in bytes + :param length: Length (in bytes) of MR's buffer. :param access: Access flags, see ibv_access_flags enum :param address: Memory address to register (Optional). If it's not provided, a memory will be allocated in the class initialization. + :param implicit: Implicit the MR address. :param kwargs: Arguments: * *handle* A valid kernel handle for a MR object in the given PD. @@ -48,10 +51,6 @@ cdef class MR(PyverbsCM): if self.mr != NULL: return self.is_huge = True if access & e.IBV_ACCESS_HUGETLB else False - # We want to enable registering an MR of size 0 but this fails with a - # buffer of size 0, so in this case lets increase the buffer - if length == 0: - length = 10 if address: self.is_user_addr = True # uintptr_t is guaranteed to be large enough to hold any pointer. @@ -70,7 +69,7 @@ cdef class MR(PyverbsCM): return # Allocate a buffer - if not address: + if not address and length > 0: if self.is_huge: # Rounding up to multiple of HUGE_PAGE_SIZE self.mmap_length = length + (HUGE_PAGE_SIZE - length % HUGE_PAGE_SIZE) \ @@ -86,7 +85,10 @@ cdef class MR(PyverbsCM): raise PyverbsError('Failed to allocate MR buffer of size {l}'. format(l=length)) memset(self.buf, 0, length) - self.mr = v.ibv_reg_mr(pd.pd, self.buf, length, access) + if implicit: + self.mr = v.ibv_reg_mr(pd.pd, NULL, SIZE_MAX, access) + else: + self.mr = v.ibv_reg_mr(pd.pd, self.buf, length, access) if self.mr == NULL: raise PyverbsRDMAErrno('Failed to register a MR. length: {l}, access flags: {a}'. format(l=length, a=access)) @@ -127,18 +129,23 @@ cdef class MR(PyverbsCM): self.pd = None self.buf = NULL - def write(self, data, length): + def write(self, data, length, offset=0): """ Write user data to the MR's buffer using memcpy :param data: User data to write :param length: Length of the data to write + :param offset: Writing offset :return: None """ + if not self.buf or length < 0: + raise PyverbsUserError('The MR buffer isn\'t allocated or length' + f' {length} is invalid') # If data is a string, cast it to bytes as Python3 doesn't # automatically convert it. + cdef int off = offset if isinstance(data, str): data = data.encode() - memcpy(self.buf, data, length) + memcpy((self.buf + off), data, length) cpdef read(self, length, offset): """ @@ -150,6 +157,11 @@ cdef class MR(PyverbsCM): cdef char *data cdef int off = offset # we can't use offset in the next line, as it is # a Python object and not C + if offset < 0: + raise PyverbsUserError(f'Invalid offset {offset}') + if not self.buf or length < 0: + raise PyverbsUserError('The MR buffer isn\'t allocated or length' + f' {length} is invalid') data = (self.buf + off) return data[:length] diff --git a/tests/base.py b/tests/base.py index 431136266..d614804c1 100755 --- a/tests/base.py +++ b/tests/base.py @@ -184,6 +184,8 @@ def setUp(self): raise unittest.SkipTest('No port is up, can\'t run traffic') # Choose one combination and use it self._select_config() + self.dev_info = {'dev_name': self.dev_name, 'ib_port': self.ib_port, + 'gid_index': self.gid_index} def _add_gids_per_port(self, ctx, dev, port): # Don't add ports which are not active diff --git a/tests/test_odp.py b/tests/test_odp.py index bd7f338a4..b8ed79bc2 100755 --- a/tests/test_odp.py +++ b/tests/test_odp.py @@ -1,8 +1,9 @@ from pyverbs.mem_alloc import mmap, munmap, MAP_ANONYMOUS_, MAP_PRIVATE_, \ MAP_HUGETLB_ from tests.utils import requires_odp, requires_huge_pages, traffic, \ - xrc_traffic, create_custom_mr + xrc_traffic, create_custom_mr, poll_cq, post_send, GRH_SIZE from tests.base import RCResources, UDResources, XRCResources +from pyverbs.wr import SGE, SendWR, RecvWR from tests.base import RDMATestCase from pyverbs.mr import MR import pyverbs.enums as e @@ -12,15 +13,17 @@ class OdpUD(UDResources): - @requires_odp('ud') + @requires_odp('ud', e.IBV_ODP_SUPPORT_SEND) def create_mr(self): - self.mr = create_custom_mr(self, e.IBV_ACCESS_ON_DEMAND, - self.msg_size + self.GRH_SIZE) + self.send_mr = MR(self.pd, self.msg_size + self.GRH_SIZE, + e.IBV_ACCESS_LOCAL_WRITE | e.IBV_ACCESS_ON_DEMAND) + self.recv_mr = MR(self.pd, self.msg_size + self.GRH_SIZE, + e.IBV_ACCESS_LOCAL_WRITE) class OdpRC(RCResources): def __init__(self, dev_name, ib_port, gid_index, is_huge=False, - user_addr=None, use_mr_prefetch=False): + user_addr=None, use_mr_prefetch=False, is_implicit=False): """ Initialize an OdpRC object. :param dev_name: Device name to be used @@ -30,23 +33,26 @@ def __init__(self, dev_name, ib_port, gid_index, is_huge=False, :param is_huge: If True, use huge pages for MR registration :param user_addr: The MR's buffer address. If None, the buffer will be allocated by pyverbs. + :param is_implicit: If True, register implicit MR. """ self.is_huge = is_huge self.user_addr = user_addr + self.is_implicit = is_implicit super(OdpRC, self).__init__(dev_name=dev_name, ib_port=ib_port, gid_index=gid_index) self.use_mr_prefetch = use_mr_prefetch - @requires_odp('rc') + @requires_odp('rc', e.IBV_ODP_SUPPORT_SEND | e.IBV_ODP_SUPPORT_RECV) def create_mr(self): access = e.IBV_ACCESS_LOCAL_WRITE | e.IBV_ACCESS_ON_DEMAND if self.is_huge: access |= e.IBV_ACCESS_HUGETLB - self.mr = MR(self.pd, self.msg_size, access, address=self.user_addr) + self.mr = MR(self.pd, self.msg_size, access, address=self.user_addr, + implicit=self.is_implicit) class OdpXRC(XRCResources): - @requires_odp('xrc') + @requires_odp('xrc', e.IBV_ODP_SUPPORT_SEND | e.IBV_ODP_SUPPORT_SRQ_RECV) def create_mr(self): self.mr = create_custom_mr(self, e.IBV_ACCESS_ON_DEMAND) @@ -56,29 +62,22 @@ def setUp(self): super(OdpTestCase, self).setUp() self.iters = 100 self.user_addr = None - self.qp_dict = {'rc': OdpRC, 'ud': OdpUD, 'xrc': OdpXRC} - - def create_players(self, qp_type, is_huge=False, use_mr_prefetch=False): - if qp_type == 'rc': - client = self.qp_dict[qp_type](self.dev_name, self.ib_port, - self.gid_index, is_huge=is_huge, - user_addr=self.user_addr, - use_mr_prefetch=use_mr_prefetch) - server = self.qp_dict[qp_type](self.dev_name, self.ib_port, - self.gid_index, is_huge=is_huge, - user_addr=self.user_addr, - use_mr_prefetch=use_mr_prefetch) - else: - client = self.qp_dict[qp_type](self.dev_name, self.ib_port, - self.gid_index) - server = self.qp_dict[qp_type](self.dev_name, self.ib_port, - self.gid_index) - if qp_type == 'xrc': - client.pre_run(server.psns, server.qps_num) - server.pre_run(client.psns, client.qps_num) - else: - client.pre_run(server.psn, server.qpn) - server.pre_run(client.psn, client.qpn) + + def create_players(self, resource, **resource_arg): + """ + Init odp tests resources. + :param resource: The RDMA resources to use. A class of type + BaseResources. + :param resource_arg: Dict of args that specify the resource specific + attributes. + :return: The (client, server) resources. + """ + client = resource(**self.dev_info, **resource_arg) + server = resource(**self.dev_info, **resource_arg) + psn = 'psns' if resource == OdpXRC else 'psn' + qpn = 'qps_num' if resource == OdpXRC else 'qpn' + client.pre_run(getattr(server, psn), getattr(server, qpn)) + server.pre_run(getattr(client, psn), getattr(client, qpn)) return client, server def tearDown(self): @@ -87,29 +86,50 @@ def tearDown(self): super(OdpTestCase, self).tearDown() def test_odp_rc_traffic(self): - client, server = self.create_players('rc') + client, server = self.create_players(OdpRC) traffic(client, server, self.iters, self.gid_index, self.ib_port) - def test_odp_ud_traffic(self): - client, server = self.create_players('ud') + def test_odp_implicit_rc_traffic(self): + client, server = self.create_players(OdpRC, is_implicit=True) traffic(client, server, self.iters, self.gid_index, self.ib_port) + def test_odp_ud_traffic(self): + client, server = self.create_players(OdpUD) + # Implement the traffic here because OdpUD uses two different MRs for + # send and recv. + recv_sge = SGE(server.recv_mr.buf, server.msg_size + GRH_SIZE, + server.recv_mr.lkey) + server_recv_wr = RecvWR(sg=[recv_sge], num_sge=1) + send_sge = SGE(client.send_mr.buf + GRH_SIZE, client.msg_size, + client.send_mr.lkey) + client_send_wr = SendWR(num_sge=1, sg=[send_sge]) + for i in range(self.iters): + server.qp.post_recv(server_recv_wr) + post_send(client, client_send_wr, self.gid_index, self.ib_port) + poll_cq(client.cq) + poll_cq(server.cq) + def test_odp_xrc_traffic(self): - client, server = self.create_players('xrc') + client, server = self.create_players(OdpXRC) xrc_traffic(client, server) @requires_huge_pages() def test_odp_rc_huge_traffic(self): - client, server = self.create_players('rc', is_huge=True) + client, server = self.create_players(OdpRC, is_huge=True) traffic(client, server, self.iters, self.gid_index, self.ib_port) @requires_huge_pages() def test_odp_rc_huge_user_addr_traffic(self): self.user_addr = mmap(length=HUGE_PAGE_SIZE, flags=MAP_ANONYMOUS_| MAP_PRIVATE_| MAP_HUGETLB_) - client, server = self.create_players('rc', is_huge=True) + client, server = self.create_players(OdpRC, is_huge=True, + user_addr=self.user_addr) traffic(client, server, self.iters, self.gid_index, self.ib_port) def test_odp_prefetch_rc_traffic(self): - client, server = self.create_players('rc', use_mr_prefetch=True) + client, server = self.create_players(OdpRC, use_mr_prefetch=True) + traffic(client, server, self.iters, self.gid_index, self.ib_port) + + def test_odp_implicit_prefetch_rc_traffic(self): + client, server = self.create_players(OdpRC, use_mr_prefetch=True, is_implicit=True) traffic(client, server, self.iters, self.gid_index, self.ib_port) diff --git a/tests/utils.py b/tests/utils.py index 6094081c1..36793a367 100755 --- a/tests/utils.py +++ b/tests/utils.py @@ -630,10 +630,12 @@ def xrc_traffic(client, server, is_cq_ex=False, send_op=None): # Decorators -def requires_odp(qp_type): +def requires_odp(qp_type, required_odp_caps): def outer(func): def inner(instance): - odp_supported(instance.ctx, qp_type) + odp_supported(instance.ctx, qp_type, required_odp_caps) + if getattr(instance, 'is_implicit', False): + odp_implicit_supported(instance.ctx) return func(instance) return inner return outer @@ -649,24 +651,32 @@ def inner(instance): return outer -def odp_supported(ctx, qp_type): +def odp_supported(ctx, qp_type, required_odp_caps): """ Check device ODP capabilities, support only send/recv so far. :param ctx: Device Context :param qp_type: QP type ('rc', 'ud' or 'uc') + :param required_odp_caps: ODP Capability mask of specified device :return: None """ odp_caps = ctx.query_device_ex().odp_caps if odp_caps.general_caps == 0: raise unittest.SkipTest('ODP is not supported - No ODP caps') qp_odp_caps = getattr(odp_caps, '{}_odp_caps'.format(qp_type)) - has_odp_send = qp_odp_caps & e.IBV_ODP_SUPPORT_SEND - has_odp_recv = qp_odp_caps & e.IBV_ODP_SUPPORT_SRQ_RECV if qp_type == 'xrc'\ - else qp_odp_caps & e.IBV_ODP_SUPPORT_RECV - if has_odp_send == 0: - raise unittest.SkipTest('ODP is not supported - ODP send not supported') - if has_odp_recv == 0: - raise unittest.SkipTest('ODP is not supported - ODP recv not supported') + if required_odp_caps & qp_odp_caps != required_odp_caps: + raise unittest.SkipTest('ODP is not supported - ODP recv/send is not supported') + + +def odp_implicit_supported(ctx): + """ + Check device ODP implicit capability. + :param ctx: Device Context + :return: None + """ + odp_caps = ctx.query_device_ex().odp_caps + has_odp_implicit = odp_caps.general_caps & e.IBV_ODP_SUPPORT_IMPLICIT + if has_odp_implicit == 0: + raise unittest.SkipTest('ODP implicit is not supported') def requires_huge_pages():