[Data] Added First aggregation#62765
Conversation
Signed-off-by: yifan.xie <xyfabcd@163.com>
There was a problem hiding this comment.
Code Review
This pull request introduces a First aggregation function to Ray Data, implementing the necessary first methods in Arrow and Pandas block accessors along with the First aggregation class and end-to-end tests. The reviewer identified several issues, including compatibility concerns with older PyArrow versions, potential exceptions in the Pandas implementation for empty columns, a logic error where empty blocks could incorrectly nullify aggregation results, and a lack of documentation for the new API.
| def first(self, *, ignore_nulls: bool, as_py: bool = True) -> Any: | ||
| import pyarrow.compute as pac | ||
| # If there are only null-values, the column's type is `NullType`. | ||
| # Pyarrow call will throw a 'no kernel matching input types (null)' Exception | ||
| try: | ||
| res = pac.first(self._column, skip_nulls=ignore_nulls) | ||
| except pyarrow.ArrowNotImplementedError: | ||
| return None | ||
| return res.as_py() if as_py else res |
There was a problem hiding this comment.
The pyarrow.compute.first kernel was introduced in PyArrow 13.0.0. Ray typically maintains compatibility with older versions of PyArrow, and using this kernel without a check will cause an AttributeError on older installations. Additionally, pac.first on an empty array returns a null scalar, which can lead to 'poisoning' the aggregation in AggregateFnV2 when ignore_nulls=False (where any null in the stream makes the result null). A fallback implementation using indexing is safer and more compatible.
| def first(self, *, ignore_nulls: bool, as_py: bool = True) -> Any: | |
| import pyarrow.compute as pac | |
| # If there are only null-values, the column's type is `NullType`. | |
| # Pyarrow call will throw a 'no kernel matching input types (null)' Exception | |
| try: | |
| res = pac.first(self._column, skip_nulls=ignore_nulls) | |
| except pyarrow.ArrowNotImplementedError: | |
| return None | |
| return res.as_py() if as_py else res | |
| def first(self, *, ignore_nulls: bool, as_py: bool = True) -> Any: | |
| import pyarrow.compute as pac | |
| if len(self._column) == 0: | |
| return None | |
| if hasattr(pac, "first"): | |
| # If there are only null-values, the column's type is `NullType`. | |
| # Pyarrow call will throw a 'no kernel matching input types (null)' Exception | |
| try: | |
| res = pac.first(self._column, skip_nulls=ignore_nulls) | |
| except pyarrow.ArrowNotImplementedError: | |
| return None | |
| else: | |
| # Fallback for older pyarrow versions | |
| col = self.dropna() if ignore_nulls else self._column | |
| if len(col) == 0: | |
| return None | |
| res = col[0] | |
| return res.as_py() if as_py else res |
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Reviewed by Cursor Bugbot for commit 2330e92. Configure here.
| # If there are only null-values, the column's type is `NullType`. | ||
| # Pyarrow call will throw a 'no kernel matching input types (null)' Exception | ||
| try: | ||
| res = pac.first(self._column, skip_nulls=ignore_nulls) |
There was a problem hiding this comment.
Arrow and Pandas first disagree when ignore_nulls=False
Medium Severity
The Arrow implementation of first with ignore_nulls=False uses pac.first(column, skip_nulls=False), which returns null if any null exists anywhere in the array. The Pandas implementation uses iloc[0], which returns the actual first element. For a column like [5, None, 7], Arrow returns None while Pandas returns 5. The test doesn't catch this because every group with nulls happens to have null as the first element.
Additional Locations (1)
Reviewed by Cursor Bugbot for commit 2330e92. Configure here.


Description
Added First aggregation allowing to returns the first value in a group.
Related issues
Additional information