Skip to content

Commit

Permalink
PARQUET-555: Dictionary page metadata handling inconsistencies
Browse files Browse the repository at this point in the history
Includes tests

Author: Deepak Majeti <deepak.majeti@hpe.com>

Closes apache#73 from majetideepak/PARQUET-555 and squashes the following commits:

2c53f53 [Deepak Majeti] minor fixes
c7f1b24 [Deepak Majeti] addressed comment and added more tests
5b66a1f [Deepak Majeti] PARQUET-555: Dictionary page metadata handling inconsistencies

Change-Id: I69248b5b3ca4aab0ee6bd0cb8511f84166b76638
  • Loading branch information
Deepak Majeti authored and wesm committed Mar 4, 2016
1 parent eaba5d4 commit 4fe21e4
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 14 deletions.
65 changes: 65 additions & 0 deletions cpp/src/parquet/column/column-reader-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -155,5 +155,70 @@ TEST_F(TestPrimitiveReader, TestInt32FlatRepeated) {
ExecuteDict(num_pages, levels_per_page, &descr);
}

TEST_F(TestPrimitiveReader, TestDictionaryEncodedPages) {
max_def_level_ = 0;
max_rep_level_ = 0;
NodePtr type = schema::Int32("a", Repetition::REQUIRED);
const ColumnDescriptor descr(type, max_def_level_, max_rep_level_);
shared_ptr<OwnedMutableBuffer> dummy = std::make_shared<OwnedMutableBuffer>();

shared_ptr<DictionaryPage> dict_page = std::make_shared<DictionaryPage>(dummy,
0, Encoding::PLAIN);
shared_ptr<DataPage> data_page = MakeDataPage<Int32Type>(&descr, {}, 0,
Encoding::RLE_DICTIONARY, {}, 0, {}, 0, {}, 0);
pages_.push_back(dict_page);
pages_.push_back(data_page);
InitReader(&descr);
// Tests Dict : PLAIN, Data : RLE_DICTIONARY
ASSERT_NO_THROW(reader_->HasNext());
pages_.clear();

dict_page = std::make_shared<DictionaryPage>(dummy,
0, Encoding::PLAIN_DICTIONARY);
data_page = MakeDataPage<Int32Type>(&descr, {}, 0,
Encoding::PLAIN_DICTIONARY, {}, 0, {}, 0, {}, 0);
pages_.push_back(dict_page);
pages_.push_back(data_page);
InitReader(&descr);
// Tests Dict : PLAIN_DICTIONARY, Data : PLAIN_DICTIONARY
ASSERT_NO_THROW(reader_->HasNext());
pages_.clear();

data_page = MakeDataPage<Int32Type>(&descr, {}, 0,
Encoding::RLE_DICTIONARY, {}, 0, {}, 0, {}, 0);
pages_.push_back(data_page);
InitReader(&descr);
// Tests dictionary page must occur before data page
ASSERT_THROW(reader_->HasNext(), ParquetException);
pages_.clear();

dict_page = std::make_shared<DictionaryPage>(dummy,
0, Encoding::DELTA_BYTE_ARRAY);
pages_.push_back(dict_page);
InitReader(&descr);
// Tests only RLE_DICTIONARY is supported
ASSERT_THROW(reader_->HasNext(), ParquetException);
pages_.clear();

shared_ptr<DictionaryPage> dict_page1 = std::make_shared<DictionaryPage>(dummy,
0, Encoding::PLAIN_DICTIONARY);
shared_ptr<DictionaryPage> dict_page2 = std::make_shared<DictionaryPage>(dummy,
0, Encoding::PLAIN);
pages_.push_back(dict_page1);
pages_.push_back(dict_page2);
InitReader(&descr);
// Column cannot have more than one dictionary
ASSERT_THROW(reader_->HasNext(), ParquetException);
pages_.clear();

data_page = MakeDataPage<Int32Type>(&descr, {}, 0,
Encoding::DELTA_BYTE_ARRAY, {}, 0, {}, 0, {}, 0);
pages_.push_back(data_page);
InitReader(&descr);
// unsupported encoding
ASSERT_THROW(reader_->HasNext(), ParquetException);
pages_.clear();
}

} // namespace test
} // namespace parquet_cpp
38 changes: 25 additions & 13 deletions cpp/src/parquet/column/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,26 +37,35 @@ ColumnReader::ColumnReader(const ColumnDescriptor* descr,

template <int TYPE>
void TypedColumnReader<TYPE>::ConfigureDictionary(const DictionaryPage* page) {
int encoding = static_cast<int>(Encoding::RLE_DICTIONARY);
int encoding = static_cast<int>(page->encoding());
if (page->encoding() == Encoding::PLAIN_DICTIONARY ||
page->encoding() == Encoding::PLAIN) {
encoding = static_cast<int>(Encoding::RLE_DICTIONARY);
}

auto it = decoders_.find(encoding);
if (it != decoders_.end()) {
throw ParquetException("Column cannot have more than one dictionary.");
}

PlainDecoder<TYPE> dictionary(descr_);
dictionary.SetData(page->num_values(), page->data(), page->size());

// The dictionary is fully decoded during DictionaryDecoder::Init, so the
// DictionaryPage buffer is no longer required after this step
//
// TODO(wesm): investigate whether this all-or-nothing decoding of the
// dictionary makes sense and whether performance can be improved

auto decoder = std::make_shared<DictionaryDecoder<TYPE> >(descr_);
decoder->SetDict(&dictionary);
if (page->encoding() == Encoding::PLAIN_DICTIONARY ||
page->encoding() == Encoding::PLAIN) {
PlainDecoder<TYPE> dictionary(descr_);
dictionary.SetData(page->num_values(), page->data(), page->size());

// The dictionary is fully decoded during DictionaryDecoder::Init, so the
// DictionaryPage buffer is no longer required after this step
//
// TODO(wesm): investigate whether this all-or-nothing decoding of the
// dictionary makes sense and whether performance can be improved

auto decoder = std::make_shared<DictionaryDecoder<TYPE> >(descr_);
decoder->SetDict(&dictionary);
decoders_[encoding] = decoder;
} else {
ParquetException::NYI("only plain dictionary encoding has been implemented");
}

decoders_[encoding] = decoder;
current_decoder_ = decoders_[encoding].get();
}

Expand Down Expand Up @@ -130,6 +139,9 @@ bool TypedColumnReader<TYPE>::ReadNewPage() {

auto it = decoders_.find(static_cast<int>(encoding));
if (it != decoders_.end()) {
if (encoding == Encoding::RLE_DICTIONARY) {
DCHECK(current_decoder_->encoding() == Encoding::RLE_DICTIONARY);
}
current_decoder_ = it->second.get();
} else {
switch (encoding) {
Expand Down
6 changes: 6 additions & 0 deletions cpp/src/parquet/column/scanner-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,12 @@ TEST_F(TestFLBAFlatScanner, TestDictScanner) {
Encoding::RLE_DICTIONARY);
}

TEST_F(TestFLBAFlatScanner, TestPlainDictScanner) {
this->ExecuteAll(num_pages, num_levels_per_page, batch_size, FLBA_LENGTH,
Encoding::PLAIN_DICTIONARY);
}


//PARQUET 502
TEST_F(TestFlatFLBAScanner, TestSmallBatch) {
NodePtr type = schema::PrimitiveNode::Make("c1", Repetition::REQUIRED,
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/parquet/column/test-util.h
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,8 @@ static int MakePages(const ColumnDescriptor *d, int num_pages, int levels_per_pa
InitValues<typename Type::c_type>(num_values, values, buffer);
PaginatePlain<Type>(d, values, def_levels, max_def_level,
rep_levels, max_rep_level, levels_per_page, values_per_page, pages);
} else if (encoding == Encoding::RLE_DICTIONARY) {
} else if (encoding == Encoding::RLE_DICTIONARY
|| encoding == Encoding::PLAIN_DICTIONARY) {
// Calls InitValues and repeats the data
InitDictValues<typename Type::c_type>(num_values, levels_per_page, values, buffer);
PaginateDict<Type>(d, values, def_levels, max_def_level,
Expand Down

0 comments on commit 4fe21e4

Please sign in to comment.