Commit

Spark: Make column order check optional (apache#745)
ravichinoy authored and Dilip Biswal committed Apr 8, 2020
1 parent 58cbf5d commit ecd1cd9
Showing 6 changed files with 164 additions and 13 deletions.
@@ -39,7 +39,36 @@ public class CheckCompatibility extends TypeUtil.CustomOrderSchemaVisitor<List<S
* @return a list of error details, or an empty list if there are no compatibility problems
*/
public static List<String> writeCompatibilityErrors(Schema readSchema, Schema writeSchema) {
return TypeUtil.visit(readSchema, new CheckCompatibility(writeSchema, true, true));
return writeCompatibilityErrors(readSchema, writeSchema, true);
}

/**
* Returns a list of compatibility errors for writing with the given write schema.
* This includes nullability: writing optional (nullable) values to a required field is an error.
* When {@code checkOrdering} is false, the write schema may order its fields differently than the read schema.
*
* @param readSchema a read schema
* @param writeSchema a write schema
* @param checkOrdering if false, allow the write schema to order its fields differently than the read schema
* @return a list of error details, or an empty list if there are no compatibility problems
*/
public static List<String> writeCompatibilityErrors(Schema readSchema, Schema writeSchema, boolean checkOrdering) {
return TypeUtil.visit(readSchema, new CheckCompatibility(writeSchema, checkOrdering, true));
}

/**
* Returns a list of compatibility errors for writing with the given write schema.
* This checks type compatibility and not nullability: writing optional (nullable) values
* to a required field is not an error. To check nullability as well as types,
* use {@link #writeCompatibilityErrors(Schema, Schema, boolean)}.
* When {@code checkOrdering} is false, the write schema may order its fields differently than the read schema.
*
* @param readSchema a read schema
* @param writeSchema a write schema
* @param checkOrdering if false, allow the write schema to order its fields differently than the read schema
* @return a list of error details, or an empty list if there are no compatibility problems
*/
public static List<String> typeCompatibilityErrors(Schema readSchema, Schema writeSchema, boolean checkOrdering) {
return TypeUtil.visit(readSchema, new CheckCompatibility(writeSchema, checkOrdering, false));
}
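
For illustration, a minimal sketch of calling the new overloads directly; the schemas below are hypothetical and assume the usual static import of required, as in the tests further down:

    // Hypothetical table (read) schema and a reordered input (write) schema.
    Schema tableSchema = new Schema(
        required(1, "id", Types.IntegerType.get()),
        required(2, "data", Types.StringType.get()));
    Schema inputSchema = new Schema(
        required(2, "data", Types.StringType.get()),
        required(1, "id", Types.IntegerType.get()));

    // With checkOrdering = false, the reordered input produces no compatibility errors.
    List<String> errors = CheckCompatibility.writeCompatibilityErrors(tableSchema, inputSchema, false);

    // The type-only variant takes the same flag and additionally ignores nullability.
    List<String> typeErrors = CheckCompatibility.typeCompatibilityErrors(tableSchema, inputSchema, false);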

/**
@@ -331,6 +331,22 @@ public void testIncompatibleListAndPrimitive() {
errors.get(0).contains("list cannot be read as a string"));
}

@Test
public void testDifferentFieldOrdering() {
// reordered fields should be accepted when the ordering check is disabled
Schema read = new Schema(required(0, "nested", Types.StructType.of(
required(1, "field_a", Types.IntegerType.get()),
required(2, "field_b", Types.IntegerType.get())
)));
Schema write = new Schema(required(0, "nested", Types.StructType.of(
required(2, "field_b", Types.IntegerType.get()),
required(1, "field_a", Types.IntegerType.get())
)));

List<String> errors = CheckCompatibility.writeCompatibilityErrors(read, write, false);
Assert.assertEquals("Should produce 0 error message", 0, errors.size());
}

@Test
public void testStructWriteReordering() {
// writes should not reorder fields
@@ -132,7 +132,7 @@ public Optional<DataSourceWriter> createWriter(String jobId, StructType dsStruct
Configuration conf = new Configuration(lazyBaseConf());
Table table = getTableAndResolveHadoopConfiguration(options, conf);
Schema dsSchema = SparkSchemaUtil.convert(table.schema(), dsStruct);
validateWriteSchema(table.schema(), dsSchema, checkNullability(options));
validateWriteSchema(table.schema(), dsSchema, checkNullability(options), checkOrdering(options));
validatePartitionTransforms(table.spec());
String appId = lazySparkSession().sparkContext().applicationId();
String wapId = options.get("wap-id").orElse(lazySparkSession().conf().get("spark.wap.id", null));
@@ -148,7 +148,7 @@ public StreamWriter createStreamWriter(String runId, StructType dsStruct,
Configuration conf = new Configuration(lazyBaseConf());
Table table = getTableAndResolveHadoopConfiguration(options, conf);
Schema dsSchema = SparkSchemaUtil.convert(table.schema(), dsStruct);
validateWriteSchema(table.schema(), dsSchema, checkNullability(options));
validateWriteSchema(table.schema(), dsSchema, checkNullability(options), checkOrdering(options));
validatePartitionTransforms(table.spec());
// Spark 2.4.x passes runId to createStreamWriter instead of real queryId,
// so we fetch it directly from sparkContext to make writes idempotent
@@ -205,12 +205,13 @@ protected static void mergeIcebergHadoopConfs(
.forEach(key -> baseConf.set(key.replaceFirst("hadoop.", ""), options.get(key)));
}

protected void validateWriteSchema(Schema tableSchema, Schema dsSchema, Boolean checkNullability) {
private void validateWriteSchema(
Schema tableSchema, Schema dsSchema, Boolean checkNullability, Boolean checkOrdering) {
List<String> errors;
if (checkNullability) {
errors = CheckCompatibility.writeCompatibilityErrors(tableSchema, dsSchema);
errors = CheckCompatibility.writeCompatibilityErrors(tableSchema, dsSchema, checkOrdering);
} else {
errors = CheckCompatibility.typeCompatibilityErrors(tableSchema, dsSchema);
errors = CheckCompatibility.typeCompatibilityErrors(tableSchema, dsSchema, checkOrdering);
}
if (!errors.isEmpty()) {
StringBuilder sb = new StringBuilder();
@@ -243,4 +244,21 @@ protected boolean checkNullability(DataSourceOptions options) {
boolean dataFrameCheckNullability = options.getBoolean("check-nullability", true);
return sparkCheckNullability && dataFrameCheckNullability;
}

private boolean checkOrdering(DataSourceOptions options) {
boolean sparkCheckOrdering = Boolean.parseBoolean(lazySpark.conf()
.get("spark.sql.iceberg.check-ordering", "true"));
boolean dataFrameCheckOrdering = options.getBoolean("check-ordering", true);
return sparkCheckOrdering && dataFrameCheckOrdering;
}
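
The check runs only when both toggles are true, so disabling either the session property or the per-write option skips the column-order validation. A usage sketch mirroring the new tests below (the DataFrame df and table location are hypothetical):

    // Session-wide default; the property is read as "true" when unset.
    spark.conf().set("spark.sql.iceberg.check-ordering", "true");

    // Per-write override: columns are selected in a different order than the table schema.
    df.select("data", "id").write()
        .format("iceberg")
        .mode("append")
        .option("check-ordering", "false")   // skip the column-order check for this write
        .save(location.toString());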

private FileIO fileIO(Table table) {
if (table.io() instanceof HadoopFileIO) {
// we need to use Spark's SerializableConfiguration to avoid issues with Kryo serialization
SerializableConfiguration conf = new SerializableConfiguration(((HadoopFileIO) table.io()).conf());
return new HadoopFileIO(conf::value);
} else {
return table.io();
}
}
}
@@ -48,7 +48,7 @@ class PartitionKey implements StructLike {
private final Accessor<InternalRow>[] accessors;

@SuppressWarnings("unchecked")
PartitionKey(PartitionSpec spec) {
PartitionKey(PartitionSpec spec, Schema inputSchema) {
this.spec = spec;

List<PartitionField> fields = spec.fields();
Expand All @@ -58,7 +58,7 @@ class PartitionKey implements StructLike {
this.accessors = (Accessor<InternalRow>[]) Array.newInstance(Accessor.class, size);

Schema schema = spec.schema();
Map<Integer, Accessor<InternalRow>> newAccessors = buildAccessors(schema);
Map<Integer, Accessor<InternalRow>> newAccessors = buildAccessors(inputSchema);
for (int i = 0; i < size; i += 1) {
PartitionField field = fields.get(i);
Accessor<InternalRow> accessor = newAccessors.get(field.sourceId());
@@ -302,7 +302,7 @@ public DataWriter<InternalRow> createDataWriter(int partitionId, long taskId, lo
if (spec.fields().isEmpty()) {
return new UnpartitionedWriter(spec, format, appenderFactory, fileFactory, fileIo, targetFileSize);
} else {
return new PartitionedWriter(spec, format, appenderFactory, fileFactory, fileIo, targetFileSize);
return new PartitionedWriter(spec, format, appenderFactory, fileFactory, io.value(), targetFileSize, dsSchema);
}
}

@@ -513,10 +513,10 @@ private static class PartitionedWriter extends BaseWriter {
AppenderFactory<InternalRow> appenderFactory,
WriterFactory.OutputFileFactory fileFactory,
FileIO fileIo,
long targetFileSize) {
long targetFileSize,
Schema writeSchema) {
super(spec, format, appenderFactory, fileFactory, fileIo, targetFileSize);

this.key = new PartitionKey(spec);
this.key = new PartitionKey(spec, writeSchema);
}

@Override
@@ -131,7 +131,6 @@ public void testNullPartitionValue() throws Exception {
Dataset<Row> df = spark.createDataFrame(expected, SimpleRecord.class);

try {
// TODO: incoming columns must be ordered according to the table's schema
df.select("id", "data").write()
.format("iceberg")
.mode("append")
@@ -154,6 +153,95 @@
}
}

@Test
public void testReorderedColumns() throws Exception {
String desc = "reorder_columns";
File parent = temp.newFolder(desc);
File location = new File(parent, "test");
File dataFolder = new File(location, "data");
Assert.assertTrue("mkdirs should succeed", dataFolder.mkdirs());

HadoopTables tables = new HadoopTables(spark.sessionState().newHadoopConf());
Table table = tables.create(SIMPLE_SCHEMA, SPEC, location.toString());
table.updateProperties().set(TableProperties.DEFAULT_FILE_FORMAT, format).commit();

List<SimpleRecord> expected = Lists.newArrayList(
new SimpleRecord(1, "a"),
new SimpleRecord(2, "b"),
new SimpleRecord(3, "c")
);

Dataset<Row> df = spark.createDataFrame(expected, SimpleRecord.class);

try {
df.select("data", "id").write()
.format("iceberg")
.mode("append")
.option("check-ordering", "false")
.save(location.toString());

Dataset<Row> result = spark.read()
.format("iceberg")
.load(location.toString());

List<SimpleRecord> actual = result
.orderBy("id")
.as(Encoders.bean(SimpleRecord.class))
.collectAsList();

Assert.assertEquals("Number of rows should match", expected.size(), actual.size());
Assert.assertEquals("Result rows should match", expected, actual);

} finally {
TestTables.clearTables();
}
}

@Test
public void testReorderedColumnsNoNullability() throws Exception {
String desc = "reorder_columns_no_nullability";
File parent = temp.newFolder(desc);
File location = new File(parent, "test");
File dataFolder = new File(location, "data");
Assert.assertTrue("mkdirs should succeed", dataFolder.mkdirs());

HadoopTables tables = new HadoopTables(spark.sessionState().newHadoopConf());
Table table = tables.create(SIMPLE_SCHEMA, SPEC, location.toString());
table.updateProperties().set(TableProperties.DEFAULT_FILE_FORMAT, format).commit();

List<SimpleRecord> expected = Lists.newArrayList(
new SimpleRecord(1, "a"),
new SimpleRecord(2, "b"),
new SimpleRecord(3, "c")
);

Dataset<Row> df = spark.createDataFrame(expected, SimpleRecord.class);

try {
df.select("data", "id").write()
.format("iceberg")
.mode("append")
.option("check-ordering", "false")
.option("check-nullability", "false")
.save(location.toString());

Dataset<Row> result = spark.read()
.format("iceberg")
.load(location.toString());

List<SimpleRecord> actual = result
.orderBy("id")
.as(Encoders.bean(SimpleRecord.class))
.collectAsList();

Assert.assertEquals("Number of rows should match", expected.size(), actual.size());
Assert.assertEquals("Result rows should match", expected, actual);

} finally {
TestTables.clearTables();
}
}

@Test
public void testPartitionValueTypes() throws Exception {
String[] columnNames = new String[] {