Fixes in DBSCAN algorithm (#2170)
KulikovNikita authored and napetrov committed Dec 15, 2022
1 parent 6c4e03a commit f3cbe18
Showing 5 changed files with 126 additions and 78 deletions.
3 changes: 2 additions & 1 deletion cpp/oneapi/dal/algo/dbscan/BUILD
@@ -9,7 +9,8 @@ dal_module(
     auto = True,
     dal_deps = [
         "@onedal//cpp/oneapi/dal:core",
-        "@onedal//cpp/oneapi/dal/backend/primitives",
+        "@onedal//cpp/oneapi/dal/backend/primitives:common",
+        "@onedal//cpp/oneapi/dal/backend/primitives:selection",
     ],
     extra_deps = [
         "@onedal//cpp/daal/src/algorithms/dbscan:kernel",
64 changes: 31 additions & 33 deletions cpp/oneapi/dal/algo/dbscan/backend/gpu/kernel_fp_impl.hpp
@@ -227,53 +227,51 @@ std::int32_t kernels_fp<Float>::start_next_cluster(sycl::queue& queue,
                                                    const pr::ndview<std::int32_t, 1>& cores,
                                                    pr::ndview<std::int32_t, 1>& responses,
                                                    const bk::event_vector& deps) {
+    using oneapi::dal::backend::operator+;
     ONEDAL_PROFILER_TASK(start_next_cluster, queue);
     ONEDAL_ASSERT(cores.get_dimension(0) > 0);
     ONEDAL_ASSERT(cores.get_dimension(0) == responses.get_dimension(0));
     std::int64_t block_size = cores.get_dimension(0);
 
     auto [start_index, start_index_event] =
-        pr::ndarray<std::int32_t, 1>::full(queue, 1, block_size);
-    start_index_event.wait_and_throw();
+        pr::ndarray<std::int32_t, 1>::full(queue, { 1 }, block_size, sycl::usm::alloc::device);
     auto start_index_ptr = start_index.get_mutable_data();
-    start_index_ptr[0] = block_size;
 
     const std::int32_t* cores_ptr = cores.get_data();
     std::int32_t* responses_ptr = responses.get_mutable_data();
     std::int64_t wg_size = get_recommended_sg_size(queue);
-    queue
-        .submit([&](sycl::handler& cgh) {
-            cgh.depends_on(deps);
-            cgh.parallel_for(
-                bk::make_multiple_nd_range_2d({ wg_size, 1 }, { wg_size, 1 }),
-                [=](sycl::nd_item<2> item) {
-                    auto sg = item.get_sub_group();
-                    const std::uint32_t sg_id = sg.get_group_id()[0];
-                    if (sg_id > 0)
-                        return;
-                    const std::uint32_t local_id = sg.get_local_id();
-                    const std::uint32_t local_size = sg.get_local_range()[0];
-                    std::int32_t adjusted_block_size =
-                        local_size * (block_size / local_size + bool(block_size % local_size));
-
-                    for (int32_t i = local_id; i < adjusted_block_size; i += local_size) {
-                        const bool found =
-                            i < block_size ? cores_ptr[i] == 1 && responses_ptr[i] < 0 : false;
-                        const std::int32_t index =
-                            sycl::reduce_over_group(sg,
-                                                    (std::int32_t)(found ? i : block_size),
-                                                    sycl::ext::oneapi::minimum<std::int32_t>());
-                        if (index < block_size) {
-                            if (local_id == 0) {
-                                start_index_ptr[0] = index;
-                            }
-                            break;
-                        }
-                    }
-                });
-        })
-        .wait_and_throw();
-    return *start_index_ptr;
+    auto full_deps = deps + bk::event_vector{ start_index_event };
+    auto index_event = queue.submit([&](sycl::handler& cgh) {
+        cgh.depends_on(full_deps);
+        cgh.parallel_for(
+            bk::make_multiple_nd_range_2d({ wg_size, 1 }, { wg_size, 1 }),
+            [=](sycl::nd_item<2> item) {
+                auto sg = item.get_sub_group();
+                const std::uint32_t sg_id = sg.get_group_id()[0];
+                if (sg_id > 0)
+                    return;
+                const std::int32_t local_id = sg.get_local_id();
+                const std::int32_t local_size = sg.get_local_range()[0];
+                std::int32_t adjusted_block_size =
+                    local_size * (block_size / local_size + bool(block_size % local_size));
+
+                for (std::int32_t i = local_id; i < adjusted_block_size; i += local_size) {
+                    const bool found =
+                        i < block_size ? cores_ptr[i] == 1 && responses_ptr[i] < 0 : false;
+                    const std::int32_t index =
+                        sycl::reduce_over_group(sg,
+                                                (std::int32_t)(found ? i : block_size),
+                                                sycl::ext::oneapi::minimum<std::int32_t>());
+                    if (index < block_size) {
+                        if (local_id == 0) {
+                            *start_index_ptr = index;
+                        }
+                        break;
+                    }
+                }
+            });
+    });
+    return start_index.to_host(queue, { index_event }).at(0);
 }
 
 sycl::event set_queue_ptr(sycl::queue& queue,
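Note on the change above: the old start_next_cluster blocked on wait_and_throw() and then read the result through a raw pointer, which is only safe for host-accessible memory. The new version keeps start_index in device USM, chains the kernel on the allocation event through full_deps, and reads the value back with an event-dependent to_host() copy. The sketch below reproduces the same synchronization pattern in plain SYCL 2020 under stated assumptions: first_unassigned_core is a hypothetical function name, the inputs are device USM, and the scan is serialized in a single work-item for brevity (the kernel above instead uses a sub-group reduce_over_group to find the minimum index).

// Illustrative sketch only (plain SYCL 2020, not the oneDAL ndarray/ndview API):
// the result scalar lives in device USM, the producing kernel is chained through
// an event, and the host reads the value back with a dependent copy instead of
// blocking early and dereferencing a device pointer on the host.
#include <sycl/sycl.hpp>
#include <cstdint>

std::int32_t first_unassigned_core(sycl::queue& q,
                                   const std::int32_t* cores,      // device USM, size n
                                   const std::int32_t* responses,  // device USM, size n
                                   std::int32_t n) {
    std::int32_t* result = sycl::malloc_device<std::int32_t>(1, q);
    // Initialize with the sentinel `n`, meaning "no candidate found".
    auto init = q.fill(result, n, 1);

    auto search = q.submit([&](sycl::handler& cgh) {
        cgh.depends_on(init);
        cgh.single_task([=]() {
            // Serial scan for the first core point that has no cluster assigned yet.
            for (std::int32_t i = 0; i < n; ++i) {
                if (cores[i] == 1 && responses[i] < 0) {
                    *result = i;
                    break;
                }
            }
        });
    });

    std::int32_t host_result = n;
    // Copy depends on the search kernel, so no explicit queue-wide wait is needed.
    q.memcpy(&host_result, result, sizeof(std::int32_t), search).wait();
    sycl::free(result, q);
    return host_result;
}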
114 changes: 70 additions & 44 deletions cpp/oneapi/dal/algo/dbscan/backend/gpu/results.hpp
@@ -20,6 +20,7 @@
 
 #include "oneapi/dal/backend/common.hpp"
 #include "oneapi/dal/backend/primitives/ndarray.hpp"
+#include "oneapi/dal/backend/primitives/selection.hpp"
 #include "oneapi/dal/backend/memory.hpp"
 #include "oneapi/dal/algo/dbscan/backend/gpu/kernels_fp.hpp"
 
@@ -32,13 +33,55 @@ namespace oneapi::dal::dbscan::backend {
 using descriptor_t = detail::descriptor_base<task::clustering>;
 using result_t = compute_result<task::clustering>;
 
-template <typename Float>
+template <typename Index>
+inline auto output_core_indices(sycl::queue& queue,
+                                std::int64_t block_size,
+                                std::int64_t core_count,
+                                const pr::ndview<Index, 1>& cores,
+                                const bk::event_vector& deps = {}) {
+    using oneapi::dal::backend::operator+;
+
+    ONEDAL_ASSERT(block_size > 0);
+    ONEDAL_ASSERT(core_count > 0);
+    ONEDAL_ASSERT(cores.has_data());
+
+    auto [res, res_event] =
+        pr::ndarray<Index, 1>::zeros(queue, core_count, sycl::usm::alloc::device);
+    auto [err, err_event] = pr::ndarray<bool, 1>::full(queue, 1, false, sycl::usm::alloc::device);
+
+    auto* const err_ptr = err.get_mutable_data();
+    auto* const res_ptr = res.get_mutable_data();
+    const auto* const cores_ptr = cores.get_data();
+    auto full_deps = deps + bk::event_vector{ err_event, res_event };
+    auto event = queue.submit([&](sycl::handler& h) {
+        h.depends_on(full_deps);
+        h.single_task([=]() {
+            std::int64_t pos = 0;
+            for (std::int64_t i = 0; i < block_size; i++) {
+                if (*(cores_ptr + i) > 0) {
+                    if (pos < core_count) {
+                        *(res_ptr + pos) = i;
+                        pos++;
+                    }
+                    else {
+                        *err_ptr = true;
+                        break;
+                    }
+                }
+            }
+        });
+    });
+
+    ONEDAL_ASSERT(!err.to_host(queue, { event }).at(0));
+    return std::make_tuple(res, event);
+}
+
+template <typename Float, typename Index>
 inline auto make_results(sycl::queue& queue,
                          const descriptor_t& desc,
                          const pr::ndarray<Float, 2> data,
-                         const pr::ndarray<std::int32_t, 1> responses,
-                         const pr::ndarray<std::int32_t, 1> cores,
-
+                         const pr::ndarray<Index, 1> responses,
+                         const pr::ndarray<Index, 1> cores,
                          std::int64_t cluster_count,
                          std::int64_t core_count = -1) {
     const std::int64_t column_count = data.get_dimension(1);
@@ -64,55 +107,38 @@ inline auto make_results(sycl::queue& queue,
     if (core_count == -1) {
         core_count = count_cores(queue, cores);
     }
-    ONEDAL_ASSERT(block_size >= core_count);
-    if (core_count == 0) {
+    else if (core_count == 0) {
         if (return_core_indices) {
             results.set_core_observation_indices(dal::homogen_table{});
         }
         if (return_core_observations) {
             results.set_core_observations(dal::homogen_table{});
         }
     }
-    if (return_core_indices) {
-        auto host_indices = array<std::int32_t>::empty(core_count);
-        auto host_indices_ptr = host_indices.get_mutable_data();
-        std::int64_t pos = 0;
-        auto host_cores = cores.to_host(queue);
-        auto host_cores_ptr = host_cores.get_data();
-        for (std::int64_t i = 0; i < block_size; i++) {
-            if (host_cores_ptr[i] > 0) {
-                ONEDAL_ASSERT(pos < core_count);
-                host_indices_ptr[pos] = i;
-                pos++;
-            }
-        }
-        auto device_indices =
-            pr::ndarray<std::int32_t, 1>::empty(queue, core_count, sycl::usm::alloc::device);
-        dal::detail::memcpy_host2usm(queue,
-                                     device_indices.get_mutable_data(),
-                                     host_indices_ptr,
-                                     core_count * sizeof(std::int32_t));
-        results.set_core_observation_indices(
-            dal::homogen_table::wrap(device_indices.flatten(queue), core_count, 1));
-    }
-
-    if (return_core_observations) {
-        auto observations = pr::ndarray<Float, 1>::empty(queue, core_count * column_count);
-        auto observations_ptr = observations.get_mutable_data();
-        std::int64_t pos = 0;
-        auto host_cores = cores.to_host(queue);
-        auto host_cores_ptr = host_cores.get_data();
-        for (std::int64_t i = 0; i < block_size; i++) {
-            if (host_cores_ptr[i] > 0) {
-                ONEDAL_ASSERT(pos < core_count * column_count);
-                bk::memcpy(queue,
-                           observations_ptr + pos * column_count,
-                           data.get_data() + i * column_count,
-                           std::size_t(column_count) * sizeof(Float));
-                pos += column_count;
-            }
-        }
-        results.set_core_observations(
-            dal::homogen_table::wrap(observations.flatten(queue), core_count, column_count));
-    }
+    else {
+        ONEDAL_ASSERT(core_count > 0);
+        ONEDAL_ASSERT(block_size >= core_count);
+
+        auto [ids_array, ids_event] = output_core_indices(queue, block_size, core_count, cores);
+
+        if (return_core_indices) {
+            results.set_core_observation_indices(
+                dal::homogen_table::wrap(ids_array.flatten(queue, { ids_event }),
+                                         core_count,
+                                         1));
+        }
+
+        if (return_core_observations) {
+            auto res = pr::ndarray<Float, 2>::empty(queue,
+                                                    { core_count, column_count },
+                                                    sycl::usm::alloc::device);
+
+            auto event = pr::select_indexed_rows(queue, ids_array, data, res, { ids_event });
+
+            results.set_core_observations(
+                dal::homogen_table::wrap(res.flatten(queue, { event }),
+                                         core_count,
+                                         column_count));
+        }
+    }
     return results;
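Note on the change above: make_results no longer copies the core mask to the host and loops over it twice. output_core_indices compacts the indices of rows flagged as cores into a device array inside a single_task, and pr::select_indexed_rows then gathers those rows of data into a dense core_count x column_count block, so both outputs are produced without host round-trips. The sketch below mirrors these two steps in plain SYCL under stated assumptions: gather_core_rows is a hypothetical name, every pointer is device USM, data is row-major, and the gather is written as a simple 2-D parallel_for rather than the library's select_indexed_rows primitive.

#include <sycl/sycl.hpp>
#include <cstdint>

// Hypothetical standalone sketch: compact the indices of rows with cores[i] > 0,
// then gather those rows of a row-major [n x p] matrix into out[core_count x p].
sycl::event gather_core_rows(sycl::queue& q,
                             const std::int32_t* cores, // 0/1 flags, device USM, size n
                             const float* data,         // row-major, device USM, n * p
                             std::int64_t n,
                             std::int64_t p,
                             std::int64_t core_count,
                             std::int32_t* indices,     // device USM, size core_count
                             float* out) {              // device USM, core_count * p
    // Step 1: sequential index compaction in one work-item (as in output_core_indices).
    auto compact = q.single_task([=]() {
        std::int64_t pos = 0;
        for (std::int64_t i = 0; i < n && pos < core_count; ++i) {
            if (cores[i] > 0) {
                indices[pos++] = static_cast<std::int32_t>(i);
            }
        }
    });

    // Step 2: parallel gather of the selected rows, one work-item per output element.
    return q.submit([&](sycl::handler& cgh) {
        cgh.depends_on(compact);
        cgh.parallel_for(sycl::range<2>(core_count, p), [=](sycl::id<2> idx) {
            const std::int64_t row = idx[0];
            const std::int64_t col = idx[1];
            out[row * p + col] = data[indices[row] * p + col];
        });
    });
}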
22 changes: 22 additions & 0 deletions cpp/oneapi/dal/backend/primitives/ndarray.hpp
@@ -303,6 +303,28 @@ class ndview : public ndarray_base<axis_count, order> {
     }
 #endif
 
+    template <std::int64_t n = axis_count, typename = std::enable_if_t<n == 1>>
+    T* begin() {
+        ONEDAL_ASSERT(data_is_mutable_);
+        return get_mutable_data();
+    }
+
+    template <std::int64_t n = axis_count, typename = std::enable_if_t<n == 1>>
+    T* end() {
+        ONEDAL_ASSERT(data_is_mutable_);
+        return get_mutable_data() + this->get_count();
+    }
+
+    template <std::int64_t n = axis_count, typename = std::enable_if_t<n == 1>>
+    const T* cbegin() const {
+        return get_data();
+    }
+
+    template <std::int64_t n = axis_count, typename = std::enable_if_t<n == 1>>
+    const T* cend() const {
+        return get_data() + this->get_count();
+    }
+
 protected:
     explicit ndview(const T* data,
                     const shape_t& shape,
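Note on the change above: the new begin()/end()/cbegin()/cend() overloads are enabled only for 1-D views (n == 1) and return raw pointers, so a host-accessible ndview can be passed directly to standard algorithms. A hypothetical usage sketch, assuming arr wraps host or shared USM memory (pointers into device-only USM must not be dereferenced on the host):

#include <cstdint>
#include <numeric>

#include "oneapi/dal/backend/primitives/ndarray.hpp"

namespace pr = oneapi::dal::backend::primitives;

// `arr` is a mutable 1-D ndview over host-accessible memory (assumption).
void iterate_example(pr::ndview<std::int32_t, 1>& arr) {
    std::iota(arr.begin(), arr.end(), 0); // fill with 0, 1, 2, ... via the mutable range
    const auto total =
        std::accumulate(arr.cbegin(), arr.cend(), std::int64_t{ 0 }); // read-only pass
    (void)total;
}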
1 change: 1 addition & 0 deletions cpp/oneapi/dal/backend/primitives/selection.hpp
@@ -18,3 +18,4 @@
 
 #include "oneapi/dal/backend/primitives/selection/kselect_by_rows.hpp"
 #include "oneapi/dal/backend/primitives/selection/select_indexed.hpp"
+#include "oneapi/dal/backend/primitives/selection/select_indexed_rows.hpp"
