
Commit

berkaysynnada committed May 3, 2024
1 parent 5dd4289 commit dbd56b3
Showing 2 changed files with 60 additions and 24 deletions.
datafusion/core/src/physical_optimizer/optimize_projections.rs — 64 changes (50 additions, 14 deletions)
@@ -4868,6 +4868,7 @@ fn expr_mapping(schema_mapping: IndexMap<Column, Column>) -> ExprMapping {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use std::any::Any;
     use std::sync::Arc;
 
     use crate::datasource::file_format::file_compression_type::FileCompressionType;
@@ -4886,9 +4887,14 @@ mod tests {
 
     use arrow_schema::{DataType, Field, Schema, SortOptions};
     use datafusion_common::config::ConfigOptions;
-    use datafusion_common::{JoinSide, JoinType, Result, ScalarValue, Statistics};
+    use datafusion_common::{
+        plan_err, JoinSide, JoinType, Result, ScalarValue, Statistics,
+    };
     use datafusion_execution::object_store::ObjectStoreUrl;
-    use datafusion_expr::{Operator, ScalarFunctionDefinition, WindowFrame};
+    use datafusion_expr::{
+        ColumnarValue, Operator, ScalarFunctionDefinition, ScalarUDF, ScalarUDFImpl,
+        Signature, Volatility, WindowFrame,
+    };
     use datafusion_physical_expr::expressions::{
         rank, BinaryExpr, CaseExpr, CastExpr, Column, Literal, NegativeExpr, RowNumber,
         Sum,
@@ -4903,6 +4909,44 @@ mod tests {
     use datafusion_physical_plan::union::UnionExec;
     use datafusion_physical_plan::InputOrderMode;
 
+    #[derive(Debug)]
+    struct AddOne {
+        signature: Signature,
+    }
+
+    impl AddOne {
+        fn new() -> Self {
+            Self {
+                signature: Signature::uniform(
+                    1,
+                    vec![DataType::Int32],
+                    Volatility::Immutable,
+                ),
+            }
+        }
+    }
+
+    impl ScalarUDFImpl for AddOne {
+        fn as_any(&self) -> &dyn Any {
+            self
+        }
+        fn name(&self) -> &str {
+            "add_one"
+        }
+        fn signature(&self) -> &Signature {
+            &self.signature
+        }
+        fn return_type(&self, args: &[DataType]) -> Result<DataType> {
+            if !matches!(args.get(0), Some(&DataType::Int32)) {
+                return plan_err!("add_one only accepts Int32 arguments");
+            }
+            Ok(DataType::Int32)
+        }
+        fn invoke(&self, _args: &[ColumnarValue]) -> Result<ColumnarValue> {
+            unimplemented!()
+        }
+    }
+
     fn create_simple_csv_exec() -> Arc<dyn ExecutionPlan> {
         let schema = Arc::new(Schema::new(vec![
             Field::new("a", DataType::Int32, true),
@@ -4975,9 +5019,7 @@
             Arc::new(NegativeExpr::new(Arc::new(Column::new("f", 4)))),
             Arc::new(ScalarFunctionExpr::new(
                 "scalar_expr",
-                ScalarFunctionDefinition::Name(
-                    "dummy".to_owned().into_boxed_str().into(),
-                ),
+                ScalarFunctionDefinition::UDF(Arc::new(ScalarUDF::from(AddOne::new()))),
                 vec![
                     Arc::new(BinaryExpr::new(
                         Arc::new(Column::new("b", 1)),
@@ -5043,9 +5085,7 @@
             Arc::new(NegativeExpr::new(Arc::new(Column::new("f", 5)))),
             Arc::new(ScalarFunctionExpr::new(
                 "scalar_expr",
-                ScalarFunctionDefinition::Name(
-                    "dummy".to_owned().into_boxed_str().into(),
-                ),
+                ScalarFunctionDefinition::UDF(Arc::new(ScalarUDF::from(AddOne::new()))),
                 vec![
                     Arc::new(BinaryExpr::new(
                         Arc::new(Column::new("b", 1)),
@@ -5114,9 +5154,7 @@
             Arc::new(NegativeExpr::new(Arc::new(Column::new("f", 4)))),
             Arc::new(ScalarFunctionExpr::new(
                 "scalar_expr",
-                ScalarFunctionDefinition::Name(
-                    "dummy".to_owned().into_boxed_str().into(),
-                ),
+                ScalarFunctionDefinition::UDF(Arc::new(ScalarUDF::from(AddOne::new()))),
                 vec![
                     Arc::new(BinaryExpr::new(
                         Arc::new(Column::new("b", 1)),
@@ -5182,9 +5220,7 @@
             Arc::new(NegativeExpr::new(Arc::new(Column::new("f_new", 5)))),
             Arc::new(ScalarFunctionExpr::new(
                 "scalar_expr",
-                ScalarFunctionDefinition::Name(
-                    "dummy".to_owned().into_boxed_str().into(),
-                ),
+                ScalarFunctionDefinition::UDF(Arc::new(ScalarUDF::from(AddOne::new()))),
                 vec![
                     Arc::new(BinaryExpr::new(
                         Arc::new(Column::new("b_new", 1)),
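
(Note, not part of the commit: taken together, the change replaces the name-based ScalarFunctionDefinition::Name placeholder in these test expressions with a concrete UDF. A standalone sketch of that pattern follows; the helper name is illustrative, and it assumes the AddOne type above plus the ScalarUDF and ScalarFunctionDefinition items shown in the updated imports.)

// Sketch only (not part of the commit): wraps the test UDF the same way the
// updated expressions do, so name and type information come from the UDF
// itself rather than from a registry lookup by name.
use std::sync::Arc;

use arrow_schema::DataType;
use datafusion_common::Result;
use datafusion_expr::{ScalarFunctionDefinition, ScalarUDF};

fn add_one_definition() -> Result<ScalarFunctionDefinition> {
    let udf = Arc::new(ScalarUDF::from(AddOne::new()));
    assert_eq!(udf.name(), "add_one");
    assert_eq!(udf.return_type(&[DataType::Int32])?, DataType::Int32);
    Ok(ScalarFunctionDefinition::UDF(udf))
}
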
