Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Externally-managed memory lifetime #1294

Merged
merged 4 commits into from
Jul 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 0 additions & 5 deletions dace/codegen/codegen.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,11 +178,6 @@ def generate_code(sdfg, validate=True) -> List[CodeObject]:
shutil.move(f"{tmp_dir}/test2.sdfg", "test2.sdfg")
raise RuntimeError('SDFG serialization failed - files do not match')

# Run with the deserialized version
# NOTE: This means that all subsequent modifications to `sdfg`
# are not reflected outside of this function (e.g., library
# node expansion).
sdfg = sdfg2

# Before generating the code, run type inference on the SDFG connectors
infer_types.infer_connector_types(sdfg)
Expand Down
4 changes: 2 additions & 2 deletions dace/codegen/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def update_persistent_desc(desc: data.Data, sdfg: SDFG):
Replaces the symbols used in a persistent data descriptor according to NestedSDFG's symbol mapping.
The replacement happens recursively up to the top-level SDFG.
"""
if (desc.lifetime == dtypes.AllocationLifetime.Persistent and sdfg.parent
if (desc.lifetime in (dtypes.AllocationLifetime.Persistent, dtypes.AllocationLifetime.External) and sdfg.parent
and any(str(s) in sdfg.parent_nsdfg_node.symbol_mapping for s in desc.free_symbols)):
newdesc = deepcopy(desc)
csdfg = sdfg
Expand Down Expand Up @@ -155,7 +155,7 @@ def get_gpu_runtime() -> gpu_runtime.GPURuntime:
backend = get_gpu_backend()
if backend == 'cuda':
libpath = ctypes.util.find_library('cudart')
if os.name == 'nt' and not libpath: # Windows-based search
if os.name == 'nt' and not libpath: # Windows-based search
for version in (12, 11, 10, 9):
libpath = ctypes.util.find_library(f'cudart64_{version}0')
if libpath:
Expand Down
50 changes: 44 additions & 6 deletions dace/codegen/compiled_sdfg.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,21 +147,20 @@ def __exit__(self, *args, **kwargs):
self.unload()


def _array_interface_ptr(array: Any, array_type: dt.Array) -> int:
def _array_interface_ptr(array: Any, storage: dtypes.StorageType) -> int:
"""
If the given array implements ``__array_interface__`` (see
``dtypes.is_array``), returns the base host or device pointer to the
array's allocated memory.

:param array: Array object that implements NumPy's array interface.
:param array_type: Data descriptor of the array (used to get storage
location to determine whether it's a host or GPU device
pointer).
:param array_type: Storage location of the array, used to determine whether
it is a host or device pointer (e.g. GPU).
:return: A pointer to the base location of the allocated buffer.
"""
if hasattr(array, 'data_ptr'):
return array.data_ptr()
if array_type.storage == dtypes.StorageType.GPU_Global:
if storage == dtypes.StorageType.GPU_Global:
return array.__cuda_array_interface__['data'][0]
return array.__array_interface__['data'][0]

Expand Down Expand Up @@ -200,10 +199,13 @@ def __init__(self, sdfg, lib: ReloadableDLL, argnames: List[str] = None):
self.argnames = argnames

self.has_gpu_code = False
self.external_memory_types = set()
for _, _, aval in self._sdfg.arrays_recursive():
if aval.storage in dtypes.GPU_STORAGES:
self.has_gpu_code = True
break
if aval.lifetime == dtypes.AllocationLifetime.External:
self.external_memory_types.add(aval.storage)
if not self.has_gpu_code:
for node, _ in self._sdfg.all_nodes_recursive():
if getattr(node, 'schedule', False) in dtypes.GPU_SCHEDULES:
Expand Down Expand Up @@ -271,6 +273,42 @@ class State(ctypes.Structure):

return State

def get_workspace_sizes(self) -> Dict[dtypes.StorageType, int]:
"""
Returns the total external memory size to be allocated for this SDFG.

:return: A dictionary mapping storage types to the number of bytes necessary
to allocate for the SDFG to work properly.
"""
if not self._initialized:
raise ValueError('Compiled SDFG is uninitialized, please call ``initialize`` prior to '
'querying external memory size.')

result: Dict[dtypes.StorageType, int] = {}
for storage in self.external_memory_types:
func = self._lib.get_symbol(f'__dace_get_external_memory_size_{storage.name}')
result[storage] = func(self._libhandle, *self._lastargs[1])

return result

def set_workspace(self, storage: dtypes.StorageType, workspace: Any):
"""
Sets the workspace for the given storage type to the given buffer.

