Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix invalid memory access in Parquet reader #14637

Merged
merged 14 commits into from
Dec 19, 2023
Merged
5 changes: 4 additions & 1 deletion cpp/src/io/parquet/decode_preprocess.cu
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,10 @@ __global__ void __launch_bounds__(preprocess_block_size)
{rep_runs}};

// setup page info
if (!setupLocalPageInfo(s, pp, chunks, min_row, num_rows, all_types_filter{}, false)) { return; }
if (!setupLocalPageInfo(
s, pp, chunks, min_row, num_rows, all_types_filter{}, page_processing_stage::PREPROCESS)) {
return;
}

// initialize the stream decoders (requires values computed in setupLocalPageInfo)
// the size of the rolling batch buffer
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/io/parquet/page_data.cu
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ __global__ void __launch_bounds__(decode_block_size)
min_row,
num_rows,
mask_filter{decode_kernel_mask::GENERAL},
true)) {
page_processing_stage::DECODE)) {
return;
}

Expand Down
23 changes: 15 additions & 8 deletions cpp/src/io/parquet/page_decode.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -1014,6 +1014,12 @@ struct mask_filter {
}
};

/**
 * @brief Identifies which phase of page processing is calling setupLocalPageInfo.
 *
 * The stage selects which parts of the block-local page state get initialized.
 */
enum class page_processing_stage {
  /// Passed by the preprocess kernel; setup resets the input value/leaf counts
  /// and the page's skipped-value counters.
  PREPROCESS = 0,
  /// Passed by the string bounds/size kernels; setup applies the same zero-row
  /// early exit as DECODE but does not run the DECODE-only nesting info setup.
  STRING_BOUNDS = 1,
  /// Passed by the decode kernels; setup starts the input value count from the
  /// page's previously recorded skipped_values (or 0 if it was never set).
  DECODE = 2
};

