Skip to content

Commit

Permalink
Spark 3.3: Support storage-partitioned joins (apache#6371)
Browse files Browse the repository at this point in the history
  • Loading branch information
aokolnychyi committed Jan 7, 2023
1 parent ddfb010 commit 7649595
Show file tree
Hide file tree
Showing 23 changed files with 1,583 additions and 218 deletions.
29 changes: 23 additions & 6 deletions core/src/main/java/org/apache/iceberg/Partitioning.java
Expand Up @@ -200,6 +200,18 @@ public Void alwaysNull(int fieldId, String sourceName, int sourceId) {
/**
 * Builds a grouping key type considering all provided specs.
 *
 * <p>Delegates to {@link #groupingKeyType(Schema, Collection)} with a null schema, so every
 * partition field with a non-void transform is considered regardless of the table schema.
 *
 * @param specs one or many specs
 * @return the constructed grouping key type
 * @deprecated use {@link #groupingKeyType(Schema, Collection)} instead; will be removed in 1.3.0
 */
@Deprecated
public static StructType groupingKeyType(Collection<PartitionSpec> specs) {
return groupingKeyType(null, specs);
}

/**
* Builds a grouping key type considering the provided schema and specs.
*
* <p>A grouping key defines how data is split between files and consists of partition fields with
* non-void transforms that are present in each provided spec. Iceberg guarantees that records
* with different values for the grouping key are disjoint and are stored in separate files.
Expand All @@ -215,11 +227,15 @@ public Void alwaysNull(int fieldId, String sourceName, int sourceId) {
* that have the same field ID but use a void transform under the hood. Such fields cannot be part
* of the grouping key as void transforms always return null.
*
* <p>If the provided schema is not null, this method will only take into account partition fields
* on top of columns present in the schema. Otherwise, all partition fields will be considered.
*
* @param schema a schema specifying a set of source columns to consider (null to consider all)
* @param specs one or many specs
* @return the constructed grouping key type
*/
public static StructType groupingKeyType(Collection<PartitionSpec> specs) {
return buildPartitionProjectionType("grouping key", specs, commonActiveFieldIds(specs));
public static StructType groupingKeyType(Schema schema, Collection<PartitionSpec> specs) {
return buildPartitionProjectionType("grouping key", specs, commonActiveFieldIds(schema, specs));
}

/**
Expand Down Expand Up @@ -341,15 +357,15 @@ private static Set<Integer> allFieldIds(Collection<PartitionSpec> specs) {
}

// collects IDs of partition fields with non-void transforms that are present in each spec
private static Set<Integer> commonActiveFieldIds(Collection<PartitionSpec> specs) {
private static Set<Integer> commonActiveFieldIds(Schema schema, Collection<PartitionSpec> specs) {
Set<Integer> commonActiveFieldIds = Sets.newHashSet();

int specIndex = 0;
for (PartitionSpec spec : specs) {
if (specIndex == 0) {
commonActiveFieldIds.addAll(activeFieldIds(spec));
commonActiveFieldIds.addAll(activeFieldIds(schema, spec));
} else {
commonActiveFieldIds.retainAll(activeFieldIds(spec));
commonActiveFieldIds.retainAll(activeFieldIds(schema, spec));
}

specIndex++;
Expand All @@ -358,8 +374,9 @@ private static Set<Integer> commonActiveFieldIds(Collection<PartitionSpec> specs
return commonActiveFieldIds;
}

private static List<Integer> activeFieldIds(PartitionSpec spec) {
private static List<Integer> activeFieldIds(Schema schema, PartitionSpec spec) {
return spec.fields().stream()
.filter(field -> schema == null || schema.findField(field.sourceId()) != null)
.filter(field -> !isVoidTransform(field))
.map(PartitionField::fieldId)
.collect(Collectors.toList());
Expand Down
39 changes: 26 additions & 13 deletions core/src/test/java/org/apache/iceberg/TestPartitioning.java
Expand Up @@ -180,7 +180,7 @@ public void testGroupingKeyTypeWithSpecEvolutionInV1Tables() {

StructType expectedType =
StructType.of(NestedField.optional(1000, "data", Types.StringType.get()));
StructType actualType = Partitioning.groupingKeyType(table.specs().values());
StructType actualType = Partitioning.groupingKeyType(table.schema(), table.specs().values());
Assert.assertEquals("Types must match", expectedType, actualType);
}

Expand All @@ -195,7 +195,7 @@ public void testGroupingKeyTypeWithSpecEvolutionInV2Tables() {

StructType expectedType =
StructType.of(NestedField.optional(1000, "data", Types.StringType.get()));
StructType actualType = Partitioning.groupingKeyType(table.specs().values());
StructType actualType = Partitioning.groupingKeyType(table.schema(), table.specs().values());
Assert.assertEquals("Types must match", expectedType, actualType);
}

Expand All @@ -211,7 +211,7 @@ public void testGroupingKeyTypeWithDroppedPartitionFieldInV1Tables() {

StructType expectedType =
StructType.of(NestedField.optional(1000, "data", Types.StringType.get()));
StructType actualType = Partitioning.groupingKeyType(table.specs().values());
StructType actualType = Partitioning.groupingKeyType(table.schema(), table.specs().values());
Assert.assertEquals("Types must match", expectedType, actualType);
}

Expand All @@ -227,7 +227,7 @@ public void testGroupingKeyTypeWithDroppedPartitionFieldInV2Tables() {

StructType expectedType =
StructType.of(NestedField.optional(1000, "data", Types.StringType.get()));
StructType actualType = Partitioning.groupingKeyType(table.specs().values());
StructType actualType = Partitioning.groupingKeyType(table.schema(), table.specs().values());
Assert.assertEquals("Types must match", expectedType, actualType);
}

Expand All @@ -243,7 +243,7 @@ public void testGroupingKeyTypeWithRenamesInV1Table() {

StructType expectedType =
StructType.of(NestedField.optional(1000, "p2", Types.StringType.get()));
StructType actualType = Partitioning.groupingKeyType(table.specs().values());
StructType actualType = Partitioning.groupingKeyType(table.schema(), table.specs().values());
Assert.assertEquals("Types must match", expectedType, actualType);
}

Expand All @@ -259,7 +259,7 @@ public void testGroupingKeyTypeWithRenamesInV2Table() {

StructType expectedType =
StructType.of(NestedField.optional(1000, "p2", Types.StringType.get()));
StructType actualType = Partitioning.groupingKeyType(table.specs().values());
StructType actualType = Partitioning.groupingKeyType(table.schema(), table.specs().values());
Assert.assertEquals("Types must match", expectedType, actualType);
}

Expand All @@ -273,7 +273,7 @@ public void testGroupingKeyTypeWithEvolvedIntoUnpartitionedSpecV1Table() {
Assert.assertEquals("Should have 2 specs", 2, table.specs().size());

StructType expectedType = StructType.of();
StructType actualType = Partitioning.groupingKeyType(table.specs().values());
StructType actualType = Partitioning.groupingKeyType(table.schema(), table.specs().values());
Assert.assertEquals("Types must match", expectedType, actualType);
}

Expand All @@ -287,7 +287,7 @@ public void testGroupingKeyTypeWithEvolvedIntoUnpartitionedSpecV2Table() {
Assert.assertEquals("Should have 2 specs", 2, table.specs().size());

StructType expectedType = StructType.of();
StructType actualType = Partitioning.groupingKeyType(table.specs().values());
StructType actualType = Partitioning.groupingKeyType(table.schema(), table.specs().values());
Assert.assertEquals("Types must match", expectedType, actualType);
}

Expand All @@ -302,7 +302,7 @@ public void testGroupingKeyTypeWithAddingBackSamePartitionFieldInV1Table() {

StructType expectedType =
StructType.of(NestedField.optional(1000, "category", Types.StringType.get()));
StructType actualType = Partitioning.groupingKeyType(table.specs().values());
StructType actualType = Partitioning.groupingKeyType(table.schema(), table.specs().values());
Assert.assertEquals("Types must match", expectedType, actualType);
}

Expand All @@ -317,7 +317,7 @@ public void testGroupingKeyTypeWithAddingBackSamePartitionFieldInV2Table() {

StructType expectedType =
StructType.of(NestedField.optional(1000, "category", Types.StringType.get()));
StructType actualType = Partitioning.groupingKeyType(table.specs().values());
StructType actualType = Partitioning.groupingKeyType(table.schema(), table.specs().values());
Assert.assertEquals("Types must match", expectedType, actualType);
}

Expand All @@ -330,7 +330,7 @@ public void testGroupingKeyTypeWithOnlyUnpartitionedSpec() {
Assert.assertEquals("Should have 1 spec", 1, table.specs().size());

StructType expectedType = StructType.of();
StructType actualType = Partitioning.groupingKeyType(table.specs().values());
StructType actualType = Partitioning.groupingKeyType(table.schema(), table.specs().values());
Assert.assertEquals("Types must match", expectedType, actualType);
}

Expand All @@ -345,7 +345,20 @@ public void testGroupingKeyTypeWithEvolvedUnpartitionedSpec() {
Assert.assertEquals("Should have 2 specs", 2, table.specs().size());

StructType expectedType = StructType.of();
StructType actualType = Partitioning.groupingKeyType(table.specs().values());
StructType actualType = Partitioning.groupingKeyType(table.schema(), table.specs().values());
Assert.assertEquals("Types must match", expectedType, actualType);
}

@Test
public void testGroupingKeyTypeWithProjectedSchema() {
  // The grouping key must only keep partition fields whose source columns survive the projection.
  TestTables.TestTable table =
      TestTables.create(tableDir, "test", SCHEMA, BY_CATEGORY_DATA_SPEC, V1_FORMAT_VERSION);

  Schema projection = table.schema().select("id", "data");
  StructType groupingKeyType = Partitioning.groupingKeyType(projection, table.specs().values());

  StructType expected =
      StructType.of(NestedField.optional(1001, "data", Types.StringType.get()));
  Assert.assertEquals("Types must match", expected, groupingKeyType);
}

Expand All @@ -366,6 +379,6 @@ public void testGroupingKeyTypeWithIncompatibleSpecEvolution() {
"Should complain about incompatible specs",
ValidationException.class,
"Conflicting partition fields",
() -> Partitioning.groupingKeyType(table.specs().values()));
() -> Partitioning.groupingKeyType(table.schema(), table.specs().values()));
}
}
Expand Up @@ -404,15 +404,15 @@ public void testSparkTableAddDropPartitions() throws Exception {
assertPartitioningEquals(sparkTable(), 1, "bucket(16, id)");

sql("ALTER TABLE %s ADD PARTITION FIELD truncate(data, 4)", tableName);
assertPartitioningEquals(sparkTable(), 2, "truncate(data, 4)");
assertPartitioningEquals(sparkTable(), 2, "truncate(4, data)");

sql("ALTER TABLE %s ADD PARTITION FIELD years(ts)", tableName);
assertPartitioningEquals(sparkTable(), 3, "years(ts)");

sql("ALTER TABLE %s DROP PARTITION FIELD years(ts)", tableName);
assertPartitioningEquals(sparkTable(), 2, "truncate(data, 4)");
assertPartitioningEquals(sparkTable(), 2, "truncate(4, data)");

sql("ALTER TABLE %s DROP PARTITION FIELD truncate(data, 4)", tableName);
sql("ALTER TABLE %s DROP PARTITION FIELD truncate(4, data)", tableName);
assertPartitioningEquals(sparkTable(), 1, "bucket(16, id)");

sql("ALTER TABLE %s DROP PARTITION FIELD shard", tableName);
Expand Down
Expand Up @@ -19,8 +19,17 @@
package org.apache.iceberg.spark.extensions;

import java.util.Map;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.spark.SparkSQLProperties;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.internal.SQLConf;
import org.junit.Assert;
import org.junit.Test;

public class TestCopyOnWriteDelete extends TestDelete {

Expand All @@ -38,4 +47,32 @@ public TestCopyOnWriteDelete(
protected Map<String, String> extraTableProperties() {
return ImmutableMap.of(TableProperties.DELETE_MODE, "copy-on-write");
}

@Test
public void testRuntimeFilteringWithPreservedDataGrouping() throws NoSuchTableException {
  createAndInitPartitionedTable();

  // Two data files in separate partitions so runtime filtering can prune one of them.
  append(new Employee(1, "hr"), new Employee(3, "hr"));
  append(new Employee(1, "hardware"), new Employee(2, "hardware"));

  // Enable storage-partitioned joins: Spark-side bucketing plus Iceberg grouping preservation.
  Map<String, String> conf =
      ImmutableMap.of(
          SQLConf.V2_BUCKETING_ENABLED().key(),
          "true",
          SparkSQLProperties.PRESERVE_DATA_GROUPING,
          "true");

  withSQLConf(conf, () -> sql("DELETE FROM %s WHERE id = 2", tableName));

  Table icebergTable = validationCatalog.loadTable(tableIdent);
  Assert.assertEquals("Should have 3 snapshots", 3, Iterables.size(icebergTable.snapshots()));

  // Copy-on-write delete should rewrite exactly one file: 1 overwritten, 1 deleted, 1 added.
  validateCopyOnWrite(icebergTable.currentSnapshot(), "1", "1", "1");

  assertEquals(
      "Should have expected rows",
      ImmutableList.of(row(1, "hardware"), row(1, "hr"), row(3, "hr")),
      sql("SELECT * FROM %s ORDER BY id, dep", tableName));
}
}
Expand Up @@ -18,9 +18,19 @@
*/
package org.apache.iceberg.spark.extensions;

import java.util.Collections;
import java.util.Map;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.spark.SparkSQLProperties;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.internal.SQLConf;
import org.junit.Assert;
import org.junit.Test;

public class TestCopyOnWriteMerge extends TestMerge {

Expand All @@ -38,4 +48,45 @@ public TestCopyOnWriteMerge(
protected Map<String, String> extraTableProperties() {
return ImmutableMap.of(TableProperties.MERGE_MODE, "copy-on-write");
}

@Test
public void testRuntimeFilteringWithReportedPartitioning() {
  createAndInitTable("id INT, dep STRING");
  sql("ALTER TABLE %s ADD PARTITION FIELD dep", tableName);

  // Two partitions so the runtime filter from the source view can prune one of them.
  append(tableName, "{ \"id\": 1, \"dep\": \"hr\" }\n" + "{ \"id\": 3, \"dep\": \"hr\" }");
  append(
      tableName,
      "{ \"id\": 1, \"dep\": \"hardware\" }\n" + "{ \"id\": 2, \"dep\": \"hardware\" }");

  createOrReplaceView("source", Collections.singletonList(2), Encoders.INT());

  // Enable storage-partitioned joins: Spark-side bucketing plus Iceberg grouping preservation.
  Map<String, String> conf =
      ImmutableMap.of(
          SQLConf.V2_BUCKETING_ENABLED().key(),
          "true",
          SparkSQLProperties.PRESERVE_DATA_GROUPING,
          "true");

  String mergeStmt =
      "MERGE INTO %s t USING source s "
          + "ON t.id == s.value "
          + "WHEN MATCHED THEN "
          + " UPDATE SET id = -1";
  withSQLConf(conf, () -> sql(mergeStmt, tableName));

  Table icebergTable = validationCatalog.loadTable(tableIdent);
  Assert.assertEquals("Should have 3 snapshots", 3, Iterables.size(icebergTable.snapshots()));

  // Copy-on-write merge should rewrite exactly one file: 1 overwritten, 1 deleted, 1 added.
  validateCopyOnWrite(icebergTable.currentSnapshot(), "1", "1", "1");

  assertEquals(
      "Should have expected rows",
      ImmutableList.of(row(-1, "hardware"), row(1, "hardware"), row(1, "hr"), row(3, "hr")),
      sql("SELECT * FROM %s ORDER BY id, dep", tableName));
}
}
Expand Up @@ -19,8 +19,16 @@
package org.apache.iceberg.spark.extensions;

import java.util.Map;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.spark.SparkSQLProperties;
import org.apache.spark.sql.internal.SQLConf;
import org.junit.Assert;
import org.junit.Test;

public class TestCopyOnWriteUpdate extends TestUpdate {

Expand All @@ -38,4 +46,35 @@ public TestCopyOnWriteUpdate(
protected Map<String, String> extraTableProperties() {
return ImmutableMap.of(TableProperties.UPDATE_MODE, "copy-on-write");
}

@Test
public void testRuntimeFilteringWithReportedPartitioning() {
  createAndInitTable("id INT, dep STRING");
  sql("ALTER TABLE %s ADD PARTITION FIELD dep", tableName);

  // Two partitions so runtime filtering on id = 2 can prune the untouched one.
  append(tableName, "{ \"id\": 1, \"dep\": \"hr\" }\n" + "{ \"id\": 3, \"dep\": \"hr\" }");
  append(
      tableName,
      "{ \"id\": 1, \"dep\": \"hardware\" }\n" + "{ \"id\": 2, \"dep\": \"hardware\" }");

  // Enable storage-partitioned joins: Spark-side bucketing plus Iceberg grouping preservation.
  Map<String, String> conf =
      ImmutableMap.of(
          SQLConf.V2_BUCKETING_ENABLED().key(),
          "true",
          SparkSQLProperties.PRESERVE_DATA_GROUPING,
          "true");

  withSQLConf(conf, () -> sql("UPDATE %s SET id = -1 WHERE id = 2", tableName));

  Table icebergTable = validationCatalog.loadTable(tableIdent);
  Assert.assertEquals("Should have 3 snapshots", 3, Iterables.size(icebergTable.snapshots()));

  // Copy-on-write update should rewrite exactly one file: 1 overwritten, 1 deleted, 1 added.
  validateCopyOnWrite(icebergTable.currentSnapshot(), "1", "1", "1");

  assertEquals(
      "Should have expected rows",
      ImmutableList.of(row(-1, "hardware"), row(1, "hardware"), row(1, "hr"), row(3, "hr")),
      sql("SELECT * FROM %s ORDER BY id, dep", tableName));
}
}

0 comments on commit 7649595

Please sign in to comment.