Make snappy decompress check more efficient (#9995)
- Add status checking of decompressed Snappy Parquet pages when using cuDF's Snappy decompression.
- Improve the performance of checking decompressed Snappy Parquet pages when using nvCOMP Snappy decompression by reducing the number of stream synchronizations (see the sketch below).
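
As a minimal standalone sketch of the pattern this commit adopts (simplified and hypothetical: `check_statuses` and the plain `int` statuses stand in for the `decompress_check_kernel` and `gpu_inflate_status_s` spans in the diff below; error checking on the CUDA calls is omitted):

#include <cstdio>
#include <cuda_runtime.h>

// Each thread inspects one per-block status; any nonzero status raises the
// shared failure flag. The racy write is benign: every writer stores `true`.
__global__ void check_statuses(int const* statuses, size_t n, bool* any_failure)
{
  auto tid = blockIdx.x * static_cast<size_t>(blockDim.x) + threadIdx.x;
  if (tid < n && statuses[tid] != 0) { *any_failure = true; }
}

int main()
{
  constexpr size_t n  = 1024;
  int* d_statuses     = nullptr;
  bool* d_any_failure = nullptr;
  cudaMalloc(&d_statuses, n * sizeof(int));
  cudaMalloc(&d_any_failure, sizeof(bool));
  cudaMemset(d_statuses, 0, n * sizeof(int));  // pretend every block succeeded
  cudaMemset(d_any_failure, 0, sizeof(bool));

  // One launch checks all statuses on the device...
  check_statuses<<<(n + 127) / 128, 128>>>(d_statuses, n, d_any_failure);

  // ...and one copy-back (a single synchronization) reports the verdict.
  bool any_failure = false;
  cudaMemcpy(&any_failure, d_any_failure, sizeof(bool), cudaMemcpyDeviceToHost);
  std::printf("decompression %s\n", any_failure ? "failed" : "succeeded");

  cudaFree(d_statuses);
  cudaFree(d_any_failure);
  return 0;
}

Each `thrust::equal` check that this commit removes implied its own stream synchronization; folding the comparison into one kernel launch and one copy-back eliminates those extra host round trips.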

Authors:
  - Xavier Simmons (https://github.com/cheinger)
  - Yunsong Wang (https://github.com/PointKernel)

Approvers:
  - Devavret Makkar (https://github.com/devavret)
  - Vukasin Milovanovic (https://github.com/vuule)
  - Yunsong Wang (https://github.com/PointKernel)

URL: #9995
cheinger committed Mar 21, 2022
1 parent 037fe87 commit 4ee78fb
Showing 2 changed files with 103 additions and 30 deletions.
78 changes: 59 additions & 19 deletions cpp/src/io/orc/reader_impl.cu
@@ -28,6 +28,7 @@
#include <io/utilities/config_utils.hpp>
#include <io/utilities/time_utils.cuh>

#include <cudf/detail/utilities/integer_utils.hpp>
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/table/table.hpp>
#include <cudf/utilities/bit.hpp>
@@ -259,6 +260,39 @@ auto decimal_column_type(std::vector<std::string> const& float64_columns,

} // namespace

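// Device-side status scan: each thread inspects one block's decompression
// status and raises a single shared failure flag on any nonzero status.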
__global__ void decompress_check_kernel(device_span<gpu_inflate_status_s const> stats,
bool* any_block_failure)
{
auto tid = blockIdx.x * blockDim.x + threadIdx.x;
if (tid < stats.size()) {
if (stats[tid].status != 0) {
*any_block_failure = true; // Doesn't need to be atomic
}
}
}

void decompress_check(device_span<gpu_inflate_status_s> stats,
bool* any_block_failure,
rmm::cuda_stream_view stream)
{
if (stats.empty()) { return; } // early exit for empty stats

dim3 block(128);
dim3 grid(cudf::util::div_rounding_up_safe(stats.size(), static_cast<size_t>(block.x)));
decompress_check_kernel<<<grid, block, 0, stream.value()>>>(stats, any_block_failure);
}

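// Repackages nvCOMP's per-block status and actual-size outputs into cuDF's
// gpu_inflate_status_s records on the device, so the host never has to
// synchronize just to translate them.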
__global__ void convert_nvcomp_status(device_span<nvcompStatus_t const> nvcomp_stats,
device_span<size_t const> actual_uncompressed_sizes,
device_span<gpu_inflate_status_s> stats)
{
auto tid = blockIdx.x * blockDim.x + threadIdx.x;
if (tid < stats.size()) {
stats[tid].status = nvcomp_stats[tid] == nvcompStatus_t::nvcompSuccess ? 0 : 1;
stats[tid].bytes_written = actual_uncompressed_sizes[tid];
}
}

void snappy_decompress(device_span<gpu_inflate_input_s> comp_in,
device_span<gpu_inflate_status_s> comp_stat,
size_t max_uncomp_page_size,
@@ -281,6 +315,10 @@ void snappy_decompress(device_span<gpu_inflate_input_s> comp_in,
rmm::device_uvector<size_t> actual_uncompressed_data_sizes(num_blocks, stream);
rmm::device_uvector<nvcompStatus_t> statuses(num_blocks, stream);

device_span<size_t const> actual_uncompressed_sizes_span(actual_uncompressed_data_sizes.data(),
actual_uncompressed_data_sizes.size());
device_span<nvcompStatus_t const> statuses_span(statuses.data(), statuses.size());

// Prepare the vectors
auto comp_it = thrust::make_zip_iterator(compressed_data_ptrs.begin(),
compressed_data_sizes.begin(),
@@ -306,19 +344,10 @@ void snappy_decompress(device_span<gpu_inflate_input_s> comp_in,
stream.value());
CUDF_EXPECTS(nvcompStatus_t::nvcompSuccess == status, "unable to perform snappy decompression");

CUDF_EXPECTS(thrust::equal(rmm::exec_policy(stream),
statuses.begin(),
statuses.end(),
thrust::make_constant_iterator(nvcompStatus_t::nvcompSuccess)),
"Error during snappy decompression");
thrust::for_each_n(
rmm::exec_policy(stream),
thrust::make_counting_iterator(0),
num_blocks,
[=, actual_uncomp_sizes = actual_uncompressed_data_sizes.data()] __device__(auto i) {
comp_stat[i].bytes_written = actual_uncomp_sizes[i];
comp_stat[i].status = 0;
});
dim3 block(128);
dim3 grid(cudf::util::div_rounding_up_safe(num_blocks, static_cast<size_t>(block.x)));
convert_nvcomp_status<<<grid, block, 0, stream.value()>>>(
statuses_span, actual_uncompressed_sizes_span, comp_stat);
}

rmm::device_buffer reader::impl::decompress_stripe_data(
@@ -332,6 +361,11 @@ rmm::device_buffer reader::impl::decompress_stripe_data(
bool use_base_stride,
rmm::cuda_stream_view stream)
{
// Single flag recording whether any block failed to decompress
hostdevice_vector<bool> any_block_failure(1, stream);
any_block_failure[0] = false;
any_block_failure.host_to_device(stream);

// Parse the columns' compressed info
hostdevice_vector<gpu::CompressedStreamInfo> compinfo(0, stream_info.size(), stream);
for (const auto& info : stream_info) {
@@ -340,6 +374,7 @@ rmm::device_buffer reader::impl::decompress_stripe_data(
info.length));
}
compinfo.host_to_device(stream);

gpu::ParseCompressedStripeData(compinfo.device_ptr(),
compinfo.size(),
decompressor->GetBlockSize(),
@@ -391,6 +426,7 @@ rmm::device_buffer reader::impl::decompress_stripe_data(

// Dispatch batches of blocks to decompress
if (num_compressed_blocks > 0) {
device_span<gpu_inflate_status_s> inflate_out_view(inflate_out.data(), num_compressed_blocks);
switch (decompressor->GetKind()) {
case orc::ZLIB:
CUDA_TRY(
@@ -400,8 +436,6 @@ rmm::device_buffer reader::impl::decompress_stripe_data(
if (nvcomp_integration::is_stable_enabled()) {
device_span<gpu_inflate_input_s> inflate_in_view{inflate_in.data(),
num_compressed_blocks};
device_span<gpu_inflate_status_s> inflate_out_view{inflate_out.data(),
num_compressed_blocks};
snappy_decompress(inflate_in_view, inflate_out_view, max_uncomp_block_size, stream);
} else {
CUDA_TRY(
@@ -410,22 +444,28 @@ rmm::device_buffer reader::impl::decompress_stripe_data(
break;
default: CUDF_FAIL("Unexpected decompression dispatch"); break;
}
decompress_check(inflate_out_view, any_block_failure.device_ptr(), stream);
}
if (num_uncompressed_blocks > 0) {
CUDA_TRY(gpu_copy_uncompressed_blocks(
inflate_in.data() + num_compressed_blocks, num_uncompressed_blocks, stream));
}
gpu::PostDecompressionReassemble(compinfo.device_ptr(), compinfo.size(), stream);

any_block_failure.device_to_host(stream);

compinfo.device_to_host(stream, true);

// Now that the stream has synchronized, the failure flag can be checked on the host
CUDF_EXPECTS(not any_block_failure[0], "Error during decompression");

const size_t num_columns = chunks.size().second;

// Update the stream information with the updated uncompressed info
// TBD: We could update the value from the information we already
// have in stream_info[], but using the gpu results also updates
// max_uncompressed_size to the actual uncompressed size, or zero if
// decompression failed.
compinfo.device_to_host(stream, true);

const size_t num_columns = chunks.size().second;

for (size_t i = 0; i < num_stripes; ++i) {
for (size_t j = 0; j < num_columns; ++j) {
auto& chunk = chunks[i][j];
55 changes: 44 additions & 11 deletions cpp/src/io/parquet/reader_impl.cu
@@ -27,6 +27,7 @@
#include <io/utilities/config_utils.hpp>
#include <io/utilities/time_utils.cuh>

#include <cudf/detail/utilities/integer_utils.hpp>
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/table/table.hpp>
#include <cudf/utilities/error.hpp>
@@ -1044,6 +1045,37 @@ void reader::impl::decode_page_headers(hostdevice_vector<gpu::ColumnChunkDesc>&
pages.device_to_host(stream, true);
}

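// Same device-side status scan as in the ORC reader: one kernel inspects all
// page statuses and raises a single failure flag for the host to read back.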
__global__ void decompress_check_kernel(device_span<gpu_inflate_status_s const> stats,
bool* any_block_failure)
{
auto tid = blockIdx.x * blockDim.x + threadIdx.x;
if (tid < stats.size()) {
if (stats[tid].status != 0) {
*any_block_failure = true; // Doesn't need to be atomic
}
}
}

void decompress_check(device_span<gpu_inflate_status_s> stats,
bool* any_block_failure,
rmm::cuda_stream_view stream)
{
if (stats.empty()) { return; } // early exit for empty stats

dim3 block(128);
dim3 grid(cudf::util::div_rounding_up_safe(stats.size(), static_cast<size_t>(block.x)));
decompress_check_kernel<<<grid, block, 0, stream.value()>>>(stats, any_block_failure);
}

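// Converts nvCOMP's per-page statuses into cuDF status codes on the device;
// only the status field is filled in here.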
__global__ void convert_nvcomp_status(device_span<nvcompStatus_t const> nvcomp_stats,
device_span<gpu_inflate_status_s> stats)
{
auto tid = blockIdx.x * blockDim.x + threadIdx.x;
if (tid < stats.size()) {
stats[tid].status = nvcomp_stats[tid] == nvcompStatus_t::nvcompSuccess ? 0 : 1;
}
}

void snappy_decompress(device_span<gpu_inflate_input_s> comp_in,
device_span<gpu_inflate_status_s> comp_stat,
size_t max_uncomp_page_size,
@@ -1072,6 +1104,7 @@ void snappy_decompress(device_span<gpu_inflate_input_s> comp_in,
rmm::device_uvector<size_t> actual_uncompressed_data_sizes(num_comp_pages, stream);
// Convertible to comp_stat.status
rmm::device_uvector<nvcompStatus_t> statuses(num_comp_pages, stream);
device_span<nvcompStatus_t const> statuses_span(statuses.data(), statuses.size());

// Prepare the vectors
auto comp_it = thrust::make_zip_iterator(compressed_data_ptrs.begin(),
@@ -1099,16 +1132,9 @@ void snappy_decompress(device_span<gpu_inflate_input_s> comp_in,
CUDF_EXPECTS(nvcomp_status == nvcompStatus_t::nvcompSuccess,
"unable to perform snappy decompression");

CUDF_EXPECTS(thrust::equal(rmm::exec_policy(stream),
uncompressed_data_sizes.begin(),
uncompressed_data_sizes.end(),
actual_uncompressed_data_sizes.begin()),
"Mismatch in expected and actual decompressed size during snappy decompression");
CUDF_EXPECTS(thrust::equal(rmm::exec_policy(stream),
statuses.begin(),
statuses.end(),
thrust::make_constant_iterator(nvcompStatus_t::nvcompSuccess)),
"Error during snappy decompression");
dim3 block(128);
dim3 grid(cudf::util::div_rounding_up_safe(num_comp_pages, static_cast<size_t>(block.x)));
convert_nvcomp_status<<<grid, block, 0, stream.value()>>>(statuses_span, comp_stat);
}

/**
@@ -1166,6 +1192,10 @@ rmm::device_buffer reader::impl::decompress_page_data(
hostdevice_vector<gpu_inflate_input_s> inflate_in(0, num_comp_pages, stream);
hostdevice_vector<gpu_inflate_status_s> inflate_out(0, num_comp_pages, stream);

hostdevice_vector<bool> any_block_failure(1, stream);
any_block_failure[0] = false;
any_block_failure.host_to_device(stream);

device_span<gpu_inflate_input_s> inflate_in_view(inflate_in.device_ptr(), inflate_in.size());
device_span<gpu_inflate_status_s> inflate_out_view(inflate_out.device_ptr(), inflate_out.size());

@@ -1240,7 +1270,10 @@ rmm::device_buffer reader::impl::decompress_page_data(
stream.value()));
}
}
stream.synchronize();

decompress_check(inflate_out_view, any_block_failure.device_ptr(), stream);
any_block_failure.device_to_host(stream, true); // synchronizes stream
CUDF_EXPECTS(not any_block_failure[0], "Error during decompression");

// Update the page information in device memory with the updated value of
// page_data; it now points to the uncompressed data buffer
