MRfetcher ignores failed tasks (#249)
* Failed tasks were being ignored in the resource-usage calculation; they are now included (see the sketch below).
* Fixes the Exception heuristic, which was supposed to surface the stack trace.
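A minimal sketch (not part of the commit) of the gating this enables in the heuristics changed below; `tasks`, `totalSpills`, and `runTimesMs` are assumed from the surrounding heuristic code:

    long totalSpills = 0;
    List<Long> runTimesMs = new ArrayList<Long>();
    for (MapReduceTaskData task : tasks) {
      // Counter-based heuristics now gate on counters alone, so failed tasks
      // that still reported counters contribute to resource usage.
      if (task.isCounterDataPresent()) {
        totalSpills += task.getCounters().get(MapReduceCounterData.CounterName.SPILLED_RECORDS);
      }
      // Time-based heuristics gate on timing data alone.
      if (task.isTimeDataPresent()) {
        runTimesMs.add(task.getTotalRunTimeMs());
      }
    }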
shankar37 authored and akshayrai committed May 30, 2017
1 parent cae79c7 commit cdf680b
Showing 11 changed files with 164 additions and 96 deletions.
38 changes: 28 additions & 10 deletions app/com/linkedin/drelephant/mapreduce/data/MapReduceTaskData.java
@@ -23,6 +23,7 @@
public class MapReduceTaskData {

private MapReduceCounterData _counterHolder;
private String _state;
private String _taskId;
// The id of the successful attempt, or of the first failed attempt if the task failed
private String _attemptId;
@@ -32,21 +33,32 @@ public class MapReduceTaskData {
private long _startTimeMs = 0;
private long _finishTimeMs = 0;
// These flags will only be true when the time and counter values, respectively, were successfully set.
private boolean _isTimeAndCounterDataPresent = false;
private boolean _isTimeDataPresent = false;
private boolean _isCounterDataPresent = false;


public MapReduceTaskData(String taskId, String taskAttemptId) {
this(taskId, taskAttemptId, "SUCCEEDED");
}
public MapReduceTaskData(String taskId, String taskAttemptId, String state) {
this._taskId = taskId;
this._attemptId = taskAttemptId;
this._state = state;
}

public void setTimeAndCounter(long[] time, MapReduceCounterData counterHolder) {
this._totalTimeMs = time[0];
this._shuffleTimeMs = time[1];
this._sortTimeMs = time[2];
this._startTimeMs = time[3];
this._finishTimeMs = time[4];
this._counterHolder = counterHolder;
this._isTimeAndCounterDataPresent = true;
if (time != null) {
this._totalTimeMs = time[0];
this._shuffleTimeMs = time[1];
this._sortTimeMs = time[2];
this._startTimeMs = time[3];
this._finishTimeMs = time[4];
this._isTimeDataPresent = true;
}
if (counterHolder != null) {
this._counterHolder = counterHolder;
this._isCounterDataPresent = true;
}
}

public MapReduceCounterData getCounters() {
@@ -77,15 +89,21 @@ public long getFinishTimeMs() {
return _finishTimeMs;
}

public boolean isTimeAndCounterDataPresent() {
return _isTimeAndCounterDataPresent;
public boolean isTimeDataPresent() {
return _isTimeDataPresent;
}

public boolean isCounterDataPresent() { return _isCounterDataPresent; }

public boolean isTimeAndCounterDataPresent() { return isTimeDataPresent() && isCounterDataPresent(); }

public String getTaskId() {
return _taskId;
}

public String getAttemptId() {
return _attemptId;
}

public String getState() { return _state; }
}
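A brief usage sketch (hypothetical values, not part of the commit) of the new three-argument constructor and the null-tolerant setter:

    // A failed task has no successful attempt, so timing data may be unavailable.
    MapReduceTaskData failed = new MapReduceTaskData("task_1443068695259_9143_m_000475", "", "FAILED");
    MapReduceCounterData counters = new MapReduceCounterData();  // assumed fetched separately
    failed.setTimeAndCounter(null, counters);
    failed.isTimeDataPresent();     // false: the time array was null
    failed.isCounterDataPresent();  // true: counters were set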
@@ -236,51 +236,52 @@ public MapReduceApplicationData fetchData(AnalyticJob job) throws IOException {

String state = jobInfo.getJobStatus();
if (state.equals("SUCCEEDED")) {

jobData.setSucceeded(true);

// Fetch job counter
MapReduceCounterData jobCounter = getCounterData(jobInfo.getTotalCounters());

// Fetch task data
Map<TaskID, JobHistoryParser.TaskInfo> allTasks = jobInfo.getAllTasks();
List<JobHistoryParser.TaskInfo> mapperInfoList = new ArrayList<JobHistoryParser.TaskInfo>();
List<JobHistoryParser.TaskInfo> reducerInfoList = new ArrayList<JobHistoryParser.TaskInfo>();
for (JobHistoryParser.TaskInfo taskInfo : allTasks.values()) {
if (taskInfo.getTaskType() == TaskType.MAP) {
mapperInfoList.add(taskInfo);
} else {
reducerInfoList.add(taskInfo);
}
}
if (jobInfo.getTotalMaps() > MAX_SAMPLE_SIZE) {
logger.debug(jobId + " total mappers: " + mapperInfoList.size());
}
if (jobInfo.getTotalReduces() > MAX_SAMPLE_SIZE) {
logger.debug(jobId + " total reducers: " + reducerInfoList.size());
}
MapReduceTaskData[] mapperList = getTaskData(jobId, mapperInfoList);
MapReduceTaskData[] reducerList = getTaskData(jobId, reducerInfoList);

jobData.setCounters(jobCounter).setMapperData(mapperList).setReducerData(reducerList);
} else if (state.equals("FAILED")) {

}
else if (state.equals("FAILED")) {
jobData.setSucceeded(false);
jobData.setDiagnosticInfo(jobInfo.getErrorInfo());
} else {
// Should not reach here
throw new RuntimeException("Job state not supported. Should be either SUCCEEDED or FAILED");
throw new RuntimeException("job neither succeeded or failed. can not process it ");
}


// Fetch job counter
MapReduceCounterData jobCounter = getCounterData(jobInfo.getTotalCounters());

// Fetch task data
Map<TaskID, JobHistoryParser.TaskInfo> allTasks = jobInfo.getAllTasks();
List<JobHistoryParser.TaskInfo> mapperInfoList = new ArrayList<JobHistoryParser.TaskInfo>();
List<JobHistoryParser.TaskInfo> reducerInfoList = new ArrayList<JobHistoryParser.TaskInfo>();
for (JobHistoryParser.TaskInfo taskInfo : allTasks.values()) {
if (taskInfo.getTaskType() == TaskType.MAP) {
mapperInfoList.add(taskInfo);
} else {
reducerInfoList.add(taskInfo);
}
}
if (jobInfo.getTotalMaps() > MAX_SAMPLE_SIZE) {
logger.debug(jobId + " total mappers: " + mapperInfoList.size());
}
if (jobInfo.getTotalReduces() > MAX_SAMPLE_SIZE) {
logger.debug(jobId + " total reducers: " + reducerInfoList.size());
}
MapReduceTaskData[] mapperList = getTaskData(jobId, mapperInfoList);
MapReduceTaskData[] reducerList = getTaskData(jobId, reducerInfoList);

jobData.setCounters(jobCounter).setMapperData(mapperList).setReducerData(reducerList);

return jobData;
}

private MapReduceCounterData getCounterData(Counters counters) {
MapReduceCounterData holder = new MapReduceCounterData();
for (CounterGroup group : counters) {
String groupName = group.getName();
for (Counter counter : group) {
holder.set(groupName, counter.getName(), counter.getValue());
if (counters != null) {
for (CounterGroup group : counters) {
String groupName = group.getName();
for (Counter counter : group) {
holder.set(groupName, counter.getName(), counter.getValue());
}
}
}
return holder;
@@ -309,17 +310,23 @@ protected MapReduceTaskData[] getTaskData(String jobId, List<JobHistoryParser.Ta
List<MapReduceTaskData> taskList = new ArrayList<MapReduceTaskData>();
for (int i = 0; i < sampleSize; i++) {
JobHistoryParser.TaskInfo tInfo = infoList.get(i);
if (!"SUCCEEDED".equals(tInfo.getTaskStatus())) {
logger.info(String.format("Skipped a failed task of %s: %s", jobId, tInfo.getTaskId().toString()));
continue;
}

String taskId = tInfo.getTaskId().toString();
TaskAttemptID attemptId = tInfo.getSuccessfulAttemptId();
MapReduceTaskData taskData = new MapReduceTaskData(taskId, attemptId.toString());
TaskAttemptID attemptId = null;
if (tInfo.getTaskStatus().equals("SUCCEEDED")) {
attemptId = tInfo.getSuccessfulAttemptId();
} else {
attemptId = tInfo.getFailedDueToAttemptId();
}

MapReduceTaskData taskData = new MapReduceTaskData(taskId, attemptId == null ? "" : attemptId.toString(), tInfo.getTaskStatus());

MapReduceCounterData taskCounterData = getCounterData(tInfo.getCounters());
long[] taskExecTime = getTaskExecTime(tInfo.getAllTaskAttempts().get(attemptId));

long[] taskExecTime = null;
if (attemptId != null) {
taskExecTime = getTaskExecTime(tInfo.getAllTaskAttempts().get(attemptId));
}

taskData.setTimeAndCounter(taskExecTime, taskCounterData);
taskList.add(taskData);
@@ -25,6 +25,7 @@
import com.linkedin.drelephant.util.Utils;

import java.io.IOException;
import java.lang.Integer;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
@@ -110,11 +111,26 @@ public MapReduceApplicationData fetchData(AnalyticJob analyticJob) throws IOExce
} else if (state.equals("FAILED")) {

jobData.setSucceeded(false);
// Fetch job counter
MapReduceCounterData jobCounter = _jsonFactory.getJobCounter(_urlFactory.getJobCounterURL(jobId));

// Fetch task data
URL taskListURL = _urlFactory.getTaskListURL(jobId);
List<MapReduceTaskData> mapperList = new ArrayList<MapReduceTaskData>();
List<MapReduceTaskData> reducerList = new ArrayList<MapReduceTaskData>();
_jsonFactory.getTaskDataAll(taskListURL, jobId, mapperList, reducerList);

MapReduceTaskData[] mapperData = mapperList.toArray(new MapReduceTaskData[mapperList.size()]);
MapReduceTaskData[] reducerData = reducerList.toArray(new MapReduceTaskData[reducerList.size()]);

jobData.setCounters(jobCounter).setMapperData(mapperData).setReducerData(reducerData);

String diagnosticInfo;
try {
diagnosticInfo = parseException(jobData.getJobId(), _jsonFactory.getDiagnosticInfo(jobURL));
} catch (Exception e) {
diagnosticInfo = null;
logger.warn("Failed getting diagnostic info for failed job " + jobData.getJobId());
}
jobData.setDiagnosticInfo(diagnosticInfo);
} else {
@@ -132,17 +148,12 @@ private String parseException(String jobId, String diagnosticInfo) throws Malfor
AuthenticationException {
Matcher m = ThreadContextMR2.getDiagnosticMatcher(diagnosticInfo);
if (m.matches()) {
if (Integer.parseInt(m.group(2)) == 0) {
// This is due to a bug in Hadoop 2.3 that was fixed in 2.4
throw new RuntimeException("Error in diagnosticInfo");
}
String taskId = m.group(1);
System.out.println("parse succedded. " + m.group(1) + " " + m.group(2));
return _jsonFactory.getTaskFailedStackTrace(_urlFactory.getTaskAllAttemptsURL(jobId, taskId));
}
logger.info("Does not match regex!!");
logger.warn("Does not match regex!!");
// Diagnostic info not present in the job. Usually due to exception during AM setup
throw new RuntimeException("No sufficient diagnostic Info");
return "No sufficient diagnostic Info";
}

private URL getTaskCounterURL(String jobId, String taskId) throws MalformedURLException {
@@ -303,18 +314,23 @@ private void getTaskDataAll(URL url, String jobId, List<MapReduceTaskData> mappe

for (JsonNode task : tasks) {
String state = task.get("state").getValueAsText();
if (!state.equals("SUCCEEDED")) {
// This is a failed task.
continue;
}
String taskId = task.get("id").getValueAsText();
String attemptId = task.get("successfulAttempt").getValueAsText();
String attemptId = "";
if (state.equals("SUCCEEDED")) {
attemptId = task.get("successfulAttempt").getValueAsText();
} else {
JsonNode firstAttempt = getTaskFirstFailedAttempt(_urlFactory.getTaskAllAttemptsURL(jobId, taskId));
if (firstAttempt != null) {
attemptId = firstAttempt.get("id").getValueAsText();
}
}

boolean isMapper = task.get("type").getValueAsText().equals("MAP");

if (isMapper) {
mapperList.add(new MapReduceTaskData(taskId, attemptId));
mapperList.add(new MapReduceTaskData(taskId, attemptId, state));
} else {
reducerList.add(new MapReduceTaskData(taskId, attemptId));
reducerList.add(new MapReduceTaskData(taskId, attemptId, state));
}
}

@@ -332,30 +348,42 @@ private void getTaskData(String jobId, List<MapReduceTaskData> taskList) throws
URL taskCounterURL = getTaskCounterURL(jobId, data.getTaskId());
MapReduceCounterData taskCounter = getTaskCounter(taskCounterURL);

URL taskAttemptURL = getTaskAttemptURL(jobId, data.getTaskId(), data.getAttemptId());
long[] taskExecTime = getTaskExecTime(taskAttemptURL);

long[] taskExecTime = null;
if (!data.getAttemptId().isEmpty()) {
URL taskAttemptURL = getTaskAttemptURL(jobId, data.getTaskId(), data.getAttemptId());
taskExecTime = getTaskExecTime(taskAttemptURL);
}
data.setTimeAndCounter(taskExecTime, taskCounter);
}
}

private String getTaskFailedStackTrace(URL taskAllAttemptsUrl) throws IOException, AuthenticationException {
JsonNode firstAttempt = getTaskFirstFailedAttempt(taskAllAttemptsUrl);
if (firstAttempt != null) {
String stacktrace = firstAttempt.get("diagnostics").getValueAsText();
return stacktrace;
} else {
return null;
}
}

private JsonNode getTaskFirstFailedAttempt(URL taskAllAttemptsUrl) throws IOException, AuthenticationException {
JsonNode rootNode = ThreadContextMR2.readJsonNode(taskAllAttemptsUrl);
JsonNode tasks = rootNode.path("taskAttempts").path("taskAttempt");
for (JsonNode task : tasks) {
String state = task.get("state").getValueAsText();
if (!state.equals("FAILED")) {
long firstAttemptFinishTime = Long.MAX_VALUE;
JsonNode firstAttempt = null;
JsonNode taskAttempts = rootNode.path("taskAttempts").path("taskAttempt");
for (JsonNode taskAttempt : taskAttempts) {
String state = taskAttempt.get("state").getValueAsText();
if (state.equals("SUCCEEDED")) {
continue;
}
String stacktrace = task.get("diagnostics").getValueAsText();
if (stacktrace.startsWith("Error:")) {
return stacktrace;
} else {
// This is not a valid stacktrace. Might be due to a bug in Hadoop 2.3, fixed in 2.4
throw new RuntimeException("This is not a valid stack trace.");
long finishTime = taskAttempt.get("finishTime").getLongValue();
if (finishTime < firstAttemptFinishTime) {
firstAttempt = taskAttempt;
firstAttemptFinishTime = finishTime;
}
}
throw new RuntimeException("No failed task attempt in this failed task.");
return firstAttempt;
}
}
}
Expand All @@ -379,7 +407,7 @@ public Integer initialValue() {
public Pattern initialValue() {
// Example: "Task task_1443068695259_9143_m_000475 failed 1 times"
return Pattern.compile(
"Task[\\s\\u00A0]+(.*)[\\s\\u00A0]+failed[\\s\\u00A0]+([0-9])[\\s\\u00A0]+times[\\s\\u00A0]+");
".*[\\s\\u00A0]+(task_[0-9]+_[0-9]+_[m|r]_[0-9]+)[\\s\\u00A0]+.*");
}
};
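As an illustration (not part of the commit), a self-contained check of the new pattern against the example diagnostic above; the class name is hypothetical:

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class DiagnosticPatternCheck {
      public static void main(String[] args) {
        Pattern p = Pattern.compile(
            ".*[\\s\\u00A0]+(task_[0-9]+_[0-9]+_[m|r]_[0-9]+)[\\s\\u00A0]+.*");
        Matcher m = p.matcher("Task task_1443068695259_9143_m_000475 failed 1 times");
        if (m.matches()) {
          System.out.println(m.group(1));  // task_1443068695259_9143_m_000475
        }
      }
    }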

@@ -113,7 +113,7 @@ public HeuristicResult apply(MapReduceApplicationData data) {
List<Long> inputBytes = new ArrayList<Long>();

for (int i = 0; i < tasks.length; i++) {
if (tasks[i].isTimeAndCounterDataPresent()) {
if (tasks[i].isCounterDataPresent()) {
inputBytes.add(tasks[i].getCounters().get(_counterName));
}
}
@@ -93,7 +93,7 @@ public HeuristicResult apply(MapReduceApplicationData data) {
private Severity[] getTasksSeverity(MapReduceTaskData[] tasks, long queueTimeout) {
List<Severity> taskSeverityList = new ArrayList<Severity>();
for (MapReduceTaskData task : tasks) {
if (task.isTimeAndCounterDataPresent()) {
if (task.isTimeDataPresent()) {
taskSeverityList.add(getQueueLimitSeverity(task.getTotalRunTimeMs(), queueTimeout));
}
}
@@ -90,7 +90,7 @@ public HeuristicResult apply(MapReduceApplicationData data) {

for (MapReduceTaskData task : tasks) {

if (task.isTimeAndCounterDataPresent()) {
if (task.isCounterDataPresent()) {
totalSpills += task.getCounters().get(MapReduceCounterData.CounterName.SPILLED_RECORDS);
totalOutputRecords += task.getCounters().get(MapReduceCounterData.CounterName.MAP_OUTPUT_RECORDS);
}
@@ -104,7 +104,7 @@ public HeuristicResult apply(MapReduceApplicationData data) {
long taskMaxMs = 0;

for (MapReduceTaskData task : tasks) {
if (task.isTimeAndCounterDataPresent()) {
if (task.isTimeDataPresent()) {
long taskTime = task.getTotalRunTimeMs();
runTimesMs.add(taskTime);
taskMinMs = Math.min(taskMinMs, taskTime);
@@ -94,7 +94,7 @@ public HeuristicResult apply(MapReduceApplicationData data) {
List<Long> sortTimeMs = new ArrayList<Long>();

for (MapReduceTaskData task : tasks) {
if (task.isTimeAndCounterDataPresent()) {
if (task.isTimeDataPresent()) {
execTimeMs.add(task.getCodeExecutionTimeMs());
shuffleTimeMs.add(task.getShuffleTimeMs());
sortTimeMs.add(task.getSortTimeMs());