Skip to content

Commit

Permalink
Start working on properly preserving and deserializing field_id in C+…
Browse files Browse the repository at this point in the history
…+. Some

field_id round trips working

Schema tests passing, some refactoring to simplify schema serialization code

Add KeyValueMetadata::Merge method, Arrow schema access for ParquetFile

Add Field::WithMergedMetadata

Add Parquet field_id metadata to Arrow schemas. Better diffing in AssertTablesEqual

Ignore unequal child field metadata in ChunkedArray::Equals, fix Python Parquet unit tests

Add Python unit test to verify that field_id's are correctly filtered through to the Arrow schema in Python

decruft
  • Loading branch information
wesm committed Feb 17, 2020
1 parent 4ffa286 commit 08890cb
Show file tree
Hide file tree
Showing 24 changed files with 686 additions and 433 deletions.
85 changes: 50 additions & 35 deletions cpp/src/arrow/table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,45 +73,22 @@ bool ChunkedArray::Equals(const ChunkedArray& other) const {
return false;
}
if (length_ == 0) {
return type_->Equals(other.type_);
// We cannot toggle check_metadata here yet, so we don't check it
return type_->Equals(*other.type_, /*check_metadata=*/false);
}

// Check contents of the underlying arrays. This checks for equality of
// the underlying data independently of the chunk size.
int this_chunk_idx = 0;
int64_t this_start_idx = 0;
int other_chunk_idx = 0;
int64_t other_start_idx = 0;

int64_t elements_compared = 0;
while (elements_compared < length_) {
const std::shared_ptr<Array> this_array = chunks_[this_chunk_idx];
const std::shared_ptr<Array> other_array = other.chunk(other_chunk_idx);
int64_t common_length = std::min(this_array->length() - this_start_idx,
other_array->length() - other_start_idx);
if (!this_array->RangeEquals(this_start_idx, this_start_idx + common_length,
other_start_idx, other_array)) {
return false;
}

elements_compared += common_length;

// If we have exhausted the current chunk, proceed to the next one individually.
if (this_start_idx + common_length == this_array->length()) {
this_chunk_idx++;
this_start_idx = 0;
} else {
this_start_idx += common_length;
}

if (other_start_idx + common_length == other_array->length()) {
other_chunk_idx++;
other_start_idx = 0;
} else {
other_start_idx += common_length;
}
}
return true;
return internal::ApplyToChunkOverlaps(
*this, other,
[](const Array& left_piece, const Array& right_piece,
int64_t ARROW_ARG_UNUSED(position)) {
if (!left_piece.Equals(right_piece)) {
return Status::Invalid("Unequal piece");
}
return Status::OK();
})
.ok();
}

