Merge pull request #1246 from spcl/fix-dynamic-tb
Assortment of fixes for dynamic Maps on GPU (dynamic thread blocks)
alexnick83 committed Apr 28, 2023
2 parents 0436f91 + 5a410d5 commit 532bc64
Showing 2 changed files with 263 additions and 5 deletions.
37 changes: 32 additions & 5 deletions dace/codegen/targets/cuda.py
@@ -1600,6 +1600,10 @@ def generate_scope(self, sdfg, dfg_scope, state_id, function_stream, callsite_st
(kernel_name, ', '.join(state_param + kernel_args_typed + extra_call_args_typed)), sdfg, state_id,
scope_entry)

# If there are dynamic Map inputs, put the kernel invocation in its own scope to avoid redefinitions.
if dace.sdfg.has_dynamic_map_inputs(state, scope_entry):
callsite_stream.write('{', sdfg, state_id, scope_entry)

# Synchronize all events leading to dynamic map range connectors
for e in dace.sdfg.dynamic_map_inputs(state, scope_entry):
if hasattr(e, '_cuda_event'):
@@ -1619,6 +1623,10 @@ def generate_scope(self, sdfg, dfg_scope, state_id, function_stream, callsite_st
for aname, arg in kernel_args.items()] + extra_call_args)), sdfg, state_id,
scope_entry)

# If there are dynamic Map inputs, close the scope opened around the kernel invocation above.
if dace.sdfg.has_dynamic_map_inputs(state, scope_entry):
callsite_stream.write('}', sdfg, state_id, scope_entry)

synchronize_streams(sdfg, state, state_id, scope_entry, scope_exit, callsite_stream, self)

# Instrumentation (post-kernel)
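For context, the redefinition this scoping guards against arises when several kernel launches in one state take dynamic Map inputs that resolve to the same connector names, so their host-side range definitions would clash. A minimal sketch in the DaCe Python frontend (illustrative names; the new test_dynamic_maps below builds this pattern and forces the name clash explicitly):

import dace

N = dace.symbol('N')
nnz = dace.symbol('nnz')

@dace.program
def two_kernels(pos: dace.int32[N + 1], val: dace.float32[nnz],
                out1: dace.float32[N], out2: dace.float32[N]):
    for i in range(N):
        begin = pos[i]      # dynamic Map inputs shared by both inner Maps
        end = pos[i + 1]
        for j in dace.map[begin:end]:  # becomes its own GPU kernel
            out1[i] += val[j]
        for j in dace.map[begin:end]:  # second kernel with the same dynamic inputs
            out2[i] += val[j]

After to_sdfg() and apply_gpu_transformations(), each inner Map becomes a separate kernel launch in the same state; the braces emitted above keep each launch's dynamic-range definitions local to its own scope.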
@@ -1973,8 +1981,6 @@ def generate_devicelevel_scope(self, sdfg, dfg_scope, state_id, function_stream,
raise ValueError('Block size has to be constant for block-wide dynamic map schedule (got %s)' %
str(bdim))
total_block_size *= bdim
if _expr(scope_map.range[0][2]) != 1:
raise NotImplementedError('Skip not implemented for dynamic thread-block map schedule')

##### TODO (later): Generalize
# Find thread-block param map and its name
@@ -2001,15 +2007,26 @@ def generate_devicelevel_scope(self, sdfg, dfg_scope, state_id, function_stream,
_topy(subsets.Range(outer_scope.map.range[::-1]).max_element()[0] + 1)), sdfg,
state_id, scope_entry)

# NOTE: Dynamic map inputs must be defined both outside and inside the dynamic Map schedule.
# Inside the schedule, they define the bounds of any nested Maps.
# Outside the schedule, they define the bounds of the dynamic Map's for-loop invocation.
# NOTE: The value of the dynamic Map's variable may differ inside and outside the schedule.
for e in dace.sdfg.dynamic_map_inputs(dfg, scope_entry):
callsite_stream.write(
self._cpu_codegen.memlet_definition(sdfg, e.data, False, e.dst_conn,
e.dst.in_connectors[e.dst_conn]), sdfg, state_id, scope_entry)

dynmap_var = scope_map.params[0]
dynmap_begin = scope_map.range[0][0]
dynmap_end = scope_map.range[0][1] + 1
dynmap_step = scope_map.range[0][2]
if dynmap_step != 1:
dynmap_var = f'{dynmap_var}_idx'
dynmap_end = f'int_ceil({dynmap_end - dynmap_begin}, {dynmap_step})'
dynmap_begin = 0
callsite_stream.write(
'__dace_dynmap_begin = {begin};\n'
'__dace_dynmap_end = {end};'.format(begin=scope_map.range[0][0], end=scope_map.range[0][1] + 1), sdfg,
state_id, scope_entry)
'__dace_dynmap_end = {end};'.format(begin=dynmap_begin, end=dynmap_end), sdfg, state_id, scope_entry)

# close if
callsite_stream.write('}', sdfg, state_id, scope_entry)
@@ -2022,7 +2039,17 @@ def generate_devicelevel_scope(self, sdfg, dfg_scope, state_id, function_stream,
'compiler', 'cuda', 'dynamic_map_fine_grained') else 'false'),
bsize=total_block_size,
kmapIdx=outer_scope.map.params[0],
param=scope_map.params[0]), sdfg, state_id, scope_entry)
param=dynmap_var), sdfg, state_id, scope_entry)

for e in dace.sdfg.dynamic_map_inputs(dfg, scope_entry):
callsite_stream.write(
self._cpu_codegen.memlet_definition(sdfg, e.data, False, e.dst_conn,
e.dst.in_connectors[e.dst_conn]), sdfg, state_id, scope_entry)

if dynmap_step != 1:
callsite_stream.write(
f'auto {scope_map.params[0]} = {scope_map.range[0][0]} + {dynmap_step} * {dynmap_var};', sdfg,
state_id, scope_entry)

elif scope_map.schedule == dtypes.ScheduleType.GPU_Device:
dfg_kernel = self._kernel_state.scope_subgraph(self._kernel_map)
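The non-unit-step handling above normalizes the dynamic Map's range: the runtime loop iterates a zero-based index over int_ceil(end - begin, step) entries, and the original Map variable is reconstructed inside the schedule as begin + step * index. A small plain-Python check of that arithmetic (hypothetical bounds; int_ceil here just mirrors the ceiling division used by the generated code):

def int_ceil(a, b):
    # Ceiling division, as performed by the generated int_ceil()
    return -(-a // b)

begin, end, step = 3, 20, 4  # hypothetical dynamic-Map bounds
reconstructed = [begin + step * idx for idx in range(int_ceil(end - begin, step))]
assert reconstructed == list(range(begin, end, step))  # same iteration space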
231 changes: 231 additions & 0 deletions tests/dynamic_tb_map_cudatest.py
@@ -12,8 +12,10 @@

@dace.program(dace.uint32[H + 1], dace.uint32[nnz], dace.float32[nnz], dace.float32[W], dace.float32[H])
def spmv(A_row, A_col, A_val, x, b):

@dace.mapscope(_[0:H])
def compute_row(i):

@dace.map(_[A_row[i]:A_row[i + 1]])
def compute(j):
a << A_val[j]
@@ -64,5 +66,234 @@ def test_dynamic_map():
assert diff <= 1e-5


@pytest.mark.gpu
def test_dynamic_maps():
""" Tests the case of multiple dynamic maps in a row that share dynamic inputs."""

W = dace.symbol('W')
H = dace.symbol('H')
nnz = dace.symbol('nnz')

@dace.program(dace.uint32[H + 1], dace.uint32[nnz], dace.float32[nnz], dace.float32[W], dace.float32[H],
dace.float32[H])
def spmv_2x(A_row, A_col, A_val, x, b, c):

for i in range(H):
row_start = A_row[i]
row_end = A_row[i + 1]
for j in dace.map[row_start:row_end]:
b[i] += A_val[j] * x[A_col[j]]
for j in dace.map[row_start:row_end]:
c[i] += A_val[j] * x[A_col[j]]

height = 1024
width = 1024

# Prepare spmv SDFG for GPU
sdfg = spmv_2x.to_sdfg()
# Rename dynamic inputs to cause name clashes
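# The first Map keeps its connector names; every later Map is rewired to reuse those
# same names, so multiple kernels in one state share identical dynamic-input
# definitions (the situation the new scoping in cuda.py guards against).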
main_entry = None
main_dict = {}
for node, state in sdfg.all_nodes_recursive():
if isinstance(node, dace.sdfg.nodes.MapEntry):
if main_entry is None:
main_entry = node
for e in dace.sdfg.dynamic_map_inputs(state, node):
main_dict[e.data.data] = e.dst_conn
else:
repl_dict = {}
for e in dace.sdfg.dynamic_map_inputs(state, node):
node.remove_in_connector(e.dst_conn)
node.add_in_connector(main_dict[e.data.data])
repl_dict[e.dst_conn] = main_dict[e.data.data]
e._dst_conn = main_dict[e.data.data]
node.map.range.replace(repl_dict)

sdfg.apply_gpu_transformations()

for node in sdfg.all_nodes_recursive():
if isinstance(node[0], dace.sdfg.nodes.MapEntry) \
and node[0].schedule == dace.dtypes.ScheduleType.Sequential:
node[0].schedule = dace.dtypes.ScheduleType.GPU_ThreadBlock_Dynamic

# Fill input data
# Each row has up to (and including) 256 elements
A_row = np.random.randint(257, size=height + 1, dtype=dace.uint32.type)
A_row[0] = 0
A_row = np.cumsum(A_row, dtype=dace.uint32.type)

# Column data
A_col = dace.ndarray([A_row[height]], dtype=dace.uint32)
for i in range(height):
A_col[A_row[i]:A_row[i + 1]] = np.sort(np.random.choice(width, A_row[i + 1] - A_row[i], replace=False))

# values
A_val = np.random.rand(A_row[height]).astype(dace.float32.type)

A_sparse = scipy.sparse.csr_matrix((A_val, A_col, A_row), dtype=dace.float32.type, shape=(1024, 1024))

x = np.random.rand(width).astype(dace.float32.type)
b = np.zeros(height, dtype=dace.float32.type)
c = np.zeros(height, dtype=dace.float32.type)

sdfg(A_row=A_row,
A_col=A_col,
A_val=A_val,
x=x,
b=b,
c=c,
H=A_sparse.shape[0],
W=A_sparse.shape[1],
nnz=A_sparse.nnz)

diff0 = np.linalg.norm(A_sparse.dot(x) - b) / float(height)
diff1 = np.linalg.norm(A_sparse.dot(x) - c) / float(height)
assert diff0 <= 1e-5
assert diff1 <= 1e-5


@pytest.mark.gpu
def test_nested_dynamic_map():
""" Tests the case where the dynamic map inputs are defined in an outer scope. """

M = dace.symbol('M')
N = dace.symbol('N')
K = dace.symbol('K')
nnz_A = dace.symbol('nnz_A')
nnz_D = dace.symbol('nnz_D')

@dace.program
def sddmm(D_vals: dace.float32[nnz_D], A2_crd: dace.int32[nnz_A], A2_pos: dace.int32[M + 1],
A_vals: dace.float32[nnz_A], B: dace.float32[M, K], C: dace.float32[K, N]):
for i in dace.map[0:M]:
for j in dace.map[A2_pos[i]:A2_pos[i + 1]]:
for k in dace.map[0:K]:
D_vals[j] += A_vals[j] * B[i, k] * C[k, A2_crd[j]]

sdfg = sddmm.to_sdfg(simplify=True)

ime, jme, kme = None, None, None
for state in sdfg.states():
for node in state.nodes():
if isinstance(node, dace.sdfg.nodes.MapEntry):
if node.map.params[0] == 'i':
ime = node
elif node.map.params[0] == 'j':
jme = node
elif node.map.params[0] == 'k':
kme = node
assert ime is not None and jme is not None and kme is not None

from dace.transformation.dataflow import MapInterchange, TrivialTaskletElimination
MapInterchange.apply_to(sdfg, outer_map_entry=jme, inner_map_entry=kme)
sdfg.apply_transformations_repeated(TrivialTaskletElimination)

sdfg.apply_gpu_transformations()
ime.map.schedule = dace.ScheduleType.GPU_Device
kme.map.schedule = dace.ScheduleType.GPU_ThreadBlock_Dynamic

dtype = np.float32
rng = np.random.default_rng(42)
problem_size = 1024
density = 0.01
B = rng.random((problem_size, problem_size), dtype=dtype)
C = rng.random((problem_size, problem_size), dtype=dtype)
A = scipy.sparse.random(problem_size, problem_size, density=density, format='csr', dtype=dtype, random_state=rng)
val = np.zeros_like(A.data)
ref = np.empty_like(A.data)

sdfg(D_vals=val,
A2_crd=A.indices.copy(),
A2_pos=A.indptr.copy(),
A_vals=A.data.copy(),
B=B,
C=C,
M=problem_size,
N=problem_size,
K=problem_size,
nnz_A=A.nnz,
nnz_D=A.nnz)
tmp = B @ C
for row in range(problem_size):
for j in range(A.indptr[row], A.indptr[row + 1]):
col = A.indices[j]
ref[j] = A.data[j] * tmp[row, col]
assert np.allclose(val, ref.data)


@pytest.mark.gpu
def test_dynamic_map_with_step():

M = dace.symbol('M')
N = dace.symbol('N')
nnz_A = dace.symbol('nnz_A')
nnz_D = dace.symbol('nnz_D')

@dace.program
def sddvm(D_vals: dace.float32[nnz_D], A2_crd: dace.int32[nnz_A], A2_pos: dace.int32[M + 1],
A_vals: dace.float32[nnz_A], B: dace.float32[M], C: dace.float32[N]):
for i in dace.map[0:M]:
for j in dace.map[A2_pos[i]:A2_pos[i + 1]]:
D_vals[j] += A_vals[j] * B[i] * C[A2_crd[j]]

sdfg = sddvm.to_sdfg(simplify=True)

ime, jme = None, None
for state in sdfg.states():
for node in state.nodes():
if isinstance(node, dace.sdfg.nodes.MapEntry):
if node.map.params[0] == 'i':
ime = node
elif node.map.params[0] == 'j':
jme = node
assert ime is not None and jme is not None

from dace.transformation.dataflow import StripMining, TrivialTaskletElimination
sdfg.apply_transformations_repeated(TrivialTaskletElimination)
StripMining.apply_to(sdfg, map_entry=jme)
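# With the default strip-mining scheme, the outer 'tile_j' map advances by the tile size,
# so scheduling it as GPU_ThreadBlock_Dynamic below exercises the new non-unit-step path.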

tile_jme = None
for state in sdfg.states():
for node in state.nodes():
if isinstance(node, dace.sdfg.nodes.MapEntry):
if node.map.params[0] == 'tile_j':
tile_jme = node
assert tile_jme is not None

sdfg.apply_gpu_transformations()
ime.map.schedule = dace.ScheduleType.GPU_Device
tile_jme.map.schedule = dace.ScheduleType.GPU_ThreadBlock_Dynamic

dtype = np.float32
rng = np.random.default_rng(42)
problem_size = 1024
density = 0.01
B = rng.random((problem_size, ), dtype=dtype)
C = rng.random((problem_size, ), dtype=dtype)
A = scipy.sparse.random(problem_size, problem_size, density=density, format='csr', dtype=dtype, random_state=rng)
val = np.zeros_like(A.data)
ref = np.empty_like(A.data)

sdfg(D_vals=val,
A2_crd=A.indices.copy(),
A2_pos=A.indptr.copy(),
A_vals=A.data.copy(),
B=B,
C=C,
M=problem_size,
N=problem_size,
nnz_A=A.nnz,
nnz_D=A.nnz)
tmp = np.outer(B, C)
for row in range(problem_size):
for j in range(A.indptr[row], A.indptr[row + 1]):
col = A.indices[j]
ref[j] = A.data[j] * tmp[row, col]
assert np.allclose(val, ref.data)


if __name__ == '__main__':
test_dynamic_map()
test_dynamic_maps()
test_nested_dynamic_map()
test_dynamic_map_with_step()
