Skip to content

Commit

Permalink
[Datasets] Unrevert Arrow table copy method change. (#19534)
Browse files Browse the repository at this point in the history
  • Loading branch information
clarkzinzow committed Oct 20, 2021
1 parent c51f79b commit 88c5fcd
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 5 deletions.
29 changes: 24 additions & 5 deletions python/ray/data/impl/arrow_block.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,11 +169,8 @@ def __next__(self):
def slice(self, start: int, end: int, copy: bool) -> "pyarrow.Table":
view = self._table.slice(start, end - start)
if copy:
# TODO(ekl) there must be a cleaner way to force a copy of a table.
copy = [c.to_pandas() for c in view.itercolumns()]
return pyarrow.Table.from_arrays(copy, schema=self._table.schema)
else:
return view
view = _copy_table(view)
return view

def random_shuffle(self, random_seed: Optional[int]) -> List[T]:
random = np.random.RandomState(random_seed)
Expand Down Expand Up @@ -286,3 +283,25 @@ def merge_sorted_blocks(
indices = pyarrow.compute.sort_indices(ret, sort_keys=key)
ret = ret.take(indices)
return ret, ArrowBlockAccessor(ret).get_metadata(None)


def _copy_table(table: "pyarrow.Table") -> "pyarrow.Table":
    """Copy the provided Arrow table.

    Every column is rebuilt as a single array obtained by concatenating its
    chunks, and a new table is assembled from those arrays using the schema
    of ``table``.

    Args:
        table: The Arrow table to copy.

    Returns:
        A new Arrow table with the same schema, built from the rebuilt
        column arrays.
    """
    import pyarrow as pa

    def _rebuild_column(column):
        # Extension-typed columns are detected by inspecting the first
        # chunk; a column without chunks falls through to the generic path.
        if column.num_chunks > 0:
            first = column.chunk(0)
            if isinstance(first, pa.ExtensionArray):
                # Concatenate the storage arrays of all chunks, then wrap
                # the result again using the first chunk's array class and
                # extension type.
                ext_type = first.type
                storage = pa.concat_arrays(
                    [piece.storage for piece in column.chunks])
                return type(first).from_storage(ext_type, storage)
        # Generic path: combine the top-level chunk arrays directly.
        return column.combine_chunks()

    rebuilt = [_rebuild_column(column) for column in table.columns]
    return pa.Table.from_arrays(rebuilt, schema=table.schema)
100 changes: 100 additions & 0 deletions python/ray/data/tests/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,68 @@ def test_batch_tensors(ray_start_regular_shared):
assert df.to_dict().keys() == {0, 1}


def test_arrow_block_slice_copy():
    # Slicing an Arrow block through its accessor must produce new buffers
    # when a copy is requested and share the source buffers otherwise.
    def check_for_copy(table1, table2, a, b, is_copy):
        # table2 must hold exactly rows [a, b) of table1, schema included.
        assert table2.equals(table1.slice(a, b - a))
        assert table2.schema == table1.schema
        assert table1.num_columns == table2.num_columns
        # A copy is expected to start at offset 0, while a view is expected
        # to keep the slice start as its offset.
        expected_offset = 0 if is_copy else a
        for src_col, dst_col in zip(table1.columns, table2.columns):
            assert src_col.num_chunks == dst_col.num_chunks
            for src_chunk, dst_chunk in zip(src_col.chunks, dst_col.chunks):
                src_bufs = src_chunk.buffers()
                dst_bufs = dst_chunk.buffers()
                assert dst_chunk.offset == expected_offset
                assert len(dst_chunk) == b - a
                # Compare the addresses of the buffers at index 1 to tell
                # whether the two chunks share memory.
                shares_memory = dst_bufs[1].address == src_bufs[1].address
                if is_copy:
                    assert not shares_memory
                else:
                    assert shares_memory

    num_rows = 20
    frame = pd.DataFrame({
        "one": list(range(num_rows)),
        "two": ["a"] * num_rows,
        "three": [np.nan] + [1.5] * (num_rows - 1)
    })
    table = pa.Table.from_pandas(frame)
    start, stop = 5, 10
    accessor = BlockAccessor.for_block(table)

    # Exercise the copying path first, then the non-copying path.
    for do_copy in (True, False):
        sliced = accessor.slice(start, stop, do_copy)
        check_for_copy(table, sliced, start, stop, is_copy=do_copy)


def test_arrow_block_slice_copy_empty():
    # Slicing an empty Arrow block must give back an empty table with the
    # same schema, whether or not a copy is requested.
    table = pa.Table.from_pandas(pd.DataFrame({"one": []}))
    start, stop = 0, 0
    expected = table.slice(start, stop - start)
    accessor = BlockAccessor.for_block(table)

    # Exercise the copying path first, then the non-copying path.
    for do_copy in (True, False):
        sliced = accessor.slice(start, stop, do_copy)
        assert sliced.equals(expected)
        assert sliced.schema == table.schema
        assert sliced.num_rows == 0


def test_tensors(ray_start_regular_shared):
# Create directly.
ds = ray.data.range_tensor(5, shape=(3, 5))
Expand Down Expand Up @@ -359,6 +421,44 @@ def test_tensor_array_reductions(ray_start_regular_shared):
reducer(arr, axis=0, **np_kwargs))


def test_tensor_array_block_slice():
    # Test that ArrowBlock slicing works with the tensor column extension
    # type, both with and without copying.
    def check_for_copy(table1, table2, a, b, is_copy):
        # table2 must hold exactly rows [a, b) of table1, schema included.
        assert table2.equals(table1.slice(a, b - a))
        assert table2.schema == table1.schema
        assert table1.num_columns == table2.num_columns
        # A copy is expected to start at offset 0, while a view is expected
        # to keep the slice start as its offset.
        expected_offset = 0 if is_copy else a
        for src_col, dst_col in zip(table1.columns, table2.columns):
            assert src_col.num_chunks == dst_col.num_chunks
            for src_chunk, dst_chunk in zip(src_col.chunks, dst_col.chunks):
                src_bufs = src_chunk.buffers()
                dst_bufs = dst_chunk.buffers()
                assert dst_chunk.offset == expected_offset
                assert len(dst_chunk) == b - a
                # Compare the addresses of the buffers at index 1 to tell
                # whether the two chunks share memory.
                shares_memory = dst_bufs[1].address == src_bufs[1].address
                if is_copy:
                    assert not shares_memory
                else:
                    assert shares_memory

    num_rows = 20
    frame = pd.DataFrame({
        "one": TensorArray(np.array(list(range(num_rows)))),
        "two": ["a"] * num_rows
    })
    table = pa.Table.from_pandas(frame)
    start, stop = 5, 10
    accessor = BlockAccessor.for_block(table)

    # Exercise the copying path first, then the non-copying path.
    for do_copy in (True, False):
        sliced = accessor.slice(start, stop, do_copy)
        check_for_copy(table, sliced, start, stop, is_copy=do_copy)


def test_arrow_tensor_array_getitem(ray_start_regular_shared):
outer_dim = 3
inner_shape = (2, 2, 2)
Expand Down

0 comments on commit 88c5fcd

Please sign in to comment.