Skip to content
This repository has been archived by the owner on Jan 15, 2022. It is now read-only.

Calculate job cost based on hadoop2 counters of mbmillis #132

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions hraven-core/src/main/java/com/twitter/hraven/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,10 @@ public class Constants {
public static final String SLOTS_MILLIS_REDUCES = "SLOTS_MILLIS_REDUCES";
public static final String REDUCE_SHUFFLE_BYTES = "REDUCE_SHUFFLE_BYTES";

/** MB MILLIS counters in hadoop2 */
public static final String MB_MILLIS_MAPS = "MB_MILLIS_MAPS";
public static final String MB_MILLIS_REDUCES = "MB_MILLIS_REDUCES";

/** used to indicate how expensive a job is in terms of memory and time taken*/
public static final String MEGABYTEMILLIS = "megabytemillis" ;
public static final byte[] MEGABYTEMILLIS_BYTES = Bytes.toBytes(MEGABYTEMILLIS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ public class JobHistoryFileParserHadoop2 extends JobHistoryFileParserBase {
private long mapSlotMillis = 0L;
private long reduceSlotMillis = 0L;

// initialize mb millis to NOTFOUND_VALUE
// since older versions of hadoop2 did not have this counter
private long mapMbMillis = Constants.NOTFOUND_VALUE;
private long reduceMbMillis = Constants.NOTFOUND_VALUE;

private long startTime = Constants.NOTFOUND_VALUE;
private long endTime = Constants.NOTFOUND_VALUE;
private static final String LAUNCH_TIME_KEY_STR = JobHistoryKeys.LAUNCH_TIME.toString();
Expand Down Expand Up @@ -690,7 +695,18 @@ private void populatePut(Put p, byte[] family, String key, String groupName, Str
byte[] groupPrefix = Bytes.add(counterPrefix, Bytes.toBytes(groupName), Constants.SEP_BYTES);
byte[] qualifier = Bytes.add(groupPrefix, Bytes.toBytes(counterName));

/**
/*
* store the map and reduce mb millis counter value
*/
if (Constants.JOB_COUNTER_HADOOP2.equals(groupName)) {
if (Constants.MB_MILLIS_MAPS.equals(counterName)) {
this.mapMbMillis = counterValue;
} else if (Constants.MB_MILLIS_REDUCES.equals(counterName)) {
this.reduceMbMillis = counterValue;
}
}

/*
* correct and populate map and reduce slot millis
*/
if ((Constants.SLOTS_MILLIS_MAPS.equals(counterName)) ||
Expand Down Expand Up @@ -724,10 +740,13 @@ private long getMemoryMb(String key) {
* hadoop2ReportedMapSlotMillis * yarn.scheduler.minimum-allocation-mb
* / mapreduce.map.memory.mb
* similarly for reduce slot millis
*
* Deprecated (see Pull Request #132): superseded by the MB_MILLIS_MAPS /
* MB_MILLIS_REDUCES counters emitted by newer hadoop2 versions.
* @param counterName
* @param counterValue
* @return corrected counter value
*/
@Deprecated
private Long getStandardizedCounterValue(String counterName, Long counterValue) {
if (jobConf == null) {
throw new ProcessingException("While correcting slot millis, jobConf is null");
Expand Down Expand Up @@ -861,19 +880,47 @@ public Long getMegaByteMillis() {
+ " since " + Constants.AM_MEMORY_MB_CONF_KEY + " not found!");
}

/* in case of older versions of hadoop2
* the counter of mb millis is not available
* then use slot millis counter value
*/
if (this.mapMbMillis == Constants.NOTFOUND_VALUE) {
this.mapMbMillis = (mapMb * mapSlotMillis);
}
if (this.reduceMbMillis == Constants.NOTFOUND_VALUE) {
this.reduceMbMillis = (reduceMb * reduceSlotMillis);
}

Long mbMillis = 0L;
if (uberized) {
mbMillis = amMb * jobRunTime;
} else {
mbMillis = (mapMb * mapSlotMillis) + (reduceMb * reduceSlotMillis) + (amMb * jobRunTime);
mbMillis = this.mapMbMillis + this.reduceMbMillis + (amMb * jobRunTime);
}

LOG.debug("For " + jobKey.toString() + " " + Constants.MEGABYTEMILLIS + " is " + mbMillis
+ " since \n uberized: " + uberized + " \n " + "mapMb: " + mapMb + " mapSlotMillis: "
LOG.debug("For " + jobKey.toString() + "\n" + Constants.MEGABYTEMILLIS + " is " + mbMillis
+ " since \n uberized: " + uberized + " \n " + "mapMbMillis: " + mapMbMillis
+ " reduceMbMillis:" + reduceMbMillis + "mapMb: " + mapMb + " mapSlotMillis: "
+ mapSlotMillis + " \n " + " reduceMb: " + reduceMb + " reduceSlotMillis: "
+ reduceSlotMillis + " \n " + " amMb: " + amMb + " jobRunTime: " + jobRunTime
+ " start time: " + this.startTime + " endtime " + this.endTime);

return mbMillis;
}

/**
 * Returns the MB_MILLIS_MAPS counter value for this job, or
 * Constants.NOTFOUND_VALUE when the counter was absent from the history file.
 *
 * @return map mb-millis counter value
 */
public long getMapMbMillis() {
  return this.mapMbMillis;
}

/**
 * Overrides the stored MB_MILLIS_MAPS counter value.
 *
 * @param mapMbMillis new map mb-millis value
 */
public void setMapMbMillis(final long mapMbMillis) {
  this.mapMbMillis = mapMbMillis;
}

/**
 * Returns the MB_MILLIS_REDUCES counter value for this job, or
 * Constants.NOTFOUND_VALUE when the counter was absent from the history file.
 *
 * @return reduce mb-millis counter value
 */
public long getReduceMbMillis() {
  return this.reduceMbMillis;
}

/**
 * Overrides the stored MB_MILLIS_REDUCES counter value.
 *
 * @param reduceMbMillis new reduce mb-millis value
 */
public void setReduceMbMillis(final long reduceMbMillis) {
  this.reduceMbMillis = reduceMbMillis;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -205,20 +205,24 @@ public void testMissingSlotMillis() throws IOException {
Configuration jobConf = new Configuration();
jobConf.addResource(new Path(JOB_CONF_FILE_NAME));

JobHistoryFileParser historyFileParser =
JobHistoryFileParserFactory.createJobHistoryFileParser(contents, jobConf,
HistoryFileType.TWO);
JobHistoryFileParserHadoop2 historyFileParser =
new JobHistoryFileParserHadoop2(jobConf);
assertNotNull(historyFileParser);

// confirm that we get back an object that can parse hadoop 2.0 files
assertTrue(historyFileParser instanceof JobHistoryFileParserHadoop2);
JobKey jobKey = new JobKey("cluster1", "user", "Sleep", 1, "job_1329348432655_0001");
historyFileParser.parse(contents, jobKey);

// this history file has only map slot millis no reduce millis
Long mapMbMillis = historyFileParser.getMapMbMillis();
assertNotNull(mapMbMillis);
assertEquals(mapMbMillis, new Long(178169856L));
Long reduceMbMillis = historyFileParser.getReduceMbMillis();
assertNotNull(reduceMbMillis);
assertEquals(reduceMbMillis, Constants.NOTFOUND_VALUE);

Long mbMillis = historyFileParser.getMegaByteMillis();
assertNotNull(mbMillis);
Long expValue = 10402816L;
Long expValue = 188559872L;
assertEquals(expValue, mbMillis);
}

Expand Down Expand Up @@ -340,4 +344,5 @@ public void testCreateJobHistoryFileParserJobError() throws IOException {
jobKeyConv.fromBytes(jobPuts.get(0).getRow()).toString());

}

}
Loading