Skip to content

Commit

Permalink
apacheGH-35304: [C++][ORC] Support attributes conversion
Browse files Browse the repository at this point in the history
  • Loading branch information
wgtmac committed May 9, 2023
1 parent b73ddc3 commit 5fd327f
Show file tree
Hide file tree
Showing 4 changed files with 177 additions and 24 deletions.
6 changes: 3 additions & 3 deletions cpp/src/arrow/adapters/orc/adapter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -321,9 +321,9 @@ class ORCFileReader::Impl {
std::vector<std::shared_ptr<Field>> fields;
fields.reserve(size);
for (int child = 0; child < size; ++child) {
ARROW_ASSIGN_OR_RAISE(auto elemtype, GetArrowType(type.getSubtype(child)));
std::string name = type.getFieldName(child);
fields.push_back(field(std::move(name), std::move(elemtype)));
const std::string& name = type.getFieldName(child);
ARROW_ASSIGN_OR_RAISE(auto elem_field, GetArrowField(name, type.getSubtype(child)));
fields.push_back(std::move(elem_field));
}
ARROW_ASSIGN_OR_RAISE(auto metadata, ReadMetadata());
return std::make_shared<Schema>(std::move(fields), std::move(metadata));
Expand Down
110 changes: 110 additions & 0 deletions cpp/src/arrow/adapters/orc/adapter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,116 @@ TEST(TestAdapterRead, ReadCharAndVarcharType) {
ASSERT_EQ(nullptr, record_batch);
}

TEST(TestAdapterRead, ReadFieldAttributes) {
const std::string id_key = "iceberg.id";
const std::string required_key = "iceberg.required";

auto set_attributes = [&](liborc::Type* type, const std::string& id,
const std::string& required) {
type->setAttribute(id_key, id);
type->setAttribute(required_key, required);
};

auto check_attributes = [&](const std::shared_ptr<arrow::Field>& field,
const std::string& expect_id,
const std::string& expect_required) {
auto field_metadata = field->metadata();
ASSERT_NE(field_metadata, nullptr);
ASSERT_EQ(expect_id, field_metadata->Get(id_key));
ASSERT_EQ(expect_required, field_metadata->Get(required_key));
};

auto c1_type = liborc::createPrimitiveType(liborc::TypeKind::INT);
set_attributes(c1_type.get(), "1", "true");

auto c2_elem_type = liborc::createPrimitiveType(liborc::TypeKind::INT);
set_attributes(c2_elem_type.get(), "3", "false");
auto c2_type = liborc::createListType(std::move(c2_elem_type));
set_attributes(c2_type.get(), "2", "false");

auto c3_key_type = liborc::createPrimitiveType(liborc::TypeKind::INT);
set_attributes(c3_key_type.get(), "5", "true");
auto c3_value_type = liborc::createPrimitiveType(liborc::TypeKind::INT);
set_attributes(c3_value_type.get(), "6", "false");
auto c3_type = liborc::createMapType(std::move(c3_key_type), std::move(c3_value_type));
set_attributes(c3_type.get(), "4", "false");

auto c4_sub_type = liborc::createPrimitiveType(liborc::TypeKind::INT);
set_attributes(c4_sub_type.get(), "8", "false");
auto c4_type = liborc::createStructType();
c4_type->addStructField("c4_1", std::move(c4_sub_type));
set_attributes(c4_type.get(), "7", "false");

auto orc_type = liborc::createStructType();
orc_type->addStructField("c1", std::move(c1_type));
orc_type->addStructField("c2", std::move(c2_type));
orc_type->addStructField("c3", std::move(c3_type));
orc_type->addStructField("c4", std::move(c4_type));

MemoryOutputStream mem_stream(kDefaultMemStreamSize);
auto writer = CreateWriter(/*stripe_size=*/1024, *orc_type, &mem_stream);
writer->close();

std::shared_ptr<io::RandomAccessFile> in_stream(std::make_shared<io::BufferReader>(
reinterpret_cast<const uint8_t*>(mem_stream.getData()),
static_cast<int64_t>(mem_stream.getLength())));
ASSERT_OK_AND_ASSIGN(
auto reader, adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool()));
ASSERT_EQ(0, reader->NumberOfRows());

ASSERT_OK_AND_ASSIGN(auto schema, reader->ReadSchema());
ASSERT_EQ(4, schema->num_fields());

// check top level fields
check_attributes(schema->field(0), "1", "true");
check_attributes(schema->field(1), "2", "false");
check_attributes(schema->field(2), "4", "false");
check_attributes(schema->field(3), "7", "false");

// check list element type
auto list_type = checked_pointer_cast<arrow::ListType>(schema->field(1)->type());
check_attributes(list_type->value_field(), "3", "false");

// check map key/value types
auto map_type = checked_pointer_cast<arrow::MapType>(schema->field(2)->type());
check_attributes(map_type->key_field(), "5", "true");
check_attributes(map_type->item_field(), "6", "false");

// check struct sub-field type
auto struct_type = checked_pointer_cast<arrow::StructType>(schema->field(3)->type());
check_attributes(struct_type->field(0), "8", "false");
}

