Skip to content
This repository was archived by the owner on Nov 1, 2024. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions csrc/velox/functions/functions.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@ inline void registerTorchArrowFunctions() {
velox::registerFunction<udf_torcharrow_isnumeric, bool, velox::Varchar>(
{"isnumeric"});

// Natural logarithm
velox::registerFunction<udf_torcharrow_log, float, float>({"torcharrow_log"});
velox::registerFunction<udf_torcharrow_log, double, double>(
{"torcharrow_log"});
velox::registerFunction<udf_torcharrow_log, float, bool>({"torcharrow_log"});
// TODO: support type promotion in TorchArrow-Velox backend so registering less
// overloads.
velox::registerFunction<udf_torcharrow_log, float, int8_t>(
{"torcharrow_log"});
velox::registerFunction<udf_torcharrow_log, float, int16_t>(
Expand Down
3 changes: 3 additions & 0 deletions torcharrow/idataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,9 @@ def isin(self, values: Union[list, dict, IColumn]):
"""
raise self._not_supported("isin")

def log(self) -> IDataFrame:
raise self._not_supported("log")

# aggregation

@trace
Expand Down
1 change: 1 addition & 0 deletions torcharrow/inumerical_column.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,5 @@ def to(self, device: Device):
raise AssertionError("unexpected case")

def log(self):
"""Returns a new column with the natural logarithm of the elements"""
raise self._not_supported("log")
49 changes: 45 additions & 4 deletions torcharrow/test/transformation/test_numeric_ops.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,29 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
import math
import unittest

import numpy.testing
import torcharrow as ta
import torcharrow.dtypes as dt


# TODO: add/migrate more numeric tests
class _TestNumericOpsBase(unittest.TestCase):
@classmethod
def setUpClass(cls):
# Prepare input data as CPU dataframe
cls.base_df = ta.DataFrame({"c": [0, 1, 3], "d": [5, 5, 6], "e": [1.0, 1, 7]})
cls.base_add_df = ta.DataFrame(
{"c": [0, 1, 3], "d": [5, 5, 6], "e": [1.0, 1, 7]}
)
cls.base_log_df = ta.DataFrame(
{
"int32": ta.Column([1, 0, 4, None], dtype=dt.Int32(nullable=True)),
"int64": ta.Column([1, 0, 4, None], dtype=dt.Float32(nullable=True)),
"float64": ta.Column(
[1.0, 0.0, 4.0, None], dtype=dt.Float64(nullable=True)
),
}
)

cls.setUpTestCaseData()

Expand All @@ -20,18 +35,44 @@ def setUpTestCaseData(cls):
raise unittest.SkipTest("abstract base test")

def test_add(self):
c = type(self).df["c"]
d = type(self).df["d"]
c = type(self).add_df["c"]
d = type(self).add_df["d"]

self.assertEqual(list(c + 1), [1, 2, 4])
self.assertEqual(list(1 + c), [1, 2, 4])
self.assertEqual(list(c + d), [5, 6, 9])

def test_log(self):
log_int32_col = type(self).log_df["int32"].log()
log_int64_col = type(self).log_df["int64"].log()
log_float64_col = type(self).log_df["float64"].log()
log_whole_df = type(self).log_df.log()

for col in [
log_int32_col,
log_int64_col,
log_whole_df["int32"],
log_whole_df["int64"],
]:
numpy.testing.assert_almost_equal(
list(col)[:-1], [0.0, -float("inf"), math.log(4)]
)
self.assertEqual(col.dtype, dt.Float32(nullable=True))
self.assertEqual(list(col)[-1], None)

for col in [log_float64_col, log_whole_df["float64"]]:
numpy.testing.assert_almost_equal(
list(col)[:-1], [0.0, -float("inf"), math.log(4)]
)
self.assertEqual(col.dtype, dt.Float64(nullable=True))
self.assertEqual(list(col)[-1], None)


class TestNumericOpsCpu(_TestNumericOpsBase):
@classmethod
def setUpTestCaseData(cls):
cls.df = cls.base_df.copy()
cls.add_df = cls.base_add_df.copy()
cls.log_df = cls.base_log_df.copy()


if __name__ == "__main__":
Expand Down
25 changes: 24 additions & 1 deletion torcharrow/velox_rt/dataframe_cpu.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ def _finalize(self):

def _fromdata(
self, field_data: OrderedDict[str, IColumn], mask: Optional[Iterable[bool]]
):
) -> DataFrameCpu:
dtype = dt.Struct(
[dt.Field(n, c.dtype) for n, c in field_data.items()],
nullable=self.dtype.nullable,
Expand All @@ -150,6 +150,7 @@ def _fromdata(
for n, c in field_data.items():
col.set_child(col.type().get_child_idx(n), c._data)
col.set_length(len(c._data))

if mask is not None:
mask_list = list(mask)
assert len(field_data) == 0 or len(mask_list) == len(col)
Expand Down Expand Up @@ -230,6 +231,12 @@ def append(self, values: Iterable[Union[None, dict, tuple]]):
return self.append(
[{f.name: v for f, v in zip(self.dtype.fields, value)}]
).append(it)

else:
raise TypeError(
f"Unexpected value type to append to DataFrame: {type(value).__name__}, the value being appended is: {value}"
)

except StopIteration:
return self

Expand Down Expand Up @@ -1209,6 +1216,22 @@ def __pos__(self):
self._mask,
)

def log(self) -> DataFrameCpu:
return self._fromdata(
{
self.dtype.fields[i]
.name: ColumnFromVelox._from_velox(
self.device,
self.dtype.fields[i].dtype,
self._data.child_at(i),
True,
)
.log()
for i in range(self._data.children_size())
},
mask=(None if self.null_count == 0 else self._mask),
)

# isin ---------------------------------------------------------------

@trace
Expand Down