Skip to content

Commit

Permalink
PARQUET-598: Test writing all primitive data types
Browse files Browse the repository at this point in the history
Author: Uwe L. Korn <uwelk@xhochy.com>

Closes apache#113 from xhochy/parquet-598 and squashes the following commits:

a22d7b7 [Uwe L. Korn] Address review comments
a82159b [Uwe L. Korn] Move specialization of InitValues to separate header
e2b48ea [Uwe L. Korn] Move specialization of InitValues to separate header
c3a6790 [Uwe L. Korn] PARQUET-598: Test writing all primitive data types

Change-Id: I694f229427244a6be993587030623133caeb2483
  • Loading branch information
xhochy authored and wesm committed Jun 3, 2016
1 parent 2630d43 commit 9057c9f
Show file tree
Hide file tree
Showing 4 changed files with 224 additions and 106 deletions.
223 changes: 151 additions & 72 deletions cpp/src/parquet/column/column-writer-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

#include <gtest/gtest.h>

#include "parquet/column/test-util.h"
#include "parquet/column/test-specialization.h"

#include "parquet/file/reader-internal.h"
#include "parquet/file/writer-internal.h"
#include "parquet/column/reader.h"
Expand All @@ -32,59 +35,88 @@ using schema::PrimitiveNode;

namespace test {

// The default size used in most tests.
const int SMALL_SIZE = 100;
// Larger size to test some corner cases, only used in some specific cases.
const int LARGE_SIZE = 10000;

template <typename TestType>
class TestPrimitiveWriter : public ::testing::Test {
public:
typedef typename TestType::c_type T;

void SetUpSchemaRequired() {
node_ = PrimitiveNode::Make("int64", Repetition::REQUIRED, Type::INT64);
node_ = PrimitiveNode::Make("column", Repetition::REQUIRED, TestType::type_num,
LogicalType::NONE, FLBA_LENGTH);
schema_ = std::make_shared<ColumnDescriptor>(node_, 0, 0);
}

void SetUpSchemaOptional() {
node_ = PrimitiveNode::Make("int64", Repetition::OPTIONAL, Type::INT64);
node_ = PrimitiveNode::Make("column", Repetition::OPTIONAL, TestType::type_num,
LogicalType::NONE, FLBA_LENGTH);
schema_ = std::make_shared<ColumnDescriptor>(node_, 1, 0);
}

void SetUpSchemaRepeated() {
node_ = PrimitiveNode::Make("int64", Repetition::REPEATED, Type::INT64);
node_ = PrimitiveNode::Make("column", Repetition::REPEATED, TestType::type_num,
LogicalType::NONE, FLBA_LENGTH);
schema_ = std::make_shared<ColumnDescriptor>(node_, 1, 1);
}

void GenerateData(int64_t num_values);

void SetupValuesOut();

void SetUp() {
values_out_.resize(100);
definition_levels_out_.resize(100);
repetition_levels_out_.resize(100);
SetupValuesOut();
definition_levels_out_.resize(SMALL_SIZE);
repetition_levels_out_.resize(SMALL_SIZE);

SetUpSchemaRequired();
}

std::unique_ptr<Int64Reader> BuildReader() {
void BuildReader() {
auto buffer = sink_->GetBuffer();
std::unique_ptr<InMemoryInputStream> source(new InMemoryInputStream(buffer));
std::unique_ptr<SerializedPageReader> page_reader(
new SerializedPageReader(std::move(source), Compression::UNCOMPRESSED));
return std::unique_ptr<Int64Reader>(
new Int64Reader(schema_.get(), std::move(page_reader)));
reader_.reset(new TypedColumnReader<TestType>(schema_.get(), std::move(page_reader)));
}

std::unique_ptr<Int64Writer> BuildWriter(int64_t output_size = 100) {
std::unique_ptr<TypedColumnWriter<TestType>> BuildWriter(
int64_t output_size = SMALL_SIZE) {
sink_.reset(new InMemoryOutputStream());
std::unique_ptr<SerializedPageWriter> pager(
new SerializedPageWriter(sink_.get(), Compression::UNCOMPRESSED, &metadata_));
return std::unique_ptr<Int64Writer>(
new Int64Writer(schema_.get(), std::move(pager), output_size));
return std::unique_ptr<TypedColumnWriter<TestType>>(
new TypedColumnWriter<TestType>(schema_.get(), std::move(pager), output_size));
}

void SyncValuesOut();
void ReadColumn() {
auto reader = BuildReader();
reader->ReadBatch(values_out_.size(), definition_levels_out_.data(),
repetition_levels_out_.data(), values_out_.data(), &values_read_);
BuildReader();
reader_->ReadBatch(values_out_.size(), definition_levels_out_.data(),
repetition_levels_out_.data(), values_out_ptr_, &values_read_);
SyncValuesOut();
}

protected:
int64_t values_read_;
// Keep the reader alive as for ByteArray the lifetime of the ByteArray
// content is bound to the reader.
std::unique_ptr<TypedColumnReader<TestType>> reader_;

// Input buffers
std::vector<T> values_;
std::vector<uint8_t> buffer_;
// Pointer to the values, needed as we cannot use vector<bool>::data()
T* values_ptr_;
std::vector<uint8_t> bool_buffer_;

// Output buffers
std::vector<int64_t> values_out_;
std::vector<T> values_out_;
std::vector<uint8_t> bool_buffer_out_;
T* values_out_ptr_;
std::vector<int16_t> definition_levels_out_;
std::vector<int16_t> repetition_levels_out_;

Expand All @@ -95,105 +127,152 @@ class TestPrimitiveWriter : public ::testing::Test {
std::unique_ptr<InMemoryOutputStream> sink_;
};

TEST_F(TestPrimitiveWriter, RequiredNonRepeated) {
std::vector<int64_t> values(100, 128);
template <typename TestType>
void TestPrimitiveWriter<TestType>::SetupValuesOut() {
values_out_.resize(SMALL_SIZE);
values_out_ptr_ = values_out_.data();
}

template <>
void TestPrimitiveWriter<BooleanType>::SetupValuesOut() {
values_out_.resize(SMALL_SIZE);
bool_buffer_out_.resize(SMALL_SIZE);
// Write once to all values so we can copy it without getting Valgrind errors
// about uninitialised values.
std::fill(bool_buffer_out_.begin(), bool_buffer_out_.end(), true);
values_out_ptr_ = reinterpret_cast<bool*>(bool_buffer_out_.data());
}

template <typename TestType>
void TestPrimitiveWriter<TestType>::SyncValuesOut() {}

template <>
void TestPrimitiveWriter<BooleanType>::SyncValuesOut() {
std::copy(bool_buffer_out_.begin(), bool_buffer_out_.end(), values_out_.begin());
}

template <typename TestType>
void TestPrimitiveWriter<TestType>::GenerateData(int64_t num_values) {
values_.resize(num_values);
InitValues<T>(num_values, values_, buffer_);
values_ptr_ = values_.data();
}

template <>
void TestPrimitiveWriter<BooleanType>::GenerateData(int64_t num_values) {
values_.resize(num_values);
InitValues<T>(num_values, values_, buffer_);
bool_buffer_.resize(num_values);
std::copy(values_.begin(), values_.end(), bool_buffer_.begin());
values_ptr_ = reinterpret_cast<bool*>(bool_buffer_.data());
}

typedef ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType,
BooleanType, ByteArrayType, FLBAType> TestTypes;

TYPED_TEST_CASE(TestPrimitiveWriter, TestTypes);

TYPED_TEST(TestPrimitiveWriter, Required) {
this->GenerateData(SMALL_SIZE);

// Test case 1: required and non-repeated, so no definition or repetition levels
std::unique_ptr<Int64Writer> writer = BuildWriter();
writer->WriteBatch(values.size(), nullptr, nullptr, values.data());
std::unique_ptr<TypedColumnWriter<TypeParam>> writer = this->BuildWriter();
writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_ptr_);
writer->Close();

ReadColumn();
ASSERT_EQ(100, values_read_);
ASSERT_EQ(values, values_out_);
this->ReadColumn();
ASSERT_EQ(SMALL_SIZE, this->values_read_);
ASSERT_EQ(this->values_, this->values_out_);
}

TEST_F(TestPrimitiveWriter, OptionalNonRepeated) {
TYPED_TEST(TestPrimitiveWriter, Optional) {
// Optional and non-repeated, with definition levels
// but no repetition levels
SetUpSchemaOptional();
this->SetUpSchemaOptional();

std::vector<int64_t> values(100, 128);
std::vector<int16_t> definition_levels(100, 1);
this->GenerateData(SMALL_SIZE);
std::vector<int16_t> definition_levels(SMALL_SIZE, 1);
definition_levels[1] = 0;

auto writer = BuildWriter();
writer->WriteBatch(values.size(), definition_levels.data(), nullptr, values.data());
auto writer = this->BuildWriter();
writer->WriteBatch(
this->values_.size(), definition_levels.data(), nullptr, this->values_ptr_);
writer->Close();

ReadColumn();
ASSERT_EQ(99, values_read_);
values_out_.resize(99);
values.resize(99);
ASSERT_EQ(values, values_out_);
this->ReadColumn();
ASSERT_EQ(99, this->values_read_);
this->values_out_.resize(99);
this->values_.resize(99);
ASSERT_EQ(this->values_, this->values_out_);
}

TEST_F(TestPrimitiveWriter, OptionalRepeated) {
TYPED_TEST(TestPrimitiveWriter, Repeated) {
// Optional and repeated, so definition and repetition levels
SetUpSchemaRepeated();
this->SetUpSchemaRepeated();

std::vector<int64_t> values(100, 128);
std::vector<int16_t> definition_levels(100, 1);
this->GenerateData(SMALL_SIZE);
std::vector<int16_t> definition_levels(SMALL_SIZE, 1);
definition_levels[1] = 0;
std::vector<int16_t> repetition_levels(100, 0);
std::vector<int16_t> repetition_levels(SMALL_SIZE, 0);

auto writer = BuildWriter();
writer->WriteBatch(
values.size(), definition_levels.data(), repetition_levels.data(), values.data());
auto writer = this->BuildWriter();
writer->WriteBatch(this->values_.size(), definition_levels.data(),
repetition_levels.data(), this->values_ptr_);
writer->Close();

ReadColumn();
ASSERT_EQ(99, values_read_);
values_out_.resize(99);
values.resize(99);
ASSERT_EQ(values, values_out_);
this->ReadColumn();
ASSERT_EQ(SMALL_SIZE - 1, this->values_read_);
this->values_out_.resize(SMALL_SIZE - 1);
this->values_.resize(SMALL_SIZE - 1);
ASSERT_EQ(this->values_, this->values_out_);
}

TEST_F(TestPrimitiveWriter, RequiredTooFewRows) {
std::vector<int64_t> values(99, 128);
TYPED_TEST(TestPrimitiveWriter, RequiredTooFewRows) {
this->GenerateData(SMALL_SIZE - 1);

auto writer = BuildWriter();
writer->WriteBatch(values.size(), nullptr, nullptr, values.data());
auto writer = this->BuildWriter();
writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_ptr_);
ASSERT_THROW(writer->Close(), ParquetException);
}

TEST_F(TestPrimitiveWriter, RequiredTooMany) {
std::vector<int64_t> values(200, 128);
TYPED_TEST(TestPrimitiveWriter, RequiredTooMany) {
this->GenerateData(2 * SMALL_SIZE);

auto writer = BuildWriter();
ASSERT_THROW(writer->WriteBatch(values.size(), nullptr, nullptr, values.data()),
auto writer = this->BuildWriter();
ASSERT_THROW(
writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_ptr_),
ParquetException);
}

TEST_F(TestPrimitiveWriter, OptionalRepeatedTooFewRows) {
TYPED_TEST(TestPrimitiveWriter, RepeatedTooFewRows) {
// Optional and repeated, so definition and repetition levels
SetUpSchemaRepeated();
this->SetUpSchemaRepeated();

std::vector<int64_t> values(100, 128);
std::vector<int16_t> definition_levels(100, 1);
this->GenerateData(SMALL_SIZE);
std::vector<int16_t> definition_levels(SMALL_SIZE, 1);
definition_levels[1] = 0;
std::vector<int16_t> repetition_levels(100, 0);
std::vector<int16_t> repetition_levels(SMALL_SIZE, 0);
repetition_levels[3] = 1;

auto writer = BuildWriter();
writer->WriteBatch(
values.size(), definition_levels.data(), repetition_levels.data(), values.data());
auto writer = this->BuildWriter();
writer->WriteBatch(this->values_.size(), definition_levels.data(),
repetition_levels.data(), this->values_ptr_);
ASSERT_THROW(writer->Close(), ParquetException);
}

TEST_F(TestPrimitiveWriter, RequiredNonRepeatedLargeChunk) {
std::vector<int64_t> values(10000, 128);
TYPED_TEST(TestPrimitiveWriter, RequiredLargeChunk) {
this->GenerateData(LARGE_SIZE);

// Test case 1: required and non-repeated, so no definition or repetition levels
std::unique_ptr<Int64Writer> writer = BuildWriter(10000);
writer->WriteBatch(values.size(), nullptr, nullptr, values.data());
auto writer = this->BuildWriter(LARGE_SIZE);
writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_ptr_);
writer->Close();

// Just read the first 100 to ensure we could read it back in
ReadColumn();
ASSERT_EQ(100, values_read_);
values.resize(100);
ASSERT_EQ(values, values_out_);
// Just read the first SMALL_SIZE rows to ensure we could read it back in
this->ReadColumn();
ASSERT_EQ(SMALL_SIZE, this->values_read_);
this->values_.resize(SMALL_SIZE);
ASSERT_EQ(this->values_, this->values_out_);
}

} // namespace test
Expand Down
35 changes: 1 addition & 34 deletions cpp/src/parquet/column/scanner-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "parquet/schema/descriptor.h"
#include "parquet/schema/types.h"
#include "parquet/util/test-common.h"
#include "parquet/column/test-specialization.h"

using std::string;
using std::vector;
Expand All @@ -40,42 +41,8 @@ namespace parquet {

using schema::NodePtr;

static int FLBA_LENGTH = 12;

bool operator==(const FixedLenByteArray& a, const FixedLenByteArray& b) {
return 0 == memcmp(a.ptr, b.ptr, FLBA_LENGTH);
}

namespace test {

template <>
void InitValues<bool>(int num_values, vector<bool>& values, vector<uint8_t>& buffer) {
values = flip_coins(num_values, 0);
}

template <>
void InitValues<Int96>(int num_values, vector<Int96>& values, vector<uint8_t>& buffer) {
random_Int96_numbers(num_values, 0, std::numeric_limits<int32_t>::min(),
std::numeric_limits<int32_t>::max(), values.data());
}

template <>
void InitValues<ByteArray>(
int num_values, vector<ByteArray>& values, vector<uint8_t>& buffer) {
int max_byte_array_len = 12;
int num_bytes = max_byte_array_len + sizeof(uint32_t);
size_t nbytes = num_values * num_bytes;
buffer.resize(nbytes);
random_byte_array(num_values, 0, buffer.data(), values.data(), max_byte_array_len);
}

template <>
void InitValues<FLBA>(int num_values, vector<FLBA>& values, vector<uint8_t>& buffer) {
size_t nbytes = num_values * FLBA_LENGTH;
buffer.resize(nbytes);
random_fixed_byte_array(num_values, 0, buffer.data(), FLBA_LENGTH, values.data());
}

template <>
void InitDictValues<bool>(
int num_values, int dict_per_page, vector<bool>& values, vector<uint8_t>& buffer) {
Expand Down
Loading

0 comments on commit 9057c9f

Please sign in to comment.