Support for Zstandard decompression in Parquet reader #10847

Merged · 8 commits · May 17, 2022
16 changes: 16 additions & 0 deletions cpp/src/io/comp/nvcomp_adapter.cpp
@@ -20,6 +20,14 @@
 
 #include <nvcomp/snappy.h>
 
+#define NVCOMP_ZSTD_HEADER <nvcomp/zstd.h>
+#if __has_include(NVCOMP_ZSTD_HEADER)
+#include NVCOMP_ZSTD_HEADER
+#define NVCOMP_HAS_ZSTD 1
+#else
+#define NVCOMP_HAS_ZSTD 0
+#endif
+
 namespace cudf::io::nvcomp {
 
 template <typename... Args>
@@ -28,6 +36,10 @@ auto batched_decompress_get_temp_size(compression_type type, Args&&... args)
   switch (type) {
     case compression_type::SNAPPY:
       return nvcompBatchedSnappyDecompressGetTempSize(std::forward<Args>(args)...);
+#if NVCOMP_HAS_ZSTD
+    case compression_type::ZSTD:
+      return nvcompBatchedZstdDecompressGetTempSize(std::forward<Args>(args)...);
+#endif
     default: CUDF_FAIL("Unsupported compression type");
   }
 };
@@ -38,6 +50,10 @@ auto batched_decompress_async(compression_type type, Args&&... args)
   switch (type) {
     case compression_type::SNAPPY:
       return nvcompBatchedSnappyDecompressAsync(std::forward<Args>(args)...);
+#if NVCOMP_HAS_ZSTD
+    case compression_type::ZSTD:
+      return nvcompBatchedZstdDecompressAsync(std::forward<Args>(args)...);
+#endif
     default: CUDF_FAIL("Unsupported compression type");
   }
 };
2 changes: 1 addition & 1 deletion cpp/src/io/comp/nvcomp_adapter.hpp
@@ -24,7 +24,7 @@
 
 namespace cudf::io::nvcomp {
 
-enum class compression_type { SNAPPY };
+enum class compression_type { SNAPPY, ZSTD };
 
 /**
  * @brief Device batch decompression of given type.
11 changes: 10 additions & 1 deletion cpp/src/io/parquet/reader_impl.cu
@@ -1108,7 +1108,8 @@ rmm::device_buffer reader::impl::decompress_page_data(
 
   std::array codecs{codec_stats{parquet::GZIP, 0, 0},
                     codec_stats{parquet::SNAPPY, 0, 0},
-                    codec_stats{parquet::BROTLI, 0, 0}};
+                    codec_stats{parquet::BROTLI, 0, 0},
+                    codec_stats{parquet::ZSTD, 0, 0}};
 
   auto is_codec_supported = [&codecs](int8_t codec) {
     if (codec == parquet::UNCOMPRESSED) return true;
@@ -1191,6 +1192,14 @@ rmm::device_buffer reader::impl::decompress_page_data(
          gpu_unsnap(d_comp_in, d_comp_out, d_comp_stats_view, stream);
        }
        break;
+      case parquet::ZSTD:
+        nvcomp::batched_decompress(nvcomp::compression_type::ZSTD,
+                                   d_comp_in,
+                                   d_comp_out,
+                                   d_comp_stats_view,
+                                   codec.max_decompressed_size,
+                                   stream);
+        break;
      case parquet::BROTLI:
        gpu_debrotli(d_comp_in,
                     d_comp_out,
14 changes: 10 additions & 4 deletions python/cudf/cudf/tests/test_parquet.py
@@ -2509,8 +2509,14 @@ def test_parquet_reader_decimal_columns():
     assert_eq(actual, expected)
 
 
-def test_parquet_reader_unsupported_compression(datadir):
+def test_parquet_reader_zstd_compression(datadir):
     fname = datadir / "spark_zstd.parquet"
-
-    with pytest.raises(RuntimeError):
-        cudf.read_parquet(fname)
+    try:
+        df = cudf.read_parquet(fname)
+        pdf = pd.read_parquet(fname)
+        assert_eq(df, pdf)
+    except RuntimeError as e:
+        if "Unsupported compression type" in str(e):
+            pytest.xfail(reason="nvcomp build doesn't have zstd")
+        else:
+            raise e
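
For context, a minimal usage sketch (not part of this PR) of the behavior the change enables: a Zstandard-compressed Parquet file written through pandas/pyarrow can be read back with cudf, which now hands the ZSTD-compressed pages to nvcomp for GPU decompression. The file name and DataFrame contents below are illustrative only; the test above instead uses a pre-generated spark_zstd.parquet fixture, and on builds whose nvcomp lacks ZSTD support cudf.read_parquet still raises the "Unsupported compression type" RuntimeError.

    # Illustrative sketch only; assumes pyarrow with zstd support and an
    # nvcomp build that provides Zstandard decompression.
    import pandas as pd
    import cudf

    pdf = pd.DataFrame({"ints": range(10), "text": [str(i) for i in range(10)]})
    pdf.to_parquet("zstd_example.parquet", compression="zstd")

    gdf = cudf.read_parquet("zstd_example.parquet")  # ZSTD pages decoded on the GPU
    assert gdf.to_pandas().equals(pdf)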