Fixes MapReduce aggregator and heuristic to correctly handle task data when sampling is enabled #222

Merged 2 commits on Mar 28, 2017
@@ -90,6 +90,7 @@ public long getResourceUsed() {

/**
* Computes the aggregated metrics -> peakMemory, delay, total task duration, wasted resources and memory usage.
+ * Aggregated metrics are expected to be approximations when sampling is enabled.
* @param taskDatas
* @param containerSize
* @param idealStartTime
@@ -106,6 +107,9 @@ private void compute(MapReduceTaskData[] taskDatas, long containerSize, long idealStartTime) {
}

for (MapReduceTaskData taskData: taskDatas) {
+ if (!taskData.isTimeAndCounterDataPresent()) {
Contributor:
Isn't this always true? Can you elaborate on what exactly is the issue you are trying to fix? I remember you mentioning something at the meetup but don't recollect exactly what the issue was.

Contributor Author:
No, it's not always true. If sampling is enabled, then it's only going to be true for sampled tasks. In the MapReduceFetcherHadoop2 class, a MapReduceTaskData instance is created for all the tasks, but time and counter data are only stored for the sampled tasks. Please look at the logic here.
Without this check, the unit test in this change would fail with a NullPointerException.
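A minimal sketch of the behavior described above, with simplified stand-ins for the real MapReduceFetcherHadoop2 and MapReduceTaskData classes (the field layout is an assumption for illustration, not the actual source):

```java
// Sketch: every task gets a TaskData instance, but only sampled tasks
// have time/counter data populated, so aggregation must skip the rest.
public class SamplingSketch {

  // Simplified stand-in for MapReduceTaskData: fields stay null until
  // setTimeAndCounter() is called.
  static class TaskData {
    private long[] time;      // null for non-sampled tasks
    private long[] counters;  // null for non-sampled tasks

    void setTimeAndCounter(long[] time, long[] counters) {
      this.time = time;
      this.counters = counters;
    }

    boolean isTimeAndCounterDataPresent() {
      return time != null && counters != null;
    }
  }

  public static void main(String[] args) {
    // The fetcher creates a TaskData instance for every task...
    TaskData[] tasks = new TaskData[3];
    for (int i = 0; i < tasks.length; i++) {
      tasks[i] = new TaskData();
    }
    // ...but only stores time and counter data for the sampled subset.
    tasks[0].setTimeAndCounter(new long[5], new long[]{1024});
    tasks[1].setTimeAndCounter(new long[5], new long[]{2048});

    // Aggregation must skip the non-sampled task (tasks[2]); dereferencing
    // its null counters would throw a NullPointerException.
    long total = 0;
    for (TaskData task : tasks) {
      if (!task.isTimeAndCounterDataPresent()) {
        continue; // non-sampled task: nothing to aggregate
      }
      total += task.counters[0];
    }
    System.out.println("Aggregated (approximate under sampling): " + total); // 3072
  }
}
```

Under sampling, the totals are computed only over the sampled subset, which is why the aggregated metrics are documented as approximations.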

Contributor:
Can you add a comment to the function saying that the aggregated metrics are not expected to be accurate when sampling is enabled, so that it's clear?

Contributor Author:
Thanks @shankar37 for the review.
Added a comment. PTAL.

+ continue;
+ }
long taskMemory = taskData.getCounters().get(MapReduceCounterData.CounterName.PHYSICAL_MEMORY_BYTES)/ FileUtils.ONE_MB; // MB
long taskVM = taskData.getCounters().get(MapReduceCounterData.CounterName.VIRTUAL_MEMORY_BYTES)/ FileUtils.ONE_MB; // MB
long taskDuration = taskData.getFinishTimeMs() - taskData.getStartTimeMs(); // Milliseconds
@@ -19,14 +19,16 @@
import com.linkedin.drelephant.mapreduce.data.MapReduceApplicationData;
import com.linkedin.drelephant.mapreduce.data.MapReduceTaskData;
import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData;

+ import java.util.ArrayList;
+ import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import com.linkedin.drelephant.analysis.Heuristic;
import com.linkedin.drelephant.analysis.HeuristicResult;
import com.linkedin.drelephant.analysis.Severity;


public class JobQueueLimitHeuristic implements Heuristic<MapReduceApplicationData> {
Contributor:
@akshayrai Is this heuristic used at LinkedIn? I don't see it enabled.

Contributor:
No, we don't use this. As far as I remember, this heuristic was created to warn about jobs that were getting close to the default queue timeout limit of 15 minutes.
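For reference, a minimal sketch of how such a queue-limit check might grade task runtimes; the thresholds below are illustrative assumptions, not the actual JobQueueLimitHeuristic values:

```java
import java.util.concurrent.TimeUnit;

// Hypothetical illustration: grade a task by how close its runtime
// is to the queue timeout (assumed thresholds, for explanation only).
public class QueueLimitSketch {
  enum Severity { NONE, LOW, MODERATE, SEVERE, CRITICAL }

  static Severity getQueueLimitSeverity(long taskRunTimeMs, long queueTimeoutMs) {
    long remainingMs = queueTimeoutMs - taskRunTimeMs;
    if (remainingMs <= 0) {
      return Severity.CRITICAL;  // already at or over the limit
    } else if (remainingMs <= TimeUnit.MINUTES.toMillis(1)) {
      return Severity.SEVERE;
    } else if (remainingMs <= TimeUnit.MINUTES.toMillis(3)) {
      return Severity.MODERATE;
    } else if (remainingMs <= TimeUnit.MINUTES.toMillis(5)) {
      return Severity.LOW;
    }
    return Severity.NONE;
  }

  public static void main(String[] args) {
    long queueTimeout = TimeUnit.MINUTES.toMillis(15);
    // A task that has already run 14 of the 15 allowed minutes gets flagged.
    System.out.println(getQueueLimitSeverity(TimeUnit.MINUTES.toMillis(14), queueTimeout)); // SEVERE
  }
}
```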


private HeuristicConfigurationData _heuristicConfData;
@@ -89,13 +91,13 @@ public HeuristicResult apply(MapReduceApplicationData data) {
}

private Severity[] getTasksSeverity(MapReduceTaskData[] tasks, long queueTimeout) {
- Severity[] tasksSeverity = new Severity[tasks.length];
- int i = 0;
+ List&lt;Severity&gt; taskSeverityList = new ArrayList&lt;Severity&gt;();
for (MapReduceTaskData task : tasks) {
- tasksSeverity[i] = getQueueLimitSeverity(task.getTotalRunTimeMs(), queueTimeout);
- i++;
+ if (task.isTimeAndCounterDataPresent()) {
+ taskSeverityList.add(getQueueLimitSeverity(task.getTotalRunTimeMs(), queueTimeout));
+ }
}
- return tasksSeverity;
+ return taskSeverityList.toArray(new Severity[taskSeverityList.size()]);
}

private long getSeverityFrequency(Severity severity, Severity[] tasksSeverity) {
@@ -42,7 +42,7 @@ public void testNullTaskArray() {

@Test
public void testTaskLevelData() {
- MapReduceTaskData taskData[] = new MapReduceTaskData[2];
+ MapReduceTaskData taskData[] = new MapReduceTaskData[3];
MapReduceCounterData counterData = new MapReduceCounterData();
counterData.set(MapReduceCounterData.CounterName.PHYSICAL_MEMORY_BYTES, 655577088L);
counterData.set(MapReduceCounterData.CounterName.VIRTUAL_MEMORY_BYTES, 3051589632L);
@@ -51,6 +51,8 @@ public void testTaskLevelData() {
taskData[0].setTimeAndCounter(time, counterData);
taskData[1] = new MapReduceTaskData("task", "id");
taskData[1].setTimeAndCounter(new long[5], counterData);
+ // Non-sampled task, which does not contain time and counter data
+ taskData[2] = new MapReduceTaskData("task", "id");
TaskLevelAggregatedMetrics taskMetrics = new TaskLevelAggregatedMetrics(taskData, 4096L, 1463218501117L);
Assert.assertEquals(taskMetrics.getDelay(), 1000000000L);
Assert.assertEquals(taskMetrics.getResourceUsed(), 135168L);
@@ -69,19 +69,23 @@ public void testNonDefaultRuntimeNone() throws IOException {

private Severity analyzeJob(long runtimeMs, String queueName) throws IOException {
MapReduceCounterData dummyCounter = new MapReduceCounterData();
- MapReduceTaskData[] mappers = new MapReduceTaskData[2 * NUM_TASKS / 3];
- MapReduceTaskData[] reducers = new MapReduceTaskData[NUM_TASKS / 3];
+ MapReduceTaskData[] mappers = new MapReduceTaskData[(2 * NUM_TASKS / 3) + 1];
+ MapReduceTaskData[] reducers = new MapReduceTaskData[(NUM_TASKS / 3) + 1];
Properties jobConf = new Properties();
jobConf.put("mapred.job.queue.name", queueName);
int i = 0;
for (; i < 2 * NUM_TASKS / 3; i++) {
mappers[i] = new MapReduceTaskData("task-id-"+i, "task-attempt-id-"+i);
mappers[i].setTimeAndCounter(new long[] { runtimeMs, 0, 0, 0, 0 }, dummyCounter);
}
+ // Non-sampled task, which does not contain time and counter data
+ mappers[i] = new MapReduceTaskData("task-id-"+i, "task-attempt-id-"+i);
for (i = 0; i < NUM_TASKS / 3; i++) {
reducers[i] = new MapReduceTaskData("task-id-"+i, "task-attempt-id-"+i);
reducers[i].setTimeAndCounter(new long[] { runtimeMs, 0, 0, 0, 0 }, dummyCounter);
}
+ // Non-sampled task, which does not contain time and counter data
+ reducers[i] = new MapReduceTaskData("task-id-"+i, "task-attempt-id-"+i);
MapReduceApplicationData data =
new MapReduceApplicationData().setCounters(dummyCounter).setReducerData(reducers).setMapperData(mappers)
.setJobConf(jobConf);
@@ -72,7 +72,7 @@ public void testSmallTasks() throws IOException {
private Severity analyzeJob(int numSmallTasks, int numLargeTasks, long smallInputSize, long largeInputSize)
throws IOException {
MapReduceCounterData jobCounter = new MapReduceCounterData();
- MapReduceTaskData[] mappers = new MapReduceTaskData[numSmallTasks + numLargeTasks];
+ MapReduceTaskData[] mappers = new MapReduceTaskData[numSmallTasks + numLargeTasks + 1];

MapReduceCounterData smallCounter = new MapReduceCounterData();
smallCounter.set(MapReduceCounterData.CounterName.HDFS_BYTES_READ, smallInputSize);
@@ -89,6 +89,8 @@ private Severity analyzeJob(int numSmallTasks, int numLargeTasks, long smallInputSize, long largeInputSize)
mappers[i] = new MapReduceTaskData("task-id-"+i, "task-attempt-id-"+i);
mappers[i].setTimeAndCounter(new long[5], largeCounter);
}
+ // Non-sampled task, which does not contain time and counter data
+ mappers[i] = new MapReduceTaskData("task-id-"+i, "task-attempt-id-"+i);

MapReduceApplicationData data = new MapReduceApplicationData().setCounters(jobCounter).setMapperData(mappers);
HeuristicResult result = _heuristic.apply(data);
@@ -62,7 +62,7 @@ public void testShortTasksNone() throws IOException {

private Severity analyzeJob(long runtimeMs, long cpuMs, long gcMs) throws IOException {
MapReduceCounterData jobCounter = new MapReduceCounterData();
- MapReduceTaskData[] mappers = new MapReduceTaskData[NUMTASKS];
+ MapReduceTaskData[] mappers = new MapReduceTaskData[NUMTASKS + 1];

MapReduceCounterData counter = new MapReduceCounterData();
counter.set(MapReduceCounterData.CounterName.CPU_MILLISECONDS, cpuMs);
@@ -73,6 +73,8 @@ private Severity analyzeJob(long runtimeMs, long cpuMs, long gcMs) throws IOException {
mappers[i] = new MapReduceTaskData("task-id-"+i, "task-attempt-id-"+i);
mappers[i].setTimeAndCounter(new long[]{runtimeMs, 0 , 0, 0, 0}, counter);
}
+ // Non-sampled task, which does not contain time and counter data
+ mappers[i] = new MapReduceTaskData("task-id-"+i, "task-attempt-id-"+i);

MapReduceApplicationData data = new MapReduceApplicationData().setCounters(jobCounter).setMapperData(mappers);
HeuristicResult result = _heuristic.apply(data);
@@ -70,7 +70,7 @@ public void testDefaultContainerNoneMore() throws IOException {

private Severity analyzeJob(long taskAvgMemMB, long containerMemMB) throws IOException {
MapReduceCounterData jobCounter = new MapReduceCounterData();
- MapReduceTaskData[] mappers = new MapReduceTaskData[NUMTASKS];
+ MapReduceTaskData[] mappers = new MapReduceTaskData[NUMTASKS + 1];

MapReduceCounterData counter = new MapReduceCounterData();
counter.set(MapReduceCounterData.CounterName.PHYSICAL_MEMORY_BYTES, taskAvgMemMB* FileUtils.ONE_MB);
@@ -83,6 +83,8 @@ private Severity analyzeJob(long taskAvgMemMB, long containerMemMB) throws IOException {
mappers[i] = new MapReduceTaskData("task-id-"+i, "task-attempt-id-"+i);
mappers[i].setTimeAndCounter(new long[5], counter);
}
+ // Non-sampled task, which does not contain time and counter data
+ mappers[i] = new MapReduceTaskData("task-id-"+i, "task-attempt-id-"+i);

MapReduceApplicationData data = new MapReduceApplicationData().setCounters(jobCounter).setMapperData(mappers);
data.setJobConf(p);
@@ -82,7 +82,7 @@ public void testShortTask() throws IOException {

private Severity analyzeJob(long runtimeMs, long readBytes) throws IOException {
MapReduceCounterData jobCounter = new MapReduceCounterData();
- MapReduceTaskData[] mappers = new MapReduceTaskData[NUMTASKS];
+ MapReduceTaskData[] mappers = new MapReduceTaskData[NUMTASKS + 1];

MapReduceCounterData counter = new MapReduceCounterData();
counter.set(MapReduceCounterData.CounterName.HDFS_BYTES_READ, readBytes);
@@ -92,6 +92,8 @@ private Severity analyzeJob(long runtimeMs, long readBytes) throws IOException {
mappers[i] = new MapReduceTaskData("task-id-"+i, "task-attempt-id-"+i);
mappers[i].setTimeAndCounter(new long[] { runtimeMs, 0, 0 ,0, 0}, counter);
}
+ // Non-sampled task, which does not contain time and counter data
+ mappers[i] = new MapReduceTaskData("task-id-"+i, "task-attempt-id-"+i);

MapReduceApplicationData data = new MapReduceApplicationData().setCounters(jobCounter).setMapperData(mappers);
HeuristicResult result = _heuristic.apply(data);
@@ -68,16 +68,19 @@ public void testSmallNumTasks() throws IOException {

private Severity analyzeJob(long spilledRecords, long mapRecords, int numTasks) throws IOException {
MapReduceCounterData jobCounter = new MapReduceCounterData();
- MapReduceTaskData[] mappers = new MapReduceTaskData[numTasks];
+ MapReduceTaskData[] mappers = new MapReduceTaskData[numTasks + 1];

MapReduceCounterData counter = new MapReduceCounterData();
counter.set(MapReduceCounterData.CounterName.SPILLED_RECORDS, spilledRecords);
counter.set(MapReduceCounterData.CounterName.MAP_OUTPUT_RECORDS, mapRecords);

- for (int i=0; i < numTasks; i++) {
+ int i = 0;
+ for (; i < numTasks; i++) {
mappers[i] = new MapReduceTaskData("task-id-"+i, "task-attempt-id-"+i);
mappers[i].setTimeAndCounter(new long[5], counter);
}
+ // Non-sampled task, which does not contain time and counter data
+ mappers[i] = new MapReduceTaskData("task-id-"+i, "task-attempt-id-"+i);

MapReduceApplicationData data = new MapReduceApplicationData().setCounters(jobCounter).setMapperData(mappers);
HeuristicResult result = _heuristic.apply(data);
@@ -88,7 +88,7 @@ public void testShortRuntimeTasksNone() throws IOException {

private Severity analyzeJob(int numTasks, long runtime) throws IOException {
MapReduceCounterData jobCounter = new MapReduceCounterData();
- MapReduceTaskData[] mappers = new MapReduceTaskData[numTasks];
+ MapReduceTaskData[] mappers = new MapReduceTaskData[numTasks + 1];

MapReduceCounterData taskCounter = new MapReduceCounterData();
taskCounter.set(MapReduceCounterData.CounterName.HDFS_BYTES_READ, DUMMY_INPUT_SIZE);
@@ -98,6 +98,8 @@ private Severity analyzeJob(int numTasks, long runtime) throws IOException {
mappers[i] = new MapReduceTaskData("task-id-"+i, "task-attempt-id-"+i);
mappers[i].setTimeAndCounter(new long[] { runtime, 0, 0, 0, 0 }, taskCounter);
}
+ // Non-sampled task, which does not contain time and counter data
+ mappers[i] = new MapReduceTaskData("task-id-"+i, "task-attempt-id-"+i);

MapReduceApplicationData data = new MapReduceApplicationData().setCounters(jobCounter).setMapperData(mappers);
HeuristicResult result = _heuristic.apply(data);
@@ -71,7 +71,7 @@ public void testSmallTasks() throws IOException {
private Severity analyzeJob(int numSmallTasks, int numLargeTasks, long smallInputSize, long largeInputSize)
throws IOException {
MapReduceCounterData jobCounter = new MapReduceCounterData();
- MapReduceTaskData[] reducers = new MapReduceTaskData[numSmallTasks + numLargeTasks];
+ MapReduceTaskData[] reducers = new MapReduceTaskData[numSmallTasks + numLargeTasks + 1];

MapReduceCounterData smallCounter = new MapReduceCounterData();
smallCounter.set(MapReduceCounterData.CounterName.REDUCE_SHUFFLE_BYTES, smallInputSize);
@@ -88,6 +88,8 @@ private Severity analyzeJob(int numSmallTasks, int numLargeTasks, long smallInputSize, long largeInputSize)
reducers[i] = new MapReduceTaskData("task-id-"+i, "task-attempt-id-"+i);
reducers[i].setTimeAndCounter(new long[5], largeCounter);
}
+ // Non-sampled task, which does not contain time and counter data
+ reducers[i] = new MapReduceTaskData("task-id-"+i, "task-attempt-id-"+i);

MapReduceApplicationData data = new MapReduceApplicationData().setCounters(jobCounter).setReducerData(reducers);
HeuristicResult result = _heuristic.apply(data);
@@ -61,7 +61,7 @@ public void testShortTasksNone() throws IOException {

private Severity analyzeJob(long runtimeMs, long cpuMs, long gcMs) throws IOException {
MapReduceCounterData jobCounter = new MapReduceCounterData();
- MapReduceTaskData[] reducers = new MapReduceTaskData[NUMTASKS];
+ MapReduceTaskData[] reducers = new MapReduceTaskData[NUMTASKS + 1];

MapReduceCounterData counter = new MapReduceCounterData();
counter.set(MapReduceCounterData.CounterName.CPU_MILLISECONDS, cpuMs);
@@ -72,6 +72,8 @@ private Severity analyzeJob(long runtimeMs, long cpuMs, long gcMs) throws IOException {
reducers[i] = new MapReduceTaskData("task-id-"+i, "task-attempt-id-"+i);
reducers[i].setTimeAndCounter(new long[] { runtimeMs, 0, 0, 0, 0 }, counter);
}
+ // Non-sampled task, which does not contain time and counter data
+ reducers[i] = new MapReduceTaskData("task-id-"+i, "task-attempt-id-"+i);

MapReduceApplicationData data = new MapReduceApplicationData().setCounters(jobCounter).setReducerData(reducers);
HeuristicResult result = _heuristic.apply(data);
@@ -69,7 +69,7 @@ public void testDefaultContainerNoneMore() throws IOException {

private Severity analyzeJob(long taskAvgMemMB, long containerMemMB) throws IOException {
MapReduceCounterData jobCounter = new MapReduceCounterData();
- MapReduceTaskData[] reducers = new MapReduceTaskData[NUMTASKS];
+ MapReduceTaskData[] reducers = new MapReduceTaskData[NUMTASKS + 1];

MapReduceCounterData counter = new MapReduceCounterData();
counter.set(MapReduceCounterData.CounterName.PHYSICAL_MEMORY_BYTES, taskAvgMemMB* FileUtils.ONE_MB);
@@ -82,6 +82,8 @@ private Severity analyzeJob(long taskAvgMemMB, long containerMemMB) throws IOException {
reducers[i] = new MapReduceTaskData("task-id-"+i, "task-attempt-id-"+i);
reducers[i].setTimeAndCounter(new long[5], counter);
}
+ // Non-sampled task, which does not contain time and counter data
+ reducers[i] = new MapReduceTaskData("task-id-"+i, "task-attempt-id-"+i);

MapReduceApplicationData data = new MapReduceApplicationData().setCounters(jobCounter).setReducerData(reducers);
data.setJobConf(p);
@@ -80,13 +80,15 @@ public void testLongRunetimeSevereMore() throws IOException {

private Severity analyzeJob(long runtimeMs, int numTasks) throws IOException {
MapReduceCounterData dummyCounter = new MapReduceCounterData();
- MapReduceTaskData[] reducers = new MapReduceTaskData[numTasks];
+ MapReduceTaskData[] reducers = new MapReduceTaskData[numTasks + 1];

int i = 0;
for (; i < numTasks; i++) {
reducers[i] = new MapReduceTaskData("task-id-"+i, "task-attempt-id-"+i);
reducers[i].setTimeAndCounter(new long[] { runtimeMs, 0, 0, 0, 0 }, dummyCounter);
}
+ // Non-sampled task, which does not contain time and counter data
+ reducers[i] = new MapReduceTaskData("task-id-"+i, "task-attempt-id-"+i);

MapReduceApplicationData data = new MapReduceApplicationData().setCounters(dummyCounter).setReducerData(reducers);
HeuristicResult result = _heuristic.apply(data);
@@ -92,14 +92,16 @@ public void testShortSort() throws IOException {

private Severity analyzeJob(long shuffleTimeMs, long sortTimeMs, long reduceTimeMs) throws IOException {
MapReduceCounterData dummyCounter = new MapReduceCounterData();
- MapReduceTaskData[] reducers = new MapReduceTaskData[NUMTASKS];
+ MapReduceTaskData[] reducers = new MapReduceTaskData[NUMTASKS + 1];

int i = 0;
for (; i < NUMTASKS; i++) {
reducers[i] = new MapReduceTaskData("task-id-"+i, "task-attempt-id-"+i);
reducers[i].setTimeAndCounter(
new long[] { shuffleTimeMs + sortTimeMs + reduceTimeMs, shuffleTimeMs, sortTimeMs, 0, 0}, dummyCounter);
}
+ // Non-sampled task, which does not contain time and counter data
+ reducers[i] = new MapReduceTaskData("task-id-"+i, "task-attempt-id-"+i);
MapReduceApplicationData data = new MapReduceApplicationData().setCounters(dummyCounter).setReducerData(reducers);
HeuristicResult result = _heuristic.apply(data);
return result.getSeverity();