apacheGH-40866: [C++][Python] Basic conversion of RecordBatch to Arrow Tensor - add support for row-major (apache#40867)

### Rationale for this change

The conversion from `RecordBatch` to the `Tensor` class already exists, but it does not support row-major `Tensor` as an output. This PR adds an option to construct a row-major `Tensor`.

### What changes are included in this PR?

This PR adds a `row_major` option to `RecordBatch::ToTensor` so that a row-major `Tensor` can be constructed. The default conversion is row-major. For example:

```python
>>> import pyarrow as pa
>>> import numpy as np

>>> arr1 = [1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> arr2 = [10, 20, 30, 40, 50, 60, 70, 80, 90]
>>> batch = pa.RecordBatch.from_arrays(
...     [
...         pa.array(arr1, type=pa.uint16()),
...         pa.array(arr2, type=pa.int16()),
... 
...     ], ["a", "b"]
... )

# Row-major

>>> batch.to_tensor()
<pyarrow.Tensor>
type: int32
shape: (9, 2)
strides: (8, 4)

>>> batch.to_tensor().to_numpy().flags
  C_CONTIGUOUS : True
  F_CONTIGUOUS : False
  OWNDATA : False
  WRITEABLE : True
  ALIGNED : True
  WRITEBACKIFCOPY : False

# Column-major

>>> batch.to_tensor(row_major=False)
<pyarrow.Tensor>
type: int32
shape: (9, 2)
strides: (4, 36)

>>> batch.to_tensor(row_major=False).to_numpy().flags
  C_CONTIGUOUS : False
  F_CONTIGUOUS : True
  OWNDATA : False
  WRITEABLE : True
  ALIGNED : True
  WRITEBACKIFCOPY : False
```
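
As a quick sanity check on the strides shown above (not part of the PR), the values follow directly from the element size and the shape: a row-major tensor steps over a full row (`itemsize * n_cols` bytes) between rows, while a column-major tensor steps over a full column (`itemsize * n_rows` bytes) between columns. A minimal sketch, assuming the same two-column result promoted to int32:

```python
import numpy as np

# Shape of the tensor in the example above: 9 rows, 2 columns, promoted to
# int32 (4 bytes per element).
n_rows, n_cols = 9, 2
itemsize = np.dtype(np.int32).itemsize

# Row-major (C order): advancing one row skips n_cols elements.
row_major_strides = (itemsize * n_cols, itemsize)
# Column-major (Fortran order): advancing one column skips n_rows elements.
col_major_strides = (itemsize, itemsize * n_rows)

print(row_major_strides)  # (8, 4)  -> matches batch.to_tensor()
print(col_major_strides)  # (4, 36) -> matches batch.to_tensor(row_major=False)
```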

### Are these changes tested?

Yes, in C++ and Python.

### Are there any user-facing changes?

No.
* GitHub Issue: apache#40866

Lead-authored-by: AlenkaF <frim.alenka@gmail.com>
Co-authored-by: Alenka Frim <AlenkaF@users.noreply.github.com>
Co-authored-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
Signed-off-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
2 people authored and tolleybot committed May 4, 2024
1 parent 61c4296 commit c04f893
Showing 6 changed files with 109 additions and 21 deletions.
31 changes: 30 additions & 1 deletion cpp/src/arrow/record_batch.cc
@@ -283,6 +283,35 @@ struct ConvertColumnsToTensorVisitor {
  }
};

template <typename Out>
struct ConvertColumnsToTensorRowMajorVisitor {
  Out*& out_values;
  const ArrayData& in_data;
  int num_cols;
  int col_idx;

  template <typename T>
  Status Visit(const T&) {
    if constexpr (is_numeric(T::type_id)) {
      using In = typename T::c_type;
      auto in_values = ArraySpan(in_data).GetSpan<In>(1, in_data.length);

      if (in_data.null_count == 0) {
        for (int64_t i = 0; i < in_data.length; ++i) {
          out_values[i * num_cols + col_idx] = static_cast<Out>(in_values[i]);
        }
      } else {
        for (int64_t i = 0; i < in_data.length; ++i) {
          out_values[i * num_cols + col_idx] =
              in_data.IsNull(i) ? static_cast<Out>(NAN) : static_cast<Out>(in_values[i]);
        }
      }
      return Status::OK();
    }
    Unreachable();
  }
};

template <typename DataType>
inline void ConvertColumnsToTensor(const RecordBatch& batch, uint8_t* out,
                                   bool row_major) {
@@ -302,7 +331,7 @@ inline void ConvertColumnsToTensor(const RecordBatch& batch, uint8_t* out,
  }
}

Result<std::shared_ptr<Tensor>> RecordBatch::ToTensor(bool null_to_nan,
Result<std::shared_ptr<Tensor>> RecordBatch::ToTensor(bool null_to_nan, bool row_major,
                                                      MemoryPool* pool) const {
  if (num_columns() == 0) {
    return Status::TypeError(
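
A note on the new visitor above (illustration only, not part of the diff): `ConvertColumnsToTensorRowMajorVisitor` interleaves each input column into the flat output buffer at position `i * num_cols + col_idx`, whereas the existing column-major visitor writes each column contiguously. A small NumPy sketch of the same index arithmetic:

```python
import numpy as np

cols = [np.array([1, 2, 3, 4], dtype=np.int32),
        np.array([10, 20, 30, 40], dtype=np.int32)]
num_rows, num_cols = len(cols[0]), len(cols)

# Row-major fill: element (i, col_idx) lands at flat index i * num_cols + col_idx,
# mirroring out_values[i * num_cols + col_idx] in the C++ visitor.
flat_row_major = np.empty(num_rows * num_cols, dtype=np.int32)
for col_idx, col in enumerate(cols):
    for i, value in enumerate(col):
        flat_row_major[i * num_cols + col_idx] = value

# Column-major fill: each column is written contiguously, one after the other.
flat_col_major = np.concatenate(cols)

# Both buffers describe the same 4 x 2 matrix, just in different memory orders.
expected = np.column_stack(cols)
assert np.array_equal(flat_row_major.reshape(num_rows, num_cols), expected)
assert np.array_equal(flat_col_major.reshape(num_rows, num_cols, order="F"), expected)
```
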
4 changes: 3 additions & 1 deletion cpp/src/arrow/record_batch.h
@@ -87,10 +87,12 @@ class ARROW_EXPORT RecordBatch {
/// Generated Tensor will have column-major layout.
///
/// \param[in] null_to_nan if true, convert nulls to NaN
/// \param[in] row_major if true, create row-major Tensor else column-major Tensor
/// \param[in] pool the memory pool to allocate the tensor buffer
/// \return the resulting Tensor
Result<std::shared_ptr<Tensor>> ToTensor(
bool null_to_nan = false, MemoryPool* pool = default_memory_pool()) const;
bool null_to_nan = false, bool row_major = true,
MemoryPool* pool = default_memory_pool()) const;

/// \brief Construct record batch from struct array
///
44 changes: 41 additions & 3 deletions cpp/src/arrow/record_batch_test.cc
@@ -779,7 +779,8 @@ TEST_F(TestRecordBatch, ToTensorSupportedNullToNan) {

auto batch = RecordBatch::Make(schema, length, {a0, a1});

ASSERT_OK_AND_ASSIGN(auto tensor, batch->ToTensor(/*null_to_nan=*/true));
ASSERT_OK_AND_ASSIGN(auto tensor,
batch->ToTensor(/*null_to_nan=*/true, /*row_major=*/false));
ASSERT_OK(tensor->Validate());

std::vector<int64_t> shape = {9, 2};
@@ -794,6 +795,19 @@

CheckTensor<DoubleType>(tensor, 18, shape, f_strides);

ASSERT_OK_AND_ASSIGN(auto tensor_row, batch->ToTensor(/*null_to_nan=*/true));
ASSERT_OK(tensor_row->Validate());

std::vector<int64_t> strides = {f64_size * shape[1], f64_size};
std::shared_ptr<Tensor> tensor_expected_row = TensorFromJSON(
float64(), "[NaN, 10, 2, 20, 3, 30, 4, 40, 5, NaN, 6, 60, 7, 70, 8, 80, 9, 90]",
shape, strides);

EXPECT_FALSE(tensor_expected_row->Equals(*tensor_row));
EXPECT_TRUE(tensor_expected_row->Equals(*tensor_row, EqualOptions().nans_equal(true)));

CheckTensorRowMajor<DoubleType>(tensor_row, 18, shape, strides);

// int32 -> float64
auto f2 = field("f2", int32());

@@ -803,14 +817,23 @@
auto a2 = ArrayFromJSON(int32(), "[10, 20, 30, 40, null, 60, 70, 80, 90]");
auto batch1 = RecordBatch::Make(schema1, length, {a0, a2});

ASSERT_OK_AND_ASSIGN(auto tensor1, batch1->ToTensor(/*null_to_nan=*/true));
ASSERT_OK_AND_ASSIGN(auto tensor1,
batch1->ToTensor(/*null_to_nan=*/true, /*row_major=*/false));
ASSERT_OK(tensor1->Validate());

EXPECT_FALSE(tensor_expected->Equals(*tensor1));
EXPECT_TRUE(tensor_expected->Equals(*tensor1, EqualOptions().nans_equal(true)));

CheckTensor<DoubleType>(tensor1, 18, shape, f_strides);

ASSERT_OK_AND_ASSIGN(auto tensor1_row, batch1->ToTensor(/*null_to_nan=*/true));
ASSERT_OK(tensor1_row->Validate());

EXPECT_FALSE(tensor_expected_row->Equals(*tensor1_row));
EXPECT_TRUE(tensor_expected_row->Equals(*tensor1_row, EqualOptions().nans_equal(true)));

CheckTensorRowMajor<DoubleType>(tensor1_row, 18, shape, strides);

// int8 -> float32
auto f3 = field("f3", int8());
auto f4 = field("f4", int8());
@@ -822,7 +845,8 @@ TEST_F(TestRecordBatch, ToTensorSupportedNullToNan) {
auto a4 = ArrayFromJSON(int8(), "[10, 20, 30, 40, null, 60, 70, 80, 90]");
auto batch2 = RecordBatch::Make(schema2, length, {a3, a4});

ASSERT_OK_AND_ASSIGN(auto tensor2, batch2->ToTensor(/*null_to_nan=*/true));
ASSERT_OK_AND_ASSIGN(auto tensor2,
batch2->ToTensor(/*null_to_nan=*/true, /*row_major=*/false));
ASSERT_OK(tensor2->Validate());

const int64_t f32_size = sizeof(float);
@@ -835,6 +859,20 @@ TEST_F(TestRecordBatch, ToTensorSupportedNullToNan) {
EXPECT_TRUE(tensor_expected_2->Equals(*tensor2, EqualOptions().nans_equal(true)));

CheckTensor<FloatType>(tensor2, 18, shape, f_strides_2);

ASSERT_OK_AND_ASSIGN(auto tensor2_row, batch2->ToTensor(/*null_to_nan=*/true));
ASSERT_OK(tensor2_row->Validate());

std::vector<int64_t> strides_2 = {f32_size * shape[1], f32_size};
std::shared_ptr<Tensor> tensor2_expected_row = TensorFromJSON(
float32(), "[NaN, 10, 2, 20, 3, 30, 4, 40, 5, NaN, 6, 60, 7, 70, 8, 80, 9, 90]",
shape, strides_2);

EXPECT_FALSE(tensor2_expected_row->Equals(*tensor2_row));
EXPECT_TRUE(
tensor2_expected_row->Equals(*tensor2_row, EqualOptions().nans_equal(true)));

CheckTensorRowMajor<FloatType>(tensor2_row, 18, shape, strides_2);
}

TEST_F(TestRecordBatch, ToTensorSupportedTypesMixed) {
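
To see where the expected row-major buffer in the test above comes from (an illustration, not part of the test suite; the 9 x 2 matrix is reconstructed from the expected values, with NaN marking converted nulls): flattening the matrix in C order gives the row-major JSON string passed to `TensorFromJSON`, while flattening in Fortran order gives its column-major counterpart.

```python
import numpy as np

# Matrix implied by the expected buffers in the test (NaN = converted null).
expected = np.array(
    [[np.nan, 10], [2, 20], [3, 30], [4, 40], [5, np.nan],
     [6, 60], [7, 70], [8, 80], [9, 90]],
    dtype=np.float64,
)

# Row-major flattening matches the buffer built with strides
# {f64_size * shape[1], f64_size}:
print(expected.ravel(order="C"))
# [nan 10. 2. 20. 3. 30. 4. 40. 5. nan 6. 60. 7. 70. 8. 80. 9. 90.]

# Column-major flattening matches the buffer used for the column-major case:
print(expected.ravel(order="F"))
# [nan 2. 3. 4. 5. 6. 7. 8. 9. 10. 20. 30. 40. nan 60. 70. 80. 90.]
```
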
3 changes: 2 additions & 1 deletion python/pyarrow/includes/libarrow.pxd
@@ -985,7 +985,8 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
shared_ptr[CRecordBatch] Slice(int64_t offset)
shared_ptr[CRecordBatch] Slice(int64_t offset, int64_t length)

CResult[shared_ptr[CTensor]] ToTensor(c_bool null_to_nan, CMemoryPool* pool) const
CResult[shared_ptr[CTensor]] ToTensor(c_bool null_to_nan, c_bool row_major,
CMemoryPool* pool) const

cdef cppclass CRecordBatchWithMetadata" arrow::RecordBatchWithMetadata":
shared_ptr[CRecordBatch] batch
36 changes: 28 additions & 8 deletions python/pyarrow/table.pxi
@@ -3469,20 +3469,24 @@ cdef class RecordBatch(_Tabular):
<CResult[shared_ptr[CArray]]>deref(c_record_batch).ToStructArray())
return pyarrow_wrap_array(c_array)

def to_tensor(self, c_bool null_to_nan=False, MemoryPool memory_pool=None):
def to_tensor(self, c_bool null_to_nan=False, c_bool row_major=True, MemoryPool memory_pool=None):
"""
Convert to a :class:`~pyarrow.Tensor`.
RecordBatches that can be converted have fields of type signed or unsigned
integer or float, including all bit-widths. RecordBatches with validity bitmask
for any of the arrays can be converted with ``null_to_nan``turned to ``True``.
In this case null values are converted to NaN and signed or unsigned integer
type arrays are promoted to appropriate float type.
integer or float, including all bit-widths.
``null_to_nan`` is ``False`` by default and this method will raise an error in case
any nulls are present. RecordBatches with nulls can be converted with ``null_to_nan``
set to ``True``. In this case null values are converted to ``NaN`` and integer type
arrays are promoted to the appropriate float type.
Parameters
----------
null_to_nan : bool, default False
Whether to write null values in the result as ``NaN``.
row_major : bool, default True
Whether resulting Tensor is row-major or column-major
memory_pool : MemoryPool, default None
For memory allocations, if required, otherwise use default pool
@@ -3504,13 +3508,29 @@
a: [1,2,3,4,null]
b: [10,20,30,40,null]
Convert a RecordBatch to row-major Tensor with null values
written as ``NaN``s
>>> batch.to_tensor(null_to_nan=True)
<pyarrow.Tensor>
type: double
shape: (5, 2)
strides: (8, 40)
strides: (16, 8)
>>> batch.to_tensor(null_to_nan=True).to_numpy()
array([[ 1., 10.],
[ 2., 20.],
[ 3., 30.],
[ 4., 40.],
[nan, nan]])
Convert a RecordBatch to column-major Tensor
>>> batch.to_tensor(null_to_nan=True, row_major=False)
<pyarrow.Tensor>
type: double
shape: (5, 2)
strides: (8, 40)
>>> batch.to_tensor(null_to_nan=True, row_major=False).to_numpy()
array([[ 1., 10.],
[ 2., 20.],
[ 3., 30.],
@@ -3526,7 +3546,7 @@ cdef class RecordBatch(_Tabular):
with nogil:
c_tensor = GetResultValue(
<CResult[shared_ptr[CTensor]]>deref(c_record_batch).ToTensor(null_to_nan,
pool))
row_major, pool))
return pyarrow_wrap_tensor(c_tensor)

def _export_to_c(self, out_ptr, out_schema_ptr=0):
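
The two docstring examples above produce the same values and differ only in memory layout. A quick check of that claim (a sketch, not part of the diff; the batch is recreated from the printed docstring output and the int32 column type is an assumption):

```python
import numpy as np
import pyarrow as pa

# Recreate the batch shown in the docstring example (exact construction is
# not part of this hunk; int32 columns are assumed here).
batch = pa.RecordBatch.from_arrays(
    [pa.array([1, 2, 3, 4, None], type=pa.int32()),
     pa.array([10, 20, 30, 40, None], type=pa.int32())],
    ["a", "b"],
)

row = batch.to_tensor(null_to_nan=True).to_numpy()                    # C-contiguous
col = batch.to_tensor(null_to_nan=True, row_major=False).to_numpy()   # F-contiguous

# Same values (NaNs in matching positions compare as equal here), different layout flags.
np.testing.assert_array_equal(row, col)
print(row.flags["C_CONTIGUOUS"], col.flags["F_CONTIGUOUS"])  # True True
```
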
12 changes: 5 additions & 7 deletions python/pyarrow/tests/test_table.py
@@ -1139,9 +1139,8 @@ def test_recordbatch_to_tensor_null():
):
batch.to_tensor()

result = batch.to_tensor(null_to_nan=True)

x = np.array([arr1, arr2], np.float64).transpose()
result = batch.to_tensor(null_to_nan=True, row_major=False)
x = np.column_stack([arr1, arr2]).astype(np.float64, order="F")
expected = pa.Tensor.from_numpy(x)

np.testing.assert_equal(result.to_numpy(), x)
@@ -1158,7 +1157,7 @@ def test_recordbatch_to_tensor_null():
], ["a", "b"]
)

result = batch.to_tensor(null_to_nan=True)
result = batch.to_tensor(null_to_nan=True, row_major=False)

np.testing.assert_equal(result.to_numpy(), x)
assert result.size == 18
@@ -1174,9 +1173,8 @@ def test_recordbatch_to_tensor_null():
], ["a", "b"]
)

result = batch.to_tensor(null_to_nan=True)

x = np.array([arr1, arr2], np.float32).transpose()
result = batch.to_tensor(null_to_nan=True, row_major=False)
x = np.column_stack([arr1, arr2]).astype(np.float32, order="F")
expected = pa.Tensor.from_numpy(x)

np.testing.assert_equal(result.to_numpy(), x)
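
The Python tests above pin down the column-major path by comparing against `np.column_stack(...).astype(..., order="F")`. A row-major counterpart would compare against C order instead; a minimal sketch (not code from this PR, with assumed fixture data):

```python
import numpy as np
import pyarrow as pa

# Assumed fixture data containing nulls, in the spirit of the test above.
arr1 = [1, 2, 3, 4, None, 6, 7, 8, 9]
arr2 = [10, 20, 30, 40, 50, 60, 70, None, 90]
batch = pa.RecordBatch.from_arrays(
    [pa.array(arr1, type=pa.int32()), pa.array(arr2, type=pa.int32())],
    ["a", "b"],
)

result = batch.to_tensor(null_to_nan=True)  # row-major is the default
x = np.column_stack([arr1, arr2]).astype(np.float64, order="C")

np.testing.assert_equal(result.to_numpy(), x)
assert result.to_numpy().flags["C_CONTIGUOUS"]
```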
