Skip to content

Commit

Permalink
Merged twitter#132 from twitter_only to master (twitter#134).
Browse files Browse the repository at this point in the history
  • Loading branch information
Sangjin Lee committed Jan 29, 2015
1 parent 69704fe commit 5771b47
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 11 deletions.
4 changes: 4 additions & 0 deletions hraven-core/src/main/java/com/twitter/hraven/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,10 @@ public class Constants {
/** hadoop counter names for slot-millis and shuffle bytes */
public static final String SLOTS_MILLIS_REDUCES = "SLOTS_MILLIS_REDUCES";
public static final String REDUCE_SHUFFLE_BYTES = "REDUCE_SHUFFLE_BYTES";

/** MB millis counter names in hadoop2 (absent in older hadoop2 versions) */
public static final String MB_MILLIS_MAPS = "MB_MILLIS_MAPS";
public static final String MB_MILLIS_REDUCES = "MB_MILLIS_REDUCES";

/** used to indicate how expensive a job is in terms of memory and time taken */
public static final String MEGABYTEMILLIS = "megabytemillis" ;
public static final byte[] MEGABYTEMILLIS_BYTES = Bytes.toBytes(MEGABYTEMILLIS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@ public class JobHistoryFileParserHadoop2 extends JobHistoryFileParserBase {
*/
public static final String JOB_STATUS_SUCCEEDED = "SUCCEEDED";

// map/reduce MB-millis counter values parsed from the history file.
// Initialized to NOTFOUND_VALUE since older versions of hadoop2 did not
// emit this counter; the fallback path then derives the value from the
// slot-millis counters instead.
private long mapMbMillis = Constants.NOTFOUND_VALUE;
private long reduceMbMillis = Constants.NOTFOUND_VALUE;

// cached string forms of the history keys, used when matching parsed records
private static final String LAUNCH_TIME_KEY_STR = JobHistoryKeys.LAUNCH_TIME.toString();
private static final String FINISH_TIME_KEY_STR = JobHistoryKeys.FINISH_TIME.toString();

Expand Down Expand Up @@ -705,7 +710,18 @@ private void populatePut(Put p, byte[] family, String key, String groupName, Str
byte[] groupPrefix = Bytes.add(counterPrefix, Bytes.toBytes(groupName), Constants.SEP_BYTES);
byte[] qualifier = Bytes.add(groupPrefix, Bytes.toBytes(counterName));

/**
/*
* store the map and reduce mb millis counter value
*/
if (Constants.JOB_COUNTER_HADOOP2.equals(groupName)) {
if (Constants.MB_MILLIS_MAPS.equals(counterName)) {
this.mapMbMillis = counterValue;
} else if (Constants.MB_MILLIS_REDUCES.equals(counterName)) {
this.reduceMbMillis = counterValue;
}
}

/*
* correct and populate map and reduce slot millis
*/
if ((Constants.SLOTS_MILLIS_MAPS.equals(counterName)) ||
Expand Down Expand Up @@ -739,10 +755,13 @@ private long getMemoryMb(String key) {
* hadoop2ReportedMapSlotMillis * yarn.scheduler.minimum-allocation-mb
* / mapreduce.mapreduce.memory.mb
* similarly for reduce slot millis
*
* @deprecated as noted in Pull Request #132
* @param counterName
* @param counterValue
* @return corrected counter value
*/
@Deprecated
private Long getStandardizedCounterValue(String counterName, Long counterValue) {
if (jobConf == null) {
throw new ProcessingException("While correcting slot millis, jobConf is null");
Expand Down Expand Up @@ -879,17 +898,30 @@ public Long getMegaByteMillis() {
+ " since " + Constants.AM_MEMORY_MB_CONF_KEY + " not found!");
}

long mbMillis = 0L;
long mapSlotMillis = this.jobDetails.getMapSlotMillis();
long reduceSlotMillis = this.jobDetails.getReduceSlotMillis();

/* in case of older versions of hadoop2
* the counter of mb millis is not available
* then use slot millis counter value
*/
if (this.mapMbMillis == Constants.NOTFOUND_VALUE) {
this.mapMbMillis = (mapMb * mapSlotMillis);
}
if (this.reduceMbMillis == Constants.NOTFOUND_VALUE) {
this.reduceMbMillis = (reduceMb * reduceSlotMillis);
}

long mbMillis = 0L;
if (uberized) {
mbMillis = amMb * jobRunTime;
} else {
mbMillis = (mapMb * mapSlotMillis) + (reduceMb * reduceSlotMillis) + (amMb * jobRunTime);
mbMillis = this.mapMbMillis + this.reduceMbMillis + (amMb * jobRunTime);
}

LOG.debug("For " + jobKey.toString() + " " + Constants.MEGABYTEMILLIS + " is " + mbMillis
+ " since \n uberized: " + uberized + " \n " + "mapMb: " + mapMb + " mapSlotMillis: "
LOG.debug("For " + jobKey.toString() + "\n" + Constants.MEGABYTEMILLIS + " is " + mbMillis
+ " since \n uberized: " + uberized + " \n " + "mapMbMillis: " + mapMbMillis
+ " reduceMbMillis:" + reduceMbMillis + "mapMb: " + mapMb + " mapSlotMillis: "
+ mapSlotMillis + " \n " + " reduceMb: " + reduceMb + " reduceSlotMillis: "
+ reduceSlotMillis + " \n " + " amMb: " + amMb + " jobRunTime: " + jobRunTime
+ " start time: " + startTime + " endtime " + endTime);
Expand All @@ -902,4 +934,20 @@ public Long getMegaByteMillis() {
/**
 * Returns the {@link JobDetails} object accumulated by this parser.
 *
 * @return the job details for the parsed history file
 */
public JobDetails getJobDetails() {
  return jobDetails;
}

/**
 * Returns the map MB-millis counter value parsed from the history file,
 * or {@code Constants.NOTFOUND_VALUE} if the counter was not present.
 *
 * @return map MB-millis counter value
 */
public long getMapMbMillis() {
  return this.mapMbMillis;
}

/**
 * Overrides the map MB-millis counter value.
 *
 * @param value the new map MB-millis value
 */
public void setMapMbMillis(long value) {
  mapMbMillis = value;
}

/**
 * Returns the reduce MB-millis counter value parsed from the history file,
 * or {@code Constants.NOTFOUND_VALUE} if the counter was not present.
 *
 * @return reduce MB-millis counter value
 */
public long getReduceMbMillis() {
  return this.reduceMbMillis;
}

/**
 * Overrides the reduce MB-millis counter value.
 *
 * @param value the new reduce MB-millis value
 */
public void setReduceMbMillis(long value) {
  reduceMbMillis = value;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -203,19 +203,24 @@ public void testMissingSlotMillis() throws IOException {
Configuration jobConf = new Configuration();
jobConf.addResource(new Path(JOB_CONF_FILE_NAME));

JobHistoryFileParser historyFileParser =
JobHistoryFileParserFactory.createJobHistoryFileParser(contents, jobConf);
JobHistoryFileParserHadoop2 historyFileParser =
new JobHistoryFileParserHadoop2(jobConf);
assertNotNull(historyFileParser);

// confirm that we get back an object that can parse hadoop 2.0 files
assertTrue(historyFileParser instanceof JobHistoryFileParserHadoop2);
JobKey jobKey = new JobKey("cluster1", "user", "Sleep", 1, "job_1329348432655_0001");
historyFileParser.parse(contents, jobKey);

// this history file carries only the map MB-millis counter; the reduce
// MB-millis counter is absent, so it stays at NOTFOUND_VALUE
Long mapMbMillis = historyFileParser.getMapMbMillis();
assertNotNull(mapMbMillis);
assertEquals(mapMbMillis, new Long(178169856L));
Long reduceMbMillis = historyFileParser.getReduceMbMillis();
assertNotNull(reduceMbMillis);
assertEquals(reduceMbMillis, Constants.NOTFOUND_VALUE);

Long mbMillis = historyFileParser.getMegaByteMillis();
assertNotNull(mbMillis);
Long expValue = 10402816L;
Long expValue = 188559872L;
assertEquals(expValue, mbMillis);
}

Expand Down Expand Up @@ -307,4 +312,5 @@ public void testVersion2_4() throws IOException {
// ensure that we got the hadoop2 version put
assertTrue(foundQueue);
}

}
Loading

0 comments on commit 5771b47

Please sign in to comment.