Skip to content

Commit

Permalink
PARQUET-676: Fix incorrect MaxBufferSize for small bit widths
Browse files Browse the repository at this point in the history
This function was implemented incorrectly in the original source code from Impala. The bug never presented itself because Impala allocates a much larger buffer than is required for the data page rather than using the `MaxBufferSize` value.

In a worst case scenario, the RleEncoder can result in a concatenation of short RLE runs of 8 values (the minimum for RLE). For example:

0 0 0 0 0 0 0 0 1 1 1 1 1 1 1 1 0 0 0 0 0 0 0 0 ...

In encoded form, each RLE run occupies 2 bytes (for bit width 1). Thus 1024 values with this structure and bit width 1, at least 256 bytes are required to fully encode. If these were encoded as all literal runs, you can encode up to 504 values in a literal run (plus the indicator byte). Thus, in the same case (bit width 1) only

(504 + 504 + 16)
64 + 64 + 3 = 131

bytes are required. This should also fix PARQUET-698.

Author: Wes McKinney <wes.mckinney@twosigma.com>

Closes apache#150 from wesm/PARQUET-676 and squashes the following commits:

6ca44a8 [Wes McKinney] clang-format
e48b0bf [Wes McKinney] Fix maximum buffer sizing for hybrid RLE encoding case where short RLE runs use more space than mostly literal runs

Change-Id: Ia333b7b2ecd1d5dbc54842e584aad6afabaddb38
  • Loading branch information
wesm committed Sep 3, 2016
1 parent f4d71b5 commit 08d8520
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 9 deletions.
23 changes: 23 additions & 0 deletions cpp/src/parquet/column/levels-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -183,4 +183,27 @@ TEST(TestLevels, TestLevelsDecodeMultipleSetData) {
}
}

TEST(TestLevelEncoder, MinimumBufferSize) {
// PARQUET-676, PARQUET-698
const int kNumToEncode = 1024;

std::vector<int16_t> levels;
for (int i = 0; i < kNumToEncode; ++i) {
if (i % 9 == 0) {
levels.push_back(0);
} else {
levels.push_back(1);
}
}

std::vector<uint8_t> output(
LevelEncoder::MaxBufferSize(Encoding::RLE, 1, kNumToEncode));

LevelEncoder encoder;
encoder.Init(Encoding::RLE, 1, kNumToEncode, output.data(), output.size());
int encode_count = encoder.Encode(kNumToEncode, levels.data());

ASSERT_EQ(kNumToEncode, encode_count);
}

} // namespace parquet
13 changes: 5 additions & 8 deletions cpp/src/parquet/file/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,7 @@ void ParquetFileReader::DebugPrint(
const ColumnStatistics stats = column_chunk->statistics();

const ColumnDescriptor* descr = file_metadata->schema_descriptor()->Column(i);
stream << "Column " << i << std::endl
<< ", values: " << column_chunk->num_values();
stream << "Column " << i << std::endl << ", values: " << column_chunk->num_values();
if (column_chunk->is_stats_set()) {
stream << ", null values: " << stats.null_count
<< ", distinct values: " << stats.distinct_count << std::endl
Expand All @@ -174,17 +173,15 @@ void ParquetFileReader::DebugPrint(
stream << " Statistics Not Set";
}
stream << std::endl
<< " compression: "
<< compression_to_string(column_chunk->compression())
<< " compression: " << compression_to_string(column_chunk->compression())
<< ", encodings: ";
for (auto encoding : column_chunk->encodings()) {
stream << encoding_to_string(encoding) << " ";
}
stream << std::endl
<< " uncompressed size: "
<< column_chunk->total_uncompressed_size()
<< ", compressed size: "
<< column_chunk->total_compressed_size() << std::endl;
<< " uncompressed size: " << column_chunk->total_uncompressed_size()
<< ", compressed size: " << column_chunk->total_compressed_size()
<< std::endl;
}

if (!print_values) { continue; }
Expand Down
9 changes: 8 additions & 1 deletion cpp/src/parquet/util/rle-encoding.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,14 @@ class RleEncoder {
int bytes_per_run = BitUtil::Ceil(bit_width * MAX_VALUES_PER_LITERAL_RUN, 8.0);
int num_runs = BitUtil::Ceil(num_values, MAX_VALUES_PER_LITERAL_RUN);
int literal_max_size = num_runs + num_runs * bytes_per_run;
return std::max(MinBufferSize(bit_width), literal_max_size);

// In the very worst case scenario, the data is a concatenation of repeated
// runs of 8 values. Repeated run has a 1 byte varint followed by the
// bit-packed repeated value
int min_repeated_run_size = 1 + BitUtil::Ceil(bit_width, 8);
int repeated_max_size = BitUtil::Ceil(num_values, 8) * min_repeated_run_size;

return std::max(literal_max_size, repeated_max_size);
}

/// Encode value. Returns true if the value fits in buffer, false otherwise.
Expand Down

0 comments on commit 08d8520

Please sign in to comment.