From c963d2134020b2a92ed398295448becae3cb6ce5 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Mon, 29 Apr 2024 20:54:04 +0800 Subject: [PATCH 01/10] feat: add static_name() to ExecutionPlan (#10266) * feat: add static_name() to ExecutionPlan Signed-off-by: Ruihang Xia * add test to invoke from type Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia --- datafusion/physical-plan/src/lib.rs | 21 +++++++++++++++++-- .../src/repartition/distributor_channels.rs | 2 +- 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs index e1c8489655bf..cd2be33e86c1 100644 --- a/datafusion/physical-plan/src/lib.rs +++ b/datafusion/physical-plan/src/lib.rs @@ -117,7 +117,19 @@ pub mod udaf { /// [`required_input_ordering`]: ExecutionPlan::required_input_ordering pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { /// Short name for the ExecutionPlan, such as 'ParquetExec'. - fn name(&self) -> &'static str { + fn name(&self) -> &'static str + where + Self: Sized, + { + Self::static_name() + } + + /// Short name for the ExecutionPlan, such as 'ParquetExec'. + /// Like [`name`](ExecutionPlan::name) but can be called without an instance. + fn static_name() -> &'static str + where + Self: Sized, + { let full_name = std::any::type_name::(); let maybe_start_idx = full_name.rfind(':'); match maybe_start_idx { @@ -125,6 +137,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { None => "UNKNOWN", } } + /// Returns the execution plan as [`Any`] so that it can be /// downcast to a specific implementation. fn as_any(&self) -> &dyn Any; @@ -873,7 +886,10 @@ mod tests { } impl ExecutionPlan for RenamedEmptyExec { - fn name(&self) -> &'static str { + fn static_name() -> &'static str + where + Self: Sized, + { "MyRenamedEmptyExec" } @@ -918,6 +934,7 @@ mod tests { let schema2 = Arc::new(Schema::empty()); let renamed_exec = RenamedEmptyExec::new(schema2); assert_eq!(renamed_exec.name(), "MyRenamedEmptyExec"); + assert_eq!(RenamedEmptyExec::static_name(), "MyRenamedEmptyExec"); } } diff --git a/datafusion/physical-plan/src/repartition/distributor_channels.rs b/datafusion/physical-plan/src/repartition/distributor_channels.rs index bad923ce9e82..675d26bbfb9f 100644 --- a/datafusion/physical-plan/src/repartition/distributor_channels.rs +++ b/datafusion/physical-plan/src/repartition/distributor_channels.rs @@ -474,7 +474,7 @@ type SharedGate = Arc; #[cfg(test)] mod tests { - use std::sync::atomic::{AtomicBool, Ordering}; + use std::sync::atomic::AtomicBool; use futures::{task::ArcWake, FutureExt}; From 5104d0e0fa4516fb99d000337502138561f6374e Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Date: Mon, 29 Apr 2024 16:14:47 +0100 Subject: [PATCH 02/10] Zero-copy conversion from SchemaRef to DfSchema (#10298) --- datafusion/common/src/dfschema.rs | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs index 64e40ea99e67..b2a3de72356c 100644 --- a/datafusion/common/src/dfschema.rs +++ b/datafusion/common/src/dfschema.rs @@ -810,9 +810,16 @@ impl From<&DFSchema> for Schema { impl TryFrom for DFSchema { type Error = DataFusionError; fn try_from(schema: Schema) -> Result { + Self::try_from(Arc::new(schema)) + } +} + +impl TryFrom for DFSchema { + type Error = DataFusionError; + fn try_from(schema: SchemaRef) -> Result { let field_count = schema.fields.len(); let dfschema = Self { - 
inner: schema.into(), + inner: schema, field_qualifiers: vec![None; field_count], functional_dependencies: FunctionalDependencies::empty(), }; @@ -856,12 +863,7 @@ impl ToDFSchema for Schema { impl ToDFSchema for SchemaRef { fn to_dfschema(self) -> Result { - // Attempt to use the Schema directly if there are no other - // references, otherwise clone - match Self::try_unwrap(self) { - Ok(schema) => DFSchema::try_from(schema), - Err(schemaref) => DFSchema::try_from(schemaref.as_ref().clone()), - } + DFSchema::try_from(self) } } From 1c0ecf6047aa439d4bfeeb56f8c0e60e612f5a87 Mon Sep 17 00:00:00 2001 From: Alex Huang Date: Mon, 29 Apr 2024 23:51:24 +0800 Subject: [PATCH 03/10] chore: Update Error for Unnest Rewritter (#10263) * chore: Update Error for Unnest Rewritter * chore: Update unnest.slt test --- datafusion/optimizer/src/analyzer/type_coercion.rs | 6 +++--- datafusion/sqllogictest/test_files/unnest.slt | 3 +++ 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs index f96a359f9d47..240afbb5543d 100644 --- a/datafusion/optimizer/src/analyzer/type_coercion.rs +++ b/datafusion/optimizer/src/analyzer/type_coercion.rs @@ -24,8 +24,8 @@ use arrow::datatypes::{DataType, IntervalUnit}; use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{Transformed, TreeNodeRewriter}; use datafusion_common::{ - exec_err, internal_err, plan_datafusion_err, plan_err, DFSchema, DFSchemaRef, - DataFusionError, Result, ScalarValue, + exec_err, internal_err, not_impl_err, plan_datafusion_err, plan_err, DFSchema, + DFSchemaRef, DataFusionError, Result, ScalarValue, }; use datafusion_expr::expr::{ self, AggregateFunctionDefinition, Between, BinaryExpr, Case, Exists, InList, @@ -125,7 +125,7 @@ impl TreeNodeRewriter for TypeCoercionRewriter { fn f_up(&mut self, expr: Expr) -> Result> { match expr { - Expr::Unnest(_) => internal_err!( + Expr::Unnest(_) => not_impl_err!( "Unnest should be rewritten to LogicalPlan::Unnest before type coercion" ), Expr::ScalarSubquery(Subquery { diff --git a/datafusion/sqllogictest/test_files/unnest.slt b/datafusion/sqllogictest/test_files/unnest.slt index 28f3369ca0a2..ca7e73cb87e0 100644 --- a/datafusion/sqllogictest/test_files/unnest.slt +++ b/datafusion/sqllogictest/test_files/unnest.slt @@ -383,5 +383,8 @@ select unnest(array_remove(column1, 3)) - 1 as c1, column3 from unnest_table; 5 3 11 NULL +query error DataFusion error: type_coercion\ncaused by\nThis feature is not implemented: Unnest should be rewritten to LogicalPlan::Unnest before type coercion +select sum(unnest(generate_series(1,10))); + statement ok drop table unnest_table; From 0ea9bc659cbcdd1d70cf8ea8c284634d65171c33 Mon Sep 17 00:00:00 2001 From: Jonah Gao Date: Tue, 30 Apr 2024 02:51:32 +0800 Subject: [PATCH 04/10] feat(CLI): print column headers for empty query results (#10300) * feat(CLI): print column headers for empty query results * Narrow now()'s scope * retry ci --- datafusion-cli/src/command.rs | 18 ++-- datafusion-cli/src/exec.rs | 5 +- datafusion-cli/src/print_format.rs | 130 ++++++++++++++++++++++------ datafusion-cli/src/print_options.rs | 7 +- 4 files changed, 122 insertions(+), 38 deletions(-) diff --git a/datafusion-cli/src/command.rs b/datafusion-cli/src/command.rs index d3d7b65f0a50..be6393351aed 100644 --- a/datafusion-cli/src/command.rs +++ b/datafusion-cli/src/command.rs @@ -17,7 +17,7 @@ //! 
Command within CLI -use crate::exec::exec_from_lines; +use crate::exec::{exec_and_print, exec_from_lines}; use crate::functions::{display_all_functions, Function}; use crate::print_format::PrintFormat; use crate::print_options::PrintOptions; @@ -58,18 +58,18 @@ impl Command { ctx: &mut SessionContext, print_options: &mut PrintOptions, ) -> Result<()> { - let now = Instant::now(); match self { - Self::Help => print_options.print_batches(&[all_commands_info()], now), + Self::Help => { + let now = Instant::now(); + let command_batch = all_commands_info(); + print_options.print_batches(command_batch.schema(), &[command_batch], now) + } Self::ListTables => { - let df = ctx.sql("SHOW TABLES").await?; - let batches = df.collect().await?; - print_options.print_batches(&batches, now) + exec_and_print(ctx, print_options, "SHOW TABLES".into()).await } Self::DescribeTableStmt(name) => { - let df = ctx.sql(&format!("SHOW COLUMNS FROM {}", name)).await?; - let batches = df.collect().await?; - print_options.print_batches(&batches, now) + exec_and_print(ctx, print_options, format!("SHOW COLUMNS FROM {}", name)) + .await } Self::Include(filename) => { if let Some(filename) = filename { diff --git a/datafusion-cli/src/exec.rs b/datafusion-cli/src/exec.rs index 5fbcea0c0683..19bff0528b77 100644 --- a/datafusion-cli/src/exec.rs +++ b/datafusion-cli/src/exec.rs @@ -203,7 +203,7 @@ pub async fn exec_from_repl( rl.save_history(".history") } -async fn exec_and_print( +pub(super) async fn exec_and_print( ctx: &mut SessionContext, print_options: &PrintOptions, sql: String, @@ -235,8 +235,9 @@ async fn exec_and_print( let stream = execute_stream(physical_plan, task_ctx.clone())?; print_options.print_stream(stream, now).await?; } else { + let schema = physical_plan.schema(); let results = collect(physical_plan, task_ctx.clone()).await?; - adjusted.into_inner().print_batches(&results, now)?; + adjusted.into_inner().print_batches(schema, &results, now)?; } } diff --git a/datafusion-cli/src/print_format.rs b/datafusion-cli/src/print_format.rs index 2de52be612bb..c95bde7fc6c7 100644 --- a/datafusion-cli/src/print_format.rs +++ b/datafusion-cli/src/print_format.rs @@ -22,6 +22,7 @@ use std::str::FromStr; use crate::print_options::MaxRows; use arrow::csv::writer::WriterBuilder; +use arrow::datatypes::SchemaRef; use arrow::json::{ArrayWriter, LineDelimitedWriter}; use arrow::record_batch::RecordBatch; use arrow::util::pretty::pretty_format_batches_with_options; @@ -157,6 +158,7 @@ impl PrintFormat { pub fn print_batches( &self, writer: &mut W, + schema: SchemaRef, batches: &[RecordBatch], maxrows: MaxRows, with_header: bool, @@ -168,7 +170,7 @@ impl PrintFormat { .cloned() .collect(); if batches.is_empty() { - return Ok(()); + return self.print_empty(writer, schema); } match self { @@ -186,6 +188,27 @@ impl PrintFormat { Self::NdJson => batches_to_json!(LineDelimitedWriter, writer, &batches), } } + + /// Print when the result batches contain no rows + fn print_empty( + &self, + writer: &mut W, + schema: SchemaRef, + ) -> Result<()> { + match self { + // Print column headers for Table format + Self::Table if !schema.fields().is_empty() => { + let empty_batch = RecordBatch::new_empty(schema); + let formatted = pretty_format_batches_with_options( + &[empty_batch], + &DEFAULT_FORMAT_OPTIONS, + )?; + writeln!(writer, "{}", formatted)?; + } + _ => {} + } + Ok(()) + } } #[cfg(test)] @@ -193,7 +216,7 @@ mod tests { use super::*; use std::sync::Arc; - use arrow::array::{ArrayRef, Int32Array}; + use arrow::array::Int32Array; use 
arrow::datatypes::{DataType, Field, Schema}; #[test] @@ -201,7 +224,6 @@ mod tests { for format in [ PrintFormat::Csv, PrintFormat::Tsv, - PrintFormat::Table, PrintFormat::Json, PrintFormat::NdJson, PrintFormat::Automatic, @@ -209,10 +231,26 @@ mod tests { // no output for empty batches, even with header set PrintBatchesTest::new() .with_format(format) + .with_schema(three_column_schema()) .with_batches(vec![]) .with_expected(&[""]) .run(); } + + // output column headers for empty batches when format is Table + #[rustfmt::skip] + let expected = &[ + "+---+---+---+", + "| a | b | c |", + "+---+---+---+", + "+---+---+---+", + ]; + PrintBatchesTest::new() + .with_format(PrintFormat::Table) + .with_schema(three_column_schema()) + .with_batches(vec![]) + .with_expected(expected) + .run(); } #[test] @@ -385,6 +423,7 @@ mod tests { for max_rows in [MaxRows::Unlimited, MaxRows::Limited(5), MaxRows::Limited(3)] { PrintBatchesTest::new() .with_format(PrintFormat::Table) + .with_schema(one_column_schema()) .with_batches(vec![one_column_batch()]) .with_maxrows(max_rows) .with_expected(expected) @@ -450,15 +489,15 @@ mod tests { let empty_batch = RecordBatch::new_empty(batch.schema()); #[rustfmt::skip] - let expected =&[ - "+---+", - "| a |", - "+---+", - "| 1 |", - "| 2 |", - "| 3 |", - "+---+", - ]; + let expected =&[ + "+---+", + "| a |", + "+---+", + "| 1 |", + "| 2 |", + "| 3 |", + "+---+", + ]; PrintBatchesTest::new() .with_format(PrintFormat::Table) @@ -468,14 +507,32 @@ mod tests { } #[test] - fn test_print_batches_empty_batches_no_header() { + fn test_print_batches_empty_batch() { let empty_batch = RecordBatch::new_empty(one_column_batch().schema()); - // empty batches should not print a header - let expected = &[""]; + // Print column headers for empty batch when format is Table + #[rustfmt::skip] + let expected =&[ + "+---+", + "| a |", + "+---+", + "+---+", + ]; + + PrintBatchesTest::new() + .with_format(PrintFormat::Table) + .with_schema(one_column_schema()) + .with_batches(vec![empty_batch]) + .with_header(WithHeader::Yes) + .with_expected(expected) + .run(); + // No output for empty batch when schema contains no columns + let empty_batch = RecordBatch::new_empty(Arc::new(Schema::empty())); + let expected = &[""]; PrintBatchesTest::new() .with_format(PrintFormat::Table) + .with_schema(Arc::new(Schema::empty())) .with_batches(vec![empty_batch]) .with_header(WithHeader::Yes) .with_expected(expected) @@ -485,6 +542,7 @@ mod tests { #[derive(Debug)] struct PrintBatchesTest { format: PrintFormat, + schema: SchemaRef, batches: Vec, maxrows: MaxRows, with_header: WithHeader, @@ -504,6 +562,7 @@ mod tests { fn new() -> Self { Self { format: PrintFormat::Table, + schema: Arc::new(Schema::empty()), batches: vec![], maxrows: MaxRows::Unlimited, with_header: WithHeader::Ignored, @@ -517,6 +576,12 @@ mod tests { self } + // set the schema + fn with_schema(mut self, schema: SchemaRef) -> Self { + self.schema = schema; + self + } + /// set the batches to convert fn with_batches(mut self, batches: Vec) -> Self { self.batches = batches; @@ -573,21 +638,31 @@ mod tests { fn output_with_header(&self, with_header: bool) -> String { let mut buffer: Vec = vec![]; self.format - .print_batches(&mut buffer, &self.batches, self.maxrows, with_header) + .print_batches( + &mut buffer, + self.schema.clone(), + &self.batches, + self.maxrows, + with_header, + ) .unwrap(); String::from_utf8(buffer).unwrap() } } - /// Return a batch with three columns and three rows - fn three_column_batch() -> RecordBatch { - let schema 
= Arc::new(Schema::new(vec![ + /// Return a schema with three columns + fn three_column_schema() -> SchemaRef { + Arc::new(Schema::new(vec![ Field::new("a", DataType::Int32, false), Field::new("b", DataType::Int32, false), Field::new("c", DataType::Int32, false), - ])); + ])) + } + + /// Return a batch with three columns and three rows + fn three_column_batch() -> RecordBatch { RecordBatch::try_new( - schema, + three_column_schema(), vec![ Arc::new(Int32Array::from(vec![1, 2, 3])), Arc::new(Int32Array::from(vec![4, 5, 6])), @@ -597,12 +672,17 @@ mod tests { .unwrap() } + /// Return a schema with one column + fn one_column_schema() -> SchemaRef { + Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])) + } + /// return a batch with one column and three rows fn one_column_batch() -> RecordBatch { - RecordBatch::try_from_iter(vec![( - "a", - Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef, - )]) + RecordBatch::try_new( + one_column_schema(), + vec![Arc::new(Int32Array::from(vec![1, 2, 3]))], + ) .unwrap() } diff --git a/datafusion-cli/src/print_options.rs b/datafusion-cli/src/print_options.rs index bede5dd15eb6..e80cc55663ae 100644 --- a/datafusion-cli/src/print_options.rs +++ b/datafusion-cli/src/print_options.rs @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. -use datafusion::common::instant::Instant; use std::fmt::{Display, Formatter}; use std::io::Write; use std::pin::Pin; @@ -23,7 +22,9 @@ use std::str::FromStr; use crate::print_format::PrintFormat; +use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; +use datafusion::common::instant::Instant; use datafusion::common::DataFusionError; use datafusion::error::Result; use datafusion::physical_plan::RecordBatchStream; @@ -98,6 +99,7 @@ impl PrintOptions { /// Print the batches to stdout using the specified format pub fn print_batches( &self, + schema: SchemaRef, batches: &[RecordBatch], query_start_time: Instant, ) -> Result<()> { @@ -105,7 +107,7 @@ impl PrintOptions { let mut writer = stdout.lock(); self.format - .print_batches(&mut writer, batches, self.maxrows, true)?; + .print_batches(&mut writer, schema, batches, self.maxrows, true)?; let row_count: usize = batches.iter().map(|b| b.num_rows()).sum(); let formatted_exec_details = get_execution_details_formatted( @@ -148,6 +150,7 @@ impl PrintOptions { row_count += batch.num_rows(); self.format.print_batches( &mut writer, + batch.schema(), &[batch], MaxRows::Unlimited, with_header, From a46adeef26103efc767ef15031f3aad1bdde0406 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Berkay=20=C5=9Eahin?= <124376117+berkaysynnada@users.noreply.github.com> Date: Mon, 29 Apr 2024 21:56:41 +0300 Subject: [PATCH 05/10] Clean-up: Remove AggregateExec::group_by() (#10297) * Delete docs.yaml * Remove group_by * Update convert_first_last.rs --------- Co-authored-by: metesynnada <100111937+metesynnada@users.noreply.github.com> Co-authored-by: Mustafa Akur <106137913+mustafasrepo@users.noreply.github.com> Co-authored-by: Mustafa Akur --- .../src/physical_optimizer/combine_partial_final_agg.rs | 6 +++--- .../core/src/physical_optimizer/convert_first_last.rs | 2 +- .../core/src/physical_optimizer/enforce_distribution.rs | 6 +++--- .../src/physical_optimizer/limited_distinct_aggregation.rs | 4 ++-- datafusion/core/src/physical_optimizer/topk_aggregation.rs | 2 +- datafusion/physical-plan/src/aggregates/mod.rs | 6 +----- 6 files changed, 11 insertions(+), 15 deletions(-) diff --git 
a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs index 92787df461d3..e41e4dd31647 100644 --- a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs +++ b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs @@ -70,12 +70,12 @@ impl PhysicalOptimizerRule for CombinePartialFinalAggregate { AggregateMode::Partial ) && can_combine( ( - agg_exec.group_by(), + agg_exec.group_expr(), agg_exec.aggr_expr(), agg_exec.filter_expr(), ), ( - input_agg_exec.group_by(), + input_agg_exec.group_expr(), input_agg_exec.aggr_expr(), input_agg_exec.filter_expr(), ), @@ -88,7 +88,7 @@ impl PhysicalOptimizerRule for CombinePartialFinalAggregate { }; AggregateExec::try_new( mode, - input_agg_exec.group_by().clone(), + input_agg_exec.group_expr().clone(), input_agg_exec.aggr_expr().to_vec(), input_agg_exec.filter_expr().to_vec(), input_agg_exec.input().clone(), diff --git a/datafusion/core/src/physical_optimizer/convert_first_last.rs b/datafusion/core/src/physical_optimizer/convert_first_last.rs index 14860eecf189..62537169cfc6 100644 --- a/datafusion/core/src/physical_optimizer/convert_first_last.rs +++ b/datafusion/core/src/physical_optimizer/convert_first_last.rs @@ -79,7 +79,7 @@ fn get_common_requirement_of_aggregate_input( if let Some(aggr_exec) = plan.as_any().downcast_ref::() { let input = aggr_exec.input(); let mut aggr_expr = try_get_updated_aggr_expr_from_child(aggr_exec); - let group_by = aggr_exec.group_by(); + let group_by = aggr_exec.group_expr(); let mode = aggr_exec.mode(); let input_eq_properties = input.equivalence_properties(); diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index 14232f4933f8..02612a13ada5 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -461,7 +461,7 @@ fn reorder_aggregate_keys( ) -> Result { let parent_required = &agg_node.data; let output_columns = agg_exec - .group_by() + .group_expr() .expr() .iter() .enumerate() @@ -474,7 +474,7 @@ fn reorder_aggregate_keys( .collect::>(); if parent_required.len() == output_exprs.len() - && agg_exec.group_by().null_expr().is_empty() + && agg_exec.group_expr().null_expr().is_empty() && !physical_exprs_equal(&output_exprs, parent_required) { if let Some(positions) = expected_expr_positions(&output_exprs, parent_required) { @@ -482,7 +482,7 @@ fn reorder_aggregate_keys( agg_exec.input().as_any().downcast_ref::() { if matches!(agg_exec.mode(), &AggregateMode::Partial) { - let group_exprs = agg_exec.group_by().expr(); + let group_exprs = agg_exec.group_expr().expr(); let new_group_exprs = positions .into_iter() .map(|idx| group_exprs[idx].clone()) diff --git a/datafusion/core/src/physical_optimizer/limited_distinct_aggregation.rs b/datafusion/core/src/physical_optimizer/limited_distinct_aggregation.rs index d211d2c8b2d7..950bb3c8eeb2 100644 --- a/datafusion/core/src/physical_optimizer/limited_distinct_aggregation.rs +++ b/datafusion/core/src/physical_optimizer/limited_distinct_aggregation.rs @@ -55,7 +55,7 @@ impl LimitedDistinctAggregation { // We found what we want: clone, copy the limit down, and return modified node let new_aggr = AggregateExec::try_new( *aggr.mode(), - aggr.group_by().clone(), + aggr.group_expr().clone(), aggr.aggr_expr().to_vec(), aggr.filter_expr().to_vec(), aggr.input().clone(), @@ -116,7 +116,7 @@ impl 
LimitedDistinctAggregation { if let Some(parent_aggr) = match_aggr.as_any().downcast_ref::() { - if !parent_aggr.group_by().eq(aggr.group_by()) { + if !parent_aggr.group_expr().eq(aggr.group_expr()) { // a partial and final aggregation with different groupings disqualifies // rewriting the child aggregation rewrite_applicable = false; diff --git a/datafusion/core/src/physical_optimizer/topk_aggregation.rs b/datafusion/core/src/physical_optimizer/topk_aggregation.rs index 95f7067cbe1b..7c0519eda3b3 100644 --- a/datafusion/core/src/physical_optimizer/topk_aggregation.rs +++ b/datafusion/core/src/physical_optimizer/topk_aggregation.rs @@ -73,7 +73,7 @@ impl TopKAggregation { // We found what we want: clone, copy the limit down, and return modified node let new_aggr = AggregateExec::try_new( *aggr.mode(), - aggr.group_by().clone(), + aggr.group_expr().clone(), aggr.aggr_expr().to_vec(), aggr.filter_expr().to_vec(), aggr.input().clone(), diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 25f550836505..95376e7e69cd 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -502,17 +502,13 @@ impl AggregateExec { } } - pub fn group_by(&self) -> &PhysicalGroupBy { - &self.group_by - } - /// true, if this Aggregate has a group-by with no required or explicit ordering, /// no filtering and no aggregate expressions /// This method qualifies the use of the LimitedDistinctAggregation rewrite rule /// on an AggregateExec. pub fn is_unordered_unfiltered_group_by_distinct(&self) -> bool { // ensure there is a group by - if self.group_by().is_empty() { + if self.group_expr().is_empty() { return false; } // ensure there are no aggregate expressions From fedee070c86f5769088960fab351f1bc0ab30248 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 29 Apr 2024 14:57:42 -0400 Subject: [PATCH 06/10] Add mailing list descriptions to documentation (#10284) * Add mailing list descriptions to documentation * remove random ```` * prettoer --- .../source/contributor-guide/communication.md | 37 +++++++++++++------ 1 file changed, 25 insertions(+), 12 deletions(-) diff --git a/docs/source/contributor-guide/communication.md b/docs/source/contributor-guide/communication.md index 3e5e816d2f90..40cb28bbde00 100644 --- a/docs/source/contributor-guide/communication.md +++ b/docs/source/contributor-guide/communication.md @@ -37,18 +37,31 @@ We use the Slack and Discord platforms for informal discussions and coordination meet other contributors and get guidance on where to contribute. It is important to note that any technical designs and decisions are made fully in the open, on GitHub. -Most of us use the `#datafusion` and `#arrow-rust` channels in the [ASF Slack workspace](https://s.apache.org/slack-invite) . -Unfortunately, due to spammers, the ASF Slack workspace requires an invitation to join. To get an invitation, -request one in the `Arrow Rust` channel of the [Arrow Rust Discord server](https://discord.gg/Qw5gKqHxUM). +Most of us use the `#datafusion` and `#arrow-rust` channels in the [ASF Slack +workspace](https://s.apache.org/slack-invite) and the [Arrow Rust Discord +server](https://discord.gg/Qw5gKqHxUM) for discussions. -## Mailing list +Unfortunately, due to spammers, the ASF Slack workspace requires an invitation +to join. We are happy to invite you -- please ask for an invitation in the +Discord server. 
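A note on PATCH 05/10 above: with AggregateExec::group_by() gone, group_expr() is the one remaining accessor for the grouping. The sketch below is not part of any patch in this series; it assumes the datafusion v37 public API, uses EmptyExec purely as a stand-in input, and only demonstrates that a downstream call site compiles against the consolidated method:

    use std::sync::Arc;
    use datafusion::arrow::datatypes::{DataType, Field, Schema};
    use datafusion::error::Result;
    use datafusion::physical_expr::expressions::col;
    use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
    use datafusion::physical_plan::empty::EmptyExec;

    fn main() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
        // Group by column "a"; no aggregate or filter expressions.
        let group_by = PhysicalGroupBy::new_single(vec![(col("a", &schema)?, "a".to_string())]);
        let agg = AggregateExec::try_new(
            AggregateMode::Partial,
            group_by,
            vec![], // aggregate expressions
            vec![], // filter expressions
            Arc::new(EmptyExec::new(schema.clone())),
            schema,
        )?;
        // Former group_by() call sites now read:
        assert_eq!(agg.group_expr().expr().len(), 1);
        Ok(())
    }

Exposing a single accessor means optimizer rules such as CombinePartialFinalAggregate and the enforce_distribution pass cannot drift between two getters for the same field.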
-We also use arrow.apache.org's `dev@` mailing list for release coordination and occasional design discussions. Other -than the release process, most DataFusion mailing list traffic will link to a GitHub issue or PR for discussion. -([subscribe](mailto:dev-subscribe@datafusion.apache.org), -[unsubscribe](mailto:dev-unsubscribe@datafusion.apache.org), -[archives](https://lists.apache.org/list.html?dev@arrow.apache.org)). +## Mailing Lists -When emailing the dev list, please make sure to prefix the subject line with a -`[DataFusion]` tag, e.g. `"[DataFusion] New API for remote data sources"`, so -that the appropriate people in the Apache Arrow community notice the message. +Like other Apache projects, we use [mailing lists] for certain purposes, most +importantly release coordination. Other than the release process, most +DataFusion mailing list traffic will simply link to a GitHub issue or PR where +the actual discussion occurs. The project mailing lists are: + +- [`dev@datafusion.apache.org`](mailto:dev@datafusion.apache.org): the main + mailing list for release coordination and other project-wide discussions. Links: + [archives](https://lists.apache.org/list.html?dev@datafusion.apache.org), + [subscribe](mailto:dev-subscribe@datafusion.apache.org), + [unsubscribe](mailto:dev-unsubscribe@datafusion.apache.org) +- `github@datafusion.apache.org`: read-only mailing list that receives all GitHub notifications for the project. Links: + [archives](https://lists.apache.org/list.html?github@datafusion.apache.org) +- `commits@datafusion.apache.org`: read-only mailing list that receives all GitHub commits for the project. Links: + [archives](https://lists.apache.org/list.html?commits@datafusion.apache.org) +- `private@datafusion.apache.org`: private mailing list for PMC members. This list has very little traffic, almost exclusively discussions on growing the committer and PMC membership. Links: + [archives](https://lists.apache.org/list.html?private@datafusion.apache.org) + +[mailing lists]: https://www.apache.org/foundation/mailinglists From 7d3e6c7e29a7426fa99c5765e6ad86e5bc4de7e6 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 29 Apr 2024 14:58:52 -0400 Subject: [PATCH 07/10] chore(deps): update substrait requirement from 0.31.0 to 0.32.0 (#10279) Updates the requirements on [substrait](https://github.com/substrait-io/substrait-rs) to permit the latest version. - [Release notes](https://github.com/substrait-io/substrait-rs/releases) - [Changelog](https://github.com/substrait-io/substrait-rs/blob/main/CHANGELOG.md) - [Commits](https://github.com/substrait-io/substrait-rs/compare/v0.31.0...v0.32.0) --- updated-dependencies: - dependency-name: substrait dependency-type: direct:production ... 
Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- datafusion/substrait/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/substrait/Cargo.toml b/datafusion/substrait/Cargo.toml index a947ac2c51c3..dce8ce10b587 100644 --- a/datafusion/substrait/Cargo.toml +++ b/datafusion/substrait/Cargo.toml @@ -39,7 +39,7 @@ itertools = { workspace = true } object_store = { workspace = true } prost = "0.12" prost-types = "0.12" -substrait = "0.31.0" +substrait = "0.32.0" [dev-dependencies] tokio = { workspace = true } From acd98654bd86bfadafb76ab99b0e767ec4326bdb Mon Sep 17 00:00:00 2001 From: Eren Avsarogullari Date: Mon, 29 Apr 2024 12:13:37 -0700 Subject: [PATCH 08/10] refactor: Convert IPCWriter metrics from u64 to usize (#10278) --- datafusion/physical-plan/src/common.rs | 10 +++++----- datafusion/physical-plan/src/sorts/sort.rs | 8 ++++---- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/datafusion/physical-plan/src/common.rs b/datafusion/physical-plan/src/common.rs index f7cad9df4ba1..cdd122cf36fe 100644 --- a/datafusion/physical-plan/src/common.rs +++ b/datafusion/physical-plan/src/common.rs @@ -259,11 +259,11 @@ pub struct IPCWriter { /// inner writer pub writer: FileWriter, /// batches written - pub num_batches: u64, + pub num_batches: usize, /// rows written - pub num_rows: u64, + pub num_rows: usize, /// bytes written - pub num_bytes: u64, + pub num_bytes: usize, } impl IPCWriter { @@ -306,9 +306,9 @@ impl IPCWriter { pub fn write(&mut self, batch: &RecordBatch) -> Result<()> { self.writer.write(batch)?; self.num_batches += 1; - self.num_rows += batch.num_rows() as u64; + self.num_rows += batch.num_rows(); let num_bytes: usize = batch.get_array_memory_size(); - self.num_bytes += num_bytes as u64; + self.num_bytes += num_bytes; Ok(()) } diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index e4e3d46dfbbc..ebeaf9e471c3 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -406,7 +406,7 @@ impl ExternalSorter { let used = self.reservation.free(); self.metrics.spill_count.add(1); self.metrics.spilled_bytes.add(used); - self.metrics.spilled_rows.add(spilled_rows as usize); + self.metrics.spilled_rows.add(spilled_rows); self.spills.push(spill_file); Ok(used) } @@ -674,7 +674,7 @@ async fn spill_sorted_batches( batches: Vec, path: &Path, schema: SchemaRef, -) -> Result { +) -> Result { let path: PathBuf = path.into(); let task = SpawnedTask::spawn_blocking(move || write_sorted(batches, path, schema)); match task.join().await { @@ -705,7 +705,7 @@ fn write_sorted( batches: Vec, path: PathBuf, schema: SchemaRef, -) -> Result { +) -> Result { let mut writer = IPCWriter::new(path.as_ref(), schema.as_ref())?; for batch in batches { writer.write(&batch)?; @@ -715,7 +715,7 @@ fn write_sorted( "Spilled {} batches of total {} rows to disk, memory released {}", writer.num_batches, writer.num_rows, - human_readable_size(writer.num_bytes as usize), + human_readable_size(writer.num_bytes), ); Ok(writer.num_rows) } From 0f2a68ee1676c0d141d2c7cacf4b7c21d0033870 Mon Sep 17 00:00:00 2001 From: Duong Cong Toai <35887761+duongcongtoai@users.noreply.github.com> Date: Mon, 29 Apr 2024 21:27:40 +0200 Subject: [PATCH 09/10] Validate ScalarUDF output rows and fix nulls for `array_has` and `get_field` for `Map` (#10148) * validate input/output of udf * clip * fmt * clean garbage * don't check if output 
is scalar * lint * fix array_has * rm debug * chore: temp code for demonstration * getfield retains number of rows * rust fmt * minor comments * fmt * refactor * compile err * fmt again * fmt * add validate_number_of_rows for UDF * only check for columnarvalue::array --- .../user_defined_scalar_functions.rs | 40 ++++++++++- datafusion/functions-array/src/array_has.rs | 60 ++++++++-------- datafusion/functions/src/core/getfield.rs | 70 +++++++++++++------ .../physical-expr/src/scalar_function.rs | 15 ++-- datafusion/sqllogictest/test_files/array.slt | 15 ++-- datafusion/sqllogictest/test_files/map.slt | 1 + 6 files changed, 141 insertions(+), 60 deletions(-) diff --git a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs index 4f262b54fb20..def9fcb4c61b 100644 --- a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs +++ b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs @@ -26,7 +26,7 @@ use datafusion_common::{ assert_batches_eq, assert_batches_sorted_eq, cast::as_float64_array, cast::as_int32_array, not_impl_err, plan_err, ExprSchema, Result, ScalarValue, }; -use datafusion_common::{exec_err, internal_err, DataFusionError}; +use datafusion_common::{assert_contains, exec_err, internal_err, DataFusionError}; use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv}; use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyInfo}; use datafusion_expr::{ @@ -205,6 +205,44 @@ impl ScalarUDFImpl for Simple0ArgsScalarUDF { } } +#[tokio::test] +async fn test_row_mismatch_error_in_scalar_udf() -> Result<()> { + let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); + + let batch = RecordBatch::try_new( + Arc::new(schema.clone()), + vec![Arc::new(Int32Array::from(vec![1, 2]))], + )?; + + let ctx = SessionContext::new(); + + ctx.register_batch("t", batch)?; + + // udf that always return 1 row + let buggy_udf = Arc::new(|_: &[ColumnarValue]| { + Ok(ColumnarValue::Array(Arc::new(Int32Array::from(vec![0])))) + }); + + ctx.register_udf(create_udf( + "buggy_func", + vec![DataType::Int32], + Arc::new(DataType::Int32), + Volatility::Immutable, + buggy_udf, + )); + assert_contains!( + ctx.sql("select buggy_func(a) from t") + .await? + .show() + .await + .err() + .unwrap() + .to_string(), + "UDF returned a different number of rows than expected" + ); + Ok(()) +} + #[tokio::test] async fn scalar_udf_zero_params() -> Result<()> { let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); diff --git a/datafusion/functions-array/src/array_has.rs b/datafusion/functions-array/src/array_has.rs index ee064335c1cc..e5e8add95fbe 100644 --- a/datafusion/functions-array/src/array_has.rs +++ b/datafusion/functions-array/src/array_has.rs @@ -288,36 +288,40 @@ fn general_array_has_dispatch( } else { array }; - for (row_idx, (arr, sub_arr)) in array.iter().zip(sub_array.iter()).enumerate() { - if let (Some(arr), Some(sub_arr)) = (arr, sub_arr) { - let arr_values = converter.convert_columns(&[arr])?; - let sub_arr_values = if comparison_type != ComparisonType::Single { - converter.convert_columns(&[sub_arr])? - } else { - converter.convert_columns(&[element.clone()])? 
- }; - - let mut res = match comparison_type { - ComparisonType::All => sub_arr_values - .iter() - .dedup() - .all(|elem| arr_values.iter().dedup().any(|x| x == elem)), - ComparisonType::Any => sub_arr_values - .iter() - .dedup() - .any(|elem| arr_values.iter().dedup().any(|x| x == elem)), - ComparisonType::Single => arr_values - .iter() - .dedup() - .any(|x| x == sub_arr_values.row(row_idx)), - }; - - if comparison_type == ComparisonType::Any { - res |= res; + match (arr, sub_arr) { + (Some(arr), Some(sub_arr)) => { + let arr_values = converter.convert_columns(&[arr])?; + let sub_arr_values = if comparison_type != ComparisonType::Single { + converter.convert_columns(&[sub_arr])? + } else { + converter.convert_columns(&[element.clone()])? + }; + + let mut res = match comparison_type { + ComparisonType::All => sub_arr_values + .iter() + .dedup() + .all(|elem| arr_values.iter().dedup().any(|x| x == elem)), + ComparisonType::Any => sub_arr_values + .iter() + .dedup() + .any(|elem| arr_values.iter().dedup().any(|x| x == elem)), + ComparisonType::Single => arr_values + .iter() + .dedup() + .any(|x| x == sub_arr_values.row(row_idx)), + }; + + if comparison_type == ComparisonType::Any { + res |= res; + } + boolean_builder.append_value(res); + } + // respect null input + (_, _) => { + boolean_builder.append_null(); } - - boolean_builder.append_value(res); } } Ok(Arc::new(boolean_builder.finish())) diff --git a/datafusion/functions/src/core/getfield.rs b/datafusion/functions/src/core/getfield.rs index b00b8ea553f2..a092aac159bb 100644 --- a/datafusion/functions/src/core/getfield.rs +++ b/datafusion/functions/src/core/getfield.rs @@ -15,7 +15,9 @@ // specific language governing permissions and limitations // under the License. -use arrow::array::{Scalar, StringArray}; +use arrow::array::{ + make_array, Array, Capacities, MutableArrayData, Scalar, StringArray, +}; use arrow::datatypes::DataType; use datafusion_common::cast::{as_map_array, as_struct_array}; use datafusion_common::{exec_err, ExprSchema, Result, ScalarValue}; @@ -107,29 +109,55 @@ impl ScalarUDFImpl for GetFieldFunc { ); } }; + match (array.data_type(), name) { - (DataType::Map(_, _), ScalarValue::Utf8(Some(k))) => { - let map_array = as_map_array(array.as_ref())?; - let key_scalar = Scalar::new(StringArray::from(vec![k.clone()])); - let keys = arrow::compute::kernels::cmp::eq(&key_scalar, map_array.keys())?; - let entries = arrow::compute::filter(map_array.entries(), &keys)?; - let entries_struct_array = as_struct_array(entries.as_ref())?; - Ok(ColumnarValue::Array(entries_struct_array.column(1).clone())) - } - (DataType::Struct(_), ScalarValue::Utf8(Some(k))) => { - let as_struct_array = as_struct_array(&array)?; - match as_struct_array.column_by_name(k) { - None => exec_err!( - "get indexed field {k} not found in struct"), - Some(col) => Ok(ColumnarValue::Array(col.clone())) + (DataType::Map(_, _), ScalarValue::Utf8(Some(k))) => { + let map_array = as_map_array(array.as_ref())?; + let key_scalar: Scalar>> = Scalar::new(StringArray::from(vec![k.clone()])); + let keys = arrow::compute::kernels::cmp::eq(&key_scalar, map_array.keys())?; + + // note that this array has more entries than the expected output/input size + // because maparray is flatten + let original_data = map_array.entries().column(1).to_data(); + let capacity = Capacities::Array(original_data.len()); + let mut mutable = + MutableArrayData::with_capacities(vec![&original_data], true, + capacity); + + for entry in 0..map_array.len(){ + let start = 
map_array.value_offsets()[entry] as usize; + let end = map_array.value_offsets()[entry + 1] as usize; + + let maybe_matched = + keys.slice(start, end-start). + iter().enumerate(). + find(|(_, t)| t.unwrap()); + if maybe_matched.is_none(){ + mutable.extend_nulls(1); + continue } + let (match_offset,_) = maybe_matched.unwrap(); + mutable.extend(0, start + match_offset, start + match_offset + 1); + } + let data = mutable.freeze(); + let data = make_array(data); + Ok(ColumnarValue::Array(data)) + } + (DataType::Struct(_), ScalarValue::Utf8(Some(k))) => { + let as_struct_array = as_struct_array(&array)?; + match as_struct_array.column_by_name(k) { + None => exec_err!("get indexed field {k} not found in struct"), + Some(col) => Ok(ColumnarValue::Array(col.clone())), } - (DataType::Struct(_), name) => exec_err!( - "get indexed field is only possible on struct with utf8 indexes. \ - Tried with {name:?} index"), - (dt, name) => exec_err!( - "get indexed field is only possible on lists with int64 indexes or struct \ - with utf8 indexes. Tried {dt:?} with {name:?} index"), } + (DataType::Struct(_), name) => exec_err!( + "get indexed field is only possible on struct with utf8 indexes. \ + Tried with {name:?} index" + ), + (dt, name) => exec_err!( + "get indexed field is only possible on lists with int64 indexes or struct \ + with utf8 indexes. Tried {dt:?} with {name:?} index" + ), + } } } diff --git a/datafusion/physical-expr/src/scalar_function.rs b/datafusion/physical-expr/src/scalar_function.rs index 3b360fc20c39..b9c6ff3cfefc 100644 --- a/datafusion/physical-expr/src/scalar_function.rs +++ b/datafusion/physical-expr/src/scalar_function.rs @@ -146,11 +146,18 @@ impl PhysicalExpr for ScalarFunctionExpr { // evaluate the function match self.fun { ScalarFunctionDefinition::UDF(ref fun) => { - if self.args.is_empty() { - fun.invoke_no_args(batch.num_rows()) - } else { - fun.invoke(&inputs) + let output = match self.args.is_empty() { + true => fun.invoke_no_args(batch.num_rows()), + false => fun.invoke(&inputs), + }?; + + if let ColumnarValue::Array(array) = &output { + if array.len() != batch.num_rows() { + return internal_err!("UDF returned a different number of rows than expected. 
Expected: {}, Got: {}", + batch.num_rows(), array.len()); + } } + Ok(output) } ScalarFunctionDefinition::Name(_) => { internal_err!( diff --git a/datafusion/sqllogictest/test_files/array.slt b/datafusion/sqllogictest/test_files/array.slt index c3c5603dafc6..b33419ecd47c 100644 --- a/datafusion/sqllogictest/test_files/array.slt +++ b/datafusion/sqllogictest/test_files/array.slt @@ -5169,8 +5169,9 @@ false false false true true false true false true false false true false true false false -false false false false -false false false false +NULL NULL false false +false false NULL false +false false false NULL query BBBB select array_has(arrow_cast(column1, 'LargeList(List(Int64))'), make_array(5, 6)), @@ -5183,8 +5184,9 @@ false false false true true false true false true false false true false true false false -false false false false -false false false false +NULL NULL false false +false false NULL false +false false false NULL query BBBB select array_has(column1, make_array(5, 6)), @@ -5197,8 +5199,9 @@ false false false true true false true false true false false true false true false false -false false false false -false false false false +NULL NULL false false +false false NULL false +false false false NULL query BBBBBBBBBBBBB select array_has_all(make_array(1,2,3), make_array(1,3)), diff --git a/datafusion/sqllogictest/test_files/map.slt b/datafusion/sqllogictest/test_files/map.slt index 415fabf224d7..8ff7d119c454 100644 --- a/datafusion/sqllogictest/test_files/map.slt +++ b/datafusion/sqllogictest/test_files/map.slt @@ -44,6 +44,7 @@ DELETE 24 query T SELECT strings['not_found'] FROM data LIMIT 1; ---- +NULL statement ok drop table data; From dd5683745e7d527b01b804c8f4f1a0a53aa225e8 Mon Sep 17 00:00:00 2001 From: Lordworms <48054792+Lordworms@users.noreply.github.com> Date: Mon, 29 Apr 2024 18:14:06 -0500 Subject: [PATCH 10/10] Minor: return NULL for range and generate_series (#10275) * return NULL for range and generate_series * Update datafusion/sqllogictest/test_files/array.slt Co-authored-by: Andrew Lamb * Update datafusion/sqllogictest/test_files/array.slt Co-authored-by: Andrew Lamb --------- Co-authored-by: Andrew Lamb --- datafusion/functions-array/src/range.rs | 45 ++++-- datafusion/functions-array/src/udf.rs | 140 ------------------- datafusion/sqllogictest/test_files/array.slt | 37 +++-- 3 files changed, 59 insertions(+), 163 deletions(-) diff --git a/datafusion/functions-array/src/range.rs b/datafusion/functions-array/src/range.rs index 1c9e0c878e6e..150fe5960266 100644 --- a/datafusion/functions-array/src/range.rs +++ b/datafusion/functions-array/src/range.rs @@ -17,14 +17,12 @@ //! [`ScalarUDFImpl`] definitions for range and gen_series functions. 
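To see the row-count validation from PATCH 09/10 above from the caller's side, here is a hedged sketch modeled on the series' own test_row_mismatch_error_in_scalar_udf test: a UDF that always emits one row now fails on a two-row batch instead of producing silently misaligned output. It assumes the datafusion v37 API plus a tokio runtime, and buggy_func is only an illustrative name taken from that test:

    use std::sync::Arc;
    use datafusion::arrow::array::Int32Array;
    use datafusion::arrow::datatypes::{DataType, Field, Schema};
    use datafusion::arrow::record_batch::RecordBatch;
    use datafusion::error::Result;
    use datafusion::logical_expr::{create_udf, ColumnarValue, Volatility};
    use datafusion::prelude::SessionContext;

    #[tokio::main]
    async fn main() -> Result<()> {
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![Arc::new(Int32Array::from(vec![1, 2]))], // two input rows
        )?;
        let ctx = SessionContext::new();
        ctx.register_batch("t", batch)?;

        // Buggy on purpose: always returns a single row regardless of input length.
        let buggy_udf = Arc::new(|_: &[ColumnarValue]| {
            Ok(ColumnarValue::Array(Arc::new(Int32Array::from(vec![0]))))
        });
        ctx.register_udf(create_udf(
            "buggy_func",
            vec![DataType::Int32],
            Arc::new(DataType::Int32),
            Volatility::Immutable,
            buggy_udf,
        ));

        // With the check added to ScalarFunctionExpr::evaluate, this now errors with
        // "UDF returned a different number of rows than expected".
        let err = ctx.sql("select buggy_func(a) from t").await?.show().await.unwrap_err();
        println!("{err}");
        Ok(())
    }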
+use crate::utils::make_scalar_function; use arrow::array::{Array, ArrayRef, Int64Array, ListArray}; use arrow::datatypes::{DataType, Field}; -use arrow_buffer::{BooleanBufferBuilder, NullBuffer, OffsetBuffer}; -use std::any::Any; - -use crate::utils::make_scalar_function; use arrow_array::types::{Date32Type, IntervalMonthDayNanoType}; -use arrow_array::Date32Array; +use arrow_array::{Date32Array, NullArray}; +use arrow_buffer::{BooleanBufferBuilder, NullBuffer, OffsetBuffer}; use arrow_schema::DataType::{Date32, Int64, Interval, List}; use arrow_schema::IntervalUnit::MonthDayNano; use datafusion_common::cast::{as_date32_array, as_int64_array, as_interval_mdn_array}; @@ -34,6 +32,7 @@ use datafusion_expr::Expr; use datafusion_expr::{ ColumnarValue, ScalarUDFImpl, Signature, TypeSignature, Volatility, }; +use std::any::Any; use std::sync::Arc; make_udf_function!( @@ -57,6 +56,7 @@ impl Range { TypeSignature::Exact(vec![Int64, Int64]), TypeSignature::Exact(vec![Int64, Int64, Int64]), TypeSignature::Exact(vec![Date32, Date32, Interval(MonthDayNano)]), + TypeSignature::Any(3), ], Volatility::Immutable, ), @@ -77,14 +77,21 @@ impl ScalarUDFImpl for Range { } fn return_type(&self, arg_types: &[DataType]) -> Result { - Ok(List(Arc::new(Field::new( - "item", - arg_types[0].clone(), - true, - )))) + if arg_types.iter().any(|t| t.eq(&DataType::Null)) { + Ok(DataType::Null) + } else { + Ok(List(Arc::new(Field::new( + "item", + arg_types[0].clone(), + true, + )))) + } } fn invoke(&self, args: &[ColumnarValue]) -> Result { + if args.iter().any(|arg| arg.data_type() == DataType::Null) { + return Ok(ColumnarValue::Array(Arc::new(NullArray::new(1)))); + } match args[0].data_type() { Int64 => make_scalar_function(|args| gen_range_inner(args, false))(args), Date32 => make_scalar_function(|args| gen_range_date(args, false))(args), @@ -120,6 +127,7 @@ impl GenSeries { TypeSignature::Exact(vec![Int64, Int64]), TypeSignature::Exact(vec![Int64, Int64, Int64]), TypeSignature::Exact(vec![Date32, Date32, Interval(MonthDayNano)]), + TypeSignature::Any(3), ], Volatility::Immutable, ), @@ -140,14 +148,21 @@ impl ScalarUDFImpl for GenSeries { } fn return_type(&self, arg_types: &[DataType]) -> Result { - Ok(List(Arc::new(Field::new( - "item", - arg_types[0].clone(), - true, - )))) + if arg_types.iter().any(|t| t.eq(&DataType::Null)) { + Ok(DataType::Null) + } else { + Ok(List(Arc::new(Field::new( + "item", + arg_types[0].clone(), + true, + )))) + } } fn invoke(&self, args: &[ColumnarValue]) -> Result { + if args.iter().any(|arg| arg.data_type() == DataType::Null) { + return Ok(ColumnarValue::Array(Arc::new(NullArray::new(1)))); + } match args[0].data_type() { Int64 => make_scalar_function(|args| gen_range_inner(args, true))(args), Date32 => make_scalar_function(|args| gen_range_date(args, true))(args), diff --git a/datafusion/functions-array/src/udf.rs b/datafusion/functions-array/src/udf.rs index 1462b3efad33..c723fbb42cfc 100644 --- a/datafusion/functions-array/src/udf.rs +++ b/datafusion/functions-array/src/udf.rs @@ -166,146 +166,6 @@ impl ScalarUDFImpl for StringToArray { } } -make_udf_function!( - Range, - range, - start stop step, - "create a list of values in the range between start and stop", - range_udf -); -#[derive(Debug)] -pub struct Range { - signature: Signature, - aliases: Vec, -} -impl Range { - pub fn new() -> Self { - use DataType::*; - Self { - signature: Signature::one_of( - vec![ - TypeSignature::Exact(vec![Int64]), - TypeSignature::Exact(vec![Int64, Int64]), - TypeSignature::Exact(vec![Int64, 
Int64, Int64]), - TypeSignature::Exact(vec![Date32, Date32, Interval(MonthDayNano)]), - ], - Volatility::Immutable, - ), - aliases: vec![String::from("range")], - } - } -} -impl ScalarUDFImpl for Range { - fn as_any(&self) -> &dyn Any { - self - } - fn name(&self) -> &str { - "range" - } - - fn signature(&self) -> &Signature { - &self.signature - } - - fn return_type(&self, arg_types: &[DataType]) -> Result { - use DataType::*; - Ok(List(Arc::new(Field::new( - "item", - arg_types[0].clone(), - true, - )))) - } - - fn invoke(&self, args: &[ColumnarValue]) -> Result { - let args = ColumnarValue::values_to_arrays(args)?; - match args[0].data_type() { - arrow::datatypes::DataType::Int64 => { - crate::kernels::gen_range(&args, false).map(ColumnarValue::Array) - } - arrow::datatypes::DataType::Date32 => { - crate::kernels::gen_range_date(&args, false).map(ColumnarValue::Array) - } - _ => { - exec_err!("unsupported type for range") - } - } - } - - fn aliases(&self) -> &[String] { - &self.aliases - } -} - -make_udf_function!( - GenSeries, - gen_series, - start stop step, - "create a list of values in the range between start and stop, include upper bound", - gen_series_udf -); -#[derive(Debug)] -pub struct GenSeries { - signature: Signature, - aliases: Vec, -} -impl GenSeries { - pub fn new() -> Self { - use DataType::*; - Self { - signature: Signature::one_of( - vec![ - TypeSignature::Exact(vec![Int64]), - TypeSignature::Exact(vec![Int64, Int64]), - TypeSignature::Exact(vec![Int64, Int64, Int64]), - TypeSignature::Exact(vec![Date32, Date32, Interval(MonthDayNano)]), - ], - Volatility::Immutable, - ), - aliases: vec![String::from("generate_series")], - } - } -} -impl ScalarUDFImpl for GenSeries { - fn as_any(&self) -> &dyn Any { - self - } - fn name(&self) -> &str { - "generate_series" - } - - fn signature(&self) -> &Signature { - &self.signature - } - - fn return_type(&self, arg_types: &[DataType]) -> Result { - use DataType::*; - Ok(List(Arc::new(Field::new( - "item", - arg_types[0].clone(), - true, - )))) - } - - fn invoke(&self, args: &[ColumnarValue]) -> Result { - let args = ColumnarValue::values_to_arrays(args)?; - match args[0].data_type() { - arrow::datatypes::DataType::Int64 => { - crate::kernels::gen_range(&args, true).map(ColumnarValue::Array) - } - arrow::datatypes::DataType::Date32 => { - crate::kernels::gen_range_date(&args, true).map(ColumnarValue::Array) - } - _ => { - exec_err!("unsupported type for range") - } - } - } - - fn aliases(&self) -> &[String] { - &self.aliases - } -} - make_udf_function!( ArrayDims, array_dims, diff --git a/datafusion/sqllogictest/test_files/array.slt b/datafusion/sqllogictest/test_files/array.slt index b33419ecd47c..3b90187f07e0 100644 --- a/datafusion/sqllogictest/test_files/array.slt +++ b/datafusion/sqllogictest/test_files/array.slt @@ -5634,15 +5634,26 @@ select range(NULL) ---- NULL -## should throw error -query error +## should return NULL +query ? select range(DATE '1992-09-01', NULL, INTERVAL '1' YEAR); +---- +NULL -query error +query ? select range(DATE '1992-09-01', DATE '1993-03-01', NULL); +---- +NULL -query error +query ? select range(NULL, DATE '1993-03-01', INTERVAL '1' YEAR); +---- +NULL + +query ? +select range(NULL, NULL, NULL); +---- +NULL query ? 
select range(DATE '1989-04-01', DATE '1993-03-01', INTERVAL '-1' YEAR) @@ -5668,16 +5679,26 @@ select generate_series(5), ---- [0, 1, 2, 3, 4, 5] [2, 3, 4, 5] [2, 5, 8] [1, 2, 3, 4, 5] [5, 4, 3, 2, 1] [10, 7, 4] [1992-09-01, 1992-10-01, 1992-11-01, 1992-12-01, 1993-01-01, 1993-02-01, 1993-03-01] [1993-02-01, 1993-01-31, 1993-01-30, 1993-01-29, 1993-01-28, 1993-01-27, 1993-01-26, 1993-01-25, 1993-01-24, 1993-01-23, 1993-01-22, 1993-01-21, 1993-01-20, 1993-01-19, 1993-01-18, 1993-01-17, 1993-01-16, 1993-01-15, 1993-01-14, 1993-01-13, 1993-01-12, 1993-01-11, 1993-01-10, 1993-01-09, 1993-01-08, 1993-01-07, 1993-01-06, 1993-01-05, 1993-01-04, 1993-01-03, 1993-01-02, 1993-01-01] [1989-04-01, 1990-04-01, 1991-04-01, 1992-04-01] -## should throw error -query error +## should return NULL +query ? select generate_series(DATE '1992-09-01', NULL, INTERVAL '1' YEAR); +---- +NULL -query error +query ? select generate_series(DATE '1992-09-01', DATE '1993-03-01', NULL); +---- +NULL -query error +query ? select generate_series(NULL, DATE '1993-03-01', INTERVAL '1' YEAR); +---- +NULL +query ? +select generate_series(NULL, NULL, NULL); +---- +NULL query ? select generate_series(DATE '1989-04-01', DATE '1993-03-01', INTERVAL '-1' YEAR)
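Finally, to close out PATCH 10/10, a small driver, again only a sketch under the same datafusion v37 and tokio assumptions, that exercises the new NULL semantics end to end: range and generate_series calls that the old array.slt expected to error are now planned and executed, each returning a single NULL row, matching the updated expectations above:

    use datafusion::error::Result;
    use datafusion::prelude::SessionContext;

    #[tokio::main]
    async fn main() -> Result<()> {
        let ctx = SessionContext::new();
        for sql in [
            // Each of these was previously rejected; each now prints one NULL row.
            "select range(DATE '1992-09-01', NULL, INTERVAL '1' YEAR)",
            "select range(NULL, NULL, NULL)",
            "select generate_series(NULL, DATE '1993-03-01', INTERVAL '1' YEAR)",
        ] {
            ctx.sql(sql).await?.show().await?;
        }
        Ok(())
    }

Short-circuiting in both return_type (returning DataType::Null) and invoke (returning a NullArray) keeps the NULL answer consistent between planning and execution.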