diff --git a/app/com/linkedin/drelephant/mapreduce/data/MapReduceTaskData.java b/app/com/linkedin/drelephant/mapreduce/data/MapReduceTaskData.java
index 5be043009..56696aca1 100644
--- a/app/com/linkedin/drelephant/mapreduce/data/MapReduceTaskData.java
+++ b/app/com/linkedin/drelephant/mapreduce/data/MapReduceTaskData.java
@@ -23,6 +23,7 @@ public class MapReduceTaskData {
 
   private MapReduceCounterData _counterHolder;
+  private String _state;
   private String _taskId;
   // The successful attempt id
   private String _attemptId;
@@ -32,21 +33,32 @@ public class MapReduceTaskData {
   private long _startTimeMs = 0;
   private long _finishTimeMs = 0;
   // This flag will only be true when successfully setting time and counter values.
-  private boolean _isTimeAndCounterDataPresent = false;
+  private boolean _isTimeDataPresent = false;
+  private boolean _isCounterDataPresent = false;
 
+  public MapReduceTaskData(String taskId, String taskAttemptId) {
+    this(taskId, taskAttemptId, "SUCCEEDED");
+  }
 
-  public MapReduceTaskData(String taskId, String taskAttemptId) {
+  public MapReduceTaskData(String taskId, String taskAttemptId, String state) {
     this._taskId = taskId;
     this._attemptId = taskAttemptId;
+    this._state = state;
   }
 
   public void setTimeAndCounter(long[] time, MapReduceCounterData counterHolder) {
-    this._totalTimeMs = time[0];
-    this._shuffleTimeMs = time[1];
-    this._sortTimeMs = time[2];
-    this._startTimeMs = time[3];
-    this._finishTimeMs = time[4];
-    this._counterHolder = counterHolder;
-    this._isTimeAndCounterDataPresent = true;
+    if (time != null) {
+      this._totalTimeMs = time[0];
+      this._shuffleTimeMs = time[1];
+      this._sortTimeMs = time[2];
+      this._startTimeMs = time[3];
+      this._finishTimeMs = time[4];
+      this._isTimeDataPresent = true;
+    }
+    if (counterHolder != null) {
+      this._counterHolder = counterHolder;
+      this._isCounterDataPresent = true;
+    }
   }
 
   public MapReduceCounterData getCounters() {
@@ -77,10 +89,14 @@ public long getFinishTimeMs() {
     return _finishTimeMs;
   }
 
-  public boolean isTimeAndCounterDataPresent() {
-    return _isTimeAndCounterDataPresent;
+  public boolean isTimeDataPresent() {
+    return _isTimeDataPresent;
   }
 
+  public boolean isCounterDataPresent() { return _isCounterDataPresent; }
+
+  public boolean isTimeAndCounterDataPresent() { return isTimeDataPresent() && isCounterDataPresent(); }
+
   public String getTaskId() {
     return _taskId;
   }
@@ -88,4 +104,6 @@ public String getTaskId() {
   public String getAttemptId() {
     return _attemptId;
   }
+
+  public String getState() { return _state; }
 }
diff --git a/app/com/linkedin/drelephant/mapreduce/fetchers/MapReduceFSFetcherHadoop2.java b/app/com/linkedin/drelephant/mapreduce/fetchers/MapReduceFSFetcherHadoop2.java
index 1c5904b9e..cadf6345e 100644
--- a/app/com/linkedin/drelephant/mapreduce/fetchers/MapReduceFSFetcherHadoop2.java
+++ b/app/com/linkedin/drelephant/mapreduce/fetchers/MapReduceFSFetcherHadoop2.java
@@ -236,51 +236,52 @@ public MapReduceApplicationData fetchData(AnalyticJob job) throws IOException {
 
     String state = jobInfo.getJobStatus();
     if (state.equals("SUCCEEDED")) {
       jobData.setSucceeded(true);
-
-      // Fetch job counter
-      MapReduceCounterData jobCounter = getCounterData(jobInfo.getTotalCounters());
-
-      // Fetch task data
-      Map<TaskID, JobHistoryParser.TaskInfo> allTasks = jobInfo.getAllTasks();
-      List<JobHistoryParser.TaskInfo> mapperInfoList = new ArrayList<JobHistoryParser.TaskInfo>();
-      List<JobHistoryParser.TaskInfo> reducerInfoList = new ArrayList<JobHistoryParser.TaskInfo>();
-      for (JobHistoryParser.TaskInfo taskInfo : allTasks.values()) {
-        if (taskInfo.getTaskType() == TaskType.MAP) {
-          mapperInfoList.add(taskInfo);
-        } else {
-          reducerInfoList.add(taskInfo);
-        }
-      }
-      if (jobInfo.getTotalMaps() > MAX_SAMPLE_SIZE) {
-        logger.debug(jobId + " total mappers: " + mapperInfoList.size());
-      }
-      if (jobInfo.getTotalReduces() > MAX_SAMPLE_SIZE) {
-        logger.debug(jobId + " total reducers: " + reducerInfoList.size());
-      }
-      MapReduceTaskData[] mapperList = getTaskData(jobId, mapperInfoList);
-      MapReduceTaskData[] reducerList = getTaskData(jobId, reducerInfoList);
-
-      jobData.setCounters(jobCounter).setMapperData(mapperList).setReducerData(reducerList);
     } else if (state.equals("FAILED")) {
-
       jobData.setSucceeded(false);
       jobData.setDiagnosticInfo(jobInfo.getErrorInfo());
     } else {
-      // Should not reach here
-      throw new RuntimeException("Job state not supported. Should be either SUCCEEDED or FAILED");
+      throw new RuntimeException("Job neither succeeded nor failed; cannot process it.");
     }
+
+    // Fetch job counter
+    MapReduceCounterData jobCounter = getCounterData(jobInfo.getTotalCounters());
+
+    // Fetch task data
+    Map<TaskID, JobHistoryParser.TaskInfo> allTasks = jobInfo.getAllTasks();
+    List<JobHistoryParser.TaskInfo> mapperInfoList = new ArrayList<JobHistoryParser.TaskInfo>();
+    List<JobHistoryParser.TaskInfo> reducerInfoList = new ArrayList<JobHistoryParser.TaskInfo>();
+    for (JobHistoryParser.TaskInfo taskInfo : allTasks.values()) {
+      if (taskInfo.getTaskType() == TaskType.MAP) {
+        mapperInfoList.add(taskInfo);
+      } else {
+        reducerInfoList.add(taskInfo);
+      }
+    }
+    if (jobInfo.getTotalMaps() > MAX_SAMPLE_SIZE) {
+      logger.debug(jobId + " total mappers: " + mapperInfoList.size());
+    }
+    if (jobInfo.getTotalReduces() > MAX_SAMPLE_SIZE) {
+      logger.debug(jobId + " total reducers: " + reducerInfoList.size());
+    }
+    MapReduceTaskData[] mapperList = getTaskData(jobId, mapperInfoList);
+    MapReduceTaskData[] reducerList = getTaskData(jobId, reducerInfoList);
+
+    jobData.setCounters(jobCounter).setMapperData(mapperList).setReducerData(reducerList);
+
     return jobData;
   }
 
   private MapReduceCounterData getCounterData(Counters counters) {
     MapReduceCounterData holder = new MapReduceCounterData();
-    for (CounterGroup group : counters) {
-      String groupName = group.getName();
-      for (Counter counter : group) {
-        holder.set(groupName, counter.getName(), counter.getValue());
+    if (counters != null) {
+      for (CounterGroup group : counters) {
+        String groupName = group.getName();
+        for (Counter counter : group) {
+          holder.set(groupName, counter.getName(), counter.getValue());
+        }
       }
     }
     return holder;
@@ -309,17 +310,23 @@ protected MapReduceTaskData[] getTaskData(String jobId, List<JobHistoryParser.TaskInfo> infoList) {
     List<MapReduceTaskData> taskList = new ArrayList<MapReduceTaskData>();
     for (int i = 0; i < sampleSize; i++) {
       JobHistoryParser.TaskInfo tInfo = infoList.get(i);
-      if (!"SUCCEEDED".equals(tInfo.getTaskStatus())) {
-        logger.info(String.format("Skipped a failed task of %s: %s", jobId, tInfo.getTaskId().toString()));
-        continue;
-      }
       String taskId = tInfo.getTaskId().toString();
-      TaskAttemptID attemptId = tInfo.getSuccessfulAttemptId();
-      MapReduceTaskData taskData = new MapReduceTaskData(taskId, attemptId.toString());
+      // For a failed task, fall back to the attempt that caused the failure.
+      TaskAttemptID attemptId = null;
+      if (tInfo.getTaskStatus().equals("SUCCEEDED")) {
+        attemptId = tInfo.getSuccessfulAttemptId();
+      } else {
+        attemptId = tInfo.getFailedDueToAttemptId();
+      }
+
+      MapReduceTaskData taskData = new MapReduceTaskData(taskId, attemptId == null ? "" : attemptId.toString(), tInfo.getTaskStatus());
 
       MapReduceCounterData taskCounterData = getCounterData(tInfo.getCounters());
-      long[] taskExecTime = getTaskExecTime(tInfo.getAllTaskAttempts().get(attemptId));
+
+      long[] taskExecTime = null;
+      if (attemptId != null) {
+        taskExecTime = getTaskExecTime(tInfo.getAllTaskAttempts().get(attemptId));
+      }
 
       taskData.setTimeAndCounter(taskExecTime, taskCounterData);
       taskList.add(taskData);
diff --git a/app/com/linkedin/drelephant/mapreduce/fetchers/MapReduceFetcherHadoop2.java b/app/com/linkedin/drelephant/mapreduce/fetchers/MapReduceFetcherHadoop2.java
index 4fa89d922..4165971aa 100644
--- a/app/com/linkedin/drelephant/mapreduce/fetchers/MapReduceFetcherHadoop2.java
+++ b/app/com/linkedin/drelephant/mapreduce/fetchers/MapReduceFetcherHadoop2.java
@@ -25,6 +25,7 @@
 import com.linkedin.drelephant.util.Utils;
 
 import java.io.IOException;
+import java.lang.Integer;
 import java.net.HttpURLConnection;
 import java.net.MalformedURLException;
 import java.net.URL;
@@ -110,11 +111,26 @@ public MapReduceApplicationData fetchData(AnalyticJob analyticJob) throws IOException {
     } else if (state.equals("FAILED")) {
 
       jobData.setSucceeded(false);
+      // Fetch job counter
+      MapReduceCounterData jobCounter = _jsonFactory.getJobCounter(_urlFactory.getJobCounterURL(jobId));
+
+      // Fetch task data
+      URL taskListURL = _urlFactory.getTaskListURL(jobId);
+      List<MapReduceTaskData> mapperList = new ArrayList<MapReduceTaskData>();
+      List<MapReduceTaskData> reducerList = new ArrayList<MapReduceTaskData>();
+      _jsonFactory.getTaskDataAll(taskListURL, jobId, mapperList, reducerList);
+
+      MapReduceTaskData[] mapperData = mapperList.toArray(new MapReduceTaskData[mapperList.size()]);
+      MapReduceTaskData[] reducerData = reducerList.toArray(new MapReduceTaskData[reducerList.size()]);
+
+      jobData.setCounters(jobCounter).setMapperData(mapperData).setReducerData(reducerData);
+
       String diagnosticInfo;
       try {
         diagnosticInfo = parseException(jobData.getJobId(), _jsonFactory.getDiagnosticInfo(jobURL));
       } catch(Exception e) {
         diagnosticInfo = null;
+        logger.warn("Failed getting diagnostic info for failed job " + jobData.getJobId(), e);
       }
       jobData.setDiagnosticInfo(diagnosticInfo);
     } else {
@@ -132,17 +148,12 @@ private String parseException(String jobId, String diagnosticInfo) throws MalformedURLException,
       AuthenticationException {
     Matcher m = ThreadContextMR2.getDiagnosticMatcher(diagnosticInfo);
     if (m.matches()) {
-      if (Integer.parseInt(m.group(2)) == 0) {
-        // This is due to a bug in hadoop 2.3 and should be fixed in 2.4
-        throw new RuntimeException("Error in diagnosticInfo");
-      }
       String taskId = m.group(1);
-      System.out.println("parse succedded. " + m.group(1) + " " + m.group(2));
       return _jsonFactory.getTaskFailedStackTrace(_urlFactory.getTaskAllAttemptsURL(jobId, taskId));
     }
-    logger.info("Does not match regex!!");
+    logger.warn("Diagnostic info does not match the expected pattern.");
     // Diagnostic info not present in the job. Usually due to exception during AM setup
-    throw new RuntimeException("No sufficient diagnostic Info");
+    return "Insufficient diagnostic info";
   }
 
   private URL getTaskCounterURL(String jobId, String taskId) throws MalformedURLException {
@@ -303,18 +314,23 @@ private void getTaskDataAll(URL url, String jobId, List<MapReduceTaskData> mapperList,
 
     for (JsonNode task : tasks) {
       String state = task.get("state").getValueAsText();
-      if (!state.equals("SUCCEEDED")) {
-        // This is a failed task.
-        continue;
-      }
       String taskId = task.get("id").getValueAsText();
-      String attemptId = task.get("successfulAttempt").getValueAsText();
+      String attemptId = "";
+      if (state.equals("SUCCEEDED")) {
+        attemptId = task.get("successfulAttempt").getValueAsText();
+      } else {
+        // For a failed task, use its first failed attempt instead.
+        JsonNode firstAttempt = getTaskFirstFailedAttempt(_urlFactory.getTaskAllAttemptsURL(jobId, taskId));
+        if (firstAttempt != null) {
+          attemptId = firstAttempt.get("id").getValueAsText();
+        }
+      }
+
       boolean isMapper = task.get("type").getValueAsText().equals("MAP");
       if (isMapper) {
-        mapperList.add(new MapReduceTaskData(taskId, attemptId));
+        mapperList.add(new MapReduceTaskData(taskId, attemptId, state));
       } else {
-        reducerList.add(new MapReduceTaskData(taskId, attemptId));
+        reducerList.add(new MapReduceTaskData(taskId, attemptId, state));
       }
     }
 
@@ -332,30 +348,42 @@ private void getTaskData(String jobId, List<MapReduceTaskData> taskList) throws
       URL taskCounterURL = getTaskCounterURL(jobId, data.getTaskId());
       MapReduceCounterData taskCounter = getTaskCounter(taskCounterURL);
 
-      URL taskAttemptURL = getTaskAttemptURL(jobId, data.getTaskId(), data.getAttemptId());
-      long[] taskExecTime = getTaskExecTime(taskAttemptURL);
-
+      long[] taskExecTime = null;
+      if (!data.getAttemptId().isEmpty()) {
+        URL taskAttemptURL = getTaskAttemptURL(jobId, data.getTaskId(), data.getAttemptId());
+        taskExecTime = getTaskExecTime(taskAttemptURL);
+      }
       data.setTimeAndCounter(taskExecTime, taskCounter);
     }
   }
 
   private String getTaskFailedStackTrace(URL taskAllAttemptsUrl) throws IOException, AuthenticationException {
+    JsonNode firstAttempt = getTaskFirstFailedAttempt(taskAllAttemptsUrl);
+    if (firstAttempt != null) {
+      String stacktrace = firstAttempt.get("diagnostics").getValueAsText();
+      return stacktrace;
+    } else {
+      return null;
+    }
+  }
+
+  // Returns the earliest non-succeeded attempt of a task, or null if every attempt succeeded.
+  private JsonNode getTaskFirstFailedAttempt(URL taskAllAttemptsUrl) throws IOException, AuthenticationException {
     JsonNode rootNode = ThreadContextMR2.readJsonNode(taskAllAttemptsUrl);
-    JsonNode tasks = rootNode.path("taskAttempts").path("taskAttempt");
-    for (JsonNode task : tasks) {
-      String state = task.get("state").getValueAsText();
-      if (!state.equals("FAILED")) {
+    long firstAttemptFinishTime = Long.MAX_VALUE;
+    JsonNode firstAttempt = null;
+    JsonNode taskAttempts = rootNode.path("taskAttempts").path("taskAttempt");
+    for (JsonNode taskAttempt : taskAttempts) {
+      String state = taskAttempt.get("state").getValueAsText();
+      if (state.equals("SUCCEEDED")) {
         continue;
       }
-      String stacktrace = task.get("diagnostics").getValueAsText();
-      if (stacktrace.startsWith("Error:")) {
-        return stacktrace;
-      } else {
-        // This is not a valid stacktrace. Might be due to a bug in hadoop 2.3 that was fixed in 2.4
-        throw new RuntimeException("This is not a valid stack trace.");
+      long finishTime = taskAttempt.get("finishTime").getLongValue();
+      if (finishTime < firstAttemptFinishTime) {
+        firstAttempt = taskAttempt;
+        firstAttemptFinishTime = finishTime;
+      }
     }
-    throw new RuntimeException("No failed task attempt in this failed task.");
+    return firstAttempt;
   }
 }
}
@@ -379,7 +407,7 @@ public Integer initialValue() {
 
     public Pattern initialValue() {
       // Example: "Task task_1443068695259_9143_m_000475 failed 1 times"
       return Pattern.compile(
-          "Task[\\s\\u00A0]+(.*)[\\s\\u00A0]+failed[\\s\\u00A0]+([0-9])[\\s\\u00A0]+times[\\s\\u00A0]+");
+          ".*[\\s\\u00A0]+(task_[0-9]+_[0-9]+_[m|r]_[0-9]+)[\\s\\u00A0]+.*");
     }
   };
diff --git a/app/com/linkedin/drelephant/mapreduce/heuristics/GenericDataSkewHeuristic.java b/app/com/linkedin/drelephant/mapreduce/heuristics/GenericDataSkewHeuristic.java
index b46a2bcdb..03d65416f 100644
--- a/app/com/linkedin/drelephant/mapreduce/heuristics/GenericDataSkewHeuristic.java
+++ b/app/com/linkedin/drelephant/mapreduce/heuristics/GenericDataSkewHeuristic.java
@@ -113,7 +113,7 @@ public HeuristicResult apply(MapReduceApplicationData data) {
     List<Long> inputBytes = new ArrayList<Long>();
 
     for (int i = 0; i < tasks.length; i++) {
-      if (tasks[i].isTimeAndCounterDataPresent()) {
+      if (tasks[i].isCounterDataPresent()) {
         inputBytes.add(tasks[i].getCounters().get(_counterName));
       }
     }
diff --git a/app/com/linkedin/drelephant/mapreduce/heuristics/JobQueueLimitHeuristic.java b/app/com/linkedin/drelephant/mapreduce/heuristics/JobQueueLimitHeuristic.java
index c0936ed01..413963626 100644
--- a/app/com/linkedin/drelephant/mapreduce/heuristics/JobQueueLimitHeuristic.java
+++ b/app/com/linkedin/drelephant/mapreduce/heuristics/JobQueueLimitHeuristic.java
@@ -93,7 +93,7 @@ public HeuristicResult apply(MapReduceApplicationData data) {
   private Severity[] getTasksSeverity(MapReduceTaskData[] tasks, long queueTimeout) {
     List<Severity> taskSeverityList = new ArrayList<Severity>();
     for (MapReduceTaskData task : tasks) {
-      if (task.isTimeAndCounterDataPresent()) {
+      if (task.isTimeDataPresent()) {
         taskSeverityList.add(getQueueLimitSeverity(task.getTotalRunTimeMs(), queueTimeout));
       }
     }
diff --git a/app/com/linkedin/drelephant/mapreduce/heuristics/MapperSpillHeuristic.java b/app/com/linkedin/drelephant/mapreduce/heuristics/MapperSpillHeuristic.java
index c7caa6582..e5f4ae8e8 100644
--- a/app/com/linkedin/drelephant/mapreduce/heuristics/MapperSpillHeuristic.java
+++ b/app/com/linkedin/drelephant/mapreduce/heuristics/MapperSpillHeuristic.java
@@ -90,7 +90,7 @@ public HeuristicResult apply(MapReduceApplicationData data) {
 
     for (MapReduceTaskData task : tasks) {
 
-      if (task.isTimeAndCounterDataPresent()) {
+      if (task.isCounterDataPresent()) {
        totalSpills += task.getCounters().get(MapReduceCounterData.CounterName.SPILLED_RECORDS);
        totalOutputRecords += task.getCounters().get(MapReduceCounterData.CounterName.MAP_OUTPUT_RECORDS);
       }
diff --git a/app/com/linkedin/drelephant/mapreduce/heuristics/ReducerTimeHeuristic.java b/app/com/linkedin/drelephant/mapreduce/heuristics/ReducerTimeHeuristic.java
index a71604b5d..6b37aab6b 100644
--- a/app/com/linkedin/drelephant/mapreduce/heuristics/ReducerTimeHeuristic.java
+++ b/app/com/linkedin/drelephant/mapreduce/heuristics/ReducerTimeHeuristic.java
@@ -104,7 +104,7 @@ public HeuristicResult apply(MapReduceApplicationData data) {
     long taskMaxMs = 0;
 
     for (MapReduceTaskData task : tasks) {
-      if (task.isTimeAndCounterDataPresent()) {
+      if (task.isTimeDataPresent()) {
         long taskTime = task.getTotalRunTimeMs();
         runTimesMs.add(taskTime);
         taskMinMs = Math.min(taskMinMs, taskTime);
diff --git a/app/com/linkedin/drelephant/mapreduce/heuristics/ShuffleSortHeuristic.java b/app/com/linkedin/drelephant/mapreduce/heuristics/ShuffleSortHeuristic.java
index 7d047313f..b4d84649a 100644
--- a/app/com/linkedin/drelephant/mapreduce/heuristics/ShuffleSortHeuristic.java
+++ b/app/com/linkedin/drelephant/mapreduce/heuristics/ShuffleSortHeuristic.java
@@ -94,7 +94,7 @@ public HeuristicResult apply(MapReduceApplicationData data) {
     List<Long> sortTimeMs = new ArrayList<Long>();
 
     for (MapReduceTaskData task : tasks) {
-      if (task.isTimeAndCounterDataPresent()) {
+      if (task.isTimeDataPresent()) {
         execTimeMs.add(task.getCodeExecutionTimeMs());
         shuffleTimeMs.add(task.getShuffleTimeMs());
         sortTimeMs.add(task.getSortTimeMs());
diff --git a/test/com/linkedin/drelephant/mapreduce/fetchers/MapReduceFSFetcherHadoop2Test.java b/test/com/linkedin/drelephant/mapreduce/fetchers/MapReduceFSFetcherHadoop2Test.java
index bf020ea45..34dc3272d 100644
--- a/test/com/linkedin/drelephant/mapreduce/fetchers/MapReduceFSFetcherHadoop2Test.java
+++ b/test/com/linkedin/drelephant/mapreduce/fetchers/MapReduceFSFetcherHadoop2Test.java
@@ -169,10 +169,15 @@ public void testGetTaskData() {
       MapReduceTaskData[] taskList = fetcher.getTaskData(jobId, infoList);
 
       Assert.assertNotNull("taskList should not be null.", taskList);
+      int succeededTaskCount = 0;
       for (MapReduceTaskData task : taskList) {
         Assert.assertNotNull("Null pointer in taskList.", task);
+        if (task.getState().equals("SUCCEEDED")) {
+          succeededTaskCount++;
+        }
       }
-      Assert.assertEquals("Should have only one succeeded task.", 1, taskList.length);
+      Assert.assertEquals("Should have two tasks in total.", 2, taskList.length);
+      Assert.assertEquals("Should have only one succeeded task.", 1, succeededTaskCount);
     } catch (IOException e) {
       Assert.assertNull("Failed to initialize FileSystem.", e);
     }
diff --git a/test/com/linkedin/drelephant/mapreduce/fetchers/MapReduceFetcherHadoop2Test.java b/test/com/linkedin/drelephant/mapreduce/fetchers/MapReduceFetcherHadoop2Test.java
index 94ab6b6a7..531bb3724 100644
--- a/test/com/linkedin/drelephant/mapreduce/fetchers/MapReduceFetcherHadoop2Test.java
+++ b/test/com/linkedin/drelephant/mapreduce/fetchers/MapReduceFetcherHadoop2Test.java
@@ -16,6 +16,8 @@
 
 package com.linkedin.drelephant.mapreduce.fetchers;
 
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -24,12 +26,11 @@ public class MapReduceFetcherHadoop2Test {
 
   @Test
   public void testDiagnosticMatcher() {
-    Assert.assertEquals("Task[\\s\\u00A0]+(.*)[\\s\\u00A0]+failed[\\s\\u00A0]+([0-9])[\\s\\u00A0]+times[\\s\\u00A0]+",
-        ThreadContextMR2.getDiagnosticMatcher("Task task_1443068695259_9143_m_000475 failed 1 time")
-            .pattern().toString());
-
-    Assert.assertEquals(2, ThreadContextMR2.getDiagnosticMatcher("Task task_1443068695259_9143_m_000475 failed 1 time")
-        .groupCount());
+    Matcher matcher = ThreadContextMR2.getDiagnosticMatcher("Task task_1443068695259_9143_m_000475 failed 1 time");
+    Assert.assertEquals(".*[\\s\\u00A0]+(task_[0-9]+_[0-9]+_[m|r]_[0-9]+)[\\s\\u00A0]+.*", matcher.pattern().toString());
+    Assert.assertTrue(matcher.matches());
+    Assert.assertEquals(1, matcher.groupCount());
+    Assert.assertEquals("task_1443068695259_9143_m_000475", matcher.group(1));
   }
 }
diff --git a/web/app/templates/components/single-heuristic-detail.hbs b/web/app/templates/components/single-heuristic-detail.hbs
index 04ed5be47..b65eba744 100644
--- a/web/app/templates/components/single-heuristic-detail.hbs
+++ b/web/app/templates/components/single-heuristic-detail.hbs
@@ -30,10 +30,19 @@
   {{#each yarnappheuristicresult.details as |yarnappheuristicresultdetail|}}
-    <tr>
-      <td>{{yarnappheuristicresultdetail.name}}</td>
-      <td>{{yarnappheuristicresultdetail.value}}</td>
-    </tr>
+    {{#if (eq "Error" yarnappheuristicresultdetail.name)}}
+      <tr>
+        <td>{{yarnappheuristicresultdetail.value}}</td>
+        <td>
+          <pre>{{yarnappheuristicresultdetail.details}}</pre>
+        </td>
+      </tr>
+    {{else}}
+      <tr>
+        <td>{{yarnappheuristicresultdetail.name}}</td>
+        <td>{{yarnappheuristicresultdetail.value}}</td>
+      </tr>
+    {{/if}}
   {{/each}}
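
Note for reviewers: the reworked MapReduceTaskData tracks time data and counter data independently on purpose, since a failed task usually still publishes counters but may have no attempt whose shuffle/sort/run times can be read. A minimal sketch of the resulting contract (the class name TaskDataContractSketch is invented for illustration; the constructor and accessors are exactly the ones added above):

    import com.linkedin.drelephant.mapreduce.data.MapReduceCounterData;
    import com.linkedin.drelephant.mapreduce.data.MapReduceTaskData;

    public class TaskDataContractSketch {
      public static void main(String[] args) {
        // A failed task: counters were published, but no attempt id is
        // known, so the time array stays null.
        MapReduceTaskData task =
            new MapReduceTaskData("task_1443068695259_9143_m_000475", "", "FAILED");
        task.setTimeAndCounter(null, new MapReduceCounterData());

        System.out.println(task.getState());                    // FAILED
        System.out.println(task.isTimeDataPresent());           // false
        System.out.println(task.isCounterDataPresent());        // true
        System.out.println(task.isTimeAndCounterDataPresent()); // false
      }
    }

This is also why each heuristic above now uses the weakest guard it actually needs: counter-based heuristics check isCounterDataPresent(), time-based heuristics check isTimeDataPresent(), so failed tasks still contribute whatever data they do have.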
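
The diagnostic matcher was relaxed because the old pattern demanded the literal wording "failed N times" with trailing whitespace and so never matched the singular "failed 1 time" form used as the test input; the new pattern keys only on a whitespace-delimited task id. A standalone check against the exact pattern string installed in ThreadContextMR2 (the class name DiagnosticMatcherSketch is invented for illustration):

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class DiagnosticMatcherSketch {
      public static void main(String[] args) {
        // Same pattern string that ThreadContextMR2.getDiagnosticMatcher() compiles.
        Pattern pattern = Pattern.compile(
            ".*[\\s\\u00A0]+(task_[0-9]+_[0-9]+_[m|r]_[0-9]+)[\\s\\u00A0]+.*");
        Matcher matcher = pattern.matcher("Task task_1443068695259_9143_m_000475 failed 1 time");
        if (matcher.matches()) {
          System.out.println(matcher.group(1)); // task_1443068695259_9143_m_000475
        }
      }
    }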