Skip to content

Commit

Permalink
Merge pull request #794 from EdwardSro/pr-rdmacm
Browse files Browse the repository at this point in the history
pyverbs: Extend rdmacm support in Pyverbs
  • Loading branch information
rleon committed Aug 3, 2020
2 parents 3b4690c + e62c1b3 commit 2c0a469
Show file tree
Hide file tree
Showing 10 changed files with 213 additions and 19 deletions.
2 changes: 2 additions & 0 deletions pyverbs/cmid.pxd
Expand Up @@ -12,6 +12,8 @@ cdef class CMID(PyverbsCM):
cdef object event_channel
cdef object ctx
cdef object pd
cdef object mrs
cdef add_ref(self, obj)
cpdef close(self)


Expand Down
64 changes: 62 additions & 2 deletions pyverbs/cmid.pyx
@@ -1,9 +1,11 @@
from libc.stdint cimport uintptr_t
from libc.string cimport memset
import weakref

from pyverbs.pyverbs_error import PyverbsUserError
from pyverbs.pyverbs_error import PyverbsUserError, PyverbsError
from pyverbs.qp cimport QPInitAttr, QPAttr, ECE
from pyverbs.base import PyverbsRDMAErrno
from pyverbs.base cimport close_weakrefs
cimport pyverbs.libibverbs_enums as e
cimport pyverbs.librdmacm_enums as ce
from pyverbs.addr cimport AH, AHAttr
Expand Down Expand Up @@ -306,6 +308,7 @@ cdef class CMID(PyverbsCM):
self.pd = None
self.ctx = None
self.event_channel = None
self.mrs = weakref.WeakSet()
if creator is None:
return
elif isinstance(creator, AddrInfo):
Expand Down Expand Up @@ -338,6 +341,12 @@ cdef class CMID(PyverbsCM):
raise PyverbsRDMAErrno('Cannot create CM ID from {obj}'
.format(obj=type(creator)))

cdef add_ref(self, obj):
    # Track an object that was created on top of this CMID. Only MR objects
    # are supported; they are kept in the weak set self.mrs, which close()
    # hands to close_weakrefs().
    if not isinstance(obj, MR):
        raise PyverbsError('Unrecognized object type')
    self.mrs.add(obj)

@property
def event_channel(self):
return self.event_channel
Expand Down Expand Up @@ -374,6 +383,7 @@ cdef class CMID(PyverbsCM):
(<Context>self.ctx).context = NULL
if self.pd:
(<PD>self.pd).pd = NULL
close_weakrefs([self.mrs])
self.id = NULL

def get_request(self):
Expand Down Expand Up @@ -611,7 +621,25 @@ cdef class CMID(PyverbsCM):
:param size: The total length of the memory to register
:return: registered MR
"""
return MR(self.pd, size, e.IBV_ACCESS_LOCAL_WRITE)
return MR(self, size, e.IBV_ACCESS_LOCAL_WRITE)

def reg_read(self, size=0):
    """
    Register a memory region on this CMID that can be used for messages
    and as the target of remote read operations. The MR is created with
    this CMID as its creator and the IBV_ACCESS_REMOTE_READ access flag.
    :param size: The total length of the memory to register
    :return: registered MR
    """
    access = e.IBV_ACCESS_REMOTE_READ
    return MR(self, size, access)

def reg_write(self, size=0):
    """
    Register a memory region on this CMID that can be used for messages
    and as the target of remote write operations. The MR is created with
    this CMID as its creator and the IBV_ACCESS_REMOTE_WRITE access flag.
    :param size: The total length of the memory to register
    :return: registered MR
    """
    access = e.IBV_ACCESS_REMOTE_WRITE
    return MR(self, size, access)

def post_recv(self, MR mr not None, length=None):
"""
Expand Down Expand Up @@ -643,6 +671,38 @@ cdef class CMID(PyverbsCM):
if ret != 0:
raise PyverbsRDMAErrno('Failed to Post Send')

