
Backport sort order to Spark 3.1 V2 Writes #574

Merged

chenselena merged 1 commit into linkedin:main from chenselena:selchen/bump-oss-iceberg on May 9, 2026

Conversation

chenselena (Collaborator) commented on May 8, 2026

Summary

This PR brings in the change from linkedin/iceberg#243, backporting to Spark 3.1 a rule that attaches a local Sort to V2 write commands (AppendData/OverwriteByExpression/OverwritePartitionsDynamic) so partitioned writes don't fail in ClusteredDataWriter.
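At a high level, the backported rule has the following shape. This is a minimal sketch rather than the code from linkedin/iceberg#243: requiredOrdering is a hypothetical stand-in for the Spark3Util.buildRequiredOrdering lookup, and the alreadyPrepared() guard for row-level operations is omitted.

import org.apache.spark.sql.catalyst.analysis.NamedRelation
import org.apache.spark.sql.catalyst.expressions.SortOrder
import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, Sort}
import org.apache.spark.sql.catalyst.rules.Rule

object AttachLocalSortToV2Writes extends Rule[LogicalPlan] {

  // Hypothetical stand-in for Spark3Util.buildRequiredOrdering: resolves the table's
  // sort order (with partition columns prepended) against the incoming query, and
  // returns Nil when the table has no explicit sort order.
  private def requiredOrdering(table: NamedRelation, query: LogicalPlan): Seq[SortOrder] = Nil

  // Only wrap once; a real rule would also skip plans already prepared by
  // RewriteRowLevelOperationHelper.buildWritePlan (the alreadyPrepared() check).
  private def notYetSorted(query: LogicalPlan): Boolean = !query.isInstanceOf[Sort]

  private def withLocalSort(table: NamedRelation, query: LogicalPlan): LogicalPlan = {
    val ordering = requiredOrdering(table, query)
    // global = false: a per-task (local) sort only; no Exchange is ever added.
    if (ordering.isEmpty) query else Sort(ordering, global = false, child = query)
  }

  override def apply(plan: LogicalPlan): LogicalPlan = plan.transformDown {
    case a: AppendData if notYetSorted(a.query) =>
      a.copy(query = withLocalSort(a.table, a.query))
    case o: OverwriteByExpression if notYetSorted(o.query) =>
      o.copy(query = withLocalSort(o.table, o.query))
    case d: OverwritePartitionsDynamic if notYetSorted(d.query) =>
      d.copy(query = withLocalSort(d.table, d.query))
  }
}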

Changes

  • Client-facing API Changes
  • Internal API Changes
  • Bug Fixes
  • New Features
  • Performance Improvements
  • Code Style
  • Refactoring
  • Documentation
  • Tests

For all the boxes checked, please include additional details of the changes made in this pull request.

  • A sort is attached when the table has an explicit sort order (partition columns are prepended via Spark3Util.buildRequiredOrdering).
  • No sort is attached for unsorted tables; users must enable write.spark.fanout.enabled=true or pre-cluster the input (see the workaround sketch after this list). This matches Spark 3.5's default path.
  • No Exchange is ever attached: Spark 3.1 only has strict RepartitionByExpression, and a strict repartition would turn skewed partition keys into stragglers. Spark 3.4+'s non-strict RebalancePartitions (which Spark 3.5 uses) doesn't exist here.
  • MERGE/UPDATE/DELETE are skipped: RewriteRowLevelOperationHelper.buildWritePlan already prepares those queries, and alreadyPrepared() detects its output shape to avoid double-wrapping.
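For the unsorted-table path, a user-side workaround might look like the following. db.events, staging_view, and datepartition are placeholder names; write.spark.fanout.enabled is Iceberg's standard partitioned-fanout property.

// Option 1: enable the fanout writer so unclustered input no longer fails in ClusteredDataWriter.
spark.sql("ALTER TABLE db.events SET TBLPROPERTIES ('write.spark.fanout.enabled'='true')")

// Option 2: pre-cluster the input on the partition column with a local sort before the insert.
spark.sql("INSERT INTO db.events SELECT * FROM staging_view SORT BY datepartition")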

Testing Done

  • Manually Tested on local docker setup. Please include commands ran, and their output.
  • Added new tests for the changes made.
  • Updated existing tests to reflect the changes made.
  • No tests added or updated. Please explain why. If unsure, please feel free to ask for help.
  • Some other form of testing like staging or soak time in production. Please explain.

For all the boxes checked, include a detailed description of the testing done for the changes made in this pull request.

Tested manually in Spark SQL; verified that with this change, if no sort order is applied to the table, no sort is injected during the write:

scala> spark.sql(s"EXPLAIN EXTENDED INSERT INTO ${table}_t4_2 SELECT id, segmentType, shard, payload FROM test_data_view").show(500, false)
== Parsed Logical Plan ==
'InsertIntoStatement 'UnresolvedRelation [openhouse, u_dalihi, test_partition_35_sortonly_1_t4_2], [], false, false, false
+- 'Project ['id, 'segmentType, 'shard, 'payload]
   +- 'UnresolvedRelation [test_data_view], [], false

== Analyzed Logical Plan ==

AppendData RelationV2[id#209L, segmentType#210, shard#211, payload#212] openhouse.u_dalihi.test_partition_35_sortonly_1_t4_2, false, StructType(StructField(id,LongType,false), StructField(segmentType,StringType,false), StructField(shard,IntegerType,true), StructField(payload,StringType,false))
+- Project [ansi_cast(id#204 as bigint) AS id#213L, segmentType#205, shard#206, payload#207]
   +- Project [id#204, segmentType#205, shard#206, payload#207]
      +- SubqueryAlias test_data_view
         +- View (`test_data_view`, [id#204,segmentType#205,shard#206,payload#207])
            +- Project [cast(id#208 as int) AS id#204, cast(segmentType#201 as string) AS segmentType#205, cast(shard#202 as int) AS shard#206, cast(payload#203 as string) AS payload#207]
               +- Project [id#208, CASE WHEN ((id#208 % 3) = 0) THEN A WHEN ((id#208 % 3) = 1) THEN B ELSE C END AS segmentType#201, cast((id#208 % 4) as int) AS shard#202, concat(payload-, cast(id#208 as string)) AS payload#203]
                  +- SubqueryAlias __auto_generated_subquery_name
                     +- Project [id#208]
                        +- Generate explode(sequence(1, 10000, None, Some(UTC))), false, [id#208]
                           +- OneRowRelation

== Optimized Logical Plan ==
AppendData RelationV2[id#209L, segmentType#210, shard#211, payload#212] openhouse.u_dalihi.test_partition_35_sortonly_1_t4_2, false, StructType(StructField(id,LongType,false), StructField(segmentType,StringType,false), StructField(shard,IntegerType,true), StructField(payload,StringType,false))
+- Project [ansi_cast(id#208 as bigint) AS id#213L, CASE WHEN ((id#208 % 3) = 0) THEN A WHEN ((id#208 % 3) = 1) THEN B ELSE C END AS segmentType#205, (id#208 % 4) AS shard#206, concat(payload-, cast(id#208 as string)) AS payload#207]
   +- Generate explode(org.apache.spark.sql.catalyst.expressions.UnsafeArrayData@7a3e9096), false, [id#208]
      +- OneRowRelation

== Physical Plan ==
AppendData openhouse.u_dalihi.test_partition_35_sortonly_1_t4_2, [], org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$5105/0x0000000802665040@3fce7731, StructType(StructField(id,LongType,false), StructField(segmentType,StringType,false), StructField(shard,IntegerType,true), StructField(payload,StringType,false))
+- *(2) Project [ansi_cast(id#208 as bigint) AS id#213L, CASE WHEN ((id#208 % 3) = 0) THEN A WHEN ((id#208 % 3) = 1) THEN B ELSE C END AS segmentType#205, (id#208 % 4) AS shard#206, concat(payload-, cast(id#208 as string)) AS payload#207]
   +- Generate explode(org.apache.spark.sql.catalyst.expressions.UnsafeArrayData@7a3e9096), false, [id#208]
      +- *(1) Scan OneRowRelation[]

Adding a write sort order on the table automatically injects a sort into the query plan:

scala> spark.sql(s"ALTER TABLE ${table}_t4_2 WRITE ORDERED BY (segmentType, shard)")

scala> spark.sql(s"EXPLAIN EXTENDED INSERT INTO ${table}_t4_2 SELECT id, segmentType, shard, payload FROM test_data_view").show(500, false)
== Parsed Logical Plan ==
'InsertIntoStatement 'UnresolvedRelation [openhouse, u_dalihi, test_partition_35_sortonly_1_t4_2], [], false, false, false
+- 'Project ['id, 'segmentType, 'shard, 'payload]
   +- 'UnresolvedRelation [test_data_view], [], false

== Analyzed Logical Plan ==

AppendData RelationV2[id#236L, segmentType#237, shard#238, payload#239] openhouse.u_dalihi.test_partition_35_sortonly_1_t4_2, false, StructType(StructField(id,LongType,false), StructField(segmentType,StringType,false), StructField(shard,IntegerType,true), StructField(payload,StringType,false))
+- Project [ansi_cast(id#231 as bigint) AS id#240L, segmentType#232, shard#233, payload#234]
   +- Project [id#231, segmentType#232, shard#233, payload#234]
      +- SubqueryAlias test_data_view
         +- View (`test_data_view`, [id#231,segmentType#232,shard#233,payload#234])
            +- Project [cast(id#235 as int) AS id#231, cast(segmentType#228 as string) AS segmentType#232, cast(shard#229 as int) AS shard#233, cast(payload#230 as string) AS payload#234]
               +- Project [id#235, CASE WHEN ((id#235 % 3) = 0) THEN A WHEN ((id#235 % 3) = 1) THEN B ELSE C END AS segmentType#228, cast((id#235 % 4) as int) AS shard#229, concat(payload-, cast(id#235 as string)) AS payload#230]
                  +- SubqueryAlias __auto_generated_subquery_name
                     +- Project [id#235]
                        +- Generate explode(sequence(1, 10000, None, Some(UTC))), false, [id#235]
                           +- OneRowRelation

== Optimized Logical Plan ==
AppendData RelationV2[id#236L, segmentType#237, shard#238, payload#239] openhouse.u_dalihi.test_partition_35_sortonly_1_t4_2, false, StructType(StructField(id,LongType,false), StructField(segmentType,StringType,false), StructField(shard,IntegerType,true), StructField(payload,StringType,false))
+- Sort [segmentType#232 ASC NULLS FIRST, shard#233 ASC NULLS FIRST], false
   +- Project [ansi_cast(id#235 as bigint) AS id#240L, CASE WHEN ((id#235 % 3) = 0) THEN A WHEN ((id#235 % 3) = 1) THEN B ELSE C END AS segmentType#232, (id#235 % 4) AS shard#233, concat(payload-, cast(id#235 as string)) AS payload#234]
      +- Generate explode(org.apache.spark.sql.catalyst.expressions.UnsafeArrayData@7a3e9096), false, [id#235]
         +- OneRowRelation

== Physical Plan ==
AppendData openhouse.u_dalihi.test_partition_35_sortonly_1_t4_2, [], org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$5105/0x0000000802665040@c8157a5, StructType(StructField(id,LongType,false), StructField(segmentType,StringType,false), StructField(shard,IntegerType,true), StructField(payload,StringType,false))
+- *(2) Sort [segmentType#232 ASC NULLS FIRST, shard#233 ASC NULLS FIRST], false, 0
   +- *(2) Project [ansi_cast(id#235 as bigint) AS id#240L, CASE WHEN ((id#235 % 3) = 0) THEN A WHEN ((id#235 % 3) = 1) THEN B ELSE C END AS segmentType#232, (id#235 % 4) AS shard#233, concat(payload-, cast(id#235 as string)) AS payload#234]
      +- Generate explode(org.apache.spark.sql.catalyst.expressions.UnsafeArrayData@7a3e9096), false, [id#235]
         +- *(1) Scan OneRowRelation[]

Also added the following unit tests:

  1. testWriteOrderedByPersistsMultiColumnSortOrder — multi-column ASC sort persists correctly
  2. testWriteOrderedByRespectsDirectionAndNullOrder — DESC + explicit NULLS FIRST persists correctly
  3. testWriteOrderedByRoundTripsThroughInsert — INSERT after WRITE ORDERED BY works and the sort order metadata isn't reset by the write
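For reference, the first of these roughly takes the following shape. This is a sketch only: the harness, the helper name, and the Spark3Util.loadIcebergTable metadata check are assumptions about the test setup, not the exact test code from the PR.

import org.apache.iceberg.spark.Spark3Util
import org.apache.spark.sql.SparkSession

object SortOrderAssertions {
  // Set a multi-column write order and assert it round-trips through the Iceberg table metadata.
  def assertMultiColumnWriteOrderPersists(spark: SparkSession, tableName: String): Unit = {
    spark.sql(s"ALTER TABLE $tableName WRITE ORDERED BY (segmentType, shard)")

    val icebergTable = Spark3Util.loadIcebergTable(spark, tableName)
    val sortFields = icebergTable.sortOrder().fields()

    assert(sortFields.size() == 2, s"expected 2 sort fields, got ${sortFields.size()}")
    // Both fields default to ASC; a stricter test would also assert direction and null ordering.
  }
}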

Additional Information

  • Breaking Changes
  • Deprecations
  • Large PR broken into smaller PRs, and PR plan linked in the description.

For all the boxes checked, include additional details of the changes made in this pull request.

chenselena merged commit 0c02ee3 into linkedin:main on May 9, 2026
1 check passed
chenselena changed the title from "Add tests for spark 3.1 to confirm sort order is respected" to "Backport sort order to Spark 3.1 V2 Writes" on May 9, 2026