Skip to content

Commit

Permalink
TEZ-4267: Remove Superfluous Code from DAGAppMaster (apache#90)
Browse files Browse the repository at this point in the history
* TEZ-4267: Remove Superfluous Code from DAGAppMaster

* Remove superfluous variable

* Remove superfluous comment

Co-authored-by: David Mollitor <david.mollitor@cloudera.com>
  • Loading branch information
belugabehr and David Mollitor committed Jun 30, 2021
1 parent 4684d06 commit 47a59ab
Showing 1 changed file with 17 additions and 35 deletions.
52 changes: 17 additions & 35 deletions tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,6 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import org.apache.tez.common.Preconditions;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ListeningExecutorService;
Expand Down Expand Up @@ -224,7 +223,6 @@ public class DAGAppMaster extends AbstractService {
* Priority of the DAGAppMaster shutdown hook.
*/
public static final int SHUTDOWN_HOOK_PRIORITY = 30;
private static final Joiner PATH_JOINER = Joiner.on('/');

@VisibleForTesting
static final String INVALID_SESSION_ERR_MSG = "Initial application attempt in session mode failed. "
Expand Down Expand Up @@ -311,7 +309,6 @@ public class DAGAppMaster extends AbstractService {
/**
* set of already executed dag names.
*/
Set<String> dagNames = new HashSet<String>();
Set<String> dagIDs = new HashSet<String>();

protected boolean isLastAMRetry = false;
Expand Down Expand Up @@ -371,19 +368,17 @@ public DAGAppMaster(ApplicationAttemptId applicationAttemptId,
this.containerID.toString(), this.appMasterUgi.getShortUserName());

LOG.info("Created DAGAppMaster for application " + applicationAttemptId
+ ", versionInfo=" + dagVersionInfo.toString());
+ ", versionInfo=" + dagVersionInfo);
TezCommonUtils.logCredentials(LOG, this.appMasterUgi.getCredentials(), "am");
}

// Pull this WebAppUtils function into Tez until YARN-4186
public static String getRunningLogURL(String nodeHttpAddress,
private static String getRunningLogURL(String nodeHttpAddress,
String containerId, String user) {
if (nodeHttpAddress == null || nodeHttpAddress.isEmpty()
|| containerId == null || containerId.isEmpty() || user == null
|| user.isEmpty()) {
if (containerId.isEmpty() || user == null | user.isEmpty()) {
return null;
}
return PATH_JOINER.join(nodeHttpAddress, "node", "containerlogs",
return String.format("%s/node/containerlogs/%s/%s", nodeHttpAddress,
containerId, user);
}

Expand Down Expand Up @@ -695,8 +690,7 @@ private void handleInternalError(String errDiagnosticsPrefix, String errDiagDagE
state = DAGAppMasterState.ERROR;
if (currentDAG != null) {
_updateLoggers(currentDAG, "_post");
String errDiagnostics = errDiagnosticsPrefix + ". Aborting dag: " + currentDAG.getID();
LOG.info(errDiagnostics);
LOG.info(errDiagnosticsPrefix + ". Aborting dag: " + currentDAG.getID());
// Inform the current DAG about the error
sendEvent(new DAGEventInternalError(currentDAG.getID(), errDiagDagEvent));
} else {
Expand Down Expand Up @@ -758,22 +752,20 @@ protected synchronized void handle(DAGAppMasterEvent event) {
DAGAppMasterEventDAGFinished finishEvt =
(DAGAppMasterEventDAGFinished) event;
String timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(Calendar.getInstance().getTime());
System.err.println(timeStamp + " Completed Dag: " + finishEvt.getDAGId().toString());
System.out.println(timeStamp + " Completed Dag: " + finishEvt.getDAGId().toString());
System.err.println(timeStamp + " Completed Dag: " + finishEvt.getDAGId());
System.out.println(timeStamp + " Completed Dag: " + finishEvt.getDAGId());
// Stop vertex services if any
stopVertexServices(currentDAG);
if (!isSession) {
LOG.info("Not a session, AM will unregister as DAG has completed");
this.taskSchedulerManager.setShouldUnregisterFlag();
_updateLoggers(currentDAG, "_post");
setStateOnDAGCompletion();
LOG.info("Shutting down on completion of dag:" +
finishEvt.getDAGId().toString());
LOG.info("Shutting down on completion of dag:" + finishEvt.getDAGId());
shutdownHandler.shutdown();
} else {
LOG.info("DAG completed, dagId="
+ finishEvt.getDAGId().toString()
+ ", dagState=" + finishEvt.getDAGState());
LOG.info("DAG completed, dagId=" + finishEvt.getDAGId() + ", dagState="
+ finishEvt.getDAGState());
lastDAGCompletionTime = clock.getTime();
_updateLoggers(currentDAG, "_post");
if (this.historyEventHandler.hasRecoveryFailed()) {
Expand Down Expand Up @@ -1028,17 +1020,16 @@ DAGImpl createDAG(DAGPlan dagPB, TezDAGID dagId) {

try {
if (LOG.isDebugEnabled()) {
LOG.debug("JSON dump for submitted DAG, dagId=" + dagId.toString()
+ ", json="
+ DAGUtils.generateSimpleJSONPlan(dagPB).toString());
LOG.debug("JSON dump for submitted DAG, dagId=" + dagId + ", json="
+ DAGUtils.generateSimpleJSONPlan(dagPB));
}
} catch (JSONException e) {
LOG.warn("Failed to generate json for DAG", e);
}

writeDebugArtifacts(dagPB, newDag);
return newDag;
} // end createDag()
}

private void writeDebugArtifacts(DAGPlan dagPB, DAGImpl newDag) {
boolean debugArtifacts =
Expand All @@ -1052,7 +1043,7 @@ private void writeDebugArtifacts(DAGPlan dagPB, DAGImpl newDag) {

private void writePBTextFile(DAG dag) {
String logFile = logDirs[new Random().nextInt(logDirs.length)] + File.separatorChar
+ dag.getID().toString() + "-" + TezConstants.TEZ_PB_PLAN_TEXT_NAME;
+ dag.getID() + "-" + TezConstants.TEZ_PB_PLAN_TEXT_NAME;

LOG.info("Writing DAG plan to: " + logFile);
File outFile = new File(logFile);
Expand All @@ -1061,7 +1052,7 @@ private void writePBTextFile(DAG dag) {
printWriter.println(TezUtilsInternal.convertDagPlanToString(dag.getJobPlan()));
printWriter.close();
} catch (IOException e) {
LOG.warn("Failed to write TEZ_PLAN to " + outFile.toString(), e);
LOG.warn("Failed to write TEZ_PLAN to " + outFile, e);
}
}

Expand Down Expand Up @@ -2256,15 +2247,6 @@ public void handle(VertexEvent event) {
}
}

private static void validateInputParam(String value, String param)
throws IOException {
if (value == null) {
String msg = param + " is null";
LOG.error(msg);
throw new IOException(msg);
}
}

private long checkAndHandleDAGClientTimeout() throws TezException {
if (EnumSet.of(DAGAppMasterState.NEW, DAGAppMasterState.RECOVERING).contains(this.state)
|| sessionStopped.get()) {
Expand Down Expand Up @@ -2333,8 +2315,8 @@ public static void main(String[] args) {
clientVersion = VersionInfo.UNKNOWN;
}

validateInputParam(appSubmitTimeStr,
ApplicationConstants.APP_SUBMIT_TIME_ENV);
Objects.requireNonNull(appSubmitTimeStr,
ApplicationConstants.APP_SUBMIT_TIME_ENV + " is null");

ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
ApplicationAttemptId applicationAttemptId =
Expand Down

0 comments on commit 47a59ab

Please sign in to comment.