diff --git a/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/StatementOnlyFlinkSqlTransformTranslator.java b/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/StatementOnlyFlinkSqlTransformTranslator.java index 4a36dea4d109f..42e1a8d93bbf6 100644 --- a/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/StatementOnlyFlinkSqlTransformTranslator.java +++ b/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/StatementOnlyFlinkSqlTransformTranslator.java @@ -48,6 +48,9 @@ public void translateNode(PTransform transform, FlinkStreamingTra StreamTableEnvironment tEnv = StreamTableEnvironment.create(context.getExecutionEnvironment()); sqlTransform.getCatalogs().forEach(tEnv::registerCatalog); + sqlTransform.getFunctionClasses().forEach(tEnv::createTemporarySystemFunction); + sqlTransform.getFunctionInstances().forEach(tEnv::createTemporarySystemFunction); + StringJoiner combinedStatements = new StringJoiner("\n\n"); StreamStatementSet ss = tEnv.createStatementSet(); for (String statement : sqlTransform.getStatements()) { diff --git a/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/StatementOnlySqlTransform.java b/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/StatementOnlySqlTransform.java index 6fbcf7b39ed21..0fcd9c4151256 100644 --- a/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/StatementOnlySqlTransform.java +++ b/runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/StatementOnlySqlTransform.java @@ -28,6 +28,7 @@ import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PDone; import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.functions.UserDefinedFunction; import org.apache.flink.util.Preconditions; import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; @@ -41,10 +42,14 @@ public class StatementOnlySqlTransform extends PTransform { private final List statements; private final Map catalogs; + private final Map functionInstances; + private final Map> functionClasses; StatementOnlySqlTransform() { this.statements = new ArrayList<>(); this.catalogs = new HashMap<>(); + this.functionInstances = new HashMap<>(); + this.functionClasses = new HashMap<>(); } @Override @@ -87,6 +92,36 @@ public StatementOnlySqlTransform withCatalog(String name, SerializableCatalog ca return this; } + /** + * Register a temporary user defined function for this SQL transform. The function will be + * registered as a System Function which means it will temporarily override other functions + * with the same name, if such function exists. + * + * @param name the name of the function. + * @param functionClass the class of the user defined function. + * @return this {@link StatementOnlySqlTransform} itself. + */ + public StatementOnlySqlTransform withFunction( + String name, Class functionClass) { + functionClasses.put(name, functionClass); + return this; + } + + /** + * Register a temporary user defined function for this SQL transform. The function will be + * registered as a System Function which means it will temporarily override other functions + * with the same name, if such function exists. + * + * @param name the name of the function. + * @param functionInstance the user defined function instance. + * @return this {@link StatementOnlySqlTransform} itself. + */ + public StatementOnlySqlTransform withFunction( + String name, UserDefinedFunction functionInstance) { + functionInstances.put(name, functionInstance); + return this; + } + // --------------------- package private getters ----------------- List getStatements() { return Collections.unmodifiableList(statements); @@ -96,6 +131,14 @@ Map getCatalogs() { return Collections.unmodifiableMap(catalogs); } + Map getFunctionInstances() { + return functionInstances; + } + + Map> getFunctionClasses() { + return functionClasses; + } + // --------------------- private helpers ------------------------ private static String cleanUp(String s) { return s.trim().endsWith(";") ? s : s + ";"; diff --git a/runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/transform/sql/FlinkSqlTestUtils.java b/runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/transform/sql/FlinkSqlTestUtils.java index 9c60fc0903ec7..a606a63f882e1 100644 --- a/runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/transform/sql/FlinkSqlTestUtils.java +++ b/runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/transform/sql/FlinkSqlTestUtils.java @@ -167,13 +167,13 @@ public static CatalogTable getOrdersCatalogTable() { return new ResolvedCatalogTable(origin, resolvedSchema); } - public static CatalogTable getOrdersVerifyCatalogTable() { + public static CatalogTable getOrdersVerifyCatalogTable(String verificationFile) { // Create schema ResolvedSchema resolvedSchema = getOrdersSchema(); Map connectorOptions = new HashMap<>(); connectorOptions.put(FactoryUtil.CONNECTOR.key(), VerifyingTableSinkFactory.IDENTIFIER); - connectorOptions.put(VerifyingTableSinkFactory.EXPECTED_RESULT_FILE_PATH_OPTION.key(), getFilePath("Orders")); + connectorOptions.put(VerifyingTableSinkFactory.EXPECTED_RESULT_FILE_PATH_OPTION.key(), getFilePath(verificationFile)); connectorOptions.put(VerifyingTableSinkFactory.HAS_HEADER_OPTION.key(), "true"); final CatalogTable origin = diff --git a/runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/transform/sql/StatementOnlySqlTransformTest.java b/runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/transform/sql/StatementOnlySqlTransformTest.java index b9f64e2ee4511..682d64fca7e66 100644 --- a/runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/transform/sql/StatementOnlySqlTransformTest.java +++ b/runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/transform/sql/StatementOnlySqlTransformTest.java @@ -90,6 +90,25 @@ public void testInsertOverwrite() { pipeline.run(getPipelineOptions()); } + @Test + public void testWithFunction() { + SerializableCatalog catalog = TestingInMemCatalogFactory.getCatalog("TestCatalog"); + Pipeline pipeline = Pipeline.create(); + + StatementOnlySqlTransform transform = SqlTransform.ofStatements(); + transform + .withCatalog("MyCatalog", catalog) + .withFunction("udfViaClass", FlinkSqlTestUtils.ToUpperCaseAndReplaceString.class) + .withFunction("udfViaInstance", new FlinkSqlTestUtils.ToUpperCaseAndReplaceString()) + .addStatement("INSERT INTO MyCatalog.TestDatabase.OrdersVerifyWithModifiedBuyerNames " + + "SELECT orderNumber, product, amount, price, udfViaClass(buyer), orderTime FROM MyCatalog.TestDatabase.Orders") + .addStatement("INSERT INTO MyCatalog.TestDatabase.OrdersVerifyWithModifiedBuyerNames " + + "SELECT orderNumber, product, amount, price, udfViaInstance(buyer), orderTime FROM MyCatalog.TestDatabase.Orders"); + + pipeline.apply(transform); + pipeline.run(getPipelineOptions()); + } + // ---------------- private void testBasics(boolean isStreaming) { SerializableCatalog catalog = TestingInMemCatalogFactory.getCatalog("TestCatalog"); diff --git a/runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/transform/sql/TestingInMemCatalogFactory.java b/runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/transform/sql/TestingInMemCatalogFactory.java index 3d28b67677a0b..66b519e61104f 100644 --- a/runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/transform/sql/TestingInMemCatalogFactory.java +++ b/runners/flink/1.15/src/test/java/org/apache/beam/runners/flink/transform/sql/TestingInMemCatalogFactory.java @@ -41,7 +41,11 @@ public static TestingInMemCatalog getCatalog(String name) { true); catalog.createTable( new ObjectPath("TestDatabase", "OrdersVerify"), - getOrdersVerifyCatalogTable(), + getOrdersVerifyCatalogTable("Orders"), + true); + catalog.createTable( + new ObjectPath("TestDatabase", "OrdersVerifyWithModifiedBuyerNames"), + getOrdersVerifyCatalogTable("OrdersWithConvertedBuyerNames"), true); } catch (TableAlreadyExistException | DatabaseNotExistException e) { throw new RuntimeException(e); diff --git a/runners/flink/1.15/src/test/resources/tables/OrdersWithConvertedBuyerNames b/runners/flink/1.15/src/test/resources/tables/OrdersWithConvertedBuyerNames new file mode 100644 index 0000000000000..00a105bd8a63e --- /dev/null +++ b/runners/flink/1.15/src/test/resources/tables/OrdersWithConvertedBuyerNames @@ -0,0 +1,11 @@ +#orderNumber,product,amount,price,buyer,orderTime +1,Apple,1,10.0,ALIC3,2023-03-24 16:23:00 +2,Orange,2,100.0,B0B,2022-04-24 16:12:00 +3,Mango,3,1000.0,CHARLI3,2021-04-26 15:00:00 +4,Pear,1,12.0,D0NNA,2023-04-26 23:00:00 +5,Strawberry,6,13.0,3LL3N,2023-04-24 15:00:00 +6,Orange,1,120.0,ALIC3,2022-04-25 07:00:00 +7,Blueberry,7,900.0,CHARLI3,2023-04-23 22:00:00 +8,Mango,3,1000.0,D0NNA,2023-04-22 11:00:00 +9,Mango,5,950.0,D0NNA,2020-02-14 17:00:00 +10,Orange,6,90.0,B0B,2023-01-24 18:00:00 \ No newline at end of file