TEST(TestAdapterReadWrite, FieldAttributesRoundTrip) {
EXPECT_OK_AND_ASSIGN(auto buffer_output_stream, io::BufferOutputStream::Create(1024));
auto write_options = adapters::orc::WriteOptions();
write_options.compression = Compression::UNCOMPRESSED;
EXPECT_OK_AND_ASSIGN(auto writer, adapters::orc::ORCFileWriter::Open(
buffer_output_stream.get(), write_options));

auto schema = ::arrow::schema(
{::arrow::field("c0", ::arrow::int64(), true, key_value_metadata({"k0"}, {"v0"})),
::arrow::field("c1", ::arrow::utf8(), true, key_value_metadata({"k1"}, {"v1"})),
::arrow::field(
"c2", ::arrow::list(::arrow::field("item", ::arrow::int64(), true,
key_value_metadata({"k0"}, {"ddv0"}))))});
auto expected_output_table = ::arrow::TableFromJSON(schema, {R"([[1, "a", [1, 2]]])"});
ARROW_EXPECT_OK(writer->Write(*expected_output_table));
ARROW_EXPECT_OK(writer->Close());

EXPECT_OK_AND_ASSIGN(auto buffer, buffer_output_stream->Finish());
std::shared_ptr<io::RandomAccessFile> in_stream(new io::BufferReader(buffer));
EXPECT_OK_AND_ASSIGN(
auto reader, adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool()));
EXPECT_OK_AND_ASSIGN(auto actual_output_table, reader->Read());
ASSERT_OK(actual_output_table->ValidateFull());
AssertTablesEqual(*expected_output_table, *actual_output_table, false, false);

// Check schema equality with metadata.
EXPECT_OK_AND_ASSIGN(auto read_schema, reader->ReadSchema());
AssertSchemaEqual(schema, read_schema, /*check_metadata=*/true);
}

// Trivial

class TestORCWriterTrivialNoWrite : public ::testing::Test {};
Expand Down
77 changes: 56 additions & 21 deletions cpp/src/arrow/adapters/orc/util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -951,6 +951,15 @@ Status WriteBatch(const Array& array, int64_t orc_offset,
}
}

void SetAttributes(const std::shared_ptr<arrow::Field>& field, liborc::Type* type) {
if (field->HasMetadata()) {
const auto& metadata = field->metadata();
for (int j = 0; j < metadata->size(); j++) {
type->setAttribute(metadata->key(j), metadata->value(j));
}
}
}

