Skip to content

Commit

Permalink
Use size_t to allow large conditional joins (rapidsai#16127)
Browse files Browse the repository at this point in the history
The conditional join kernels were using `cudf::size_type` where `std::size_t` was needed. This PR fixes that bug, which caused `cudaErrorIllegalAddress` as shown in rapidsai#16115. This closes rapidsai#16115.

I did not add tests because we typically do not test very large workloads. However, I committed the test and reverted it in this PR, so there is a record of my validation code.

Authors:
  - Bradley Dice (https://github.com/bdice)

Approvers:
  - Vyas Ramasubramani (https://github.com/vyasr)
  - https://github.com/nvdbaranec
  - Yunsong Wang (https://github.com/PointKernel)

URL: rapidsai#16127
  • Loading branch information
bdice committed Jun 28, 2024
1 parent fb12d98 commit df88cf5
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 107 deletions.
5 changes: 3 additions & 2 deletions cpp/src/join/conditional_join.cu
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ std::unique_ptr<rmm::device_uvector<size_type>> conditional_join_anti_semi(
join_size = size.value(stream);
}

rmm::device_scalar<size_type> write_index(0, stream);
rmm::device_scalar<std::size_t> write_index(0, stream);

auto left_indices = std::make_unique<rmm::device_uvector<size_type>>(join_size, stream, mr);

Expand Down Expand Up @@ -232,13 +232,14 @@ conditional_join(table_view const& left,
std::make_unique<rmm::device_uvector<size_type>>(0, stream, mr));
}

rmm::device_scalar<size_type> write_index(0, stream);
rmm::device_scalar<std::size_t> write_index(0, stream);

auto left_indices = std::make_unique<rmm::device_uvector<size_type>>(join_size, stream, mr);
auto right_indices = std::make_unique<rmm::device_uvector<size_type>>(join_size, stream, mr);

auto const& join_output_l = left_indices->data();
auto const& join_output_r = right_indices->data();

if (has_nulls) {
conditional_join<DEFAULT_JOIN_BLOCK_SIZE, DEFAULT_JOIN_CACHE_SIZE, true>
<<<config.num_blocks, config.num_threads_per_block, shmem_size_per_block, stream.value()>>>(
Expand Down
124 changes: 114 additions & 10 deletions cpp/src/join/conditional_join_kernels.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,110 @@
namespace cudf {
namespace detail {

/**
* @brief Adds a pair of indices to the shared memory cache
*
* @param[in] first The first index in the pair
* @param[in] second The second index in the pair
* @param[in,out] current_idx_shared Pointer to shared index that determines
* where in the shared memory cache the pair will be written
* @param[in] warp_id The ID of the warp of the calling the thread
* @param[out] joined_shared_l Pointer to the shared memory cache for left indices
* @param[out] joined_shared_r Pointer to the shared memory cache for right indices
*/
__inline__ __device__ void add_pair_to_cache(size_type const first,
size_type const second,
std::size_t* current_idx_shared,
int const warp_id,
size_type* joined_shared_l,
size_type* joined_shared_r)
{
cuda::atomic_ref<std::size_t, cuda::thread_scope_block> ref{*(current_idx_shared + warp_id)};
std::size_t my_current_idx = ref.fetch_add(1, cuda::memory_order_relaxed);
// It's guaranteed to fit into the shared cache
joined_shared_l[my_current_idx] = first;
joined_shared_r[my_current_idx] = second;
}

__inline__ __device__ void add_left_to_cache(size_type const first,
std::size_t* current_idx_shared,
int const warp_id,
size_type* joined_shared_l)
{
cuda::atomic_ref<std::size_t, cuda::thread_scope_block> ref{*(current_idx_shared + warp_id)};
std::size_t my_current_idx = ref.fetch_add(1, cuda::memory_order_relaxed);
joined_shared_l[my_current_idx] = first;
}

template <int num_warps, cudf::size_type output_cache_size>
__device__ void flush_output_cache(unsigned int const activemask,
std::size_t const max_size,
int const warp_id,
int const lane_id,
std::size_t* current_idx,
std::size_t current_idx_shared[num_warps],
size_type join_shared_l[num_warps][output_cache_size],
size_type join_shared_r[num_warps][output_cache_size],
size_type* join_output_l,
size_type* join_output_r)
{
// count how many active threads participating here which could be less than warp_size
int const num_threads = __popc(activemask);
std::size_t output_offset = 0;

if (0 == lane_id) {
cuda::atomic_ref<std::size_t, cuda::thread_scope_device> ref{*current_idx};
output_offset = ref.fetch_add(current_idx_shared[warp_id], cuda::memory_order_relaxed);
}

// No warp sync is necessary here because we are assuming that ShuffleIndex
// is internally using post-CUDA 9.0 synchronization-safe primitives
// (__shfl_sync instead of __shfl). __shfl is technically not guaranteed to
// be safe by the compiler because it is not required by the standard to
// converge divergent branches before executing.
output_offset = cub::ShuffleIndex<detail::warp_size>(output_offset, 0, activemask);

for (std::size_t shared_out_idx = static_cast<std::size_t>(lane_id);
shared_out_idx < current_idx_shared[warp_id];
shared_out_idx += num_threads) {
std::size_t thread_offset = output_offset + shared_out_idx;
if (thread_offset < max_size) {
join_output_l[thread_offset] = join_shared_l[warp_id][shared_out_idx];
join_output_r[thread_offset] = join_shared_r[warp_id][shared_out_idx];
}
}
}

template <int num_warps, cudf::size_type output_cache_size>
__device__ void flush_output_cache(unsigned int const activemask,
std::size_t const max_size,
int const warp_id,
int const lane_id,
std::size_t* current_idx,
std::size_t current_idx_shared[num_warps],
size_type join_shared_l[num_warps][output_cache_size],
size_type* join_output_l)
{
int const num_threads = __popc(activemask);
std::size_t output_offset = 0;

if (0 == lane_id) {
cuda::atomic_ref<std::size_t, cuda::thread_scope_device> ref{*current_idx};
output_offset = ref.fetch_add(current_idx_shared[warp_id], cuda::memory_order_relaxed);
}

output_offset = cub::ShuffleIndex<detail::warp_size>(output_offset, 0, activemask);

for (std::size_t shared_out_idx = static_cast<std::size_t>(lane_id);
shared_out_idx < current_idx_shared[warp_id];
shared_out_idx += num_threads) {
std::size_t thread_offset = output_offset + shared_out_idx;
if (thread_offset < max_size) {
join_output_l[thread_offset] = join_shared_l[warp_id][shared_out_idx];
}
}
}

/**
* @brief Computes the output size of joining the left table to the right table.
*
Expand Down Expand Up @@ -103,14 +207,14 @@ CUDF_KERNEL void compute_conditional_join_output_size(
}
}

using BlockReduce = cub::BlockReduce<cudf::size_type, block_size>;
using BlockReduce = cub::BlockReduce<std::size_t, block_size>;
__shared__ typename BlockReduce::TempStorage temp_storage;
std::size_t block_counter = BlockReduce(temp_storage).Sum(thread_counter);

// Add block counter to global counter
if (threadIdx.x == 0) {
cuda::atomic_ref<std::size_t, cuda::thread_scope_device> ref{*output_size};
ref.fetch_add(block_counter, cuda::std::memory_order_relaxed);
ref.fetch_add(block_counter, cuda::memory_order_relaxed);
}
}

Expand Down Expand Up @@ -143,13 +247,13 @@ CUDF_KERNEL void conditional_join(table_device_view left_table,
join_kind join_type,
cudf::size_type* join_output_l,
cudf::size_type* join_output_r,
cudf::size_type* current_idx,
std::size_t* current_idx,
cudf::ast::detail::expression_device_view device_expression_data,
cudf::size_type const max_size,
std::size_t const max_size,
bool const swap_tables)
{
constexpr int num_warps = block_size / detail::warp_size;
__shared__ cudf::size_type current_idx_shared[num_warps];
__shared__ std::size_t current_idx_shared[num_warps];
__shared__ cudf::size_type join_shared_l[num_warps][output_cache_size];
__shared__ cudf::size_type join_shared_r[num_warps][output_cache_size];

Expand Down Expand Up @@ -183,7 +287,7 @@ CUDF_KERNEL void conditional_join(table_device_view left_table,

if (outer_row_index < outer_num_rows) {
bool found_match = false;
for (thread_index_type inner_row_index(0); inner_row_index < inner_num_rows;
for (cudf::thread_index_type inner_row_index(0); inner_row_index < inner_num_rows;
++inner_row_index) {
auto output_dest = cudf::ast::detail::value_expression_result<bool, has_nulls>();
auto const left_row_index = swap_tables ? inner_row_index : outer_row_index;
Expand Down Expand Up @@ -277,12 +381,12 @@ CUDF_KERNEL void conditional_join_anti_semi(
table_device_view right_table,
join_kind join_type,
cudf::size_type* join_output_l,
cudf::size_type* current_idx,
std::size_t* current_idx,
cudf::ast::detail::expression_device_view device_expression_data,
cudf::size_type const max_size)
std::size_t const max_size)
{
constexpr int num_warps = block_size / detail::warp_size;
__shared__ cudf::size_type current_idx_shared[num_warps];
__shared__ std::size_t current_idx_shared[num_warps];
__shared__ cudf::size_type join_shared_l[num_warps][output_cache_size];

extern __shared__ char raw_intermediate_storage[];
Expand Down Expand Up @@ -310,7 +414,7 @@ CUDF_KERNEL void conditional_join_anti_semi(
for (cudf::thread_index_type outer_row_index = start_idx; outer_row_index < outer_num_rows;
outer_row_index += stride) {
bool found_match = false;
for (thread_index_type inner_row_index(0); inner_row_index < inner_num_rows;
for (cudf::thread_index_type inner_row_index(0); inner_row_index < inner_num_rows;
++inner_row_index) {
auto output_dest = cudf::ast::detail::value_expression_result<bool, has_nulls>();

Expand Down
95 changes: 0 additions & 95 deletions cpp/src/join/join_common_utils.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -262,101 +262,6 @@ struct valid_range {
}
};

/**
* @brief Adds a pair of indices to the shared memory cache
*
* @param[in] first The first index in the pair
* @param[in] second The second index in the pair
* @param[in,out] current_idx_shared Pointer to shared index that determines
* where in the shared memory cache the pair will be written
* @param[in] warp_id The ID of the warp of the calling the thread
* @param[out] joined_shared_l Pointer to the shared memory cache for left indices
* @param[out] joined_shared_r Pointer to the shared memory cache for right indices
*/
__inline__ __device__ void add_pair_to_cache(size_type const first,
size_type const second,
size_type* current_idx_shared,
int const warp_id,
size_type* joined_shared_l,
size_type* joined_shared_r)
{
size_type my_current_idx{atomicAdd(current_idx_shared + warp_id, size_type(1))};
// its guaranteed to fit into the shared cache
joined_shared_l[my_current_idx] = first;
joined_shared_r[my_current_idx] = second;
}

__inline__ __device__ void add_left_to_cache(size_type const first,
size_type* current_idx_shared,
int const warp_id,
size_type* joined_shared_l)
{
size_type my_current_idx{atomicAdd(current_idx_shared + warp_id, size_type(1))};

joined_shared_l[my_current_idx] = first;
}

template <int num_warps, cudf::size_type output_cache_size>
__device__ void flush_output_cache(unsigned int const activemask,
cudf::size_type const max_size,
int const warp_id,
int const lane_id,
cudf::size_type* current_idx,
cudf::size_type current_idx_shared[num_warps],
size_type join_shared_l[num_warps][output_cache_size],
size_type join_shared_r[num_warps][output_cache_size],
size_type* join_output_l,
size_type* join_output_r)
{
// count how many active threads participating here which could be less than warp_size
int const num_threads = __popc(activemask);
cudf::size_type output_offset = 0;

if (0 == lane_id) { output_offset = atomicAdd(current_idx, current_idx_shared[warp_id]); }

// No warp sync is necessary here because we are assuming that ShuffleIndex
// is internally using post-CUDA 9.0 synchronization-safe primitives
// (__shfl_sync instead of __shfl). __shfl is technically not guaranteed to
// be safe by the compiler because it is not required by the standard to
// converge divergent branches before executing.
output_offset = cub::ShuffleIndex<detail::warp_size>(output_offset, 0, activemask);

for (int shared_out_idx = lane_id; shared_out_idx < current_idx_shared[warp_id];
shared_out_idx += num_threads) {
cudf::size_type thread_offset = output_offset + shared_out_idx;
if (thread_offset < max_size) {
join_output_l[thread_offset] = join_shared_l[warp_id][shared_out_idx];
join_output_r[thread_offset] = join_shared_r[warp_id][shared_out_idx];
}
}
}

template <int num_warps, cudf::size_type output_cache_size>
__device__ void flush_output_cache(unsigned int const activemask,
cudf::size_type const max_size,
int const warp_id,
int const lane_id,
cudf::size_type* current_idx,
cudf::size_type current_idx_shared[num_warps],
size_type join_shared_l[num_warps][output_cache_size],
size_type* join_output_l)
{
int const num_threads = __popc(activemask);
cudf::size_type output_offset = 0;

if (0 == lane_id) { output_offset = atomicAdd(current_idx, current_idx_shared[warp_id]); }

output_offset = cub::ShuffleIndex<detail::warp_size>(output_offset, 0, activemask);

for (int shared_out_idx = lane_id; shared_out_idx < current_idx_shared[warp_id];
shared_out_idx += num_threads) {
cudf::size_type thread_offset = output_offset + shared_out_idx;
if (thread_offset < max_size) {
join_output_l[thread_offset] = join_shared_l[warp_id][shared_out_idx];
}
}
}

} // namespace detail

} // namespace cudf

0 comments on commit df88cf5

Please sign in to comment.