Improve SYCL reduction performance: MDRangePolicy (kokkos#6271)
* Improve SYCL MDRangePolicy reduction

* Fix max_size in workgroup_reduction call

* value_count() can't be less than 1

* Use else
masterleinad committed Jul 26, 2023
1 parent ced2451 commit 2f12ebb
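
For context, here is a minimal standalone sketch (not code from this commit) of the launch-configuration heuristic the reworked kernel below uses: the work-group size is the product of the tile dimensions rounded up to the next power of two, and the number of work groups is capped near 2 * max_compute_units by folding several tiles into each work group. The names LaunchConfig and configure are hypothetical; the actual code queries the SYCL device and uses Kokkos::bit_ceil.

#include <bit>
#include <cstddef>
#include <cstdio>

// Hypothetical helper mirroring the host-side launch configuration introduced
// in this commit: round the tile size up to a power of two and cap the number
// of work groups, assuming each work group later strides over several tiles.
struct LaunchConfig {
  unsigned wgroup_size;    // threads per work group
  int values_per_thread;   // tiles folded into one work group
  std::size_t n_wgroups;   // capped number of work groups
};

LaunchConfig configure(std::size_t n_tiles, unsigned prod_tile_dims,
                       std::size_t max_compute_units) {
  LaunchConfig cfg;
  cfg.wgroup_size = std::bit_ceil(prod_tile_dims);
  const std::size_t max_work_groups = 2 * max_compute_units;
  cfg.values_per_thread = 1;
  cfg.n_wgroups = n_tiles;
  while (cfg.n_wgroups > max_work_groups) {
    cfg.values_per_thread *= 2;
    cfg.n_wgroups = (n_tiles + cfg.values_per_thread - 1) / cfg.values_per_thread;
  }
  return cfg;
}

int main() {
  // Example: 100000 tiles of 4x8 threads on a device reporting 64 compute units.
  const LaunchConfig cfg = configure(100000, 4 * 8, 64);
  std::printf("wgroup_size=%u values_per_thread=%d n_wgroups=%zu\n",
              cfg.wgroup_size, cfg.values_per_thread, cfg.n_wgroups);
}

For 100000 tiles of 4x8 threads on a 64-compute-unit device this yields 32-thread work groups, values_per_thread = 1024, and 98 work groups, so each work group accumulates roughly a thousand tiles through the grid-stride loop in the kernel before the cross-group combine.
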
Showing 1 changed file: core/src/SYCL/Kokkos_SYCL_Parallel_Reduce.hpp (159 additions, 159 deletions).
@@ -228,7 +228,7 @@ class ParallelReduce<CombinedFunctorReducerType, Kokkos::RangePolicy<Traits...>,

std::size_t size = policy.end() - policy.begin();
const unsigned int value_count =
std::max(m_functor_reducer.get_reducer().value_count(), 1u);
m_functor_reducer.get_reducer().value_count();
sycl::device_ptr<value_type> results_ptr = nullptr;
sycl::global_ptr<value_type> device_accessible_result_ptr =
m_result_ptr_device_accessible ? m_result_ptr : nullptr;
@@ -271,13 +271,11 @@ class ParallelReduce<CombinedFunctorReducerType, Kokkos::RangePolicy<Traits...>,
std::vector<sycl::event>{parallel_reduce_event});
#endif
last_reduction_event = parallel_reduce_event;
}

// Otherwise, we perform a reduction on the values in all workgroups
// separately, write the workgroup results back to global memory and recurse
// until only one workgroup does the reduction and thus gets the final
// value.
if (size > 1) {
} else {
// Otherwise (when size > 1), we perform a reduction on the values in all
// workgroups separately, write the workgroup results back to global
// memory and recurse until only one workgroup does the reduction and thus
// gets the final value.
auto scratch_flags = static_cast<sycl::device_ptr<unsigned int>>(
instance.scratch_flags(sizeof(unsigned int)));

@@ -575,65 +573,40 @@ class ParallelReduce<CombinedFunctorReducerType,
m_space.impl_internal_space_instance()->m_mutexScratchSpace) {}

private:
template <typename PolicyType, typename CombinedFunctorReducerWrapper>
template <typename CombinedFunctorReducerWrapper>
sycl::event sycl_direct_launch(
const PolicyType& policy,
const CombinedFunctorReducerWrapper& functor_reducer_wrapper,
const sycl::event& memcpy_event) const {
// Convenience references
Kokkos::Experimental::Impl::SYCLInternal& instance =
*m_space.impl_internal_space_instance();
sycl::queue& q = m_space.sycl_queue();

const typename Policy::index_type nwork = m_policy.m_num_tiles;
const typename Policy::index_type block_size =
std::pow(2, std::ceil(std::log2(m_policy.m_prod_tile_dims)));

const sycl::range<1> local_range(block_size);
// REMEMBER swap local x<->y to be conforming with Cuda/HIP implementation
const sycl::range<1> global_range(nwork * block_size);
const sycl::nd_range<1> range{global_range, local_range};

const size_t wgroup_size = range.get_local_range().size();
size_t size = range.get_global_range().size();
const auto init_size =
std::max<std::size_t>((size + wgroup_size - 1) / wgroup_size, 1);
const typename Policy::index_type n_tiles = m_policy.m_num_tiles;
const unsigned int value_count =
m_functor_reducer.get_reducer().value_count();
const auto results_ptr =
static_cast<sycl::device_ptr<value_type>>(instance.scratch_space(
sizeof(value_type) * std::max(value_count, 1u) * init_size));
sycl::global_ptr<value_type> device_accessible_result_ptr =
m_result_ptr_device_accessible ? m_result_ptr : nullptr;
auto scratch_flags = static_cast<sycl::device_ptr<unsigned int>>(
instance.scratch_flags(sizeof(unsigned int)));
sycl::device_ptr<value_type> results_ptr;

sycl::event last_reduction_event;

// If size<=1 we only call init(), the functor and possibly final once
// working with the global scratch memory but don't copy back to
// m_result_ptr yet.
if (size <= 1) {
// If n_tiles==0 we only call init() and final() working with the global
// scratch memory but don't copy back to m_result_ptr yet.
if (n_tiles == 0) {
auto parallel_reduce_event = q.submit([&](sycl::handler& cgh) {
#ifndef KOKKOS_IMPL_SYCL_USE_IN_ORDER_QUEUES
cgh.depends_on(memcpy_event);
#else
(void)memcpy_event;
#endif
results_ptr = static_cast<sycl::device_ptr<value_type>>(
instance.scratch_space(sizeof(value_type) * value_count));
sycl::global_ptr<value_type> device_accessible_result_ptr =
m_result_ptr_device_accessible ? m_result_ptr : nullptr;
cgh.single_task([=]() {
const CombinedFunctorReducerType& functor_reducer =
functor_reducer_wrapper.get_functor();
const FunctorType& functor = functor_reducer.get_functor();
const ReducerType& reducer = functor_reducer.get_reducer();

reference_type update = reducer.init(results_ptr);
if (size == 1) {
Kokkos::Impl::Reduce::DeviceIterateTile<
Policy::rank, BarePolicy, FunctorType,
typename Policy::work_tag, reference_type>(
policy, functor, update, {1, 1, 1}, {0, 0, 0}, {0, 0, 0})
.exec_range();
}
reducer.init(results_ptr);
reducer.final(results_ptr);
if (device_accessible_result_ptr)
reducer.copy(device_accessible_result_ptr.get(), results_ptr.get());
@@ -644,17 +617,36 @@ class ParallelReduce<CombinedFunctorReducerType,
std::vector<sycl::event>{parallel_reduce_event});
#endif
last_reduction_event = parallel_reduce_event;
}
} else {
// Otherwise (when n_tiles is not zero), we perform a reduction on the
// values in all workgroups separately, write the workgroup results back
// to global memory and recurse until only one workgroup does the
// reduction and thus gets the final value.
const int wgroup_size = Kokkos::bit_ceil(
static_cast<unsigned int>(m_policy.m_prod_tile_dims));

// FIXME_SYCL Find a better way to determine a good limit for the
// maximum number of work groups, also see
// https://github.com/intel/llvm/blob/756ba2616111235bba073e481b7f1c8004b34ee6/sycl/source/detail/reduction.cpp#L51-L62
size_t max_work_groups =
2 * q.get_device().get_info<sycl::info::device::max_compute_units>();
int values_per_thread = 1;
size_t n_wgroups = n_tiles;
while (n_wgroups > max_work_groups) {
values_per_thread *= 2;
n_wgroups = (n_tiles + values_per_thread - 1) / values_per_thread;
}

results_ptr = static_cast<sycl::device_ptr<value_type>>(
instance.scratch_space(sizeof(value_type) * value_count * n_wgroups));
sycl::global_ptr<value_type> device_accessible_result_ptr =
m_result_ptr_device_accessible ? m_result_ptr : nullptr;
auto scratch_flags = static_cast<sycl::device_ptr<unsigned int>>(
instance.scratch_flags(sizeof(unsigned int)));

// Otherwise, we perform a reduction on the values in all workgroups
// separately, write the workgroup results back to global memory and recurse
// until only one workgroup does the reduction and thus gets the final
// value.
if (size > 1) {
auto n_wgroups = (size + wgroup_size - 1) / wgroup_size;
auto parallel_reduce_event = q.submit([&](sycl::handler& cgh) {
sycl::local_accessor<value_type> local_mem(
sycl::range<1>(wgroup_size) * std::max(value_count, 1u), cgh);
sycl::range<1>(wgroup_size) * value_count, cgh);
sycl::local_accessor<unsigned int> num_teams_done(1, cgh);

const BarePolicy bare_policy = m_policy;
Expand All @@ -665,115 +657,124 @@ class ParallelReduce<CombinedFunctorReducerType,
(void)memcpy_event;
#endif

cgh.parallel_for(range, [=](sycl::nd_item<1> item) {
const auto local_id = item.get_local_linear_id();
const CombinedFunctorReducerType& functor_reducer =
functor_reducer_wrapper.get_functor();
const FunctorType& functor = functor_reducer.get_functor();
const ReducerType& reducer = functor_reducer.get_reducer();
// REMEMBER swap local x<->y to be conforming with Cuda/HIP
// implementation
cgh.parallel_for(
sycl::nd_range<1>{n_wgroups * wgroup_size, wgroup_size},
[=](sycl::nd_item<1> item) {
const int local_id = item.get_local_linear_id();
const CombinedFunctorReducerType& functor_reducer =
functor_reducer_wrapper.get_functor();
const FunctorType& functor = functor_reducer.get_functor();
const ReducerType& reducer = functor_reducer.get_reducer();

// In the first iteration, we call functor to initialize the local
// memory. Otherwise, the local memory is initialized with the
// results from the previous iteration that are stored in global
// memory.
using index_type = typename Policy::index_type;

// SWAPPED here to be conforming with CUDA implementation
const index_type local_x = 0;
const index_type local_y = item.get_local_id(0);
const index_type local_z = 0;
const index_type global_x = item.get_group(0);
const index_type global_y = 0;
const index_type global_z = 0;
const index_type n_global_x = item.get_group_range(0);
const index_type n_global_y = 1;
const index_type n_global_z = 1;

if constexpr (!use_shuffle_based_algorithm<ReducerType>) {
reference_type update =
reducer.init(&local_mem[local_id * value_count]);

Kokkos::Impl::Reduce::DeviceIterateTile<
Policy::rank, BarePolicy, FunctorType,
typename Policy::work_tag, reference_type>(
bare_policy, functor, update,
{n_global_x, n_global_y, n_global_z},
{global_x, global_y, global_z}, {local_x, local_y, local_z})
.exec_range();
item.barrier(sycl::access::fence_space::local_space);

SYCLReduction::workgroup_reduction<>(
item, local_mem, results_ptr, device_accessible_result_ptr,
value_count, reducer, false, std::min(size, wgroup_size));

if (local_id == 0) {
sycl::atomic_ref<unsigned, sycl::memory_order::relaxed,
sycl::memory_scope::device,
sycl::access::address_space::global_space>
scratch_flags_ref(*scratch_flags);
num_teams_done[0] = ++scratch_flags_ref;
}
item.barrier(sycl::access::fence_space::local_space);
if (num_teams_done[0] == n_wgroups) {
if (local_id >= n_wgroups)
reducer.init(&local_mem[local_id * value_count]);
else {
reducer.copy(&local_mem[local_id * value_count],
&results_ptr[local_id * value_count]);
for (unsigned int id = local_id + wgroup_size; id < n_wgroups;
id += wgroup_size) {
reducer.join(&local_mem[local_id * value_count],
&results_ptr[id * value_count]);
// In the first iteration, we call functor to initialize the local
// memory. Otherwise, the local memory is initialized with the
// results from the previous iteration that are stored in global
// memory.
using index_type = typename Policy::index_type;

// SWAPPED here to be conforming with CUDA implementation
const index_type local_x = 0;
const index_type local_y = item.get_local_id(0);
const index_type local_z = 0;
const index_type global_y = 0;
const index_type global_z = 0;
const index_type n_global_x = n_tiles;
const index_type n_global_y = 1;
const index_type n_global_z = 1;

if constexpr (!use_shuffle_based_algorithm<ReducerType>) {
reference_type update =
reducer.init(&local_mem[local_id * value_count]);

for (index_type global_x = item.get_group(0);
global_x < n_tiles; global_x += item.get_group_range(0))
Kokkos::Impl::Reduce::DeviceIterateTile<
Policy::rank, BarePolicy, FunctorType,
typename Policy::work_tag, reference_type>(
bare_policy, functor, update,
{n_global_x, n_global_y, n_global_z},
{global_x, global_y, global_z},
{local_x, local_y, local_z})
.exec_range();
item.barrier(sycl::access::fence_space::local_space);

SYCLReduction::workgroup_reduction<>(
item, local_mem, results_ptr, device_accessible_result_ptr,
value_count, reducer, false, wgroup_size);

if (local_id == 0) {
sycl::atomic_ref<unsigned, sycl::memory_order::relaxed,
sycl::memory_scope::device,
sycl::access::address_space::global_space>
scratch_flags_ref(*scratch_flags);
num_teams_done[0] = ++scratch_flags_ref;
}
}
item.barrier(sycl::access::fence_space::local_space);
if (num_teams_done[0] == n_wgroups) {
if (local_id >= static_cast<int>(n_wgroups))
reducer.init(&local_mem[local_id * value_count]);
else {
reducer.copy(&local_mem[local_id * value_count],
&results_ptr[local_id * value_count]);
for (unsigned int id = local_id + wgroup_size;
id < n_wgroups; id += wgroup_size) {
reducer.join(&local_mem[local_id * value_count],
&results_ptr[id * value_count]);
}
}

SYCLReduction::workgroup_reduction<>(
item, local_mem, results_ptr, device_accessible_result_ptr,
value_count, reducer, true, std::min(n_wgroups, wgroup_size));
}
} else {
value_type local_value;
reference_type update = reducer.init(&local_value);

Kokkos::Impl::Reduce::DeviceIterateTile<
Policy::rank, BarePolicy, FunctorType,
typename Policy::work_tag, reference_type>(
bare_policy, functor, update,
{n_global_x, n_global_y, n_global_z},
{global_x, global_y, global_z}, {local_x, local_y, local_z})
.exec_range();

SYCLReduction::workgroup_reduction<>(
item, local_mem, local_value, results_ptr,
device_accessible_result_ptr, reducer, false,
std::min(size, wgroup_size));

if (local_id == 0) {
sycl::atomic_ref<unsigned, sycl::memory_order::relaxed,
sycl::memory_scope::device,
sycl::access::address_space::global_space>
scratch_flags_ref(*scratch_flags);
num_teams_done[0] = ++scratch_flags_ref;
}
item.barrier(sycl::access::fence_space::local_space);
if (num_teams_done[0] == n_wgroups) {
if (local_id >= n_wgroups)
reducer.init(&local_value);
else {
local_value = results_ptr[local_id];
for (unsigned int id = local_id + wgroup_size; id < n_wgroups;
id += wgroup_size) {
reducer.join(&local_value, &results_ptr[id]);
SYCLReduction::workgroup_reduction<>(
item, local_mem, results_ptr,
device_accessible_result_ptr, value_count, reducer, true,
std::min<int>(n_wgroups, wgroup_size));
}
}
} else {
value_type local_value;
reference_type update = reducer.init(&local_value);

SYCLReduction::workgroup_reduction<>(
item, local_mem, local_value, results_ptr,
device_accessible_result_ptr, reducer, true,
std::min(n_wgroups, wgroup_size));
}
}
});
for (index_type global_x = item.get_group(0);
global_x < n_tiles; global_x += item.get_group_range(0))
Kokkos::Impl::Reduce::DeviceIterateTile<
Policy::rank, BarePolicy, FunctorType,
typename Policy::work_tag, reference_type>(
bare_policy, functor, update,
{n_global_x, n_global_y, n_global_z},
{global_x, global_y, global_z},
{local_x, local_y, local_z})
.exec_range();

SYCLReduction::workgroup_reduction<>(
item, local_mem, local_value, results_ptr,
device_accessible_result_ptr, reducer, false, wgroup_size);

if (local_id == 0) {
sycl::atomic_ref<unsigned, sycl::memory_order::relaxed,
sycl::memory_scope::device,
sycl::access::address_space::global_space>
scratch_flags_ref(*scratch_flags);
num_teams_done[0] = ++scratch_flags_ref;
}
item.barrier(sycl::access::fence_space::local_space);
if (num_teams_done[0] == n_wgroups) {
if (local_id >= static_cast<int>(n_wgroups))
reducer.init(&local_value);
else {
local_value = results_ptr[local_id];
for (unsigned int id = local_id + wgroup_size;
id < n_wgroups; id += wgroup_size) {
reducer.join(&local_value, &results_ptr[id]);
}
}

SYCLReduction::workgroup_reduction<>(
item, local_mem, local_value, results_ptr,
device_accessible_result_ptr, reducer, true,
std::min<int>(n_wgroups, wgroup_size));
}
}
});
});
#ifndef KOKKOS_IMPL_SYCL_USE_IN_ORDER_QUEUES
q.ext_oneapi_submit_barrier(
@@ -812,9 +813,8 @@ class ParallelReduce<CombinedFunctorReducerType,
Experimental::Impl::make_sycl_function_wrapper(m_functor_reducer,
indirectKernelMem);

sycl::event event =
sycl_direct_launch(m_policy, functor_reducer_wrapper,
functor_reducer_wrapper.get_copy_event());
sycl::event event = sycl_direct_launch(
functor_reducer_wrapper, functor_reducer_wrapper.get_copy_event());
functor_reducer_wrapper.register_event(event);
}

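The new kernel finishes the reduction in a single pass: every work group writes its partial result to global scratch, bumps an atomic counter, and the group that observes the counter reach n_wgroups combines the partials. Below is a self-contained, hedged SYCL sketch of that pattern for a plain integer sum, not the Kokkos reducer machinery; the problem sizes, the in-order queue, and the acq_rel ordering are choices made for this example rather than taken from the commit.

#include <sycl/sycl.hpp>
#include <cstdio>
#include <vector>

int main() {
  // In-order queue so the copies below complete before the kernel runs.
  sycl::queue q{sycl::property::queue::in_order()};
  constexpr int n = 1 << 20;
  constexpr int wgroup_size = 256;
  constexpr int n_wgroups = 64;
  static_assert(n_wgroups <= wgroup_size,
                "the final pass loads one partial per thread");

  std::vector<int> host(n, 1);
  int* data = sycl::malloc_device<int>(n, q);
  int* partials = sycl::malloc_device<int>(n_wgroups, q);
  int* result = sycl::malloc_shared<int>(1, q);
  unsigned* counter = sycl::malloc_device<unsigned>(1, q);
  q.memcpy(data, host.data(), n * sizeof(int));
  q.memset(counter, 0, sizeof(unsigned));

  q.submit([&](sycl::handler& cgh) {
    sycl::local_accessor<int> local_mem(wgroup_size, cgh);
    sycl::local_accessor<unsigned> groups_done(1, cgh);
    cgh.parallel_for(
        sycl::nd_range<1>{n_wgroups * wgroup_size, wgroup_size},
        [=](sycl::nd_item<1> item) {
          const int lid = item.get_local_linear_id();
          const int gid = item.get_group_linear_id();

          // Grid-stride accumulation into a private value.
          int value = 0;
          for (std::size_t i = item.get_global_linear_id();
               i < static_cast<std::size_t>(n); i += item.get_global_range(0))
            value += data[i];

          // Tree reduction in local memory gives the per-group partial sum.
          local_mem[lid] = value;
          for (int stride = wgroup_size / 2; stride > 0; stride /= 2) {
            item.barrier(sycl::access::fence_space::local_space);
            if (lid < stride) local_mem[lid] += local_mem[lid + stride];
          }
          if (lid == 0) {
            partials[gid] = local_mem[0];
            sycl::atomic_ref<unsigned, sycl::memory_order::acq_rel,
                             sycl::memory_scope::device,
                             sycl::access::address_space::global_space>
                counter_ref(*counter);
            groups_done[0] = ++counter_ref;
          }
          item.barrier(sycl::access::fence_space::local_space);

          // The last group to arrive combines the per-group partial sums.
          if (groups_done[0] == static_cast<unsigned>(n_wgroups)) {
            local_mem[lid] = lid < n_wgroups ? partials[lid] : 0;
            for (int stride = wgroup_size / 2; stride > 0; stride /= 2) {
              item.barrier(sycl::access::fence_space::local_space);
              if (lid < stride) local_mem[lid] += local_mem[lid + stride];
            }
            if (lid == 0) *result = local_mem[0];
          }
        });
  });
  q.wait();
  std::printf("sum = %d (expected %d)\n", *result, n);

  sycl::free(data, q);
  sycl::free(partials, q);
  sycl::free(result, q);
  sycl::free(counter, q);
}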

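Finally, a usage sketch of the code path this file implements: a Kokkos::parallel_reduce over a rank-2 MDRangePolicy on the SYCL execution space. It assumes a Kokkos build configured with -DKokkos_ENABLE_SYCL=ON; on other builds, substitute Kokkos::DefaultExecutionSpace for the execution space alias.

#include <Kokkos_Core.hpp>
#include <cstdio>

int main(int argc, char* argv[]) {
  Kokkos::initialize(argc, argv);
  {
    // Assumes Kokkos was built with the SYCL backend enabled.
    using ExecSpace = Kokkos::Experimental::SYCL;
    const int N = 1024, M = 1024;
    double sum = 0.0;
    Kokkos::parallel_reduce(
        "mdrange_sum",
        Kokkos::MDRangePolicy<ExecSpace, Kokkos::Rank<2>>({0, 0}, {N, M}),
        KOKKOS_LAMBDA(const int i, const int j, double& lsum) {
          lsum += static_cast<double>(i) * j;
        },
        sum);
    std::printf("sum = %.0f\n", sum);
  }
  Kokkos::finalize();
  return 0;
}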