Skip to content

Commit

Permalink
Merge pull request #782 from EdwardSro/pr-odp-additions
Browse files Browse the repository at this point in the history
pyverbs: ODP refactoring and extensions
  • Loading branch information
rleon committed Jul 20, 2020
2 parents bed45e6 + ccce439 commit 8a477f6
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 60 deletions.
36 changes: 24 additions & 12 deletions pyverbs/mr.pyx
Expand Up @@ -6,12 +6,13 @@ import logging

from posix.mman cimport mmap, munmap, MAP_PRIVATE, PROT_READ, PROT_WRITE, \
MAP_ANONYMOUS, MAP_HUGETLB
from pyverbs.pyverbs_error import PyverbsError, PyverbsRDMAError
from pyverbs.pyverbs_error import PyverbsError, PyverbsRDMAError, \
PyverbsUserError
from libc.stdint cimport uintptr_t, SIZE_MAX
from pyverbs.base import PyverbsRDMAErrno
from posix.stdlib cimport posix_memalign
from libc.string cimport memcpy, memset
cimport pyverbs.libibverbs_enums as e
from libc.stdint cimport uintptr_t
from pyverbs.device cimport DM
from libc.stdlib cimport free
from .pd cimport PD
Expand All @@ -27,16 +28,18 @@ cdef class MR(PyverbsCM):
MR class represents ibv_mr. Buffer allocation in done in the c'tor. Freeing
it is done in close().
"""
def __init__(self, PD pd not None, length=0, access=0, address=None, **kwargs):
def __init__(self, PD pd not None, length=0, access=0, address=None,
implicit=False, **kwargs):
"""
Allocate a user-level buffer of length <length> and register a Memory
Region of the given length and access flags.
:param pd: A PD object
:param length: Length in bytes
:param length: Length (in bytes) of MR's buffer.
:param access: Access flags, see ibv_access_flags enum
:param address: Memory address to register (Optional). If it's not
provided, a memory will be allocated in the class
initialization.
:param implicit: Implicit the MR address.
:param kwargs: Arguments:
* *handle*
A valid kernel handle for a MR object in the given PD.
Expand All @@ -48,10 +51,6 @@ cdef class MR(PyverbsCM):
if self.mr != NULL:
return
self.is_huge = True if access & e.IBV_ACCESS_HUGETLB else False
# We want to enable registering an MR of size 0 but this fails with a
# buffer of size 0, so in this case lets increase the buffer
if length == 0:
length = 10
if address:
self.is_user_addr = True
# uintptr_t is guaranteed to be large enough to hold any pointer.
Expand All @@ -70,7 +69,7 @@ cdef class MR(PyverbsCM):
return

# Allocate a buffer
if not address:
if not address and length > 0:
if self.is_huge:
# Rounding up to multiple of HUGE_PAGE_SIZE
self.mmap_length = length + (HUGE_PAGE_SIZE - length % HUGE_PAGE_SIZE) \
Expand All @@ -86,7 +85,10 @@ cdef class MR(PyverbsCM):
raise PyverbsError('Failed to allocate MR buffer of size {l}'.
format(l=length))
memset(self.buf, 0, length)
self.mr = v.ibv_reg_mr(<v.ibv_pd*>pd.pd, self.buf, length, access)
if implicit:
self.mr = v.ibv_reg_mr(<v.ibv_pd*>pd.pd, NULL, SIZE_MAX, access)
else:
self.mr = v.ibv_reg_mr(<v.ibv_pd*>pd.pd, self.buf, length, access)
if self.mr == NULL:
raise PyverbsRDMAErrno('Failed to register a MR. length: {l}, access flags: {a}'.
format(l=length, a=access))
Expand Down Expand Up @@ -127,18 +129,23 @@ cdef class MR(PyverbsCM):
self.pd = None
self.buf = NULL