def post_read(self, MR mr not None, length, remote_addr, rkey,
              flags=0):
    """
    Post an RDMA read work request on the CMID's internal QP by calling
    rdma_post_read with the MR's buffer and ibv_mr.
    :param mr: A valid MR object; its buffer is passed as the local address.
    :param length: Length, in bytes, passed to rdma_post_read.
    :param remote_addr: The remote MR address.
    :param rkey: The remote MR rkey.
    :param flags: Flags for the work request.
    :return: None
    :raises PyverbsRDMAErrno: If rdma_post_read returns a non-zero value.
    """
    rc = cm.rdma_post_read(self.id, NULL, mr.buf, length, mr.mr, flags,
                           remote_addr, rkey)
    if rc != 0:
        raise PyverbsRDMAErrno('Failed to Post Read')

def post_write(self, MR mr not None, length, remote_addr, rkey,
               flags=0):
    """
    Post an RDMA write work request on the CMID's internal QP by calling
    rdma_post_write with the MR's buffer and ibv_mr.
    :param mr: A valid MR object; its buffer is passed as the local address.
    :param length: Length, in bytes, passed to rdma_post_write.
    :param remote_addr: The remote MR address.
    :param rkey: The remote MR rkey.
    :param flags: Flags for the work request.
    :return: None
    :raises PyverbsRDMAErrno: If rdma_post_write returns a non-zero value.
    """
    rc = cm.rdma_post_write(self.id, NULL, mr.buf, length, mr.mr, flags,
                            remote_addr, rkey)
    if rc != 0:
        raise PyverbsRDMAErrno('Failed to Post Write')

def post_ud_send(self, MR mr not None, AH ah not None, rqpn=0,
flags=v.IBV_SEND_SIGNALED, length=None):
"""
Expand Down
38 changes: 34 additions & 4 deletions pyverbs/device.pyx
Expand Up @@ -15,6 +15,7 @@ from pyverbs.base import PyverbsRDMAErrno
from pyverbs.base cimport close_weakrefs
cimport pyverbs.libibverbs_enums as e
cimport pyverbs.libibverbs as v
cimport pyverbs.librdmacm as cm
from pyverbs.cmid cimport CMID
from pyverbs.xrcd cimport XRCD
from pyverbs.addr cimport GID
Expand All @@ -35,11 +36,12 @@ class Device(PyverbsObject):
It is not a part of objects creation order - there's no need for the user
to create it for such purposes.
"""
def __init__(self, name, guid, node_type, transport_type):
def __init__(self, name, guid, node_type, transport_type, index):
self._node_type = node_type
self._transport_type = transport_type
self._name = name
self._guid = guid
self._index = index

@property
def name(self):
Expand All @@ -57,12 +59,16 @@ class Device(PyverbsObject):
def guid(self):
return self._guid

@property
def index(self):
return self._index

def __str__(self):
return 'Device {dev}, node type {ntype}, transport type {ttype},' \
' guid {guid}'.format(dev=self.name.decode(),
' guid {guid}, index {index}'.format(dev=self.name.decode(),
ntype=translate_node_type(self.node_type),
ttype=translate_transport_type(self.transport_type),
guid=guid_to_hex(self.guid))
guid=guid_to_hex(self.guid), index=self._index)


