
[Data]Improve table aggregate function#61418

Merged
alexeykudinkin merged 3 commits intoray-project:masterfrom
laysfire:improve-table-agg
Mar 12, 2026

Conversation

@laysfire
Contributor

@laysfire laysfire commented Mar 2, 2026

Description

The current implementation of iter_groups in aggregate determines the boundary of each group by iterating row by row when splitting the rows into groups, which is an expensive operation. #58910 introduced a function with the same behavior, named _iter_groups_sorted, which is more efficient than the original.
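The difference can be sketched outside Ray. On a block that is already sorted by the grouping key, group boundaries can be found with a single vectorized comparison instead of a Python-level row-by-row scan (a minimal NumPy illustration, not the actual _iter_groups_sorted implementation):

```python
import numpy as np

def iter_group_slices_sorted(keys):
    """Yield (start, end) slices of equal-key runs in a sorted key column."""
    keys = np.asarray(keys)
    if keys.size == 0:
        return
    # One vectorized comparison finds every index where the key changes,
    # replacing a per-row Python loop over the whole column.
    cuts = np.flatnonzero(keys[1:] != keys[:-1]) + 1
    bounds = np.concatenate(([0], cuts, [keys.size]))
    for start, end in zip(bounds[:-1], bounds[1:]):
        yield (int(start), int(end))

print(list(iter_group_slices_sorted([1, 1, 2, 2, 2, 3])))
# → [(0, 2), (2, 5), (5, 6)]
```

The boundary detection moves into native NumPy code, which is where the bulk of the speedup in a sorted-group iteration comes from.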

To verify, I mocked the following table.

| length | content |
| --- | --- |
| 50 | 12343432323232323232324343434234243434343423433333 |
| 60 | 123434323232323232323243434342342434343434234333331111111111 |
| ... | ... |

The 'content' column is a randomly generated string, and the 'length' column is the length of the 'content' value.
I used the following script to test aggregate function performance.

import ray
import pyarrow.parquet as pq
import pyarrow.compute as pac
from ray.data._internal.arrow_ops.transform_pyarrow import take_table
from ray.data._internal.execution.operators.hash_aggregate import ReducingAggregation
from typing import Optional
from ray.data.aggregate import AggregateFnV2
from ray.data.block import AggType, Block, BlockAccessor

import time

class SubstrAndSum(AggregateFnV2):
    def __init__(
        self,
        on: Optional[str] = None,
        ignore_nulls: bool = True,
        alias_name: Optional[str] = None,
    ):
        super().__init__(
            alias_name if alias_name else "SubstrAndSum",
            on=on,
            ignore_nulls=ignore_nulls,
            zero_factory=lambda: 0,
        )

    def aggregate_block(self, block: Block) -> AggType:
        # Accumulate the integer value of the first two characters of each entry.
        total = 0  # avoid shadowing the builtin `sum`
        for v in block['value']:
            total += int(v.as_py()[0:2])
        return total

    def combine(self, current_accumulator: AggType, new: AggType) -> AggType:
        return current_accumulator + new

aggregation = SubstrAndSum()
import os  # pyarrow does not expand '~' in paths, so expand it explicitly
table = pq.read_table(os.path.expanduser('~/part-01018-8710c510-d42e-4324-ac8b-37184c1541e4-c000.zstd.parquet'))
sort_key = ReducingAggregation._get_sort_key(["length"])
indices = pac.sort_indices(table, sort_keys=sort_key.to_arrow_sort_args())
sort_table = take_table(table, indices)

start = time.time()
aggregate_table = BlockAccessor.for_block(sort_table)._aggregate(sort_key, [aggregation])
end = time.time()
print("time: ", (end-start))

The test results:

| Data Size | CPU spec | Code Version | Time consumed |
| --- | --- | --- | --- |
| 6276904 records | Apple M4 | original | 20 s |
| 6276904 records | Apple M4 | optimized | 5 s |
| 6276904 records | Intel Xeon(R) Gold 6330 CPU @ 2.00GHz | original | 150 s |
| 6276904 records | Intel Xeon(R) Gold 6330 CPU @ 2.00GHz | optimized | 25 s |

Related issues

Additional information

@laysfire laysfire requested a review from a team as a code owner March 2, 2026 08:30
Contributor

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request refactors the group iteration logic in TableBlockAccessor._aggregate by replacing the manual row-by-row implementation with a call to the more efficient _iter_groups_sorted method. This change improves performance by leveraging a vectorized approach and simplifies the code. I have one minor suggestion to make the generator delegation more idiomatic.
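The delegation suggestion amounts to replacing a manual re-yield loop with PEP 380's `yield from`; a generic sketch (the inner generator here is a stand-in, not Ray's `_iter_groups_sorted`):

```python
def _iter_groups(items):
    # Stand-in for a helper generator such as _iter_groups_sorted.
    for item in items:
        yield item

def wrapper_loop(items):
    # Manual delegation: re-yields each item one by one.
    for group in _iter_groups(items):
        yield group

def wrapper_delegating(items):
    # Idiomatic delegation: equivalent for plain iteration, and it also
    # forwards send()/throw()/close() to the inner generator.
    yield from _iter_groups(items)
```

Both produce the same items; `yield from` is shorter and behaves correctly if the caller uses the generator protocol beyond plain iteration.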

@laysfire laysfire changed the title improve table aggregate function [Data]Improve table aggregate function Mar 2, 2026
Signed-off-by: yifan.xie <xyfabcd@163.com>
@laysfire laysfire force-pushed the improve-table-agg branch from 68f8b2c to 6a57c3c Compare March 2, 2026 08:57
Signed-off-by: yifan.xie <xyfabcd@163.com>
@ray-gardener ray-gardener bot added data Ray Data-related issues community-contribution Contributed by the community labels Mar 2, 2026
@laysfire
Contributor Author

laysfire commented Mar 5, 2026

@bveeramani Could you help review this PR? Thanks a lot.

@alexeykudinkin alexeykudinkin added the go add ONLY when ready to merge, run all tests label Mar 12, 2026
@alexeykudinkin alexeykudinkin self-assigned this Mar 12, 2026
@alexeykudinkin alexeykudinkin enabled auto-merge (squash) March 12, 2026 18:13
@alexeykudinkin alexeykudinkin merged commit befa5b8 into ray-project:master Mar 12, 2026
8 checks passed
@laysfire
Contributor Author

@alexeykudinkin Thanks a lot.
