diff --git a/cpp/src/parquet/column/column-reader-test.cc b/cpp/src/parquet/column/column-reader-test.cc index 0abdf79c1b209..c2c0aa3f41750 100644 --- a/cpp/src/parquet/column/column-reader-test.cc +++ b/cpp/src/parquet/column/column-reader-test.cc @@ -181,6 +181,62 @@ TEST_F(TestPrimitiveReader, TestInt32FlatRepeated) { ASSERT_EQ(0, batch_actual); ASSERT_EQ(0, values_read); } -} // namespace test +TEST_F(TestPrimitiveReader, TestInt32FlatRepeatedMultiplePages) { + vector values[2] = {{1, 2, 3, 4, 5}, + {6, 7, 8, 9, 10}}; + vector def_levels[2] = {{2, 1, 1, 2, 2, 1, 1, 2, 2, 1}, + {2, 2, 1, 2, 1, 1, 2, 1, 2, 1}}; + vector rep_levels[2] = {{0, 1, 1, 0, 0, 1, 1, 0, 0, 1}, + {0, 0, 1, 0, 1, 1, 0, 1, 0, 1}}; + + std::vector buffer[4]; + std::shared_ptr page; + + for (int i = 0; i < 4; i++) { + page = MakeDataPage(values[i % 2], + def_levels[i % 2], 2, rep_levels[i % 2], 1, &buffer[i]); + pages_.push_back(page); + } + + NodePtr type = schema::Int32("a", Repetition::REPEATED); + ColumnDescriptor descr(type, 2, 1); + InitReader(&descr); + + Int32Reader* reader = static_cast(reader_.get()); + + size_t values_read = 0; + size_t batch_actual = 0; + + vector vresult(3, -1); + vector dresult(5, -1); + vector rresult(5, -1); + + for (int i = 0; i < 4; i++) { + batch_actual = reader->ReadBatch(5, &dresult[0], &rresult[0], + &vresult[0], &values_read); + ASSERT_EQ(5, batch_actual); + ASSERT_EQ(3, values_read); + + ASSERT_TRUE(vector_equal(vresult, slice(values[i % 2], 0, 3))); + ASSERT_TRUE(vector_equal(dresult, slice(def_levels[i % 2], 0, 5))); + ASSERT_TRUE(vector_equal(rresult, slice(rep_levels[i % 2], 0, 5))); + + batch_actual = reader->ReadBatch(5, &dresult[0], &rresult[0], + &vresult[0], &values_read); + ASSERT_EQ(5, batch_actual); + ASSERT_EQ(2, values_read); + + ASSERT_TRUE(vector_equal(slice(vresult, 0, 2), slice(values[i % 2], 3, 5))); + ASSERT_TRUE(vector_equal(dresult, slice(def_levels[i % 2], 5, 10))); + ASSERT_TRUE(vector_equal(rresult, slice(rep_levels[i % 2], 5, 10))); + } + // EOS, pass all nullptrs to check for improper writes. Do not segfault / + // core dump + batch_actual = reader->ReadBatch(5, nullptr, nullptr, + nullptr, &values_read); + ASSERT_EQ(0, batch_actual); + ASSERT_EQ(0, values_read); +} +} // namespace test } // namespace parquet_cpp diff --git a/cpp/src/parquet/column/levels-test.cc b/cpp/src/parquet/column/levels-test.cc index 6061d235794d3..62188db822bef 100644 --- a/cpp/src/parquet/column/levels-test.cc +++ b/cpp/src/parquet/column/levels-test.cc @@ -16,8 +16,8 @@ // under the License. #include -#include #include +#include #include @@ -28,97 +28,164 @@ using std::string; namespace parquet_cpp { -int GenerateLevels(int min_repeat_factor, int max_repeat_factor, +void GenerateLevels(int min_repeat_factor, int max_repeat_factor, int max_level, std::vector& input_levels) { int total_count = 0; // for each repetition count upto max_repeat_factor for (int repeat = min_repeat_factor; repeat <= max_repeat_factor; repeat++) { - // repeat count increase by a factor of 2 for every iteration + // repeat count increases by a factor of 2 for every iteration int repeat_count = (1 << repeat); // generate levels for repetition count upto the maximum level int value = 0; int bwidth = 0; while (value <= max_level) { for (int i = 0; i < repeat_count; i++) { - input_levels[total_count++] = value; + input_levels.push_back(value); } value = (2 << bwidth) - 1; bwidth++; } } - return total_count; } -void VerifyLevelsEncoding(Encoding::type encoding, int max_level, - std::vector& input_levels) { +void EncodeLevels(Encoding::type encoding, int max_level, int num_levels, + const int16_t* input_levels, std::vector& bytes) { LevelEncoder encoder; - LevelDecoder decoder; int levels_count = 0; - std::vector output_levels; - std::vector bytes; - int num_levels = input_levels.size(); - output_levels.resize(num_levels); bytes.resize(2 * num_levels); - ASSERT_EQ(num_levels, output_levels.size()); ASSERT_EQ(2 * num_levels, bytes.size()); - // start encoding and decoding + // encode levels if (encoding == Encoding::RLE) { // leave space to write the rle length value encoder.Init(encoding, max_level, num_levels, bytes.data() + sizeof(uint32_t), bytes.size()); - levels_count = encoder.Encode(num_levels, input_levels.data()); + levels_count = encoder.Encode(num_levels, input_levels); (reinterpret_cast(bytes.data()))[0] = encoder.len(); - } else { encoder.Init(encoding, max_level, num_levels, bytes.data(), bytes.size()); - levels_count = encoder.Encode(num_levels, input_levels.data()); + levels_count = encoder.Encode(num_levels, input_levels); } - ASSERT_EQ(num_levels, levels_count); +} - decoder.Init(encoding, max_level, num_levels, bytes.data()); - levels_count = decoder.Decode(num_levels, output_levels.data()); +void VerifyDecodingLevels(Encoding::type encoding, int max_level, + std::vector& input_levels, std::vector& bytes) { + LevelDecoder decoder; + int levels_count = 0; + std::vector output_levels; + int num_levels = input_levels.size(); - ASSERT_EQ(num_levels, levels_count); + output_levels.resize(num_levels); + ASSERT_EQ(num_levels, output_levels.size()); - for (int i = 0; i < num_levels; i++) { - EXPECT_EQ(input_levels[i], output_levels[i]); + // Decode levels and test with multiple decode calls + decoder.SetData(encoding, max_level, num_levels, bytes.data()); + int decode_count = 4; + int num_inner_levels = num_levels / decode_count; + // Try multiple decoding on a single SetData call + for (int ct = 0; ct < decode_count; ct++) { + int offset = ct * num_inner_levels; + levels_count = decoder.Decode(num_inner_levels, output_levels.data()); + ASSERT_EQ(num_inner_levels, levels_count); + for (int i = 0; i < num_inner_levels; i++) { + EXPECT_EQ(input_levels[i + offset], output_levels[i]); + } } + // check the remaining levels + int num_levels_completed = decode_count * (num_levels / decode_count); + int num_remaining_levels = num_levels - num_levels_completed; + if (num_remaining_levels > 0) { + levels_count = decoder.Decode(num_remaining_levels, output_levels.data()); + ASSERT_EQ(num_remaining_levels, levels_count); + for (int i = 0; i < num_remaining_levels; i++) { + EXPECT_EQ(input_levels[i + num_levels_completed], output_levels[i]); + } + } + //Test zero Decode values + ASSERT_EQ(0, decoder.Decode(1, output_levels.data())); } -TEST(TestLevels, TestEncodeDecodeLevels) { - // test levels with maximum bit-width from 1 to 8 - // increase the repetition count for each iteration by a factor of 2 +void VerifyDecodingMultipleSetData(Encoding::type encoding, int max_level, + std::vector& input_levels, std::vector>& bytes) { + LevelDecoder decoder; + int levels_count = 0; + std::vector output_levels; + // Decode levels and test with multiple SetData calls + int setdata_count = bytes.size(); + int num_levels = input_levels.size() / setdata_count; + output_levels.resize(num_levels); + // Try multiple SetData + for (int ct = 0; ct < setdata_count; ct++) { + int offset = ct * num_levels; + ASSERT_EQ(num_levels, output_levels.size()); + decoder.SetData(encoding, max_level, num_levels, bytes[ct].data()); + levels_count = decoder.Decode(num_levels, output_levels.data()); + ASSERT_EQ(num_levels, levels_count); + for (int i = 0; i < num_levels; i++) { + EXPECT_EQ(input_levels[i + offset], output_levels[i]); + } + } +} + +// Test levels with maximum bit-width from 1 to 8 +// increase the repetition count for each iteration by a factor of 2 +TEST(TestLevels, TestLevelsDecodeMultipleBitWidth) { int min_repeat_factor = 0; int max_repeat_factor = 7; // 128 int max_bit_width = 8; std::vector input_levels; - Encoding::type encodings[2] = {Encoding::RLE, Encoding::BIT_PACKED}; + std::vector bytes; + Encoding::type encodings[2] = {Encoding::RLE, + Encoding::BIT_PACKED}; // for each encoding for (int encode = 0; encode < 2; encode++) { Encoding::type encoding = encodings[encode]; // BIT_PACKED requires a sequence of atleast 8 if (encoding == Encoding::BIT_PACKED) min_repeat_factor = 3; - // for each maximum bit-width for (int bit_width = 1; bit_width <= max_bit_width; bit_width++) { - int num_levels_per_width = ((2 << max_repeat_factor) - (1 << min_repeat_factor)); - int num_levels = (bit_width + 1) * num_levels_per_width; - input_levels.resize(num_levels); - ASSERT_EQ(num_levels, input_levels.size()); - // find the maximum level for the current bit_width int max_level = (1 << bit_width) - 1; // Generate levels - int total_count = GenerateLevels(min_repeat_factor, max_repeat_factor, + GenerateLevels(min_repeat_factor, max_repeat_factor, max_level, input_levels); - ASSERT_EQ(num_levels, total_count); - VerifyLevelsEncoding(encoding, max_level, input_levels); + EncodeLevels(encoding, max_level, input_levels.size(), input_levels.data(), bytes); + VerifyDecodingLevels(encoding, max_level, input_levels, bytes); + input_levels.clear(); + } + } +} + +// Test multiple decoder SetData calls +TEST(TestLevels, TestLevelsDecodeMultipleSetData) { + int min_repeat_factor = 3; + int max_repeat_factor = 7; // 128 + int bit_width = 8; + int max_level = (1 << bit_width) - 1; + std::vector input_levels; + std::vector> bytes; + Encoding::type encodings[2] = {Encoding::RLE, + Encoding::BIT_PACKED}; + GenerateLevels(min_repeat_factor, max_repeat_factor, + max_level, input_levels); + int num_levels = input_levels.size(); + int setdata_factor = 8; + int split_level_size = num_levels / setdata_factor; + bytes.resize(setdata_factor); + + // for each encoding + for (int encode = 0; encode < 2; encode++) { + Encoding::type encoding = encodings[encode]; + for (int rf = 0; rf < setdata_factor; rf++) { + int offset = rf * split_level_size; + EncodeLevels(encoding, max_level, split_level_size, + reinterpret_cast(input_levels.data()) + offset, bytes[rf]); } + VerifyDecodingMultipleSetData(encoding, max_level, input_levels, bytes); } } diff --git a/cpp/src/parquet/column/levels.h b/cpp/src/parquet/column/levels.h index 18fd0bb5a6109..a0266040f6a6a 100644 --- a/cpp/src/parquet/column/levels.h +++ b/cpp/src/parquet/column/levels.h @@ -19,6 +19,7 @@ #define PARQUET_COLUMN_LEVELS_H #include +#include #include "parquet/exception.h" #include "parquet/types.h" @@ -96,25 +97,35 @@ class LevelEncoder { class LevelDecoder { public: - LevelDecoder() {} + LevelDecoder() : num_values_remaining_(0) {} - // Initialize the LevelDecoder and return the number of bytes consumed - size_t Init(Encoding::type encoding, int16_t max_level, + // Initialize the LevelDecoder state with new data + // and return the number of bytes consumed + size_t SetData(Encoding::type encoding, int16_t max_level, int num_buffered_values, const uint8_t* data) { uint32_t num_bytes = 0; uint32_t total_bytes = 0; - bit_width_ = BitUtil::Log2(max_level + 1); encoding_ = encoding; + num_values_remaining_ = num_buffered_values; + bit_width_ = BitUtil::Log2(max_level + 1); switch (encoding) { case Encoding::RLE: { num_bytes = *reinterpret_cast(data); const uint8_t* decoder_data = data + sizeof(uint32_t); - rle_decoder_.reset(new RleDecoder(decoder_data, num_bytes, bit_width_)); + if (!rle_decoder_) { + rle_decoder_.reset(new RleDecoder(decoder_data, num_bytes, bit_width_)); + } else { + rle_decoder_->Reset(decoder_data, num_bytes, bit_width_); + } return sizeof(uint32_t) + num_bytes; } case Encoding::BIT_PACKED: { num_bytes = BitUtil::Ceil(num_buffered_values * bit_width_, 8); - bit_packed_decoder_.reset(new BitReader(data, num_bytes)); + if (!bit_packed_decoder_) { + bit_packed_decoder_.reset(new BitReader(data, num_bytes)); + } else { + bit_packed_decoder_->Reset(data, num_bytes); + } return num_bytes; } default: @@ -126,30 +137,30 @@ class LevelDecoder { // Decodes a batch of levels into an array and returns the number of levels decoded size_t Decode(size_t batch_size, int16_t* levels) { size_t num_decoded = 0; - if (!rle_decoder_ && !bit_packed_decoder_) { - throw ParquetException("Level decoders are not initialized."); - } + size_t num_values = std::min(num_values_remaining_, batch_size); if (encoding_ == Encoding::RLE) { - for (size_t i = 0; i < batch_size; ++i) { + for (size_t i = 0; i < num_values; ++i) { if (!rle_decoder_->Get(levels + i)) { break; } ++num_decoded; } } else { - for (size_t i = 0; i < batch_size; ++i) { + for (size_t i = 0; i < num_values; ++i) { if (!bit_packed_decoder_->GetValue(bit_width_, levels + i)) { break; } ++num_decoded; } } + num_values_remaining_ -= num_decoded; return num_decoded; } private: int bit_width_; + size_t num_values_remaining_; Encoding::type encoding_; std::unique_ptr rle_decoder_; std::unique_ptr bit_packed_decoder_; diff --git a/cpp/src/parquet/column/reader.cc b/cpp/src/parquet/column/reader.cc index 4ba06163cd381..4011347c24161 100644 --- a/cpp/src/parquet/column/reader.cc +++ b/cpp/src/parquet/column/reader.cc @@ -97,15 +97,13 @@ bool TypedColumnReader::ReadNewPage() { // the page size to determine the number of bytes in the encoded data. size_t data_size = page->size(); - int16_t max_definition_level = descr_->max_definition_level(); - int16_t max_repetition_level = descr_->max_repetition_level(); //Data page Layout: Repetition Levels - Definition Levels - encoded values. //Levels are encoded as rle or bit-packed. //Init repetition levels - if (max_repetition_level > 0) { - size_t rep_levels_bytes = repetition_level_decoder_.Init( - page->repetition_level_encoding(), - max_repetition_level, num_buffered_values_, buffer); + if (descr_->max_repetition_level() > 0) { + size_t rep_levels_bytes = repetition_level_decoder_.SetData( + page->repetition_level_encoding(), descr_->max_repetition_level(), + num_buffered_values_, buffer); buffer += rep_levels_bytes; data_size -= rep_levels_bytes; } @@ -113,10 +111,10 @@ bool TypedColumnReader::ReadNewPage() { //if the initial value is invalid //Init definition levels - if (max_definition_level > 0) { - size_t def_levels_bytes = definition_level_decoder_.Init( - page->definition_level_encoding(), - max_definition_level, num_buffered_values_, buffer); + if (descr_->max_definition_level() > 0) { + size_t def_levels_bytes = definition_level_decoder_.SetData( + page->definition_level_encoding(), descr_->max_definition_level(), + num_buffered_values_, buffer); buffer += def_levels_bytes; data_size -= def_levels_bytes; } diff --git a/cpp/src/parquet/util/bit-stream-utils.h b/cpp/src/parquet/util/bit-stream-utils.h index 3e8f95c65dd6f..b93b90ed4f101 100644 --- a/cpp/src/parquet/util/bit-stream-utils.h +++ b/cpp/src/parquet/util/bit-stream-utils.h @@ -118,6 +118,8 @@ class BitReader { max_bytes_ = buffer_len; byte_offset_ = 0; bit_offset_ = 0; + int num_bytes = std::min(8, max_bytes_ - byte_offset_); + memcpy(&buffered_values_, buffer_ + byte_offset_, num_bytes); } /// Gets the next value from the buffer. Returns true if 'v' could be read or false if