Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
Signed-off-by: jayzhan211 <jayzhan211@gmail.com>
  • Loading branch information
jayzhan211 committed Mar 1, 2024
1 parent 80eae25 commit 8b6ff2d
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 17 deletions.
2 changes: 0 additions & 2 deletions datafusion/expr/src/expr_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1066,7 +1066,6 @@ pub fn create_udaf(

/// Creates a new UDAF with a specific signature, state type and return type.
/// The signature and state type must match the `Accumulator's implementation`.
#[allow(clippy::too_many_arguments)]
pub fn create_udaf_with_ordering(
name: &str,
input_type: Vec<DataType>,
Expand Down Expand Up @@ -1202,7 +1201,6 @@ impl Debug for SimpleOrderedAggregateUDF {
impl SimpleOrderedAggregateUDF {
/// Create a new `AggregateUDFImpl` from a name, input types, return type, state type and
/// implementation. Implementing [`AggregateUDFImpl`] allows more flexibility
#[allow(clippy::too_many_arguments)]
pub fn new(
name: impl Into<String>,
input_type: Vec<DataType>,
Expand Down
19 changes: 10 additions & 9 deletions datafusion/expr/src/udaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,6 @@ impl AggregateUDF {
sort_exprs: &[Expr],
schema: &Schema,
) -> Result<Box<dyn Accumulator>> {
// let sort_exprs = self.inner.sort_exprs();
// let schema = self.inner.schema();
self.inner.accumulator(return_type, sort_exprs, schema)
}

Expand All @@ -180,10 +178,6 @@ impl AggregateUDF {
pub fn create_groups_accumulator(&self) -> Result<Box<dyn GroupsAccumulator>> {
self.inner.create_groups_accumulator()
}

pub fn sort_exprs() -> Vec<Expr> {
vec![]
}
}

impl<F> From<F> for AggregateUDF
Expand Down Expand Up @@ -238,7 +232,7 @@ where
/// Ok(DataType::Float64)
/// }
/// // This is the accumulator factory; DataFusion uses it to create new accumulators.
/// fn accumulator(&self, _arg: &DataType, _sort_exprs: Vec<Expr>, _schema: Option<&Schema>) -> Result<Box<dyn Accumulator>> { unimplemented!() }
/// fn accumulator(&self, _arg: &DataType, _sort_exprs: &[Expr], _schema: &Schema) -> Result<Box<dyn Accumulator>> { unimplemented!() }
/// fn state_type(&self, _return_type: &DataType) -> Result<Vec<DataType>> {
/// Ok(vec![DataType::Float64, DataType::UInt32])
/// }
Expand Down Expand Up @@ -266,8 +260,15 @@ pub trait AggregateUDFImpl: Debug + Send + Sync {
fn return_type(&self, arg_types: &[DataType]) -> Result<DataType>;

/// Return a new [`Accumulator`] that aggregates values for a specific
/// group during query execution. sort_exprs is a list of ordering expressions,
/// and schema is used while ordering.
/// group during query execution.
///
/// `arg`: the type of the argument to this accumulator
///
/// `sort_exprs`: contains a list of `Expr::SortExpr`s if the
/// aggregate is called with an explicit `ORDER BY`. For example,
/// `ARRAY_AGG(x ORDER BY y ASC)`. In this case, `sort_exprs` would contain `[y ASC]`
///
/// `schema` is the input schema to the udaf
fn accumulator(
&self,
arg: &DataType,
Expand Down
8 changes: 3 additions & 5 deletions datafusion/physical-plan/src/udaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,9 @@ impl AggregateExpr for AggregateFunctionExpr {
}

fn create_sliding_accumulator(&self) -> Result<Box<dyn Accumulator>> {
let accumulator = self.fun.accumulator(
&self.data_type,
self.sort_exprs.as_slice(),
&self.schema,
)?;
let accumulator =
self.fun
.accumulator(&self.data_type, &self.sort_exprs, &self.schema)?;

// Accumulators that have window frame startings different
// than `UNBOUNDED PRECEDING`, such as `1 PRECEEDING`, need to
Expand Down
4 changes: 3 additions & 1 deletion datafusion/proto/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -471,8 +471,10 @@ impl AsExecutionPlan for PhysicalPlanNode {
}
AggregateFunction::UserDefinedAggrFunction(udaf_name) => {
let agg_udf = registry.udaf(udaf_name)?;
// TODO: `order by` is not supported for UDAF yet
let sort_exprs = &[];
let ordering_req = &[];
udaf::create_aggregate_expr(agg_udf.as_ref(), &input_phy_expr, &[], ordering_req, &physical_schema, name)
udaf::create_aggregate_expr(agg_udf.as_ref(), &input_phy_expr, sort_exprs, ordering_req, &physical_schema, name)
}
}
}).transpose()?.ok_or_else(|| {
Expand Down

0 comments on commit 8b6ff2d

Please sign in to comment.