cdef class Context(PyverbsCM):
Expand Down Expand Up @@ -970,6 +976,7 @@ def get_device_list():
device node type
device transport type
device guid
device index
"""
cdef int count = 0;
cdef v.ibv_device **dev_list;
Expand All @@ -983,7 +990,30 @@ def get_device_list():
node = dev_list[i].node_type
transport = dev_list[i].transport_type
guid = be64toh(v.ibv_get_device_guid(dev_list[i]))
devices.append(Device(name, guid, node, transport))
index = v.ibv_get_device_index(dev_list[i])
devices.append(Device(name, guid, node, transport, index))
finally:
v.ibv_free_device_list(dev_list)
return devices


def rdma_get_devices():
    """
    Get the RDMA devices as reported by librdmacm's rdma_get_devices().
    Each returned Device carries the name, GUID, node type, transport type
    and index of the ibv_device behind one of the returned contexts.
    :return: list of Device objects.
    :raises PyverbsRDMAErrno: If rdma_get_devices() returns NULL.
    """
    # Initialised so the loop bound is well defined, matching
    # get_device_list().
    cdef int count = 0
    cdef v.ibv_context **ctx_list
    ctx_list = cm.rdma_get_devices(&count)
    if ctx_list == NULL:
        raise PyverbsRDMAErrno('Failed to get device list')
    devices = []
    try:
        for i in range(count):
            name = ctx_list[i].device.name
            node = ctx_list[i].device.node_type
            transport = ctx_list[i].device.transport_type
            guid = be64toh(v.ibv_get_device_guid(ctx_list[i].device))
            index = v.ibv_get_device_index(ctx_list[i].device)
            devices.append(Device(name, guid, node, transport, index))
    finally:
        # Release the context list even if building a Device raised, so the
        # list is not leaked (same pattern as get_device_list()).
        cm.rdma_free_devices(ctx_list)
    return devices
1 change: 1 addition & 0 deletions pyverbs/libibverbs.pxd
Expand Up @@ -481,6 +481,7 @@ cdef extern from 'infiniband/verbs.h':
uint32_t comp_mask

ibv_device **ibv_get_device_list(int *n)
int ibv_get_device_index(ibv_device *device);
void ibv_free_device_list(ibv_device **list)
ibv_context *ibv_open_device(ibv_device *device)
int ibv_close_device(ibv_context *context)
Expand Down
10 changes: 10 additions & 0 deletions pyverbs/librdmacm.pxd
Expand Up @@ -93,6 +93,8 @@ cdef extern from '<rdma/rdma_cma.h>':

rdma_event_channel *rdma_create_event_channel()
void rdma_destroy_event_channel(rdma_event_channel *channel)
ibv_context **rdma_get_devices(int *num_devices)
void rdma_free_devices (ibv_context **list);
int rdma_get_cm_event(rdma_event_channel *channel, rdma_cm_event **event)
int rdma_ack_cm_event(rdma_cm_event *event)
char *rdma_event_str(rdma_cm_event_type event)
Expand Down Expand Up @@ -135,7 +137,15 @@ cdef extern from '<rdma/rdma_verbs.h>':
int rdma_post_ud_send(rdma_cm_id *id, void *context, void *addr,
size_t length, ibv_mr *mr, int flags, ibv_ah *ah,
uint32_t remote_qpn)
int rdma_post_read(rdma_cm_id *id, void *context, void *addr,
size_t length, ibv_mr *mr, int flags,
uint64_t remote_addr, uint32_t rkey)
int rdma_post_write(rdma_cm_id *id, void *context, void *addr,
size_t length, ibv_mr *mr, int flags,
uint64_t remote_addr, uint32_t rkey)
int rdma_get_send_comp(rdma_cm_id *id, ibv_wc *wc)
int rdma_get_recv_comp(rdma_cm_id *id, ibv_wc *wc)
ibv_mr *rdma_reg_msgs(rdma_cm_id *id, void *addr, size_t length)
ibv_mr *rdma_reg_read(rdma_cm_id *id, void *addr, size_t length)
ibv_mr *rdma_reg_write(rdma_cm_id *id, void *addr, size_t length)
int rdma_dereg_mr(ibv_mr *mr)
2 changes: 2 additions & 0 deletions pyverbs/mr.pxd
Expand Up @@ -4,11 +4,13 @@
#cython: language_level=3

from pyverbs.base cimport PyverbsCM
cimport pyverbs.librdmacm as cm
from . cimport libibverbs as v


cdef class MR(PyverbsCM):
cdef object pd
cdef object cmid
cdef v.ibv_mr *mr
cdef int mmap_length
cdef object is_huge
Expand Down
36 changes: 27 additions & 9 deletions pyverbs/mr.pyx
Expand Up @@ -15,6 +15,7 @@ from libc.string cimport memcpy, memset
cimport pyverbs.libibverbs_enums as e
from pyverbs.device cimport DM
from libc.stdlib cimport free
from .cmid cimport CMID
from .pd cimport PD

cdef extern from 'sys/mman.h':
Expand All @@ -28,12 +29,15 @@ cdef class MR(PyverbsCM):
MR class represents ibv_mr. Buffer allocation is done in the c'tor. Freeing
it is done in close().
"""
def __init__(self, PD pd not None, length=0, access=0, address=None,
def __init__(self, creator not None, length=0, access=0, address=None,
implicit=False, **kwargs):
"""
Allocate a user-level buffer of length <length> and register a Memory
Region of the given length and access flags.
:param pd: A PD object
:param creator: A PD/CMID object. If a CMID is passed, the MR will
be registered using rdma_reg_msgs/write/read according
to the passed access flag of local_write/remote_write or
remote_read respectively.
:param length: Length (in bytes) of MR's buffer.
:param access: Access flags, see ibv_access_flags enum
:param address: Memory address to register (Optional). If it's not
Expand All @@ -42,7 +46,7 @@ cdef class MR(PyverbsCM):
:param implicit: If True, register an implicit MR (NULL address and SIZE_MAX length).
:param kwargs: Arguments:
* *handle*
A valid kernel handle for a MR object in the given PD.
A valid kernel handle for a MR object in the given PD (creator).
If passed, the MR will be imported and associated with the
context that is associated with the given PD using ibv_import_mr.
:return: The newly created MR on success
Expand All @@ -60,6 +64,7 @@ cdef class MR(PyverbsCM):
mr_handle = kwargs.get('handle')
# If a MR handle is passed import MR and finish
if mr_handle is not None:
pd = <PD>creator
self.mr = v.ibv_import_mr(pd.pd, mr_handle)
if self.mr == NULL:
raise PyverbsRDMAErrno('Failed to import MR')
Expand All @@ -85,15 +90,27 @@ cdef class MR(PyverbsCM):
raise PyverbsError('Failed to allocate MR buffer of size {l}'.
format(l=length))
memset(self.buf, 0, length)
if implicit:
self.mr = v.ibv_reg_mr(<v.ibv_pd*>pd.pd, NULL, SIZE_MAX, access)
else:
self.mr = v.ibv_reg_mr(<v.ibv_pd*>pd.pd, self.buf, length, access)
if isinstance(creator, PD):
pd = <PD>creator
if implicit:
self.mr = v.ibv_reg_mr(pd.pd, NULL, SIZE_MAX, access)
else:
self.mr = v.ibv_reg_mr(pd.pd, self.buf, length, access)
self.pd = pd
pd.add_ref(self)
elif isinstance(creator, CMID):
cmid = <CMID>creator
if access == e.IBV_ACCESS_LOCAL_WRITE:
self.mr = cm.rdma_reg_msgs(cmid.id, self.buf, length)
elif access == e.IBV_ACCESS_REMOTE_WRITE:
self.mr = cm.rdma_reg_write(cmid.id, self.buf, length)
elif access == e.IBV_ACCESS_REMOTE_READ:
self.mr = cm.rdma_reg_read(cmid.id, self.buf, length)
self.cmid = cmid
cmid.add_ref(self)
if self.mr == NULL:
raise PyverbsRDMAErrno('Failed to register a MR. length: {l}, access flags: {a}'.
format(l=length, a=access))
self.pd = pd
pd.add_ref(self)
self.logger.debug('Registered ibv_mr. Length: {l}, access flags {a}'.
format(l=length, a=access))

Expand Down Expand Up @@ -128,6 +145,7 @@ cdef class MR(PyverbsCM):
self.mr = NULL
self.pd = None
self.buf = NULL
self.cmid = None

def write(self, data, length, offset=0):
"""
Expand Down
9 changes: 5 additions & 4 deletions tests/base_rdmacm.py
Expand Up @@ -37,6 +37,7 @@ def __init__(self, addr=None, passive=None, **kwargs):
self.with_ext_qp = kwargs.get('with_ext_qp', False)
self.port = kwargs.get('port') if kwargs.get('port') else '7471'
self.port_space = kwargs.get('port_space', ce.RDMA_PS_TCP)
self.remote_operation = kwargs.get('remote_op')
self.qp_type = qp_type_per_ps[self.port_space]
self.qp_init_attr = QPInitAttr(qp_type=self.qp_type, cap=QPCap())
self.connected = False
Expand All @@ -60,10 +61,10 @@ def __init__(self, addr=None, passive=None, **kwargs):
port_space=self.port_space)

