Skip to content

Commit

Permalink
[BugFix] [UDF] Fix the issue where UDF cannot find classes in Flink S…
Browse files Browse the repository at this point in the history
…QL tasks (DataLinkDC#3523)
  • Loading branch information
zackyoungh authored and sunlichao11 committed May 23, 2024
1 parent 654f684 commit 6d3bfcb
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,10 @@ public void run() throws Exception {
List<File> jarFiles =
new ArrayList<>(jobManager.getUdfPathContextHolder().getAllFileSet());

String[] userCustomUdfJarPath = UDFUtil.initJavaUDF(udfList, taskId);
String[] jarPaths = CollUtil.removeNull(jarFiles).stream()
.map(File::getAbsolutePath)
.toArray(String[]::new);

if (GATEWAY_TYPE_MAP.get(SESSION).contains(runMode)) {
config.setJarFiles(jarPaths);
}
Expand All @@ -82,6 +82,7 @@ public void run() throws Exception {
String[] pyPaths = UDFUtil.initPythonUDF(
udfList, runMode, config.getTaskId(), executor.getTableConfig().getConfiguration());

executor.initUDF(userCustomUdfJarPath);
executor.initUDF(jarPaths);

if (ArrayUtil.isNotEmpty(pyPaths)) {
Expand All @@ -91,6 +92,13 @@ public void run() throws Exception {
}
}
}
if (ArrayUtil.isNotEmpty(userCustomUdfJarPath)) {
for (String jarPath : userCustomUdfJarPath) {
if (StrUtil.isNotBlank(jarPath)) {
jobManager.getUdfPathContextHolder().addUdfPath(new File(jarPath));
}
}
}

Set<File> pyUdfFile = jobManager.getUdfPathContextHolder().getPyUdfFile();
executor.initPyUDF(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,16 @@

import org.apache.flink.configuration.ReadableConfig;

import java.util.HashSet;
import java.util.List;
import java.util.Set;

import cn.hutool.core.lang.Singleton;
import cn.hutool.core.util.StrUtil;

/** @since 0.6.8 */
public interface FunctionCompiler {
Set<String> COMPILER_CACHE = new HashSet<>();

/**
* 函数代码在线动态编译
Expand All @@ -54,6 +57,11 @@ public interface FunctionCompiler {
static boolean getCompiler(UDF udf, ReadableConfig conf, Integer missionId) {
Asserts.checkNull(udf, "udf为空");
Asserts.checkNull(udf.getCode(), "udf 代码为空");

String key = udf.getClassName() + udf.getFunctionLanguage();
if (COMPILER_CACHE.contains(key)) {
return true;
}
boolean success;
switch (udf.getFunctionLanguage()) {
case JAVA:
Expand All @@ -69,6 +77,9 @@ static boolean getCompiler(UDF udf, ReadableConfig conf, Integer missionId) {
throw UDFCompilerException.notSupportedException(
udf.getFunctionLanguage().name());
}
if (success) {
COMPILER_CACHE.add(key);
}
return success;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,13 @@

import org.apache.flink.configuration.ReadableConfig;

import java.util.HashSet;
import java.util.Set;

import lombok.extern.slf4j.Slf4j;

/**
* java 编译
*/
@Slf4j
public class JavaCompiler implements FunctionCompiler {
private static final Set<String> COMPILER_CACHE = new HashSet<>();

/**
* 函数代码在线动态编译
Expand All @@ -46,22 +42,18 @@ public class JavaCompiler implements FunctionCompiler {
*/
@Override
public synchronized boolean compiler(UDF udf, ReadableConfig conf, Integer missionId) {
String key = udf.getClassName() + udf.getFunctionLanguage();
if (COMPILER_CACHE.contains(key)) {
return true;
}

// TODO 改为ProcessStep注释
log.info("Compiling java code, class: {}", udf.getClassName());
CustomStringJavaCompiler compiler = new CustomStringJavaCompiler(udf.getCode());
boolean res = compiler.compilerToTmpPath(PathConstant.getUdfCompilerJavaPath(missionId));
String className = compiler.getFullClassName();
if (res) {
log.info("class编译成功:{}", className);
log.info("class compiled successfully:{}", className);
log.info("compiler take time:{}", compiler.getCompilerTakeTime());
COMPILER_CACHE.add(key);
return true;
} else {
log.error("class编译失败:{}", className);
log.error("class compilation failed:{}", className);
log.error(compiler.getCompilerMessage());
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ public static String templateParse(String dialect, String template, String class
}
}

public static String[] initJavaUDF(List<UDF> udf, GatewayType gatewayType, Integer missionId) {
public static String[] initJavaUDF(List<UDF> udf, Integer missionId) {
return FunctionFactory.initUDF(
CollUtil.newArrayList(
CollUtil.filterNew(udf, x -> x.getFunctionLanguage() != FunctionLanguage.PYTHON)),
Expand Down

0 comments on commit 6d3bfcb

Please sign in to comment.