Skip to content

Commit a91871d

Browse files
committed
Merge branch 'feat_1.8_jobSubmit' into 'v1.8.0_dev'
jobSubmit-standalone和yarn-session模式异常修复 (fix exceptions in jobSubmit standalone and yarn-session modes) See merge request dt-insight-engine/flinkStreamSQL!17
2 parents 6ddfba9 + d7baa87 commit a91871d

File tree

5 files changed

+87
-52
lines changed

5 files changed

+87
-52
lines changed

core/src/main/java/com/dtstack/flink/sql/option/OptionParser.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.commons.lang.StringUtils;
2727
import java.lang.reflect.InvocationTargetException;
2828
import java.lang.reflect.Field;
29+
import java.nio.charset.StandardCharsets;
2930
import java.util.List;
3031
import java.util.Map;
3132
import java.io.File;
@@ -102,8 +103,8 @@ public List<String> getProgramExeArgList() throws Exception {
102103
continue;
103104
} else if (OPTION_SQL.equalsIgnoreCase(key)) {
104105
File file = new File(value.toString());
105-
String content = FileUtils.readFile(file, "UTF-8");
106-
value = URLEncoder.encode(content, Charsets.UTF_8.name());
106+
String content = FileUtils.readFile(file, StandardCharsets.UTF_8.name());
107+
value = URLEncoder.encode(content, StandardCharsets.UTF_8.name());
107108
}
108109
args.add("-" + key);
109110
args.add(value.toString());

launcher/src/main/java/com/dtstack/flink/sql/launcher/ClusterClientFactory.java

Lines changed: 43 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@
66
* to you under the Apache License, Version 2.0 (the
77
* "License"); you may not use this file except in compliance
88
* with the License. You may obtain a copy of the License at
9-
*
9+
* <p>
1010
* http://www.apache.org/licenses/LICENSE-2.0
11-
*
11+
* <p>
1212
* Unless required by applicable law or agreed to in writing, software
1313
* distributed under the License is distributed on an "AS IS" BASIS,
1414
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -21,18 +21,15 @@
2121
import com.dtstack.flink.sql.enums.ClusterMode;
2222
import com.dtstack.flink.sql.option.Options;
2323
import com.dtstack.flink.sql.util.PluginUtil;
24-
import com.esotericsoftware.minlog.Log;
2524
import org.apache.commons.io.Charsets;
2625
import org.apache.commons.lang.StringUtils;
2726
import org.apache.flink.client.program.ClusterClient;
28-
import org.apache.flink.client.program.MiniClusterClient;
27+
import org.apache.flink.client.program.rest.RestClusterClient;
2928
import org.apache.flink.configuration.Configuration;
3029
import org.apache.flink.configuration.GlobalConfiguration;
3130
import org.apache.flink.configuration.JobManagerOptions;
3231
import org.apache.flink.core.fs.FileSystem;
3332
import org.apache.flink.runtime.akka.AkkaUtils;
34-
import org.apache.flink.runtime.minicluster.MiniCluster;
35-
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
3633
import org.apache.flink.runtime.util.LeaderConnectionInfo;
3734
import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
3835
import org.apache.flink.yarn.YarnClusterDescriptor;
@@ -42,21 +39,35 @@
4239
import org.apache.hadoop.yarn.client.api.YarnClient;
4340
import org.apache.hadoop.yarn.conf.YarnConfiguration;
4441
import org.apache.hadoop.yarn.util.StringHelper;
42+
import org.slf4j.Logger;
43+
import org.slf4j.LoggerFactory;
4544

4645
import java.net.InetSocketAddress;
4746
import java.net.URLDecoder;
4847
import java.util.EnumSet;
4948
import java.util.HashSet;
50-
import java.util.Iterator;
49+
import java.util.Set;
5150
import java.util.List;
5251
import java.util.Properties;
53-
import java.util.Set;
52+
import java.util.Iterator;
5453