bool ChunkedArray::Equals(const std::shared_ptr<ChunkedArray>& other) const {
Expand Down Expand Up @@ -222,6 +199,44 @@ Status ChunkedArray::ValidateFull() const {
return Status::OK();
}

namespace internal {

bool MultipleChunkIterator::Next(std::shared_ptr<Array>* next_left,
std::shared_ptr<Array>* next_right) {
if (pos_ == length_) return false;

// Find non-empty chunk
std::shared_ptr<Array> chunk_left, chunk_right;
while (true) {
chunk_left = left_.chunk(chunk_idx_left_);
chunk_right = right_.chunk(chunk_idx_right_);
if (chunk_pos_left_ == chunk_left->length()) {
chunk_pos_left_ = 0;
++chunk_idx_left_;
continue;
}
if (chunk_pos_right_ == chunk_right->length()) {
chunk_pos_right_ = 0;
++chunk_idx_right_;
continue;
}
break;
}
// Determine how big of a section to return
int64_t iteration_size = std::min(chunk_left->length() - chunk_pos_left_,
chunk_right->length() - chunk_pos_right_);

*next_left = chunk_left->Slice(chunk_pos_left_, iteration_size);
*next_right = chunk_right->Slice(chunk_pos_right_, iteration_size);

pos_ += iteration_size;
chunk_pos_left_ += iteration_size;
chunk_pos_right_ += iteration_size;
return true;
}

} // namespace internal

// ----------------------------------------------------------------------
// Table methods

Expand Down
57 changes: 57 additions & 0 deletions cpp/src/arrow/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,63 @@ class ARROW_EXPORT ChunkedArray {
ARROW_DISALLOW_COPY_AND_ASSIGN(ChunkedArray);
};

namespace internal {

/// \brief EXPERIMENTAL: Utility for incremental iteration over contiguous
/// pieces of potentially differently-chunked ChunkedArray objects
class MultipleChunkIterator {
public:
MultipleChunkIterator(const ChunkedArray& left, const ChunkedArray& right)
: left_(left),
right_(right),
pos_(0),
length_(left.length()),
chunk_idx_left_(0),
chunk_idx_right_(0),
chunk_pos_left_(0),
chunk_pos_right_(0) {}

bool Next(std::shared_ptr<Array>* next_left, std::shared_ptr<Array>* next_right);

int64_t position() const { return pos_; }

private:
const ChunkedArray& left_;
const ChunkedArray& right_;

// The amount of the entire ChunkedArray consumed
int64_t pos_;

// Length of the chunked array(s)
int64_t length_;

// Current left chunk
int chunk_idx_left_;

// Current right chunk
int chunk_idx_right_;

// Offset into the current left chunk
int64_t chunk_pos_left_;

// Offset into the current right chunk
int64_t chunk_pos_right_;
};

// Execute the passed function
template <typename Action>
Status ApplyToChunkOverlaps(const ChunkedArray& left, const ChunkedArray& right,
Action&& action) {
MultipleChunkIterator iterator(left, right);
std::shared_ptr<Array> left_piece, right_piece;
while (iterator.Next(&left_piece, &right_piece)) {
ARROW_RETURN_NOT_OK(action(*left_piece, *right_piece, iterator.position()));
}
return Status::OK();
}

} // namespace internal

/// \class Table
/// \brief Logical table as sequence of chunked arrays
class ARROW_EXPORT Table {
Expand Down
14 changes: 14 additions & 0 deletions cpp/src/arrow/table_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,20 @@ TEST_F(TestChunkedArray, EqualsDifferingLengths) {
ASSERT_TRUE(one_->Equals(*another_.get()));
}

TEST_F(TestChunkedArray, EqualsDifferingMetadata) {
auto left_ty = list(field("item", int32()));

auto metadata = KeyValueMetadata::Make({"foo"}, {"bar"});
auto right_ty = list(field("item", int32(), true, metadata));

std::vector<std::shared_ptr<Array>> left_chunks = {ArrayFromJSON(left_ty, "[[]]")};
std::vector<std::shared_ptr<Array>> right_chunks = {ArrayFromJSON(right_ty, "[[]]")};

ChunkedArray left(left_chunks);
ChunkedArray right(right_chunks);
ASSERT_TRUE(left.Equals(right));
}

TEST_F(TestChunkedArray, SliceEquals) {
arrays_one_.push_back(MakeRandomArray<Int32Array>(100));
arrays_one_.push_back(MakeRandomArray<Int32Array>(50));
Expand Down
38 changes: 21 additions & 17 deletions cpp/src/arrow/testing/gtest_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,6 @@

namespace arrow {

static void PrintChunkedArray(const ChunkedArray& carr, std::stringstream* ss) {
for (int i = 0; i < carr.num_chunks(); ++i) {
auto c1 = carr.chunk(i);
*ss << "Chunk " << i << std::endl;
::arrow::PrettyPrintOptions options(/*indent=*/2);
ARROW_EXPECT_OK(::arrow::PrettyPrint(*c1, options, ss));
*ss << std::endl;
}
}

template <typename T>
void AssertTsEqual(const T& expected, const T& actual) {
if (!expected.Equals(actual)) {
Expand Down Expand Up @@ -280,15 +270,29 @@ void AssertTablesEqual(const Table& expected, const Table& actual, bool same_chu
}
} else {
std::stringstream ss;
if (!actual.Equals(expected)) {
for (int i = 0; i < expected.num_columns(); ++i) {
ss << "Actual column " << i << std::endl;
PrintChunkedArray(*actual.column(i), &ss);
for (int i = 0; i < actual.num_columns(); ++i) {
auto actual_col = actual.column(i);
auto expected_col = expected.column(i);

PrettyPrintOptions options(/*indent=*/2);
options.window = 50;

ss << "Expected column " << i << std::endl;
PrintChunkedArray(*expected.column(i), &ss);
if (!actual_col->Equals(*expected_col)) {
ASSERT_OK(internal::ApplyToChunkOverlaps(
*actual_col, *expected_col,
[&](const Array& left_piece, const Array& right_piece, int64_t position) {
std::stringstream diff;
if (!left_piece.Equals(right_piece, EqualOptions().diff_sink(&diff))) {
ss << "Unequal at absolute position " << position << "\n" << diff.str();
ss << "Expected:\n";
ARROW_EXPECT_OK(PrettyPrint(right_piece, options, &ss));
ss << "\nActual:\n";
ARROW_EXPECT_OK(PrettyPrint(left_piece, options, &ss));
}
return Status::OK();
}));
FAIL() << ss.str();
}
FAIL() << ss.str();
}
}
}
Expand Down
18 changes: 16 additions & 2 deletions cpp/src/arrow/type.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,17 @@ std::shared_ptr<Field> Field::WithMetadata(
return std::make_shared<Field>(name_, type_, nullable_, metadata);
}

std::shared_ptr<Field> Field::WithMergedMetadata(
const std::shared_ptr<const KeyValueMetadata>& metadata) const {
std::shared_ptr<const KeyValueMetadata> merged_metadata;
if (metadata_) {
merged_metadata = metadata_->Merge(*metadata);
} else {
merged_metadata = metadata;
}
return std::make_shared<Field>(name_, type_, nullable_, merged_metadata);
}

std::shared_ptr<Field> Field::RemoveMetadata() const {
return std::make_shared<Field>(name_, type_, nullable_);
}
Expand Down Expand Up @@ -168,10 +179,13 @@ bool Field::IsCompatibleWith(const std::shared_ptr<Field>& other) const {

std::string Field::ToString() const {
std::stringstream ss;
ss << this->name_ << ": " << this->type_->ToString();
if (!this->nullable_) {
ss << name_ << ": " << type_->ToString();
if (!nullable_) {
ss << " not null";
}
if (metadata_) {
ss << metadata_->ToString();
}
return ss.str();
}

Expand Down
7 changes: 6 additions & 1 deletion cpp/src/arrow/type.h
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,12 @@ class ARROW_EXPORT Field : public detail::Fingerprintable {
std::shared_ptr<Field> WithMetadata(
const std::shared_ptr<const KeyValueMetadata>& metadata) const;

/// \brief EXPERIMENTAL: Return a copy of this field with the given metadata
/// merged with existing metadata (any colliding keys will be overridden by
/// the passed metadata)
std::shared_ptr<Field> WithMergedMetadata(
const std::shared_ptr<const KeyValueMetadata>& metadata) const;

ARROW_DEPRECATED("Use WithMetadata")
std::shared_ptr<Field> AddMetadata(
const std::shared_ptr<const KeyValueMetadata>& metadata) const;
Expand Down Expand Up @@ -395,7 +401,6 @@ class ARROW_EXPORT Field : public detail::Fingerprintable {
/// - have the same name
/// - have the same type, or of compatible types according to `options`.
///
///
/// The metadata of the current field is preserved; the metadata of the other
/// field is discarded.
Result<std::shared_ptr<Field>> MergeWith(
Expand Down
Loading

0 comments on commit 08890cb

Please sign in to comment.