Skip to content
This repository has been archived by the owner on Jan 15, 2022. It is now read-only.

Commit

Permalink
Merge a2dbc93 into 0dd1bef
Browse files Browse the repository at this point in the history
  • Loading branch information
Vrushali Channapattan committed Nov 11, 2014
2 parents 0dd1bef + a2dbc93 commit 5fb5417
Show file tree
Hide file tree
Showing 39 changed files with 754 additions and 128 deletions.
4 changes: 2 additions & 2 deletions hraven-assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
<parent>
<groupId>com.twitter.hraven</groupId>
<artifactId>hraven</artifactId>
<version>0.9.17-SNAPSHOT</version>
<version>0.9.17.t01-SNAPSHOT</version>
<relativePath>../</relativePath>
</parent>
<groupId>com.twitter.hraven</groupId>
<artifactId>hraven-assembly</artifactId>
<version>0.9.17-SNAPSHOT</version>
<version>0.9.17.t01-SNAPSHOT</version>
<name>hRaven-assembly</name>
<description>hRaven - Assembly artifacts</description>
<packaging>pom</packaging>
Expand Down
4 changes: 2 additions & 2 deletions hraven-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@
<parent>
<groupId>com.twitter.hraven</groupId>
<artifactId>hraven</artifactId>
<version>0.9.17-SNAPSHOT</version>
<version>0.9.17.t01-SNAPSHOT</version>
<relativePath>../</relativePath>
</parent>
<groupId>com.twitter.hraven</groupId>
<artifactId>hraven-core</artifactId>
<version>0.9.17-SNAPSHOT</version>
<version>0.9.17.t01-SNAPSHOT</version>
<name>hRaven - core</name>
<description>Core components of hRaven, including model classes and data access layer</description>