/**
* @brief Sets up block-local page state information from the global pages.
*
Expand All @@ -1023,7 +1029,7 @@ struct mask_filter {
* @param[in] min_row Crop all rows below min_row
* @param[in] num_rows Maximum number of rows to read
* @param[in] filter Filtering function used to decide which pages to operate on
* @param[in] is_decode_step If we are setting up for the decode step (instead of the preprocess)
* @param[in] stage What stage of the decoding process is this being called from
* @tparam Filter Function that takes a PageInfo reference and returns true if the given page should
* be operated on (currently only used by the gpuComputePageSizes step)
* @return True if this page should be processed further
Expand All @@ -1035,7 +1041,7 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s,
size_t min_row,
size_t num_rows,
Filter filter,
bool is_decode_step)
page_processing_stage stage)
{
int t = threadIdx.x;

Expand Down Expand Up @@ -1126,7 +1132,8 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s,
//
// NOTE: this check needs to be done after the null counts have been zeroed out
bool const has_repetition = s->col.max_level[level_type::REPETITION] > 0;
if (is_decode_step && s->num_rows == 0 &&
if ((stage == page_processing_stage::STRING_BOUNDS || stage == page_processing_stage::DECODE) &&
s->num_rows == 0 &&
!(has_repetition && (is_bounds_page(s, min_row, num_rows, has_repetition) ||
is_page_contained(s, min_row, num_rows)))) {
return false;
Expand Down Expand Up @@ -1237,7 +1244,7 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s,
// NOTE: in a chunked read situation, s->col.column_data_base and s->col.valid_map_base
// will be aliased to memory that has been freed when we get here in the non-decode step, so
// we cannot check against nullptr. we'll just check a flag directly.
if (is_decode_step) {
if (stage == page_processing_stage::DECODE) {
int max_depth = s->col.max_nesting_depth;
for (int idx = 0; idx < max_depth; idx++) {
PageNestingDecodeInfo* nesting_info = &s->nesting_info[idx];
Expand Down Expand Up @@ -1387,13 +1394,13 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s,

// if we're in the decoding step, jump directly to the first
// value we care about
Comment on lines 1395 to 1396
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this comment needs to be updated.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could change this to move the is_bounds_step test inside the else block... I think all that's necessary is to just not zero out those values.

if (is_decode_step) {
if (stage == page_processing_stage::DECODE) {
s->input_value_count = s->page.skipped_values > -1 ? s->page.skipped_values : 0;
} else {
} else if (stage == page_processing_stage::PREPROCESS) {
s->input_value_count = 0;
s->input_leaf_count = 0;
s->page.skipped_values =
-1; // magic number to indicate it hasn't been set for use inside UpdatePageSizes
// magic number to indicate it hasn't been set for use inside UpdatePageSizes
s->page.skipped_values = -1;
s->page.skipped_leaf_values = 0;
}
}
Expand Down
20 changes: 14 additions & 6 deletions cpp/src/io/parquet/page_delta_decode.cu
Original file line number Diff line number Diff line change
Expand Up @@ -323,9 +323,13 @@ __global__ void __launch_bounds__(96)
auto* const db = &db_state;
[[maybe_unused]] null_count_back_copier _{s, t};

auto const mask = decode_kernel_mask::DELTA_BINARY;
if (!setupLocalPageInfo(
s, &pages[page_idx], chunks, min_row, num_rows, mask_filter{mask}, true)) {
if (!setupLocalPageInfo(s,
&pages[page_idx],
chunks,
min_row,
num_rows,
mask_filter{decode_kernel_mask::DELTA_BINARY},
page_processing_stage::DECODE)) {
return;
}

Expand Down Expand Up @@ -446,9 +450,13 @@ __global__ void __launch_bounds__(decode_block_size)
auto* const dba = &db_state;
[[maybe_unused]] null_count_back_copier _{s, t};

auto const mask = decode_kernel_mask::DELTA_BYTE_ARRAY;
if (!setupLocalPageInfo(
s, &pages[page_idx], chunks, min_row, num_rows, mask_filter{mask}, true)) {
if (!setupLocalPageInfo(s,
&pages[page_idx],
chunks,
min_row,
num_rows,
mask_filter{decode_kernel_mask::DELTA_BYTE_ARRAY},
page_processing_stage::DECODE)) {
return;
}

Expand Down
40 changes: 32 additions & 8 deletions cpp/src/io/parquet/page_string_decode.cu
Original file line number Diff line number Diff line change
Expand Up @@ -616,7 +616,15 @@ __global__ void __launch_bounds__(preprocess_block_size) gpuComputeStringPageBou

// setup page info
auto const mask = BitOr(decode_kernel_mask::STRING, decode_kernel_mask::DELTA_BYTE_ARRAY);
if (!setupLocalPageInfo(s, pp, chunks, min_row, num_rows, mask_filter{mask}, true)) { return; }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this PR change is_decode_step value in some of these calls?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The string preprocessing was passing that as true, leading the setup call to believe the output buffers were valid and thus accessing invalid memory. With the new flag true and the old flag false, we get the behavior that was originally desired, but can now skip the bad pointer arithmetic.

if (!setupLocalPageInfo(s,
pp,
chunks,
min_row,
num_rows,
mask_filter{mask},
page_processing_stage::STRING_BOUNDS)) {
return;
}

bool const is_bounds_pg = is_bounds_page(s, min_row, num_rows, has_repetition);

Expand Down Expand Up @@ -659,8 +667,15 @@ __global__ void __launch_bounds__(delta_preproc_block_size) gpuComputeDeltaPageS
bool const has_repetition = chunks[pp->chunk_idx].max_level[level_type::REPETITION] > 0;

// setup page info
auto const mask = decode_kernel_mask::DELTA_BYTE_ARRAY;
if (!setupLocalPageInfo(s, pp, chunks, min_row, num_rows, mask_filter{mask}, true)) { return; }
if (!setupLocalPageInfo(s,
pp,
chunks,
min_row,
num_rows,
mask_filter{decode_kernel_mask::DELTA_BYTE_ARRAY},
page_processing_stage::STRING_BOUNDS)) {
return;
}

auto const start_value = pp->start_val;

Expand Down Expand Up @@ -722,8 +737,13 @@ __global__ void __launch_bounds__(preprocess_block_size) gpuComputePageStringSiz
bool const has_repetition = chunks[pp->chunk_idx].max_level[level_type::REPETITION] > 0;

// setup page info
if (!setupLocalPageInfo(
s, pp, chunks, min_row, num_rows, mask_filter{decode_kernel_mask::STRING}, true)) {
if (!setupLocalPageInfo(s,
pp,
chunks,
min_row,
num_rows,
mask_filter{decode_kernel_mask::STRING},
page_processing_stage::STRING_BOUNDS)) {
return;
}

Expand Down Expand Up @@ -816,9 +836,13 @@ __global__ void __launch_bounds__(decode_block_size)
int const lane_id = t % warp_size;
[[maybe_unused]] null_count_back_copier _{s, t};

auto const mask = decode_kernel_mask::STRING;
if (!setupLocalPageInfo(
s, &pages[page_idx], chunks, min_row, num_rows, mask_filter{mask}, true)) {
if (!setupLocalPageInfo(s,
&pages[page_idx],
chunks,
min_row,
num_rows,
mask_filter{decode_kernel_mask::STRING},
page_processing_stage::DECODE)) {
return;
}

Expand Down
Loading