6
6
* to you under the Apache License, Version 2.0 (the
7
7
* "License"); you may not use this file except in compliance
8
8
* with the License. You may obtain a copy of the License at
9
- *
9
+ * <p>
10
10
* http://www.apache.org/licenses/LICENSE-2.0
11
- *
11
+ * <p>
12
12
* Unless required by applicable law or agreed to in writing, software
13
13
* distributed under the License is distributed on an "AS IS" BASIS,
14
14
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -67,7 +67,7 @@ public class PerJobClusterClientBuilder {
67
67
68
68
public void init (String yarnConfDir , Configuration flinkConfig , Properties userConf ) throws Exception {
69
69
70
- if (Strings .isNullOrEmpty (yarnConfDir )) {
70
+ if (Strings .isNullOrEmpty (yarnConfDir )) {
71
71
throw new RuntimeException ("parameters of yarn is required" );
72
72
}
73
73
userConf .forEach ((key , val ) -> flinkConfig .setString (key .toString (), val .toString ()));
@@ -79,8 +79,7 @@ public void init(String yarnConfDir, Configuration flinkConfig, Properties userC
79
79
yarnClient .init (yarnConf );
80
80
yarnClient .start ();
81
81
82
- System .out .println ("----init yarn success ----" );
83
- // LOG.info("----init yarn success ----");
82
+ LOG .info ("----init yarn success ----" );
84
83
}
85
84
86
85
public AbstractYarnClusterDescriptor createPerJobClusterDescriptor (String flinkJarPath , Options launcherOptions , JobGraph jobGraph )
@@ -131,8 +130,8 @@ public AbstractYarnClusterDescriptor createPerJobClusterDescriptor(String flinkJ
131
130
132
131
private static void fillJobGraphClassPath (JobGraph jobGraph ) throws MalformedURLException {
133
132
Map <String , DistributedCache .DistributedCacheEntry > jobCacheFileConfig = jobGraph .getUserArtifacts ();
134
- for (Map .Entry <String , DistributedCache .DistributedCacheEntry > tmp : jobCacheFileConfig .entrySet ()){
135
- if (tmp .getKey ().startsWith ("class_path" )){
133
+ for (Map .Entry <String , DistributedCache .DistributedCacheEntry > tmp : jobCacheFileConfig .entrySet ()) {
134
+ if (tmp .getKey ().startsWith ("class_path" )) {
136
135
jobGraph .getClasspaths ().add (new URL ("file:" + tmp .getValue ().filePath ));
137
136
}
138
137
}
@@ -141,8 +140,8 @@ private static void fillJobGraphClassPath(JobGraph jobGraph) throws MalformedURL
141
140
private List <File > getPluginPathToShipFiles (JobGraph jobGraph ) {
142
141
List <File > shipFiles = new ArrayList <>();
143
142
Map <String , DistributedCache .DistributedCacheEntry > jobCacheFileConfig = jobGraph .getUserArtifacts ();
144
- for (Map .Entry <String , DistributedCache .DistributedCacheEntry > tmp : jobCacheFileConfig .entrySet ()){
145
- if (tmp .getKey ().startsWith ("class_path" )){
143
+ for (Map .Entry <String , DistributedCache .DistributedCacheEntry > tmp : jobCacheFileConfig .entrySet ()) {
144
+ if (tmp .getKey ().startsWith ("class_path" )) {
146
145
shipFiles .add (new File (tmp .getValue ().filePath ));
147
146
}
148
147
}
0 commit comments