adapt filter expressions to file schema during parquet scan #16461
Conversation
return exec_err!(
    "Non-nullable column '{}' is missing from the physical schema",
    column.name()
);
Might be useful to include some sort of file identifier here?
agreed -- that might also be a nice use case for a builder-style struct
let new_predicate = PhysicalExprSchemaRewriter::new(physical_file_schema, logical_file_schema)
    .with_identifier(file_name)
    .convert(predicate)?;
🤔
let stream = opener.open(make_meta(), file.clone()).unwrap().await.unwrap();
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
assert_eq!(num_batches, 1);
assert_eq!(num_rows, 1);
This assertion fails on main: all 3 rows pass because the row filter cannot handle the partition columns. This PR somewhat coincidentally happens to allow the row filter to handle predicates that depend on both partition and data columns!
Thank you @adriangb -- other than the removed test I think this PR makes sense to me and could be merged
I think it would be useful to refactor this code a bit and invest in testing infrastructure as we proceed towards adding nicer features (like unwrapping casts on columns for example)
/// Preference is always given to casting literal values to the data type of the column
/// since casting the column to the literal value's data type can be significantly more expensive.
/// Given two columns the cast is applied arbitrarily to the first column.
pub fn cast_expr_to_schema(
I think this is more general than just parquet, so perhaps we could move it into the datafusion-physical-schema crate, or perhaps somewhere near the physical planner
I also think it would really help (perhaps as a follow on PR) to add some more specific unit tests.
I wonder if an API like the following makes sense:
struct PhysicalExprSchemaRewriter {
    ...
}

// rewrite a predicate
let new_predicate = PhysicalExprSchemaRewriter::new(physical_file_schema, logical_file_schema)
    // optionally provide partition values
    .with_partition_columns(partition_fields, partition_values)
    .convert(predicate)?;
Then I think writing unit tests would be easy and we could adapt / extend the code over time -- and it would set us up for adapting more sophisticated expressions like field extraction...
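For illustration, a minimal sketch of what one such unit test could look like, assuming the builder lands with roughly the names proposed above (PhysicalExprSchemaRewriter and convert are the proposed, not-yet-existing API; the rest is standard DataFusion):

```rust
use std::sync::Arc;
use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::Result;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::PhysicalExpr;
// use ...::PhysicalExprSchemaRewriter; // hypothetical builder from the comment above

#[test]
fn rewrite_missing_nullable_column_to_null() -> Result<()> {
    // Logical (table) schema has `a` and `b`, but the file only wrote `a`.
    let logical_file_schema = Schema::new(vec![
        Field::new("a", DataType::Int32, true),
        Field::new("b", DataType::Int32, true),
    ]);
    let physical_file_schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);

    // Predicate references the missing (nullable) column `b`.
    let predicate: Arc<dyn PhysicalExpr> = Arc::new(Column::new("b", 1));

    let rewritten = PhysicalExprSchemaRewriter::new(&physical_file_schema, &logical_file_schema)
        .convert(predicate)?;

    // `b` is nullable and missing from the file, so it should be replaced by a NULL literal,
    // mirroring what the default SchemaAdapter does at the batch level.
    assert_eq!(rewritten.to_string(), "NULL");
    Ok(())
}
```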
Sounds good to me, I will work on this next! Agreed on the unit tests 😄
assert!(candidate.is_none());
}

#[test]
fn test_filter_type_coercion() {
Is there any way to keep this test (or something like it) that shows reading with a predicate of a different schema is correctly coerced?
I think so - I'll have to make it more e2e since it is no longer specific to the row filter. We have some other similar tests; I'll work this in with those
if let Some(column) = expr.as_any().downcast_ref::<expressions::Column>() {
    let logical_field = match logical_file_schema.field_with_name(column.name()) {
nit: we could reduce a level of nesting with something like
let Some(column) = expr.as_any().downcast_ref::<expressions::Column>() else {
    return Ok(Transformed::no(expr));
};
}
// If the column is not found in the logical schema, return an error
// This should probably never be hit unless something upstream broke, but nonetheless it's better
// for us to return a handleable error than to panic / do something unexpected.
👍
// If the logical field and physical field are different, we need to cast
// the column to the logical field's data type.
// We will try later to move the cast to literal values if possible, which is computationally cheaper.
👍
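To make the "move the cast onto the literal" point concrete: if the file stores a as Int32 but the logical schema says Int64, casting the literal once is much cheaper than casting the whole column for every batch. A rough sketch of the two shapes, built directly with the existing physical expression types (the exact output of the rewriter may differ):

```rust
use std::sync::Arc;
use arrow::datatypes::DataType;
use datafusion_common::ScalarValue;
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::{BinaryExpr, CastExpr, Column, Literal};
use datafusion_physical_expr::PhysicalExpr;

fn cast_placement_example() -> (Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>) {
    // Naive adaptation: cast the whole Int32 column up to Int64 for every row.
    //   CAST(a@Int32 AS Int64) > 5i64
    let naive: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
        Arc::new(CastExpr::new(
            Arc::new(Column::new("a", 0)),
            DataType::Int64,
            None,
        )),
        Operator::Gt,
        Arc::new(Literal::new(ScalarValue::Int64(Some(5)))),
    ));

    // Preferred rewrite: cast the literal once, down to the file's physical type.
    //   a@Int32 > 5i32
    let preferred: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("a", 0)),
        Operator::Gt,
        Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
    ));

    (naive, preferred)
}
```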
@alamb I've created the builder, moved the implementation and added some unit tests
My hope with this work is that we can:
@kosiew I'm curious what you think about this. Would it be possible to implement the nested struct imputation work you're doing with this approach?
Just for fun I opened pydantic#31 to see how hard it would be to incorporate #15780 (comment), not too bad! Most of the code could be shared with the logical layer with some refactoring
@adriangb
Do you mean reusing the PhysicalExprSchemaRewriter machinery to drive nested-struct imputation? Here are some disadvantages of the rewrite-centric approach versus the more data-centric adapter approach:
So, could we bolt nested-struct imputation onto this rewriter? Technically yes: we could extend rewrite_column so that, whenever we see a Column referring to foo.bar.baz that's missing in the physical schema, we generate a Literal::Null of the full nested type (constructing the proper StructValue). But we'd still need an array-level counterpart to actually materialize those null nested fields in the RecordBatch when we call map_batch.

In practice, the two approaches complement each other: use the rewriter to handle predicate and projection expressions (so filters and column references don't blow up), and continue to rely on cast_struct_column + NestedStructSchemaAdapter to adapt the actual batch data, filling in null arrays and doing recursive casts. That way we get the best of both worlds: clean, centralized expression rewriting for pushdown, and robust array-level marshalling for the final tables. 😊

Why the two-pronged approach makes sense

The PhysicalExprSchemaRewriter is perfect for rewriting predicates and projections so they don't blow up when the on-disk schema diverges. But once you've read that Parquet row group into memory, you still need to reshape the StructArray itself: filling in null arrays for new nested fields, dropping old ones, recursively casting types.

Swapping out the old SchemaMapper for the rewriter in your pruning path is a great win: much less boilerplate, better separation of concerns, and no more "fake record batches" just to evaluate a filter. You can remove all of that pruning-specific adapter code and lean on the rewriter's tree visitor.

Handling a top-level missing column is easy in the rewriter (you just rewrite col("foo") to lit(NULL)), but handling col("a.b.c") where b itself is a new struct, and c is a new field inside it, is far more natural in a recursive cast_struct_column that operates on StructArray children.
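For the array-level half of this, a minimal arrow-rs sketch of filling in a missing nested field with nulls (the function name pad_struct_with_null_child is made up; this is the kind of work cast_struct_column would do):

```rust
use std::sync::Arc;
use arrow::array::{new_null_array, Array, ArrayRef, Int64Array, StructArray};
use arrow::datatypes::{DataType, Field};

/// The file wrote struct `a` with only child `x`, but the table schema also expects `a.y: Utf8`.
/// Rebuild the struct with an all-null `y` child so the batch matches the logical schema.
fn pad_struct_with_null_child(x: ArrayRef) -> StructArray {
    let len = x.len();
    let null_y = new_null_array(&DataType::Utf8, len);
    StructArray::from(vec![
        (Arc::new(Field::new("x", DataType::Int64, true)), x),
        (Arc::new(Field::new("y", DataType::Utf8, true)), null_y),
    ])
}

fn main() {
    let x: ArrayRef = Arc::new(Int64Array::from(vec![1, 2, 3]));
    let padded = pad_struct_with_null_child(x);
    assert_eq!(padded.num_columns(), 2);
    assert_eq!(padded.column(1).null_count(), 3);
}
```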
Putting more words to how I understand pushdown and data adaptation:
At the end of this step, no data has actually been materialized—you’ve only modified the expression you use to decide what to read.
Thanks for the thoughtful reply. An important point that I maybe should have mentioned: we're actively working on projection / selection pushdown which would involve pushing down expressions into the scan (#14993), thus we would evaluate the expressions and materialize the record batches in the output. Basically what I see happening:
So some questions for nested structs are: is it going to be a pain to adapt the nested structs via projection expressions? Or is it going to be ~ the same as doing it at the SchemaAdapter / physical column level?
FWIW I think this is one mechanism to turn the expression into an array (you just need to evaluate the expression into a PhysicalExpr):
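Presumably this means evaluating the PhysicalExpr against a RecordBatch read from the file; a minimal sketch (note that older DataFusion versions had into_array return ArrayRef directly rather than a Result):

```rust
use std::sync::Arc;
use arrow::array::ArrayRef;
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
use datafusion_physical_expr::PhysicalExpr;

/// Evaluate a (rewritten) expression against a batch read from the file,
/// yielding the column shaped the way the logical schema expects it.
fn materialize_column(expr: &Arc<dyn PhysicalExpr>, batch: &RecordBatch) -> Result<ArrayRef> {
    // `evaluate` returns a ColumnarValue (array or scalar); expand scalars to a full array.
    expr.evaluate(batch)?.into_array(batch.num_rows())
}
```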
I would personally recommend proceeding in parallel with the two approaches, ensuring there are good end to end tests (.slt) -- and then if we find that the projection pushdown / rewriting code can subsume the schema adapter code we could make a PR to do that 🤔
Looks great to me -- thank you @adriangb
@@ -248,10 +248,25 @@ impl FileOpener for ParquetOpener {
        }
    }

    let predicate = predicate
        .map(|p| {
            PhysicalExprSchemaRewriter::new(
😍
);
}
// If the column is missing from the physical schema fill it in with nulls as `SchemaAdapter` would do.
// TODO: do we need to sync this with what the `SchemaAdapter` actually does?
I recommend ensuring this TODO is covered by a ticket and then adding a reference to the ticket here
For my own sake: the point here is that SchemaAdapter is a trait, so it can do arbitrary things, not just replace with nulls (that's what the default implementation does). It's super annoying to have to implement SchemaAdapterFactory and all 3 (IIRC) of the traits. I'd propose that what we do here is just add hooks that follow the TreeNode APIs and make it a builder-style struct.
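A purely hypothetical sketch of what such a hook on the builder could look like (with_missing_column_hook does not exist today; the closure just mirrors a TreeNode-style rewrite, and the snippet is not runnable as-is since the builder API is still the proposal above):

```rust
use std::sync::Arc;
use datafusion_common::ScalarValue;
use datafusion_physical_expr::expressions::{Column, Literal};
use datafusion_physical_expr::PhysicalExpr;

// Hypothetical: called whenever a predicate column has no match in the physical file schema.
// Returning None falls back to the default behavior (a null literal for nullable columns).
let rewriter = PhysicalExprSchemaRewriter::new(physical_file_schema, logical_file_schema)
    .with_missing_column_hook(Arc::new(|column: &Column| -> Option<Arc<dyn PhysicalExpr>> {
        (column.name() == "row_id")
            .then(|| Arc::new(Literal::new(ScalarValue::UInt64(Some(0)))) as Arc<dyn PhysicalExpr>)
    }));
```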
let partition_fields =
    vec![Arc::new(Field::new("partition_col", DataType::Utf8, false))];
let partition_values = vec![ScalarValue::Utf8(Some("test_value".to_string()))];
You can make this less verbose like this if you want
// instead of:
let partition_values = vec![ScalarValue::Utf8(Some("test_value".to_string()))];
// suggested:
let partition_values = vec![ScalarValue::from("test_value")];
Hmm, I agree that is more concise, but in this case, since we are testing casting and such, I think it's better to be explicit. I can see someone being very confused in the future as to why a test is failing and not realizing that ScalarValue::from("foo") produces Utf8 and not e.g. Utf8View.
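A tiny test that pins down that subtlety (the test name is made up; ScalarValue::data_type and the From<&str> impl are existing APIs):

```rust
use arrow::datatypes::DataType;
use datafusion_common::ScalarValue;

#[test]
fn scalar_from_str_is_utf8_not_utf8view() {
    // `From<&str>` builds a Utf8 scalar, not Utf8View, which is why the explicit
    // `ScalarValue::Utf8(Some(...))` spelling is arguably clearer in a casting test.
    assert_eq!(ScalarValue::from("test_value").data_type(), DataType::Utf8);
    assert_eq!(
        ScalarValue::Utf8(Some("test_value".to_string())).data_type(),
        DataType::Utf8
    );
}
```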
}

#[test]
fn test_rewrite_column_with_type_cast() -> Result<()> {
I recommend adding a test for a more sophisticated expression (like (a + 5.0::int64) || (c > 0.0::float)) to ensure that the recursive rewrite case is covered
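For reference, a sketch of how such a compound predicate could be assembled for a unit test from the existing physical expression types (column indices and literal types are illustrative, and the cast-laden expression from the comment is simplified here to (a + 5) OR (c > 0.0)):

```rust
use std::sync::Arc;
use datafusion_common::ScalarValue;
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal};
use datafusion_physical_expr::PhysicalExpr;

/// Builds `(a + 5) OR (c > 0.0)` so the rewriter's recursive descent gets exercised
/// on nested binary expressions, not just a bare column comparison.
fn sample_compound_predicate() -> Arc<dyn PhysicalExpr> {
    let a_plus_5: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("a", 0)),
        Operator::Plus,
        Arc::new(Literal::new(ScalarValue::Int64(Some(5)))),
    ));
    let c_gt_0: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("c", 2)),
        Operator::Gt,
        Arc::new(Literal::new(ScalarValue::Float64(Some(0.0)))),
    ));
    Arc::new(BinaryExpr::new(a_plus_5, Operator::Or, c_gt_0))
}
```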
I opened #16528 to track further ideas / steps. For now I'd like to merge this so that I can iterate on the rest of the work 😄
#16530 can easily be incorporated into this work, completely eliminating what are currently expensive casts
Adding notes for future reference:

Summary: Adapting Filter Expressions to File Schema During Parquet Scan

Background & Goal
Key Concepts

1. Expression Rewriting (Pushdown Adaptation)
2. Data Adaptation (Batch-Level Reshaping)
Main Discussion Points
Diagram: Data Adaptation vs. Expression Rewriting Flow
Takeaways
Future Directions
Summary

The issue centers on improving how DataFusion adapts filter expressions and projections during Parquet scans when the on-disk file schema differs from the logical query schema, especially for nested structs. The key is to separate expression rewriting (for pushdown safety) from actual batch data adaptation (for correctness), using a new builder abstraction for expression rewrites. This approach unblocks further optimizations and schema evolution support while keeping code maintainable and extensible.
@kosiew I'm not sure I agree with the conclusions there. Why can't we use expressions to do the schema adapting during the scan? It's very possible, as @alamb pointed out in #16461 (comment), to feed a RecordBatch into an expression and get back a new array. So unless I'm missing something I don't think these are correct:
I'll put together an example to show how predicate rewrites can be used to reshape data. But also FWIW that's exactly how ProjectionExec works.
@adriangb, I do look forward to a solution that handles schema adaptation in one pass.
@kosiew could you take a look at 32725dd? I think this fits in well with the idea of #14993: we are quite literally moving work from ProjectionExec into the scan phase (with the added benefit that we can do smart things with avoiding casts as well as using file-specific optimizations like shredded variant columns).
I do think this is a concern. I'm not sure how hard it would be to actually implement, but it's theoretically very possible, and I think we should be able to make it easy to implement with some elbow grease / the right helpers and abstractions.
hi @adriangb
👍👍👍
So yes, for flat schemas and simple projections/filters, we can entirely sidestep a separate SchemaAdapter.
✅ Where the rewrite-only approach shines
Why it’s possible but a lot of work (and a performance risk)
I hope I don't sound like I am dead against the rewrite approach.

What I would love to hear

Here's a simpler and faster approach .....
As far as I know a PhysicalExpr can operate at the array level. For example: datafusion/datafusion/common/src/scalar/mod.rs, line 2672 in e3d3302.
I think 3-4 function call hops would be an issue if it happened for every row, but it's happening at the array level, so it's going to be inconsequential compared to the IO happening, etc.
Isn't there https://datafusion.apache.org/user-guide/sql/scalar_functions.html#struct? I'm sure we can call that ourselves without SQL: datafusion/datafusion/functions/src/core/struct.rs, lines 53 to 71 in e3d3302.
Thank you very much for the feedback @kosiew 🙏🏻! I don't mean to disregard it, you make great points, but I think they are surmountable! Let's move forward with this and keep iterating on the approaches in parallel. If it turns out that this approach won't work for projection evaluation, it's still clearly a win for predicate evaluation.
This reverts commit 4cef38f.
Great, I'll address #16461 (comment) and then I think this will be ready to merge!
I opened #16565 to track handling struct column sub-field adaptation specifically
The idea here is to move us one step closer to #15780 although there is still work to do (e.g. #15780 (comment)).
My goal is that this immediately unblocks other work (in particular #15057) while being a relatively small change that we can build upon with more optimizations, etc.