Skip to content

Commit

Permalink
Add CPU_Persistent map schedule (#1330)
Browse files Browse the repository at this point in the history
  • Loading branch information
tbennun authored and alexnick83 committed Aug 2, 2023
1 parent 69cff31 commit 30af8da
Show file tree
Hide file tree
Showing 8 changed files with 206 additions and 71 deletions.
3 changes: 2 additions & 1 deletion dace/cli/daceprof.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,8 @@ def make_sequential(sdfg: dace.SDFG):
for n, _ in sdfg.all_nodes_recursive():
if isinstance(n, dace.nodes.EntryNode):
sched = getattr(n, 'schedule', False)
if sched == dace.ScheduleType.CPU_Multicore or sched == dace.ScheduleType.Default:
if sched in (dace.ScheduleType.CPU_Multicore, dace.ScheduleType.CPU_Persistent,
dace.ScheduleType.Default):
n.schedule = dace.ScheduleType.Sequential

registered.append(dace.hooks.register_sdfg_call_hook(before_hook=make_sequential))
Expand Down
2 changes: 1 addition & 1 deletion dace/codegen/instrumentation/likwid.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class LIKWIDInstrumentationCPU(InstrumentationProvider):
the Likwid tool.
"""

perf_whitelist_schedules = [dtypes.ScheduleType.CPU_Multicore, dtypes.ScheduleType.Sequential]
perf_whitelist_schedules = [dtypes.ScheduleType.CPU_Multicore, dtypes.ScheduleType.CPU_Persistent, dtypes.ScheduleType.Sequential]

def __init__(self):
self._likwid_used = False
Expand Down
6 changes: 3 additions & 3 deletions dace/codegen/instrumentation/papi.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class PAPIInstrumentation(InstrumentationProvider):

_counters: Optional[Set[str]] = None

perf_whitelist_schedules = [dtypes.ScheduleType.CPU_Multicore, dtypes.ScheduleType.Sequential]
perf_whitelist_schedules = [dtypes.ScheduleType.CPU_Multicore, dtypes.ScheduleType.CPU_Persistent, dtypes.ScheduleType.Sequential]

def __init__(self):
self._papi_used = False
Expand Down Expand Up @@ -350,7 +350,7 @@ def on_consume_entry(self, sdfg, state, node, outer_stream, inner_stream):

@staticmethod
def perf_get_supersection_start_string(node, dfg, unified_id):
if node.map.schedule == dtypes.ScheduleType.CPU_Multicore:
if node.map.schedule in (dtypes.ScheduleType.CPU_Multicore, dtypes.ScheduleType.CPU_Persistent):
# Nested SuperSections are not supported. Therefore, we mark the
# outermost section and disallow internal scopes from creating it.
if not hasattr(node.map, '_can_be_supersection_start'):
Expand All @@ -360,7 +360,7 @@ def perf_get_supersection_start_string(node, dfg, unified_id):
for x in children:
if not hasattr(x.map, '_can_be_supersection_start'):
x.map._can_be_supersection_start = True
if x.map.schedule == dtypes.ScheduleType.CPU_Multicore:
if x.map.schedule in (dtypes.ScheduleType.CPU_Multicore, dtypes.ScheduleType.CPU_Persistent):

x.map._can_be_supersection_start = False
elif x.map.schedule == dtypes.ScheduleType.Sequential:
Expand Down
140 changes: 83 additions & 57 deletions dace/codegen/targets/cpu.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from dace.sdfg import nodes, utils as sdutils
from dace.sdfg import (ScopeSubgraphView, SDFG, scope_contains_scope, is_array_stream_view, NodeNotExpandedError,
dynamic_map_inputs, local_transients)
from dace.sdfg.scope import is_devicelevel_gpu, is_devicelevel_fpga
from dace.sdfg.scope import is_devicelevel_gpu, is_devicelevel_fpga, is_in_scope
from typing import Union
from dace.codegen.targets import fpga

Expand Down Expand Up @@ -79,7 +79,9 @@ def __init__(self, frame_codegen, sdfg):

# Register dispatchers
dispatcher.register_node_dispatcher(self)
dispatcher.register_map_dispatcher([dtypes.ScheduleType.CPU_Multicore, dtypes.ScheduleType.Sequential], self)
dispatcher.register_map_dispatcher(
[dtypes.ScheduleType.CPU_Multicore, dtypes.ScheduleType.CPU_Persistent, dtypes.ScheduleType.Sequential],
self)

cpu_storage = [dtypes.StorageType.CPU_Heap, dtypes.StorageType.CPU_ThreadLocal, dtypes.StorageType.Register]
dispatcher.register_array_dispatcher(cpu_storage, self)
Expand Down Expand Up @@ -222,7 +224,7 @@ def declare_array(self, sdfg, dfg, state_id, node, nodedesc, function_stream, de
# We add the `dfg is not None` check because the `sdutils.is_nonfree_sym_dependent` check will fail if
# `nodedesc` is a View and `dfg` is None.
if dfg and not sdutils.is_nonfree_sym_dependent(node, nodedesc, dfg, fsymbols):
raise NotImplementedError("The declare_array method should only be used for variables "
raise NotImplementedError("The declare_array method should only be used for variables "
"that must have their declaration and allocation separate.")

name = node.data
Expand Down Expand Up @@ -1714,66 +1716,87 @@ def _generate_MapEntry(

# TODO: Refactor to generate_scope_preamble once a general code
# generator (that CPU inherits from) is implemented
if node.map.schedule == dtypes.ScheduleType.CPU_Multicore:
map_header += "#pragma omp parallel for"
if node.map.omp_schedule != dtypes.OMPScheduleType.Default:
schedule = " schedule("
if node.map.omp_schedule == dtypes.OMPScheduleType.Static:
schedule += "static"
elif node.map.omp_schedule == dtypes.OMPScheduleType.Dynamic:
schedule += "dynamic"
elif node.map.omp_schedule == dtypes.OMPScheduleType.Guided:
schedule += "guided"
if node.map.schedule in (dtypes.ScheduleType.CPU_Multicore, dtypes.ScheduleType.CPU_Persistent):
# OpenMP header
in_persistent = False
if node.map.schedule == dtypes.ScheduleType.CPU_Multicore:
in_persistent = is_in_scope(sdfg, state_dfg, node, [dtypes.ScheduleType.CPU_Persistent])
if in_persistent:
# If already in a #pragma omp parallel, no need to use it twice
map_header += "#pragma omp for"
# TODO(later): barriers and map_header += " nowait"
else:
raise ValueError("Unknown OpenMP schedule type")
if node.map.omp_chunk_size > 0:
schedule += f", {node.map.omp_chunk_size}"
schedule += ")"
map_header += schedule
if node.map.omp_num_threads > 0:
map_header += f" num_threads({node.map.omp_num_threads})"
if node.map.collapse > 1:
map_header += "#pragma omp parallel for"

elif node.map.schedule == dtypes.ScheduleType.CPU_Persistent:
map_header += "#pragma omp parallel"

# OpenMP schedule properties
if not in_persistent:
if node.map.omp_schedule != dtypes.OMPScheduleType.Default:
schedule = " schedule("
if node.map.omp_schedule == dtypes.OMPScheduleType.Static:
schedule += "static"
elif node.map.omp_schedule == dtypes.OMPScheduleType.Dynamic:
schedule += "dynamic"
elif node.map.omp_schedule == dtypes.OMPScheduleType.Guided:
schedule += "guided"
else:
raise ValueError("Unknown OpenMP schedule type")
if node.map.omp_chunk_size > 0:
schedule += f", {node.map.omp_chunk_size}"
schedule += ")"
map_header += schedule

if node.map.omp_num_threads > 0:
map_header += f" num_threads({node.map.omp_num_threads})"

# OpenMP nested loop properties
if node.map.schedule == dtypes.ScheduleType.CPU_Multicore and node.map.collapse > 1:
map_header += ' collapse(%d)' % node.map.collapse
# Loop over outputs, add OpenMP reduction clauses to detected cases
# TODO: set up register outside loop
# exit_node = dfg.exit_node(node)
reduction_stmts = []
# for outedge in dfg.in_edges(exit_node):
# if (isinstance(outedge.src, nodes.CodeNode)
# and outedge.data.wcr is not None):
# redt = operations.detect_reduction_type(outedge.data.wcr)
# if redt != dtypes.ReductionType.Custom:
# reduction_stmts.append('reduction({typ}:{var})'.format(
# typ=_REDUCTION_TYPE_TO_OPENMP[redt],
# var=outedge.src_conn))
# reduced_variables.append(outedge)

map_header += " %s\n" % ", ".join(reduction_stmts)

# TODO: Explicit map unroller
if node.map.unroll:
if node.map.schedule == dtypes.ScheduleType.CPU_Multicore:
raise ValueError("A Multicore CPU map cannot be unrolled (" + node.map.label + ")")

constsize = all([not symbolic.issymbolic(v, sdfg.constants) for r in node.map.range for v in r])
if node.map.unroll:
if node.map.schedule in (dtypes.ScheduleType.CPU_Multicore, dtypes.ScheduleType.CPU_Persistent):
raise ValueError("An OpenMP map cannot be unrolled (" + node.map.label + ")")

# Nested loops
result.write(map_header, sdfg, state_id, node)
for i, r in enumerate(node.map.range):
# var = '__DACEMAP_%s_%d' % (node.map.label, i)
var = map_params[i]
begin, end, skip = r

if node.map.unroll:
result.write("#pragma unroll", sdfg, state_id, node)
if node.map.schedule == dtypes.ScheduleType.CPU_Persistent:
result.write('{\n', sdfg, state_id, node)

# Find if bounds are used within the scope
scope = state_dfg.scope_subgraph(node, False, False)
fsyms = scope.free_symbols
# Include external edges
for n in scope.nodes():
for e in state_dfg.all_edges(n):
fsyms |= e.data.free_symbols
fsyms = set(map(str, fsyms))

ntid_is_used = '__omp_num_threads' in fsyms
tid_is_used = node.map.params[0] in fsyms
if tid_is_used or ntid_is_used:
function_stream.write('#include <omp.h>', sdfg, state_id, node)
if tid_is_used:
result.write(f'auto {node.map.params[0]} = omp_get_thread_num();', sdfg, state_id, node)
if ntid_is_used:
result.write(f'auto __omp_num_threads = omp_get_num_threads();', sdfg, state_id, node)
else:
# Emit nested loops
for i, r in enumerate(node.map.range):
var = map_params[i]
begin, end, skip = r

result.write(
"for (auto %s = %s; %s < %s; %s += %s) {\n" %
(var, cpp.sym2cpp(begin), var, cpp.sym2cpp(end + 1), var, cpp.sym2cpp(skip)),
sdfg,
state_id,
node,
)
if node.map.unroll:
result.write("#pragma unroll", sdfg, state_id, node)

result.write(
"for (auto %s = %s; %s < %s; %s += %s) {\n" %
(var, cpp.sym2cpp(begin), var, cpp.sym2cpp(end + 1), var, cpp.sym2cpp(skip)),
sdfg,
state_id,
node,
)

callsite_stream.write(inner_stream.getvalue())

Expand Down Expand Up @@ -1803,8 +1826,11 @@ def _generate_MapExit(self, sdfg, dfg, state_id, node, function_stream, callsite

self.generate_scope_postamble(sdfg, dfg, state_id, function_stream, outer_stream, callsite_stream)

for _ in map_node.map.range:
if map_node.map.schedule == dtypes.ScheduleType.CPU_Persistent:
result.write("}", sdfg, state_id, node)
else:
for _ in map_node.map.range:
result.write("}", sdfg, state_id, node)

result.write(outer_stream.getvalue())

Expand Down
13 changes: 8 additions & 5 deletions dace/dtypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ class ScheduleType(aenum.AutoNumberEnum):
Default = () #: Scope-default parallel schedule
Sequential = () #: Sequential code (single-thread)
MPI = () #: MPI processes
CPU_Multicore = () #: OpenMP
CPU_Multicore = () #: OpenMP parallel for loop
CPU_Persistent = () #: OpenMP parallel region
Unrolled = () #: Unrolled code
SVE_Map = () #: Arm SVE

Expand Down Expand Up @@ -188,6 +189,7 @@ class TilingType(aenum.AutoNumberEnum):
ScheduleType.Sequential: StorageType.Register,
ScheduleType.MPI: StorageType.CPU_Heap,
ScheduleType.CPU_Multicore: StorageType.Register,
ScheduleType.CPU_Persistent: StorageType.CPU_Heap,
ScheduleType.GPU_Default: StorageType.GPU_Global,
ScheduleType.GPU_Persistent: StorageType.GPU_Global,
ScheduleType.GPU_Device: StorageType.GPU_Shared,
Expand All @@ -205,6 +207,7 @@ class TilingType(aenum.AutoNumberEnum):
ScheduleType.Sequential: ScheduleType.Sequential,
ScheduleType.MPI: ScheduleType.CPU_Multicore,
ScheduleType.CPU_Multicore: ScheduleType.Sequential,
ScheduleType.CPU_Persistent: ScheduleType.CPU_Multicore,
ScheduleType.Unrolled: ScheduleType.CPU_Multicore,
ScheduleType.GPU_Default: ScheduleType.GPU_Device,
ScheduleType.GPU_Persistent: ScheduleType.GPU_Device,
Expand Down Expand Up @@ -1432,7 +1435,7 @@ def can_access(schedule: ScheduleType, storage: StorageType):
ScheduleType.GPU_Default,
]:
return storage in [StorageType.GPU_Global, StorageType.GPU_Shared, StorageType.CPU_Pinned]
elif schedule in [ScheduleType.Default, ScheduleType.CPU_Multicore]:
elif schedule in [ScheduleType.Default, ScheduleType.CPU_Multicore, ScheduleType.CPU_Persistent]:
return storage in [
StorageType.Default, StorageType.CPU_Heap, StorageType.CPU_Pinned, StorageType.CPU_ThreadLocal
]
Expand Down Expand Up @@ -1460,19 +1463,19 @@ def can_allocate(storage: StorageType, schedule: ScheduleType):
# Host-only allocation
if storage in [StorageType.CPU_Heap, StorageType.CPU_Pinned, StorageType.CPU_ThreadLocal]:
return schedule in [
ScheduleType.CPU_Multicore, ScheduleType.Sequential, ScheduleType.MPI, ScheduleType.GPU_Default
ScheduleType.CPU_Multicore, ScheduleType.CPU_Persistent, ScheduleType.Sequential, ScheduleType.MPI, ScheduleType.GPU_Default
]

# GPU-global memory
if storage is StorageType.GPU_Global:
return schedule in [
ScheduleType.CPU_Multicore, ScheduleType.Sequential, ScheduleType.MPI, ScheduleType.GPU_Default
ScheduleType.CPU_Multicore, ScheduleType.CPU_Persistent, ScheduleType.Sequential, ScheduleType.MPI, ScheduleType.GPU_Default
]

# FPGA-global memory
if storage is StorageType.FPGA_Global:
return schedule in [
ScheduleType.CPU_Multicore, ScheduleType.Sequential, ScheduleType.MPI, ScheduleType.FPGA_Device,
ScheduleType.CPU_Multicore, ScheduleType.CPU_Persistent, ScheduleType.Sequential, ScheduleType.MPI, ScheduleType.FPGA_Device,
ScheduleType.GPU_Default
]

Expand Down
6 changes: 3 additions & 3 deletions dace/sdfg/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -851,17 +851,17 @@ class Map(object):
default=0,
desc="Number of OpenMP threads executing the Map",
optional=True,
optional_condition=lambda m: m.schedule == dtypes.ScheduleType.CPU_Multicore)
optional_condition=lambda m: m.schedule in (dtypes.ScheduleType.CPU_Multicore, dtypes.ScheduleType.CPU_Persistent))
omp_schedule = EnumProperty(dtype=dtypes.OMPScheduleType,
default=dtypes.OMPScheduleType.Default,
desc="OpenMP schedule {static, dynamic, guided}",
optional=True,
optional_condition=lambda m: m.schedule == dtypes.ScheduleType.CPU_Multicore)
optional_condition=lambda m: m.schedule in (dtypes.ScheduleType.CPU_Multicore, dtypes.ScheduleType.CPU_Persistent))
omp_chunk_size = Property(dtype=int,
default=0,
desc="OpenMP schedule chunk size",
optional=True,
optional_condition=lambda m: m.schedule == dtypes.ScheduleType.CPU_Multicore)
optional_condition=lambda m: m.schedule in (dtypes.ScheduleType.CPU_Multicore, dtypes.ScheduleType.CPU_Persistent))

gpu_block_size = ListProperty(element_type=int,
default=None,
Expand Down
3 changes: 2 additions & 1 deletion dace/transformation/interstate/sdfg_nesting.py
Original file line number Diff line number Diff line change
Expand Up @@ -814,7 +814,8 @@ def can_be_applied(self, graph: SDFGState, expr_index: int, sdfg: SDFG, permissi
# Not every schedule is supported
if not permissive:
if nsdfg.schedule not in (None, dtypes.ScheduleType.Default, dtypes.ScheduleType.Sequential,
dtypes.ScheduleType.CPU_Multicore, dtypes.ScheduleType.GPU_Device):
dtypes.ScheduleType.CPU_Multicore, dtypes.ScheduleType.CPU_Persistent,
dtypes.ScheduleType.GPU_Device):
return False

candidates = InlineTransients._candidates(sdfg, graph, nsdfg)
Expand Down

0 comments on commit 30af8da

Please sign in to comment.