Skip to content

Commit

Permalink
PARQUET-1095: [C++] Read and write Arrow decimal values
Browse files Browse the repository at this point in the history
This depends on:
- [x] [ARROW-1607](apache#1128)
- [x] [ARROW-1656](apache#1184)
- [x] [ARROW-1588](apache#1211)
- [x] Add tests for writing different sizes of values

Author: Phillip Cloud <cpcloud@gmail.com>
Author: Wes McKinney <wes.mckinney@twosigma.com>

Closes apache#403 from cpcloud/PARQUET-1095 and squashes the following commits:

8c3d222 [Phillip Cloud] Remove loop from BytesToInteger
63018bc [Wes McKinney] Suppress C4996 due to arrow/util/variant.h
e4b02d3 [Phillip Cloud] Refactor types.h
83948ec [Phillip Cloud] Add last_value_ init
51965cd [Phillip Cloud] Min commit that contains the unique kernel in arrow
e25c59b [Phillip Cloud] Fix reader writer test for unique kernel addition
da0a7eb [Phillip Cloud] Update for ARROW-1811
16935de [Phillip Cloud] Reverse operand order and explicit cast
6036ca5 [Phillip Cloud] ARROW-1811
c5c4294 [Phillip Cloud] Fix issues
32a4abe [Phillip Cloud] Cleanup iteration a bit
920832a [Phillip Cloud] Update arrow version
9f97c1d [Phillip Cloud] Update for ARROW-1794: rename DecimalArray to Decimal128Array
b2e0290 [Phillip Cloud] IWYU
64748a8 [Phillip Cloud] Copy from arrow for now
6c9e2a7 [Phillip Cloud] Reduce the number of decimal test cases
7ab2e5c [Phillip Cloud] Parameterize on precision
30655d6 [Phillip Cloud] Use arrow random_decimals
9ff7eb4 [Phillip Cloud] Remove specific template parameters
1eee6a9 [Phillip Cloud] Remove specific randint call
8808e4c [Phillip Cloud] Bump arrow version
659fbc1 [Phillip Cloud] Fix deprecated API call
e162ca1 [Phillip Cloud] Allocate scratch space to hold the byteswapped values
5c9292b [Phillip Cloud] Proper dcheck call
1782da0 [Phillip Cloud] Use arrow
3d243d5 [Phillip Cloud] Checkpoint [ci skip]
028fb03 [Phillip Cloud] Remove garbage values
46dff15 [Phillip Cloud] Clean up uint32 test
613255e [Phillip Cloud] Do not use std::copy when reinterpret_cast will suffice
2917a62 [Phillip Cloud] PARQUET-1095: [C++] Read and write Arrow decimal values

Change-Id: Ibe81cd5a5961bbe86c66db811ec8b770ae48c38b
  • Loading branch information
cpcloud authored and wesm committed Nov 20, 2017
1 parent 3882ee6 commit 238c765
Show file tree
Hide file tree
Showing 11 changed files with 721 additions and 152 deletions.
175 changes: 141 additions & 34 deletions cpp/src/parquet/arrow/arrow-reader-writer-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "gtest/gtest.h"

#include <sstream>
#include <arrow/compute/api.h>

#include "parquet/api/reader.h"
#include "parquet/api/writer.h"
Expand All @@ -37,20 +38,23 @@

#include "arrow/api.h"
#include "arrow/test-util.h"
#include "arrow/util/decimal.h"

using arrow::Array;
using arrow::ArrayVisitor;
using arrow::Buffer;
using arrow::ChunkedArray;
using arrow::Column;
using arrow::EncodeArrayToDictionary;
using arrow::ListArray;
using arrow::PoolBuffer;
using arrow::PrimitiveArray;
using arrow::Status;
using arrow::Table;
using arrow::TimeUnit;
using arrow::default_memory_pool;
using arrow::compute::DictionaryEncode;
using arrow::compute::FunctionContext;
using arrow::compute::Datum;
using arrow::io::BufferReader;

using arrow::test::randint;
Expand All @@ -68,10 +72,10 @@ using ColumnVector = std::vector<std::shared_ptr<arrow::Column>>;
namespace parquet {
namespace arrow {

const int SMALL_SIZE = 100;
const int LARGE_SIZE = 10000;
static constexpr int SMALL_SIZE = 100;
static constexpr int LARGE_SIZE = 10000;

constexpr uint32_t kDefaultSeed = 0;
static constexpr uint32_t kDefaultSeed = 0;

LogicalType::type get_logical_type(const ::arrow::DataType& type) {
switch (type.id()) {
Expand Down Expand Up @@ -118,6 +122,8 @@ LogicalType::type get_logical_type(const ::arrow::DataType& type) {
static_cast<const ::arrow::DictionaryType&>(type);
return get_logical_type(*dict_type.dictionary()->type());
}
case ArrowId::DECIMAL:
return LogicalType::DECIMAL;
default:
break;
}
Expand Down Expand Up @@ -147,6 +153,7 @@ ParquetType::type get_physical_type(const ::arrow::DataType& type) {
case ArrowId::STRING:
return ParquetType::BYTE_ARRAY;
case ArrowId::FIXED_SIZE_BINARY:
case ArrowId::DECIMAL:
return ParquetType::FIXED_LEN_BYTE_ARRAY;
case ArrowId::DATE32:
return ParquetType::INT32;
Expand Down Expand Up @@ -299,6 +306,7 @@ struct test_traits<::arrow::FixedSizeBinaryType> {
const std::string test_traits<::arrow::StringType>::value("Test"); // NOLINT
const std::string test_traits<::arrow::BinaryType>::value("\x00\x01\x02\x03"); // NOLINT
const std::string test_traits<::arrow::FixedSizeBinaryType>::value("Fixed"); // NOLINT

template <typename T>
using ParquetDataType = DataType<test_traits<T>::parquet_enum>;

Expand Down Expand Up @@ -342,36 +350,52 @@ void DoSimpleRoundtrip(const std::shared_ptr<Table>& table, int num_threads,

static std::shared_ptr<GroupNode> MakeSimpleSchema(const ::arrow::DataType& type,
Repetition::type repetition) {
int byte_width;
// Decimal is not implemented yet.
int32_t byte_width = -1;
int32_t precision = -1;
int32_t scale = -1;

switch (type.id()) {
case ::arrow::Type::DICTIONARY: {
const ::arrow::DictionaryType& dict_type =
static_cast<const ::arrow::DictionaryType&>(type);
const auto& dict_type = static_cast<const ::arrow::DictionaryType&>(type);
const ::arrow::DataType& values_type = *dict_type.dictionary()->type();
if (values_type.id() == ::arrow::Type::FIXED_SIZE_BINARY) {
byte_width =
static_cast<const ::arrow::FixedSizeBinaryType&>(values_type).byte_width();
} else {
byte_width = -1;
switch (values_type.id()) {
case ::arrow::Type::FIXED_SIZE_BINARY:
byte_width =
static_cast<const ::arrow::FixedSizeBinaryType&>(values_type).byte_width();
break;
case ::arrow::Type::DECIMAL: {
const auto& decimal_type =
static_cast<const ::arrow::Decimal128Type&>(values_type);
precision = decimal_type.precision();
scale = decimal_type.scale();
byte_width = DecimalSize(precision);
} break;
default:
break;
}
} break;
case ::arrow::Type::FIXED_SIZE_BINARY:
byte_width = static_cast<const ::arrow::FixedSizeBinaryType&>(type).byte_width();
break;
case ::arrow::Type::DECIMAL: {
const auto& decimal_type = static_cast<const ::arrow::Decimal128Type&>(type);
precision = decimal_type.precision();
scale = decimal_type.scale();
byte_width = DecimalSize(precision);
} break;
default:
byte_width = -1;
break;
}
auto pnode = PrimitiveNode::Make("column1", repetition, get_physical_type(type),
get_logical_type(type), byte_width);
get_logical_type(type), byte_width, precision, scale);
NodePtr node_ =
GroupNode::Make("schema", Repetition::REQUIRED, std::vector<NodePtr>({pnode}));
return std::static_pointer_cast<GroupNode>(node_);
}

namespace internal {

void AssertArraysEqual(const Array &expected, const Array &actual) {
void AssertArraysEqual(const Array& expected, const Array& actual) {
if (!actual.Equals(expected)) {
std::stringstream pp_result;
std::stringstream pp_expected;
Expand Down Expand Up @@ -526,11 +550,19 @@ class TestParquetIO : public ::testing::Test {
// There we write an UInt32 Array but receive an Int64 Array as result for
// Parquet version 1.0.

typedef ::testing::Types<::arrow::BooleanType, ::arrow::UInt8Type, ::arrow::Int8Type,
::arrow::UInt16Type, ::arrow::Int16Type, ::arrow::Int32Type,
::arrow::UInt64Type, ::arrow::Int64Type, ::arrow::Date32Type,
::arrow::FloatType, ::arrow::DoubleType, ::arrow::StringType,
::arrow::BinaryType, ::arrow::FixedSizeBinaryType>
typedef ::testing::Types<
::arrow::BooleanType, ::arrow::UInt8Type, ::arrow::Int8Type, ::arrow::UInt16Type,
::arrow::Int16Type, ::arrow::Int32Type, ::arrow::UInt64Type, ::arrow::Int64Type,
::arrow::Date32Type, ::arrow::FloatType, ::arrow::DoubleType, ::arrow::StringType,
::arrow::BinaryType, ::arrow::FixedSizeBinaryType, DecimalWithPrecisionAndScale<1>,
DecimalWithPrecisionAndScale<3>, DecimalWithPrecisionAndScale<5>,
DecimalWithPrecisionAndScale<7>, DecimalWithPrecisionAndScale<10>,
DecimalWithPrecisionAndScale<12>, DecimalWithPrecisionAndScale<15>,
DecimalWithPrecisionAndScale<17>, DecimalWithPrecisionAndScale<19>,
DecimalWithPrecisionAndScale<22>, DecimalWithPrecisionAndScale<23>,
DecimalWithPrecisionAndScale<24>, DecimalWithPrecisionAndScale<27>,
DecimalWithPrecisionAndScale<29>, DecimalWithPrecisionAndScale<32>,
DecimalWithPrecisionAndScale<34>, DecimalWithPrecisionAndScale<38>>
TestTypes;

TYPED_TEST_CASE(TestParquetIO, TestTypes);
Expand Down Expand Up @@ -590,8 +622,10 @@ TYPED_TEST(TestParquetIO, SingleColumnOptionalDictionaryWrite) {

ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));

std::shared_ptr<Array> dict_values;
ASSERT_OK(EncodeArrayToDictionary(*values, default_memory_pool(), &dict_values));
Datum out;
FunctionContext ctx(default_memory_pool());
ASSERT_OK(DictionaryEncode(&ctx, Datum(values), &out));
std::shared_ptr<Array> dict_values = MakeArray(out.array());
std::shared_ptr<GroupNode> schema =
MakeSimpleSchema(*dict_values->type(), Repetition::OPTIONAL);
this->WriteColumn(schema, dict_values);
Expand Down Expand Up @@ -856,25 +890,43 @@ TEST_F(TestUInt32ParquetIO, Parquet_1_0_Compability) {
ASSERT_OK_NO_THROW(
WriteTable(*table, ::arrow::default_memory_pool(), this->sink_, 512, properties));

std::shared_ptr<Array> expected_values;
std::shared_ptr<PoolBuffer> int64_data =
std::make_shared<PoolBuffer>(::arrow::default_memory_pool());
{
ASSERT_OK(int64_data->Resize(sizeof(int64_t) * values->length()));
int64_t* int64_data_ptr = reinterpret_cast<int64_t*>(int64_data->mutable_data());
const uint32_t* uint32_data_ptr =
reinterpret_cast<const uint32_t*>(values->values()->data());
// std::copy might be faster but this is explicit on the casts)
for (int64_t i = 0; i < values->length(); i++) {
int64_data_ptr[i] = static_cast<int64_t>(uint32_data_ptr[i]);
}
auto int64_data_ptr = reinterpret_cast<int64_t*>(int64_data->mutable_data());
auto uint32_data_ptr = reinterpret_cast<const uint32_t*>(values->values()->data());
const auto cast_uint32_to_int64 = [](uint32_t value) {
return static_cast<int64_t>(value);
};
std::transform(uint32_data_ptr, uint32_data_ptr + values->length(), int64_data_ptr,
cast_uint32_to_int64);
}

std::vector<std::shared_ptr<Buffer>> buffers{values->null_bitmap(), int64_data};
auto arr_data = std::make_shared<::arrow::ArrayData>(::arrow::int64(), values->length(),
buffers, values->null_count());
ASSERT_OK(MakeArray(arr_data, &expected_values));
this->ReadAndCheckSingleColumnTable(expected_values);
std::shared_ptr<Array> expected_values = MakeArray(arr_data);
ASSERT_NE(expected_values, NULLPTR);

const auto& expected = static_cast<const ::arrow::Int64Array&>(*expected_values);
ASSERT_GT(values->length(), 0);
ASSERT_EQ(values->length(), expected.length());

// TODO(phillipc): Is there a better way to compare these two arrays?
// AssertArraysEqual requires the same type, but we only care about values in this case
for (int i = 0; i < expected.length(); ++i) {
const bool value_is_valid = values->IsValid(i);
const bool expected_value_is_valid = expected.IsValid(i);

ASSERT_EQ(expected_value_is_valid, value_is_valid);

if (value_is_valid) {
uint32_t value = values->Value(i);
int64_t expected_value = expected.Value(i);
ASSERT_EQ(expected_value, static_cast<int64_t>(value));
}
}
}

using TestStringParquetIO = TestParquetIO<::arrow::StringType>;
Expand Down Expand Up @@ -1432,7 +1484,7 @@ void MakeListTable(int num_rows, std::shared_ptr<Table>* out) {
offset_values.push_back(total_elements);

std::vector<int8_t> value_draws;
randint<int8_t>(total_elements, 0, 100, &value_draws);
randint(total_elements, 0, 100, &value_draws);

std::vector<bool> is_valid;
random_is_valid(total_elements, 0.1, &is_valid);
Expand Down Expand Up @@ -1889,6 +1941,61 @@ TEST(TestArrowReaderAdHoc, Int96BadMemoryAccess) {
ASSERT_OK_NO_THROW(arrow_reader->ReadTable(&table));
}

class TestArrowReaderAdHocSpark
: public ::testing::TestWithParam<
std::tuple<std::string, std::shared_ptr<::arrow::DataType>>> {};

TEST_P(TestArrowReaderAdHocSpark, ReadDecimals) {
std::string path(std::getenv("PARQUET_TEST_DATA"));

std::string filename;
std::shared_ptr<::arrow::DataType> decimal_type;
std::tie(filename, decimal_type) = GetParam();

path += "/" + filename;
ASSERT_GT(path.size(), 0);

auto pool = ::arrow::default_memory_pool();

std::unique_ptr<FileReader> arrow_reader;
ASSERT_NO_THROW(
arrow_reader.reset(new FileReader(pool, ParquetFileReader::OpenFile(path, false))));
std::shared_ptr<::arrow::Table> table;
ASSERT_OK_NO_THROW(arrow_reader->ReadTable(&table));

ASSERT_EQ(1, table->num_columns());

constexpr int32_t expected_length = 24;

auto value_column = table->column(0);
ASSERT_EQ(expected_length, value_column->length());

auto raw_array = value_column->data();
ASSERT_EQ(1, raw_array->num_chunks());

auto chunk = raw_array->chunk(0);

std::shared_ptr<Array> expected_array;

::arrow::Decimal128Builder builder(decimal_type, pool);

for (int32_t i = 0; i < expected_length; ++i) {
::arrow::Decimal128 value((i + 1) * 100);
ASSERT_OK(builder.Append(value));
}
ASSERT_OK(builder.Finish(&expected_array));

internal::AssertArraysEqual(*expected_array, *chunk);
}

INSTANTIATE_TEST_CASE_P(
ReadDecimals, TestArrowReaderAdHocSpark,
::testing::Values(
std::make_tuple("int32_decimal.parquet", ::arrow::decimal(4, 2)),
std::make_tuple("int64_decimal.parquet", ::arrow::decimal(10, 2)),
std::make_tuple("fixed_length_decimal.parquet", ::arrow::decimal(25, 2)),
std::make_tuple("fixed_length_decimal_legacy.parquet", ::arrow::decimal(13, 2))));

} // namespace arrow

} // namespace parquet
6 changes: 3 additions & 3 deletions cpp/src/parquet/arrow/arrow-schema-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ const auto TIMESTAMP_MS = ::arrow::timestamp(TimeUnit::MILLI);
const auto TIMESTAMP_US = ::arrow::timestamp(TimeUnit::MICRO);
const auto TIMESTAMP_NS = ::arrow::timestamp(TimeUnit::NANO);
const auto BINARY = ::arrow::binary();
const auto DECIMAL_8_4 = std::make_shared<::arrow::DecimalType>(8, 4);
const auto DECIMAL_8_4 = std::make_shared<::arrow::Decimal128Type>(8, 4);

class TestConvertParquetSchema : public ::testing::Test {
public:
Expand All @@ -62,8 +62,8 @@ class TestConvertParquetSchema : public ::testing::Test {
for (int i = 0; i < expected_schema->num_fields(); ++i) {
auto lhs = result_schema_->field(i);
auto rhs = expected_schema->field(i);
EXPECT_TRUE(lhs->Equals(rhs)) << i << " " << lhs->ToString()
<< " != " << rhs->ToString();
EXPECT_TRUE(lhs->Equals(rhs))
<< i << " " << lhs->ToString() << " != " << rhs->ToString();
}
}

Expand Down
Loading

0 comments on commit 238c765

Please sign in to comment.