Skip to content

Commit

Permalink
[Datasets] Fix boolean tensor column representation and slicing. (#22323)
Browse files Browse the repository at this point in the history

This PR fixes our {NumPy, Pandas} <--> Arrow interop for boolean tensor columns. NumPy and Pandas represent boolean arrays with a byte per boolean, while Arrow bit-packs booleans with 8 booleans per byte. Previously, when casting NumPy arrays to tensor columns, we were interpreting NumPy's boolean array buffers as being bit-packed when they were not. This PR completes support by packing and unpacking bits for boolean arrays when creating a boolean tensor column from an ndarray and when creating an ndarray from a boolean tensor column, respectively.
  • Loading branch information
clarkzinzow committed Feb 14, 2022
1 parent db5de9c commit 4434169
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 5 deletions.
39 changes: 34 additions & 5 deletions python/ray/data/extensions/tensor_extension.py
Original file line number Diff line number Diff line change
Expand Up @@ -1245,6 +1245,12 @@ def from_numpy(cls, arr):
num_items_per_element = np.prod(element_shape) if element_shape else 1

# Data buffer.
if pa.types.is_boolean(pa_dtype):
# NumPy doesn't represent boolean arrays as bit-packed, so we manually
# bit-pack the booleans before handing the buffer off to Arrow.
# NOTE: Arrow expects LSB bit-packed ordering.
# NOTE: This creates a copy.
arr = np.packbits(arr, bitorder="little")
data_buffer = pa.py_buffer(arr)
data_array = pa.Array.from_buffers(
pa_dtype, total_num_items, [None, data_buffer]
Expand Down Expand Up @@ -1289,16 +1295,18 @@ def _to_numpy(self, index: Optional[int] = None, zero_copy_only: bool = False):
The corresponding tensor element as an ndarray if an index was
given, or the entire array of tensors as an ndarray otherwise.
"""
# TODO(Clark): Enforce zero_copy_only.
# TODO(Clark): Support strides?
# Buffers schema:
# [None, offset_buffer, None, data_buffer]
buffers = self.buffers()
data_buffer = buffers[3]
storage_list_type = self.storage.type
ext_dtype = storage_list_type.value_type.to_pandas_dtype()
shape = self.type.shape
value_type = storage_list_type.value_type
ext_dtype = value_type.to_pandas_dtype()
shape = self.type.shape
if pa.types.is_boolean(value_type):
# Boolean array buffers are byte-packed, with 8 entries per byte,
# Arrow boolean array buffers are bit-packed, with 8 entries per byte,
# and are accessed via bit offsets.
buffer_item_width = value_type.bit_width
else:
Expand All @@ -1324,8 +1332,29 @@ def _to_numpy(self, index: Optional[int] = None, zero_copy_only: bool = False):
else:
# Getting the entire array of tensors.
shape = (len(self),) + shape
# TODO(Clark): Enforce zero_copy_only.
# TODO(Clark): Support strides?
if pa.types.is_boolean(value_type):
# Special handling for boolean arrays, since Arrow bit-packs boolean arrays
# while NumPy does not.
# Cast as uint8 array and let NumPy unpack into a boolean view.
# Offset into uint8 array, where each element is a bucket for 8 booleans.
byte_bucket_offset = offset // 8
# Offset for a specific boolean, within a uint8 array element.
bool_offset = offset % 8
# The number of uint8 array elements (buckets) that our slice spans.
# Note that, due to the offset for a specific boolean, the slice can span
# byte boundaries even if it contains less than 8 booleans.
num_boolean_byte_buckets = 1 + ((bool_offset + np.prod(shape) - 1) // 8)
# Construct the uint8 array view on the buffer.
arr = np.ndarray(
(num_boolean_byte_buckets,),
dtype=np.uint8,
buffer=data_buffer,
offset=byte_bucket_offset,
)
# Unpack into a byte per boolean, using LSB bit-packed ordering.
arr = np.unpackbits(arr, bitorder="little")
# Interpret buffer as boolean array.
return np.ndarray(shape, dtype=np.bool_, buffer=arr, offset=bool_offset)
return np.ndarray(shape, dtype=ext_dtype, buffer=data_buffer, offset=offset)

def to_numpy(self, zero_copy_only: bool = True):
Expand Down
102 changes: 102 additions & 0 deletions python/ray/data/tests/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,108 @@ def check_for_copy(table1, table2, a, b, is_copy):
check_for_copy(table, table2, a, b, is_copy=False)


# Boolean tensor fixtures for the slicing round-trip test below. Each row is a
# 2-element boolean tensor; the long fixture has 24 booleans in total, so its
# bit-packed Arrow buffer spans several bytes and slices can start at a
# non-byte-aligned bit offset.
_BOOL_SLICE_ROWS_SHORT = [
    [False, True],
    [True, False],
    [True, True],
    [False, False],
]
_BOOL_SLICE_ROWS_LONG = [
    [False, True],
    [True, False],
    [True, True],
    [False, False],
    [True, False],
    [False, False],
    [False, True],
    [True, True],
    [False, False],
    [True, True],
    [False, True],
    [True, False],
]


@pytest.mark.parametrize(
    "test_data,a,b",
    [
        (_BOOL_SLICE_ROWS_SHORT, 1, 3),
        (_BOOL_SLICE_ROWS_SHORT, 0, 1),
        (_BOOL_SLICE_ROWS_LONG, 3, 6),
        (_BOOL_SLICE_ROWS_LONG, 7, 11),
        (_BOOL_SLICE_ROWS_LONG, 9, 12),
    ],
)
@pytest.mark.parametrize("init_with_pandas", [True, False])
def test_tensor_array_boolean_slice_pandas_roundtrip(init_with_pandas, test_data, a, b):
    """Check that slicing a boolean tensor column round-trips to NumPy/Pandas.

    The Arrow table is built either from a Pandas DataFrame holding a
    ``TensorArray`` column, or directly from Arrow buffers (a boolean values
    array wrapped in list storage and an ``ArrowTensorType`` extension array).
    Rows ``[a, b)`` are then sliced both without and with a copy, and the
    result is compared against the same slice of the source ndarray and
    DataFrame.
    """
    num_rows = len(test_data)
    test_arr = np.array(test_data)
    labels = ["a"] * num_rows
    df = pd.DataFrame({"one": TensorArray(test_arr), "two": labels})

    if not init_with_pandas:
        # Assemble the tensor column by hand from Arrow primitives.
        bool_type = pa.bool_()
        row_width = len(test_data[0])
        flat_values = [value for row in test_data for value in row]
        values = pa.array(flat_values, bool_type)
        # List offsets: one boundary every ``row_width`` values, end inclusive.
        boundaries = list(range(0, len(flat_values) + 1, row_width))
        offset_buffer = pa.py_buffer(np.array(boundaries, dtype=np.int32))
        storage = pa.Array.from_buffers(
            pa.list_(bool_type),
            num_rows,
            [None, offset_buffer],
            children=[values],
        )
        tensor_type = ArrowTensorType((row_width,), pa.bool_())
        tensor_column = pa.ExtensionArray.from_storage(tensor_type, storage)
        table = pa.table({"one": tensor_column, "two": labels})
    else:
        table = pa.Table.from_pandas(df)

    block_accessor = BlockAccessor.for_block(table)
    expected_arr = test_arr[a:b, :]
    expected_df = df[a:b].reset_index(drop=True)

    # Exercise the slice first without a copy, then with a copy.
    for copy in (False, True):
        sliced = block_accessor.slice(a, b, copy)
        np.testing.assert_array_equal(sliced["one"].chunk(0).to_numpy(), expected_arr)
        pd.testing.assert_frame_equal(
            sliced.to_pandas().reset_index(drop=True), expected_df
        )


def test_arrow_tensor_array_getitem(ray_start_regular_shared):
outer_dim = 3
inner_shape = (2, 2, 2)
Expand Down

0 comments on commit 4434169

Please sign in to comment.