import collections
import ctypes
import functools
import inspect
import os
import sys
import numpy as np
from numba import _dispatcher
from numba.core.typing.templates import AbstractTemplate, ConcreteTemplate
from numba.core import (types, typing, utils, funcdesc, serialize, config,
                        compiler, sigutils)
from numba.core.typeconv.rules import default_type_manager
from numba.core.compiler import (CompilerBase, DefaultPassBuilder,
                                 compile_result, Flags, Option)
from numba.core.compiler_lock import global_compiler_lock
from numba.core.compiler_machinery import (LoweringPass, AnalysisPass,
                                           PassManager, register_pass)
from numba.core.dispatcher import CompilingCounter, OmittedArg
from numba.core.errors import (NumbaPerformanceWarning,
                               NumbaInvalidConfigWarning, TypingError)
from numba.core.typed_passes import (IRLegalization, NativeLowering,
                                     AnnotateTypes)
from numba.core.typing.typeof import Purpose, typeof
from warnings import warn
import numba
from .cudadrv.devices import get_context
from .cudadrv.libs import get_cudalib
from .cudadrv import driver
from .errors import missing_launch_config_msg, normalize_kernel_dimensions
from .api import get_current_device
from .args import wrap_arg
from .descriptor import cuda_target
from . import types as cuda_types


def _nvvm_options_type(x):
    if x is None:
        return None
    else:
        assert isinstance(x, dict)
        return x


class CUDAFlags(Flags):
    nvvm_options = Option(
        type=_nvvm_options_type,
        default=None,
        doc="NVVM options",
    )


@register_pass(mutates_CFG=True, analysis_only=False)
class CUDABackend(LoweringPass):

    _name = "cuda_backend"

    def __init__(self):
        LoweringPass.__init__(self)

    def run_pass(self, state):
        """
        Back-end: Packages lowering output in a compile result
        """
        lowered = state['cr']
        signature = typing.signature(state.return_type, *state.args)

        state.cr = compile_result(
            typing_context=state.typingctx,
            target_context=state.targetctx,
            typing_error=state.status.fail_reason,
            type_annotation=state.type_annotation,
            library=state.library,
            call_helper=lowered.call_helper,
            signature=signature,
            fndesc=lowered.fndesc,
        )
        return True


@register_pass(mutates_CFG=False, analysis_only=False)
class CreateLibrary(LoweringPass):
    """
    Create a CUDACodeLibrary for the NativeLowering pass to populate. The
    NativeLowering pass will create a code library if none exists, but we need
    to set it up with nvvm_options from the flags if they are present.
    """

    _name = "create_library"

    def __init__(self):
        LoweringPass.__init__(self)

    def run_pass(self, state):
        codegen = state.targetctx.codegen()
        name = state.func_id.func_qualname
        nvvm_options = state.flags.nvvm_options
        state.library = codegen.create_library(name, nvvm_options=nvvm_options)
        # Enable object caching upfront so that the library can be serialized.
        state.library.enable_object_caching()
        return True


@register_pass(mutates_CFG=False, analysis_only=True)
class CUDALegalization(AnalysisPass):

    _name = "cuda_legalization"

    def __init__(self):
        AnalysisPass.__init__(self)

    def run_pass(self, state):
        # Early return if NVVM 7
        from numba.cuda.cudadrv.nvvm import NVVM
        if NVVM().is_nvvm70:
            return False
        # NVVM < 7, need to check for charseq
        typmap = state.typemap

        def check_dtype(dtype):
            if isinstance(dtype, (types.UnicodeCharSeq, types.CharSeq)):
                msg = (f"{k} is a char sequence type. This type is not "
                       "supported with CUDA toolkit versions < 11.2. To "
                       "use this type, you need to update your CUDA "
                       "toolkit - try 'conda install cudatoolkit=11' if "
                       "you are using conda to manage your environment.")
                raise TypingError(msg)
            elif isinstance(dtype, types.Record):
                for subdtype in dtype.fields.items():
                    # subdtype is a (name, _RecordField) pair
                    check_dtype(subdtype[1].type)

        for k, v in typmap.items():
            if isinstance(v, types.Array):
                check_dtype(v.dtype)
        return False


class CUDACompiler(CompilerBase):
    def define_pipelines(self):
        dpb = DefaultPassBuilder
        pm = PassManager('cuda')

        untyped_passes = dpb.define_untyped_pipeline(self.state)
        pm.passes.extend(untyped_passes.passes)

        typed_passes = dpb.define_typed_pipeline(self.state)
        pm.passes.extend(typed_passes.passes)
        pm.add_pass(CUDALegalization, "CUDA legalization")

        lowering_passes = self.define_cuda_lowering_pipeline(self.state)
        pm.passes.extend(lowering_passes.passes)

        pm.finalize()
        return [pm]

    def define_cuda_lowering_pipeline(self, state):
        pm = PassManager('cuda_lowering')
        # legalise
        pm.add_pass(IRLegalization, "ensure IR is legal prior to lowering")
        pm.add_pass(AnnotateTypes, "annotate types")

        # lower
        pm.add_pass(CreateLibrary, "create library")
        pm.add_pass(NativeLowering, "native lowering")
        pm.add_pass(CUDABackend, "cuda backend")

        pm.finalize()
        return pm


@global_compiler_lock
def compile_cuda(pyfunc, return_type, args, debug=False, lineinfo=False,
                 inline=False, fastmath=False, nvvm_options=None):
    from .descriptor import cuda_target
    typingctx = cuda_target.typing_context
    targetctx = cuda_target.target_context

    flags = CUDAFlags()
    # Do not compile (generate native code), just lower (to LLVM)
    flags.no_compile = True
    flags.no_cpython_wrapper = True
    flags.no_cfunc_wrapper = True
    if debug or lineinfo:
        # Note both debug and lineinfo turn on debug information in the
        # compiled code, but we keep them separate arguments in case we
        # later want to overload some other behavior on the debug flag.
        # In particular, -opt=3 is not supported with -g.
        flags.debuginfo = True
    if inline:
        flags.forceinline = True
    if fastmath:
        flags.fastmath = True
    if nvvm_options:
        flags.nvvm_options = nvvm_options

    # Run compilation pipeline
    cres = compiler.compile_extra(typingctx=typingctx,
                                  targetctx=targetctx,
                                  func=pyfunc,
                                  args=args,
                                  return_type=return_type,
                                  flags=flags,
                                  locals={},
                                  pipeline_class=CUDACompiler)

    library = cres.library
    library.finalize()

    return cres


def compile_kernel(pyfunc, args, link, debug=False, lineinfo=False,
                   inline=False, fastmath=False, extensions=None,
                   max_registers=None, opt=True):
    return _Kernel(pyfunc, args, link, debug=debug, lineinfo=lineinfo,
                   inline=inline, fastmath=fastmath, extensions=extensions,
                   max_registers=max_registers, opt=opt)


@global_compiler_lock
def compile_ptx(pyfunc, args, debug=False, lineinfo=False, device=False,
                fastmath=False, cc=None, opt=True):
    """Compile a Python function to PTX for a given set of argument types.

    :param pyfunc: The Python function to compile.
    :param args: A tuple of argument types to compile for.
    :param debug: Whether to include debug info in the generated PTX.
    :type debug: bool
    :param lineinfo: Whether to include a line mapping from the generated PTX
                     to the source code. Usually this is used with optimized
                     code (since debug mode would automatically include this),
                     so we want debug info in the LLVM IR but only the line
                     mapping in the final PTX.
    :type lineinfo: bool
    :param device: Whether to compile a device function. Defaults to ``False``,
                   to compile global kernel functions.
    :type device: bool
    :param fastmath: Whether to enable fast math flags (ftz=1, prec_sqrt=0,
                     prec_div=0, and fma=1)
    :type fastmath: bool
    :param cc: Compute capability to compile for, as a tuple
               ``(MAJOR, MINOR)``. Defaults to ``(5, 2)``.
    :type cc: tuple
    :param opt: Enable optimizations. Defaults to ``True``.
    :type opt: bool
    :return: (ptx, resty): The PTX code and inferred return type
    :rtype: tuple
    """
    if debug and opt:
        msg = ("debug=True with opt=True (the default) "
               "is not supported by CUDA. This may result in a crash"
               " - set debug=False or opt=False.")
        warn(NumbaInvalidConfigWarning(msg))

    nvvm_options = {
        'debug': debug,
        'lineinfo': lineinfo,
        'fastmath': fastmath,
        'opt': 3 if opt else 0
    }

    cres = compile_cuda(pyfunc, None, args, debug=debug, lineinfo=lineinfo,
                        nvvm_options=nvvm_options)
    resty = cres.signature.return_type
    if device:
        lib = cres.library
    else:
        tgt = cres.target_context
        filename = cres.type_annotation.filename
        linenum = int(cres.type_annotation.linenum)
        lib, kernel = tgt.prepare_cuda_kernel(cres.library, cres.fndesc, debug,
                                              nvvm_options, filename, linenum)

    cc = cc or config.CUDA_DEFAULT_PTX_CC
    ptx = lib.get_asm_str(cc=cc)
    return ptx, resty
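

# Example usage (an illustrative sketch, not part of this module): compile a
# trivial kernel to PTX for a 1D contiguous int32 array. The kernel body and
# names are hypothetical; any function typable for the given argument types
# works.
#
#     from numba import cuda
#     from numba.core import types
#
#     def add_one(x):
#         i = cuda.grid(1)
#         if i < x.size:
#             x[i] += 1
#
#     ptx, resty = compile_ptx(add_one, (types.int32[::1],))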


def compile_ptx_for_current_device(pyfunc, args, debug=False, lineinfo=False,
                                   device=False, fastmath=False, opt=True):
    """Compile a Python function to PTX for a given set of argument types for
    the current device's compute capability. This calls :func:`compile_ptx`
    with an appropriate ``cc`` value for the current device."""
    cc = get_current_device().compute_capability
    return compile_ptx(pyfunc, args, debug=debug, lineinfo=lineinfo,
                       device=device, fastmath=fastmath, cc=cc, opt=opt)


def declare_device_function(name, restype, argtypes):
    from .descriptor import cuda_target
    typingctx = cuda_target.typing_context
    targetctx = cuda_target.target_context
    sig = typing.signature(restype, *argtypes)
    extfn = ExternFunction(name, sig)

    class device_function_template(ConcreteTemplate):
        key = extfn
        cases = [sig]

    fndesc = funcdesc.ExternalFunctionDescriptor(
        name=name, restype=restype, argtypes=argtypes)
    typingctx.insert_user_function(extfn, device_function_template)
    targetctx.insert_user_function(extfn, fndesc)
    return extfn
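

# Example usage (an illustrative sketch, not part of this module): declare an
# externally-defined device function so that kernels can call it. The name and
# types are hypothetical; the PTX/cubin that defines 'ext_mul' must be passed
# via the kernel's link list separately.
#
#     from numba.core import types
#
#     ext_mul = declare_device_function('ext_mul', types.float32,
#                                       (types.float32, types.float32))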


class ExternFunction(object):
    def __init__(self, name, sig):
        self.name = name
        self.sig = sig


class ForAll(object):
    def __init__(self, kernel, ntasks, tpb, stream, sharedmem):
        if ntasks < 0:
            raise ValueError("Can't create ForAll with negative task count: %s"
                             % ntasks)
        self.kernel = kernel
        self.ntasks = ntasks
        self.thread_per_block = tpb
        self.stream = stream
        self.sharedmem = sharedmem

    def __call__(self, *args):
        if self.ntasks == 0:
            return

        if self.kernel.specialized:
            kernel = self.kernel
        else:
            kernel = self.kernel.specialize(*args)
        blockdim = self._compute_thread_per_block(kernel)
        griddim = (self.ntasks + blockdim - 1) // blockdim

        return kernel[griddim, blockdim, self.stream, self.sharedmem](*args)

    def _compute_thread_per_block(self, kernel):
        tpb = self.thread_per_block
        # Prefer user-specified config
        if tpb != 0:
            return tpb
        # Else, ask the driver to give a good config
        else:
            ctx = get_context()
            # Kernel is specialized, so there's only one definition - get it
            # so we can get the cufunc from the code library
            defn = next(iter(kernel.overloads.values()))
            kwargs = dict(
                func=defn._codelibrary.get_cufunc(),
                # Dynamic shared memory is constant with respect to block size
                b2d_func=0,
                memsize=self.sharedmem,
                blocksizelimit=1024,
            )
            _, tpb = ctx.get_max_potential_block_size(**kwargs)
            return tpb


class _Kernel(serialize.ReduceMixin):
    '''
    CUDA Kernel specialized for a given set of argument types. When called,
    this object launches the kernel on the device.
    '''

    @global_compiler_lock
    def __init__(self, py_func, argtypes, link=None, debug=False,
                 lineinfo=False, inline=False, fastmath=False, extensions=None,
                 max_registers=None, opt=True, device=False):

        if device:
            raise RuntimeError('Cannot compile a device function as a kernel')

        super().__init__()

        self.py_func = py_func
        self.argtypes = argtypes
        self.debug = debug
        self.lineinfo = lineinfo
        self.extensions = extensions or []

        nvvm_options = {
            'debug': self.debug,
            'lineinfo': self.lineinfo,
            'fastmath': fastmath,
            'opt': 3 if opt else 0
        }

        cres = compile_cuda(self.py_func, types.void, self.argtypes,
                            debug=self.debug,
                            lineinfo=self.lineinfo,
                            inline=inline,
                            fastmath=fastmath,
                            nvvm_options=nvvm_options)
        tgt_ctx = cres.target_context
        code = self.py_func.__code__
        filename = code.co_filename
        linenum = code.co_firstlineno
        lib, kernel = tgt_ctx.prepare_cuda_kernel(cres.library, cres.fndesc,
                                                  debug, nvvm_options,
                                                  filename, linenum,
                                                  max_registers)

        if not link:
            link = []

        # A kernel needs cooperative launch if grid_sync is being used.
        self.cooperative = 'cudaCGGetIntrinsicHandle' in lib.get_asm_str()
        # We need to link against cudadevrt if grid sync is being used.
        if self.cooperative:
            link.append(get_cudalib('cudadevrt', static=True))
        for filepath in link:
            lib.add_linking_file(filepath)

        # populate members
        self.entry_name = kernel.name
        self.signature = cres.signature
        self._type_annotation = cres.type_annotation
        self._codelibrary = lib
        self.call_helper = cres.call_helper

    @property
    def argument_types(self):
        # Use self.argtypes rather than self.signature so that this also
        # works on instances restored by _rebuild(), which have no signature.
        return tuple(self.argtypes)

    @classmethod
    def _rebuild(cls, cooperative, name, argtypes, codelibrary, debug,
                 lineinfo, call_helper, extensions):
        """
        Rebuild an instance.
        """
        instance = cls.__new__(cls)
        # invoke parent constructor
        super(cls, instance).__init__()
        # populate members
        instance.cooperative = cooperative
        instance.entry_name = name
        instance.argtypes = tuple(argtypes)
        instance._type_annotation = None
        instance._codelibrary = codelibrary
        instance.debug = debug
        instance.lineinfo = lineinfo
        instance.call_helper = call_helper
        instance.extensions = extensions
        return instance

    def _reduce_states(self):
        """
        Reduce the instance for serialization.
        Compiled definitions are serialized in PTX form.
        Type annotations are discarded.
        Thread, block and shared memory configuration are serialized.
        Stream information is discarded.
        """
        return dict(cooperative=self.cooperative, name=self.entry_name,
                    argtypes=self.argtypes, codelibrary=self._codelibrary,
                    debug=self.debug, lineinfo=self.lineinfo,
                    call_helper=self.call_helper, extensions=self.extensions)

    def bind(self):
        """
        Force binding to current CUDA context
        """
        self._codelibrary.get_cufunc()

    @property
    def ptx(self):
        '''
        PTX code for this kernel.
        '''
        return self._codelibrary.get_asm_str()

    @property
    def device(self):
        """
        Get the device of the current active context
        """
        return get_current_device()

    @property
    def regs_per_thread(self):
        '''
        The number of registers used by each thread for this kernel.
        '''
        return self._codelibrary.get_cufunc().attrs.regs

    def inspect_llvm(self):
        '''
        Returns the LLVM IR for this kernel.
        '''
        return self._codelibrary.get_llvm_str()

    def inspect_asm(self, cc):
        '''
        Returns the PTX code for this kernel.
        '''
        return self._codelibrary.get_asm_str(cc=cc)

    def inspect_sass(self):
        '''
        Returns the SASS code for this kernel.

        Requires nvdisasm to be available on the PATH.
        '''
        return self._codelibrary.get_sass()

    def inspect_types(self, file=None):
        '''
        Produce a dump of the Python source of this function annotated with the
        corresponding Numba IR and type information. The dump is written to
        *file*, or *sys.stdout* if *file* is *None*.
        '''
        if self._type_annotation is None:
            raise ValueError("Type annotation is not available")

        if file is None:
            file = sys.stdout

        print("%s %s" % (self.entry_name, self.argument_types), file=file)
        print('-' * 80, file=file)
        print(self._type_annotation, file=file)
        print('=' * 80, file=file)

    def max_cooperative_grid_blocks(self, blockdim, dynsmemsize=0):
        '''
        Calculates the maximum number of blocks that can be launched for this
        kernel in a cooperative grid in the current context, for the given
        block and dynamic shared memory sizes.

        :param blockdim: Block dimensions, either as a scalar for a 1D block,
                         or a tuple for 2D or 3D blocks.
        :param dynsmemsize: Dynamic shared memory size in bytes.
        :return: The maximum number of blocks in the grid.
        '''
        ctx = get_context()
        cufunc = self._codelibrary.get_cufunc()

        if isinstance(blockdim, tuple):
            blockdim = functools.reduce(lambda x, y: x * y, blockdim)
        active_per_sm = ctx.get_active_blocks_per_multiprocessor(cufunc,
                                                                 blockdim,
                                                                 dynsmemsize)
        sm_count = ctx.device.MULTIPROCESSOR_COUNT
        return active_per_sm * sm_count
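
    # Example usage (an illustrative sketch, not part of this module): size a
    # cooperative launch so that the entire grid is resident. 'disp' is
    # assumed to be a specialized Dispatcher over a kernel that uses grid
    # synchronization.
    #
    #     defn = next(iter(disp.overloads.values()))  # the sole _Kernel
    #     blockdim = 128
    #     griddim = defn.max_cooperative_grid_blocks(blockdim)
    #     disp[griddim, blockdim](*args)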

    def launch(self, args, griddim, blockdim, stream=0, sharedmem=0):
        # Prepare kernel
        cufunc = self._codelibrary.get_cufunc()

        if self.debug:
            excname = cufunc.name + "__errcode__"
            excmem, excsz = cufunc.module.get_global_symbol(excname)
            assert excsz == ctypes.sizeof(ctypes.c_int)
            excval = ctypes.c_int()
            excmem.memset(0, stream=stream)

        # Prepare arguments
        retr = []  # hold functors for writeback
        kernelargs = []
        for t, v in zip(self.argument_types, args):
            self._prepare_args(t, v, stream, retr, kernelargs)

        if driver.USE_NV_BINDING:
            zero_stream = driver.binding.CUstream(0)
        else:
            zero_stream = None

        stream_handle = stream and stream.handle or zero_stream

        # Invoke kernel
        driver.launch_kernel(cufunc.handle,
                             *griddim,
                             *blockdim,
                             sharedmem,
                             stream_handle,
                             kernelargs,
                             cooperative=self.cooperative)

        if self.debug:
            driver.device_to_host(ctypes.addressof(excval), excmem, excsz)
            if excval.value != 0:
                # An error occurred
                def load_symbol(name):
                    mem, sz = cufunc.module.get_global_symbol("%s__%s__" %
                                                              (cufunc.name,
                                                               name))
                    val = ctypes.c_int()
                    driver.device_to_host(ctypes.addressof(val), mem, sz)
                    return val.value

                tid = [load_symbol("tid" + i) for i in 'zyx']
                ctaid = [load_symbol("ctaid" + i) for i in 'zyx']
                code = excval.value
                exccls, exc_args, loc = self.call_helper.get_exception(code)
                # Prefix the exception message with the source location
                if loc is None:
                    locinfo = ''
                else:
                    sym, filepath, lineno = loc
                    filepath = os.path.abspath(filepath)
                    locinfo = 'In function %r, file %s, line %s, ' % (sym,
                                                                      filepath,
                                                                      lineno,)
                # Prefix the exception message with the thread position
                prefix = "%stid=%s ctaid=%s" % (locinfo, tid, ctaid)
                if exc_args:
                    exc_args = ("%s: %s" % (prefix, exc_args[0]),) + \
                        exc_args[1:]
                else:
                    exc_args = prefix,
                raise exccls(*exc_args)

        # retrieve auto converted arrays
        for wb in retr:
            wb()

    def _prepare_args(self, ty, val, stream, retr, kernelargs):
        """
        Convert arguments to ctypes and append to kernelargs
        """

        # map the arguments using any extension you've registered
        for extension in reversed(self.extensions):
            ty, val = extension.prepare_args(
                ty,
                val,
                stream=stream,
                retr=retr)

        if isinstance(ty, types.Array):
            devary = wrap_arg(val).to_device(retr, stream)

            c_intp = ctypes.c_ssize_t

            meminfo = ctypes.c_void_p(0)
            parent = ctypes.c_void_p(0)
            nitems = c_intp(devary.size)
            itemsize = c_intp(devary.dtype.itemsize)

            ptr = driver.device_pointer(devary)

            if driver.USE_NV_BINDING:
                ptr = int(ptr)

            data = ctypes.c_void_p(ptr)

            kernelargs.append(meminfo)
            kernelargs.append(parent)
            kernelargs.append(nitems)
            kernelargs.append(itemsize)
            kernelargs.append(data)
            for ax in range(devary.ndim):
                kernelargs.append(c_intp(devary.shape[ax]))
            for ax in range(devary.ndim):
                kernelargs.append(c_intp(devary.strides[ax]))

        elif isinstance(ty, types.Integer):
            cval = getattr(ctypes, "c_%s" % ty)(val)
            kernelargs.append(cval)

        elif ty == types.float16:
            cval = ctypes.c_uint16(np.float16(val).view(np.uint16))
            kernelargs.append(cval)

        elif ty == types.float64:
            cval = ctypes.c_double(val)
            kernelargs.append(cval)

        elif ty == types.float32:
            cval = ctypes.c_float(val)
            kernelargs.append(cval)

        elif ty == types.boolean:
            cval = ctypes.c_uint8(int(val))
            kernelargs.append(cval)

        elif ty == types.complex64:
            kernelargs.append(ctypes.c_float(val.real))
            kernelargs.append(ctypes.c_float(val.imag))

        elif ty == types.complex128:
            kernelargs.append(ctypes.c_double(val.real))
            kernelargs.append(ctypes.c_double(val.imag))

        elif isinstance(ty, (types.NPDatetime, types.NPTimedelta)):
            kernelargs.append(ctypes.c_int64(val.view(np.int64)))

        elif isinstance(ty, types.Record):
            devrec = wrap_arg(val).to_device(retr, stream)
            ptr = devrec.device_ctypes_pointer
            if driver.USE_NV_BINDING:
                ptr = ctypes.c_void_p(int(ptr))
            kernelargs.append(ptr)

        elif isinstance(ty, types.BaseTuple):
            assert len(ty) == len(val)
            for t, v in zip(ty, val):
                self._prepare_args(t, v, stream, retr, kernelargs)

        else:
            raise NotImplementedError(ty, val)
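
    # For reference (illustrative, not part of this module): a 1D contiguous
    # float32 array of 10 elements is expanded by the Array branch above into
    # seven kernel arguments matching Numba's array struct layout:
    #     meminfo=NULL, parent=NULL, nitems=10, itemsize=4, data=<device ptr>,
    #     shape[0]=10, strides[0]=4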


class _KernelConfiguration:
    def __init__(self, dispatcher, griddim, blockdim, stream, sharedmem):
        self.dispatcher = dispatcher
        self.griddim = griddim
        self.blockdim = blockdim
        self.stream = stream
        self.sharedmem = sharedmem

        if config.CUDA_LOW_OCCUPANCY_WARNINGS:
            ctx = get_context()
            smcount = ctx.device.MULTIPROCESSOR_COUNT
            grid_size = griddim[0] * griddim[1] * griddim[2]
            if grid_size < 2 * smcount:
                msg = ("Grid size ({grid}) < 2 * SM count ({sm}) "
                       "will likely result in GPU under-utilization due "
                       "to low occupancy.")
                msg = msg.format(grid=grid_size, sm=2 * smcount)
                warn(NumbaPerformanceWarning(msg))

    def __call__(self, *args):
        return self.dispatcher.call(args, self.griddim, self.blockdim,
                                    self.stream, self.sharedmem)


class Dispatcher(_dispatcher.Dispatcher, serialize.ReduceMixin):
    '''
    CUDA Dispatcher object. When configured and called, the dispatcher will
    specialize itself for the given arguments (if no suitable specialized
    version already exists) & compute capability, and launch on the device
    associated with the current context.

    Dispatcher objects are not to be constructed by the user, but instead are
    created using the :func:`numba.cuda.jit` decorator.
    '''

    # Whether to fold named arguments and default values. Default values are
    # presently unsupported on CUDA, so we can leave this as False in all
    # cases.
    _fold_args = False

    targetdescr = cuda_target
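
    # Example usage (an illustrative sketch, not part of this module): user
    # code obtains a Dispatcher from the numba.cuda.jit decorator and calls it
    # with a launch configuration. 'd_arr' is a hypothetical device array.
    #
    #     from numba import cuda
    #
    #     @cuda.jit
    #     def increment(x):
    #         i = cuda.grid(1)
    #         if i < x.size:
    #             x[i] += 1
    #
    #     increment[64, 256](d_arr)  # griddim=64, blockdim=256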

    def __init__(self, py_func, sigs, targetoptions):
        self.py_func = py_func
        self.sigs = []
        self.link = targetoptions.pop('link', ())
        self._can_compile = True
        self._type = self._numba_type_

        # The compiling counter is only used when compiling device functions
        # as it is used to detect recursion - recursion is not possible when
        # compiling a kernel.
        self._compiling_counter = CompilingCounter()

        # Specializations for given sets of argument types
        self.specializations = {}

        # A mapping of signatures to compile results
        self.overloads = collections.OrderedDict()

        self.targetoptions = targetoptions

        # defensive copy
        self.targetoptions['extensions'] = \
            list(self.targetoptions.get('extensions', []))

        self.typingctx = self.targetdescr.typing_context

        self._tm = default_type_manager

        pysig = utils.pysignature(py_func)
        arg_count = len(pysig.parameters)
        argnames = tuple(pysig.parameters)
        default_values = self.py_func.__defaults__ or ()
        defargs = tuple(OmittedArg(val) for val in default_values)
        can_fallback = False  # CUDA cannot fallback to object mode

        try:
            lastarg = list(pysig.parameters.values())[-1]
        except IndexError:
            has_stararg = False
        else:
            has_stararg = lastarg.kind == lastarg.VAR_POSITIONAL

        exact_match_required = False

        _dispatcher.Dispatcher.__init__(self, self._tm.get_pointer(),
                                        arg_count, self._fold_args, argnames,
                                        defargs, can_fallback, has_stararg,
                                        exact_match_required)

        if sigs:
            if len(sigs) > 1:
                raise TypeError("Only one signature supported at present")

            if targetoptions.get('device'):
                argtypes, restype = sigutils.normalize_signature(sigs[0])
                self.compile_device(argtypes)
            else:
                self.compile(sigs[0])

            self._can_compile = False

        if targetoptions.get('device'):
            self._register_device_function()

    def _register_device_function(self):
        dispatcher = self
        pyfunc = self.py_func

        class device_function_template(AbstractTemplate):
            key = dispatcher

            def generic(self, args, kws):
                assert not kws
                return dispatcher.compile(args).signature

            def get_template_info(cls):
                basepath = os.path.dirname(os.path.dirname(numba.__file__))
                code, firstlineno = inspect.getsourcelines(pyfunc)
                path = inspect.getsourcefile(pyfunc)
                sig = str(utils.pysignature(pyfunc))
                info = {
                    'kind': "overload",
                    'name': getattr(cls.key, '__name__', "unknown"),
                    'sig': sig,
                    'filename': utils.safe_relpath(path, start=basepath),
                    'lines': (firstlineno, firstlineno + len(code) - 1),
                    'docstring': pyfunc.__doc__
                }
                return info

        from .descriptor import cuda_target
        typingctx = cuda_target.typing_context
        typingctx.insert_user_function(dispatcher, device_function_template)

    @property
    def _numba_type_(self):
        return cuda_types.CUDADispatcher(self)

    @property
    def is_compiling(self):
        """
        Whether a specialization is currently being compiled.
        """
        return self._compiling_counter

    def configure(self, griddim, blockdim, stream=0, sharedmem=0):
        griddim, blockdim = normalize_kernel_dimensions(griddim, blockdim)
        return _KernelConfiguration(self, griddim, blockdim, stream, sharedmem)

    def __getitem__(self, args):
        if len(args) not in [2, 3, 4]:
            raise ValueError('must specify at least the griddim and blockdim')
        return self.configure(*args)

    def forall(self, ntasks, tpb=0, stream=0, sharedmem=0):
        """Returns a 1D-configured kernel for a given number of tasks.

        This assumes that:

        - the kernel maps the Global Thread ID ``cuda.grid(1)`` to tasks on a
          1-1 basis.
        - the kernel checks that the Global Thread ID is upper-bounded by
          ``ntasks``, and does nothing if it is not.

        :param ntasks: The number of tasks.
        :param tpb: The size of a block. An appropriate value is chosen if
                    this parameter is not supplied.
        :param stream: The stream on which the configured kernel will be
                       launched.
        :param sharedmem: The number of bytes of dynamic shared memory required
                          by the kernel.
        :return: A configured kernel, ready to launch on a set of arguments.
        """
        return ForAll(self, ntasks, tpb=tpb, stream=stream,
                      sharedmem=sharedmem)
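
    # Example usage (an illustrative sketch, not part of this module): launch
    # one thread per element without picking a block size by hand. The kernel
    # is hypothetical but follows the contract documented in forall() above.
    #
    #     from numba import cuda
    #
    #     @cuda.jit
    #     def double(x):
    #         i = cuda.grid(1)
    #         if i < x.size:
    #             x[i] *= 2
    #
    #     double.forall(len(d_arr))(d_arr)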

    @property
    def extensions(self):
        '''
        A list of objects that must have a `prepare_args` function. When a
        specialized kernel is called, each argument will be passed through
        to the `prepare_args` (from the last object in this list to the
        first). The arguments to `prepare_args` are:

        - `ty` the numba type of the argument
        - `val` the argument value itself
        - `stream` the CUDA stream used for the current call to the kernel
        - `retr` a list of zero-arg functions that you may want to append
          post-call cleanup work to.

        The `prepare_args` function must return a tuple `(ty, val)`, which
        will be passed in turn to the next right-most `extension`. After all
        the extensions have been called, the resulting `(ty, val)` will be
        passed into Numba's default argument marshalling logic.
        '''
        return self.targetoptions.get('extensions')
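
    # Example extension (an illustrative sketch, not part of this module): an
    # object implementing the prepare_args protocol described above. MyBox and
    # its unwrapping logic are hypothetical, and it is assumed the extension
    # is supplied through the 'extensions' target option.
    #
    #     from numba.core.typing.typeof import typeof, Purpose
    #
    #     class UnwrapMyBox:
    #         def prepare_args(self, ty, val, stream, retr):
    #             if isinstance(val, MyBox):
    #                 arr = val.array
    #                 return typeof(arr, Purpose.argument), arr
    #             return ty, val  # pass other arguments through unchanged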

    def __call__(self, *args, **kwargs):
        # An attempt to launch an unconfigured kernel
        raise ValueError(missing_launch_config_msg)

    def call(self, args, griddim, blockdim, stream, sharedmem):
        '''
        Compile if necessary and invoke this kernel with *args*.
        '''
        if self.specialized:
            kernel = next(iter(self.overloads.values()))
        else:
            kernel = _dispatcher.Dispatcher._cuda_call(self, *args)

        kernel.launch(args, griddim, blockdim, stream, sharedmem)

    def _compile_for_args(self, *args, **kws):
        # Based on _DispatcherBase._compile_for_args.
        assert not kws
        argtypes = [self.typeof_pyval(a) for a in args]
        return self.compile(tuple(argtypes))

    def _search_new_conversions(self, *args, **kws):
        # Based on _DispatcherBase._search_new_conversions
        assert not kws
        args = [self.typeof_pyval(a) for a in args]
        found = False
        for sig in self.nopython_signatures:
            conv = self.typingctx.install_possible_conversions(args, sig.args)
            if conv:
                found = True
        return found

    def typeof_pyval(self, val):
        # Based on _DispatcherBase.typeof_pyval, but differs from it to support
        # the CUDA Array Interface.
        try:
            return typeof(val, Purpose.argument)
        except ValueError:
            if numba.cuda.is_cuda_array(val):
                # When typing, we don't need to synchronize on the array's
                # stream - this is done when the kernel is launched.
                return typeof(numba.cuda.as_cuda_array(val, sync=False),
                              Purpose.argument)
            else:
                raise

    @property
    def nopython_signatures(self):
        # Based on _DispatcherBase.nopython_signatures
        return [kernel.signature for kernel in self.overloads.values()]

    def specialize(self, *args):
        '''
        Create a new instance of this dispatcher specialized for the given
        *args*.
        '''
        cc = get_current_device().compute_capability
        argtypes = tuple(
            [self.typingctx.resolve_argument_type(a) for a in args])
        if self.specialized:
            raise RuntimeError('Dispatcher already specialized')

        specialization = self.specializations.get((cc, argtypes))
        if specialization:
            return specialization

        targetoptions = self.targetoptions
        targetoptions['link'] = self.link
        specialization = Dispatcher(self.py_func, [types.void(*argtypes)],
                                    targetoptions)
        self.specializations[cc, argtypes] = specialization
        return specialization

    def disable_compile(self, val=True):
        self._can_compile = not val

    @property
    def specialized(self):
        """
        True if the Dispatcher has been specialized.
        """
        return len(self.sigs) == 1 and not self._can_compile

    def get_regs_per_thread(self, signature=None):
        '''
        Returns the number of registers used by each thread in this kernel for
        the device in the current context.

        :param signature: The signature of the compiled kernel to get register
                          usage for. This may be omitted for a specialized
                          kernel.
        :return: The number of registers used by the compiled variant of the
                 kernel for the given signature and current device.
        '''
        if signature is not None:
            return self.overloads[signature.args].regs_per_thread
        if self.specialized:
            return next(iter(self.overloads.values())).regs_per_thread
        else:
            return {sig: overload.regs_per_thread
                    for sig, overload in self.overloads.items()}
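
    # Example usage (an illustrative sketch, not part of this module): for an
    # eagerly-compiled (specialized) kernel the count can be read directly.
    # The signature is hypothetical.
    #
    #     @cuda.jit("void(float32[::1])")
    #     def kernel(x):
    #         x[0] = 0.0
    #
    #     nregs = kernel.get_regs_per_thread()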

    def get_call_template(self, args, kws):
        # Copied and simplified from _DispatcherBase.get_call_template.
        """
        Get a typing.ConcreteTemplate for this dispatcher and the given
        *args* and *kws* types. This allows resolution of the return type.

        A (template, pysig, args, kws) tuple is returned.
        """
        with self._compiling_counter:
            # Ensure an exactly-matching overload is available if we can
            # compile. We proceed with the typing even if we can't compile
            # because we may be able to force a cast on the caller side.
            if self._can_compile:
                self.compile_device(tuple(args))

            # Create function type for typing
            func_name = self.py_func.__name__
            name = "CallTemplate({0})".format(func_name)

            call_template = typing.make_concrete_template(
                name, key=func_name, signatures=self.nopython_signatures)
            pysig = utils.pysignature(self.py_func)

            return call_template, pysig, args, kws

    def get_overload(self, sig):
        # We give the id of the overload (a CompileResult) because this is used
        # as a key into a dict of overloads, and this is the only small and
        # unique property of a CompileResult on CUDA (c.f. the CPU target,
        # which uses its entry_point, which is a pointer value).
        args, return_type = sigutils.normalize_signature(sig)
        return id(self.overloads[args])

    def compile_device(self, args):
        """Compile the device function for the given argument types.

        Each signature is compiled once by caching the compiled function inside
        this object.

        Returns the `CompileResult`.
        """
        if args not in self.overloads:
            debug = self.targetoptions.get('debug')
            inline = self.targetoptions.get('inline')

            nvvm_options = {
                'debug': debug,
                'opt': 3 if self.targetoptions.get('opt') else 0
            }

            cres = compile_cuda(self.py_func, None, args, debug=debug,
                                inline=inline, nvvm_options=nvvm_options)
            self.overloads[args] = cres

            # The inserted function uses the id of the CompileResult as a key,
            # consistent with get_overload() above.
            cres.target_context.insert_user_function(id(cres), cres.fndesc,
                                                     [cres.library])
        else:
            cres = self.overloads[args]

        return cres

    def compile(self, sig):
        '''
        Compile and bind to the current context a version of this kernel
        specialized for the given signature.
        '''
        argtypes, return_type = sigutils.normalize_signature(sig)
        assert return_type is None or return_type == types.none

        if self.specialized:
            return next(iter(self.overloads.values()))
        else:
            kernel = self.overloads.get(argtypes)
        if kernel is None:
            if not self._can_compile:
                raise RuntimeError("Compilation disabled")

            kernel = _Kernel(self.py_func, argtypes, link=self.link,
                             **self.targetoptions)
            # Inspired by _DispatcherBase.add_overload, but differs slightly
            # because we're inserting a _Kernel object instead of a compiled
            # function.
            c_sig = [a._code for a in argtypes]
            self._insert(c_sig, kernel, cuda=True)
            self.overloads[argtypes] = kernel

            kernel.bind()
            self.sigs.append(sig)
        return kernel

    def inspect_llvm(self, signature=None):
        '''
        Return the LLVM IR for this kernel.

        :param signature: A tuple of argument types.
        :return: The LLVM IR for the given signature, or a dict of LLVM IR
                 for all previously-encountered signatures.
        '''
        device = self.targetoptions.get('device')
        if signature is not None:
            if device:
                return self.overloads[signature].library.get_llvm_str()
            else:
                return self.overloads[signature].inspect_llvm()
        else:
            if device:
                return {sig: overload.library.get_llvm_str()
                        for sig, overload in self.overloads.items()}
            else:
                return {sig: overload.inspect_llvm()
                        for sig, overload in self.overloads.items()}

    def inspect_asm(self, signature=None):
        '''
        Return this kernel's PTX assembly code for the device in the
        current context.

        :param signature: A tuple of argument types.
        :return: The PTX code for the given signature, or a dict of PTX codes
                 for all previously-encountered signatures.
        '''
        cc = get_current_device().compute_capability
        device = self.targetoptions.get('device')
        if signature is not None:
            if device:
                return self.overloads[signature].library.get_asm_str(cc)
            else:
                return self.overloads[signature].inspect_asm(cc)
        else:
            if device:
                return {sig: overload.library.get_asm_str(cc)
                        for sig, overload in self.overloads.items()}
            else:
                return {sig: overload.inspect_asm(cc)
                        for sig, overload in self.overloads.items()}

    def inspect_sass(self, signature=None):
        '''
        Return this kernel's SASS assembly code for the device in the
        current context.

        :param signature: A tuple of argument types.
        :return: The SASS code for the given signature, or a dict of SASS
                 codes for all previously-encountered signatures.

        Requires nvdisasm to be available on the PATH.
        '''
        if self.targetoptions.get('device'):
            raise RuntimeError('Cannot inspect SASS of a device function')

        if signature is not None:
            return self.overloads[signature].inspect_sass()
        else:
            return {sig: defn.inspect_sass()
                    for sig, defn in self.overloads.items()}

    def inspect_types(self, file=None):
        '''
        Produce a dump of the Python source of this function annotated with the
        corresponding Numba IR and type information. The dump is written to
        *file*, or *sys.stdout* if *file* is *None*.
        '''
        if file is None:
            file = sys.stdout

        for _, defn in self.overloads.items():
            defn.inspect_types(file=file)

    @property
    def ptx(self):
        return {sig: overload.ptx for sig, overload in self.overloads.items()}

    def bind(self):
        for defn in self.overloads.values():
            defn.bind()

    @classmethod
    def _rebuild(cls, py_func, sigs, targetoptions):
        """
        Rebuild an instance.
        """
        instance = cls(py_func, sigs, targetoptions)
        return instance

    def _reduce_states(self):
        """
        Reduce the instance for serialization.
        Compiled definitions are discarded.
        """
        return dict(py_func=self.py_func, sigs=self.sigs,
                    targetoptions=self.targetoptions)