Merge 2680bc3 into 0dd1bef
Vrushali Channapattan committed Nov 20, 2014
2 parents 0dd1bef + 2680bc3 commit 9c1a03f
Showing 46 changed files with 1,240 additions and 186 deletions.
4 changes: 2 additions & 2 deletions hraven-assembly/pom.xml
@@ -17,12 +17,12 @@
<parent>
<groupId>com.twitter.hraven</groupId>
<artifactId>hraven</artifactId>
- <version>0.9.17-SNAPSHOT</version>
+ <version>0.9.17.t01-SNAPSHOT</version>
<relativePath>../</relativePath>
</parent>
<groupId>com.twitter.hraven</groupId>
<artifactId>hraven-assembly</artifactId>
- <version>0.9.17-SNAPSHOT</version>
+ <version>0.9.17.t01-SNAPSHOT</version>
<name>hRaven-assembly</name>
<description>hRaven - Assembly artifacts</description>
<packaging>pom</packaging>
4 changes: 2 additions & 2 deletions hraven-core/pom.xml
@@ -21,12 +21,12 @@
<parent>
<groupId>com.twitter.hraven</groupId>
<artifactId>hraven</artifactId>
- <version>0.9.17-SNAPSHOT</version>
+ <version>0.9.17.t01-SNAPSHOT</version>
<relativePath>../</relativePath>
</parent>
<groupId>com.twitter.hraven</groupId>
<artifactId>hraven-core</artifactId>
- <version>0.9.17-SNAPSHOT</version>
+ <version>0.9.17.t01-SNAPSHOT</version>
<name>hRaven - core</name>
<description>Core components of hRaven, including model classes and data access layer</description>

42 changes: 40 additions & 2 deletions hraven-core/src/main/java/com/twitter/hraven/Constants.java
@@ -242,6 +242,17 @@ public class Constants {
public static final byte[] SUBMIT_TIME_PREFIX_HADOOP2_BYTES = Bytes
.toBytes(SUBMIT_TIME_PREFIX_HADOOP2);

+ /**
+  * The string representing the submit time in a spark job history file.
+  */
+ public static final String SPARK_SUBMIT_TIME = "\"submit_time\":";
+
+ /**
+  * Raw bytes representation of {@link #SPARK_SUBMIT_TIME}.
+  */
+ public static final byte[] SPARK_SUBMIT_TIME_BYTES = Bytes
+     .toBytes(SPARK_SUBMIT_TIME);

/**
* length of string that contains a unix timestamp in milliseconds
* this length will be correct till Sat, 20 Nov 2286 which is
@@ -267,6 +278,16 @@ public class Constants {
public static final int MAX_LONG_LENGTH = Long.toString(Long.MAX_VALUE)
.length();

+ /**
+  * length of run id in bytes in the job key
+  */
+ public static final int RUN_ID_LENGTH_JOBKEY = 8;
+
+ /**
+  * length of sequence number in bytes in the job key
+  */
+ public static final int SEQUENCE_NUM_LENGTH_JOBKEY = 8;

public static final String USER_CONF_KEY = "user.name";
public static final String USER_CONF_KEY_HADOOP2 = "mapreduce.job.user.name";

@@ -322,6 +343,9 @@ public class Constants {

public static final String MR_RUN_CONF_KEY = "mapred.app.submitted.timestamp";

+ public static final String FRAMEWORK_CONF_KEY = "mapreduce.framework.name";
+ public static final String FRAMEWORK_CONF_SPARK_VALUE = "spark";
+ public static final String SPARK_VERSION_CONF_KEY = "spark.signature";
/**
* hadoop2 memory mb for container size
* for map, reduce and AM containers
@@ -358,7 +382,9 @@
}

/**
- * Regex to parse a job file name. First back-ref is JT name, second one if
+ * Regex to parse a job file name.
+ * It could be a job history file from a mapreduce job or a spark job.
+ * First back-ref is JT name, second one is
* job-id
* <p>
* For example,
@@ -372,8 +398,12 @@
* job_201306192120_0003_1371677828795_hadoop_conf.xml
* in hadoop 2.0, job history file names are named as
* job_1374258111572_0003-1374260622449-userName1-TeraGen-1374260635219-2-0-SUCCEEDED-default.jhist
+ * in spark, the file name looks like:
+ * spark_1413515656084_3051855
+ * and the spark conf file name:
+ * spark_1413515656084_3051855_conf.xml
*/
public static final String JOB_FILENAME_PATTERN_REGEX = ".*(job_[0-9]*_[0-9]*)(-|_)([0-9]*[aA-zZ]*)*(.*)$";
public static final String JOB_FILENAME_PATTERN_REGEX = ".*?((job|spark)_[0-9]*_[0-9]*)(-|_)*([0-9]*[aA-zZ]*)*(.*)$";

// JobHistory file name parsing related
public static final String JOB_CONF_FILE_END = "(.*)(conf.xml)$";
@@ -426,4 +456,12 @@ public class Constants {

/** name of the properties file used for cluster to cluster identifier mapping */
public static final String HRAVEN_CLUSTER_PROPERTIES_FILENAME = "hRavenClusters.properties";

+ /**
+  * Spark job keys have a prefix of "spark", hence the spark job key
+  * length is calculated as 5 (the prefix length) plus the regular job
+  * key length of 16 (epoch/run id and sequence number, 8 bytes each).
+  */
+ public static final int SPARK_JOB_KEY_LENGTH = 21;
}
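To see what the widened filename pattern accepts, here is a minimal, self-contained sketch; the regex string and the sample file names are copied from the diff above, while the class and method names are illustrative only:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class JobFileNameRegexDemo {
  // copy of the updated JOB_FILENAME_PATTERN_REGEX from this commit
  static final String REGEX =
      ".*?((job|spark)_[0-9]*_[0-9]*)(-|_)*([0-9]*[aA-zZ]*)*(.*)$";

  public static void main(String[] args) {
    Pattern pattern = Pattern.compile(REGEX);
    String[] names = {
        "job_201306192120_0003_1371677828795_hadoop_conf.xml",   // hadoop1 conf
        "spark_1413515656084_3051855",                           // spark history
        "spark_1413515656084_3051855_conf.xml" };                // spark conf
    for (String name : names) {
      Matcher m = pattern.matcher(name);
      if (m.matches()) {
        System.out.println(m.group(1)); // first back-ref: job_... or spark_...
      }
    }
    // spark job key: "spark" prefix (5) + run id (8) + sequence number (8)
    System.out.println(5 + 8 + 8); // 21 == SPARK_JOB_KEY_LENGTH
  }
}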
20 changes: 10 additions & 10 deletions hraven-core/src/main/java/com/twitter/hraven/Flow.java
@@ -149,8 +149,8 @@ public byte[] code() {
/** app Version for this flow */
private String version ;

- /** hadoop Version for this flow */
- private HadoopVersion hadoopVersion ;
+ /** hadoop history file type for this flow */
+ private HistoryFileType historyFileType ;

/** hadoop pool/queue for this flow */
private String queue ;
@@ -281,13 +281,13 @@ public void addJob(JobDetails job) {
// set the submit time of the flow to the submit time of the first job
if (( this.submitTime == 0L ) || (job.getSubmitTime() < this.submitTime)) {
this.submitTime = job.getSubmitTime();
- // set the hadoop version once for this job
- this.hadoopVersion = job.getHadoopVersion();
+ // set the hadoop history file type once for this flow
+ this.historyFileType = job.getHistoryFileType();
// set the queue/pool once for this flow
this.queue = job.getQueue();
- if (this.hadoopVersion == null) {
+ if (this.historyFileType == null) {
  // set it to default so that we don't run into NPE during json serialization
- this.hadoopVersion = HadoopVersion.ONE;
+ this.historyFileType = HistoryFileType.ONE;
}
}

@@ -430,12 +430,12 @@ public void setCost(double cost) {
this.cost = cost;
}

- public HadoopVersion getHadoopVersion() {
-   return hadoopVersion;
+ public HistoryFileType getHistoryFileType() {
+   return historyFileType;
}

- public void setHadoopVersion(HadoopVersion hadoopVersion) {
-   this.hadoopVersion = hadoopVersion;
+ public void setHistoryFileType(HistoryFileType hadoopFileType) {
+   this.historyFileType = hadoopFileType;
}

public long getReduceShuffleBytes() {
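The addJob change above preserves an invariant worth spelling out: the earliest-submitted job decides the flow-level submit time and history file type, defaulting to ONE to avoid NPEs during JSON serialization. A simplified, self-contained model of that rule (these classes are stand-ins, not hRaven's real Flow and JobDetails):

import java.util.ArrayList;
import java.util.List;

public class FlowAggregationDemo {
  enum HistoryFileType { ONE, TWO, SPARK }

  static class Job {
    final long submitTime;
    final HistoryFileType type;

    Job(long submitTime, HistoryFileType type) {
      this.submitTime = submitTime;
      this.type = type;
    }
  }

  static class Flow {
    long submitTime = 0L;
    HistoryFileType historyFileType;
    final List<Job> jobs = new ArrayList<Job>();

    void addJob(Job job) {
      // the earliest-submitted job defines the flow-level fields
      if (submitTime == 0L || job.submitTime < submitTime) {
        submitTime = job.submitTime;
        historyFileType = (job.type != null) ? job.type : HistoryFileType.ONE;
      }
      jobs.add(job);
    }
  }

  public static void main(String[] args) {
    Flow flow = new Flow();
    flow.addJob(new Job(2000L, HistoryFileType.TWO));
    flow.addJob(new Job(1000L, HistoryFileType.SPARK)); // earlier job wins
    System.out.println(flow.submitTime + " " + flow.historyFileType); // 1000 SPARK
  }
}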
4 changes: 4 additions & 0 deletions hraven-core/src/main/java/com/twitter/hraven/Framework.java
@@ -28,6 +28,10 @@ public enum Framework {
*
*/
SCALDING("s", "scalding"),
+ /**
+  * Identifies Spark applications
+  */
+ SPARK("sp", "spark"),
/**
*
*/
hraven-core/src/main/java/com/twitter/hraven/{HadoopVersion.java → HistoryFileType.java}
@@ -17,8 +17,11 @@
package com.twitter.hraven;

/**
- * hadoop versions
+ * hadoop file type being processed in hRaven
+ * ONE: hadoop v1 file
+ * TWO: hadoop v2 file
+ * SPARK: spark history file
*/
- public enum HadoopVersion {
-   ONE, TWO
+ public enum HistoryFileType {
+   ONE, TWO, SPARK
};
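Stored values round-trip through Enum.valueOf, which throws an IllegalArgumentException for unknown strings and an NPE for null (as the JobDetails setter notes below). A minimal guarded-parse sketch using the enum above (the helper class and method names are made up):

public final class HistoryFileTypes {
  private HistoryFileTypes() {
  }

  /** Parse a stored string, defaulting to ONE (hadoop v1) when absent. */
  public static HistoryFileType parseOrDefault(String stored) {
    if (stored == null) {
      return HistoryFileType.ONE;
    }
    return HistoryFileType.valueOf(stored); // throws if the string is unknown
  }
}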
26 changes: 17 additions & 9 deletions hraven-core/src/main/java/com/twitter/hraven/JobDescFactory.java
@@ -27,6 +27,7 @@ public class JobDescFactory {

private static final MRJobDescFactory MR_JOB_DESC_FACTORY = new MRJobDescFactory();
private static final PigJobDescFactory PIG_JOB_DESC_FACTORY = new PigJobDescFactory();
+ private static final SparkJobDescFactory SPARK_JOB_DESC_FACTORY = new SparkJobDescFactory();
private static final ScaldingJobDescFactory SCALDING_JOB_DESC_FACTORY =
new ScaldingJobDescFactory();

@@ -43,6 +44,8 @@ public static JobDescFactoryBase getFrameworkSpecificJobDescFactory(Configuration
return PIG_JOB_DESC_FACTORY;
case SCALDING:
return SCALDING_JOB_DESC_FACTORY;
+ case SPARK:
+   return SPARK_JOB_DESC_FACTORY;
default:
return MR_JOB_DESC_FACTORY;
}
@@ -69,17 +72,22 @@ public static JobDesc createJobDesc(QualifiedJobId qualifiedJobId,
*/
public static Framework getFramework(Configuration jobConf) {
// Check if this is a pig job
- boolean isPig = jobConf.get(Constants.PIG_CONF_KEY) != null;
- if (isPig) {
+ if (jobConf.get(Constants.PIG_CONF_KEY) != null) {
  return Framework.PIG;
- } else {
-   String flowId = jobConf.get(Constants.CASCADING_FLOW_ID_CONF_KEY);
-   if ((flowId == null) || (flowId.length() == 0)) {
-     return Framework.NONE;
-   } else {
-     return Framework.SCALDING;
-   }
- }
+ }
+
+ if ((jobConf.get(Constants.CASCADING_FLOW_ID_CONF_KEY) != null)
+     && (jobConf.get(Constants.CASCADING_FLOW_ID_CONF_KEY).length() != 0)) {
+   return Framework.SCALDING;
+ }
+
+ if (Constants.FRAMEWORK_CONF_SPARK_VALUE
+     .equalsIgnoreCase(jobConf.get(Constants.FRAMEWORK_CONF_KEY))) {
+   return Framework.SPARK;
+ }
+
+ return Framework.NONE;

}

25 changes: 13 additions & 12 deletions hraven-core/src/main/java/com/twitter/hraven/JobDetails.java
@@ -59,7 +59,7 @@ public class JobDetails implements Comparable<JobDetails> {
private String priority;
private String status;
private String version;
- private HadoopVersion hadoopVersion;
+ private HistoryFileType historyFileType;
private String queue;
private long submitTime;
private long launchTime;
@@ -182,13 +182,13 @@ public void setVersion(String version) {
this.version = version;
}

- public HadoopVersion getHadoopVersion() {
-   return hadoopVersion;
+ public HistoryFileType getHistoryFileType() {
+   return historyFileType;
}

- public void setHadoopVersion(String hadoopVersion) {
+ public void setHistoryFileType(String historyFileType) {
  // the enum.valueOf could throw a NPE or IllegalArgumentException
- this.hadoopVersion = HadoopVersion.valueOf(hadoopVersion);
+ this.historyFileType = HistoryFileType.valueOf(historyFileType);
}

public long getSubmitTime() {
@@ -523,21 +523,22 @@ public static double getValueAsDouble(byte[] key, Map<byte[], byte[]> infoValues
}

/**
- * return an enum value from the NavigableMap for hadoop version
+ * return an enum value from the NavigableMap
+ * for history file type
* @param key
* @param infoValues
* @return value as an enum or default of hadoop ONE
*/
- private HadoopVersion getHadoopVersionFromResult(final JobHistoryKeys key,
+ private HistoryFileType getHistoryFileTypeFromResult(final JobHistoryKeys key,
final NavigableMap<byte[], byte[]> infoValues) {
byte[] value = infoValues.get(JobHistoryKeys.KEYS_TO_BYTES.get(key));
if (value != null) {
String hv = Bytes.toString(value);
// could throw an NPE or IllegalArgumentException
- return HadoopVersion.valueOf(hv);
+ return HistoryFileType.valueOf(hv);
} else {
// default is hadoop 1
- return HadoopVersion.ONE;
+ return HistoryFileType.ONE;
}
}

@@ -594,7 +595,7 @@ public void populate(Result result) {
this.jobName = getValueAsString(JobHistoryKeys.JOBNAME, infoValues);
this.priority = getValueAsString(JobHistoryKeys.JOB_PRIORITY, infoValues);
this.status = getValueAsString(JobHistoryKeys.JOB_STATUS, infoValues);
- this.hadoopVersion = getHadoopVersionFromResult(JobHistoryKeys.hadoopversion, infoValues);
+ this.historyFileType = getHistoryFileTypeFromResult(JobHistoryKeys.hadoopversion, infoValues);
this.version = getValueAsString(Constants.VERSION_COLUMN_BYTES, infoValues);
this.cost = getValueAsDouble(Constants.JOBCOST_BYTES, infoValues);

@@ -622,8 +623,8 @@ public void populate(Result result) {
infoValues);

// populate stats from counters for this job based on
- // hadoop version
- if (this.hadoopVersion == HadoopVersion.TWO) {
+ // history file type
+ if (this.historyFileType == HistoryFileType.TWO) {
// map file bytes read
this.mapFileBytesRead = getCounterValueAsLong(this.mapCounters,
Constants.FILESYSTEM_COUNTER_HADOOP2, Constants.FILES_BYTES_READ);
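The populate path reads the history file type out of the HBase info column map. A self-contained sketch of that lookup-with-default behavior (the local map and the column literal stand in for hRaven's real infoValues and JobHistoryKeys wiring):

import java.util.NavigableMap;
import java.util.TreeMap;

import org.apache.hadoop.hbase.util.Bytes;

public class HistoryFileTypeLookupDemo {
  enum HistoryFileType { ONE, TWO, SPARK }

  // mirrors getHistoryFileTypeFromResult: read the cell, parse the enum,
  // default to ONE when the column is absent
  static HistoryFileType lookup(NavigableMap<byte[], byte[]> infoValues,
      byte[] column) {
    byte[] value = infoValues.get(column);
    if (value == null) {
      return HistoryFileType.ONE;
    }
    return HistoryFileType.valueOf(Bytes.toString(value));
  }

  public static void main(String[] args) {
    NavigableMap<byte[], byte[]> infoValues =
        new TreeMap<byte[], byte[]>(Bytes.BYTES_COMPARATOR);
    byte[] column = Bytes.toBytes("hadoopversion");
    infoValues.put(column, Bytes.toBytes(HistoryFileType.SPARK.toString()));
    System.out.println(lookup(infoValues, column));                       // SPARK
    System.out.println(lookup(infoValues, Bytes.toBytes("no_such_col"))); // ONE
  }
}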