5554
/**
5655
* @author sishu.yss
5756
*/
5857
public class ClusterClientFactory {
5958

59+
private static final Logger LOG = LoggerFactory.getLogger(ClusterClientFactory.class);
60+
61+
private static final String HA_CLUSTER_ID = "high-availability.cluster-id";
62+
63+
private static final String HIGH_AVAILABILITY = "high-availability";
64+
65+
private static final String NODE = "NONE";
66+
67+
private static final String ZOOKEEPER = "zookeeper";
68+
69+
private static final String HADOOP_CONF = "fs.hdfs.hadoopconf";
70+
6071
public static ClusterClient createClusterClient(Options launcherOptions) throws Exception {
6172
String mode = launcherOptions.getMode();
6273
if (mode.equals(ClusterMode.standalone.name())) {
@@ -70,10 +81,12 @@ public static ClusterClient createClusterClient(Options launcherOptions) throws
7081
public static ClusterClient createStandaloneClient(Options launcherOptions) throws Exception {
7182
String flinkConfDir = launcherOptions.getFlinkconf();
7283
Configuration config = GlobalConfiguration.loadConfiguration(flinkConfDir);
73-
MiniClusterConfiguration.Builder configBuilder = new MiniClusterConfiguration.Builder();
74-
configBuilder.setConfiguration(config);
75-
MiniCluster miniCluster = new MiniCluster(configBuilder.build());
76-
MiniClusterClient clusterClient = new MiniClusterClient(config, miniCluster);
84+
85+
LOG.info("------------config params-------------------------");
86+
config.toMap().forEach((key, value) -> LOG.info("{}: {}", key, value));
87+
LOG.info("-------------------------------------------");
88+
89+
RestClusterClient clusterClient = new RestClusterClient<>(config, "clusterClient");
7790
LeaderConnectionInfo connectionInfo = clusterClient.getClusterConnectionInfo();
7891
InetSocketAddress address = AkkaUtils.getInetSocketAddressFromAkkaURL(connectionInfo.getAddress());
7992
config.setString(JobManagerOptions.ADDRESS, address.getAddress().getHostName());
@@ -89,18 +102,21 @@ public static ClusterClient createYarnSessionClient(Options launcherOptions) {
89102

90103
if (StringUtils.isNotBlank(yarnConfDir)) {
91104
try {
92-
config.setString("fs.hdfs.hadoopconf", yarnConfDir);
105+
boolean isHighAvailability;
106+
107+
config.setString(HADOOP_CONF, yarnConfDir);
93108
FileSystem.initialize(config);
94109

95110
YarnConfiguration yarnConf = YarnConfLoader.getYarnConf(yarnConfDir);
96111
YarnClient yarnClient = YarnClient.createYarnClient();
97112
yarnClient.init(yarnConf);
98113
yarnClient.start();
99-
ApplicationId applicationId = null;
114+
ApplicationId applicationId;
100115

101116
String yarnSessionConf = launcherOptions.getYarnSessionConf();
102117
yarnSessionConf = URLDecoder.decode(yarnSessionConf, Charsets.UTF_8.toString());
103118
Properties yarnSessionConfProperties = PluginUtil.jsonStrToObject(yarnSessionConf, Properties.class);
119+
104120
Object yid = yarnSessionConfProperties.get("yid");
105121

106122
if (null != yid) {
@@ -109,20 +125,30 @@ public static ClusterClient createYarnSessionClient(Options launcherOptions) {
109125
applicationId = getYarnClusterApplicationId(yarnClient);
110126
}
111127

112-
Log.info("applicationId={}", applicationId.toString());
128+
LOG.info("current applicationId = {}", applicationId.toString());
113129

114130
if (StringUtils.isEmpty(applicationId.toString())) {
115131
throw new RuntimeException("No flink session found on yarn cluster.");
116132
}
117133

134+
isHighAvailability = config.getString(HIGH_AVAILABILITY, NODE).equals(ZOOKEEPER);
135+
136+
if (isHighAvailability && config.getString(HA_CLUSTER_ID, null) == null) {
137+
config.setString(HA_CLUSTER_ID, applicationId.toString());
138+
}
139+
140+
LOG.info("------------config params-------------------------");
141+
config.toMap().forEach((key, value) -> LOG.info("{}: {}", key, value));
142+
LOG.info("-------------------------------------------");
143+
118144
AbstractYarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(config, yarnConf, flinkConfDir, yarnClient, false);
119145
ClusterClient clusterClient = clusterDescriptor.retrieve(applicationId);
120146
clusterClient.setDetached(true);
121147
return clusterClient;
122148
} catch (Exception e) {
123149
throw new RuntimeException(e);
124150
}
125-
}else{
151+
} else {
126152
throw new RuntimeException("yarn mode must set param of 'yarnconf'!!!");
127153
}
128154
}
@@ -158,7 +184,7 @@ private static ApplicationId getYarnClusterApplicationId(YarnClient yarnClient)
158184

159185
}
160186

161-
if (null == applicationId) {
187+
if (applicationId == null || StringUtils.isEmpty(applicationId.toString())) {
162188
throw new RuntimeException("No flink session found on yarn cluster.");
163189
}
164190
return applicationId;

launcher/src/main/java/com/dtstack/flink/sql/launcher/LauncherMain.java

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
* limitations under the License.
1717
*/
1818

19-
2019

2120
package com.dtstack.flink.sql.launcher;
2221

@@ -41,11 +40,12 @@
4140
import org.apache.flink.runtime.jobgraph.JobGraph;
4241
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
4342
import org.apache.flink.util.FileUtils;
43+
import org.slf4j.Logger;
44+
import org.slf4j.LoggerFactory;
4445

4546
import java.io.File;
4647
import java.io.IOException;
4748
import java.net.URLDecoder;
48-
import java.nio.charset.StandardCharsets;
4949
import java.util.LinkedList;
5050
import java.util.List;
5151
import java.util.Map;
@@ -54,12 +54,15 @@
5454
/**
5555
* Date: 2017/2/20
5656
* Company: www.dtstack.com
57+
*
5758
* @author xuchao
5859
*/
5960

6061
public class LauncherMain {
6162
private static final String CORE_JAR = "core";
6263

64+
private static final Logger LOG = LoggerFactory.getLogger(LauncherMain.class);
65+
6366
private static String SP = File.separator;
6467

6568
private static String getLocalCoreJarPath(String localSqlRootJar) throws Exception {
@@ -68,7 +71,7 @@ private static String getLocalCoreJarPath(String localSqlRootJar) throws Excepti
6871
}
6972

7073
public static void main(String[] args) throws Exception {
71-
if (args.length == 1 && args[0].endsWith(".json")){
74+
if (args.length == 1 && args[0].endsWith(".json")) {
7275
args = parseJson(args);
7376
}
7477
OptionParser optionParser = new OptionParser(args);
@@ -80,7 +83,9 @@ public static void main(String[] args) throws Exception {
8083
confProp = URLDecoder.decode(confProp, Charsets.UTF_8.toString());
8184
Properties confProperties = PluginUtil.jsonStrToObject(confProp, Properties.class);
8285

83-
if(mode.equals(ClusterMode.local.name())) {
86+
LOG.info("current job mode is {}", mode);
87+
88+
if (mode.equals(ClusterMode.local.name())) {
8489
String[] localArgs = argList.toArray(new String[0]);
8590
Main.main(localArgs);
8691
return;
@@ -92,16 +97,19 @@ public static void main(String[] args) throws Exception {
9297
PackagedProgram program = new PackagedProgram(jarFile, Lists.newArrayList(), remoteArgs);
9398

9499
String savePointPath = confProperties.getProperty(ConfigConstrant.SAVE_POINT_PATH_KEY);
95-
if(StringUtils.isNotBlank(savePointPath)){
100+
if (StringUtils.isNotBlank(savePointPath)) {
96101
String allowNonRestoredState = confProperties.getOrDefault(ConfigConstrant.ALLOW_NON_RESTORED_STATE_KEY, "false").toString();
97102
program.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savePointPath, BooleanUtils.toBoolean(allowNonRestoredState)));
98103
}
99104

100-
if(mode.equals(ClusterMode.yarnPer.name())){
105+
if (mode.equals(ClusterMode.yarnPer.name())) {
101106
String flinkConfDir = launcherOptions.getFlinkconf();
102107
Configuration config = StringUtils.isEmpty(flinkConfDir) ? new Configuration() : GlobalConfiguration.loadConfiguration(flinkConfDir);
103108
JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, config, 1);
104-
PerJobSubmitter.submit(launcherOptions, jobGraph, config);
109+
110+
LOG.info("current jobID is {}", jobGraph.getJobID());
111+
112+
LOG.info("submit applicationId is {}", PerJobSubmitter.submit(launcherOptions, jobGraph, config));
105113
} else {
106114
ClusterClient clusterClient = ClusterClientFactory.createClusterClient(launcherOptions);
107115
clusterClient.run(program, 1);

launcher/src/main/java/com/dtstack/flink/sql/launcher/perjob/PerJobClusterClientBuilder.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@
66
* to you under the Apache License, Version 2.0 (the
77
* "License"); you may not use this file except in compliance
88
* with the License. You may obtain a copy of the License at
9-
*
9+
* <p>
1010
* http://www.apache.org/licenses/LICENSE-2.0
11-
*
11+
* <p>
1212
* Unless required by applicable law or agreed to in writing, software
1313
* distributed under the License is distributed on an "AS IS" BASIS,
1414
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -21,7 +21,6 @@
2121
import com.dtstack.flink.sql.enums.EPluginLoadMode;
2222
import com.dtstack.flink.sql.launcher.YarnConfLoader;
2323
import com.dtstack.flink.sql.option.Options;
24-
import com.esotericsoftware.minlog.Log;
2524
import org.apache.commons.lang3.StringUtils;
2625
import org.apache.flink.api.common.cache.DistributedCache;
2726
import org.apache.flink.configuration.Configuration;
@@ -66,7 +65,7 @@ public class PerJobClusterClientBuilder {
6665

6766
public void init(String yarnConfDir, Configuration flinkConfig, Properties userConf) throws Exception {
6867

69-
if(Strings.isNullOrEmpty(yarnConfDir)) {
68+
if (Strings.isNullOrEmpty(yarnConfDir)) {
7069
throw new RuntimeException("parameters of yarn is required");
7170
}
7271
userConf.forEach((key, val) -> flinkConfig.setString(key.toString(), val.toString()));
@@ -78,7 +77,7 @@ public void init(String yarnConfDir, Configuration flinkConfig, Properties userC
7877
yarnClient.init(yarnConf);
7978
yarnClient.start();
8079

81-
Log.info("----init yarn success ----");
80+
LOG.info("----init yarn success ----");
8281
}
8382

8483
public AbstractYarnClusterDescriptor createPerJobClusterDescriptor(String flinkJarPath, Options launcherOptions, JobGraph jobGraph)
@@ -129,8 +128,8 @@ public AbstractYarnClusterDescriptor createPerJobClusterDescriptor(String flinkJ
129128

130129
private static void fillJobGraphClassPath(JobGraph jobGraph) throws MalformedURLException {
131130
Map<String, DistributedCache.DistributedCacheEntry> jobCacheFileConfig = jobGraph.getUserArtifacts();
132-
for(Map.Entry<String, DistributedCache.DistributedCacheEntry> tmp : jobCacheFileConfig.entrySet()){
133-
if(tmp.getKey().startsWith("class_path")){
131+
for (Map.Entry<String, DistributedCache.DistributedCacheEntry> tmp : jobCacheFileConfig.entrySet()) {
132+
if (tmp.getKey().startsWith("class_path")) {
134133
jobGraph.getClasspaths().add(new URL("file:" + tmp.getValue().filePath));
135134
}
136135
}
@@ -139,8 +138,8 @@ private static void fillJobGraphClassPath(JobGraph jobGraph) throws MalformedURL
139138
private List<File> getPluginPathToShipFiles(JobGraph jobGraph) {
140139
List<File> shipFiles = new ArrayList<>();
141140
Map<String, DistributedCache.DistributedCacheEntry> jobCacheFileConfig = jobGraph.getUserArtifacts();
142-
for(Map.Entry<String, DistributedCache.DistributedCacheEntry> tmp : jobCacheFileConfig.entrySet()){
143-
if(tmp.getKey().startsWith("class_path")){
141+
for (Map.Entry<String, DistributedCache.DistributedCacheEntry> tmp : jobCacheFileConfig.entrySet()) {
142+
if (tmp.getKey().startsWith("class_path")) {
144143
shipFiles.add(new File(tmp.getValue().filePath));
145144
}
146145
}

launcher/src/main/java/com/dtstack/flink/sql/launcher/perjob/PerJobSubmitter.java

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.hadoop.yarn.api.records.ApplicationId;
3333
import org.slf4j.Logger;
3434
import org.slf4j.LoggerFactory;
35+
3536
import java.net.URLDecoder;
3637
import java.util.Arrays;
3738
import java.util.List;
@@ -41,6 +42,7 @@
4142
* per job mode submitter
4243
* Date: 2018/11/17
4344
* Company: www.dtstack.com
45+
*
4446
* @author xuchao
4547
*/
4648

@@ -49,15 +51,15 @@ public class PerJobSubmitter {
4951
private static final Logger LOG = LoggerFactory.getLogger(PerJobSubmitter.class);
5052

5153
public static String submit(Options launcherOptions, JobGraph jobGraph, Configuration flinkConfig) throws Exception {
52-
if (!StringUtils.isBlank(launcherOptions.getAddjar())) {
53-
String addjarPath = URLDecoder.decode(launcherOptions.getAddjar(), Charsets.UTF_8.toString());
54-
List<String> paths = getJarPaths(addjarPath);
55-
paths.forEach( path -> {
56-
jobGraph.addJar(new Path("file://" + path));
57-
});
58-
}
59-
60-
String confProp = launcherOptions.getConfProp();
54+
if (!StringUtils.isBlank(launcherOptions.getAddjar())) {
55+
String addjarPath = URLDecoder.decode(launcherOptions.getAddjar(), Charsets.UTF_8.toString());
56+
List<String> paths = getJarPaths(addjarPath);
57+
paths.forEach(path -> {
58+
jobGraph.addJar(new Path("file://" + path));
59+
});
60+
}
61+
62+
String confProp = launcherOptions.getConfProp();
6163
confProp = URLDecoder.decode(confProp, Charsets.UTF_8.toString());
6264
Properties confProperties = PluginUtil.jsonStrToObject(confProp, Properties.class);
6365
ClusterSpecification clusterSpecification = FLinkPerJobResourceUtil.createClusterSpecification(confProperties);
@@ -67,7 +69,7 @@ public static String submit(Options launcherOptions, JobGraph jobGraph, Configur
6769

6870
String flinkJarPath = launcherOptions.getFlinkJarPath();
6971
AbstractYarnClusterDescriptor yarnClusterDescriptor = perJobClusterClientBuilder.createPerJobClusterDescriptor(flinkJarPath, launcherOptions, jobGraph);
70-
ClusterClient<ApplicationId> clusterClient = yarnClusterDescriptor.deployJobCluster(clusterSpecification, jobGraph,true);
72+
ClusterClient<ApplicationId> clusterClient = yarnClusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, true);
7173

7274
String applicationId = clusterClient.getClusterId().toString();
7375
String flinkJobId = jobGraph.getJobID().toString();
@@ -78,12 +80,11 @@ public static String submit(Options launcherOptions, JobGraph jobGraph, Configur
7880
return applicationId;
7981
}
8082

81-
private static List<String> getJarPaths(String addjarPath) {
82-
if (addjarPath.length() > 2) {
83-
addjarPath = addjarPath.substring(1,addjarPath.length()-1).replace("\"","");
84-
}
85-
List<String> paths = Arrays.asList(StringUtils.split(addjarPath, ","));
86-
return paths;
87-
}
83+
private static List<String> getJarPaths(String addjarPath) {
84+
if (addjarPath.length() > 2) {
85+
addjarPath = addjarPath.substring(1, addjarPath.length() - 1).replace("\"", "");
86+
}
87+
return Arrays.asList(StringUtils.split(addjarPath, ","));
88+
}
8889

8990
}

0 commit comments

Comments (0)