
Commit

Merge 34f6e78 into 0dd1bef
Vrushali Channapattan committed Nov 7, 2014
2 parents 0dd1bef + 34f6e78 commit 259e451
Showing 35 changed files with 607 additions and 114 deletions.
4 changes: 2 additions & 2 deletions hraven-assembly/pom.xml
@@ -17,12 +17,12 @@
   <parent>
     <groupId>com.twitter.hraven</groupId>
     <artifactId>hraven</artifactId>
-    <version>0.9.17-SNAPSHOT</version>
+    <version>0.9.17.t01-SNAPSHOT</version>
     <relativePath>../</relativePath>
   </parent>
   <groupId>com.twitter.hraven</groupId>
   <artifactId>hraven-assembly</artifactId>
-  <version>0.9.17-SNAPSHOT</version>
+  <version>0.9.17.t01-SNAPSHOT</version>
   <name>hRaven-assembly</name>
   <description>hRaven - Assembly artifacts</description>
   <packaging>pom</packaging>
4 changes: 2 additions & 2 deletions hraven-core/pom.xml
@@ -21,12 +21,12 @@
   <parent>
     <groupId>com.twitter.hraven</groupId>
     <artifactId>hraven</artifactId>
-    <version>0.9.17-SNAPSHOT</version>
+    <version>0.9.17.t01-SNAPSHOT</version>
     <relativePath>../</relativePath>
   </parent>
   <groupId>com.twitter.hraven</groupId>
   <artifactId>hraven-core</artifactId>
-  <version>0.9.17-SNAPSHOT</version>
+  <version>0.9.17.t01-SNAPSHOT</version>
   <name>hRaven - core</name>
   <description>Core components of hRaven, including model classes and data access layer</description>
10 changes: 8 additions & 2 deletions hraven-core/src/main/java/com/twitter/hraven/Constants.java
@@ -358,7 +358,9 @@ public class Constants {
   }
 
   /**
-   * Regex to parse a job file name. First back-ref is JT name, second one if
+   * Regex to parse a job file name.
+   * It could be a job history file from a mapreduce job or a spark job.
+   * First back-ref is JT name, second one is
    * job-id
    * <p>
    * For example,
@@ -372,8 +374,12 @@ public class Constants {
    * job_201306192120_0003_1371677828795_hadoop_conf.xml
    * in hadoop 2.0, job history file names are named as
    * job_1374258111572_0003-1374260622449-userName1-TeraGen-1374260635219-2-0-SUCCEEDED-default.jhist
+   * in spark, the file name looks like:
+   * spark_1413515656084_3051855
+   * and the spark conf file name:
+   * spark_1413515656084_3051855_conf.xml
    */
-  public static final String JOB_FILENAME_PATTERN_REGEX = ".*(job_[0-9]*_[0-9]*)(-|_)([0-9]*[aA-zZ]*)*(.*)$";
+  public static final String JOB_FILENAME_PATTERN_REGEX = ".*?((job|spark)_[0-9]*_[0-9]*)(-|_)*([0-9]*[aA-zZ]*)*(.*)$";
 
   // JobHistory file name parsing related
   public static final String JOB_CONF_FILE_END = "(.*)(conf.xml)$";
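Note: a minimal sketch of how the widened pattern behaves against both mapreduce and spark history file names (illustrative only; the demo class below is not part of this commit, and the group indices follow from the pattern above):

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class JobFilenamePatternDemo {
        // same pattern as the new JOB_FILENAME_PATTERN_REGEX
        private static final Pattern JOB_FILENAME_PATTERN =
            Pattern.compile(".*?((job|spark)_[0-9]*_[0-9]*)(-|_)*([0-9]*[aA-zZ]*)*(.*)$");

        public static void main(String[] args) {
            String[] names = {
                "job_201306192120_0003_1371677828795_hadoop_conf.xml",
                "job_1374258111572_0003-1374260622449-userName1-TeraGen-1374260635219-2-0-SUCCEEDED-default.jhist",
                "spark_1413515656084_3051855",
                "spark_1413515656084_3051855_conf.xml"
            };
            for (String name : names) {
                Matcher m = JOB_FILENAME_PATTERN.matcher(name);
                if (m.matches()) {
                    // group(1) is the job/spark id, group(2) is the "job" or "spark" prefix
                    System.out.println(name + " -> id=" + m.group(1) + ", prefix=" + m.group(2));
                }
            }
        }
    }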
16 changes: 8 additions & 8 deletions hraven-core/src/main/java/com/twitter/hraven/Flow.java
@@ -150,7 +150,7 @@ public byte[] code() {
   private String version ;
 
   /** hadoop Version for this flow */
-  private HadoopVersion hadoopVersion ;
+  private HistoryFileType historyFileType ;
 
   /** hadoop pool/queue for this flow */
   private String queue ;
@@ -282,12 +282,12 @@ public void addJob(JobDetails job) {
     if (( this.submitTime == 0L ) || (job.getSubmitTime() < this.submitTime)) {
       this.submitTime = job.getSubmitTime();
       // set the hadoop version once for this job
-      this.hadoopVersion = job.getHadoopVersion();
+      this.historyFileType = job.getHistoryFileType();
       // set the queue/pool once for this flow
       this.queue = job.getQueue();
-      if (this.hadoopVersion == null) {
+      if (this.historyFileType == null) {
         // set it to default so that we don't run into NPE during json serialization
-        this.hadoopVersion = HadoopVersion.ONE;
+        this.historyFileType = HistoryFileType.ONE;
       }
     }
 
@@ -430,12 +430,12 @@ public void setCost(double cost) {
     this.cost = cost;
   }
 
-  public HadoopVersion getHadoopVersion() {
-    return hadoopVersion;
+  public HistoryFileType getHistoryFileType() {
+    return historyFileType;
   }
 
-  public void setHadoopVersion(HadoopVersion hadoopVersion) {
-    this.hadoopVersion = hadoopVersion;
+  public void setHistoryFileType(HistoryFileType historyFileType) {
+    this.historyFileType = historyFileType;
   }
 
   public long getReduceShuffleBytes() {
4 changes: 4 additions & 0 deletions hraven-core/src/main/java/com/twitter/hraven/Framework.java
@@ -28,6 +28,10 @@ public enum Framework {
    *
    */
   SCALDING("s", "scalding"),
+  /**
+   * Identifies Spark applications
+   */
+  SPARK("sp", "spark"),
   /**
    *
    */
hraven-core/src/main/java/com/twitter/hraven/{HadoopVersion.java → HistoryFileType.java}
@@ -19,6 +19,6 @@
 /**
  * hadoop versions
  */
-public enum HadoopVersion {
-  ONE, TWO
+public enum HistoryFileType {
+  ONE, TWO, SPARK
 };
20 changes: 10 additions & 10 deletions hraven-core/src/main/java/com/twitter/hraven/JobDetails.java
@@ -59,7 +59,7 @@ public class JobDetails implements Comparable<JobDetails> {
   private String priority;
   private String status;
   private String version;
-  private HadoopVersion hadoopVersion;
+  private HistoryFileType historyFileType;
   private String queue;
   private long submitTime;
   private long launchTime;
@@ -182,13 +182,13 @@ public void setVersion(String version) {
     this.version = version;
   }
 
-  public HadoopVersion getHadoopVersion() {
-    return hadoopVersion;
+  public HistoryFileType getHistoryFileType() {
+    return historyFileType;
   }
 
-  public void setHadoopVersion(String hadoopVersion) {
+  public void setHistoryFileType(String historyFileType) {
     // the enum.valueOf could throw a NPE or IllegalArgumentException
-    this.hadoopVersion = HadoopVersion.valueOf(hadoopVersion);
+    this.historyFileType = HistoryFileType.valueOf(historyFileType);
   }
 
   public long getSubmitTime() {
@@ -528,16 +528,16 @@ public static double getValueAsDouble(byte[] key, Map<byte[], byte[]> infoValues
    * @param infoValues
    * @return value as a enum or default of hadoop ONE
    */
-  private HadoopVersion getHadoopVersionFromResult(final JobHistoryKeys key,
+  private HistoryFileType getHistoryFileTypeFromResult(final JobHistoryKeys key,
       final NavigableMap<byte[], byte[]> infoValues) {
     byte[] value = infoValues.get(JobHistoryKeys.KEYS_TO_BYTES.get(key));
     if (value != null) {
       String hv = Bytes.toString(value);
       // could throw an NPE or IllegalArgumentException
-      return HadoopVersion.valueOf(hv);
+      return HistoryFileType.valueOf(hv);
     } else {
       // default is hadoop 1
-      return HadoopVersion.ONE;
+      return HistoryFileType.ONE;
     }
   }
 
@@ -594,7 +594,7 @@ public void populate(Result result) {
     this.jobName = getValueAsString(JobHistoryKeys.JOBNAME, infoValues);
     this.priority = getValueAsString(JobHistoryKeys.JOB_PRIORITY, infoValues);
     this.status = getValueAsString(JobHistoryKeys.JOB_STATUS, infoValues);
-    this.hadoopVersion = getHadoopVersionFromResult(JobHistoryKeys.hadoopversion, infoValues);
+    this.historyFileType = getHistoryFileTypeFromResult(JobHistoryKeys.hadoopversion, infoValues);
     this.version = getValueAsString(Constants.VERSION_COLUMN_BYTES, infoValues);
     this.cost = getValueAsDouble(Constants.JOBCOST_BYTES, infoValues);
 
@@ -623,7 +623,7 @@ public void populate(Result result) {
 
     // populate stats from counters for this job based on
     // hadoop version
-    if (this.hadoopVersion == HadoopVersion.TWO) {
+    if (this.historyFileType == HistoryFileType.TWO) {
       // map file bytes read
       this.mapFileBytesRead = getCounterValueAsLong(this.mapCounters,
           Constants.FILESYSTEM_COUNTER_HADOOP2, Constants.FILES_BYTES_READ);
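Note: the valueOf caveat called out in the comments above can be made concrete with a small sketch (illustrative only, not part of this commit):

    HistoryFileType ok = HistoryFileType.valueOf("SPARK");   // returns HistoryFileType.SPARK
    HistoryFileType bad = HistoryFileType.valueOf("THREE");  // throws IllegalArgumentException
    HistoryFileType npe = HistoryFileType.valueOf(null);     // throws NullPointerException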
48 changes: 46 additions & 2 deletions hraven-core/src/main/java/com/twitter/hraven/JobId.java
@@ -15,6 +15,7 @@
  */
 package com.twitter.hraven;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.builder.CompareToBuilder;
 import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.codehaus.jackson.annotate.JsonCreator;
@@ -27,6 +28,14 @@
  */
 public class JobId implements Comparable<JobId> {
   protected static final String JOB_ID_SEP = "_";
+  public static final String JOB_PREFIX = "job";
+  /**
+   * The prefix of the job id, which can be
+   * "" for a mapreduce job
+   * (to maintain backward compatibility with existing data)
+   * or "spark" for a spark job
+   */
+  protected String jobPrefix;
   /**
    * The jobtracker start time from the job ID, obtained from parsing the
    * center component of the job ID.
@@ -43,27 +52,58 @@ public JobId(@JsonProperty("jobIdString") String jobId) {
     if (jobId != null) {
       String[] elts = jobId.trim().split(JOB_ID_SEP);
       try {
+        this.jobPrefix = elts[0];
+        if (JOB_PREFIX.equalsIgnoreCase(this.jobPrefix)) {
+          // set it to blank since existing data
+          // is being stored without the "job" prefix
+          this.jobPrefix = "";
+        }
         this.jobEpoch = Long.parseLong(elts[1]);
         this.jobSequence = Long.parseLong(elts[2]);
       } catch (Exception e) {
         throw new IllegalArgumentException("Invalid job ID '"+jobId+
-            "', must be in the format 'job_[0-9]+_[0-9]+'");
+            "', must be in the format '(job|spark)_[0-9]+_[0-9]+'");
       }
     } else {
+      this.jobPrefix = "";
       this.jobEpoch = 0L;
       this.jobSequence = 0L;
     }
   }
 
   public JobId(long epoch, long seq) {
     this.jobEpoch = epoch;
     this.jobSequence = seq;
+    // set default prefix of job to be blank
+    this.jobPrefix = "";
   }
 
+  public JobId(String jobPrefix, long epoch, long seq) {
+    if (JOB_PREFIX.equalsIgnoreCase(jobPrefix)) {
+      // set it to blank since existing data
+      // is being stored without the "job" prefix
+      this.jobPrefix = "";
+    } else {
+      this.jobPrefix = jobPrefix;
+    }
+    this.jobEpoch = epoch;
+    this.jobSequence = seq;
+  }
+
   public JobId(JobId idToCopy) {
     if (idToCopy != null) {
+      this.jobPrefix = idToCopy.getJobPrefix();
       this.jobEpoch = idToCopy.getJobEpoch();
       this.jobSequence = idToCopy.getJobSequence();
     }
   }
 
+  /**
+   * Returns the prefix part of the job id
+   */
+  public String getJobPrefix() {
+    return this.jobPrefix;
+  }
+
   /**
    * Returns the epoch value from the job ID. The epoch value is generated by simply
    * parsing the date formatted jobtracker start time as a long value.
@@ -86,7 +126,11 @@ public long getJobSequence() {
   }
 
   public String getJobIdString() {
-    return String.format("job_%d_%04d", this.jobEpoch, this.jobSequence);
+    String prefix = this.jobPrefix;
+    if (StringUtils.isBlank(prefix)) {
+      prefix = JOB_PREFIX;
+    }
+    return String.format("%s_%d_%04d", prefix, this.jobEpoch, this.jobSequence);
   }
 
   public String toString() {
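Note: a quick sketch of the intended round-trip behavior of the new prefix handling (illustrative only; values taken from the examples above):

    JobId mr = new JobId("job_1374258111572_0003");
    mr.getJobPrefix();    // "" - the "job" prefix is normalized to blank
    mr.getJobIdString();  // "job_1374258111572_0003" - a blank prefix falls back to "job"

    JobId spark = new JobId("spark_1413515656084_3051855");
    spark.getJobPrefix();    // "spark"
    spark.getJobIdString();  // "spark_1413515656084_3051855"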
hraven-core/src/main/java/com/twitter/hraven/datasource/JobIdConverter.java
@@ -15,6 +15,7 @@
  */
 package com.twitter.hraven.datasource;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 
 import com.twitter.hraven.JobId;
@@ -24,8 +25,17 @@
 public class JobIdConverter implements ByteConverter<JobId> {
   @Override
   public byte[] toBytes(JobId jobId) {
-    return Bytes.add(Bytes.toBytes(jobId.getJobEpoch()),
-        Bytes.toBytes(jobId.getJobSequence()));
+    String prefix = jobId.getJobPrefix();
+    if (StringUtils.isBlank(prefix)
+        || JobId.JOB_PREFIX.equalsIgnoreCase(prefix)) {
+      // do not include "job" prefix in conversion
+      return Bytes.add(Bytes.toBytes(jobId.getJobEpoch()),
+          Bytes.toBytes(jobId.getJobSequence()));
+    } else {
+      return Bytes.add(Bytes.toBytes(jobId.getJobPrefix()),
+          Bytes.toBytes(jobId.getJobEpoch()),
+          Bytes.toBytes(jobId.getJobSequence()));
+    }
   }
 
   @Override
@@ -34,9 +44,17 @@ public JobId fromBytes(byte[] bytes) {
       return null;
     }
 
-    // expect a packed bytes encoding of [8 bytes epoch][8 bytes seq]
-    long epoch = Bytes.toLong(bytes, 0);
-    long seq = Bytes.toLong(bytes, 8);
-    return new JobId(epoch, seq);
+    if (bytes.length <= 16) {
+      // expect a packed bytes encoding of [8 bytes epoch][8 bytes seq]
+      long epoch = Bytes.toLong(bytes, 0);
+      long seq = Bytes.toLong(bytes, 8);
+      return new JobId(epoch, seq);
+    } else {
+      // expect a packed bytes encoding of [prefix][8 bytes epoch][8 bytes seq]
+      String prefix = Bytes.toString(bytes, 0, bytes.length - 16);
+      long epoch = Bytes.toLong(bytes, bytes.length - 16);
+      long seq = Bytes.toLong(bytes, bytes.length - 8);
+      return new JobId(prefix, epoch, seq);
+    }
   }
 }
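Note: a rough sanity check of the two encodings (illustrative only; assumes the JobId and JobIdConverter classes from this commit plus HBase's Bytes utility):

    JobIdConverter conv = new JobIdConverter();

    byte[] mr = conv.toBytes(new JobId("job_1374258111572_0003"));
    // mr.length == 16: [8 bytes epoch][8 bytes seq], the "job" prefix is not stored

    byte[] spark = conv.toBytes(new JobId("spark_1413515656084_3051855"));
    // spark.length == 21: 5 prefix bytes ("spark") + 8 bytes epoch + 8 bytes seq

    conv.fromBytes(mr).getJobIdString();     // "job_1374258111572_0003"
    conv.fromBytes(spark).getJobIdString();  // "spark_1413515656084_3051855"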