diff --git a/csrc/velox/functions/functions.h b/csrc/velox/functions/functions.h index f5780fc68..474841e4f 100644 --- a/csrc/velox/functions/functions.h +++ b/csrc/velox/functions/functions.h @@ -22,10 +22,13 @@ inline void registerTorchArrowFunctions() { velox::registerFunction( {"isnumeric"}); + // Natural logarithm velox::registerFunction({"torcharrow_log"}); velox::registerFunction( {"torcharrow_log"}); velox::registerFunction({"torcharrow_log"}); + // TODO: support type promotion in TorchArrow-Velox backend so registering less + // overloads. velox::registerFunction( {"torcharrow_log"}); velox::registerFunction( diff --git a/torcharrow/idataframe.py b/torcharrow/idataframe.py index 0033c6363..f069b1d42 100644 --- a/torcharrow/idataframe.py +++ b/torcharrow/idataframe.py @@ -239,6 +239,9 @@ def isin(self, values: Union[list, dict, IColumn]): """ raise self._not_supported("isin") + def log(self) -> IDataFrame: + raise self._not_supported("log") + # aggregation @trace diff --git a/torcharrow/inumerical_column.py b/torcharrow/inumerical_column.py index 0187f9141..ec4201c9b 100644 --- a/torcharrow/inumerical_column.py +++ b/torcharrow/inumerical_column.py @@ -29,4 +29,5 @@ def to(self, device: Device): raise AssertionError("unexpected case") def log(self): + """Returns a new column with the natural logarithm of the elements""" raise self._not_supported("log") diff --git a/torcharrow/test/transformation/test_numeric_ops.py b/torcharrow/test/transformation/test_numeric_ops.py index 5a893cc50..f7bb64faa 100644 --- a/torcharrow/test/transformation/test_numeric_ops.py +++ b/torcharrow/test/transformation/test_numeric_ops.py @@ -1,14 +1,29 @@ # Copyright (c) Meta Platforms, Inc. and affiliates. +import math import unittest +import numpy.testing import torcharrow as ta +import torcharrow.dtypes as dt + # TODO: add/migrate more numeric tests class _TestNumericOpsBase(unittest.TestCase): @classmethod def setUpClass(cls): # Prepare input data as CPU dataframe - cls.base_df = ta.DataFrame({"c": [0, 1, 3], "d": [5, 5, 6], "e": [1.0, 1, 7]}) + cls.base_add_df = ta.DataFrame( + {"c": [0, 1, 3], "d": [5, 5, 6], "e": [1.0, 1, 7]} + ) + cls.base_log_df = ta.DataFrame( + { + "int32": ta.Column([1, 0, 4, None], dtype=dt.Int32(nullable=True)), + "int64": ta.Column([1, 0, 4, None], dtype=dt.Float32(nullable=True)), + "float64": ta.Column( + [1.0, 0.0, 4.0, None], dtype=dt.Float64(nullable=True) + ), + } + ) cls.setUpTestCaseData() @@ -20,18 +35,44 @@ def setUpTestCaseData(cls): raise unittest.SkipTest("abstract base test") def test_add(self): - c = type(self).df["c"] - d = type(self).df["d"] + c = type(self).add_df["c"] + d = type(self).add_df["d"] self.assertEqual(list(c + 1), [1, 2, 4]) self.assertEqual(list(1 + c), [1, 2, 4]) self.assertEqual(list(c + d), [5, 6, 9]) + def test_log(self): + log_int32_col = type(self).log_df["int32"].log() + log_int64_col = type(self).log_df["int64"].log() + log_float64_col = type(self).log_df["float64"].log() + log_whole_df = type(self).log_df.log() + + for col in [ + log_int32_col, + log_int64_col, + log_whole_df["int32"], + log_whole_df["int64"], + ]: + numpy.testing.assert_almost_equal( + list(col)[:-1], [0.0, -float("inf"), math.log(4)] + ) + self.assertEqual(col.dtype, dt.Float32(nullable=True)) + self.assertEqual(list(col)[-1], None) + + for col in [log_float64_col, log_whole_df["float64"]]: + numpy.testing.assert_almost_equal( + list(col)[:-1], [0.0, -float("inf"), math.log(4)] + ) + self.assertEqual(col.dtype, dt.Float64(nullable=True)) + self.assertEqual(list(col)[-1], None) + class TestNumericOpsCpu(_TestNumericOpsBase): @classmethod def setUpTestCaseData(cls): - cls.df = cls.base_df.copy() + cls.add_df = cls.base_add_df.copy() + cls.log_df = cls.base_log_df.copy() if __name__ == "__main__": diff --git a/torcharrow/velox_rt/dataframe_cpu.py b/torcharrow/velox_rt/dataframe_cpu.py index 2df57b063..b451cf5c7 100644 --- a/torcharrow/velox_rt/dataframe_cpu.py +++ b/torcharrow/velox_rt/dataframe_cpu.py @@ -141,7 +141,7 @@ def _finalize(self): def _fromdata( self, field_data: OrderedDict[str, IColumn], mask: Optional[Iterable[bool]] - ): + ) -> DataFrameCpu: dtype = dt.Struct( [dt.Field(n, c.dtype) for n, c in field_data.items()], nullable=self.dtype.nullable, @@ -150,6 +150,7 @@ def _fromdata( for n, c in field_data.items(): col.set_child(col.type().get_child_idx(n), c._data) col.set_length(len(c._data)) + if mask is not None: mask_list = list(mask) assert len(field_data) == 0 or len(mask_list) == len(col) @@ -230,6 +231,12 @@ def append(self, values: Iterable[Union[None, dict, tuple]]): return self.append( [{f.name: v for f, v in zip(self.dtype.fields, value)}] ).append(it) + + else: + raise TypeError( + f"Unexpected value type to append to DataFrame: {type(value).__name__}, the value being appended is: {value}" + ) + except StopIteration: return self @@ -1209,6 +1216,22 @@ def __pos__(self): self._mask, ) + def log(self) -> DataFrameCpu: + return self._fromdata( + { + self.dtype.fields[i] + .name: ColumnFromVelox._from_velox( + self.device, + self.dtype.fields[i].dtype, + self._data.child_at(i), + True, + ) + .log() + for i in range(self._data.children_size()) + }, + mask=(None if self.null_count == 0 else self._mask), + ) + # isin --------------------------------------------------------------- @trace