Skip to content

Commit

Permalink
[FLINK-5488] Close YarnClient on error in AbstractYarnClusterDescriptor
Browse files Browse the repository at this point in the history
This closes apache#4022.
  • Loading branch information
FangYongs authored and zentol committed Jun 29, 2017
1 parent 2f5985c commit e2584cb
Showing 1 changed file with 13 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,7 @@ protected YarnClient getYarnClient() {
@Override
public YarnClusterClient retrieve(String applicationID) {

YarnClient yarnClient = null;
try {
// check if required Hadoop environment variables are set. If not, warn user
if (System.getenv("HADOOP_CONF_DIR") == null &&
Expand All @@ -402,7 +403,7 @@ public YarnClusterClient retrieve(String applicationID) {
}

final ApplicationId yarnAppId = ConverterUtils.toApplicationId(applicationID);
final YarnClient yarnClient = getYarnClient();
yarnClient = getYarnClient();
final ApplicationReport appReport = yarnClient.getApplicationReport(yarnAppId);

if (appReport.getFinalApplicationStatus() != FinalApplicationStatus.UNDEFINED) {
Expand All @@ -420,6 +421,9 @@ public YarnClusterClient retrieve(String applicationID) {

return createYarnClusterClient(this, yarnClient, appReport, flinkConfiguration, false);
} catch (Exception e) {
if (null != yarnClient) {
yarnClient.stop();
}
throw new RuntimeException("Couldn't retrieve Yarn cluster", e);
}
}
Expand Down Expand Up @@ -539,7 +543,14 @@ protected YarnClusterClient deployInternal() throws Exception {
"The allocation might take more time than usual because the Flink YARN client needs to wait until " +
"the resources become available.";
int totalMemoryRequired = jobManagerMemoryMb + taskManagerMemoryMb * taskManagerCount;
ClusterResourceDescription freeClusterMem = getCurrentFreeClusterResources(yarnClient);
ClusterResourceDescription freeClusterMem;
try {
freeClusterMem = getCurrentFreeClusterResources(yarnClient);
} catch (YarnException | IOException e) {
failSessionDuringDeployment(yarnClient, yarnApplication);
throw new YarnDeploymentException("Could not retrieve information about free cluster resources.", e);
}

if (freeClusterMem.totalFreeMemory < totalMemoryRequired) {
LOG.warn("This YARN session requires " + totalMemoryRequired + "MB of memory in the cluster. "
+ "There are currently only " + freeClusterMem.totalFreeMemory + "MB available." + noteRsc);
Expand Down

0 comments on commit e2584cb

Please sign in to comment.