Result<std::unique_ptr<liborc::Type>> GetOrcType(const DataType& type) {
Type::type kind = type.id();
switch (kind) {
Expand Down Expand Up @@ -1000,9 +1009,9 @@ Result<std::unique_ptr<liborc::Type>> GetOrcType(const DataType& type) {
case Type::type::LIST:
case Type::type::FIXED_SIZE_LIST:
case Type::type::LARGE_LIST: {
std::shared_ptr<DataType> arrow_child_type =
checked_cast<const BaseListType&>(type).value_type();
ARROW_ASSIGN_OR_RAISE(auto orc_subtype, GetOrcType(*arrow_child_type));
const auto& value_field = checked_cast<const BaseListType&>(type).value_field();
ARROW_ASSIGN_OR_RAISE(auto orc_subtype, GetOrcType(*value_field->type()));
SetAttributes(value_field, orc_subtype.get());
return liborc::createListType(std::move(orc_subtype));
}
case Type::type::STRUCT: {
Expand All @@ -1011,19 +1020,19 @@ Result<std::unique_ptr<liborc::Type>> GetOrcType(const DataType& type) {
checked_cast<const StructType&>(type).fields();
for (auto it = arrow_fields.begin(); it != arrow_fields.end(); ++it) {
std::string field_name = (*it)->name();
std::shared_ptr<DataType> arrow_child_type = (*it)->type();
ARROW_ASSIGN_OR_RAISE(auto orc_subtype, GetOrcType(*arrow_child_type));
ARROW_ASSIGN_OR_RAISE(auto orc_subtype, GetOrcType(*(*it)->type()));
SetAttributes(*it, orc_subtype.get());
out_type->addStructField(field_name, std::move(orc_subtype));
}
return std::move(out_type);
}
case Type::type::MAP: {
std::shared_ptr<DataType> key_arrow_type =
checked_cast<const MapType&>(type).key_type();
std::shared_ptr<DataType> item_arrow_type =
checked_cast<const MapType&>(type).item_type();
ARROW_ASSIGN_OR_RAISE(auto key_orc_type, GetOrcType(*key_arrow_type));
ARROW_ASSIGN_OR_RAISE(auto item_orc_type, GetOrcType(*item_arrow_type));
const auto& key_field = checked_cast<const MapType&>(type).key_field();
const auto& item_field = checked_cast<const MapType&>(type).item_field();
ARROW_ASSIGN_OR_RAISE(auto key_orc_type, GetOrcType(*key_field->type()));
ARROW_ASSIGN_OR_RAISE(auto item_orc_type, GetOrcType(*item_field->type()));
SetAttributes(key_field, key_orc_type.get());
SetAttributes(item_field, item_orc_type.get());
return liborc::createMapType(std::move(key_orc_type), std::move(item_orc_type));
}
case Type::type::DENSE_UNION:
Expand All @@ -1034,6 +1043,7 @@ Result<std::unique_ptr<liborc::Type>> GetOrcType(const DataType& type) {
for (const auto& arrow_field : arrow_fields) {
std::shared_ptr<DataType> arrow_child_type = arrow_field->type();
ARROW_ASSIGN_OR_RAISE(auto orc_subtype, GetOrcType(*arrow_child_type));
SetAttributes(arrow_field, orc_subtype.get());
out_type->addUnionChild(std::move(orc_subtype));
}
return std::move(out_type);
Expand Down Expand Up @@ -1132,23 +1142,26 @@ Result<std::shared_ptr<DataType>> GetArrowType(const liborc::Type* type) {
if (subtype_count != 1) {
return Status::TypeError("Invalid Orc List type");
}
ARROW_ASSIGN_OR_RAISE(auto elemtype, GetArrowType(type->getSubtype(0)));
return list(std::move(elemtype));
ARROW_ASSIGN_OR_RAISE(auto elem_field, GetArrowField("item", type->getSubtype(0)));
return list(std::move(elem_field));
}
case liborc::MAP: {
if (subtype_count != 2) {
return Status::TypeError("Invalid Orc Map type");
}
ARROW_ASSIGN_OR_RAISE(auto key_type, GetArrowType(type->getSubtype(0)));
ARROW_ASSIGN_OR_RAISE(auto item_type, GetArrowType(type->getSubtype(1)));
return map(std::move(key_type), std::move(item_type));
ARROW_ASSIGN_OR_RAISE(
auto key_field, GetArrowField("key", type->getSubtype(0), /*nullable=*/false));
ARROW_ASSIGN_OR_RAISE(auto value_field,
GetArrowField("value", type->getSubtype(1)));
return std::make_shared<MapType>(std::move(key_field), std::move(value_field));
}
case liborc::STRUCT: {
FieldVector fields(subtype_count);
for (int child = 0; child < subtype_count; ++child) {
ARROW_ASSIGN_OR_RAISE(auto elem_type, GetArrowType(type->getSubtype(child)));
std::string name = type->getFieldName(child);
fields[child] = field(std::move(name), std::move(elem_type));
const auto& name = type->getFieldName(child);
ARROW_ASSIGN_OR_RAISE(auto elem_field,
GetArrowField(name, type->getSubtype(child)));
fields[child] = std::move(elem_field);
}
return struct_(std::move(fields));
}
Expand All @@ -1159,8 +1172,9 @@ Result<std::shared_ptr<DataType>> GetArrowType(const liborc::Type* type) {
FieldVector fields(subtype_count);
std::vector<int8_t> type_codes(subtype_count);
for (int child = 0; child < subtype_count; ++child) {
ARROW_ASSIGN_OR_RAISE(auto elem_type, GetArrowType(type->getSubtype(child)));
fields[child] = field("_union_" + ToChars(child), std::move(elem_type));
ARROW_ASSIGN_OR_RAISE(auto elem_field, GetArrowField("_union_" + ToChars(child),
type->getSubtype(child)));
fields[child] = std::move(elem_field);
type_codes[child] = static_cast<int8_t>(child);
}
return sparse_union(std::move(fields), std::move(type_codes));
Expand All @@ -1176,11 +1190,32 @@ Result<std::unique_ptr<liborc::Type>> GetOrcType(const Schema& schema) {
for (int i = 0; i < numFields; i++) {
const auto& field = schema.field(i);
ARROW_ASSIGN_OR_RAISE(auto orc_subtype, GetOrcType(*field->type()));
SetAttributes(field, orc_subtype.get());
out_type->addStructField(field->name(), std::move(orc_subtype));
}
return std::move(out_type);
}

Result<std::shared_ptr<const KeyValueMetadata>> GetFieldMetadata(
const liborc::Type* type) {
if (type == nullptr) {
return nullptr;
}
const auto keys = type->getAttributeKeys();
auto metadata = std::make_shared<KeyValueMetadata>();
for (const auto& key : keys) {
metadata->Append(key, type->getAttributeValue(key));
}
return std::const_pointer_cast<const KeyValueMetadata>(metadata);
}

Result<std::shared_ptr<Field>> GetArrowField(const std::string& name,
const liborc::Type* type, bool nullable) {
ARROW_ASSIGN_OR_RAISE(auto arrow_type, GetArrowType(type));
ARROW_ASSIGN_OR_RAISE(auto metadata, GetFieldMetadata(type));
return field(name, std::move(arrow_type), nullable, std::move(metadata));
}

} // namespace orc
} // namespace adapters
} // namespace arrow
8 changes: 8 additions & 0 deletions cpp/src/arrow/adapters/orc/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "arrow/array/builder_base.h"
#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/util/key_value_metadata.h"
#include "orc/OrcFile.hh"

namespace liborc = orc;
Expand All @@ -35,6 +36,13 @@ Result<std::shared_ptr<DataType>> GetArrowType(const liborc::Type* type);

Result<std::unique_ptr<liborc::Type>> GetOrcType(const Schema& schema);

Result<std::shared_ptr<const KeyValueMetadata>> GetFieldMetadata(
const liborc::Type* type);

Result<std::shared_ptr<Field>> GetArrowField(const std::string& name,
const liborc::Type* type,
bool nullable = true);

ARROW_EXPORT Status AppendBatch(const liborc::Type* type,
liborc::ColumnVectorBatch* batch, int64_t offset,
int64_t length, arrow::ArrayBuilder* builder);
Expand Down

0 comments on commit 5fd327f

Please sign in to comment.