Skip to content

Commit

Permalink
Add withFunction() method to StatementOnlySqlTransform
Browse files Browse the repository at this point in the history
  • Loading branch information
Becket Qin committed Apr 24, 2024
1 parent a8a431d commit 1e301e0
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ public void translateNode(PTransform<PBegin, PDone> transform, FlinkStreamingTra

StreamTableEnvironment tEnv = StreamTableEnvironment.create(context.getExecutionEnvironment());
sqlTransform.getCatalogs().forEach(tEnv::registerCatalog);
sqlTransform.getFunctionClasses().forEach(tEnv::createTemporarySystemFunction);
sqlTransform.getFunctionInstances().forEach(tEnv::createTemporarySystemFunction);

StringJoiner combinedStatements = new StringJoiner("\n\n");
StreamStatementSet ss = tEnv.createStatementSet();
for (String statement : sqlTransform.getStatements()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PDone;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.functions.UserDefinedFunction;
import org.apache.flink.util.Preconditions;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
Expand All @@ -41,10 +42,14 @@ public class StatementOnlySqlTransform extends PTransform<PBegin, PDone> {

private final List<String> statements;
private final Map<String, SerializableCatalog> catalogs;
private final Map<String, UserDefinedFunction> functionInstances;
private final Map<String, Class<? extends UserDefinedFunction>> functionClasses;

StatementOnlySqlTransform() {
this.statements = new ArrayList<>();
this.catalogs = new HashMap<>();
this.functionInstances = new HashMap<>();
this.functionClasses = new HashMap<>();
}

@Override
Expand Down Expand Up @@ -87,6 +92,36 @@ public StatementOnlySqlTransform withCatalog(String name, SerializableCatalog ca
return this;
}

/**
* Register a temporary user defined function for this SQL transform. The function will be
* registered as a <i>System Function</i> which means it will temporarily override other functions
* with the same name, if such function exists.
*
* @param name the name of the function.
* @param functionClass the class of the user defined function.
* @return this {@link StatementOnlySqlTransform} itself.
*/
public StatementOnlySqlTransform withFunction(
String name, Class<? extends UserDefinedFunction> functionClass) {
functionClasses.put(name, functionClass);
return this;
}

/**
* Register a temporary user defined function for this SQL transform. The function will be
* registered as a <i>System Function</i> which means it will temporarily override other functions
* with the same name, if such function exists.
*
* @param name the name of the function.
* @param functionInstance the user defined function instance.
* @return this {@link StatementOnlySqlTransform} itself.
*/
public StatementOnlySqlTransform withFunction(
String name, UserDefinedFunction functionInstance) {
functionInstances.put(name, functionInstance);
return this;
}

// --------------------- package private getters -----------------
List<String> getStatements() {
return Collections.unmodifiableList(statements);
Expand All @@ -96,6 +131,14 @@ Map<String, SerializableCatalog> getCatalogs() {
return Collections.unmodifiableMap(catalogs);
}

Map<String, UserDefinedFunction> getFunctionInstances() {
return functionInstances;
}

Map<String, Class<? extends UserDefinedFunction>> getFunctionClasses() {
return functionClasses;
}

// --------------------- private helpers ------------------------
private static String cleanUp(String s) {
return s.trim().endsWith(";") ? s : s + ";";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,13 +167,13 @@ public static CatalogTable getOrdersCatalogTable() {
return new ResolvedCatalogTable(origin, resolvedSchema);
}

public static CatalogTable getOrdersVerifyCatalogTable() {
public static CatalogTable getOrdersVerifyCatalogTable(String verificationFile) {
// Create schema
ResolvedSchema resolvedSchema = getOrdersSchema();

Map<String, String> connectorOptions = new HashMap<>();
connectorOptions.put(FactoryUtil.CONNECTOR.key(), VerifyingTableSinkFactory.IDENTIFIER);
connectorOptions.put(VerifyingTableSinkFactory.EXPECTED_RESULT_FILE_PATH_OPTION.key(), getFilePath("Orders"));
connectorOptions.put(VerifyingTableSinkFactory.EXPECTED_RESULT_FILE_PATH_OPTION.key(), getFilePath(verificationFile));
connectorOptions.put(VerifyingTableSinkFactory.HAS_HEADER_OPTION.key(), "true");

final CatalogTable origin =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,25 @@ public void testInsertOverwrite() {
pipeline.run(getPipelineOptions());
}

@Test
public void testWithFunction() {
SerializableCatalog catalog = TestingInMemCatalogFactory.getCatalog("TestCatalog");
Pipeline pipeline = Pipeline.create();

StatementOnlySqlTransform transform = SqlTransform.ofStatements();
transform
.withCatalog("MyCatalog", catalog)
.withFunction("udfViaClass", FlinkSqlTestUtils.ToUpperCaseAndReplaceString.class)
.withFunction("udfViaInstance", new FlinkSqlTestUtils.ToUpperCaseAndReplaceString())
.addStatement("INSERT INTO MyCatalog.TestDatabase.OrdersVerifyWithModifiedBuyerNames "
+ "SELECT orderNumber, product, amount, price, udfViaClass(buyer), orderTime FROM MyCatalog.TestDatabase.Orders")
.addStatement("INSERT INTO MyCatalog.TestDatabase.OrdersVerifyWithModifiedBuyerNames "
+ "SELECT orderNumber, product, amount, price, udfViaInstance(buyer), orderTime FROM MyCatalog.TestDatabase.Orders");

pipeline.apply(transform);
pipeline.run(getPipelineOptions());
}

// ----------------
private void testBasics(boolean isStreaming) {
SerializableCatalog catalog = TestingInMemCatalogFactory.getCatalog("TestCatalog");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,11 @@ public static TestingInMemCatalog getCatalog(String name) {
true);
catalog.createTable(
new ObjectPath("TestDatabase", "OrdersVerify"),
getOrdersVerifyCatalogTable(),
getOrdersVerifyCatalogTable("Orders"),
true);
catalog.createTable(
new ObjectPath("TestDatabase", "OrdersVerifyWithModifiedBuyerNames"),
getOrdersVerifyCatalogTable("OrdersWithConvertedBuyerNames"),
true);
} catch (TableAlreadyExistException | DatabaseNotExistException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#orderNumber,product,amount,price,buyer,orderTime
1,Apple,1,10.0,ALIC3,2023-03-24 16:23:00
2,Orange,2,100.0,B0B,2022-04-24 16:12:00
3,Mango,3,1000.0,CHARLI3,2021-04-26 15:00:00
4,Pear,1,12.0,D0NNA,2023-04-26 23:00:00
5,Strawberry,6,13.0,3LL3N,2023-04-24 15:00:00
6,Orange,1,120.0,ALIC3,2022-04-25 07:00:00
7,Blueberry,7,900.0,CHARLI3,2023-04-23 22:00:00
8,Mango,3,1000.0,D0NNA,2023-04-22 11:00:00
9,Mango,5,950.0,D0NNA,2020-02-14 17:00:00
10,Orange,6,90.0,B0B,2023-01-24 18:00:00

0 comments on commit 1e301e0

Please sign in to comment.