
Commit

Adding in the processing step and unit tests, changes to JobDescFactory
Vrushali Channapattan committed Nov 18, 2014
1 parent a2dbc93 commit ed6f6c3
Showing 22 changed files with 528 additions and 103 deletions.
13 changes: 13 additions & 0 deletions hraven-core/src/main/java/com/twitter/hraven/Constants.java
@@ -242,6 +242,17 @@ public class Constants {
public static final byte[] SUBMIT_TIME_PREFIX_HADOOP2_BYTES = Bytes
.toBytes(SUBMIT_TIME_PREFIX_HADOOP2);

/**
* The string representing the submit time in a spark job history file.
*/
public static final String SPARK_SUBMIT_TIME = "\"submit_time\":";

/**
* Raw bytes representation of {@link #SPARK_SUBMIT_TIME};
*/
public static final byte[] SPARK_SUBMIT_TIME_BYTES = Bytes
.toBytes(SPARK_SUBMIT_TIME);

/**
* length of string that contains a unix timestamp in milliseconds
* this length will be correct till Sat, 20 Nov 2286 which is
@@ -445,4 +456,6 @@ public class Constants {

/** name of the properties file used for cluster to cluster identifier mapping */
public static final String HRAVEN_CLUSTER_PROPERTIES_FILENAME = "hRavenClusters.properties";

public static final int SPARK_JOB_KEY_LENGTH = 21;
}
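As a sanity check on the new SPARK_JOB_KEY_LENGTH value, here is a minimal sketch of the arithmetic behind it, assuming (as JobIdConverter does below) that a prefixed job id is packed as the prefix bytes followed by an 8-byte epoch and an 8-byte sequence; the class name SparkJobKeyLengthCheck is illustrative only.

import org.apache.hadoop.hbase.util.Bytes;

// Illustrative only: "spark" prefix (5 bytes) + 8-byte epoch + 8-byte sequence
// adds up to the 21 bytes declared as SPARK_JOB_KEY_LENGTH above.
public class SparkJobKeyLengthCheck {
  public static void main(String[] args) {
    byte[] prefix = Bytes.toBytes("spark");        // 5 bytes
    byte[] epoch = Bytes.toBytes(1413515656084L);  // 8 bytes
    byte[] sequence = Bytes.toBytes(305185L);      // 8 bytes
    System.out.println(prefix.length + epoch.length + sequence.length); // prints 21
  }
}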
30 changes: 17 additions & 13 deletions hraven-core/src/main/java/com/twitter/hraven/JobDescFactory.java
@@ -15,6 +15,8 @@
*/
package com.twitter.hraven;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;

/**
@@ -72,21 +74,23 @@ public static JobDesc createJobDesc(QualifiedJobId qualifiedJobId,
*/
public static Framework getFramework(Configuration jobConf) {
// Check if this is a pig job
boolean isPig = jobConf.get(Constants.PIG_CONF_KEY) != null;
if (isPig) {
if (jobConf.get(Constants.PIG_CONF_KEY) != null) {
return Framework.PIG;
} else {
String flowId = jobConf.get(Constants.CASCADING_FLOW_ID_CONF_KEY);
if ((flowId == null) || (flowId.length() == 0)) {
if (Constants.FRAMEWORK_CONF_SPARK_VALUE.equals(
jobConf.get(Constants.FRAMEWORK_CONF_KEY))) {
return Framework.SPARK;
}
return Framework.NONE;
} else {
return Framework.SCALDING;
}
}

if ((jobConf.get(Constants.CASCADING_FLOW_ID_CONF_KEY) != null)
&& (jobConf.get(Constants.CASCADING_FLOW_ID_CONF_KEY).length() != 0)) {
return Framework.SCALDING;
}

if ((jobConf.get(Constants.FRAMEWORK_CONF_KEY) != null)
&& (Constants.FRAMEWORK_CONF_SPARK_VALUE
.equalsIgnoreCase(jobConf.get(Constants.FRAMEWORK_CONF_KEY)))) {
return Framework.SPARK;
}

return Framework.NONE;

}

/**
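A minimal usage sketch of the reworked detection order in getFramework(): the pig property wins first, then a non-empty cascading flow id, then the spark framework property, otherwise NONE. The sketch assumes only the constants and the factory method shown above; the class name FrameworkDetectionExample and the property values set here are illustrative.

import org.apache.hadoop.conf.Configuration;

import com.twitter.hraven.Constants;
import com.twitter.hraven.Framework;
import com.twitter.hraven.JobDescFactory;

// Illustrative only: a conf carrying the spark framework marker and no pig or
// cascading properties should be classified as a spark job.
public class FrameworkDetectionExample {
  public static void main(String[] args) {
    Configuration jobConf = new Configuration(false);
    jobConf.set(Constants.FRAMEWORK_CONF_KEY, Constants.FRAMEWORK_CONF_SPARK_VALUE);
    System.out.println(JobDescFactory.getFramework(jobConf)); // expected: SPARK

    // pig is checked before everything else, so adding the pig key flips the result
    jobConf.set(Constants.PIG_CONF_KEY, "/path/to/pig/script");
    System.out.println(JobDescFactory.getFramework(jobConf)); // expected: PIG
  }
}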
@@ -57,4 +57,8 @@ public String getCluster() {
return cluster;
}

@Override
public String toString() {
return this.cluster + Constants.SEP + this.getJobIdString();
}
}
@@ -209,15 +209,15 @@ public Scan getHistoryRawTableScan(String cluster, String minJobId,
InclusiveStopFilter inclusiveStopFilter = new InclusiveStopFilter(lastRow);
filters.addFilter(inclusiveStopFilter);
LOG.info("Stopping raw table scan (stop filter) at "
+ Bytes.toStringBinary(lastRow) + " " + idConv.fromBytes(lastRow));
+ Bytes.toStringBinary(lastRow) + " " + idConv.fromBytes(lastRow).toString());

// Add one to the jobSequence of the maximum JobId.
JobId maximumJobId = new JobId(maxJobId);
JobId oneBiggerThanMaxJobId = new JobId(maximumJobId.getJobEpoch(),
JobId oneBiggerThanMaxJobId = new JobId(maximumJobId.getJobPrefix(),
maximumJobId.getJobEpoch(),
maximumJobId.getJobSequence() + 1);
stopRow = idConv.toBytes(new QualifiedJobId(cluster,
oneBiggerThanMaxJobId));

} else {
char oneBiggerSep = (char) (Constants.SEP_CHAR + 1);
stopRow = Bytes.toBytes(cluster + oneBiggerSep);
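To keep the scan inclusive of maxJobId, the code above bumps the sequence of the maximum job id by one and uses that as the exclusive stop row. A minimal sketch of that step, using the JobId(prefix, epoch, sequence) constructor the new code calls; the printed id string is the expected form, not verified output.

import com.twitter.hraven.JobId;

// Illustrative only: derive the job id used to build the exclusive stop row.
public class StopRowSketch {
  public static void main(String[] args) {
    JobId maximumJobId = new JobId("spark_1413515656084_305185");
    JobId oneBiggerThanMaxJobId = new JobId(maximumJobId.getJobPrefix(),
        maximumJobId.getJobEpoch(),
        maximumJobId.getJobSequence() + 1);
    // expected: spark_1413515656084_305186
    System.out.println(oneBiggerThanMaxJobId.getJobIdString());
  }
}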
@@ -27,8 +27,7 @@ public class JobIdConverter implements ByteConverter<JobId> {
@Override
public byte[] toBytes(JobId jobId) {
String prefix = jobId.getJobPrefix();
if ((StringUtils.isNotBlank(prefix) && (JobId.JOB_PREFIX.equalsIgnoreCase(prefix)))
|| (StringUtils.isBlank(prefix))) {
if (StringUtils.isBlank(prefix)) {
// do not include "job" prefix in conversion
return Bytes.add(Bytes.toBytes(jobId.getJobEpoch()),
Bytes.toBytes(jobId.getJobSequence()));
@@ -51,13 +50,13 @@ public JobId fromBytes(byte[] bytes) {
if (bytes.length <= packedBytesEpochSeqSize) {
// expect a packed bytes encoding of [8 bytes epoch][8 bytes seq]
long epoch = Bytes.toLong(bytes, 0);
long seq = Bytes.toLong(bytes, Constants.RUN_ID_LENGTH_JOBKEY);
long seq = Bytes.toLong(bytes, Constants.SEQUENCE_NUM_LENGTH_JOBKEY);
return new JobId(epoch, seq);
} else {
// expect a packed bytes encoding of [prefix][8 bytes epoch][8 bytes seq]
String prefix = Bytes.toString(bytes, 0, bytes.length - packedBytesEpochSeqSize);
long epoch = Bytes.toLong(bytes, bytes.length - packedBytesEpochSeqSize);
long seq = Bytes.toLong(bytes, bytes.length - Constants.SEQUENCE_NUM_LENGTH_JOBKEY);
String prefix = Bytes.toString(bytes, 0, (bytes.length - packedBytesEpochSeqSize));
long epoch = Bytes.toLong(bytes, prefix.length());
long seq = Bytes.toLong(bytes, (bytes.length - Constants.SEQUENCE_NUM_LENGTH_JOBKEY));
return new JobId(prefix, epoch, seq);
}
}
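A minimal round-trip sketch of the packed layout handled above: ids without a prefix serialize to [8-byte epoch][8-byte sequence], while prefixed ids such as spark ids serialize to [prefix][8-byte epoch][8-byte sequence]. The com.twitter.hraven.datasource package for JobIdConverter is an assumption, since the file header is not visible in this excerpt.

import com.twitter.hraven.Constants;
import com.twitter.hraven.JobId;
import com.twitter.hraven.datasource.JobIdConverter;

// Illustrative only: round-trip a spark id and a plain mapreduce id.
public class JobIdRoundTrip {
  public static void main(String[] args) {
    JobIdConverter conv = new JobIdConverter();

    JobId sparkId = new JobId("spark_1413515656084_305185");
    byte[] packed = conv.toBytes(sparkId);
    // the "spark" prefix is kept, so the packed form is SPARK_JOB_KEY_LENGTH bytes
    System.out.println(packed.length == Constants.SPARK_JOB_KEY_LENGTH); // expected: true
    System.out.println(conv.fromBytes(packed).equals(sparkId));          // expected: true

    JobId mapReduceId = new JobId("job_1413515656084_11111");
    System.out.println(conv.fromBytes(conv.toBytes(mapReduceId)).equals(mapReduceId)); // expected: true
  }
}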
@@ -15,6 +15,7 @@
*/
package com.twitter.hraven.datasource;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.util.Bytes;

import com.twitter.hraven.Constants;
@@ -150,19 +151,17 @@ private static byte[] extractRemainder(int offset, byte[] remainder) {
}

private static int getLengthJobIdPackedBytes(int offset, byte[] remainder) {

int lengthRest = remainder.length - offset ;
byte[] jobIdOtherStuff = null;
if (lengthRest > offset) {
jobIdOtherStuff = ByteUtil.safeCopy(remainder, offset,
remainder.length-offset);
} else {
jobIdOtherStuff = new byte[0];
if (StringUtils.startsWith(Bytes.toString(jobIdOtherStuff), Constants.FRAMEWORK_CONF_SPARK_VALUE)) {
return Constants.SPARK_JOB_KEY_LENGTH;
}
}
byte[][] splitRunIdJobIdExtra = ByteUtil.split(jobIdOtherStuff,
Constants.SEP_BYTES);
int lengthJobId = (splitRunIdJobIdExtra.length >= 1 ?
splitRunIdJobIdExtra[0].length : 0);
return lengthJobId;
return (Constants.SEQUENCE_NUM_LENGTH_JOBKEY + Constants.RUN_ID_LENGTH_JOBKEY);
}

/**
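The reworked length calculation above reduces to a fixed-width decision on the job id portion of a row key. Below is a standalone sketch of that rule, assuming both length constants are 8-byte longs; the helper name jobIdPackedLength and the sample inputs are illustrative only.

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.util.Bytes;

import com.twitter.hraven.Constants;

// Illustrative only: how many bytes of the row key belong to the packed job id.
public class JobIdLengthSketch {

  static int jobIdPackedLength(byte[] jobIdBytes) {
    // spark ids are stored with their "spark" prefix and take a fixed 21 bytes
    if (StringUtils.startsWith(Bytes.toString(jobIdBytes),
        Constants.FRAMEWORK_CONF_SPARK_VALUE)) {
      return Constants.SPARK_JOB_KEY_LENGTH;
    }
    // everything else is an 8-byte run id followed by an 8-byte sequence
    return Constants.SEQUENCE_NUM_LENGTH_JOBKEY + Constants.RUN_ID_LENGTH_JOBKEY;
  }

  public static void main(String[] args) {
    System.out.println(jobIdPackedLength(Bytes.toBytes("spark_1413515656084_305185"))); // 21
    System.out.println(jobIdPackedLength(Bytes.toBytes("anything else"))); // 16, if both constants are 8
  }
}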
@@ -23,7 +23,7 @@
import org.junit.Test;

/**
* test class for hadoop versions
* test class for hadoop history file type
*/
public class TestHistoryFileType {

@@ -32,15 +32,15 @@ private enum ExpHistoryFileType {
}

@Test
public void checkVersions() {
public void checkHistoryFileType() {
assertEquals(ExpHistoryFileType.values().length, HistoryFileType.values().length);
for (HistoryFileType hv : HistoryFileType.values()) {
assertTrue(ExpHistoryFileType.valueOf(hv.toString()) != null);
}
}

@Test(expected=IllegalArgumentException.class)
public void testNonExistentVersion() {
public void testNonExistentHistoryFileType() {
assertNull(HistoryFileType.valueOf("DOES NOT EXIST"));
}
};
9 changes: 9 additions & 0 deletions hraven-core/src/test/java/com/twitter/hraven/TestJobId.java
@@ -41,6 +41,7 @@ public void testSerialization() {
String job4 = "job_"+epoch+"_11111";
String job5 = "spark_"+epoch+"_11112";
String job6 = "spark_"+epoch+"_11113";
String job7 = "spark_1413515656084_305185";

JobId jobId1 = new JobId(job1);
assertEquals(epoch, jobId1.getJobEpoch());
@@ -80,6 +81,11 @@ public void testSerialization() {
// check Comparable implementation
assertTrue(jobId5.compareTo(jobId6) < 0);

JobId jobId7 = new JobId(job7);
assertEquals(1413515656084L, jobId7.getJobEpoch());
assertEquals(305185L, jobId7.getJobSequence());
assertEquals(job7, jobId7.getJobIdString());

JobIdConverter conv = new JobIdConverter();
JobId tmp = conv.fromBytes( conv.toBytes(jobId1) );
assertEquals(jobId1, tmp);
@@ -100,6 +106,9 @@ public void testSerialization() {
tmp = conv.fromBytes( conv.toBytes(jobId6) );
assertEquals(jobId6, tmp);
assertEquals(jobId6.hashCode(), tmp.hashCode());
tmp = conv.fromBytes( conv.toBytes(jobId7) );
assertEquals(jobId7, tmp);
assertEquals(jobId7.hashCode(), tmp.hashCode());
}

/**
25 changes: 22 additions & 3 deletions hraven-core/src/test/java/com/twitter/hraven/TestJobKey.java
@@ -93,8 +93,11 @@ public void testKeySerialization() {
@Test
public void testKeySerializationSpark() {
JobKeyConverter conv = new JobKeyConverter();
JobKey key = new JobKey("clusterS1@identifierS1", "userS1", "appSpark1",
13, "spark_1413515656084_3051855");
JobKey key = new JobKey("clusterS1@identifierS1",
"userS1",
"appSpark1",
13,
"spark_1413515656084_3051855");
byte[] keyBytes = conv.toBytes(key);
JobKey key2 = conv.fromBytes(keyBytes);
assertEquals(key.getCluster(), key2.getCluster());
@@ -115,7 +118,23 @@ public void testKeySerializationSpark() {
keyBytes = conv.toBytes(key);
key2 = conv.fromBytes(keyBytes);
assertKey(key, key2);
}

JobKey jobKey = new JobKey("cluster1",
"user1",
"com.example.spark_program.simple_example.Main$",
13,
// this job id has the separator in its byte representation,
// hence check this id explicitly
"spark_1413515656084_305185");
keyBytes = conv.toBytes(jobKey);
key2 = conv.fromBytes(keyBytes);
assertEquals(jobKey.getCluster(), key2.getCluster());
assertEquals(jobKey.getUserName(), key2.getUserName());
assertEquals(jobKey.getAppId(), key2.getAppId());
assertEquals(jobKey.getRunId(), key2.getRunId());
assertEquals(jobKey.getJobId(), key2.getJobId());

}

@Test
public void checkOldFormatKey() {
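On the separator remark in the new spark test above: the sequence number 305185 is 0x4A821, so the last byte of its 8-byte big-endian encoding is 0x21, which is the '!' character hraven uses as its key separator (assuming Constants.SEP is "!"). That collision is what makes this id a useful test case; a quick check:

import org.apache.hadoop.hbase.util.Bytes;

// Illustrative only: the packed sequence of spark_1413515656084_305185 ends in
// the byte 0x21, i.e. '!', so naive separator splitting would break this key.
public class SeparatorCollisionCheck {
  public static void main(String[] args) {
    byte[] sequenceBytes = Bytes.toBytes(305185L); // 0x00 00 00 00 00 04 A8 21
    System.out.println(sequenceBytes[sequenceBytes.length - 1] == (byte) '!'); // prints true
  }
}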
@@ -40,7 +40,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
@@ -139,8 +139,7 @@ public static Long getXmxValue(String javaChildOptsStr) {
retVal /= (1024 * 1024);
}
} catch (NumberFormatException nfe) {
LOG.error("Unable to get the Xmx value from " + javaChildOptsStr);
nfe.printStackTrace();
LOG.error("Unable to get the Xmx value from " + javaChildOptsStr, nfe);
throw new ProcessingException("Unable to get the Xmx value from " + javaChildOptsStr, nfe);
}
return retVal;
@@ -160,16 +159,19 @@ public static Long getXmxTotal(final long xmx75) {
* @return the job submit time in milliseconds since January 1, 1970 UTC;
* or 0 if no value can be found.
*/
public static long getSubmitTimeMillisFromJobHistory(byte[] jobHistoryRaw) {
public static long getSubmitTimeMillisFromJobHistory(HistoryFileType historyFileType,
byte[] jobHistoryRaw) {

if (null == historyFileType) {
throw new IllegalArgumentException("History file type can't be null ");
}

long submitTimeMillis = 0;
if (null == jobHistoryRaw) {
return submitTimeMillis;
}

HistoryFileType hv = JobHistoryFileParserFactory.getVersion(jobHistoryRaw);

switch (hv) {
switch (historyFileType) {
case TWO:
// look for the job submitted event, since that has the job submit time
int startIndex = ByteUtil.indexOf(jobHistoryRaw, Constants.JOB_SUBMIT_EVENT_BYTES, 0);
@@ -185,7 +187,8 @@ public static long getSubmitTimeMillisFromJobHistory(byte[] jobHistoryRaw) {
try {
submitTimeMillis = Long.parseLong(submitTimeMillisString);
} catch (NumberFormatException nfe) {
LOG.error(" caught NFE during conversion of submit time " + submitTimeMillisString + " " + nfe.getMessage());
LOG.error(" caught NFE during conversion of submit time "
+ submitTimeMillisString + " ", nfe);
submitTimeMillis = 0;
}
}
@@ -218,12 +221,31 @@ public static long getSubmitTimeMillisFromJobHistory(byte[] jobHistoryRaw) {
submitTimeMillis = Long.parseLong(submitTimeMillisString);
} catch (NumberFormatException nfe) {
LOG.error(" caught NFE during conversion of submit time " + submitTimeMillisString
+ " " + nfe.getMessage());
+ " ", nfe);
submitTimeMillis = 0;
}
}
}
break;

case SPARK:
// look for the spark submit time key
startIndex = ByteUtil.indexOf(jobHistoryRaw,
Constants.SPARK_SUBMIT_TIME_BYTES, 0);
if (startIndex != -1) {
// read the string that contains the unix timestamp
String submitTimeMillisString = Bytes.toString(jobHistoryRaw,
startIndex + Constants.SPARK_SUBMIT_TIME_BYTES.length,
Constants.EPOCH_TIMESTAMP_STRING_LENGTH);
try {
submitTimeMillis = Long.parseLong(submitTimeMillisString);
} catch (NumberFormatException nfe) {
LOG.error(" caught NFE during conversion of submit time "
+ submitTimeMillisString + " ", nfe);
submitTimeMillis = 0;
}
}
break;
}

return submitTimeMillis;
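A minimal usage sketch of the updated method for a spark history file: the SPARK branch locates the "submit_time" marker and parses the 13-digit millisecond timestamp that follows it. The sample JSON line is fabricated for illustration, and the enclosing class name JobHistoryFileParserBase plus the import paths for it and HistoryFileType are assumptions, since the file headers are not visible in this excerpt.

import org.apache.hadoop.hbase.util.Bytes;

import com.twitter.hraven.HistoryFileType;
import com.twitter.hraven.etl.JobHistoryFileParserBase;

// Illustrative only: pull the submit time out of spark history bytes.
public class SparkSubmitTimeExample {
  public static void main(String[] args) {
    byte[] raw = Bytes.toBytes(
        "{\"app_id\":\"spark_1413515656084_305185\",\"submit_time\":1413515656084}");
    long submitTime = JobHistoryFileParserBase.getSubmitTimeMillisFromJobHistory(
        HistoryFileType.SPARK, raw);
    System.out.println(submitTime); // expected: 1413515656084
  }
}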