:param storage: The storage type to fill.
:param workspace: An array-convertible object (through ``__[cuda_]array_interface__``,
see ``_array_interface_ptr``) to use for the workspace.
"""
if not self._initialized:
raise ValueError('Compiled SDFG is uninitialized, please call ``initialize`` prior to '
'setting external memory.')
if storage not in self.external_memory_types:
raise ValueError(f'Compiled SDFG does not specify external memory of {storage}')

func = self._lib.get_symbol(f'__dace_set_external_memory_{storage.name}', None)
ptr = _array_interface_ptr(workspace, storage)
func(self._libhandle, ctypes.c_void_p(ptr), *self._lastargs[1])

@property
def filename(self):
return self._lib._library_filename
Expand Down Expand Up @@ -487,7 +525,7 @@ def _construct_args(self, kwargs) -> Tuple[Tuple[Any], Tuple[Any]]:
for arg, actype, atype, aname in callparams if aname in symbols)

# Replace arrays with their base host/device pointers
newargs = tuple((ctypes.c_void_p(_array_interface_ptr(arg, atype)), actype,
newargs = tuple((ctypes.c_void_p(_array_interface_ptr(arg, atype.storage)), actype,
atype) if dtypes.is_array(arg) else (arg, actype, atype)
for arg, actype, atype, _ in callparams)

Expand Down
17 changes: 13 additions & 4 deletions dace/codegen/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class DefinedMemlets:
referenced correctly in nested scopes and SDFGs.
The ones defined in the first (top) scope, refer to global variables.
"""

def __init__(self):
self._scopes = [(None, {}, True), (None, {}, True)]

Expand Down Expand Up @@ -142,6 +143,7 @@ def remove(self, name: str, ancestor: int = 0, is_global: bool = False) -> Tuple
class TargetDispatcher(object):
""" Dispatches sub-SDFG generation (according to scope),
storage<->storage copies, and storage<->tasklet copies to targets. """

