Skip to content

Commit 1eeea8a

Browse files
committed
add isHighAvailability 判断
1 parent 5962508 commit 1eeea8a

File tree

1 file changed

+11
-1
lines changed

1 file changed

+11
-1
lines changed

launcher/src/main/java/com/dtstack/flink/sql/launcher/ClusterClientFactory.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,12 @@ public class ClusterClientFactory {
6060

6161
private static final String HA_CLUSTER_ID = "high-availability.cluster-id";
6262

63+
private static final String HIGH_AVAILABILITY = "high-availability";
64+
65+
private static final String NODE = "NONE";
66+
67+
private static final String ZOOKEEPER = "zookeeper";
68+
6369
private static final String HADOOP_CONF = "fs.hdfs.hadoopconf";
6470

6571
public static ClusterClient createClusterClient(Options launcherOptions) throws Exception {
@@ -96,6 +102,8 @@ public static ClusterClient createYarnSessionClient(Options launcherOptions) {
96102

97103
if (StringUtils.isNotBlank(yarnConfDir)) {
98104
try {
105+
boolean isHighAvailability;
106+
99107
config.setString(HADOOP_CONF, yarnConfDir);
100108
FileSystem.initialize(config);
101109

@@ -123,7 +131,9 @@ public static ClusterClient createYarnSessionClient(Options launcherOptions) {
123131
throw new RuntimeException("No flink session found on yarn cluster.");
124132
}
125133

126-
if (config.getString(HA_CLUSTER_ID, null) == null) {
134+
isHighAvailability = config.getString(HIGH_AVAILABILITY, NODE).equals(ZOOKEEPER);
135+
136+
if (isHighAvailability && config.getString(HA_CLUSTER_ID, null) == null) {
127137
config.setString(HA_CLUSTER_ID, applicationId.toString());
128138
}
129139

0 commit comments

Comments
 (0)