Skip to content

Commit

Permalink
PARQUET-442: Nested schema conversion, Thrift struct decoupling, dump…
Browse files Browse the repository at this point in the history
…-schema utility

Several inter-related things here:

* Added SchemaDescriptor and ColumnDescriptor types to hold computed structure
  information (e.g. max ref/def levels) about the file schema. These are used
  now in the FileReader and ColumnReader
* I also added, very similar to parquet-mr (though leaned down), a logical
  schema node class structure which can be used for both the file reading and
  writing.
* Added FlatSchemaConverter to convert Parquet flat schema metadata into a
  nested logical schema
* Added a SchemaPrinter tool and parquet-dump-schema CLI tool to visit a nested
  schema and print it to the console.
* Another big thing here is that per PARQUET-446 and related work in
  parquet-mr, it's important for both the public API of this project and
  internal development to limit our coupling to the compiled Thrift headers. I
  added `Type`, `Repetition`, and `LogicalType` enums to the `parquet_cpp`
  namespace and inverted the dependency between the column readers, scanners,
  and encoders to use these enums.
* A bunch of unit tests.

Author: Wes McKinney <wes@cloudera.com>

Closes apache#38 from wesm/PARQUET-442 and squashes the following commits:

9ca0219 [Wes McKinney] Add a unit test for SchemaPrinter
fdd37cd [Wes McKinney] Comment re: FLBA node ctor
3a15c0c [Wes McKinney] Add some SchemaDescriptor and ColumnDescriptor tests
27e1805 [Wes McKinney] Don't squash supplied CMAKE_CXX_FLAGS
76dd283 [Wes McKinney] Refactor Make* methods as static member functions
2fae8cd [Wes McKinney] Trim some includes
b2e2661 [Wes McKinney] More doc about the parquet_cpp enums
bd78d7c [Wes McKinney] Move metadata enums to parquet/types.h and add rest of parquet:: enums. Add NONE value to Compression
415305b [Wes McKinney] cpplint
4ac84aa [Wes McKinney] Refactor to make PrimitiveNode and GroupNode ctors private. Add MakePrimitive and MakeGroup factory functions. Move parquet::SchemaElement function into static FromParquet ctors so can set private members
3169b24 [Wes McKinney] NewPrimitive should set num_children = 0 always
954658e [Wes McKinney] Add a comment for TestSchemaConverter.InvalidRoot and uncomment tests for root nodes of other repetition types
55d21b0 [Wes McKinney] Remove schema-builder-test.cc
71c1eab [Wes McKinney] Remove crufty builder.h, will revisit
7ef2dee [Wes McKinney] Fix list encoding comment
8c5af4e [Wes McKinney] Remove old comment, unneeded cast
6b041c5 [Wes McKinney] First draft SchemaDescriptor::Init. Refactor to use ColumnDescriptor. Standardize on parquet_cpp enums instead of Thrift metadata structs. Limit #include from Thrift
841ae7f [Wes McKinney] Don't export SchemaPrinter for now
834389a [Wes McKinney] Add Node::Visotor API and implement a simple schema dump CLI tool
a8bf5c8 [Wes McKinney] Catch and throw exception (instead of core dump) if run out of schema children. Add a Node::Visitor abstract API
bde8b18 [Wes McKinney] Can compare FLBA type metadata in logical schemas
f0df0ba [Wes McKinney] Finish a nested schema conversion test
0af0161 [Wes McKinney] Check that root schema node is repeated
5df00aa [Wes McKinney] Expose GroupConverter API, add test for invalid root
beaa99f [Wes McKinney] Refactor slightly and add an FLBA test
6e248b8 [Wes McKinney] Schema tree conversion first cut, add a couple primitive tests
9685c90 [Wes McKinney] Rename Schema -> RootSchema and add another unit test
f7d0487 [Wes McKinney] Schema types test coverage, move more methods into compilation unit
d746352 [Wes McKinney] Better isolate thrift dependency. Move schema/column descriptor into its own header
a8e5a0a [Wes McKinney] Tweaks
fb9d7ad [Wes McKinney] Draft of flat to nested schema conversion. No tests yet
3015063 [Wes McKinney] More prototyping. Rename Type -> Node. PrimitiveNode factory functions
a8a7a01 [Wes McKinney] Start drafting schema types
  • Loading branch information