def write(self, data, length):
def write(self, data, length, offset=0):
"""
Write user data to the MR's buffer using memcpy
:param data: User data to write
:param length: Length of the data to write
:param offset: Writing offset
:return: None
"""
if not self.buf or length < 0:
raise PyverbsUserError('The MR buffer isn\'t allocated or length'
f' {length} is invalid')
# If data is a string, cast it to bytes as Python3 doesn't
# automatically convert it.
cdef int off = offset
if isinstance(data, str):
data = data.encode()
memcpy(self.buf, <char *>data, length)
memcpy(<char*>(self.buf + off), <char *>data, length)

cpdef read(self, length, offset):
"""
Expand All @@ -150,6 +157,11 @@ cdef class MR(PyverbsCM):
cdef char *data
cdef int off = offset # we can't use offset in the next line, as it is
# a Python object and not C
if offset < 0:
raise PyverbsUserError(f'Invalid offset {offset}')
if not self.buf or length < 0:
raise PyverbsUserError('The MR buffer isn\'t allocated or length'
f' {length} is invalid')
data = <char*>(self.buf + off)
return data[:length]

Expand Down
2 changes: 2 additions & 0 deletions tests/base.py
Expand Up @@ -184,6 +184,8 @@ def setUp(self):
raise unittest.SkipTest('No port is up, can\'t run traffic')
# Choose one combination and use it
self._select_config()
self.dev_info = {'dev_name': self.dev_name, 'ib_port': self.ib_port,
'gid_index': self.gid_index}

def _add_gids_per_port(self, ctx, dev, port):
# Don't add ports which are not active
Expand Down
96 changes: 58 additions & 38 deletions tests/test_odp.py
@@ -1,8 +1,9 @@
from pyverbs.mem_alloc import mmap, munmap, MAP_ANONYMOUS_, MAP_PRIVATE_, \
MAP_HUGETLB_
from tests.utils import requires_odp, requires_huge_pages, traffic, \
xrc_traffic, create_custom_mr
xrc_traffic, create_custom_mr, poll_cq, post_send, GRH_SIZE
from tests.base import RCResources, UDResources, XRCResources
from pyverbs.wr import SGE, SendWR, RecvWR
from tests.base import RDMATestCase
from pyverbs.mr import MR
import pyverbs.enums as e
Expand All @@ -12,15 +13,17 @@


class OdpUD(UDResources):
@requires_odp('ud')
@requires_odp('ud', e.IBV_ODP_SUPPORT_SEND)
def create_mr(self):
self.mr = create_custom_mr(self, e.IBV_ACCESS_ON_DEMAND,
self.msg_size + self.GRH_SIZE)
self.send_mr = MR(self.pd, self.msg_size + self.GRH_SIZE,
e.IBV_ACCESS_LOCAL_WRITE | e.IBV_ACCESS_ON_DEMAND)
self.recv_mr = MR(self.pd, self.msg_size + self.GRH_SIZE,
e.IBV_ACCESS_LOCAL_WRITE)


class OdpRC(RCResources):
def __init__(self, dev_name, ib_port, gid_index, is_huge=False,
user_addr=None, use_mr_prefetch=False):
user_addr=None, use_mr_prefetch=False, is_implicit=False):
"""
Initialize an OdpRC object.
:param dev_name: Device name to be used
Expand All @@ -30,23 +33,26 @@ def __init__(self, dev_name, ib_port, gid_index, is_huge=False,
:param is_huge: If True, use huge pages for MR registration
:param user_addr: The MR's buffer address. If None, the buffer will be
allocated by pyverbs.
:param is_implicit: If True, register implicit MR.
"""
self.is_huge = is_huge
self.user_addr = user_addr
self.is_implicit = is_implicit
super(OdpRC, self).__init__(dev_name=dev_name, ib_port=ib_port,
gid_index=gid_index)
self.use_mr_prefetch = use_mr_prefetch

@requires_odp('rc')
@requires_odp('rc', e.IBV_ODP_SUPPORT_SEND | e.IBV_ODP_SUPPORT_RECV)
def create_mr(self):
access = e.IBV_ACCESS_LOCAL_WRITE | e.IBV_ACCESS_ON_DEMAND
if self.is_huge:
access |= e.IBV_ACCESS_HUGETLB
self.mr = MR(self.pd, self.msg_size, access, address=self.user_addr)
self.mr = MR(self.pd, self.msg_size, access, address=self.user_addr,
implicit=self.is_implicit)


class OdpXRC(XRCResources):
@requires_odp('xrc')
@requires_odp('xrc', e.IBV_ODP_SUPPORT_SEND | e.IBV_ODP_SUPPORT_SRQ_RECV)
def create_mr(self):
self.mr = create_custom_mr(self, e.IBV_ACCESS_ON_DEMAND)

Expand All @@ -56,29 +62,22 @@ def setUp(self):
super(OdpTestCase, self).setUp()
self.iters = 100
self.user_addr = None
self.qp_dict = {'rc': OdpRC, 'ud': OdpUD, 'xrc': OdpXRC}

def create_players(self, qp_type, is_huge=False, use_mr_prefetch=False):
if qp_type == 'rc':
client = self.qp_dict[qp_type](self.dev_name, self.ib_port,
self.gid_index, is_huge=is_huge,
user_addr=self.user_addr,
use_mr_prefetch=use_mr_prefetch)
server = self.qp_dict[qp_type](self.dev_name, self.ib_port,
self.gid_index, is_huge=is_huge,
user_addr=self.user_addr,
use_mr_prefetch=use_mr_prefetch)
else:
client = self.qp_dict[qp_type](self.dev_name, self.ib_port,
self.gid_index)
server = self.qp_dict[qp_type](self.dev_name, self.ib_port,
self.gid_index)
if qp_type == 'xrc':
client.pre_run(server.psns, server.qps_num)
server.pre_run(client.psns, client.qps_num)
else:
client.pre_run(server.psn, server.qpn)
server.pre_run(client.psn, client.qpn)

def create_players(self, resource, **resource_arg):
"""
Init odp tests resources.
:param resource: The RDMA resources to use. A class of type
BaseResources.
:param resource_arg: Dict of args that specify the resource specific
attributes.
:return: The (client, server) resources.
"""
client = resource(**self.dev_info, **resource_arg)
server = resource(**self.dev_info, **resource_arg)
psn = 'psns' if resource == OdpXRC else 'psn'
qpn = 'qps_num' if resource == OdpXRC else 'qpn'
client.pre_run(getattr(server, psn), getattr(server, qpn))
server.pre_run(getattr(client, psn), getattr(client, qpn))
return client, server

def tearDown(self):
Expand All @@ -87,29 +86,50 @@ def tearDown(self):
super(OdpTestCase, self).tearDown()

def test_odp_rc_traffic(self):
client, server = self.create_players('rc')
client, server = self.create_players(OdpRC)
traffic(client, server, self.iters, self.gid_index, self.ib_port)

def test_odp_ud_traffic(self):
client, server = self.create_players('ud')
def test_odp_implicit_rc_traffic(self):
client, server = self.create_players(OdpRC, is_implicit=True)
traffic(client, server, self.iters, self.gid_index, self.ib_port)

def test_odp_ud_traffic(self):
client, server = self.create_players(OdpUD)
# Implement the traffic here because OdpUD uses two different MRs for
# send and recv.
recv_sge = SGE(server.recv_mr.buf, server.msg_size + GRH_SIZE,
server.recv_mr.lkey)
server_recv_wr = RecvWR(sg=[recv_sge], num_sge=1)
send_sge = SGE(client.send_mr.buf + GRH_SIZE, client.msg_size,
client.send_mr.lkey)
client_send_wr = SendWR(num_sge=1, sg=[send_sge])
for i in range(self.iters):
server.qp.post_recv(server_recv_wr)
post_send(client, client_send_wr, self.gid_index, self.ib_port)
poll_cq(client.cq)
poll_cq(server.cq)

def test_odp_xrc_traffic(self):
client, server = self.create_players('xrc')
client, server = self.create_players(OdpXRC)
xrc_traffic(client, server)

@requires_huge_pages()
def test_odp_rc_huge_traffic(self):
client, server = self.create_players('rc', is_huge=True)
client, server = self.create_players(OdpRC, is_huge=True)
traffic(client, server, self.iters, self.gid_index, self.ib_port)

@requires_huge_pages()
def test_odp_rc_huge_user_addr_traffic(self):
self.user_addr = mmap(length=HUGE_PAGE_SIZE,
flags=MAP_ANONYMOUS_| MAP_PRIVATE_| MAP_HUGETLB_)
client, server = self.create_players('rc', is_huge=True)
client, server = self.create_players(OdpRC, is_huge=True,
user_addr=self.user_addr)
traffic(client, server, self.iters, self.gid_index, self.ib_port)

def test_odp_prefetch_rc_traffic(self):
client, server = self.create_players('rc', use_mr_prefetch=True)
client, server = self.create_players(OdpRC, use_mr_prefetch=True)
traffic(client, server, self.iters, self.gid_index, self.ib_port)

def test_odp_implicit_prefetch_rc_traffic(self):
client, server = self.create_players(OdpRC, use_mr_prefetch=True, is_implicit=True)
traffic(client, server, self.iters, self.gid_index, self.ib_port)
30 changes: 20 additions & 10 deletions tests/utils.py
Expand Up @@ -630,10 +630,12 @@ def xrc_traffic(client, server, is_cq_ex=False, send_op=None):


# Decorators
def requires_odp(qp_type):
def requires_odp(qp_type, required_odp_caps):
def outer(func):
def inner(instance):
odp_supported(instance.ctx, qp_type)
odp_supported(instance.ctx, qp_type, required_odp_caps)
if getattr(instance, 'is_implicit', False):
odp_implicit_supported(instance.ctx)
return func(instance)
return inner
return outer
Expand All @@ -649,24 +651,32 @@ def inner(instance):
return outer


def odp_supported(ctx, qp_type):
def odp_supported(ctx, qp_type, required_odp_caps):
"""
Check device ODP capabilities, support only send/recv so far.
:param ctx: Device Context
:param qp_type: QP type ('rc', 'ud' or 'uc')
:param required_odp_caps: ODP Capability mask of specified device
:return: None
"""
odp_caps = ctx.query_device_ex().odp_caps
if odp_caps.general_caps == 0:
raise unittest.SkipTest('ODP is not supported - No ODP caps')
qp_odp_caps = getattr(odp_caps, '{}_odp_caps'.format(qp_type))
has_odp_send = qp_odp_caps & e.IBV_ODP_SUPPORT_SEND
has_odp_recv = qp_odp_caps & e.IBV_ODP_SUPPORT_SRQ_RECV if qp_type == 'xrc'\
else qp_odp_caps & e.IBV_ODP_SUPPORT_RECV
if has_odp_send == 0:
raise unittest.SkipTest('ODP is not supported - ODP send not supported')
if has_odp_recv == 0:
raise unittest.SkipTest('ODP is not supported - ODP recv not supported')
if required_odp_caps & qp_odp_caps != required_odp_caps:
raise unittest.SkipTest('ODP is not supported - ODP recv/send is not supported')


def odp_implicit_supported(ctx):
"""
Check device ODP implicit capability.
:param ctx: Device Context
:return: None
"""
odp_caps = ctx.query_device_ex().odp_caps
has_odp_implicit = odp_caps.general_caps & e.IBV_ODP_SUPPORT_IMPLICIT
if has_odp_implicit == 0:
raise unittest.SkipTest('ODP implicit is not supported')


def requires_huge_pages():
Expand Down

0 comments on commit 8a477f6

Please sign in to comment.