Add GPU tests for DLPack support #59

Closed
wants to merge 9 commits
4 changes: 4 additions & 0 deletions src/mpi4py/MPI/asbuffer.pxi
@@ -130,6 +130,7 @@ cdef int Py27_GetBuffer(object obj, Py_buffer *view, int flags) except -1:

#------------------------------------------------------------------------------

include "asdlpack.pxi"
include "asgpubuf.pxi"

cdef int PyMPI_GetBuffer(object obj, Py_buffer *view, int flags) except -1:
@@ -138,6 +139,9 @@ cdef int PyMPI_GetBuffer(object obj, Py_buffer *view, int flags) except -1:
        if PY2: return Py27_GetBuffer(obj, view, flags)
        return PyObject_GetBuffer(obj, view, flags)
    except BaseException:
        try: return Py_GetDLPackBuffer(obj, view, flags)
        except NotImplementedError: pass
        except BaseException: raise
        try: return Py_GetGPUBuffer(obj, view, flags)
        except NotImplementedError: pass
        except BaseException: raise
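The hunk above slots DLPack in as the first fallback when an object does not implement the CPython buffer protocol, with the CUDA array interface (Py_GetGPUBuffer) tried next; only NotImplementedError moves the search along, any other error propagates. A rough pure-Python sketch of that dispatch order (the function and return values below are illustrative stand-ins, not mpi4py API):

    def attach_buffer(obj):
        # 1) regular buffer protocol, 2) DLPack, 3) CUDA array interface
        try:
            return memoryview(obj)                      # PyObject_GetBuffer
        except TypeError:
            pass
        if hasattr(obj, "__dlpack__"):                  # Py_GetDLPackBuffer
            return ("dlpack", obj.__dlpack__())
        if hasattr(obj, "__cuda_array_interface__"):    # Py_GetGPUBuffer
            return ("cuda", obj.__cuda_array_interface__)
        raise BufferError("cannot attach a buffer to this object")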
212 changes: 212 additions & 0 deletions src/mpi4py/MPI/asdlpack.pxi
@@ -0,0 +1,212 @@
#------------------------------------------------------------------------------
# Below is dlpack.h (as of v0.6)

cdef extern from * nogil:
    ctypedef unsigned char uint8_t
    ctypedef unsigned short uint16_t
    ctypedef signed long long int64_t
    ctypedef unsigned long long uint64_t

    ctypedef enum DLDeviceType:
        kDLCPU = 1
        kDLCUDA = 2
        kDLCUDAHost = 3
        kDLOpenCL = 4
        kDLVulkan = 7
        kDLMetal = 8
        kDLVPI = 9
        kDLROCM = 10
        kDLROCMHost = 11
        kDLExtDev = 12
        kDLCUDAManaged = 13

    ctypedef struct DLDevice:
        DLDeviceType device_type
        int device_id

    ctypedef enum DLDataTypeCode:
        kDLInt = 0
        kDLUInt = 1
        kDLFloat = 2
        kDLOpaqueHandle = 3
        kDLBfloat = 4
        kDLComplex = 5

    ctypedef struct DLDataType:
        uint8_t code
        uint8_t bits
        uint16_t lanes

    ctypedef struct DLTensor:
        void *data
        DLDevice device
        int ndim
        DLDataType dtype
        int64_t *shape
        int64_t *strides
        uint64_t byte_offset

    ctypedef struct DLManagedTensor:
        DLTensor dl_tensor
        void *manager_ctx
        void (*deleter)(DLManagedTensor *)

#------------------------------------------------------------------------------

cdef extern from "Python.h":
    void* PyCapsule_GetPointer(object, const char[]) except? NULL
    int PyCapsule_SetName(object, const char[]) except -1
    int PyCapsule_IsValid(object, const char[])

#------------------------------------------------------------------------------

cdef inline int dlpack_is_contig(const DLTensor *dltensor, char order) nogil:
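    # DLPack strides are expressed in elements, not bytes, and strides == NULL
    # denotes a compact row-major layout. Example: shape (3, 4) with strides
    # (4, 1) is C-contiguous; scanning from the last dimension, the running
    # element count (1, then 4) matches strides[1] and strides[0] in turn.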
    cdef int i, ndim = dltensor.ndim
    cdef int64_t *shape = dltensor.shape
    cdef int64_t *strides = dltensor.strides
    cdef int64_t start, step, index, size = 1
    if strides == NULL:
        if ndim > 1 and order == c'F':
            return 0
        return 1
    if order == c'F':
        start = 0
        step = 1
    else:
        start = ndim - 1
        step = -1
    for i from 0 <= i < ndim:
        index = start + step * i
        if size != strides[index]:
            return 0
        size *= shape[index]
    return 1

cdef inline int dlpack_check_shape(const DLTensor *dltensor) except -1:
    cdef int i, ndim = dltensor.ndim
    if ndim < 0:
        raise BufferError("dlpack: number of dimensions is negative")
    if ndim > 0 and dltensor.shape == NULL:
        raise BufferError("dlpack: shape is NULL")
    for i from 0 <= i < ndim:
        if dltensor.shape[i] < 0:
            raise BufferError("dlpack: shape item is negative")
    if dltensor.strides != NULL:
        for i from 0 <= i < ndim:
            if dltensor.strides[i] < 0:
                raise BufferError("dlpack: strides item is negative")
    return 0

cdef inline int dlpack_check_contig(const DLTensor *dltensor) except -1:
    if dltensor.strides == NULL: return 0
    if dlpack_is_contig(dltensor, c'C'): return 0
    if dlpack_is_contig(dltensor, c'F'): return 0
    raise BufferError("dlpack: buffer is not contiguous")

cdef inline void *dlpack_get_data(const DLTensor *dltensor) nogil:
    return <char*> dltensor.data + dltensor.byte_offset

cdef inline Py_ssize_t dlpack_get_size(const DLTensor *dltensor) nogil:
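    # Buffer extent in bytes: prod(shape) * ceil(bits * lanes / 8).
    # Example: a (3, 4) tensor of 64-bit floats occupies 12 * 8 = 96 bytes.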
    cdef int i, ndim = dltensor.ndim
    cdef int64_t *shape = dltensor.shape
    cdef Py_ssize_t bits = dltensor.dtype.bits
    cdef Py_ssize_t lanes = dltensor.dtype.lanes
    cdef Py_ssize_t size = 1
    for i from 0 <= i < ndim:
        size *= <Py_ssize_t> shape[i]
    size *= (bits * lanes + 7) // 8
    return size

cdef inline char *dlpack_get_format(const DLTensor *dltensor) nogil:
    cdef unsigned int code = dltensor.dtype.code
    cdef unsigned int bits = dltensor.dtype.bits
    if dltensor.dtype.lanes != 1: return BYTE_FMT
    if code == kDLInt:
        if bits == 1*8: return b"i1"
        if bits == 2*8: return b"i2"
        if bits == 4*8: return b"i4"
        if bits == 8*8: return b"i8"
    if code == kDLUInt:
        if bits == 1*8: return b"u1"
        if bits == 2*8: return b"u2"
        if bits == 4*8: return b"u4"
        if bits == 8*8: return b"u8"
    if code == kDLFloat:
        if bits == 2*8: return b"f2"
        if bits == 4*8: return b"f4"
        if bits == 8*8: return b"f8"
        if bits == 12*8: return b"f12"
        if bits == 16*8: return b"f16"
    if code == kDLComplex:
        if bits == 4*8: return b"c4"
        if bits == 8*8: return b"c8"
        if bits == 16*8: return b"c16"
        if bits == 24*8: return b"c24"
        if bits == 32*8: return b"c32"
    return BYTE_FMT

cdef inline Py_ssize_t dlpack_get_itemsize(const DLTensor *dltensor) nogil:
    if dltensor.dtype.lanes != 1: return 1
    return (dltensor.dtype.bits + 7) // 8

#------------------------------------------------------------------------------

cdef int Py_CheckDLPackBuffer(object obj):
    try: return <bint>hasattr(obj, '__dlpack__')
    except: return 0

cdef int Py_GetDLPackBuffer(object obj, Py_buffer *view, int flags) except -1:
    cdef object dlpack
    cdef object dlpack_device
    cdef int device_type
    cdef int device_id
    cdef object capsule
    cdef DLManagedTensor *managed
    cdef const DLTensor *dltensor
    cdef void *buf
    cdef Py_ssize_t size
    cdef bint readonly
    cdef bint fixnull

    try:
        dlpack = obj.__dlpack__
        dlpack_device = obj.__dlpack_device__
    except AttributeError:
        raise NotImplementedError("dlpack: missing support")

    device_type, device_id = dlpack_device()
    if device_type == kDLCPU:
        capsule = dlpack()
    else:
        capsule = dlpack(stream=-1)
    if not PyCapsule_IsValid(capsule, b"dltensor"):
        raise BufferError("dlpack: invalid capsule object")

    managed = <DLManagedTensor*> PyCapsule_GetPointer(capsule, b"dltensor")
    dltensor = &managed.dl_tensor

    try:
        dlpack_check_shape(dltensor)
        dlpack_check_contig(dltensor)

        buf = dlpack_get_data(dltensor)
        size = dlpack_get_size(dltensor)
        readonly = 0

        fixnull = (buf == NULL and size == 0)
        if fixnull: buf = &fixnull
        PyBuffer_FillInfo(view, obj, buf, size, readonly, flags)
        if fixnull: view.buf = NULL

        if (flags & PyBUF_FORMAT) == PyBUF_FORMAT:
            view.format = dlpack_get_format(dltensor)
            if view.format != BYTE_FMT:
                view.itemsize = dlpack_get_itemsize(dltensor)
    finally:
        if managed.deleter != NULL:
            managed.deleter(managed)
        PyCapsule_SetName(capsule, b"used_dltensor")
        del capsule
    return 0

#------------------------------------------------------------------------------
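Py_GetDLPackBuffer above is the consumer side of the DLPack protocol: query __dlpack_device__, request a capsule from __dlpack__ (passing stream=-1 for non-CPU devices), validate and read the DLManagedTensor, then rename the capsule to "used_dltensor" and invoke the deleter. A minimal pure-Python view of the same handshake, assuming a NumPy recent enough to provide from_dlpack and ndarray.__dlpack__ (the Exporter class is only an illustration):

    import numpy as np

    class Exporter:
        # Toy producer that exposes data only through the DLPack protocol.
        def __init__(self, arr):
            self.arr = np.ascontiguousarray(arr)
        def __dlpack_device__(self):
            return (1, 0)                    # (kDLCPU, device_id)
        def __dlpack__(self, stream=None):
            return self.arr.__dlpack__()     # PyCapsule named "dltensor"

    view = np.from_dlpack(Exporter(np.arange(6.0)))  # consumer side of the handshake
    assert view.shape == (6,)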
7 changes: 7 additions & 0 deletions src/mpi4py/MPI/msgbuffer.pxi
@@ -26,6 +26,9 @@ cdef inline int is_buffer(object ob):
    else:
        return PyObject_CheckBuffer(ob) or _Py2_IsBuffer(ob)

cdef inline int is_dlpack_buffer(object ob):
    return Py_CheckDLPackBuffer(ob)

cdef inline int is_gpu_buffer(object ob):
    return Py_CheckGPUBuffer(ob)

@@ -185,6 +188,8 @@ cdef _p_message message_simple(object msg,
            (o_buf, o_count, o_displ, o_type) = msg
        else:
            raise ValueError("message: expecting 2 to 4 items")
    elif is_dlpack_buffer(msg):
        o_buf = msg
    elif is_gpu_buffer(msg):
        o_buf = msg
    elif PYPY:
@@ -299,6 +304,8 @@ cdef _p_message message_vector(object msg,
            (o_buf, o_counts, o_displs, o_type) = msg
        else:
            raise ValueError("message: expecting 2 to 4 items")
    elif is_dlpack_buffer(msg):
        o_buf = msg
    elif is_gpu_buffer(msg):
        o_buf = msg
    elif PYPY:
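With is_dlpack_buffer wired into message_simple and message_vector, an object that exposes only the DLPack protocol can be passed wherever mpi4py expects a buffer. A hedged usage sketch (DLPackOnly is illustrative and not part of this PR; it assumes a NumPy with DLPack support):

    from mpi4py import MPI
    import numpy as np

    class DLPackOnly:
        # Wraps an array but exposes it only through DLPack (no buffer protocol).
        def __init__(self, arr):
            self.arr = arr
        def __dlpack__(self, stream=None):
            return self.arr.__dlpack__()
        def __dlpack_device__(self):
            return self.arr.__dlpack_device__()

    comm = MPI.COMM_WORLD
    sendbuf = DLPackOnly(np.arange(4.0))
    recvbuf = DLPackOnly(np.empty(4))
    comm.Allreduce([sendbuf, MPI.DOUBLE], [recvbuf, MPI.DOUBLE])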
80 changes: 80 additions & 0 deletions test/arrayimpl.py
@@ -240,6 +240,47 @@ def size(self):
        return self.array.size


try:
    import dlpackimpl as dlpack
except ImportError:
    dlpack = None

class BaseDLPackCPU(object):

    def __dlpack_device__(self):
        return (dlpack.DLDeviceType.kDLCPU, 0)

    def __dlpack__(self, stream=None):
        assert stream is None
        capsule = dlpack.make_py_capsule(self.array)
        return capsule

    def as_raw(self):
        return self


if dlpack is not None and array is not None:

    @add_backend
    class DLPackArray(BaseDLPackCPU, ArrayArray):

        backend = 'dlpack-array'

        def __init__(self, arg, typecode, shape=None):
            super(DLPackArray, self).__init__(arg, typecode, shape)


if dlpack is not None and numpy is not None:

    @add_backend
    class DLPackNumPy(BaseDLPackCPU, ArrayNumPy):

        backend = 'dlpack-numpy'

        def __init__(self, arg, typecode, shape=None):
            super(DLPackNumPy, self).__init__(arg, typecode, shape)


def typestr(typecode, itemsize):
    typestr = ''
    if sys.byteorder == 'little':
@@ -348,6 +389,45 @@ def size(self):
        return self.array.size


if dlpack is not None and cupy is not None:

    # Note: we do not create a BaseDLPackGPU class because each GPU library
    # has its own way to get device ID etc, so we have to reimplement the
    # DLPack support anyway

    @add_backend
    class DLPackCuPy(GPUArrayCuPy):

        backend = 'dlpack-cupy'
        has_dlpack = None
        dev_type = None

        def __init__(self, arg, typecode, shape=None):
            super().__init__(arg, typecode, shape)
            self.has_dlpack = hasattr(self.array, '__dlpack_device__')
            # TODO(leofang): test CUDA managed memory?
            if cupy.cuda.runtime.is_hip:
                self.dev_type = dlpack.DLDeviceType.kDLROCM
            else:
                self.dev_type = dlpack.DLDeviceType.kDLCUDA

        def __dlpack_device__(self):
            if self.has_dlpack:
                return self.array.__dlpack_device__()
            else:
                return (self.dev_type, self.array.device.id)

        def __dlpack__(self, stream=None):
            cupy.cuda.get_current_stream().synchronize()
            if self.has_dlpack:
                return self.array.__dlpack__(stream=-1)
Review comment (Member):

Why stream=-1 and not stream=stream? Is it because of the previous synchronize() call?

Reply (Member, Author):

-1 comes from the Consumer, which is mpi4py. The previous synchronize() is just mimicking users doing the right thing (recall the change we made with as_raw() in #60).

            else:
                return self.array.toDlpack()

        def as_raw(self):
            return self
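As the review exchange above notes, the stream argument to __dlpack__ belongs to the consumer: mpi4py passes -1 to say it needs no synchronization from the producer, and this test backend synchronizes the current CuPy stream itself before exporting. A producer-side sketch of the general convention (hypothetical code, not part of this PR, assuming CuPy's stream and DLPack APIs):

    import cupy

    class Producer:
        def __init__(self, array):
            self.array = array
        def __dlpack_device__(self):
            return (2, self.array.device.id)   # (kDLCUDA, device_id)
        def __dlpack__(self, stream=None):
            # stream == -1: consumer requests no synchronization (mpi4py's case);
            # otherwise make the consumer's stream wait on our pending work.
            if stream is not None and stream != -1:
                event = cupy.cuda.get_current_stream().record()
                cupy.cuda.ExternalStream(stream).wait_event(event)
            return self.array.toDlpack()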


if numba is not None:

    @add_backend