def __init__(self, framecode):
# Avoid import loop
from dace.codegen.targets import framecode as fc
Expand Down Expand Up @@ -215,7 +217,8 @@ def register_state_dispatcher(self, dispatcher, predicate=None):
"""

if not hasattr(dispatcher, "generate_state"):
raise TypeError("State dispatcher \"{}\" does not " "implement \"generate_state\"".format(dispatcher))
raise TypeError("State dispatcher \"{}\" does not "
"implement \"generate_state\"".format(dispatcher))
if predicate is None:
self._generic_state_dispatcher = dispatcher
else:
Expand All @@ -241,7 +244,8 @@ def register_node_dispatcher(self, dispatcher, predicate=None):
:see: TargetCodeGenerator
"""
if not hasattr(dispatcher, "generate_node"):
raise TypeError("Node dispatcher must " "implement \"generate_node\"")
raise TypeError("Node dispatcher must "
"implement \"generate_node\"")
if predicate is None:
self._generic_node_dispatcher = dispatcher
else:
Expand Down Expand Up @@ -448,9 +452,12 @@ def dispatch_allocate(self,
""" Dispatches a code generator for data allocation. """
self._used_targets.add(self._array_dispatchers[datadesc.storage])

if datadesc.lifetime is dtypes.AllocationLifetime.Persistent:
if datadesc.lifetime == dtypes.AllocationLifetime.Persistent:
declaration_stream = CodeIOStream()
callsite_stream = self.frame._initcode
elif datadesc.lifetime == dtypes.AllocationLifetime.External:
declaration_stream = CodeIOStream()
callsite_stream = CodeIOStream()
else:
declaration_stream = callsite_stream

Expand All @@ -468,8 +475,10 @@ def dispatch_deallocate(self, sdfg: SDFG, dfg: ScopeSubgraphView, state_id: int,
""" Dispatches a code generator for a data deallocation. """
self._used_targets.add(self._array_dispatchers[datadesc.storage])

if datadesc.lifetime is dtypes.AllocationLifetime.Persistent:
if datadesc.lifetime == dtypes.AllocationLifetime.Persistent:
callsite_stream = self.frame._exitcode
elif datadesc.lifetime == dtypes.AllocationLifetime.External:
return

self._array_dispatchers[datadesc.storage].deallocate_array(sdfg, dfg, state_id, node, datadesc, function_stream,
callsite_stream)
Expand Down
14 changes: 8 additions & 6 deletions dace/codegen/targets/cpp.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ def copy_expr(
offset_cppstr = "0"
dt = ""

is_global = data_desc.lifetime in (dtypes.AllocationLifetime.Global, dtypes.AllocationLifetime.Persistent)
is_global = data_desc.lifetime in (dtypes.AllocationLifetime.Global, dtypes.AllocationLifetime.Persistent,
dtypes.AllocationLifetime.External)
defined_types = None
# Non-free symbol dependent Arrays due to their shape
dependent_shape = (isinstance(data_desc, data.Array) and not isinstance(data_desc, data.View) and any(
Expand Down Expand Up @@ -219,7 +220,7 @@ def ptr(name: str, desc: data.Data, sdfg: SDFG = None, framecode=None) -> str:

# Special case: If memory is persistent and defined in this SDFG, add state
# struct to name
if (desc.transient and desc.lifetime is dtypes.AllocationLifetime.Persistent):
if (desc.transient and desc.lifetime in (dtypes.AllocationLifetime.Persistent, dtypes.AllocationLifetime.External)):
from dace.codegen.targets.cuda import CUDACodeGen # Avoid import loop

if desc.storage == dtypes.StorageType.CPU_ThreadLocal: # Use unambiguous name for thread-local arrays
Expand Down Expand Up @@ -1252,7 +1253,7 @@ def visit_BinOp(self, node: ast.BinOp):
if isinstance(node.op, ast.Pow):
from dace.frontend.python import astutils
try:
evaluated_node = astutils.evalnode(node.right, {**self.constants, 'dace': dace,'math': math})
evaluated_node = astutils.evalnode(node.right, {**self.constants, 'dace': dace, 'math': math})
unparsed = symbolic.pystr_to_symbolic(evaluated_node)
evaluated_constant = symbolic.evaluate(unparsed, self.constants)
evaluated = symbolic.symstr(evaluated_constant, cpp_mode=True)
Expand Down Expand Up @@ -1356,8 +1357,8 @@ def synchronize_streams(sdfg, dfg, state_id, node, scope_exit, callsite_stream,
if isinstance(desc, data.Array) and desc.start_offset != 0:
ptrname = f'({ptrname} - {sym2cpp(desc.start_offset)})'
if Config.get_bool('compiler', 'cuda', 'syncdebug'):
callsite_stream.write(f'DACE_GPU_CHECK({backend}FreeAsync({ptrname}, {cudastream}));\n', sdfg,
state_id, scope_exit)
callsite_stream.write(f'DACE_GPU_CHECK({backend}FreeAsync({ptrname}, {cudastream}));\n', sdfg, state_id,
scope_exit)
callsite_stream.write(f'DACE_GPU_CHECK({backend}DeviceSynchronize());')
else:
callsite_stream.write(f'{backend}FreeAsync({ptrname}, {cudastream});\n', sdfg, state_id, scope_exit)
Expand All @@ -1381,7 +1382,8 @@ def synchronize_streams(sdfg, dfg, state_id, node, scope_exit, callsite_stream,
and edge.dst._cuda_stream != node._cuda_stream):
callsite_stream.write(
"""DACE_GPU_CHECK({backend}EventRecord(__state->gpu_context->events[{ev}], {src_stream}));
DACE_GPU_CHECK({backend}StreamWaitEvent(__state->gpu_context->streams[{dst_stream}], __state->gpu_context->events[{ev}], 0));""".format(
DACE_GPU_CHECK({backend}StreamWaitEvent(__state->gpu_context->streams[{dst_stream}], __state->gpu_context->events[{ev}], 0));"""
.format(
ev=edge._cuda_event if hasattr(edge, "_cuda_event") else 0,
src_stream=cudastream,
dst_stream=edge.dst._cuda_stream,
Expand Down
19 changes: 13 additions & 6 deletions dace/codegen/targets/cpu.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ def declare_array(self, sdfg, dfg, state_id, node, nodedesc, function_stream, de
# We add the `dfg is not None` check because the `sdutils.is_nonfree_sym_dependent` check will fail if
# `nodedesc` is a View and `dfg` is None.
if dfg and not sdutils.is_nonfree_sym_dependent(node, nodedesc, dfg, fsymbols):
raise NotImplementedError("The declare_array method should only be used for variables "
raise NotImplementedError("The declare_array method should only be used for variables "
"that must have their declaration and allocation separate.")

name = node.data
Expand Down Expand Up @@ -278,7 +278,7 @@ def allocate_array(self, sdfg, dfg, state_id, node, nodedesc, function_stream, d
declared = self._dispatcher.declared_arrays.has(alloc_name)

define_var = self._dispatcher.defined_vars.add
if nodedesc.lifetime == dtypes.AllocationLifetime.Persistent:
if nodedesc.lifetime in (dtypes.AllocationLifetime.Persistent, dtypes.AllocationLifetime.External):
define_var = self._dispatcher.defined_vars.add_global
nodedesc = update_persistent_desc(nodedesc, sdfg)

Expand Down Expand Up @@ -449,7 +449,8 @@ def deallocate_array(self, sdfg, dfg, state_id, node, nodedesc, function_stream,
alloc_name = f'({alloc_name} - {cpp.sym2cpp(nodedesc.start_offset)})'

if self._dispatcher.declared_arrays.has(alloc_name):
is_global = nodedesc.lifetime in (dtypes.AllocationLifetime.Global, dtypes.AllocationLifetime.Persistent)
is_global = nodedesc.lifetime in (dtypes.AllocationLifetime.Global, dtypes.AllocationLifetime.Persistent,
dtypes.AllocationLifetime.External)
self._dispatcher.declared_arrays.remove(alloc_name, is_global=is_global)

if isinstance(nodedesc, (data.Scalar, data.View, data.Stream, data.Reference)):
Expand Down Expand Up @@ -932,7 +933,8 @@ def process_out_memlets(self,
desc = sdfg.arrays[memlet.data]
ptrname = cpp.ptr(memlet.data, desc, sdfg, self._frame)
is_global = desc.lifetime in (dtypes.AllocationLifetime.Global,
dtypes.AllocationLifetime.Persistent)
dtypes.AllocationLifetime.Persistent,
dtypes.AllocationLifetime.External)
try:
defined_type, _ = self._dispatcher.declared_arrays.get(ptrname, is_global=is_global)
except KeyError:
Expand Down Expand Up @@ -1430,7 +1432,8 @@ def define_out_memlet(self, sdfg, state_dfg, state_id, src_node, dst_node, edge,
# If pointer, also point to output
desc = sdfg.arrays[edge.data.data]
ptrname = cpp.ptr(edge.data.data, desc, sdfg, self._frame)
is_global = desc.lifetime in (dtypes.AllocationLifetime.Global, dtypes.AllocationLifetime.Persistent)
is_global = desc.lifetime in (dtypes.AllocationLifetime.Global, dtypes.AllocationLifetime.Persistent,
dtypes.AllocationLifetime.External)
defined_type, _ = self._dispatcher.defined_vars.get(ptrname, is_global=is_global)
base_ptr = cpp.cpp_ptr_expr(sdfg, edge.data, defined_type, codegen=self._frame)
callsite_stream.write(f'{cdtype.ctype} {edge.src_conn} = {base_ptr};', sdfg, state_id, src_node)
Expand All @@ -1448,18 +1451,22 @@ def generate_nsdfg_header(self, sdfg, state, state_id, node, memlet_references,
# Add "__restrict__" keywords to arguments that do not alias with others in the context of this SDFG
restrict_args = []
for atype, aname, _ in memlet_references:

def make_restrict(expr: str) -> str:
# Check whether "restrict" has already been added before and can be added
if expr.strip().endswith('*'):
return '__restrict__'
else:
return ''

if aname in node.sdfg.arrays and not node.sdfg.arrays[aname].may_alias:
restrict_args.append(make_restrict(atype))
else:
restrict_args.append('')

arguments += [f'{atype} {restrict} {aname}' for (atype, aname, _), restrict in zip(memlet_references, restrict_args)]
arguments += [
f'{atype} {restrict} {aname}' for (atype, aname, _), restrict in zip(memlet_references, restrict_args)
]
arguments += [
f'{node.sdfg.symbols[aname].as_arg(aname)}' for aname in sorted(node.symbol_mapping.keys())
if aname not in sdfg.constants
Expand Down