fix: Recompute RowIndex schema after projection pd (#15625)
ritchie46 committed Apr 13, 2024
1 parent f8764c0 commit f62c51b
Showing 26 changed files with 169 additions and 128 deletions.
8 changes: 1 addition & 7 deletions crates/polars-lazy/src/frame/mod.rs
@@ -36,7 +36,6 @@ pub use polars_plan::frame::{AllowedOptimizations, OptState};
use polars_plan::global::FETCH_ROWS;
use smartstring::alias::String as SmartString;

-use crate::fallible;
use crate::physical_plan::executors::Executor;
use crate::physical_plan::planner::{create_physical_expr, create_physical_plan};
use crate::physical_plan::state::ExecutionState;
@@ -1688,15 +1687,10 @@ impl LazyFrame {
};

if add_row_index_in_map {
-let schema = fallible!(self.schema(), &self);
-let schema = schema
-    .new_inserting_at_index(0, name.into(), IDX_DTYPE)
-    .unwrap();
-
self.map_private(FunctionNode::RowIndex {
name: Arc::from(name),
offset,
-schema: Arc::new(schema),
+schema: Default::default(),
})
} else {
self
4 changes: 2 additions & 2 deletions crates/polars-lazy/src/physical_plan/executors/cache.rs
@@ -27,9 +27,9 @@ impl Executor for CacheExec {

if state.verbose() {
if cache_hit {
-println!("CACHE HIT: cache id: {:x}", self.id);
+eprintln!("CACHE HIT: cache id: {:x}", self.id);
} else {
-println!("CACHE SET: cache id: {:x}", self.id);
+eprintln!("CACHE SET: cache id: {:x}", self.id);
}
}

@@ -10,7 +10,7 @@ impl Executor for ExternalContext {
#[cfg(debug_assertions)]
{
if state.verbose() {
-println!("run ExternalContext")
+eprintln!("run ExternalContext")
}
}
// we evaluate contexts first as input may has pushed exprs.
2 changes: 1 addition & 1 deletion crates/polars-lazy/src/physical_plan/executors/filter.rs
@@ -68,7 +68,7 @@ impl Executor for FilterExec {
#[cfg(debug_assertions)]
{
if state.verbose() {
-println!("run FilterExec")
+eprintln!("run FilterExec")
}
}
let df = self.input.execute(state)?;
2 changes: 1 addition & 1 deletion crates/polars-lazy/src/physical_plan/executors/group_by.rs
@@ -119,7 +119,7 @@ impl Executor for GroupByExec {
#[cfg(debug_assertions)]
{
if state.verbose() {
-println!("run GroupbyExec")
+eprintln!("run GroupbyExec")
}
}
if state.verbose() {
@@ -81,7 +81,7 @@ impl Executor for GroupByDynamicExec {
#[cfg(debug_assertions)]
{
if state.verbose() {
-println!("run GroupbyDynamicExec")
+eprintln!("run GroupbyDynamicExec")
}
}
let df = self.input.execute(state)?;
@@ -348,7 +348,7 @@ impl Executor for PartitionGroupByExec {
#[cfg(debug_assertions)]
{
if state.verbose() {
-println!("run PartitionGroupbyExec")
+eprintln!("run PartitionGroupbyExec")
}
}
let original_df = self.input.execute(state)?;
@@ -100,7 +100,7 @@ impl Executor for GroupByRollingExec {
#[cfg(debug_assertions)]
{
if state.verbose() {
-println!("run GroupbyRollingExec")
+eprintln!("run GroupbyRollingExec")
}
}
let df = self.input.execute(state)?;
6 changes: 3 additions & 3 deletions crates/polars-lazy/src/physical_plan/executors/hconcat.rs
@@ -12,14 +12,14 @@ impl Executor for HConcatExec {
#[cfg(debug_assertions)]
{
if state.verbose() {
-println!("run HConcatExec")
+eprintln!("run HConcatExec")
}
}
let mut inputs = std::mem::take(&mut self.inputs);

let dfs = if !self.options.parallel {
if state.verbose() {
-println!("HCONCAT: `parallel=false` hconcat is run sequentially")
+eprintln!("HCONCAT: `parallel=false` hconcat is run sequentially")
}
let mut dfs = Vec::with_capacity(inputs.len());
for (idx, mut input) in inputs.into_iter().enumerate() {
@@ -33,7 +33,7 @@ impl Executor for HConcatExec {
dfs
} else {
if state.verbose() {
-println!("HCONCAT: hconcat is run in parallel")
+eprintln!("HCONCAT: hconcat is run in parallel")
}
// We don't use par_iter directly because the LP may also start threads for every LP (for instance scan_csv)
// this might then lead to a rayon SO. So we take a multitude of the threads to keep work stealing
2 changes: 1 addition & 1 deletion crates/polars-lazy/src/physical_plan/executors/join.rs
@@ -38,7 +38,7 @@ impl Executor for JoinExec {
#[cfg(debug_assertions)]
{
if state.verbose() {
-println!("run JoinExec")
+eprintln!("run JoinExec")
}
}
if state.verbose() {
4 changes: 2 additions & 2 deletions crates/polars-lazy/src/physical_plan/executors/projection.rs
@@ -86,9 +86,9 @@ impl Executor for ProjectionExec {
{
if state.verbose() {
if self.cse_exprs.is_empty() {
-println!("run ProjectionExec");
+eprintln!("run ProjectionExec");
} else {
-println!("run ProjectionExec with {} CSE", self.cse_exprs.len())
+eprintln!("run ProjectionExec with {} CSE", self.cse_exprs.len())
};
}
}
@@ -13,7 +13,7 @@ impl Executor for PythonScanExec {
#[cfg(debug_assertions)]
{
if state.verbose() {
-println!("run PythonScanExec")
+eprintln!("run PythonScanExec")
}
}
let with_columns = self.options.with_columns.take();
2 changes: 1 addition & 1 deletion crates/polars-lazy/src/physical_plan/executors/slice.rs
@@ -11,7 +11,7 @@ impl Executor for SliceExec {
#[cfg(debug_assertions)]
{
if state.verbose() {
-println!("run SliceExec")
+eprintln!("run SliceExec")
}
}
let df = self.input.execute(state)?;
2 changes: 1 addition & 1 deletion crates/polars-lazy/src/physical_plan/executors/sort.rs
@@ -42,7 +42,7 @@ impl Executor for SortExec {
#[cfg(debug_assertions)]
{
if state.verbose() {
-println!("run SortExec")
+eprintln!("run SortExec")
}
}
let df = self.input.execute(state)?;
4 changes: 2 additions & 2 deletions crates/polars-lazy/src/physical_plan/executors/stack.rs
@@ -69,9 +69,9 @@ impl Executor for StackExec {
{
if state.verbose() {
if self.cse_exprs.is_empty() {
-println!("run StackExec");
+eprintln!("run StackExec");
} else {
-println!("run StackExec with {} CSE", self.cse_exprs.len());
+eprintln!("run StackExec with {} CSE", self.cse_exprs.len());
};
}
}
2 changes: 1 addition & 1 deletion crates/polars-lazy/src/physical_plan/executors/udf.rs
@@ -11,7 +11,7 @@ impl Executor for UdfExec {
#[cfg(debug_assertions)]
{
if state.verbose() {
-println!("run UdfExec")
+eprintln!("run UdfExec")
}
}
let df = self.input.execute(state)?;
8 changes: 4 additions & 4 deletions crates/polars-lazy/src/physical_plan/executors/union.rs
@@ -14,7 +14,7 @@ impl Executor for UnionExec {
#[cfg(debug_assertions)]
{
if state.verbose() {
-println!("run UnionExec")
+eprintln!("run UnionExec")
}
}
// keep scans thread local if 'fetch' is used.
@@ -32,9 +32,9 @@ impl Executor for UnionExec {
if !self.options.parallel || sliced_path {
if state.verbose() {
if !self.options.parallel {
-println!("UNION: `parallel=false` union is run sequentially")
+eprintln!("UNION: `parallel=false` union is run sequentially")
} else {
-println!("UNION: `slice is set` union is run sequentially")
+eprintln!("UNION: `slice is set` union is run sequentially")
}
}

@@ -80,7 +80,7 @@ impl Executor for UnionExec {
concat_df(&dfs)
} else {
if state.verbose() {
-println!("UNION: union is run in parallel")
+eprintln!("UNION: union is run in parallel")
}

// we don't use par_iter directly because the LP may also start threads for every LP (for instance scan_csv)
2 changes: 1 addition & 1 deletion crates/polars-lazy/src/physical_plan/executors/unique.rs
@@ -11,7 +11,7 @@ impl Executor for UniqueExec {
#[cfg(debug_assertions)]
{
if state.verbose() {
-println!("run UniqueExec")
+eprintln!("run UniqueExec")
}
}
let df = self.input.execute(state)?;
6 changes: 1 addition & 5 deletions crates/polars-plan/src/logical_plan/builder.rs
@@ -867,16 +867,12 @@ impl LogicalPlanBuilder {
}

pub fn row_index(self, name: &str, offset: Option<IdxSize>) -> Self {
-let mut schema = try_delayed!(self.0.schema(), &self.0, into).into_owned();
-let schema_mut = Arc::make_mut(&mut schema);
-row_index_schema(schema_mut, name);
-
LogicalPlan::MapFunction {
input: Arc::new(self.0),
function: FunctionNode::RowIndex {
name: ColumnName::from(name),
offset,
-schema,
+schema: Default::default(),
},
}
.into()
6 changes: 1 addition & 5 deletions crates/polars-plan/src/logical_plan/builder_alp.rs
@@ -311,16 +311,12 @@ impl<'a> IRBuilder<'a> {
}

pub fn row_index(self, name: Arc<str>, offset: Option<IdxSize>) -> Self {
-let mut schema = self.schema().into_owned();
-let schema_mut = Arc::make_mut(&mut schema);
-row_index_schema(schema_mut, name.as_ref());
-
let lp = IR::MapFunction {
input: self.root,
function: FunctionNode::RowIndex {
name,
offset,
-schema,
+schema: Default::default(),
},
};
self.add_alp(lp)
4 changes: 0 additions & 4 deletions crates/polars-plan/src/logical_plan/builder_functions.rs
@@ -54,7 +54,3 @@ pub(super) fn det_melt_schema(args: &MeltArgs, input_schema: &Schema) -> SchemaR
new_schema.with_column(value_name, supertype);
Arc::new(new_schema)
}
-
-pub(super) fn row_index_schema(schema: &mut Schema, name: &str) {
-    schema.insert_at_index(0, name.into(), IDX_DTYPE).unwrap();
-}
86 changes: 7 additions & 79 deletions crates/polars-plan/src/logical_plan/functions/mod.rs
@@ -4,11 +4,13 @@ mod merge_sorted;
#[cfg(feature = "python")]
mod python_udf;
mod rename;
+mod schema;

use std::borrow::Cow;
use std::fmt::{Debug, Display, Formatter};
use std::hash::{Hash, Hasher};
use std::path::PathBuf;
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};

use polars_core::prelude::*;
#[cfg(feature = "serde")]
@@ -21,6 +23,8 @@ use crate::dsl::python_udf::PythonFunction;
use crate::logical_plan::functions::merge_sorted::merge_sorted;
use crate::prelude::*;

+type CachedSchema = Arc<Mutex<Option<SchemaRef>>>;
+
#[derive(Clone)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum FunctionNode {
@@ -91,7 +95,8 @@ pub enum FunctionNode {
},
RowIndex {
name: Arc<str>,
-schema: SchemaRef,
+// Might be cached.
+schema: CachedSchema,
offset: Option<IdxSize>,
},
}
@@ -201,83 +206,6 @@ impl FunctionNode {
}
}

-pub(crate) fn schema<'a>(
-&self,
-input_schema: &'a SchemaRef,
-) -> PolarsResult<Cow<'a, SchemaRef>> {
-use FunctionNode::*;
-match self {
-Opaque { schema, .. } => match schema {
-None => Ok(Cow::Borrowed(input_schema)),
-Some(schema_fn) => {
-let output_schema = schema_fn.get_schema(input_schema)?;
-Ok(Cow::Owned(output_schema))
-},
-},
-#[cfg(feature = "python")]
-OpaquePython { schema, .. } => Ok(schema
-.as_ref()
-.map(|schema| Cow::Owned(schema.clone()))
-.unwrap_or_else(|| Cow::Borrowed(input_schema))),
-Pipeline { schema, .. } => Ok(Cow::Owned(schema.clone())),
-DropNulls { .. } => Ok(Cow::Borrowed(input_schema)),
-Count { alias, .. } => {
-let mut schema: Schema = Schema::with_capacity(1);
-let name = SmartString::from(
-alias
-.as_ref()
-.map(|alias| alias.as_ref())
-.unwrap_or(crate::constants::LEN),
-);
-schema.insert_at_index(0, name, IDX_DTYPE)?;
-Ok(Cow::Owned(Arc::new(schema)))
-},
-Rechunk => Ok(Cow::Borrowed(input_schema)),
-Unnest { columns: _columns } => {
-#[cfg(feature = "dtype-struct")]
-{
-let mut new_schema = Schema::with_capacity(input_schema.len() * 2);
-for (name, dtype) in input_schema.iter() {
-if _columns.iter().any(|item| item.as_ref() == name.as_str()) {
-match dtype {
-DataType::Struct(flds) => {
-for fld in flds {
-new_schema.with_column(
-fld.name().clone(),
-fld.data_type().clone(),
-);
-}
-},
-DataType::Unknown => {
-// pass through unknown
-},
-_ => {
-polars_bail!(
-SchemaMismatch: "expected struct dtype, got: `{}`", dtype
-);
-},
-}
-} else {
-new_schema.with_column(name.clone(), dtype.clone());
-}
-}
-
-Ok(Cow::Owned(Arc::new(new_schema)))
-}
-#[cfg(not(feature = "dtype-struct"))]
-{
-panic!("activate feature 'dtype-struct'")
-}
-},
-#[cfg(feature = "merge_sorted")]
-MergeSorted { .. } => Ok(Cow::Borrowed(input_schema)),
-Rename { existing, new, .. } => rename::rename_schema(input_schema, existing, new),
-Explode { schema, .. } | RowIndex { schema, .. } | Melt { schema, .. } => {
-Ok(Cow::Owned(schema.clone()))
-},
-}
-}
-
pub(crate) fn allow_predicate_pd(&self) -> bool {
use FunctionNode::*;
match self {

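For context on the fix itself: before this commit the RowIndex schema was computed eagerly when the plan was built (inserting the index column into the then-current input schema, see the deleted row_index_schema helper), so a later projection pushdown could leave that stored schema stale. The new CachedSchema slot starts out empty (Default::default()) and is presumably filled on demand, from the post-optimization input schema, by the new schema module. Below is a minimal sketch of that lookup-or-compute pattern under that assumption; only CachedSchema, insert_at_index and IDX_DTYPE come from the diff, while the helper name row_index_schema_cached and its exact signature are hypothetical.

use std::sync::{Arc, Mutex};

use polars_core::prelude::*; // SchemaRef, IDX_DTYPE, PolarsResult

// Mirrors the type alias added in functions/mod.rs: an initially empty,
// lazily filled schema cache shared behind an Arc.
type CachedSchema = Arc<Mutex<Option<SchemaRef>>>;

// Hypothetical helper: reuse the cached schema if it was already resolved,
// otherwise derive it from the *current* (post projection-pushdown) input
// schema by inserting the row-index column at position 0, then memoize it.
fn row_index_schema_cached(
    cache: &CachedSchema,
    input_schema: &SchemaRef,
    name: &str,
) -> PolarsResult<SchemaRef> {
    let mut guard = cache.lock().unwrap();
    if let Some(schema) = guard.as_ref() {
        return Ok(schema.clone());
    }
    let mut schema = input_schema.as_ref().clone();
    schema.insert_at_index(0, name.into(), IDX_DTYPE)?;
    let schema = Arc::new(schema);
    *guard = Some(schema.clone());
    Ok(schema)
}

Resolving the schema lazily, and caching it behind a Mutex so it is computed at most once, lets the optimizer prune or reorder columns first; whatever schema ends up cached is then consistent with the input the node actually receives.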