diff --git a/cpp/src/io/orc/reader_impl.cu b/cpp/src/io/orc/reader_impl.cu index 89488d75735..081ae69e48f 100644 --- a/cpp/src/io/orc/reader_impl.cu +++ b/cpp/src/io/orc/reader_impl.cu @@ -28,6 +28,7 @@ #include #include +#include #include #include #include @@ -259,6 +260,39 @@ auto decimal_column_type(std::vector const& float64_columns, } // namespace +/* Sets the flag behind any_block_failure to true when any entry of stats has a nonzero status. Each thread with tid < stats.size() inspects one entry, and every writer stores the same value (true). */ __global__ void decompress_check_kernel(device_span stats, + bool* any_block_failure) +{ + auto tid = blockIdx.x * blockDim.x + threadIdx.x; + if (tid < stats.size()) { + if (stats[tid].status != 0) { + *any_block_failure = true; // Doesn't need to be atomic + } + } +} + +/* Host-side launcher for decompress_check_kernel. Returns immediately when stats is empty; otherwise uses 128-thread blocks and enough blocks to cover stats.size(). The flag is written by the kernel, so the call site copies it back with any_block_failure.device_to_host before reading it. NOTE(review): no launch-error check follows the kernel launch, and the launch configuration is not visible in this view - confirm it runs on the passed stream. */ void decompress_check(device_span stats, + bool* any_block_failure, + rmm::cuda_stream_view stream) +{ + if (stats.empty()) { return; } // early exit for empty stats + + dim3 block(128); + dim3 grid(cudf::util::div_rounding_up_safe(stats.size(), static_cast(block.x))); + decompress_check_kernel<<>>(stats, any_block_failure); +} + +/* Translates per-block nvcomp results into stats: status becomes 0 for nvcompSuccess and 1 otherwise, and bytes_written receives the matching actual_uncompressed_sizes entry. NOTE(review): only stats.size() is bounds-checked; assumes nvcomp_stats and actual_uncompressed_sizes are at least that long - confirm. */ __global__ void convert_nvcomp_status(device_span nvcomp_stats, + device_span actual_uncompressed_sizes, + device_span stats) +{ + auto tid = blockIdx.x * blockDim.x + threadIdx.x; + if (tid < stats.size()) { + stats[tid].status = nvcomp_stats[tid] == nvcompStatus_t::nvcompSuccess ? 
0 : 1; + stats[tid].bytes_written = actual_uncompressed_sizes[tid]; + } +} + void snappy_decompress(device_span comp_in, device_span comp_stat, size_t max_uncomp_page_size, @@ -281,6 +315,10 @@ void snappy_decompress(device_span comp_in, rmm::device_uvector actual_uncompressed_data_sizes(num_blocks, stream); rmm::device_uvector statuses(num_blocks, stream); + device_span actual_uncompressed_sizes_span(actual_uncompressed_data_sizes.data(), + actual_uncompressed_data_sizes.size()); + device_span statuses_span(statuses.data(), statuses.size()); + // Prepare the vectors auto comp_it = thrust::make_zip_iterator(compressed_data_ptrs.begin(), compressed_data_sizes.begin(), @@ -306,19 +344,10 @@ void snappy_decompress(device_span comp_in, stream.value()); CUDF_EXPECTS(nvcompStatus_t::nvcompSuccess == status, "unable to perform snappy decompression"); - CUDF_EXPECTS(thrust::equal(rmm::exec_policy(stream), - statuses.begin(), - statuses.end(), - thrust::make_constant_iterator(nvcompStatus_t::nvcompSuccess)), - "Error during snappy decompression"); - thrust::for_each_n( - rmm::exec_policy(stream), - thrust::make_counting_iterator(0), - num_blocks, - [=, actual_uncomp_sizes = actual_uncompressed_data_sizes.data()] __device__(auto i) { - comp_stat[i].bytes_written = actual_uncomp_sizes[i]; - comp_stat[i].status = 0; - }); + /* Per-block failures are no longer raised here; convert_nvcomp_status records them in comp_stat instead. NOTE(review): the grid size is derived from num_blocks, so this launch presumably relies on num_blocks being nonzero - confirm against callers. */ dim3 block(128); + dim3 grid(cudf::util::div_rounding_up_safe(num_blocks, static_cast(block.x))); + convert_nvcomp_status<<>>( + statuses_span, actual_uncompressed_sizes_span, comp_stat); } rmm::device_buffer reader::impl::decompress_stripe_data( @@ -332,6 +361,11 @@ rmm::device_buffer reader::impl::decompress_stripe_data( bool use_base_stride, rmm::cuda_stream_view stream) { + // For checking whether we decompress successfully + hostdevice_vector any_block_failure(1, stream); + any_block_failure[0] = false; + any_block_failure.host_to_device(stream); + // Parse the columns' compressed info hostdevice_vector compinfo(0, stream_info.size(), stream); for 
(const auto& info : stream_info) { @@ -340,6 +374,7 @@ rmm::device_buffer reader::impl::decompress_stripe_data( info.length)); } compinfo.host_to_device(stream); + gpu::ParseCompressedStripeData(compinfo.device_ptr(), compinfo.size(), decompressor->GetBlockSize(), @@ -391,6 +426,7 @@ rmm::device_buffer reader::impl::decompress_stripe_data( // Dispatch batches of blocks to decompress if (num_compressed_blocks > 0) { + device_span inflate_out_view(inflate_out.data(), num_compressed_blocks); switch (decompressor->GetKind()) { case orc::ZLIB: CUDA_TRY( @@ -400,8 +436,6 @@ rmm::device_buffer reader::impl::decompress_stripe_data( if (nvcomp_integration::is_stable_enabled()) { device_span inflate_in_view{inflate_in.data(), num_compressed_blocks}; - device_span inflate_out_view{inflate_out.data(), - num_compressed_blocks}; snappy_decompress(inflate_in_view, inflate_out_view, max_uncomp_block_size, stream); } else { CUDA_TRY( @@ -410,6 +444,7 @@ rmm::device_buffer reader::impl::decompress_stripe_data( break; default: CUDF_FAIL("Unexpected decompression dispatch"); break; } + decompress_check(inflate_out_view, any_block_failure.device_ptr(), stream); } if (num_uncompressed_blocks > 0) { CUDA_TRY(gpu_copy_uncompressed_blocks( @@ -417,15 +452,20 @@ rmm::device_buffer reader::impl::decompress_stripe_data( } gpu::PostDecompressionReassemble(compinfo.device_ptr(), compinfo.size(), stream); + any_block_failure.device_to_host(stream); + + /* NOTE(review): presumably the `true` argument makes this copy synchronize the stream, which is what makes the host read of any_block_failure below valid - confirm. */ compinfo.device_to_host(stream, true); + + // We can check on host after stream synchronize + CUDF_EXPECTS(not any_block_failure[0], "Error during decompression"); + + const size_t num_columns = chunks.size().second; + // Update the stream information with the updated uncompressed info // TBD: We could update the value from the information we already // have in stream_info[], but using the gpu results also updates // max_uncompressed_size to the actual uncompressed size, or zero if // decompression failed. 
- compinfo.device_to_host(stream, true); - - const size_t num_columns = chunks.size().second; - for (size_t i = 0; i < num_stripes; ++i) { for (size_t j = 0; j < num_columns; ++j) { auto& chunk = chunks[i][j]; diff --git a/cpp/src/io/parquet/reader_impl.cu b/cpp/src/io/parquet/reader_impl.cu index d7a99f01106..9e7a48b7a69 100644 --- a/cpp/src/io/parquet/reader_impl.cu +++ b/cpp/src/io/parquet/reader_impl.cu @@ -27,6 +27,7 @@ #include #include +#include #include #include #include @@ -1044,6 +1045,37 @@ void reader::impl::decode_page_headers(hostdevice_vector& pages.device_to_host(stream, true); } +/* Sets the flag behind any_block_failure to true when any entry of stats has a nonzero status. NOTE(review): this kernel and decompress_check below repeat the definitions this patch adds to the ORC reader - consider a shared helper. */ __global__ void decompress_check_kernel(device_span stats, + bool* any_block_failure) +{ + auto tid = blockIdx.x * blockDim.x + threadIdx.x; + if (tid < stats.size()) { + if (stats[tid].status != 0) { + *any_block_failure = true; // Doesn't need to be atomic + } + } +} + +/* Host-side launcher for decompress_check_kernel. Returns immediately when stats is empty; otherwise uses 128-thread blocks and enough blocks to cover stats.size(). NOTE(review): no launch-error check follows the kernel launch. */ void decompress_check(device_span stats, + bool* any_block_failure, + rmm::cuda_stream_view stream) +{ + if (stats.empty()) { return; } // early exit for empty stats + + dim3 block(128); + dim3 grid(cudf::util::div_rounding_up_safe(stats.size(), static_cast(block.x))); + decompress_check_kernel<<>>(stats, any_block_failure); +} + +/* Writes stats[tid].status as 0 for nvcompSuccess and 1 otherwise. Unlike the ORC variant in this patch it takes no size span and leaves bytes_written untouched. NOTE(review): only stats.size() is bounds-checked; assumes nvcomp_stats is at least that long - confirm. */ __global__ void convert_nvcomp_status(device_span nvcomp_stats, + device_span stats) +{ + auto tid = blockIdx.x * blockDim.x + threadIdx.x; + if (tid < stats.size()) { + stats[tid].status = nvcomp_stats[tid] == nvcompStatus_t::nvcompSuccess ? 
0 : 1; + } +} + void snappy_decompress(device_span comp_in, device_span comp_stat, size_t max_uncomp_page_size, @@ -1072,6 +1104,7 @@ void snappy_decompress(device_span comp_in, rmm::device_uvector actual_uncompressed_data_sizes(num_comp_pages, stream); // Convertible to comp_stat.status rmm::device_uvector statuses(num_comp_pages, stream); + device_span statuses_span(statuses.data(), statuses.size()); // Prepare the vectors auto comp_it = thrust::make_zip_iterator(compressed_data_ptrs.begin(), @@ -1099,16 +1132,9 @@ void snappy_decompress(device_span comp_in, CUDF_EXPECTS(nvcomp_status == nvcompStatus_t::nvcompSuccess, "unable to perform snappy decompression"); - CUDF_EXPECTS(thrust::equal(rmm::exec_policy(stream), - uncompressed_data_sizes.begin(), - uncompressed_data_sizes.end(), - actual_uncompressed_data_sizes.begin()), - "Mismatch in expected and actual decompressed size during snappy decompression"); - CUDF_EXPECTS(thrust::equal(rmm::exec_policy(stream), - statuses.begin(), - statuses.end(), - thrust::make_constant_iterator(nvcompStatus_t::nvcompSuccess)), - "Error during snappy decompression"); + /* NOTE(review): the removed check that compared uncompressed_data_sizes with actual_uncompressed_data_sizes has no replacement in this hunk; only the nvcomp statuses are forwarded to comp_stat - confirm a size mismatch is still detected elsewhere. The grid size is derived from num_comp_pages, so this launch presumably relies on it being nonzero - confirm. */ dim3 block(128); + dim3 grid(cudf::util::div_rounding_up_safe(num_comp_pages, static_cast(block.x))); + convert_nvcomp_status<<>>(statuses_span, comp_stat); } /** @@ -1166,6 +1192,10 @@ rmm::device_buffer reader::impl::decompress_page_data( hostdevice_vector inflate_in(0, num_comp_pages, stream); hostdevice_vector inflate_out(0, num_comp_pages, stream); + hostdevice_vector any_block_failure(1, stream); + any_block_failure[0] = false; + any_block_failure.host_to_device(stream); + device_span inflate_in_view(inflate_in.device_ptr(), inflate_in.size()); device_span inflate_out_view(inflate_out.device_ptr(), inflate_out.size()); @@ -1240,7 +1270,10 @@ rmm::device_buffer reader::impl::decompress_page_data( stream.value())); } } - stream.synchronize(); + + decompress_check(inflate_out_view, any_block_failure.device_ptr(), stream); + 
any_block_failure.device_to_host(stream, true); // synchronizes stream + CUDF_EXPECTS(not any_block_failure[0], "Error during decompression"); // Update the page information in device memory with the updated value of // page_data; it now points to the uncompressed data buffer