feat: add binder and executor for aggregation #69
Good work!
b6f2fc1 to 36e4cfc
It seems that Some tips:
c48001d to 12bb5bc
Good work! From my perspective, this PR is too large for a single review. I think it can be split into at least two parts:
- add binder and executor for simple aggregation, and sum state
- add group by support and `HashAgg`
- (if you want to split further) add max/min and rowcount state
As this PR generally looks good to me and needs few modifications before merging, I think it's okay to submit one big patch this time. Next time, I think a PR of ~300 LoC is a reasonable size. Just add new functionality little by little.
src/executor/aggregation/min_max.rs (Outdated)
match (array, &self.input_datatype, self.is_min) {
    (ArrayImpl::Int32(arr), DataTypeKind::Int, true) => {
        let mut temp: Option<i32> = None;
        temp = arr.iter().fold(temp, min_i32);
Why not set the initial value of the fold to `self.result`? Then we wouldn't need the following match to do extra work.
`self.result` is a `DataValue`. So, to get the result, we would have to write
self.result = array.iter().fold(self.result.clone(), min_i32);
But we cannot call `iter()` on `ArrayImpl`, and we would have to refactor `min_i32` to operate on `ArrayImpl` instead of `i32`.
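For illustration, here is a minimal sketch of the seeded fold being discussed, under the assumption that the running result is stored as a plain `Option<i32>` rather than a `DataValue`. `MinState` and the slice-based input are hypothetical stand-ins, and this `min_i32` is a guess at the helper's shape, not the code in the PR.

```rust
/// Hypothetical helper: combines the running minimum with the next
/// (possibly NULL) value. NULLs are skipped.
fn min_i32(acc: Option<i32>, next: Option<&i32>) -> Option<i32> {
    match (acc, next) {
        (Some(a), Some(&b)) => Some(a.min(b)),
        (Some(a), None) => Some(a),
        (None, Some(&b)) => Some(b),
        (None, None) => None,
    }
}

/// Hypothetical state that keeps the result as a typed `Option<i32>`.
struct MinState {
    result: Option<i32>,
}

impl MinState {
    fn update(&mut self, values: &[Option<i32>]) {
        // Seed the fold with the running result, so no follow-up
        // match is needed to merge a temporary into `self.result`.
        self.result = values
            .iter()
            .map(|v| v.as_ref())
            .fold(self.result, min_i32);
    }
}
```

With a typed state like this, the seed is a cheap copy; with a `DataValue` state, the conversion cost described in the reply above still applies.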
src/executor/aggregation/min_max.rs (Outdated)
        array.filter(visibility.iter().copied().collect::<Vec<_>>().into_iter())
    }
};
match (array, &self.input_datatype, self.is_min) {
Typically, `is_min` should be some kind of const generic parameter, so that we can reduce runtime overhead. We can leave this optimization for later.
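A rough sketch of what the const generic version could look like. `MinMaxState` and its slice-based `update` are hypothetical and only cover `i32`; the point is that `IS_MIN` is known at compile time, so each instantiation contains only one of the two comparisons.

```rust
/// Hypothetical state, parameterized at compile time on min vs. max.
struct MinMaxState<const IS_MIN: bool> {
    result: Option<i32>,
}

impl<const IS_MIN: bool> MinMaxState<IS_MIN> {
    fn new() -> Self {
        Self { result: None }
    }

    fn update(&mut self, values: &[Option<i32>]) {
        // `flatten` skips NULL entries and yields `&i32`.
        for v in values.iter().flatten() {
            self.result = Some(match self.result {
                None => *v,
                // `IS_MIN` is a constant, so the compiler can drop the
                // branch that is not taken for this instantiation.
                Some(cur) if IS_MIN => cur.min(*v),
                Some(cur) => cur.max(*v),
            });
        }
    }
}
```

A caller would pick `MinMaxState::<true>` for `min` and `MinMaxState::<false>` for `max` when building the executor.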
src/executor/hash_agg.rs (Outdated)
for col in group_cols.iter() {
    group_key.push(col.get(row_idx));
}
let vis_map = key_to_vis_maps.entry(group_key.clone()).or_insert_with(|| {
So even if we do not need to insert a new `group_key` into `key_to_vis_maps`, we still need to `clone` the `group_key`. I don't think this is necessary.
Meanwhile, I suggest using `Arc<str>` instead of `String` everywhere we handle user input, so as to reduce `clone` overhead. I'll draft an RFC about this later.
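One way to avoid the unconditional clone, sketched with the standard `HashMap` and a simplified key type. `group_rows` and the `Vec<i64>` key are hypothetical; the real code uses its own key and map types. The lookup borrows the key, and the clone happens only when a group is seen for the first time.

```rust
use std::collections::HashMap;

/// Hypothetical grouping step: maps each group key to the indices of
/// the rows that belong to it.
fn group_rows(keys: &[Vec<i64>]) -> HashMap<Vec<i64>, Vec<usize>> {
    let mut groups: HashMap<Vec<i64>, Vec<usize>> = HashMap::new();
    for (row_idx, key) in keys.iter().enumerate() {
        // Look up by borrowed slice first: no clone for existing groups.
        if let Some(rows) = groups.get_mut(key.as_slice()) {
            rows.push(row_idx);
        } else {
            // Clone only when the key has not been seen before.
            groups.insert(key.clone(), vec![row_idx]);
        }
    }
    groups
}
```

This trades the single `entry` call for two hash lookups on the first occurrence of each key, which is usually acceptable when groups are far fewer than rows.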
I suggest `Cow<str>`.
LGTM in general. 👍
impl PartialEq for DataValue {
    fn eq(&self, other: &Self) -> bool {
        match (self, other) {
            (Self::Null, Self::Null) => true,
            (Self::Bool(left), Self::Bool(right)) => left == right,
            (Self::Int32(left), Self::Int32(right)) => left == right,
            (Self::Int64(left), Self::Int64(right)) => left == right,
            (Self::String(left), Self::String(right)) => left == right,
            (Self::Float64(left), Self::Float64(right)) => left == right,
            _ => false,
        }
    }
}
This implementation seems to be equivalent to `#[derive(PartialEq)]`?
`PartialEq` was added because of a clippy warning. First, we need a custom `Hash` implementation for `DataValue` that doesn't take the concrete type into account and only feeds the actual value into the hasher. Given that, we also need to implement `PartialEq` ourselves instead of using `derive(PartialEq)`. However, I don't think this is a good thing to do in our system. Maybe we can take another look and change the implementation.
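To make the constraint concrete, here is a small sketch with a simplified, hypothetical `Value` enum standing in for `DataValue`. The hand-written `Hash` feeds only the payload, and the hand-written `PartialEq` keeps the rule that equal values must produce equal hashes. How `Null` is hashed here is an assumption.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Simplified, hypothetical stand-in for `DataValue`.
#[derive(Debug, Clone)]
enum Value {
    Null,
    Int32(i32),
    Int64(i64),
}

impl Hash for Value {
    fn hash<H: Hasher>(&self, state: &mut H) {
        // Only the payload is hashed; the variant itself is not.
        match self {
            Value::Null => {} // assumption: NULL contributes nothing
            Value::Int32(v) => v.hash(state),
            Value::Int64(v) => v.hash(state),
        }
    }
}

impl PartialEq for Value {
    fn eq(&self, other: &Self) -> bool {
        match (self, other) {
            (Value::Null, Value::Null) => true,
            (Value::Int32(l), Value::Int32(r)) => l == r,
            (Value::Int64(l), Value::Int64(r)) => l == r,
            _ => false,
        }
    }
}

impl Eq for Value {}

/// Helper for inspecting the hash of a single value.
fn hash_of(v: &Value) -> u64 {
    let mut hasher = DefaultHasher::new();
    v.hash(&mut hasher);
    hasher.finish()
}
```

Values that compare equal always share a variant and a payload, so they hash identically; values that differ only in variant may or may not collide, which is allowed.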
(ArrayImpl::Int32(arr), DataTypeKind::Int) => {
    let temp = arr
        .iter()
        .fold(None, if self.is_min { min_i32 } else { max_i32 });
Putting a branch like `if self.is_min` inside every iteration of the loop might be very inefficient. We can turn it into a const generic parameter later.
Oops, I just found out that the `if` decides which function to pass to `fold`, rather than running inside the folded function. This looks reasonable to me.
The rest LGTM. I don't want this PR to be held up for too many days, so we can merge it now and resolve the minor issues in follow-up PRs.
SelectItem::Wildcard => {
    // TODO: support wildcard in aggregation
If this is not implemented, just panic.
/// `AggregationState` records the state of an aggregation
pub trait AggregationState: 'static + Send + Sync {
    fn update(&mut self, array: &ArrayImpl) -> Result<(), ExecutorError>;
Please also document the trait functions.
pub trait AggregationState: 'static + Send + Sync {
    fn update(&mut self, array: &ArrayImpl) -> Result<(), ExecutorError>;

    fn update_single(&mut self, value: &DataValue) -> Result<(), ExecutorError>;
Why do we need to implement `update_single`?
I see, it is used in `HashAgg`. I still prefer feeding a batch into the aggregator and using a `visibility` bitmap or an `Iterator<Item = bool>` to indicate valid entries. Let's do this later.
#69 (comment)
As mentioned earlier, constructing a `visibility` bitmap for every unique group key incurs high time and space complexity. Instead, if we use the row-by-row update in the current implementation, we can avoid the cost of bitmap construction.
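The two interfaces discussed in this thread can be put side by side in a small sketch. `SumState` and its signatures are hypothetical simplifications (plain `i64` values, no NULLs, no error type), meant only to show the shape of a batch update driven by a visibility iterator next to the row-by-row update.

```rust
/// Hypothetical, simplified sum state.
struct SumState {
    result: i64,
}

impl SumState {
    /// Batch update: adds every value whose visibility flag is `true`.
    fn update(&mut self, values: &[i64], visibility: impl Iterator<Item = bool>) {
        for (v, visible) in values.iter().zip(visibility) {
            if visible {
                self.result += *v;
            }
        }
    }

    /// Row-by-row update, as used by the hash aggregation in this PR.
    fn update_single(&mut self, value: i64) {
        self.result += value;
    }
}
```

With the batch form, a hash aggregation needs one visibility sequence per group for each chunk, which is the cost raised above; with `update_single`, each row is routed to its group's state directly.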
// Update states
let num_rows = chunk.cardinality();
for row_idx in 0..num_rows {
    let mut group_key = HashKey::new();
The group key doesn't need to be constructed before checking its existence in `state_entries`. We can optimize this later.
        builder.push(result);
        builder.finish()
    }
    None => ArrayBuilderImpl::new(&DataType::new(DataTypeKind::Int, true)).finish(),
When will the `data_type` be `null` in `SimpleAgg`?
When executing an aggregation on an empty table, the result is `None`. The executor should then return an empty array in the `DataChunk`.
But the type of the array should be determined in advance, e.g. for `SELECT sum(x) + 1.0 FROM table` when `x` is `f64`. In this case, `SimpleAgg` will return an `I32Array`, and the `+ 1.0` part might fail with a mismatched expression type.
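A sketch of the idea that the output type is fixed up front, so that an empty input still produces an array of the right type. `Kind`, `Scalar`, `Array`, and `finish` are hypothetical stand-ins for the project's `DataTypeKind`, `DataValue`, `ArrayImpl`, and the executor's finishing step; the assumption is that the output kind is known before execution starts.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum Kind {
    Int32,
    Float64,
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum Scalar {
    Int32(i32),
    Float64(f64),
}

#[derive(Debug, PartialEq)]
enum Array {
    Int32(Vec<i32>),
    Float64(Vec<f64>),
}

impl Array {
    /// Builds an empty array of the requested kind.
    fn empty(kind: Kind) -> Self {
        match kind {
            Kind::Int32 => Array::Int32(Vec::new()),
            Kind::Float64 => Array::Float64(Vec::new()),
        }
    }

    fn push(&mut self, value: Scalar) {
        match (self, value) {
            (Array::Int32(a), Scalar::Int32(v)) => a.push(v),
            (Array::Float64(a), Scalar::Float64(v)) => a.push(v),
            _ => panic!("scalar type does not match array type"),
        }
    }
}

/// The output kind is passed in rather than inferred from `result`,
/// so an empty input still yields an array of the expected type.
fn finish(result: Option<Scalar>, output_kind: Kind) -> Array {
    let mut out = Array::empty(output_kind);
    if let Some(v) = result {
        out.push(v);
    }
    out
}
```

In this form, `sum(x)` over an empty `f64` column would produce an empty `Float64` array, and a later `+ 1.0` would see the type it expects.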
    Ok(())
}

fn finish_agg(states: SmallVec<[Box<dyn AggregationState>; 16]>) -> DataChunk {
`Box<dyn AggregationState>` can be defined as a separate type: `pub type BoxedAggregationState = Box<dyn AggregationState>;`
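As a rough illustration of the alias in use, the sketch below pairs it with a simplified, documented trait. The method signatures, `RowCount`, `Sum`, and the `Vec`-based `finish_agg` are hypothetical; the PR's trait takes `&ArrayImpl` and returns a `Result`, and its `finish_agg` takes a `SmallVec`.

```rust
/// Records the state of one aggregation (simplified, hypothetical signatures).
pub trait AggregationState: 'static + Send + Sync {
    /// Folds a batch of values into the state.
    fn update(&mut self, values: &[i64]);

    /// Returns the aggregated value accumulated so far.
    fn output(&self) -> i64;
}

/// Alias suggested in the review, to keep signatures short.
pub type BoxedAggregationState = Box<dyn AggregationState>;

struct RowCount(i64);
struct Sum(i64);

impl AggregationState for RowCount {
    fn update(&mut self, values: &[i64]) {
        self.0 += values.len() as i64;
    }
    fn output(&self) -> i64 {
        self.0
    }
}

impl AggregationState for Sum {
    fn update(&mut self, values: &[i64]) {
        self.0 += values.iter().sum::<i64>();
    }
    fn output(&self) -> i64 {
        self.0
    }
}

/// Collects one output value per state; a `Vec` is used here to stay
/// within the standard library.
fn finish_agg(states: Vec<BoxedAggregationState>) -> Vec<i64> {
    states.iter().map(|s| s.output()).collect()
}
```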
38c212d to ef1dcb6
TODO in aggregation