Skip to content

Commit

Permalink
ARROW-11607: [C++][Parquet] Update values_capacity_ when resetting.
Browse files Browse the repository at this point in the history
I'm not sure why values_capacity_ is kept separately
from the buffer, but there is a check which does not reserve
capacity again if values_capacity_ is already the needed size.
When ReleaseValues is called, we allocate a brand new empty buffer.

I'm not really sure why this hasn't caused users more issues (maybe
increasingly large row groups or some other phenomenon). This bug
also highlights that our C++ tests have very limited coverage of batched
reads.  To fix this I added a batch read to every round-trip test
to confirm it yields the same values.

Closes apache#9498 from emkornfield/ARROW-11607

Lead-authored-by: Micah Kornfield <emkornfield@gmail.com>
Co-authored-by: emkornfield <micahk@google.com>
Signed-off-by: Antoine Pitrou <antoine@python.org>
  • Loading branch information
2 people authored and GeorgeAp committed Jun 7, 2021
1 parent 1577e34 commit 09ac2c2
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 5 deletions.
76 changes: 71 additions & 5 deletions cpp/src/parquet/arrow/arrow_reader_writer_test.cc
Expand Up @@ -32,6 +32,7 @@
#include "arrow/array/builder_binary.h"
#include "arrow/array/builder_decimal.h"
#include "arrow/array/builder_dict.h"
#include "arrow/array/builder_nested.h"
#include "arrow/array/builder_primitive.h"
#include "arrow/chunked_array.h"
#include "arrow/compute/api.h"
Expand Down Expand Up @@ -80,6 +81,7 @@ using arrow::TimeUnit;
using arrow::compute::DictionaryEncode;
using arrow::internal::checked_cast;
using arrow::internal::checked_pointer_cast;
using arrow::internal::Iota;
using arrow::io::BufferReader;

using arrow::randint;
Expand Down Expand Up @@ -443,15 +445,58 @@ void DoSimpleRoundtrip(const std::shared_ptr<Table>& table, bool use_threads,
}
}

void CheckSimpleRoundtrip(const std::shared_ptr<Table>& table, int64_t row_group_size,
const std::shared_ptr<ArrowWriterProperties>& arrow_properties =
default_arrow_writer_properties()) {
void DoRoundTripWithBatches(
const std::shared_ptr<Table>& table, bool use_threads, int64_t row_group_size,
const std::vector<int>& column_subset, std::shared_ptr<Table>* out,
const std::shared_ptr<ArrowWriterProperties>& arrow_writer_properties =
default_arrow_writer_properties()) {
std::shared_ptr<Buffer> buffer;
ASSERT_NO_FATAL_FAILURE(
WriteTableToBuffer(table, row_group_size, arrow_writer_properties, &buffer));

std::unique_ptr<FileReader> reader;
FileReaderBuilder builder;
ASSERT_OK_NO_THROW(builder.Open(std::make_shared<BufferReader>(buffer)));
ArrowReaderProperties arrow_reader_properties;
arrow_reader_properties.set_batch_size(row_group_size - 1);
ASSERT_OK_NO_THROW(builder.memory_pool(::arrow::default_memory_pool())
->properties(arrow_reader_properties)
->Build(&reader));
std::unique_ptr<::arrow::RecordBatchReader> batch_reader;
if (column_subset.size() > 0) {
ASSERT_OK_NO_THROW(reader->GetRecordBatchReader(
Iota(reader->parquet_reader()->metadata()->num_row_groups()), column_subset,
&batch_reader));
} else {
// Read everything

ASSERT_OK_NO_THROW(reader->GetRecordBatchReader(
Iota(reader->parquet_reader()->metadata()->num_row_groups()), &batch_reader));
}
ASSERT_OK_AND_ASSIGN(*out, Table::FromRecordBatchReader(batch_reader.get()));
}

// Round-trips `table` through an in-memory Parquet file twice -- once via the
// whole-table read path (DoSimpleRoundtrip) and once via the batched
// RecordBatchReader path (DoRoundTripWithBatches) -- asserting after each read
// that the schema (ignoring metadata) and the data match the original exactly.
// Fix: the scraped diff left the pre-commit call (referencing the removed
// parameter name `arrow_properties`) interleaved with the post-commit call;
// this is the reconstructed post-commit function.
void CheckSimpleRoundtrip(
    const std::shared_ptr<Table>& table, int64_t row_group_size,
    const std::shared_ptr<ArrowWriterProperties>& arrow_writer_properties =
        default_arrow_writer_properties()) {
  std::shared_ptr<Table> result;
  ASSERT_NO_FATAL_FAILURE(DoSimpleRoundtrip(table, false /* use_threads */,
                                            row_group_size, {}, &result,
                                            arrow_writer_properties));
  ::arrow::AssertSchemaEqual(*table->schema(), *result->schema(),
                             /*check_metadata=*/false);
  ASSERT_OK(result->ValidateFull());

  ::arrow::AssertTablesEqual(*table, *result, false);

  ASSERT_NO_FATAL_FAILURE(DoRoundTripWithBatches(table, false /* use_threads */,
                                                 row_group_size, {}, &result,
                                                 arrow_writer_properties));
  ::arrow::AssertSchemaEqual(*table->schema(), *result->schema(),
                             /*check_metadata=*/false);
  ASSERT_OK(result->ValidateFull());

  ::arrow::AssertTablesEqual(*table, *result, false);
}

Expand Down Expand Up @@ -2475,6 +2520,27 @@ TEST(TestArrowReadWrite, TableWithChunkedColumns) {
}
}