wesm committed Sep 2, 2018
1 parent f6b1509 commit c05d509
Show file tree
Hide file tree
Showing 31 changed files with 2,093 additions and 236 deletions.
33 changes: 15 additions & 18 deletions cpp/src/parquet/column/column-reader-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ using std::vector;
using std::shared_ptr;
using parquet::FieldRepetitionType;
using parquet::SchemaElement;
using parquet::Encoding;
using parquet::Type;

namespace parquet_cpp {

using schema::NodePtr;

namespace test {

class TestPrimitiveReader : public ::testing::Test {
Expand All @@ -49,9 +49,9 @@ class TestPrimitiveReader : public ::testing::Test {

void TearDown() {}

void InitReader(const SchemaElement* element) {
void InitReader(const ColumnDescriptor* descr) {
pager_.reset(new test::MockPageReader(pages_));
reader_ = ColumnReader::Make(element, std::move(pager_));
reader_ = ColumnReader::Make(descr, std::move(pager_));
}

protected:
Expand All @@ -77,18 +77,17 @@ static vector<T> slice(const vector<T>& values, size_t start, size_t end) {
TEST_F(TestPrimitiveReader, TestInt32FlatRequired) {
vector<int32_t> values = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
size_t num_values = values.size();
Encoding::type value_encoding = Encoding::PLAIN;
parquet::Encoding::type value_encoding = parquet::Encoding::PLAIN;

vector<uint8_t> page1;
test::DataPageBuilder<Type::INT32> page_builder(&page1);
page_builder.AppendValues(values, Encoding::PLAIN);
page_builder.AppendValues(values, parquet::Encoding::PLAIN);
pages_.push_back(page_builder.Finish());

// TODO: simplify this
SchemaElement element;
element.__set_type(Type::INT32);
element.__set_repetition_type(FieldRepetitionType::REQUIRED);
InitReader(&element);
NodePtr type = schema::Int32("a", Repetition::REQUIRED);
ColumnDescriptor descr(type, 0, 0);
InitReader(&descr);

Int32Reader* reader = static_cast<Int32Reader*>(reader_.get());

Expand All @@ -108,22 +107,20 @@ TEST_F(TestPrimitiveReader, TestInt32FlatOptional) {
vector<int16_t> def_levels = {1, 0, 0, 1, 1, 0, 0, 0, 1, 1};

size_t num_values = values.size();
Encoding::type value_encoding = Encoding::PLAIN;
parquet::Encoding::type value_encoding = parquet::Encoding::PLAIN;

vector<uint8_t> page1;
test::DataPageBuilder<Type::INT32> page_builder(&page1);

// Definition levels precede the values
page_builder.AppendDefLevels(def_levels, 1, Encoding::RLE);
page_builder.AppendValues(values, Encoding::PLAIN);
page_builder.AppendDefLevels(def_levels, 1, parquet::Encoding::RLE);
page_builder.AppendValues(values, parquet::Encoding::PLAIN);

pages_.push_back(page_builder.Finish());

// TODO: simplify this
SchemaElement element;
element.__set_type(Type::INT32);
element.__set_repetition_type(FieldRepetitionType::OPTIONAL);
InitReader(&element);
NodePtr type = schema::Int32("a", Repetition::OPTIONAL);
ColumnDescriptor descr(type, 1, 0);
InitReader(&descr);

Int32Reader* reader = static_cast<Int32Reader*>(reader_.get());

Expand Down
77 changes: 39 additions & 38 deletions cpp/src/parquet/column/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,26 +28,23 @@

namespace parquet_cpp {

using parquet::Encoding;
using parquet::FieldRepetitionType;
using parquet::PageType;
using parquet::Type;

ColumnReader::ColumnReader(const parquet::SchemaElement* schema,
ColumnReader::ColumnReader(const ColumnDescriptor* descr,
std::unique_ptr<PageReader> pager)
: schema_(schema),
: descr_(descr),
pager_(std::move(pager)),
num_buffered_values_(0),
num_decoded_values_(0) {}

template <int TYPE>
void TypedColumnReader<TYPE>::ConfigureDictionary(const DictionaryPage* page) {
auto it = decoders_.find(Encoding::RLE_DICTIONARY);
int encoding = static_cast<int>(parquet::Encoding::RLE_DICTIONARY);

auto it = decoders_.find(encoding);
if (it != decoders_.end()) {
throw ParquetException("Column cannot have more than one dictionary.");
}

PlainDecoder<TYPE> dictionary(schema_);
PlainDecoder<TYPE> dictionary(descr_);
dictionary.SetData(page->num_values(), page->data(), page->size());

// The dictionary is fully decoded during DictionaryDecoder::Init, so the
Expand All @@ -56,10 +53,10 @@ void TypedColumnReader<TYPE>::ConfigureDictionary(const DictionaryPage* page) {
// TODO(wesm): investigate whether this all-or-nothing decoding of the
// dictionary makes sense and whether performance can be improved
std::shared_ptr<DecoderType> decoder(
new DictionaryDecoder<TYPE>(schema_, &dictionary));
new DictionaryDecoder<TYPE>(descr_, &dictionary));

decoders_[Encoding::RLE_DICTIONARY] = decoder;
current_decoder_ = decoders_[Encoding::RLE_DICTIONARY].get();
decoders_[encoding] = decoder;
current_decoder_ = decoders_[encoding].get();
}


Expand All @@ -76,8 +73,9 @@ static size_t InitializeLevelDecoder(const uint8_t* buffer,

// PLAIN_DICTIONARY is deprecated but used to be used as a dictionary index
// encoding.
static bool IsDictionaryIndexEncoding(const Encoding::type& e) {
return e == Encoding::RLE_DICTIONARY || e == Encoding::PLAIN_DICTIONARY;
static bool IsDictionaryIndexEncoding(const parquet::Encoding::type& e) {
return e == parquet::Encoding::RLE_DICTIONARY ||
e == parquet::Encoding::PLAIN_DICTIONARY;
}

template <int TYPE>
Expand All @@ -92,10 +90,10 @@ bool TypedColumnReader<TYPE>::ReadNewPage() {
return false;
}

if (current_page_->type() == PageType::DICTIONARY_PAGE) {
if (current_page_->type() == parquet::PageType::DICTIONARY_PAGE) {
ConfigureDictionary(static_cast<const DictionaryPage*>(current_page_.get()));
continue;
} else if (current_page_->type() == PageType::DATA_PAGE) {
} else if (current_page_->type() == parquet::PageType::DATA_PAGE) {
const DataPage* page = static_cast<const DataPage*>(current_page_.get());

// Read a data page.
Expand All @@ -111,10 +109,11 @@ bool TypedColumnReader<TYPE>::ReadNewPage() {
// the page size to determine the number of bytes in the encoded data.
size_t data_size = page->size();

max_definition_level_ = descr_->max_definition_level();

// Read definition levels.
if (schema_->repetition_type != FieldRepetitionType::REQUIRED) {
if (max_definition_level_ > 0) {
// Temporary hack until schema resolution implemented
max_definition_level_ = 1;

size_t def_levels_bytes = InitializeLevelDecoder(buffer,
max_definition_level_, definition_level_decoder_);
Expand All @@ -130,27 +129,29 @@ bool TypedColumnReader<TYPE>::ReadNewPage() {

// Get a decoder object for this page or create a new decoder if this is the
// first page with this encoding.
Encoding::type encoding = page->encoding();
parquet::Encoding::type encoding = page->encoding();

if (IsDictionaryIndexEncoding(encoding)) encoding = Encoding::RLE_DICTIONARY;
if (IsDictionaryIndexEncoding(encoding)) {
encoding = parquet::Encoding::RLE_DICTIONARY;
}

auto it = decoders_.find(encoding);
auto it = decoders_.find(static_cast<int>(encoding));
if (it != decoders_.end()) {
current_decoder_ = it->second.get();
} else {
switch (encoding) {
case Encoding::PLAIN: {
std::shared_ptr<DecoderType> decoder(new PlainDecoder<TYPE>(schema_));
decoders_[encoding] = decoder;
case parquet::Encoding::PLAIN: {
std::shared_ptr<DecoderType> decoder(new PlainDecoder<TYPE>(descr_));
decoders_[static_cast<int>(encoding)] = decoder;
current_decoder_ = decoder.get();
break;
}
case Encoding::RLE_DICTIONARY:
case parquet::Encoding::RLE_DICTIONARY:
throw ParquetException("Dictionary page must be before data page.");

case Encoding::DELTA_BINARY_PACKED:
case Encoding::DELTA_LENGTH_BYTE_ARRAY:
case Encoding::DELTA_BYTE_ARRAY:
case parquet::Encoding::DELTA_BINARY_PACKED:
case parquet::Encoding::DELTA_LENGTH_BYTE_ARRAY:
case parquet::Encoding::DELTA_BYTE_ARRAY:
ParquetException::NYI("Unsupported encoding");

default:
Expand Down Expand Up @@ -202,25 +203,25 @@ size_t ColumnReader::ReadRepetitionLevels(size_t batch_size, int16_t* levels) {
// Dynamic column reader constructor

std::shared_ptr<ColumnReader> ColumnReader::Make(
const parquet::SchemaElement* element,
const ColumnDescriptor* descr,
std::unique_ptr<PageReader> pager) {
switch (element->type) {
switch (descr->physical_type()) {
case Type::BOOLEAN:
return std::make_shared<BoolReader>(element, std::move(pager));
return std::make_shared<BoolReader>(descr, std::move(pager));
case Type::INT32:
return std::make_shared<Int32Reader>(element, std::move(pager));
return std::make_shared<Int32Reader>(descr, std::move(pager));
case Type::INT64:
return std::make_shared<Int64Reader>(element, std::move(pager));
return std::make_shared<Int64Reader>(descr, std::move(pager));
case Type::INT96:
return std::make_shared<Int96Reader>(element, std::move(pager));
return std::make_shared<Int96Reader>(descr, std::move(pager));
case Type::FLOAT:
return std::make_shared<FloatReader>(element, std::move(pager));
return std::make_shared<FloatReader>(descr, std::move(pager));
case Type::DOUBLE:
return std::make_shared<DoubleReader>(element, std::move(pager));
return std::make_shared<DoubleReader>(descr, std::move(pager));
case Type::BYTE_ARRAY:
return std::make_shared<ByteArrayReader>(element, std::move(pager));
return std::make_shared<ByteArrayReader>(descr, std::move(pager));
case Type::FIXED_LEN_BYTE_ARRAY:
return std::make_shared<FixedLenByteArrayReader>(element, std::move(pager));
return std::make_shared<FixedLenByteArrayReader>(descr, std::move(pager));
default:
ParquetException::NYI("type reader not implemented");
}
Expand Down
49 changes: 18 additions & 31 deletions cpp/src/parquet/column/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,33 +30,20 @@
#include "parquet/types.h"

#include "parquet/column/page.h"

#include "parquet/thrift/parquet_constants.h"
#include "parquet/thrift/parquet_types.h"
#include "parquet/encodings/encodings.h"
#include "parquet/schema/descriptor.h"
#include "parquet/util/rle-encoding.h"

namespace std {

template <>
struct hash<parquet::Encoding::type> {
std::size_t operator()(const parquet::Encoding::type& k) const {
return hash<int>()(static_cast<int>(k));
}
};

} // namespace std

namespace parquet_cpp {

class Codec;
class Scanner;

class ColumnReader {
public:
ColumnReader(const parquet::SchemaElement*, std::unique_ptr<PageReader>);
ColumnReader(const ColumnDescriptor*, std::unique_ptr<PageReader>);

static std::shared_ptr<ColumnReader> Make(const parquet::SchemaElement*,
static std::shared_ptr<ColumnReader> Make(const ColumnDescriptor*,
std::unique_ptr<PageReader>);

// Returns true if there are still values in this column.
Expand All @@ -71,12 +58,12 @@ class ColumnReader {
return true;
}

parquet::Type::type type() const {
return schema_->type;
Type::type type() const {
return descr_->physical_type();
}

const parquet::SchemaElement* schema() const {
return schema_;
const ColumnDescriptor* descr() const {
return descr_;
}

protected:
Expand All @@ -92,7 +79,7 @@ class ColumnReader {
// Returns the number of decoded repetition levels
size_t ReadRepetitionLevels(size_t batch_size, int16_t* levels);

const parquet::SchemaElement* schema_;
const ColumnDescriptor* descr_;

std::unique_ptr<PageReader> pager_;
std::shared_ptr<Page> current_page_;
Expand Down Expand Up @@ -125,7 +112,7 @@ class TypedColumnReader : public ColumnReader {
public:
typedef typename type_traits<TYPE>::value_type T;

TypedColumnReader(const parquet::SchemaElement* schema,
TypedColumnReader(const ColumnDescriptor* schema,
std::unique_ptr<PageReader> pager) :
ColumnReader(schema, std::move(pager)),
current_decoder_(NULL) {
Expand Down Expand Up @@ -162,7 +149,7 @@ class TypedColumnReader : public ColumnReader {
// Map of encoding type to the respective decoder object. For example, a
// column chunk's data pages may include both dictionary-encoded and
// plain-encoded data.
std::unordered_map<parquet::Encoding::type, std::shared_ptr<DecoderType> > decoders_;
std::unordered_map<int, std::shared_ptr<DecoderType> > decoders_;

void ConfigureDictionary(const DictionaryPage* page);

Expand Down Expand Up @@ -227,14 +214,14 @@ inline size_t TypedColumnReader<TYPE>::ReadBatch(int batch_size, int16_t* def_le
}


typedef TypedColumnReader<parquet::Type::BOOLEAN> BoolReader;
typedef TypedColumnReader<parquet::Type::INT32> Int32Reader;
typedef TypedColumnReader<parquet::Type::INT64> Int64Reader;
typedef TypedColumnReader<parquet::Type::INT96> Int96Reader;
typedef TypedColumnReader<parquet::Type::FLOAT> FloatReader;
typedef TypedColumnReader<parquet::Type::DOUBLE> DoubleReader;
typedef TypedColumnReader<parquet::Type::BYTE_ARRAY> ByteArrayReader;
typedef TypedColumnReader<parquet::Type::FIXED_LEN_BYTE_ARRAY> FixedLenByteArrayReader;
typedef TypedColumnReader<Type::BOOLEAN> BoolReader;
typedef TypedColumnReader<Type::INT32> Int32Reader;
typedef TypedColumnReader<Type::INT64> Int64Reader;
typedef TypedColumnReader<Type::INT96> Int96Reader;
typedef TypedColumnReader<Type::FLOAT> FloatReader;
typedef TypedColumnReader<Type::DOUBLE> DoubleReader;
typedef TypedColumnReader<Type::BYTE_ARRAY> ByteArrayReader;
typedef TypedColumnReader<Type::FIXED_LEN_BYTE_ARRAY> FixedLenByteArrayReader;

} // namespace parquet_cpp

Expand Down
4 changes: 0 additions & 4 deletions cpp/src/parquet/column/scanner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,6 @@
#include <memory>

#include "parquet/column/reader.h"
#include "parquet/thrift/parquet_types.h"
#include "parquet/thrift/util.h"

using parquet::Type;

namespace parquet_cpp {

Expand Down
Loading

0 comments on commit c05d509

Please sign in to comment.