Spark3.5: Support ROW level operation on insert-replace-where #13332

Open · wants to merge 1 commit into main

Conversation

@dolcino-li commented Jun 17, 2025

Spark has supported the insert-replace-where query (INSERT INTO ... REPLACE WHERE) since version 3.5.

Currently, Iceberg supports this only at the metadata level; this PR contributes row-level support.
If the table can carry out the insert-replace-where as a metadata-only operation, the original overwrite logical plan is kept.
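
For reference, a statement of the shape below should exercise this path. It is a sketch: the table, schema, and predicate are taken from the plan output in this description, while the inserted values are invented for illustration.

-- Replace every row with age > 22 by the new rows; keep all other rows.
-- (The values are placeholders; table and predicate match the plans below.)
INSERT INTO spark_catalog.aorakili.person
REPLACE WHERE age > 22
VALUES ('Alice', 30, 'engineer');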

The original logical plan is:

== Analyzed Logical Plan ==
OverwriteByExpression RelationV2[name#141, age#142, job#143] spark_catalog.aorakili.person spark_catalog.aorakili.person, (age#142 > 22), false
+- Project [cast(col1#144 as string) AS name#147, cast(col2#145 as int) AS age#148, cast(col3#146 as string) AS job#149]
   +- LocalRelation [col1#144, col2#145, col3#146]
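
Whether Spark produces the copy-on-write or the merge-on-read plan shown next follows Iceberg's row-level write configuration. A minimal sketch, assuming replace-where honors the same per-operation table properties as the existing row-level commands; which property governs this new path is an assumption of this sketch:

-- Iceberg selects row-level write modes per operation via table properties,
-- e.g. write.delete.mode; values are 'copy-on-write' or 'merge-on-read'.
ALTER TABLE spark_catalog.aorakili.person
SET TBLPROPERTIES ('write.delete.mode' = 'merge-on-read');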

For copy-on-write mode, the optimized logical plan is:

== Optimized Logical Plan ==
ReplaceData RelationV2[name#13, age#14, job#15] spark_catalog.aorakili.person spark_catalog.aorakili.person, IcebergWrite(table=spark_catalog.aorakili.person, format=PARQUET)
+- Union false, false
   :- Project [name#13, age#14, job#15]
   :  +- Filter (dynamicpruningexpression(_file#24 IN (list#43 [])) AND NOT ((age#14 > 22) <=> true))
   :     :  +- Aggregate [_file#42], [_file#42]
   :     :     +- Project [_file#42]
   :     :        +- Filter (isnotnull(age#40) AND (age#40 > 22))
   :     :           +- RelationV2[age#40, _file#42] spark_catalog.aorakili.person
   :     +- RelationV2[name#13, age#14, job#15, _file#24] spark_catalog.aorakili.person
   +- Expand [[name#19, age#20, job#21]], [name#27, age#28, job#29]
      +- LocalRelation [name#19, age#20, job#21]

For merge-on-read mode, the optimized logical plan is:

== Optimized Logical Plan ==
WriteDelta RelationV2[name#54, age#55, job#56] spark_catalog.aorakili.person spark_catalog.aorakili.person, org.apache.iceberg.spark.source.SparkPositionDeltaWrite@77d5395a
+- RebalancePartitions [_spec_id#75, _partition#76, _file#73], 402653184
   +- Union false, false
      :- Expand [[1, null, null, null, _file#65, _pos#66L, _spec_id#63, _partition#64]], [__row_operation#69, name#70, age#71, job#72, _file#73, _pos#74L, _spec_id#75, _partition#76]
      :  +- Project [_file#65, _pos#66L, _spec_id#63, _partition#64]
      :     +- Filter (isnotnull(age#55) AND (age#55 > 22))
      :        +- RelationV2[age#55, _file#65, _pos#66L, _spec_id#63, _partition#64] spark_catalog.aorakili.person
      +- Expand [[3, name#60, age#61, job#62, null, null, null, null]], [__row_operation#78, name#79, age#80, job#81, _file#82, _pos#83L, _spec_id#84, _partition#85]
         +- LocalRelation [name#60, age#61, job#62]
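
Reading the two plans: in copy-on-write, ReplaceData rewrites each affected file by unioning its retained rows (those where NOT (age > 22)) with the incoming rows; in merge-on-read, WriteDelta emits a position delete for every matching row and appends the new rows (in the __row_operation column, 1 marks a delete and 3 an insert). A quick semantic check, with the expected outcome inferred from the predicate above:

-- After the replace-where, previously existing rows with age > 22 should be
-- gone, the inserted rows present, and rows with age <= 22 untouched.
SELECT * FROM spark_catalog.aorakili.person ORDER BY age;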
