Skip to content

Commit

Permalink
Use implicit row constructors. Optionally skip nested rows. Nullabili…
Browse files Browse the repository at this point in the history
…ty bug fixes. (#64)
  • Loading branch information
ryannedolan committed Apr 9, 2024
1 parent 8edcd27 commit c3fa16a
Show file tree
Hide file tree
Showing 10 changed files with 204 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ public static Schema avro(String namespace, String name, RelDataType dataType) {
.filter(x -> !x.getName().startsWith("__")) // don't write out hidden fields
.map(x -> new Schema.Field(sanitize(x.getName()), avro(namespace, x.getName(), x.getType()), describe(x), null))
.collect(Collectors.toList());
return Schema.createRecord(sanitize(name), dataType.toString(), namespace, false, fields);
return createAvroSchemaWithNullability(Schema.createRecord(sanitize(name), dataType.toString(), namespace, false, fields),
dataType.isNullable());
} else {
switch (dataType.getSqlTypeName()) {
case INTEGER:
Expand All @@ -42,6 +43,15 @@ public static Schema avro(String namespace, String name, RelDataType dataType) {
return createAvroTypeWithNullability(Schema.Type.DOUBLE, dataType.isNullable());
case CHAR:
return createAvroTypeWithNullability(Schema.Type.STRING, dataType.isNullable());
case BOOLEAN:
return createAvroTypeWithNullability(Schema.Type.BOOLEAN, dataType.isNullable());
case ARRAY:
return createAvroSchemaWithNullability(Schema.createArray(avro(null, null, dataType.getComponentType())),
dataType.isNullable());
// TODO support map types
// Appears to require a Calcite version bump
// case MAP:
// return createAvroSchemaWithNullability(Schema.createMap(avroPrimitive(dataType.getValueType())), dataType.isNullable());
case UNKNOWN:
case NULL:
return Schema.createUnion(Schema.create(Schema.Type.NULL));
Expand All @@ -56,14 +66,18 @@ public static Schema avro(String namespace, String name, RelProtoDataType relPro
return avro(namespace, name, relProtoDataType.apply(factory));
}

private static Schema createAvroTypeWithNullability(Schema.Type rawType, boolean nullable) {
private static Schema createAvroSchemaWithNullability(Schema schema, boolean nullable) {
if (nullable) {
return Schema.createUnion(Schema.create(Schema.Type.NULL), Schema.create(rawType));
return Schema.createUnion(Schema.create(Schema.Type.NULL), schema);
} else {
return Schema.create(rawType);
return schema;
}
}

/**
 * Builds the Avro schema for a primitive {@code rawType} and hands it to
 * {@code createAvroSchemaWithNullability} together with the {@code nullable} flag.
 */
private static Schema createAvroTypeWithNullability(Schema.Type rawType, boolean nullable) {
  Schema primitive = Schema.create(rawType);
  return createAvroSchemaWithNullability(primitive, nullable);
}

public static RelDataType rel(Schema schema, RelDataTypeFactory typeFactory) {
RelDataType unknown = typeFactory.createUnknownType();
switch (schema.getType()) {
Expand All @@ -74,17 +88,25 @@ public static RelDataType rel(Schema schema, RelDataTypeFactory typeFactory) {
.filter(x -> x.getValue().getSqlTypeName() != unknown.getSqlTypeName())
.collect(Collectors.toList()));
case INT:
// schema.isNullable() should be false for basic types iiuc
return createRelTypeWithNullability(typeFactory, SqlTypeName.INTEGER, schema.isNullable());
return createRelType(typeFactory, SqlTypeName.INTEGER);
case LONG:
return createRelTypeWithNullability(typeFactory, SqlTypeName.BIGINT, schema.isNullable());
return createRelType(typeFactory, SqlTypeName.BIGINT);
case ENUM:
case FIXED:
case STRING:
return createRelTypeWithNullability(typeFactory, SqlTypeName.VARCHAR, schema.isNullable());
return createRelType(typeFactory, SqlTypeName.VARCHAR);
case FLOAT:
return createRelTypeWithNullability(typeFactory, SqlTypeName.FLOAT, schema.isNullable());
return createRelType(typeFactory, SqlTypeName.FLOAT);
case DOUBLE:
return createRelTypeWithNullability(typeFactory, SqlTypeName.DOUBLE, schema.isNullable());
return createRelType(typeFactory, SqlTypeName.DOUBLE);
case BOOLEAN:
return createRelType(typeFactory, SqlTypeName.BOOLEAN);
case ARRAY:
return typeFactory.createArrayType(rel(schema.getElementType(), typeFactory), -1);
// TODO support map types
// Appears to require a Calcite version bump
// case MAP:
// return typeFactory.createMapType(typeFactory.createSqlType(SqlTypeName.VARCHAR), rel(schema.getValueType(), typeFactory));
case UNION:
if (schema.isNullable() && schema.getTypes().size() == 2) {
Schema innerType = schema.getTypes().stream().filter(x -> x.getType() != Schema.Type.NULL).findFirst().get();
Expand All @@ -102,9 +124,9 @@ public static RelDataType rel(Schema schema) {
return rel(schema, DataType.DEFAULT_TYPE_FACTORY);
}

private static RelDataType createRelTypeWithNullability(RelDataTypeFactory typeFactory, SqlTypeName typeName, boolean nullable) {
private static RelDataType createRelType(RelDataTypeFactory typeFactory, SqlTypeName typeName) {
RelDataType rawType = typeFactory.createSqlType(typeName);
return typeFactory.createTypeWithNullability(rawType, nullable);
return typeFactory.createTypeWithNullability(rawType, false);
}

public static RelProtoDataType proto(Schema schema) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
/** Common data types. Not authoritative or exhaustive. */
public enum DataType {

VARCHAR_NULL(x -> x.createTypeWithNullability(x.createSqlType(SqlTypeName.VARCHAR), true)),
VARCHAR(x -> x.createTypeWithNullability(x.createSqlType(SqlTypeName.VARCHAR), true)),
VARCHAR_NOT_NULL(x -> x.createTypeWithNullability(x.createSqlType(SqlTypeName.VARCHAR), false));

public static final RelDataTypeFactory DEFAULT_TYPE_FACTORY = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
Expand Down Expand Up @@ -56,16 +56,24 @@ public static Struct struct(RelDataType relDataType) {
/** Convenience builder for non-scalar types */
public interface Struct extends RelProtoDataType {

default Struct with(String name, DataType dataType) {
default Struct with(String name, RelDataType dataType) {
return x -> {
RelDataType existing = apply(x);
RelDataTypeFactory.Builder builder = new RelDataTypeFactory.Builder(x);
builder.addAll(existing.getFieldList());
builder.add(name, dataType.rel(x));
builder.add(name, dataType);
return builder.build();
};
}

/** Appends a field named {@code name} whose type is {@code dataType.rel()}. */
default Struct with(String name, DataType dataType) {
  RelDataType fieldType = dataType.rel();
  return with(name, fieldType);
}

/** Appends a nested field named {@code name} whose type is {@code struct.rel()}. */
default Struct with(String name, Struct struct) {
  RelDataType nestedType = struct.rel();
  return with(name, nestedType);
}

default RelDataType rel() {
return apply(DEFAULT_TYPE_FACTORY);
}
Expand All @@ -85,6 +93,17 @@ default Struct drop(String name) {
};
}

/**
 * Returns a struct containing this struct's fields except those whose SQL type is
 * {@code ROW}. Only the immediate field list is filtered.
 */
default Struct dropNestedRows() {
  return factory -> {
    RelDataType source = apply(factory);
    RelDataTypeFactory.Builder result = new RelDataTypeFactory.Builder(factory);
    result.addAll(
        source.getFieldList().stream()
            .filter(field -> SqlTypeName.ROW != field.getType().getSqlTypeName())
            .collect(Collectors.toList()));
    return result.build();
  };
}

default Struct get(String name) {
return x -> {
RelDataTypeField field = apply(x).getField(name, true, false);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,33 @@
package com.linkedin.hoptimator.catalog;

import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.rel.rel2sql.RelToSqlConverter;
import org.apache.calcite.rel.rel2sql.SqlImplementor;
import org.apache.calcite.sql.SqlWriter;
//import org.apache.calcite.sql.SqlWriterConfig;
// needed in next Calcite version
// import org.apache.calcite.sql.SqlWriterConfig;
import org.apache.calcite.sql.SqlDataTypeSpec;
import org.apache.calcite.sql.SqlRowTypeNameSpec;
import org.apache.calcite.sql.SqlBasicTypeNameSpec;
import org.apache.calcite.sql.SqlCollectionTypeNameSpec;
import org.apache.calcite.sql.SqlRowTypeNameSpec;
import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlDialect;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.SqlRowTypeNameSpec;
import org.apache.calcite.sql.SqlSelect;
import org.apache.calcite.sql.dialect.AnsiSqlDialect;
import org.apache.calcite.sql.fun.SqlRowOperator;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
import org.apache.calcite.sql.pretty.SqlPrettyWriter;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.rel.rel2sql.RelToSqlConverter;
import org.apache.calcite.rel.rel2sql.SqlImplementor;
import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
import org.apache.calcite.sql.util.SqlShuttle;

import java.util.Map;
import java.util.List;
Expand Down Expand Up @@ -94,6 +104,7 @@ default String sql() {
/** Render the script as DDL/SQL in the given dialect */
default String sql(SqlDialect dialect) {
SqlWriter w = new SqlPrettyWriter(dialect);
// TODO: fix in next Calcite version
// above is deprecated; replace with:
// SqlWriter w = new SqlPrettyWriter(SqlWriterConfig.of().withDialect(dialect));
implement(w);
Expand Down Expand Up @@ -129,9 +140,31 @@ public QueryImplementor(RelNode relNode) {
public void implement(SqlWriter w) {
RelToSqlConverter converter = new RelToSqlConverter(w.getDialect());
SqlImplementor.Result result = converter.visitRoot(relNode);
w.literal(result.asSelect().toSqlString(w.getDialect()).getSql());
SqlSelect select = result.asSelect();
if (select.getSelectList() != null) {
select.setSelectList((SqlNodeList) select.getSelectList().accept(REMOVE_ROW_CONSTRUCTOR));
}
w.literal(select.toSqlString(w.getDialect()).getSql());
}
}

// A `ROW(...)` operator which will unparse as just `(...)`.
private final SqlRowOperator IMPLIED_ROW_OPERATOR = new SqlRowOperator(""); // empty string name

// A shuttle that replaces `ROW(a, b, ...)` with just `(a, b, ...)`.
//
// Only row constructors with more than one operand are rewritten. Every other call is
// handed to SqlShuttle's default traversal instead of being rebuilt by hand: rebuilding
// via createCall(pos, operands) drops the call's function quantifier (e.g. DISTINCT), and
// calling accept() on each operand fails on calls whose operand list contains nulls
// (e.g. a nested SELECT with absent clauses).
private final SqlShuttle REMOVE_ROW_CONSTRUCTOR = new SqlShuttle() {
  @Override
  public SqlNode visit(SqlCall call) {
    boolean isRowConstructor = call.getKind() == SqlKind.ROW
        || call.getKind() == SqlKind.COLUMN_LIST
        || call.getOperator() instanceof SqlRowOperator;
    if (!isRowConstructor || call.getOperandList().size() <= 1) {
      // Default traversal still visits the children with this shuttle.
      return super.visit(call);
    }
    // Rewrite nested row constructors first, then re-wrap with the nameless operator.
    List<SqlNode> operands = call.getOperandList().stream()
        .map(x -> x == null ? null : x.accept(this))
        .collect(Collectors.toList());
    return IMPLIED_ROW_OPERATOR.createCall(call.getParserPosition(), operands);
  }
};
}

/**
* Implements a CREATE TABLE...WITH... DDL statement.
Expand Down Expand Up @@ -291,14 +324,18 @@ private static SqlDataTypeSpec toSpec(RelDataType dataType) {
.map(x -> toSpec(x))
.collect(Collectors.toList());
return maybeNullable(dataType, new SqlDataTypeSpec(new SqlRowTypeNameSpec(SqlParserPos.ZERO, fieldNames, fieldTypes), SqlParserPos.ZERO));
} if (dataType.getComponentType() != null) {
return maybeNullable(dataType, new SqlDataTypeSpec(new SqlCollectionTypeNameSpec(new SqlBasicTypeNameSpec(
dataType.getComponentType().getSqlTypeName(), SqlParserPos.ZERO), dataType.getSqlTypeName(), SqlParserPos.ZERO),
SqlParserPos.ZERO));
} else {
return maybeNullable(dataType, new SqlDataTypeSpec(new SqlBasicTypeNameSpec(dataType.getSqlTypeName(), SqlParserPos.ZERO), SqlParserPos.ZERO));
}
}

private static SqlDataTypeSpec maybeNullable(RelDataType dataType, SqlDataTypeSpec spec) {
if (!dataType.isNullable()) {
return spec.withNullable(true);
return spec.withNullable(false);
} else {
// we don't want "VARCHAR NULL", only "VARCHAR NOT NULL"
return spec;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,19 @@

import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelProtoDataType;

import java.util.concurrent.ExecutionException;
import java.util.function.Function;

/** Resolves a table name into a concrete row type. Usually involves a network call. */
public interface TableResolver {
RelDataType resolve(String table) throws InterruptedException, ExecutionException;

/** Adapts a function from table name to row type into a {@code TableResolver}. */
static TableResolver from(Function<String, RelDataType> f) {
  return table -> f.apply(table);
}

/** Appends an extra column to the resolved type */
default TableResolver with(String name, RelDataType dataType) {
return x -> {
Expand All @@ -19,4 +25,20 @@ default TableResolver with(String name, RelDataType dataType) {
return builder.build();
};
}

/** Appends an extra column of type {@code dataType.rel()} to the resolved type. */
default TableResolver with(String name, DataType dataType) {
  RelDataType columnType = dataType.rel();
  return with(name, columnType);
}

/** Appends an extra nested column of type {@code struct.rel()} to the resolved type. */
default TableResolver with(String name, DataType.Struct struct) {
  RelDataType nestedType = struct.rel();
  return with(name, nestedType);
}

/**
 * Returns a resolver that wraps each resolved row type in a {@code DataType.Struct},
 * applies {@code f} to it, and yields the {@code rel()} of the result.
 */
default TableResolver mapStruct(Function<DataType.Struct, DataType.Struct> f) {
  return table -> {
    DataType.Struct resolved = DataType.struct(resolve(table));
    return f.apply(resolved).rel();
  };
}

/** Returns a resolver that applies {@code f} to each row type this resolver produces. */
default TableResolver map(Function<RelDataType, RelDataType> f) {
  return table -> {
    RelDataType resolved = resolve(table);
    return f.apply(resolved);
  };
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package com.linkedin.hoptimator.catalog;

import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.util.Litmus;
import org.apache.avro.Schema;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import org.junit.Test;

/** Tests conversion between Avro schemas and Calcite row types via {@code AvroConverter}. */
public class AvroConverterTest {

  /**
   * Converts a record containing two levels of nullable nested records to a
   * {@code RelDataType}, checks field counts and nullability at each level, then
   * converts back to Avro and verifies the round trip.
   */
  @Test
  public void convertsNestedSchemas() {
    String schemaString = "{\"type\":\"record\",\"name\":\"E\",\"namespace\":\"ns\",\"fields\":[{\"name\":\"h\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"H\",\"namespace\":\"ns\",\"fields\":[{\"name\":\"A\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"A\",\"fields\":[]}]}]}]}]}";

    // Avro -> Rel. In each assertEquals the Avro side is the expected value.
    Schema avroSchema1 = (new Schema.Parser()).parse(schemaString);
    RelDataType rel1 = AvroConverter.rel(avroSchema1);
    assertEquals(rel1.toString(), avroSchema1.getFields().size(), rel1.getFieldCount());
    assertTrue(rel1.toString(), rel1.getField("h", false, false) != null);
    RelDataType rel2 = rel1.getField("h", false, false).getType();
    assertTrue(rel2.toString(), rel2.isNullable());
    // The unions in schemaString are declared as ["null", record], so index 1 is the record.
    Schema avroSchema2 = avroSchema1.getField("h").schema().getTypes().get(1);
    assertEquals(rel2.toString(), avroSchema2.getFields().size(), rel2.getFieldCount());
    assertTrue(rel2.toString(), rel2.getField("A", false, false) != null);
    RelDataType rel3 = rel2.getField("A", false, false).getType();
    assertTrue(rel3.toString(), rel3.isNullable());
    Schema avroSchema3 = avroSchema2.getField("A").schema().getTypes().get(1);
    assertEquals(rel3.toString(), avroSchema3.getFields().size(), rel3.getFieldCount());

    // Rel -> Avro. Here the Rel side is the expected value.
    Schema avroSchema4 = AvroConverter.avro("NS", "R", rel1);
    assertTrue("!avroSchema4.isNullable()", !avroSchema4.isNullable());
    assertEquals(avroSchema4.toString(), rel1.getFieldCount(), avroSchema4.getFields().size());
    Schema avroSchema5 = AvroConverter.avro("NS", "R", rel2);
    assertTrue("avroSchema5.isNullable()", avroSchema5.isNullable());
    assertEquals(avroSchema5.toString(), rel2.getFieldCount(), avroSchema5.getTypes().get(1).getFields().size());
    Schema avroSchema6 = AvroConverter.avro("NS", "R", rel3);
    assertEquals(avroSchema6.toString(), rel3.getFieldCount(), avroSchema6.getTypes().get(1).getFields().size());

    // Round trip: Avro -> Rel -> Avro -> Rel must give back the first Rel type.
    RelDataType rel4 = AvroConverter.rel(avroSchema4);
    assertTrue("types match", RelOptUtil.eq("rel4", rel4, "rel1", rel1, Litmus.THROW));
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.linkedin.hoptimator.catalog;

import org.apache.calcite.rel.type.RelDataType;

import static org.junit.Assert.assertTrue;
import org.junit.Test;

/** Tests for the {@code DataType.Struct} convenience builder. */
public class DataTypeTest {

  /** A struct with one scalar and one nested struct field keeps only the scalar after dropNestedRows(). */
  @Test
  public void skipsNestedRows() {
    DataType.Struct inner = DataType.struct().with("three", DataType.VARCHAR);
    DataType.Struct outer = DataType.struct().with("one", DataType.VARCHAR).with("two", inner);

    // Before dropping: both fields are present.
    RelDataType withNested = outer.rel();
    String withNestedText = withNested.toString();
    assertTrue(withNestedText, withNested.getFieldCount() == 2);
    assertTrue(withNestedText, withNested.getField("one", false, false) != null);
    assertTrue(withNestedText, withNested.getField("two", false, false) != null);

    // After dropping: only the non-ROW field remains.
    RelDataType flattened = outer.dropNestedRows().rel();
    String flattenedText = flattened.toString();
    assertTrue(flattenedText, flattened.getFieldCount() == 1);
    assertTrue(flattenedText, flattened.getField("one", false, false) != null);
    assertTrue(flattenedText, flattened.getField("two", false, false) == null);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ public void implementsFlinkCreateTableDDL() {
// Output isn't necessarily deterministic, but should be something like:
// CREATE TABLE IF NOT EXISTS "DATABASE"."TABLE1" ("idValue1" VARCHAR) WITH
// ('connector'='kafka', 'properties.bootstrap.servers'='localhost:9092', 'topic'='topic1')
assertTrue(out.contains("CREATE TABLE IF NOT EXISTS \"DATABASE\".\"TABLE1\" (\"idValue1\" VARCHAR) WITH "));
assertTrue(out.contains("'connector'='kafka'"));
assertTrue(out.contains("'properties.bootstrap.servers'='localhost:9092'"));
assertTrue(out.contains("'topic'='topic1'"));
assertFalse(out.contains("Row"));
assertTrue(out, out.contains("CREATE TABLE IF NOT EXISTS \"DATABASE\".\"TABLE1\" (\"idValue1\" VARCHAR) WITH "));
assertTrue(out, out.contains("'connector'='kafka'"));
assertTrue(out, out.contains("'properties.bootstrap.servers'='localhost:9092'"));
assertTrue(out, out.contains("'topic'='topic1'"));
assertFalse(out, out.contains("Row"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ public Schema create(SchemaPlus parentSchema, String name, Map<String, Object> o
String principal = (String) operand.getOrDefault("principal", "User:ANONYMOUS");
Map<String, Object> clientConfig = (Map<String, Object>) operand.get("clientConfig");
DataType.Struct rowType = DataType.struct()
.with("PAYLOAD", DataType.VARCHAR_NULL)
.with("KEY", DataType.VARCHAR_NULL);
.with("PAYLOAD", DataType.VARCHAR)
.with("KEY", DataType.VARCHAR);
ConfigProvider connectorConfigProvider = ConfigProvider.from(clientConfig)
.withPrefix("properties.")
.with("connector", "upsert-kafka")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ public Result reconcile(Request request) {
// Mark the Subscription as failed.
status.setFailed(true);
status.setMessage("Error: " + e.getMessage());
result = new Result(true, operator.failureRetryDuration());
}
} else if (status.getReady() == null && status.getResources() != null) {
// Phase 2
Expand Down

0 comments on commit c3fa16a

Please sign in to comment.