
Backport ExtendedV2Writes rule for Spark 3.1 #243

Merged
chenselena merged 6 commits into
linkedin:openhouse-1.2.0 from
chenselena:selchen/backport-spark32-write
May 8, 2026
Conversation


@chenselena chenselena commented May 6, 2026

Summary

Backports Spark 3.5's V2Writes idea to Spark 3.1: a rule that attaches a local Sort to V2 write commands (AppendData/OverwriteByExpression/OverwritePartitionsDynamic) so partitioned writes don't fail
in ClusteredDataWriter. Aligns Spark 3.1 behavior with Spark 3.5 ahead of the upgrade.

Behavior

  • Sort attached when the table has an explicit sort order (partition cols prepended via Spark3Util.buildRequiredOrdering).
  • No sort attached for unsorted tables — users must enable write.spark.fanout.enabled=true or pre-cluster the input. Matches Spark 3.5's default path.
  • No Exchange is ever attached: Spark 3.1 only has strict RepartitionByExpression, and a strict repartition would turn skewed partition keys into stragglers. Spark 3.4+'s non-strict
    RebalancePartitions (which Spark 3.5 uses) doesn't exist here.
  • MERGE/UPDATE/DELETE skipped: RewriteRowLevelOperationHelper.buildWritePlan already prepares those queries; alreadyPrepared() detects its output shape to avoid double-wrapping.
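The ordering decision above can be sketched as a tiny standalone model. This is a hedged illustration only: `TableSpec` and `requiredOrdering` here are hypothetical stand-ins for the real Catalyst plan types and the local wrapper around `Spark3Util.buildRequiredOrdering`.

```java
// Hypothetical, simplified model of the backported rule's ordering decision.
// TableSpec and requiredOrdering are illustrative names, not the real API.
import java.util.ArrayList;
import java.util.List;

public class ExtendedV2WritesSketch {
    // Minimal stand-in for the table metadata the rule consults.
    record TableSpec(List<String> partitionCols, List<String> sortOrder) {}

    // If the table declares a sort order, prepend the partition columns
    // (as Spark3Util.buildRequiredOrdering does) and return the combined
    // ordering; an empty result means no Sort node is attached, matching
    // Spark 3.5's default path for unsorted tables.
    static List<String> requiredOrdering(TableSpec t) {
        if (t.sortOrder().isEmpty()) {
            return List.of(); // unsorted table: leave the write plan untouched
        }
        List<String> ordering = new ArrayList<>(t.partitionCols());
        for (String col : t.sortOrder()) {
            if (!ordering.contains(col)) {
                ordering.add(col); // don't duplicate partition columns
            }
        }
        return ordering;
    }

    public static void main(String[] args) {
        TableSpec sorted = new TableSpec(List.of("shard"), List.of("segmentType", "shard"));
        TableSpec unsorted = new TableSpec(List.of("shard"), List.of());
        System.out.println(requiredOrdering(sorted));   // [shard, segmentType]
        System.out.println(requiredOrdering(unsorted)); // []
    }
}
```

The empty-ordering branch is what keeps this aligned with Spark 3.5: rather than guessing a distribution, the rule does nothing and leaves the user to enable fanout or pre-cluster the input.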

Files

  • ExtendedV2Writes.scala — the rule + a local requiredOrdering wrapper around Spark3Util.buildRequiredOrdering.
  • IcebergSparkSessionExtensions.scala — registers the rule.
  • TestRequiredDistributionAndOrdering.java — covers sorted/RANGE/HASH/NONE distributions, fanout paths, AQE, single-shuffle-partition, null partition values, empty input, and saveAsTable append.
    *FailsWithoutRule tests pin the pre-rule baseline.

Test plan

  • ./gradlew :iceberg-spark:iceberg-spark-extensions-3.1_2.12:test --tests "*TestRequiredDistributionAndOrdering*"

@github-actions github-actions Bot added the SPARK label May 6, 2026
@chenselena chenselena force-pushed the selchen/backport-spark32-write branch from 2d6cd50 to 7eb18de Compare May 6, 2026 05:23
@chenselena chenselena force-pushed the selchen/backport-spark32-write branch from 53c2d22 to 7769824 Compare May 6, 2026 05:41
@chenselena chenselena marked this pull request as ready for review May 6, 2026 17:19

@dxichen dxichen left a comment


thanks for the backport!

@chenselena chenselena merged commit bf6a720 into linkedin:openhouse-1.2.0 May 8, 2026
9 checks passed
chenselena added a commit to linkedin/openhouse that referenced this pull request May 9, 2026
## Summary

This PR brings in the change from
linkedin/iceberg#243 to backport the following
to Spark 3.1: a rule that attaches a local Sort to V2 write commands
(AppendData/OverwriteByExpression/OverwritePartitionsDynamic) so
partitioned writes don't fail in ClusteredDataWriter.

## Changes

- [ ] Client-facing API Changes
- [ ] Internal API Changes
- [ ] Bug Fixes
- [x] New Features
- [ ] Performance Improvements
- [ ] Code Style
- [ ] Refactoring
- [ ] Documentation
- [ ] Tests

For all the boxes checked, please include additional details of the
changes made in this pull request.

- Sort attached when the table has an explicit sort order (partition cols prepended via Spark3Util.buildRequiredOrdering).
- No sort attached for unsorted tables — users must enable write.spark.fanout.enabled=true or pre-cluster the input. Matches Spark 3.5's default path.
- No Exchange is ever attached: Spark 3.1 only has strict RepartitionByExpression, and a strict repartition would turn skewed partition keys into stragglers. Spark 3.4+'s non-strict RebalancePartitions (which Spark 3.5 uses) doesn't exist here.
- MERGE/UPDATE/DELETE skipped: RewriteRowLevelOperationHelper.buildWritePlan already prepares those queries; alreadyPrepared() detects its output shape to avoid double-wrapping.
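Concretely, a user writing to an unsorted partitioned table has two escape hatches. A minimal Spark SQL sketch (the table name is hypothetical; the property and DDL come from the description above):

```sql
-- Option 1: enable fanout writers so unclustered input can still be written
ALTER TABLE db.events SET TBLPROPERTIES ('write.spark.fanout.enabled' = 'true');

-- Option 2: declare a write order so the rule injects a local Sort
ALTER TABLE db.events WRITE ORDERED BY (segmentType, shard);
```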

## Testing Done

- [x] Manually Tested on local docker setup. Please include commands
ran, and their output.
- [ ] Added new tests for the changes made.
- [x] Updated existing tests to reflect the changes made.
- [ ] No tests added or updated. Please explain why. If unsure, please
feel free to ask for help.
- [ ] Some other form of testing like staging or soak time in
production. Please explain.

For all the boxes checked, include a detailed description of the testing
done for the changes made in this pull request.

Tested manually in Spark SQL: verified that with this change, if no sort
order is applied on the table, no sort is injected during the write:
```
scala> spark.sql(s"EXPLAIN EXTENDED INSERT INTO ${table}_t4_2 SELECT id, segmentType, shard, payload FROM test_data_view").show(500, false)
== Parsed Logical Plan ==
'InsertIntoStatement 'UnresolvedRelation [openhouse, u_dalihi, test_partition_35_sortonly_1_t4_2], [], false, false, false
+- 'Project ['id, 'segmentType, 'shard, 'payload]
   +- 'UnresolvedRelation [test_data_view], [], false

== Analyzed Logical Plan ==

AppendData RelationV2[id#209L, segmentType#210, shard#211, payload#212] openhouse.u_dalihi.test_partition_35_sortonly_1_t4_2, false, StructType(StructField(id,LongType,false), StructField(segmentType,StringType,false), StructField(shard,IntegerType,true), StructField(payload,StringType,false))
+- Project [ansi_cast(id#204 as bigint) AS id#213L, segmentType#205, shard#206, payload#207]
   +- Project [id#204, segmentType#205, shard#206, payload#207]
      +- SubqueryAlias test_data_view
         +- View (`test_data_view`, [id#204,segmentType#205,shard#206,payload#207])
            +- Project [cast(id#208 as int) AS id#204, cast(segmentType#201 as string) AS segmentType#205, cast(shard#202 as int) AS shard#206, cast(payload#203 as string) AS payload#207]
               +- Project [id#208, CASE WHEN ((id#208 % 3) = 0) THEN A WHEN ((id#208 % 3) = 1) THEN B ELSE C END AS segmentType#201, cast((id#208 % 4) as int) AS shard#202, concat(payload-, cast(id#208 as string)) AS payload#203]
                  +- SubqueryAlias __auto_generated_subquery_name
                     +- Project [id#208]
                        +- Generate explode(sequence(1, 10000, None, Some(UTC))), false, [id#208]
                           +- OneRowRelation

== Optimized Logical Plan ==
AppendData RelationV2[id#209L, segmentType#210, shard#211, payload#212] openhouse.u_dalihi.test_partition_35_sortonly_1_t4_2, false, StructType(StructField(id,LongType,false), StructField(segmentType,StringType,false), StructField(shard,IntegerType,true), StructField(payload,StringType,false))
+- Project [ansi_cast(id#208 as bigint) AS id#213L, CASE WHEN ((id#208 % 3) = 0) THEN A WHEN ((id#208 % 3) = 1) THEN B ELSE C END AS segmentType#205, (id#208 % 4) AS shard#206, concat(payload-, cast(id#208 as string)) AS payload#207]
   +- Generate explode(org.apache.spark.sql.catalyst.expressions.UnsafeArrayData@7a3e9096), false, [id#208]
      +- OneRowRelation

== Physical Plan ==
AppendData openhouse.u_dalihi.test_partition_35_sortonly_1_t4_2, [], org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$5105/0x0000000802665040@3fce7731, StructType(StructField(id,LongType,false), StructField(segmentType,StringType,false), StructField(shard,IntegerType,true), StructField(payload,StringType,false))
+- *(2) Project [ansi_cast(id#208 as bigint) AS id#213L, CASE WHEN ((id#208 % 3) = 0) THEN A WHEN ((id#208 % 3) = 1) THEN B ELSE C END AS segmentType#205, (id#208 % 4) AS shard#206, concat(payload-, cast(id#208 as string)) AS payload#207]
   +- Generate explode(org.apache.spark.sql.catalyst.expressions.UnsafeArrayData@7a3e9096), false, [id#208]
      +- *(1) Scan OneRowRelation[]
```
Adding a write sort order on the table automatically injects a sort into the
query plan:
```
scala> spark.sql(s"ALTER TABLE ${table}_t4_2 WRITE ORDERED BY (segmentType, shard)")

scala> spark.sql(s"EXPLAIN EXTENDED INSERT INTO ${table}_t4_2 SELECT id, segmentType, shard, payload FROM test_data_view").show(500, false)
== Parsed Logical Plan ==
'InsertIntoStatement 'UnresolvedRelation [openhouse, u_dalihi, test_partition_35_sortonly_1_t4_2], [], false, false, false
+- 'Project ['id, 'segmentType, 'shard, 'payload]
   +- 'UnresolvedRelation [test_data_view], [], false

== Analyzed Logical Plan ==

AppendData RelationV2[id#236L, segmentType#237, shard#238, payload#239] openhouse.u_dalihi.test_partition_35_sortonly_1_t4_2, false, StructType(StructField(id,LongType,false), StructField(segmentType,StringType,false), StructField(shard,IntegerType,true), StructField(payload,StringType,false))
+- Project [ansi_cast(id#231 as bigint) AS id#240L, segmentType#232, shard#233, payload#234]
   +- Project [id#231, segmentType#232, shard#233, payload#234]
      +- SubqueryAlias test_data_view
         +- View (`test_data_view`, [id#231,segmentType#232,shard#233,payload#234])
            +- Project [cast(id#235 as int) AS id#231, cast(segmentType#228 as string) AS segmentType#232, cast(shard#229 as int) AS shard#233, cast(payload#230 as string) AS payload#234]
               +- Project [id#235, CASE WHEN ((id#235 % 3) = 0) THEN A WHEN ((id#235 % 3) = 1) THEN B ELSE C END AS segmentType#228, cast((id#235 % 4) as int) AS shard#229, concat(payload-, cast(id#235 as string)) AS payload#230]
                  +- SubqueryAlias __auto_generated_subquery_name
                     +- Project [id#235]
                        +- Generate explode(sequence(1, 10000, None, Some(UTC))), false, [id#235]
                           +- OneRowRelation

== Optimized Logical Plan ==
AppendData RelationV2[id#236L, segmentType#237, shard#238, payload#239] openhouse.u_dalihi.test_partition_35_sortonly_1_t4_2, false, StructType(StructField(id,LongType,false), StructField(segmentType,StringType,false), StructField(shard,IntegerType,true), StructField(payload,StringType,false))
+- Sort [segmentType#232 ASC NULLS FIRST, shard#233 ASC NULLS FIRST], false
   +- Project [ansi_cast(id#235 as bigint) AS id#240L, CASE WHEN ((id#235 % 3) = 0) THEN A WHEN ((id#235 % 3) = 1) THEN B ELSE C END AS segmentType#232, (id#235 % 4) AS shard#233, concat(payload-, cast(id#235 as string)) AS payload#234]
      +- Generate explode(org.apache.spark.sql.catalyst.expressions.UnsafeArrayData@7a3e9096), false, [id#235]
         +- OneRowRelation

== Physical Plan ==
AppendData openhouse.u_dalihi.test_partition_35_sortonly_1_t4_2, [], org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$5105/0x0000000802665040@c8157a5, StructType(StructField(id,LongType,false), StructField(segmentType,StringType,false), StructField(shard,IntegerType,true), StructField(payload,StringType,false))
+- *(2) Sort [segmentType#232 ASC NULLS FIRST, shard#233 ASC NULLS FIRST], false, 0
   +- *(2) Project [ansi_cast(id#235 as bigint) AS id#240L, CASE WHEN ((id#235 % 3) = 0) THEN A WHEN ((id#235 % 3) = 1) THEN B ELSE C END AS segmentType#232, (id#235 % 4) AS shard#233, concat(payload-, cast(id#235 as string)) AS payload#234]
      +- Generate explode(org.apache.spark.sql.catalyst.expressions.UnsafeArrayData@7a3e9096), false, [id#235]
         +- *(1) Scan OneRowRelation[]
```

Also added the following unit tests:
```
  1. testWriteOrderedByPersistsMultiColumnSortOrder — multi-column ASC sort persists correctly
  2. testWriteOrderedByRespectsDirectionAndNullOrder — DESC + explicit NULLS FIRST persists correctly
  3. testWriteOrderedByRoundTripsThroughInsert — INSERT after WRITE ORDERED BY works and the sort order metadata isn't reset by the write
```

## Additional Information

- [ ] Breaking Changes
- [ ] Deprecations
- [ ] Large PR broken into smaller PRs, and PR plan linked in the
description.

For all the boxes checked, include additional details of the changes
made in this pull request.