Skip to content

Commit

Permalink
ARROW-3020: [C++/Python] Allow empty arrow::Table objects to be written as empty Parquet row groups
Browse files Browse the repository at this point in the history

While it's unclear how useful this is, it at least preserves the intent of the user if they decide to call `write_table` with an empty table

Author: Wes McKinney <wesm+git@apache.org>

Closes apache#3269 from wesm/ARROW-3020 and squashes the following commits:

b8c0cc2 <Wes McKinney> Revert changes to CMakeLists.txt
12b92cf <Wes McKinney> Allow empty arrow::Table objects to be written as empty Parquet row groups, and read back
  • Loading branch information
wesm authored and romainfrancois committed Jan 3, 2019
1 parent 5076264 commit d004bec
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 15 deletions.
30 changes: 22 additions & 8 deletions cpp/src/parquet/arrow/writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,10 @@ class ArrowColumnWriter {
Status Write(const Array& data);

Status Write(const ChunkedArray& data, int64_t offset, const int64_t size) {
if (data.length() == 0) {
return Status::OK();
}

int64_t absolute_position = 0;
int chunk_index = 0;
int64_t chunk_offset = 0;
Expand Down Expand Up @@ -1134,22 +1138,32 @@ Status WriteFileMetaData(const FileMetaData& file_metadata,
namespace {} // namespace

// Write an Arrow table to the Parquet file, splitting the rows into row
// groups of at most `chunk_size` rows each.
//
// An empty table (zero rows) is written as a single row group containing
// zero rows, preserving the caller's intent; in that case `chunk_size` is
// not validated. On any write failure the file is closed (best effort,
// errors from Close() ignored) before the error status is returned.
//
// NOTE(review): this span was reconstructed from an interleaved diff that
// contained both the pre- and post-change lines of the function; only the
// final (post-change) logic is kept here.
Status FileWriter::WriteTable(const Table& table, int64_t chunk_size) {
  if (chunk_size <= 0 && table.num_rows() > 0) {
    return Status::Invalid("chunk size per row_group must be greater than 0");
  } else if (chunk_size > impl_->properties().max_row_group_length()) {
    // Clamp to the writer's configured maximum row group length
    chunk_size = impl_->properties().max_row_group_length();
  }

  // Writes rows [offset, offset + size) of every column as one row group.
  auto WriteRowGroup = [&](int64_t offset, int64_t size) {
    RETURN_NOT_OK(NewRowGroup(size));
    for (int i = 0; i < table.num_columns(); i++) {
      auto chunked_data = table.column(i)->data();
      RETURN_NOT_OK(WriteColumnChunk(chunked_data, offset, size));
    }
    return Status::OK();
  };

  if (table.num_rows() == 0) {
    // Append a row group with 0 rows
    RETURN_NOT_OK_ELSE(WriteRowGroup(0, 0), PARQUET_IGNORE_NOT_OK(Close()));
    return Status::OK();
  }

  for (int chunk = 0; chunk * chunk_size < table.num_rows(); chunk++) {
    int64_t offset = chunk * chunk_size;
    RETURN_NOT_OK_ELSE(
        WriteRowGroup(offset, std::min(chunk_size, table.num_rows() - offset)),
        PARQUET_IGNORE_NOT_OK(Close()));
  }
  return Status::OK();
}
Expand Down
13 changes: 6 additions & 7 deletions python/pyarrow/_parquet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -909,17 +909,16 @@ cdef class ParquetWriter:
check_status(self.sink.get().Close())

def write_table(self, Table table, row_group_size=None):
cdef CTable* ctable = table.table
cdef:
CTable* ctable = table.table
int64_t c_row_group_size

if row_group_size is None or row_group_size == -1:
if ctable.num_rows() > 0:
row_group_size = ctable.num_rows()
else:
row_group_size = 1
c_row_group_size = ctable.num_rows()
elif row_group_size == 0:
raise ValueError('Row group size cannot be 0')

cdef int64_t c_row_group_size = row_group_size
else:
c_row_group_size = row_group_size

with nogil:
check_status(self.writer.get()
Expand Down
18 changes: 18 additions & 0 deletions python/pyarrow/tests/test_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -2251,6 +2251,24 @@ def test_merging_parquet_tables_with_different_pandas_metadata(tempdir):
writer.write_table(table2)


def test_empty_row_groups(tempdir):
    # ARROW-3020: writing a zero-row table must still append an (empty)
    # row group, and each such group must round-trip on read.
    table = pa.Table.from_arrays([pa.array([], type='int32')], ['f0'])
    path = tempdir / 'empty_row_groups.parquet'
    num_groups = 3

    with pq.ParquetWriter(path, table.schema) as writer:
        for _ in range(num_groups):
            writer.write_table(table)

    reader = pq.ParquetFile(path)
    assert reader.metadata.num_row_groups == num_groups
    assert all(reader.read_row_group(idx).equals(table)
               for idx in range(num_groups))


def test_writing_empty_lists():
# ARROW-2591: [Python] Segmentation fault issue in pq.write_table
arr1 = pa.array([[], []], pa.list_(pa.int32()))
Expand Down

0 comments on commit d004bec

Please sign in to comment.