TEST(TestArrowReadWrite, ManySmallLists) {
  // ARROW-11607: forces the scenario where the first batch read consumes no
  // list data at all and the second batch reads exactly one element.
  // Fix: removed the dead local `value_builder` (an Int32Builder that was
  // constructed but never used) and the dangling "// Constructs" fragment.
  constexpr int64_t kNullCount = 6;
  auto type = ::arrow::list(::arrow::int32());

  // Six null lists followed by a single one-element list.
  std::vector<std::shared_ptr<Array>> arrays = {
      ArrayFromJSON(type, R"([null, null, null, null, null, null, [1]])")};

  auto field = ::arrow::field("fname", type);
  auto schema = ::arrow::schema({field});
  auto table = Table::Make(schema, {std::make_shared<ChunkedArray>(arrays)});
  ASSERT_EQ(table->num_rows(), kNullCount + 1);

  CheckSimpleRoundtrip(table, /*row_group_size=*/kNullCount,
                       default_arrow_writer_properties());
}

TEST(TestArrowReadWrite, TableWithDuplicateColumns) {
// See ARROW-1974
using ::arrow::ArrayFromVector;
Expand Down
1 change: 1 addition & 0 deletions cpp/src/parquet/column_reader.cc
Expand Up @@ -1201,6 +1201,7 @@ class TypedRecordReader : public ColumnReaderImplBase<DType>,
auto result = values_;
PARQUET_THROW_NOT_OK(result->Resize(bytes_for_values(values_written_), true));
values_ = AllocateBuffer(this->pool_);
values_capacity_ = 0;
return result;
} else {
return nullptr;
Expand Down
14 changes: 14 additions & 0 deletions python/pyarrow/tests/parquet/test_basic.py
Expand Up @@ -570,3 +570,17 @@ def test_empty_row_groups(tempdir):

for i in range(num_groups):
assert reader.read_row_group(i).equals(table)


def test_reads_over_batch(tempdir):
    # ARROW-11607: a large list<int64> column consisting of ~1M nulls with a
    # single trailing one-element list should force batched reads on the way
    # back in; the round trip must still reproduce the data exactly.
    values = [None] * (1 << 20)
    values.append([1])
    original = pa.Table.from_arrays([values], ['column'])

    path = tempdir / 'arrow-11607.parquet'
    pq.write_table(original, path)
    roundtripped = pq.read_table(path)
    assert original == roundtripped

0 comments on commit 09ac2c2

Please sign in to comment.