Skip to content

Commit

Permalink
Revert "[HUDI-3523] Introduce AddColumnSchemaPostProcessor to support…
Browse files Browse the repository at this point in the history
… add columns to the end of a schema (apache#5031)" (apache#6768)

This reverts commit 092375f.
  • Loading branch information
wangxianghu committed Sep 23, 2022
1 parent 092375f commit 9bab106
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 229 deletions.

This file was deleted.

Expand Up @@ -20,7 +20,6 @@

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.exception.HoodieSchemaPostProcessException;
import org.apache.hudi.utilities.schema.AddColumnSchemaPostProcessor;
import org.apache.hudi.utilities.schema.DeleteSupportSchemaPostProcessor;
import org.apache.hudi.utilities.schema.DropColumnSchemaPostProcessor;
import org.apache.hudi.utilities.schema.SchemaPostProcessor;
Expand All @@ -34,14 +33,10 @@
import org.apache.avro.Schema.Type;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
Expand All @@ -60,18 +55,13 @@ public class TestSchemaPostProcessor extends UtilitiesTestBase {
+ "{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"rider\",\"type\":\"string\"},{\"name\":\"driver\","
+ "\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"}]}";

private static Stream<Arguments> configParams() {
String[] types = {"bytes", "string", "int", "long", "float", "double", "boolean"};
return Stream.of(types).map(Arguments::of);
}

@Test
public void testPostProcessor() throws IOException {
properties.put(Config.SCHEMA_POST_PROCESSOR_PROP, DummySchemaPostProcessor.class.getName());
SchemaProvider provider =
UtilHelpers.wrapSchemaProviderWithPostProcessor(
UtilHelpers.createSchemaProvider(DummySchemaProvider.class.getName(), properties, jsc),
properties, jsc, null);
UtilHelpers.createSchemaProvider(DummySchemaProvider.class.getName(), properties, jsc),
properties, jsc,null);

Schema schema = provider.getSourceSchema();
assertEquals(schema.getType(), Type.RECORD);
Expand All @@ -86,9 +76,9 @@ public void testSparkAvro() throws IOException {
transformerClassNames.add(FlatteningTransformer.class.getName());

SchemaProvider provider =
UtilHelpers.wrapSchemaProviderWithPostProcessor(
UtilHelpers.createSchemaProvider(SparkAvroSchemaProvider.class.getName(), properties, jsc),
properties, jsc, transformerClassNames);
UtilHelpers.wrapSchemaProviderWithPostProcessor(
UtilHelpers.createSchemaProvider(SparkAvroSchemaProvider.class.getName(), properties, jsc),
properties, jsc, transformerClassNames);

Schema schema = provider.getSourceSchema();
assertEquals(schema.getType(), Type.RECORD);
Expand Down Expand Up @@ -154,48 +144,6 @@ public void testDeleteColumnThrows() {
Assertions.assertThrows(HoodieSchemaPostProcessException.class, () -> processor.processSchema(schema));
}

@ParameterizedTest
@MethodSource("configParams")
public void testAddPrimitiveTypeColumn(String type) {
properties.put(AddColumnSchemaPostProcessor.Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_NAME_PROP.key(), "primitive_column");
properties.put(AddColumnSchemaPostProcessor.Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_TYPE_PROP.key(), type);
properties.put(AddColumnSchemaPostProcessor.Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_NEXT_PROP.key(), "fare");
properties.put(AddColumnSchemaPostProcessor.Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_DOC_PROP.key(), "primitive column test");

AddColumnSchemaPostProcessor processor = new AddColumnSchemaPostProcessor(properties, null);
Schema schema = new Schema.Parser().parse(ORIGINAL_SCHEMA);
Schema targetSchema = processor.processSchema(schema);

Schema.Field newColumn = targetSchema.getField("primitive_column");
Schema.Field nextColumn = targetSchema.getField("fare");

assertNotNull(newColumn);
assertEquals("primitive column test", newColumn.doc());
assertEquals(type, newColumn.schema().getType().getName());
assertEquals(nextColumn.pos(), newColumn.pos() + 1);
}

@Test
public void testAddDecimalColumn() {
properties.put(AddColumnSchemaPostProcessor.Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_NAME_PROP.key(), "decimal_column");
properties.put(AddColumnSchemaPostProcessor.Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_TYPE_PROP.key(), "decimal");
properties.put(AddColumnSchemaPostProcessor.Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_DOC_PROP.key(), "decimal column test");
properties.put(AddColumnSchemaPostProcessor.Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_DEFAULT_PROP.key(), "0.75");
properties.put(AddColumnSchemaPostProcessor.Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_PRECISION_PROP.key(), "8");
properties.put(AddColumnSchemaPostProcessor.Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_SCALE_PROP.key(), "6");
properties.put(AddColumnSchemaPostProcessor.Config.SCHEMA_POST_PROCESSOR_ADD_COLUMN_SIZE_PROP.key(), "8");

AddColumnSchemaPostProcessor processor = new AddColumnSchemaPostProcessor(properties, null);
Schema schema = new Schema.Parser().parse(ORIGINAL_SCHEMA);
Schema targetSchema = processor.processSchema(schema);

Schema.Field newColumn = targetSchema.getField("decimal_column");

assertNotNull(newColumn);
assertEquals("decimal", newColumn.schema().getLogicalType().getName());
assertEquals(5, newColumn.pos());
}

@Test
public void testSparkAvroSchema() throws IOException {
SparkAvroPostProcessor processor = new SparkAvroPostProcessor(properties, null);
Expand Down

0 comments on commit 9bab106

Please sign in to comment.