Merge pull request #1322 from spcl/gpu-persistent-fix
Fixes for persistent schedule and GPUPersistentFusion transformation
tbennun committed Jul 18, 2023
2 parents 0d37a94 + 152b69c commit 4991776
Showing 6 changed files with 334 additions and 134 deletions.

27 changes: 24 additions & 3 deletions dace/codegen/targets/cuda.py

@@ -1306,10 +1306,31 @@ def generate_devicelevel_state(self, sdfg, state, function_stream, callsite_stream):
         for c in components:
 
             has_map = any(isinstance(node, dace.nodes.MapEntry) for node in c.nodes())
+            # If a global is modified, execute once per global state,
+            # if a shared memory element is modified, execute once per block,
+            # if a local scalar is modified, execute in every thread.
             if not has_map:
-                callsite_stream.write("if (blockIdx.x == 0 "
-                                      "&& threadIdx.x == 0) "
-                                      "{ // sub-graph begin", sdfg, state.node_id)
+                written_nodes = [n for n in c if state.in_degree(n) > 0 and isinstance(n, dace.nodes.AccessNode)]
+
+                # The order of the branching below matters - it reduces the scope with every detected write
+                write_scope = 'thread'  # General case acts in every thread
+                if any(sdfg.arrays[n.data].storage in (dtypes.StorageType.GPU_Global, dtypes.StorageType.CPU_Pinned)
+                       for n in written_nodes):
+                    write_scope = 'grid'
+                if any(sdfg.arrays[n.data].storage == dtypes.StorageType.GPU_Shared for n in written_nodes):
+                    write_scope = 'block'
+                if any(sdfg.arrays[n.data].storage == dtypes.StorageType.Register for n in written_nodes):
+                    write_scope = 'thread'
+
+                if write_scope == 'grid':
+                    callsite_stream.write("if (blockIdx.x == 0 "
+                                          "&& threadIdx.x == 0) "
+                                          "{ // sub-graph begin", sdfg, state.node_id)
+                elif write_scope == 'block':
+                    callsite_stream.write("if (threadIdx.x == 0) "
+                                          "{ // sub-graph begin", sdfg, state.node_id)
+                else:
+                    callsite_stream.write("{ // subgraph begin", sdfg, state.node_id)
             else:
                 callsite_stream.write("{ // subgraph begin", sdfg, state.node_id)
 
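The new branch selects the narrowest execution scope that still covers every container the free tasklets write: 'grid' emits the if (blockIdx.x == 0 && threadIdx.x == 0) guard, 'block' emits if (threadIdx.x == 0), and 'thread' emits no guard at all. A minimal sketch of that decision rule, factored into a standalone helper (write_scope_for and its plain list argument are illustrative, not part of the DaCe API):

    from dace import dtypes

    def write_scope_for(written_storages):
        """Narrowest scope covering writes to all given storage types."""
        scope = 'thread'  # General case: code runs in every thread
        if any(s in (dtypes.StorageType.GPU_Global, dtypes.StorageType.CPU_Pinned)
               for s in written_storages):
            scope = 'grid'    # Global data: only one thread in the grid may write
        if any(s == dtypes.StorageType.GPU_Shared for s in written_storages):
            scope = 'block'   # Shared memory: one thread per block writes
        if any(s == dtypes.StorageType.Register for s in written_storages):
            scope = 'thread'  # Registers are thread-private: every thread writes
        return scope

    # Later checks deliberately override earlier ones, narrowing the scope:
    assert write_scope_for([dtypes.StorageType.GPU_Global]) == 'grid'
    assert write_scope_for([dtypes.StorageType.GPU_Global,
                            dtypes.StorageType.GPU_Shared]) == 'block'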

9 changes: 8 additions & 1 deletion dace/properties.py

@@ -894,11 +894,18 @@ def from_json(self, l, sdfg=None):
         return set(l)
 
     def __get__(self, obj, objtype=None):
+        val = super(SetProperty, self).__get__(obj, objtype)
+        if val is None:
+            return val
+
         # Copy to avoid changes in the set at callee to be reflected in
         # the node directly
-        return set(super(SetProperty, self).__get__(obj, objtype))
+        return set(val)
 
     def __set__(self, obj, val):
+        if val is None:
+            return super(SetProperty, self).__set__(obj, val)
+
         # Check for uniqueness
         if len(val) != len(set(val)):
             dups = set([x for x in val if val.count(x) > 1])
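
With both guards in place, a SetProperty declared with allow_none=True can round-trip None instead of failing on set(None) in the getter. A minimal usage sketch (the Example class is hypothetical; make_properties is DaCe's standard decorator for wiring up property descriptors):

    from dace import properties

    @properties.make_properties
    class Example:
        symbols = properties.SetProperty(element_type=str, allow_none=True,
                                         desc='Optional set of symbol names')

    e = Example()
    e.symbols = None                # New: stored and read back as None
    assert e.symbols is None
    e.symbols = {'N', 'M'}          # Unchanged: returned as a defensive copy
    e.symbols.add('K')              # Mutating the returned copy...
    assert e.symbols == {'N', 'M'}  # ...does not affect the stored set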

13 changes: 10 additions & 3 deletions dace/transformation/passes/prune_symbols.py

@@ -23,6 +23,7 @@ class RemoveUnusedSymbols(ppl.Pass):
     CATEGORY: str = 'Simplification'
 
     recursive = properties.Property(dtype=bool, default=True, desc='Prune nested SDFGs recursively')
+    symbols = properties.SetProperty(element_type=str, allow_none=True, desc='Limit considered symbols to this set')
 
     def modifies(self) -> ppl.Modifies:
         return ppl.Modifies.Symbols
@@ -43,13 +44,16 @@ def apply_pass(self, sdfg: SDFG, _) -> Optional[Set[Tuple[int, str]]]:
"""
result: Set[str] = set()

symbols_to_consider = self.symbols or set(sdfg.symbols.keys())

# Compute used symbols
used_symbols = self.used_symbols(sdfg)

# Remove unused symbols
for sym in set(sdfg.symbols.keys()) - used_symbols:
sdfg.remove_symbol(sym)
result.add(sym)
for sym in symbols_to_consider - used_symbols:
if sym in sdfg.symbols:
sdfg.remove_symbol(sym)
result.add(sym)

if self.recursive:
# Prune nested SDFGs recursively
@@ -59,7 +63,10 @@ def apply_pass(self, sdfg: SDFG, _) -> Optional[Set[Tuple[int, str]]]:
             for state in sdfg.nodes():
                 for node in state.nodes():
                     if isinstance(node, nodes.NestedSDFG):
+                        old_symbols = self.symbols
+                        self.symbols = set()
                         nres = self.apply_pass(node.sdfg, _)
+                        self.symbols = old_symbols
                         if nres:
                             result.update(nres)
 
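The new symbols property lets a caller restrict pruning to a known candidate set, which is exactly how GPUPersistentKernel uses the pass below; symbols outside the set are never inspected. A minimal sketch on a hand-built SDFG (the SDFG itself is illustrative):

    import dace
    from dace.transformation.passes.prune_symbols import RemoveUnusedSymbols

    sdfg = dace.SDFG('prune_example')
    sdfg.add_symbol('N', dace.int64)                       # used by A's shape
    sdfg.add_symbol('M', dace.int64)                       # defined, never used
    sdfg.add_array('A', [dace.symbol('N')], dace.float64)

    p = RemoveUnusedSymbols()
    p.symbols = {'M'}       # only 'M' is considered for removal
    p.apply_pass(sdfg, {})
    assert 'M' not in sdfg.symbols and 'N' in sdfg.symbols

Note that the recursive branch above temporarily sets self.symbols to an empty set, which the `or` in symbols_to_consider treats as "consider all symbols" for the nested SDFG.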

59 changes: 46 additions & 13 deletions dace/transformation/subgraph/gpu_persistent_fusion.py

@@ -45,7 +45,7 @@ class GPUPersistentKernel(SubgraphTransformation):
     validate = Property(
         desc="Validate the sdfg and the nested sdfg",
         dtype=bool,
-        default=True,
+        default=False,
     )
 
     include_in_assignment = Property(
@@ -172,10 +172,15 @@ def apply(self, sdfg: SDFG):
         # create sdfg for kernel and fill it with states and edges from
         # ssubgraph dfg will be nested at the end
         kernel_sdfg = SDFG('{}kernel'.format(self.kernel_prefix + '_' if self.kernel_prefix != '' else ''))
+        new_symbols = set()
 
         edges = subgraph.edges()
         for edge in edges:
             kernel_sdfg.add_edge(edge.src, edge.dst, edge.data)
+        for k in entry_edge.data.assignments:
+            new_symbols.add(k)
+            if k in sdfg.symbols and k not in kernel_sdfg.symbols:
+                kernel_sdfg.add_symbol(k, sdfg.symbols[k])
 
         # Setting entry node in nested SDFG if no entry guard was created
         if entry_guard_state is None:
@@ -187,6 +192,7 @@ def apply(self, sdfg: SDFG):
         # remove the now nested nodes from the outer sdfg and make sure the
         # launch state is properly connected to remaining states
         sdfg.remove_nodes_from(subgraph.nodes())
+        other_states = sdfg.nodes()
 
         if entry_state_out is not None \
                 and len(sdfg.edges_between(entry_state_out, launch_state)) == 0:
@@ -199,13 +205,16 @@ def apply(self, sdfg: SDFG):
         # Handle data for kernel
         kernel_data = set(node.data for state in kernel_sdfg for node in state.nodes()
                           if isinstance(node, nodes.AccessNode))
+        other_data = set(node.data for state in other_states for node in state.nodes()
+                         if isinstance(node, nodes.AccessNode))
 
         # move Streams and Register data into the nested SDFG
         # normal data will be added as kernel argument
         kernel_args = []
         for data in kernel_data:
-            if (isinstance(sdfg.arrays[data], dace.data.Stream) or
-                    (isinstance(sdfg.arrays[data], dace.data.Array) and sdfg.arrays[data].storage == StorageType.Register)):
+            if data not in other_data and (isinstance(sdfg.arrays[data], dace.data.Stream) or
+                                           (isinstance(sdfg.arrays[data], dace.data.Array) and sdfg.arrays[data].storage
+                                            in (StorageType.Register, StorageType.GPU_Shared))):
                 kernel_sdfg.add_datadesc(data, sdfg.arrays[data])
                 del sdfg.arrays[data]
             else:
@@ -248,23 +257,35 @@ def apply(self, sdfg: SDFG):
         )
         nested_sdfg.schedule = ScheduleType.GPU_Persistent
 
+        # If no inputs or outputs were given, connect with an empty memlet
+        if not kernel_args_read:
+            launch_state.add_nedge(map_entry, nested_sdfg, dace.Memlet())
+        if not kernel_args_write:
+            launch_state.add_nedge(nested_sdfg, map_exit, dace.Memlet())
+
         # Create and connect read only data access nodes
         for arg in kernel_args_read:
             read_node = launch_state.add_read(arg)
-            launch_state.add_memlet_path(read_node,
-                                         map_entry,
-                                         nested_sdfg,
-                                         dst_conn=arg,
-                                         memlet=Memlet.from_array(arg, sdfg.arrays[arg]))
+            launch_state.add_edge_pair(map_entry,
+                                       nested_sdfg,
+                                       read_node,
+                                       internal_connector=arg,
+                                       internal_memlet=Memlet.from_array(arg, sdfg.arrays[arg]))
 
         # Create and connect writable data access nodes
        for arg in kernel_args_write:
             write_node = launch_state.add_write(arg)
-            launch_state.add_memlet_path(nested_sdfg,
-                                         map_exit,
-                                         write_node,
-                                         src_conn=arg,
-                                         memlet=Memlet.from_array(arg, sdfg.arrays[arg]))
+            launch_state.add_edge_pair(map_exit,
+                                       nested_sdfg,
+                                       write_node,
+                                       internal_connector=arg,
+                                       internal_memlet=Memlet.from_array(arg, sdfg.arrays[arg]))
 
+        # Remove no-longer-used symbols in parent SDFG
+        from dace.transformation.passes.prune_symbols import RemoveUnusedSymbols
+        p = RemoveUnusedSymbols()
+        p.symbols = new_symbols
+        p.apply_pass(sdfg, {})
+
         # Transformation is done
         if self.validate:
@@ -303,6 +324,12 @@ def is_gpu_state(sdfg: SDFG, state: SDFGState) -> bool:

     @staticmethod
     def get_entry_states(sdfg: SDFG, subgraph):
+        """
+        Returns a 2-tuple of the (internal, external) states inside and outside of the SDFG,
+        around which the new nested SDFG will be created. The first element will be a set
+        of source nodes in the internal SDFG; and the second element will be a set of
+        predecessor nodes to the nested SDFG.
+        """
         entry_states_in = set()
         entry_states_out = set()
 
@@ -318,6 +345,12 @@ def get_exit_states(sdfg: SDFG, subgraph):

     @staticmethod
     def get_exit_states(sdfg: SDFG, subgraph):
+        """
+        Returns a 2-tuple of the (internal, external) states inside and outside of the SDFG,
+        around which the new nested SDFG will be created. The first element will be a set
+        of sink nodes in the internal SDFG; and the second element will be a set of
+        successor nodes to the nested SDFG.
+        """
         exit_states_in = set()
         exit_states_out = set()
 
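Taken together, applying the transformation follows the pattern in the new test below: wrap the kernel states in a SubgraphView, call setup_match, then apply. A condensed sketch (assumes an already GPU-transformed SDFG whose start and sink states should stay outside the persistent kernel; note that validation is now opt-in):

    import dace
    from dace.sdfg.graph import SubgraphView
    from dace.transformation.subgraph import GPUPersistentKernel

    def make_persistent(sdfg: dace.SDFG) -> None:
        # Everything except the entry and exit states goes into the kernel
        content = set(sdfg.nodes()) - {sdfg.start_state, sdfg.sink_nodes()[0]}
        transform = GPUPersistentKernel()
        transform.setup_match(SubgraphView(sdfg, content))
        transform.validate = True  # opt back in, since the default is now False
        transform.apply(sdfg)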

91 changes: 91 additions & 0 deletions tests/codegen/gpu_scalar_execution_context_test.py

@@ -0,0 +1,91 @@
+# Copyright 2019-2023 ETH Zurich and the DaCe authors. All rights reserved.
+"""
+Tests how code is generated for free tasklets inside a GPU kernel nested SDFG.
+"""
+
+import dace
+from dace.sdfg.graph import SubgraphView
+from dace.transformation.subgraph import GPUPersistentKernel
+import numpy as np
+import pytest
+
+
+def _tester(A: dace.float64[64]):
+    t = 12.3
+    for _ in range(5):
+        A += t
+        t += 1.01
+
+
+def _modify_array(sdfg: dace.SDFG, storage: dace.StorageType):
+    for nsdfg, aname, aval in sdfg.arrays_recursive():
+        if aname == 't':
+            if storage == dace.StorageType.GPU_Shared:
+                aval = dace.data.Array(aval.dtype, [1], transient=aval.transient)
+                nsdfg.arrays[aname] = aval
+            aval.storage = storage
+            break
+    else:
+        raise ValueError('Array not found')
+
+
+def _make_program(storage: dace.StorageType, persistent=False):
+    sdfg = dace.program(_tester).to_sdfg()
+    sdfg.apply_gpu_transformations(simplify=False)
+    _modify_array(sdfg, storage)
+
+    if persistent:
+        content_nodes = set(sdfg.nodes()) - {sdfg.start_state, sdfg.sink_nodes()[0]}
+        subgraph = SubgraphView(sdfg, content_nodes)
+        transform = GPUPersistentKernel()
+        transform.setup_match(subgraph)
+        transform.apply(sdfg)
+
+    return sdfg
+
+
+@pytest.mark.gpu
+def test_global_scalar_update():
+    sdfg = _make_program(dace.StorageType.GPU_Global, True)
+    a = np.random.rand(64)
+    aref = np.copy(a)
+    _tester(aref)
+    sdfg(a)
+    assert np.allclose(a, aref)
+
+
+@pytest.mark.gpu
+def test_shared_scalar_update():
+    sdfg = _make_program(dace.StorageType.GPU_Shared, persistent=True)
+
+    a = np.random.rand(64)
+    aref = np.copy(a)
+    _tester(aref)
+
+    # Ensure block size will create at least two thread-blocks
+    with dace.config.set_temporary('compiler', 'cuda', 'persistent_map_SM_fraction', value=0.0001):
+        with dace.config.set_temporary('compiler', 'cuda', 'persistent_map_occupancy', value=2):
+            with dace.config.set_temporary('compiler', 'cuda', 'default_block_size', value='32,1,1'):
+                sdfg(a)
+
+    assert np.allclose(a, aref)
+
+
+@pytest.mark.gpu
+@pytest.mark.parametrize('persistent', (False, True))
+def test_register_scalar_update(persistent):
+    sdfg = _make_program(dace.StorageType.Register, persistent)
+
+    a = np.random.rand(64)
+    aref = np.copy(a)
+    _tester(aref)
+    sdfg(a)
+
+    assert np.allclose(a, aref)
+
+
+if __name__ == '__main__':
+    test_global_scalar_update()
+    test_shared_scalar_update()
+    test_register_scalar_update(False)
+    test_register_scalar_update(True)
