Skip to content
This repository has been archived by the owner on Jan 15, 2022. It is now read-only.

Commit

Permalink
Merge ae7087b into 533094d
Browse files Browse the repository at this point in the history
  • Loading branch information
Vrushali Channapattan committed Jan 17, 2015
2 parents 533094d + ae7087b commit 35af222
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 11 deletions.
4 changes: 4 additions & 0 deletions hraven-core/src/main/java/com/twitter/hraven/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,10 @@ public class Constants {
/** Hadoop counter names for slot-milliseconds consumed by reduces and shuffle bytes. */
public static final String SLOTS_MILLIS_REDUCES = "SLOTS_MILLIS_REDUCES";
public static final String REDUCE_SHUFFLE_BYTES = "REDUCE_SHUFFLE_BYTES";

/**
 * MB_MILLIS counters in hadoop2: memory (MB) x time (millis) charged to the
 * map and reduce phases. NOTE(review): absent in older hadoop2 versions —
 * consumers should tolerate the counter being missing.
 */
public static final String MB_MILLIS_MAPS = "MB_MILLIS_MAPS";
public static final String MB_MILLIS_REDUCES = "MB_MILLIS_REDUCES";

/** used to indicate how expensive a job is in terms of memory and time taken*/
public static final String MEGABYTEMILLIS = "megabytemillis" ;
// byte[] form of MEGABYTEMILLIS, precomputed for HBase qualifier use
public static final byte[] MEGABYTEMILLIS_BYTES = Bytes.toBytes(MEGABYTEMILLIS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ public class JobHistoryFileParserHadoop2 extends JobHistoryFileParserBase {
// slot-milliseconds consumed by the map and reduce phases; default 0 when
// the corresponding counters are not seen during parsing
private long mapSlotMillis = 0L;
private long reduceSlotMillis = 0L;

// initialize mb millis to NOTFOUND_VALUE
// since older versions of hadoop2 did not have this counter;
// NOTFOUND_VALUE acts as the sentinel for "counter absent in history file"
private long mapMbMillis = Constants.NOTFOUND_VALUE;
private long reduceMbMillis = Constants.NOTFOUND_VALUE;

// job start/end timestamps, NOTFOUND_VALUE until parsed from the history file
private long startTime = Constants.NOTFOUND_VALUE;
private long endTime = Constants.NOTFOUND_VALUE;
// cached string form of the LAUNCH_TIME key to avoid repeated toString() calls
private static final String LAUNCH_TIME_KEY_STR = JobHistoryKeys.LAUNCH_TIME.toString();
Expand Down Expand Up @@ -690,7 +695,18 @@ private void populatePut(Put p, byte[] family, String key, String groupName, Str
byte[] groupPrefix = Bytes.add(counterPrefix, Bytes.toBytes(groupName), Constants.SEP_BYTES);
byte[] qualifier = Bytes.add(groupPrefix, Bytes.toBytes(counterName));

/**
/*
* store the map and reduce mb millis counter value
*/
if (Constants.JOB_COUNTER_HADOOP2.equals(groupName)) {
if (Constants.MB_MILLIS_MAPS.equals(counterName)) {
this.mapMbMillis = counterValue;
} else if (Constants.MB_MILLIS_REDUCES.equals(counterName)) {
this.reduceMbMillis = counterValue;
}
}

/*
* correct and populate map and reduce slot millis
*/
if ((Constants.SLOTS_MILLIS_MAPS.equals(counterName)) ||
Expand Down Expand Up @@ -861,19 +877,47 @@ public Long getMegaByteMillis() {
+ " since " + Constants.AM_MEMORY_MB_CONF_KEY + " not found!");
}

/* in case of older versions of hadoop2
* the counter of mb millis is not available
* then use slot millis counter value
*/
if (this.mapMbMillis == Constants.NOTFOUND_VALUE) {
this.mapMbMillis = (mapMb * mapSlotMillis);
}
if (this.reduceMbMillis == Constants.NOTFOUND_VALUE) {
this.reduceMbMillis = (reduceMb * reduceSlotMillis);
}

Long mbMillis = 0L;
if (uberized) {
mbMillis = amMb * jobRunTime;
} else {
mbMillis = (mapMb * mapSlotMillis) + (reduceMb * reduceSlotMillis) + (amMb * jobRunTime);
mbMillis = this.mapMbMillis + this.reduceMbMillis + (amMb * jobRunTime);
}

LOG.debug("For " + jobKey.toString() + " " + Constants.MEGABYTEMILLIS + " is " + mbMillis
+ " since \n uberized: " + uberized + " \n " + "mapMb: " + mapMb + " mapSlotMillis: "
LOG.debug("For " + jobKey.toString() + "\n" + Constants.MEGABYTEMILLIS + " is " + mbMillis
+ " since \n uberized: " + uberized + " \n " + "mapMbMillis: " + mapMbMillis
+ " reduceMbMillis:" + reduceMbMillis + "mapMb: " + mapMb + " mapSlotMillis: "
+ mapSlotMillis + " \n " + " reduceMb: " + reduceMb + " reduceSlotMillis: "
+ reduceSlotMillis + " \n " + " amMb: " + amMb + " jobRunTime: " + jobRunTime
+ " start time: " + this.startTime + " endtime " + this.endTime);

return mbMillis;
}

/**
 * Returns the map-phase MB-millis counter value parsed from the job history
 * file, or {@code Constants.NOTFOUND_VALUE} when the counter was absent
 * (older hadoop2 versions did not emit it).
 *
 * @return map MB-millis, or the not-found sentinel
 */
public long getMapMbMillis() {
  return this.mapMbMillis;
}

/**
 * Overrides the map-phase MB-millis value.
 *
 * @param mapMbMillis map MB-millis value to store
 */
public void setMapMbMillis(final long mapMbMillis) {
  this.mapMbMillis = mapMbMillis;
}

/**
 * Returns the reduce-phase MB-millis counter value parsed from the job
 * history file, or {@code Constants.NOTFOUND_VALUE} when the counter was
 * absent (older hadoop2 versions did not emit it).
 *
 * @return reduce MB-millis, or the not-found sentinel
 */
public long getReduceMbMillis() {
  return this.reduceMbMillis;
}

/**
 * Overrides the reduce-phase MB-millis value.
 *
 * @param reduceMbMillis reduce MB-millis value to store
 */
public void setReduceMbMillis(final long reduceMbMillis) {
  this.reduceMbMillis = reduceMbMillis;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -205,20 +205,24 @@ public void testMissingSlotMillis() throws IOException {
Configuration jobConf = new Configuration();
jobConf.addResource(new Path(JOB_CONF_FILE_NAME));

JobHistoryFileParser historyFileParser =
JobHistoryFileParserFactory.createJobHistoryFileParser(contents, jobConf,
HistoryFileType.TWO);
JobHistoryFileParserHadoop2 historyFileParser =
new JobHistoryFileParserHadoop2(jobConf);
assertNotNull(historyFileParser);

// confirm that we get back an object that can parse hadoop 2.0 files
assertTrue(historyFileParser instanceof JobHistoryFileParserHadoop2);
JobKey jobKey = new JobKey("cluster1", "user", "Sleep", 1, "job_1329348432655_0001");
historyFileParser.parse(contents, jobKey);

// this history file has only map slot millis no reduce millis
Long mapMbMillis = historyFileParser.getMapMbMillis();
assertNotNull(mapMbMillis);
assertEquals(mapMbMillis, new Long(178169856L));
Long reduceMbMillis = historyFileParser.getReduceMbMillis();
assertNotNull(reduceMbMillis);
assertEquals(reduceMbMillis, Constants.NOTFOUND_VALUE);

Long mbMillis = historyFileParser.getMegaByteMillis();
assertNotNull(mbMillis);
Long expValue = 10402816L;
Long expValue = 188559872L;
assertEquals(expValue, mbMillis);
}

Expand Down Expand Up @@ -340,4 +344,5 @@ public void testCreateJobHistoryFileParserJobError() throws IOException {
jobKeyConv.fromBytes(jobPuts.get(0).getRow()).toString());

}

}

0 comments on commit 35af222

Please sign in to comment.