Expand Down
23 changes: 21 additions & 2 deletions hraven-core/src/main/java/com/twitter/hraven/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,16 @@ public class Constants {
public static final int MAX_LONG_LENGTH = Long.toString(Long.MAX_VALUE)
.length();

/**
* length of run id in bytes in the job key
*/
public static final int RUN_ID_LENGTH_JOBKEY = 8;

/**
* length of sequence number in bytes in the job key
*/
public static final int SEQUENCE_NUM_LENGTH_JOBKEY = 8;

public static final String USER_CONF_KEY = "user.name";
public static final String USER_CONF_KEY_HADOOP2 = "mapreduce.job.user.name";

Expand Down Expand Up @@ -322,6 +332,9 @@ public class Constants {

public static final String MR_RUN_CONF_KEY = "mapred.app.submitted.timestamp";

public static final String FRAMEWORK_CONF_KEY = "mapreduce.framework.name";
public static final String FRAMEWORK_CONF_SPARK_VALUE = "spark";
public static final String SPARK_VERSION_CONF_KEY = "spark.signature";
/**
* hadoop2 memory mb for container size
* for map, reduce and AM containers
Expand Down Expand Up @@ -358,7 +371,9 @@ public class Constants {
}

/**
* Regex to parse a job file name. First back-ref is JT name, second one if
* Regex to parse a job file name.
* It could be a job history file from a mapreduce job or a spark job
* First back-ref is JT name, second one if
* job-id
* <p>
* For example,
Expand All @@ -372,8 +387,12 @@ public class Constants {
* job_201306192120_0003_1371677828795_hadoop_conf.xml
* in hadoop 2.0, job history file names are named as
* job_1374258111572_0003-1374260622449-userName1-TeraGen-1374260635219-2-0-SUCCEEDED-default.jhist
* in spark, the file name looks like:
* spark_1413515656084_3051855
* and the spark conf file name:
* spark_1413515656084_3051855_conf.xml
*/
public static final String JOB_FILENAME_PATTERN_REGEX = ".*(job_[0-9]*_[0-9]*)(-|_)([0-9]*[aA-zZ]*)*(.*)$";
public static final String JOB_FILENAME_PATTERN_REGEX = ".*?((job|spark)_[0-9]*_[0-9]*)(-|_)*([0-9]*[aA-zZ]*)*(.*)$";

// JobHistory file name parsing related
public static final String JOB_CONF_FILE_END = "(.*)(conf.xml)$";
Expand Down
20 changes: 10 additions & 10 deletions hraven-core/src/main/java/com/twitter/hraven/Flow.java
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,8 @@ public byte[] code() {
/** app Version for this flow */
private String version ;

/** hadoop Version for this flow */
private HadoopVersion hadoopVersion ;
/** hadoop history file type for this flow */
private HistoryFileType historyFileType ;

/** hadoop pool/queue for this flow */
private String queue ;
Expand Down Expand Up @@ -281,13 +281,13 @@ public void addJob(JobDetails job) {
// set the submit time of the flow to the submit time of the first job
if (( this.submitTime == 0L ) || (job.getSubmitTime() < this.submitTime)) {
this.submitTime = job.getSubmitTime();
// set the hadoop version once for this job
this.hadoopVersion = job.getHadoopVersion();
// set the hadoop history file type once for this job
this.historyFileType = job.getHistoryFileType();
// set the queue/pool once for this flow
this.queue = job.getQueue();
if (this.hadoopVersion == null) {
if (this.historyFileType == null) {
// set it to default so that we don't run into NPE during json serialization
this.hadoopVersion = HadoopVersion.ONE;
this.historyFileType = HistoryFileType.ONE;
}
}

Expand Down Expand Up @@ -430,12 +430,12 @@ public void setCost(double cost) {
this.cost = cost;
}

public HadoopVersion getHadoopVersion() {
return hadoopVersion;
public HistoryFileType getHistoryFileType() {
return historyFileType;
}

public void setHadoopVersion(HadoopVersion hadoopVersion) {
this.hadoopVersion = hadoopVersion;
public void setHistoryFileType(HistoryFileType hadoopVersion) {
this.historyFileType = hadoopVersion;
}

public long getReduceShuffleBytes() {
Expand Down
4 changes: 4 additions & 0 deletions hraven-core/src/main/java/com/twitter/hraven/Framework.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ public enum Framework {
*
*/
SCALDING("s", "scalding"),
/**
* Identifies Spark applications
*/
SPARK("sp", "spark"),
/**
*
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@
package com.twitter.hraven;

/**
* hadoop versions
* hadoop file type being processed in hRaven
* ONE: hadoop v1 file
* TWO: hadoop v2 file
* SPARK: spark history file
*/
public enum HadoopVersion {
ONE, TWO
public enum HistoryFileType {
ONE, TWO, SPARK
};
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public class JobDescFactory {

private static final MRJobDescFactory MR_JOB_DESC_FACTORY = new MRJobDescFactory();
private static final PigJobDescFactory PIG_JOB_DESC_FACTORY = new PigJobDescFactory();
private static final SparkJobDescFactory SPARK_JOB_DESC_FACTORY = new SparkJobDescFactory();
private static final ScaldingJobDescFactory SCALDING_JOB_DESC_FACTORY =
new ScaldingJobDescFactory();

Expand All @@ -43,6 +44,8 @@ public static JobDescFactoryBase getFrameworkSpecificJobDescFactory(Configuratio
return PIG_JOB_DESC_FACTORY;
case SCALDING:
return SCALDING_JOB_DESC_FACTORY;
case SPARK:
return SPARK_JOB_DESC_FACTORY;
default:
return MR_JOB_DESC_FACTORY;
}
Expand Down Expand Up @@ -75,6 +78,10 @@ public static Framework getFramework(Configuration jobConf) {
} else {
String flowId = jobConf.get(Constants.CASCADING_FLOW_ID_CONF_KEY);
if ((flowId == null) || (flowId.length() == 0)) {
if (Constants.FRAMEWORK_CONF_SPARK_VALUE.equals(
jobConf.get(Constants.FRAMEWORK_CONF_KEY))) {
return Framework.SPARK;
}
return Framework.NONE;
} else {
return Framework.SCALDING;
Expand Down
25 changes: 13 additions & 12 deletions hraven-core/src/main/java/com/twitter/hraven/JobDetails.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public class JobDetails implements Comparable<JobDetails> {
private String priority;
private String status;
private String version;
private HadoopVersion hadoopVersion;
private HistoryFileType historyFileType;
private String queue;
private long submitTime;
private long launchTime;
Expand Down Expand Up @@ -182,13 +182,13 @@ public void setVersion(String version) {
this.version = version;
}

public HadoopVersion getHadoopVersion() {
return hadoopVersion;
public HistoryFileType getHistoryFileType() {
return historyFileType;
}

public void setHadoopVersion(String hadoopVersion) {
public void setHistoryFileType(String historyFileType) {
// the enum.valueOf could throw a NPE or IllegalArgumentException
this.hadoopVersion = HadoopVersion.valueOf(hadoopVersion);
this.historyFileType = HistoryFileType.valueOf(historyFileType);
}

public long getSubmitTime() {
Expand Down Expand Up @@ -523,21 +523,22 @@ public static double getValueAsDouble(byte[] key, Map<byte[], byte[]> infoValues
}

/**
* return an enum value from the NavigableMap for hadoop version
* return an enum value from the NavigableMap
* for history file type
* @param key
* @param infoValues
* @return value as a enum or default of hadoop ONE
*/
private HadoopVersion getHadoopVersionFromResult(final JobHistoryKeys key,
private HistoryFileType getHistoryFileTypeFromResult(final JobHistoryKeys key,
final NavigableMap<byte[], byte[]> infoValues) {
byte[] value = infoValues.get(JobHistoryKeys.KEYS_TO_BYTES.get(key));
if (value != null) {
String hv = Bytes.toString(value);
// could throw an NPE or IllegalArgumentException
return HadoopVersion.valueOf(hv);
return HistoryFileType.valueOf(hv);
} else {
// default is hadoop 1
return HadoopVersion.ONE;
return HistoryFileType.ONE;
}
}

Expand Down Expand Up @@ -594,7 +595,7 @@ public void populate(Result result) {
this.jobName = getValueAsString(JobHistoryKeys.JOBNAME, infoValues);
this.priority = getValueAsString(JobHistoryKeys.JOB_PRIORITY, infoValues);
this.status = getValueAsString(JobHistoryKeys.JOB_STATUS, infoValues);
this.hadoopVersion = getHadoopVersionFromResult(JobHistoryKeys.hadoopversion, infoValues);
this.historyFileType = getHistoryFileTypeFromResult(JobHistoryKeys.hadoopversion, infoValues);
this.version = getValueAsString(Constants.VERSION_COLUMN_BYTES, infoValues);
this.cost = getValueAsDouble(Constants.JOBCOST_BYTES, infoValues);

Expand Down Expand Up @@ -622,8 +623,8 @@ public void populate(Result result) {
infoValues);

// populate stats from counters for this job based on
// hadoop version
if (this.hadoopVersion == HadoopVersion.TWO) {
// history file type
if (this.historyFileType == HistoryFileType.TWO) {
// map file bytes read
this.mapFileBytesRead = getCounterValueAsLong(this.mapCounters,
Constants.FILESYSTEM_COUNTER_HADOOP2, Constants.FILES_BYTES_READ);
Expand Down
67 changes: 57 additions & 10 deletions hraven-core/src/main/java/com/twitter/hraven/JobId.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.twitter.hraven;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.builder.CompareToBuilder;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.codehaus.jackson.annotate.JsonCreator;
Expand All @@ -27,6 +28,14 @@
*/
public class JobId implements Comparable<JobId> {
protected static final String JOB_ID_SEP = "_";
public static final String JOB_PREFIX = "job";
/**
* The prefix of the job id which could be
* "" for a mapreduce job:
* - to maintain backward compatibility with existing data
* "spark" for a spark job
*/
protected String jobPrefix;
/**
* The jobtracker start time from the job ID, obtained from parsing the
* center component of the job ID.
Expand All @@ -43,27 +52,59 @@ public JobId(@JsonProperty("jobIdString") String jobId) {
if (jobId != null) {
String[] elts = jobId.trim().split(JOB_ID_SEP);
try {
this.jobPrefix = elts[0];
if (JOB_PREFIX.equalsIgnoreCase(this.jobPrefix)) {
// set it to blank since existing data
// is being stored without the "job" prefix
this.jobPrefix = "";
}
this.jobEpoch = Long.parseLong(elts[1]);
this.jobSequence = Long.parseLong(elts[2]);
} catch (Exception e) {
throw new IllegalArgumentException("Invalid job ID '"+jobId+
"', must be in the format 'job_[0-9]+_[0-9]+'");
throw new IllegalArgumentException("Invalid job ID '" + jobId
+ "', must be in the format '(job|spark)_[0-9]+_[0-9]+'");
}
} else {
this.jobPrefix = "";
this.jobEpoch = 0L;
this.jobSequence = 0L;
}
}

public JobId(long epoch, long seq) {
this.jobEpoch = epoch;
this.jobSequence = seq;
// set default prefix of job to be blank
this.jobPrefix = "";
}

public JobId(String jobPrefix, long epoch, long seq) {
if(JOB_PREFIX.equalsIgnoreCase(jobPrefix)) {
// set it to blank since existing data
// is being stored without the "job" prefix
this.jobPrefix = "";
} else {
this.jobPrefix = jobPrefix;
}
this.jobEpoch = epoch;
this.jobSequence = seq;
}

public JobId(JobId idToCopy) {
if (idToCopy != null) {
this.jobPrefix = idToCopy.getJobPrefix();
this.jobEpoch = idToCopy.getJobEpoch();
this.jobSequence = idToCopy.getJobSequence();
}
}

/**
* Returns the prefix part of the job id
*/
public String getJobPrefix() {
return this.jobPrefix;
}

/**
* Returns the epoch value from the job ID. The epoch value is generated by simply
* parsing the date formatted jobtracker start time as a long value.
Expand All @@ -86,7 +127,11 @@ public long getJobSequence() {
}

public String getJobIdString() {
return String.format("job_%d_%04d", this.jobEpoch, this.jobSequence);
String prefix = this.jobPrefix;
if(StringUtils.isBlank(prefix)) {
prefix = JOB_PREFIX;
}
return String.format("%s_%d_%04d", prefix, this.jobEpoch, this.jobSequence);
}

public String toString() {
Expand All @@ -95,17 +140,18 @@ public String toString() {

/**
* Compares two JobId objects on the basis of their
* jobPrefix (either blank or "spark")
* jobEpoch (jobtracker start time from the job ID)
* and
* jobSequence( jobtracker assigned sequence number for the job,)
*
* @param other
* @return 0 if this jobEpoch and jobSequence are equal to
* other jobEpoch and jobSequence,
* 1 if this jobEpoch and jobSequence are greater than
* other jobEpoch and jobSequence,
* -1 if this jobEpoch and jobSequence less than
* other jobEpoch and jobSequence
* @return 0 if this jobPrefix, jobEpoch and jobSequence are equal to
* other jobPrefix, jobEpoch and jobSequence,
* 1 if this jobPrefix, jobEpoch and jobSequence are greater than
* other jobPrefix, jobEpoch and jobSequence,
* -1 if this jobPrefix, jobEpoch and jobSequence less than
* other jobPrefix, jobEpoch and jobSequence
*
*/
@Override
Expand All @@ -114,8 +160,8 @@ public int compareTo(JobId o) {
// nulls sort last
return -1;
}

return new CompareToBuilder()
.append(this.jobPrefix, o.getJobPrefix())
.append(this.jobEpoch, o.getJobEpoch())
.append(this.jobSequence, o.getJobSequence())
.toComparison();
Expand All @@ -132,6 +178,7 @@ public boolean equals(Object other) {
@Override
public int hashCode(){
return new HashCodeBuilder()
.append(this.jobPrefix)
.append(this.jobEpoch)
.append(this.jobSequence)
.toHashCode();
Expand Down
Loading

0 comments on commit 5fb5417

Please sign in to comment.