[1.11.0] [Cherry-pick] [Datasets] Fix boolean tensor column representation and slicing. #22358

Merged
41 changes: 36 additions & 5 deletions python/ray/data/extensions/tensor_extension.py
@@ -1234,6 +1234,12 @@ def from_numpy(cls, arr):
if element_shape else 1)

# Data buffer.
if pa.types.is_boolean(pa_dtype):
# NumPy doesn't represent boolean arrays as bit-packed, so we manually
# bit-pack the booleans before handing the buffer off to Arrow.
# NOTE: Arrow expects LSB bit-packed ordering.
# NOTE: This creates a copy.
arr = np.packbits(arr, bitorder="little")
data_buffer = pa.py_buffer(arr)
data_array = pa.Array.from_buffers(pa_dtype, total_num_items,
[None, data_buffer])
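
For illustration only (not part of the diff): a minimal standalone sketch of the bit-packing step above, assuming only numpy and pyarrow are available. NumPy stores one byte per boolean, so the values are LSB bit-packed with np.packbits before being handed to Arrow as the data buffer.

import numpy as np
import pyarrow as pa

bools = np.array([True, False, True, True, False, False, True, False, True])

# Arrow expects an LSB-ordered, bit-packed data buffer; np.packbits copies.
packed = np.packbits(bools, bitorder="little")

# Buffers for a primitive Arrow array are [validity, data]; no nulls here.
data_buffer = pa.py_buffer(packed)
arrow_bools = pa.Array.from_buffers(pa.bool_(), len(bools), [None, data_buffer])

assert arrow_bools.to_pylist() == bools.tolist()
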
@@ -1275,16 +1281,18 @@ def _to_numpy(self,
The corresponding tensor element as an ndarray if an index was
given, or the entire array of tensors as an ndarray otherwise.
"""
# TODO(Clark): Enforce zero_copy_only.
# TODO(Clark): Support strides?
# Buffers schema:
# [None, offset_buffer, None, data_buffer]
buffers = self.buffers()
data_buffer = buffers[3]
storage_list_type = self.storage.type
ext_dtype = storage_list_type.value_type.to_pandas_dtype()
shape = self.type.shape
value_type = storage_list_type.value_type
ext_dtype = value_type.to_pandas_dtype()
shape = self.type.shape
if pa.types.is_boolean(value_type):
# Boolean array buffers are byte-packed, with 8 entries per byte,
# Arrow boolean array buffers are bit-packed, with 8 entries per byte,
# and are accessed via bit offsets.
buffer_item_width = value_type.bit_width
else:
@@ -1309,8 +1317,31 @@
else:
# Getting the entire array of tensors.
shape = (len(self), ) + shape
# TODO(Clark): Enforce zero_copy_only.
# TODO(Clark): Support strides?
if pa.types.is_boolean(value_type):
# Special handling for boolean arrays, since Arrow bit-packs boolean arrays
# while NumPy does not.
# Cast as uint8 array and let NumPy unpack into a boolean view.
# Offset into uint8 array, where each element is a bucket for 8 booleans.
byte_bucket_offset = offset // 8
# Offset for a specific boolean, within a uint8 array element.
bool_offset = offset % 8
# The number of uint8 array elements (buckets) that our slice spans.
# Note that, due to the offset for a specific boolean, the slice can span
# byte boundaries even if it contains less than 8 booleans.
num_boolean_byte_buckets = 1 + (
(bool_offset + np.prod(shape) - 1) // 8)
# Construct the uint8 array view on the buffer.
arr = np.ndarray(
(num_boolean_byte_buckets, ),
dtype=np.uint8,
buffer=data_buffer,
offset=byte_bucket_offset,
)
# Unpack into a byte per boolean, using LSB bit-packed ordering.
arr = np.unpackbits(arr, bitorder="little")
# Interpret buffer as boolean array.
return np.ndarray(
shape, dtype=np.bool_, buffer=arr, offset=bool_offset)
return np.ndarray(
shape, dtype=ext_dtype, buffer=data_buffer, offset=offset)
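
As a standalone illustration of the offset arithmetic above (made-up data; variable names mirror the diff), the same byte-bucket and bit-offset computation can be reproduced directly on a bit-packed NumPy buffer:

import numpy as np

flat = np.array([bool((i * 7) % 3) for i in range(30)])  # 30 booleans
packed = np.packbits(flat, bitorder="little")             # 4 bytes, LSB-ordered

offset, num_items = 11, 9          # slice starts at bit 11 and spans 9 booleans
byte_bucket_offset = offset // 8   # first byte containing part of the slice
bool_offset = offset % 8           # bit position of the slice within that byte
num_boolean_byte_buckets = 1 + ((bool_offset + num_items - 1) // 8)

# View the relevant bytes, unpack to one byte per boolean, then skip the
# leading bits that belong to earlier elements.
buckets = np.ndarray(
    (num_boolean_byte_buckets, ),
    dtype=np.uint8,
    buffer=packed,
    offset=byte_bucket_offset,
)
unpacked = np.unpackbits(buckets, bitorder="little")
result = np.ndarray(
    (num_items, ), dtype=np.bool_, buffer=unpacked, offset=bool_offset)

assert np.array_equal(result, flat[offset:offset + num_items])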

104 changes: 104 additions & 0 deletions python/ray/data/tests/test_dataset.py
@@ -498,6 +498,110 @@ def check_for_copy(table1, table2, a, b, is_copy):
check_for_copy(table, table2, a, b, is_copy=False)


@pytest.mark.parametrize(
"test_data,a,b",
[
([[False, True], [True, False], [True, True], [False, False]], 1, 3),
([[False, True], [True, False], [True, True], [False, False]], 0, 1),
(
[
[False, True],
[True, False],
[True, True],
[False, False],
[True, False],
[False, False],
[False, True],
[True, True],
[False, False],
[True, True],
[False, True],
[True, False],
],
3,
6,
),
(
[
[False, True],
[True, False],
[True, True],
[False, False],
[True, False],
[False, False],
[False, True],
[True, True],
[False, False],
[True, True],
[False, True],
[True, False],
],
7,
11,
),
(
[
[False, True],
[True, False],
[True, True],
[False, False],
[True, False],
[False, False],
[False, True],
[True, True],
[False, False],
[True, True],
[False, True],
[True, False],
],
9,
12,
),
],
)
@pytest.mark.parametrize("init_with_pandas", [True, False])
def test_tensor_array_boolean_slice_pandas_roundtrip(init_with_pandas,
test_data, a, b):
n = len(test_data)
test_arr = np.array(test_data)
df = pd.DataFrame({"one": TensorArray(test_arr), "two": ["a"] * n})
if init_with_pandas:
table = pa.Table.from_pandas(df)
else:
pa_dtype = pa.bool_()
flat = [w for v in test_data for w in v]
data_array = pa.array(flat, pa_dtype)
inner_len = len(test_data[0])
offsets = list(range(0, len(flat) + 1, inner_len))
offset_buffer = pa.py_buffer(np.int32(offsets))
storage = pa.Array.from_buffers(
pa.list_(pa_dtype),
len(test_data),
[None, offset_buffer],
children=[data_array],
)
t_arr = pa.ExtensionArray.from_storage(
ArrowTensorType((inner_len, ), pa.bool_()), storage)
table = pa.table({"one": t_arr, "two": ["a"] * n})
block_accessor = BlockAccessor.for_block(table)

# Test without copy.
table2 = block_accessor.slice(a, b, False)
np.testing.assert_array_equal(table2["one"].chunk(0).to_numpy(),
test_arr[a:b, :])
pd.testing.assert_frame_equal(
table2.to_pandas().reset_index(drop=True),
df[a:b].reset_index(drop=True))

# Test with copy.
table2 = block_accessor.slice(a, b, True)
np.testing.assert_array_equal(table2["one"].chunk(0).to_numpy(),
test_arr[a:b, :])
pd.testing.assert_frame_equal(
table2.to_pandas().reset_index(drop=True),
df[a:b].reset_index(drop=True))
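
For context (not part of the test): the bit offset matters because zero-copy slicing of an Arrow boolean array only adjusts the array's logical offset into the shared bit-packed buffer, so a slice generally does not start on a byte boundary. A small sketch, assuming only pyarrow:

import pyarrow as pa

values = pa.array(
    [False, True, True, False, True, False, False, True, True, False])

# Zero-copy slice: the bit-packed data buffer is shared; only offset and
# length change on the slice.
sliced = values.slice(3, 5)

assert sliced.offset == 3
assert sliced.to_pylist() == [False, True, False, False, True]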


def test_arrow_tensor_array_getitem(ray_start_regular_shared):
outer_dim = 3
inner_shape = (2, 2, 2)