def create_mr(self):
if self.passive:
self.mr = self.child_id.reg_msgs(self.msg_size + GRH_SIZE)
else:
self.mr = self.cmid.reg_msgs(self.msg_size + GRH_SIZE)
cmid = self.child_id if self.passive else self.cmid
mr_remote_function = {None: cmid.reg_msgs, 'read': cmid.reg_read,
'write': cmid.reg_write}
self.mr = mr_remote_function[self.remote_operation](self.msg_size + GRH_SIZE)

def create_event_channel(self):
self.channel = CMEventChannel()
Expand Down
33 changes: 33 additions & 0 deletions tests/rdmacm_utils.py
Expand Up @@ -55,6 +55,39 @@ def rdmacm_traffic(self, server=None, multicast=False):
else:
self._cmid_client_traffic(multicast)

def remote_traffic(self, passive, remote_op='write'):
"""
Run rdmacm remote traffic. This method runs RDMA remote traffic from
the active to the passive.
The passive side publishes its MR's (rkey, buf) pair through the notifier;
the active side reads that pair and uses it as the remote key and remote
address for post_write/post_read. Both sides meet on the syncer around the
traffic. For 'write' the passive side reads back and validates its MR, for
'read' the active side reads back and validates its own MR.
:param passive: If True, run as server.
:param remote_op: 'write'/'read', The type of the RDMA remote operation.
:return: None
"""
msg_size = self.cm_res.msg_size
if passive:
# Fill the local MR with 's' before exposing it to the peer.
self.cm_res.mr.write((msg_size) * 's', msg_size)
# Publish this side's rkey and buffer address for the peer to target.
mr_details = (self.cm_res.mr.rkey, self.cm_res.mr.buf)
self.notifier.put(mr_details)
self.syncer.wait()
self.syncer.wait()
if remote_op == 'write':
msg_received = self.cm_res.mr.read(msg_size, 0)
validate(msg_received, True, msg_size)
else:
# Fill the local MR with 'c' before posting the remote operations.
self.cm_res.mr.write((msg_size) * 'c', msg_size)
self.syncer.wait()
rkey, remote_addr = self.notifier.get()
cmid = self.cm_res.cmid
# Pick the CMID posting method that matches the requested operation.
post_func = cmid.post_write if remote_op == 'write' else \
cmid.post_read
for _ in range(self.cm_res.num_msgs):
post_func(self.cm_res.mr, msg_size, remote_addr, rkey,
flags=e.IBV_SEND_SIGNALED)
cmid.get_send_comp()
self.syncer.wait()
if remote_op == 'read':
msg_received = self.cm_res.mr.read(msg_size, 0)
validate(msg_received, False, msg_size)

def _ext_qp_server_traffic(self):
"""
RDMACM server side traffic function which sends and receives a message,
Expand Down

0 comments on commit 2c0a469

Please sign in to comment.