Skip to content

Commit

Permalink
Merge pull request #1294 from spcl/external-mem
Browse files Browse the repository at this point in the history
Externally-managed memory lifetime
  • Loading branch information
tbennun committed Jul 4, 2023
2 parents 484630a + 211582e commit 36cb24e
Show file tree
Hide file tree
Showing 17 changed files with 266 additions and 52 deletions.
5 changes: 0 additions & 5 deletions dace/codegen/codegen.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,11 +178,6 @@ def generate_code(sdfg, validate=True) -> List[CodeObject]:
shutil.move(f"{tmp_dir}/test2.sdfg", "test2.sdfg")
raise RuntimeError('SDFG serialization failed - files do not match')

# Run with the deserialized version
# NOTE: This means that all subsequent modifications to `sdfg`
# are not reflected outside of this function (e.g., library
# node expansion).
sdfg = sdfg2

# Before generating the code, run type inference on the SDFG connectors
infer_types.infer_connector_types(sdfg)
Expand Down
4 changes: 2 additions & 2 deletions dace/codegen/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def update_persistent_desc(desc: data.Data, sdfg: SDFG):
Replaces the symbols used in a persistent data descriptor according to NestedSDFG's symbol mapping.
The replacement happens recursively up to the top-level SDFG.
"""
if (desc.lifetime == dtypes.AllocationLifetime.Persistent and sdfg.parent
if (desc.lifetime in (dtypes.AllocationLifetime.Persistent, dtypes.AllocationLifetime.External) and sdfg.parent
and any(str(s) in sdfg.parent_nsdfg_node.symbol_mapping for s in desc.free_symbols)):
newdesc = deepcopy(desc)
csdfg = sdfg
Expand Down Expand Up @@ -155,7 +155,7 @@ def get_gpu_runtime() -> gpu_runtime.GPURuntime:
backend = get_gpu_backend()
if backend == 'cuda':
libpath = ctypes.util.find_library('cudart')
if os.name == 'nt' and not libpath: # Windows-based search
if os.name == 'nt' and not libpath: # Windows-based search
for version in (12, 11, 10, 9):
libpath = ctypes.util.find_library(f'cudart64_{version}0')
if libpath:
Expand Down
50 changes: 44 additions & 6 deletions dace/codegen/compiled_sdfg.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,21 +147,20 @@ def __exit__(self, *args, **kwargs):
self.unload()


def _array_interface_ptr(array: Any, array_type: dt.Array) -> int:
def _array_interface_ptr(array: Any, storage: dtypes.StorageType) -> int:
"""
If the given array implements ``__array_interface__`` (see
``dtypes.is_array``), returns the base host or device pointer to the
array's allocated memory.
:param array: Array object that implements NumPy's array interface.
:param array_type: Data descriptor of the array (used to get storage
location to determine whether it's a host or GPU device
pointer).
:param array_type: Storage location of the array, used to determine whether
it is a host or device pointer (e.g. GPU).
:return: A pointer to the base location of the allocated buffer.
"""
if hasattr(array, 'data_ptr'):
return array.data_ptr()
if array_type.storage == dtypes.StorageType.GPU_Global:
if storage == dtypes.StorageType.GPU_Global:
return array.__cuda_array_interface__['data'][0]
return array.__array_interface__['data'][0]

Expand Down Expand Up @@ -200,10 +199,13 @@ def __init__(self, sdfg, lib: ReloadableDLL, argnames: List[str] = None):
self.argnames = argnames

self.has_gpu_code = False
self.external_memory_types = set()
for _, _, aval in self._sdfg.arrays_recursive():
if aval.storage in dtypes.GPU_STORAGES:
self.has_gpu_code = True
break
if aval.lifetime == dtypes.AllocationLifetime.External:
self.external_memory_types.add(aval.storage)
if not self.has_gpu_code:
for node, _ in self._sdfg.all_nodes_recursive():
if getattr(node, 'schedule', False) in dtypes.GPU_SCHEDULES:
Expand Down Expand Up @@ -271,6 +273,42 @@ class State(ctypes.Structure):

return State

def get_workspace_sizes(self) -> Dict[dtypes.StorageType, int]:
    """
    Returns the total external memory size to be allocated for this SDFG.

    One size-query function is looked up in the compiled library per storage
    type found in ``self.external_memory_types`` and is called with the
    library handle followed by ``self._lastargs[1]``.

    :return: A dictionary mapping storage types to the number of bytes necessary
             to allocate for the SDFG to work properly.
    :raises ValueError: If the compiled SDFG has not been initialized yet.
    """
    if not self._initialized:
        raise ValueError('Compiled SDFG is uninitialized, please call ``initialize`` prior to '
                         'querying external memory size.')

    result: Dict[dtypes.StorageType, int] = {}
    for storage in self.external_memory_types:
        func = self._lib.get_symbol(f'__dace_get_external_memory_size_{storage.name}')
        # A byte count can exceed the range of a C ``int`` (ctypes' default
        # return type), which would truncate workspaces of 2 GiB or more.
        # NOTE(review): assumes the generated getter returns ``size_t`` - confirm
        # against the emitted C signature.
        func.restype = ctypes.c_size_t
        result[storage] = func(self._libhandle, *self._lastargs[1])

    return result

def set_workspace(self, storage: dtypes.StorageType, workspace: Any):
    """
    Hands a caller-provided buffer to the compiled SDFG as the workspace of
    one storage type.

    :param storage: The storage type to fill.
    :param workspace: An array-convertible object (through ``__[cuda_]array_interface__``,
                      see ``_array_interface_ptr``) to use for the workspace.
    :raises ValueError: If the compiled SDFG has not been initialized, or if
                        ``storage`` is not among ``self.external_memory_types``.
    """
    if not self._initialized:
        raise ValueError('Compiled SDFG is uninitialized, please call ``initialize`` prior to '
                         'setting external memory.')
    if storage not in self.external_memory_types:
        raise ValueError(f'Compiled SDFG does not specify external memory of {storage}')

    # Look up the setter first, then resolve the buffer's base address
    symbol_name = f'__dace_set_external_memory_{storage.name}'
    setter = self._lib.get_symbol(symbol_name, None)
    base_address = ctypes.c_void_p(_array_interface_ptr(workspace, storage))
    setter(self._libhandle, base_address, *self._lastargs[1])

@property
def filename(self):
    """ The ``_library_filename`` attribute of the underlying library object. """
    library = self._lib
    return library._library_filename
Expand Down Expand Up @@ -487,7 +525,7 @@ def _construct_args(self, kwargs) -> Tuple[Tuple[Any], Tuple[Any]]:
for arg, actype, atype, aname in callparams if aname in symbols)

# Replace arrays with their base host/device pointers
newargs = tuple((ctypes.c_void_p(_array_interface_ptr(arg, atype)), actype,
newargs = tuple((ctypes.c_void_p(_array_interface_ptr(arg, atype.storage)), actype,
atype) if dtypes.is_array(arg) else (arg, actype, atype)
for arg, actype, atype, _ in callparams)

Expand Down
17 changes: 13 additions & 4 deletions dace/codegen/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class DefinedMemlets:
referenced correctly in nested scopes and SDFGs.
The ones defined in the first (top) scope, refer to global variables.
"""

def __init__(self):
self._scopes = [(None, {}, True), (None, {}, True)]

Expand Down Expand Up @@ -142,6 +143,7 @@ def remove(self, name: str, ancestor: int = 0, is_global: bool = False) -> Tuple
class TargetDispatcher(object):
""" Dispatches sub-SDFG generation (according to scope),
storage<->storage copies, and storage<->tasklet copies to targets. """

def __init__(self, framecode):
# Avoid import loop
from dace.codegen.targets import framecode as fc
Expand Down Expand Up @@ -215,7 +217,8 @@ def register_state_dispatcher(self, dispatcher, predicate=None):
"""

if not hasattr(dispatcher, "generate_state"):
raise TypeError("State dispatcher \"{}\" does not " "implement \"generate_state\"".format(dispatcher))
raise TypeError("State dispatcher \"{}\" does not "
"implement \"generate_state\"".format(dispatcher))
if predicate is None:
self._generic_state_dispatcher = dispatcher
else:
Expand All @@ -241,7 +244,8 @@ def register_node_dispatcher(self, dispatcher, predicate=None):
:see: TargetCodeGenerator
"""
if not hasattr(dispatcher, "generate_node"):
raise TypeError("Node dispatcher must " "implement \"generate_node\"")
raise TypeError("Node dispatcher must "
"implement \"generate_node\"")
if predicate is None:
self._generic_node_dispatcher = dispatcher
else:
Expand Down Expand Up @@ -448,9 +452,12 @@ def dispatch_allocate(self,
""" Dispatches a code generator for data allocation. """
self._used_targets.add(self._array_dispatchers[datadesc.storage])

if datadesc.lifetime is dtypes.AllocationLifetime.Persistent:
if datadesc.lifetime == dtypes.AllocationLifetime.Persistent:
declaration_stream = CodeIOStream()
callsite_stream = self.frame._initcode
elif datadesc.lifetime == dtypes.AllocationLifetime.External:
declaration_stream = CodeIOStream()
callsite_stream = CodeIOStream()
else:
declaration_stream = callsite_stream

Expand All @@ -468,8 +475,10 @@ def dispatch_deallocate(self, sdfg: SDFG, dfg: ScopeSubgraphView, state_id: int,
""" Dispatches a code generator for a data deallocation. """
self._used_targets.add(self._array_dispatchers[datadesc.storage])

if datadesc.lifetime is dtypes.AllocationLifetime.Persistent:
if datadesc.lifetime == dtypes.AllocationLifetime.Persistent:
callsite_stream = self.frame._exitcode
elif datadesc.lifetime == dtypes.AllocationLifetime.External:
return

self._array_dispatchers[datadesc.storage].deallocate_array(sdfg, dfg, state_id, node, datadesc, function_stream,
callsite_stream)
Expand Down
14 changes: 8 additions & 6 deletions dace/codegen/targets/cpp.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ def copy_expr(
offset_cppstr = "0"
dt = ""

is_global = data_desc.lifetime in (dtypes.AllocationLifetime.Global, dtypes.AllocationLifetime.Persistent)
is_global = data_desc.lifetime in (dtypes.AllocationLifetime.Global, dtypes.AllocationLifetime.Persistent,
dtypes.AllocationLifetime.External)
defined_types = None
# Non-free symbol dependent Arrays due to their shape
dependent_shape = (isinstance(data_desc, data.Array) and not isinstance(data_desc, data.View) and any(
Expand Down Expand Up @@ -219,7 +220,7 @@ def ptr(name: str, desc: data.Data, sdfg: SDFG = None, framecode=None) -> str:

# Special case: If memory is persistent and defined in this SDFG, add state
# struct to name
if (desc.transient and desc.lifetime is dtypes.AllocationLifetime.Persistent):
if (desc.transient and desc.lifetime in (dtypes.AllocationLifetime.Persistent, dtypes.AllocationLifetime.External)):
from dace.codegen.targets.cuda import CUDACodeGen # Avoid import loop

if desc.storage == dtypes.StorageType.CPU_ThreadLocal: # Use unambiguous name for thread-local arrays
Expand Down Expand Up @@ -1252,7 +1253,7 @@ def visit_BinOp(self, node: ast.BinOp):
if isinstance(node.op, ast.Pow):
from dace.frontend.python import astutils
try:
evaluated_node = astutils.evalnode(node.right, {**self.constants, 'dace': dace,'math': math})
evaluated_node = astutils.evalnode(node.right, {**self.constants, 'dace': dace, 'math': math})
unparsed = symbolic.pystr_to_symbolic(evaluated_node)
evaluated_constant = symbolic.evaluate(unparsed, self.constants)
evaluated = symbolic.symstr(evaluated_constant, cpp_mode=True)
Expand Down Expand Up @@ -1356,8 +1357,8 @@ def synchronize_streams(sdfg, dfg, state_id, node, scope_exit, callsite_stream,
if isinstance(desc, data.Array) and desc.start_offset != 0:
ptrname = f'({ptrname} - {sym2cpp(desc.start_offset)})'
if Config.get_bool('compiler', 'cuda', 'syncdebug'):
callsite_stream.write(f'DACE_GPU_CHECK({backend}FreeAsync({ptrname}, {cudastream}));\n', sdfg,
state_id, scope_exit)
callsite_stream.write(f'DACE_GPU_CHECK({backend}FreeAsync({ptrname}, {cudastream}));\n', sdfg, state_id,
scope_exit)
callsite_stream.write(f'DACE_GPU_CHECK({backend}DeviceSynchronize());')
else:
callsite_stream.write(f'{backend}FreeAsync({ptrname}, {cudastream});\n', sdfg, state_id, scope_exit)
Expand All @@ -1381,7 +1382,8 @@ def synchronize_streams(sdfg, dfg, state_id, node, scope_exit, callsite_stream,
and edge.dst._cuda_stream != node._cuda_stream):
callsite_stream.write(
"""DACE_GPU_CHECK({backend}EventRecord(__state->gpu_context->events[{ev}], {src_stream}));
DACE_GPU_CHECK({backend}StreamWaitEvent(__state->gpu_context->streams[{dst_stream}], __state->gpu_context->events[{ev}], 0));""".format(
DACE_GPU_CHECK({backend}StreamWaitEvent(__state->gpu_context->streams[{dst_stream}], __state->gpu_context->events[{ev}], 0));"""
.format(
ev=edge._cuda_event if hasattr(edge, "_cuda_event") else 0,
src_stream=cudastream,
dst_stream=edge.dst._cuda_stream,
Expand Down
19 changes: 13 additions & 6 deletions dace/codegen/targets/cpu.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ def declare_array(self, sdfg, dfg, state_id, node, nodedesc, function_stream, de
# We add the `dfg is not None` check because the `sdutils.is_nonfree_sym_dependent` check will fail if
# `nodedesc` is a View and `dfg` is None.
if dfg and not sdutils.is_nonfree_sym_dependent(node, nodedesc, dfg, fsymbols):
raise NotImplementedError("The declare_array method should only be used for variables "
raise NotImplementedError("The declare_array method should only be used for variables "
"that must have their declaration and allocation separate.")

name = node.data
Expand Down Expand Up @@ -278,7 +278,7 @@ def allocate_array(self, sdfg, dfg, state_id, node, nodedesc, function_stream, d
declared = self._dispatcher.declared_arrays.has(alloc_name)

define_var = self._dispatcher.defined_vars.add
if nodedesc.lifetime == dtypes.AllocationLifetime.Persistent:
if nodedesc.lifetime in (dtypes.AllocationLifetime.Persistent, dtypes.AllocationLifetime.External):
define_var = self._dispatcher.defined_vars.add_global
nodedesc = update_persistent_desc(nodedesc, sdfg)

Expand Down Expand Up @@ -449,7 +449,8 @@ def deallocate_array(self, sdfg, dfg, state_id, node, nodedesc, function_stream,
alloc_name = f'({alloc_name} - {cpp.sym2cpp(nodedesc.start_offset)})'

if self._dispatcher.declared_arrays.has(alloc_name):
is_global = nodedesc.lifetime in (dtypes.AllocationLifetime.Global, dtypes.AllocationLifetime.Persistent)
is_global = nodedesc.lifetime in (dtypes.AllocationLifetime.Global, dtypes.AllocationLifetime.Persistent,
dtypes.AllocationLifetime.External)
self._dispatcher.declared_arrays.remove(alloc_name, is_global=is_global)

if isinstance(nodedesc, (data.Scalar, data.View, data.Stream, data.Reference)):
Expand Down Expand Up @@ -932,7 +933,8 @@ def process_out_memlets(self,
desc = sdfg.arrays[memlet.data]
ptrname = cpp.ptr(memlet.data, desc, sdfg, self._frame)
is_global = desc.lifetime in (dtypes.AllocationLifetime.Global,
dtypes.AllocationLifetime.Persistent)
dtypes.AllocationLifetime.Persistent,
dtypes.AllocationLifetime.External)
try:
defined_type, _ = self._dispatcher.declared_arrays.get(ptrname, is_global=is_global)
except KeyError:
Expand Down Expand Up @@ -1430,7 +1432,8 @@ def define_out_memlet(self, sdfg, state_dfg, state_id, src_node, dst_node, edge,
# If pointer, also point to output
desc = sdfg.arrays[edge.data.data]
ptrname = cpp.ptr(edge.data.data, desc, sdfg, self._frame)
is_global = desc.lifetime in (dtypes.AllocationLifetime.Global, dtypes.AllocationLifetime.Persistent)
is_global = desc.lifetime in (dtypes.AllocationLifetime.Global, dtypes.AllocationLifetime.Persistent,
dtypes.AllocationLifetime.External)
defined_type, _ = self._dispatcher.defined_vars.get(ptrname, is_global=is_global)
base_ptr = cpp.cpp_ptr_expr(sdfg, edge.data, defined_type, codegen=self._frame)
callsite_stream.write(f'{cdtype.ctype} {edge.src_conn} = {base_ptr};', sdfg, state_id, src_node)
Expand All @@ -1448,18 +1451,22 @@ def generate_nsdfg_header(self, sdfg, state, state_id, node, memlet_references,
# Add "__restrict__" keywords to arguments that do not alias with others in the context of this SDFG
restrict_args = []
for atype, aname, _ in memlet_references:

def make_restrict(expr: str) -> str:
    """
    Returns ``'__restrict__'`` if the given type expression ends with ``*``
    (ignoring surrounding whitespace), and an empty string otherwise.
    """
    # A trailing '*' means the qualifier has not been appended yet and can be
    is_bare_pointer = expr.strip().endswith('*')
    return '__restrict__' if is_bare_pointer else ''

if aname in node.sdfg.arrays and not node.sdfg.arrays[aname].may_alias:
restrict_args.append(make_restrict(atype))
else:
restrict_args.append('')

arguments += [f'{atype} {restrict} {aname}' for (atype, aname, _), restrict in zip(memlet_references, restrict_args)]
arguments += [
f'{atype} {restrict} {aname}' for (atype, aname, _), restrict in zip(memlet_references, restrict_args)
]
arguments += [
f'{node.sdfg.symbols[aname].as_arg(aname)}' for aname in sorted(node.symbol_mapping.keys())
if aname not in sdfg.constants
Expand Down

0 comments on commit 36cb24e

Please sign in to comment.