diff --git a/hraven-assembly/pom.xml b/hraven-assembly/pom.xml
index 5aa0291..3466847 100644
--- a/hraven-assembly/pom.xml
+++ b/hraven-assembly/pom.xml
@@ -17,12 +17,12 @@
com.twitter.hraven
hraven
- 0.9.17-SNAPSHOT
+ 0.9.17.t01-SNAPSHOT
../
com.twitter.hraven
hraven-assembly
- 0.9.17-SNAPSHOT
+ 0.9.17.t01-SNAPSHOT
hRaven-assembly
hRaven - Assembly artifacts
pom
diff --git a/hraven-core/pom.xml b/hraven-core/pom.xml
index e57e992..8ddeb73 100644
--- a/hraven-core/pom.xml
+++ b/hraven-core/pom.xml
@@ -21,12 +21,12 @@
com.twitter.hraven
hraven
- 0.9.17-SNAPSHOT
+ 0.9.17.t01-SNAPSHOT
../
com.twitter.hraven
hraven-core
- 0.9.17-SNAPSHOT
+ 0.9.17.t01-SNAPSHOT
hRaven - core
Core components of hRaven, including model classes and data access layer
diff --git a/hraven-core/src/main/java/com/twitter/hraven/Constants.java b/hraven-core/src/main/java/com/twitter/hraven/Constants.java
index 9e8653a..d028c91 100644
--- a/hraven-core/src/main/java/com/twitter/hraven/Constants.java
+++ b/hraven-core/src/main/java/com/twitter/hraven/Constants.java
@@ -242,6 +242,17 @@ public class Constants {
public static final byte[] SUBMIT_TIME_PREFIX_HADOOP2_BYTES = Bytes
.toBytes(SUBMIT_TIME_PREFIX_HADOOP2);
+ /**
+ * The string representing the submit time in a spark job history file.
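+ * For example (illustrative), a spark history file contains a fragment
+ * like {"submit_time":1413515656084, ...}; this key string is searched
+ * for when extracting the submit time.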
+ */
+ public static final String SPARK_SUBMIT_TIME = "\"submit_time\":";
+
+ /**
+ * Raw bytes representation of {@link #SPARK_SUBMIT_TIME}.
+ */
+ public static final byte[] SPARK_SUBMIT_TIME_BYTES = Bytes
+ .toBytes(SPARK_SUBMIT_TIME);
+
/**
* length of string that contains a unix timestamp in milliseconds
* this length will be correct till Sat, 20 Nov 2286 which is
@@ -267,6 +278,16 @@ public class Constants {
public static final int MAX_LONG_LENGTH = Long.toString(Long.MAX_VALUE)
.length();
+ /**
+ * length in bytes of the run id in the job key
+ */
+ public static final int RUN_ID_LENGTH_JOBKEY = 8;
+
+ /**
+ * length in bytes of the sequence number in the job key
+ */
+ public static final int SEQUENCE_NUM_LENGTH_JOBKEY = 8;
+
public static final String USER_CONF_KEY = "user.name";
public static final String USER_CONF_KEY_HADOOP2 = "mapreduce.job.user.name";
@@ -322,6 +343,9 @@ public class Constants {
public static final String MR_RUN_CONF_KEY = "mapred.app.submitted.timestamp";
+ public static final String FRAMEWORK_CONF_KEY = "mapreduce.framework.name";
+ public static final String FRAMEWORK_CONF_SPARK_VALUE = "spark";
+ public static final String SPARK_VERSION_CONF_KEY = "spark.signature";
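+ // e.g. (illustrative) a spark job's conf carries
+ // mapreduce.framework.name=spark plus a spark.signature value that is
+ // used as the job's version string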
/**
* hadoop2 memory mb for container size
* for map, reduce and AM containers
@@ -358,7 +382,9 @@ public class Constants {
}
/**
- * Regex to parse a job file name. First back-ref is JT name, second one if
+ * Regex to parse a job file name.
+ * It could be a job history file from a mapreduce job or a spark job.
+ * First back-ref is JT name, second one is
* job-id
*
* For example,
@@ -372,8 +398,12 @@ public class Constants {
* job_201306192120_0003_1371677828795_hadoop_conf.xml
* in hadoop 2.0, job history file names are named as
* job_1374258111572_0003-1374260622449-userName1-TeraGen-1374260635219-2-0-SUCCEEDED-default.jhist
+ * in spark, the file name looks like:
+ * spark_1413515656084_3051855
+ * and the spark conf file name:
+ * spark_1413515656084_3051855_conf.xml
*/
- public static final String JOB_FILENAME_PATTERN_REGEX = ".*(job_[0-9]*_[0-9]*)(-|_)([0-9]*[aA-zZ]*)*(.*)$";
+ public static final String JOB_FILENAME_PATTERN_REGEX = ".*?((job|spark)_[0-9]*_[0-9]*)(-|_)*([0-9]*[aA-zZ]*)*(.*)$";
// JobHistory file name parsing related
public static final String JOB_CONF_FILE_END = "(.*)(conf.xml)$";
@@ -426,4 +456,12 @@ public class Constants {
/** name of the properties file used for cluster to cluster identifier mapping */
public static final String HRAVEN_CLUSTER_PROPERTIES_FILENAME = "hRavenClusters.properties";
+
+ /**
+ * Spark job keys have a prefix of "spark", hence the spark job key
+ * length is calculated as 5 (length of the "spark" prefix) + 16
+ * (regular job key length: epoch and sequence number) = 21
+ */
+ public static final int SPARK_JOB_KEY_LENGTH = 21;
}
diff --git a/hraven-core/src/main/java/com/twitter/hraven/Flow.java b/hraven-core/src/main/java/com/twitter/hraven/Flow.java
index bed215c..f4f519d 100644
--- a/hraven-core/src/main/java/com/twitter/hraven/Flow.java
+++ b/hraven-core/src/main/java/com/twitter/hraven/Flow.java
@@ -149,8 +149,8 @@ public byte[] code() {
/** app Version for this flow */
private String version ;
- /** hadoop Version for this flow */
- private HadoopVersion hadoopVersion ;
+ /** hadoop history file type for this flow */
+ private HistoryFileType historyFileType ;
/** hadoop pool/queue for this flow */
private String queue ;
@@ -281,13 +281,13 @@ public void addJob(JobDetails job) {
// set the submit time of the flow to the submit time of the first job
if (( this.submitTime == 0L ) || (job.getSubmitTime() < this.submitTime)) {
this.submitTime = job.getSubmitTime();
- // set the hadoop version once for this job
- this.hadoopVersion = job.getHadoopVersion();
+ // set the hadoop history file type once for this job
+ this.historyFileType = job.getHistoryFileType();
// set the queue/pool once for this flow
this.queue = job.getQueue();
- if (this.hadoopVersion == null) {
+ if (this.historyFileType == null) {
// set it to default so that we don't run into NPE during json serialization
- this.hadoopVersion = HadoopVersion.ONE;
+ this.historyFileType = HistoryFileType.ONE;
}
}
@@ -430,12 +430,12 @@ public void setCost(double cost) {
this.cost = cost;
}
- public HadoopVersion getHadoopVersion() {
- return hadoopVersion;
+ public HistoryFileType getHistoryFileType() {
+ return historyFileType;
}
- public void setHadoopVersion(HadoopVersion hadoopVersion) {
- this.hadoopVersion = hadoopVersion;
+ public void setHistoryFileType(HistoryFileType historyFileType) {
+ this.historyFileType = historyFileType;
}
public long getReduceShuffleBytes() {
diff --git a/hraven-core/src/main/java/com/twitter/hraven/Framework.java b/hraven-core/src/main/java/com/twitter/hraven/Framework.java
index 29f00bb..1a288a5 100644
--- a/hraven-core/src/main/java/com/twitter/hraven/Framework.java
+++ b/hraven-core/src/main/java/com/twitter/hraven/Framework.java
@@ -28,6 +28,10 @@ public enum Framework {
*
*/
SCALDING("s", "scalding"),
+ /**
+ * Identifies Spark applications
+ */
+ SPARK("sp", "spark"),
/**
*
*/
diff --git a/hraven-core/src/main/java/com/twitter/hraven/HadoopVersion.java b/hraven-core/src/main/java/com/twitter/hraven/HistoryFileType.java
similarity index 77%
rename from hraven-core/src/main/java/com/twitter/hraven/HadoopVersion.java
rename to hraven-core/src/main/java/com/twitter/hraven/HistoryFileType.java
index 19b8cb6..9c85e3b 100644
--- a/hraven-core/src/main/java/com/twitter/hraven/HadoopVersion.java
+++ b/hraven-core/src/main/java/com/twitter/hraven/HistoryFileType.java
@@ -17,8 +17,11 @@
package com.twitter.hraven;
/**
- * hadoop versions
+ * History file type being processed in hRaven:
+ * ONE: hadoop v1 file
+ * TWO: hadoop v2 file
+ * SPARK: spark history file
*/
-public enum HadoopVersion {
- ONE, TWO
+public enum HistoryFileType {
+ ONE, TWO, SPARK
};
diff --git a/hraven-core/src/main/java/com/twitter/hraven/JobDescFactory.java b/hraven-core/src/main/java/com/twitter/hraven/JobDescFactory.java
index 76ab46f..de33b85 100644
--- a/hraven-core/src/main/java/com/twitter/hraven/JobDescFactory.java
+++ b/hraven-core/src/main/java/com/twitter/hraven/JobDescFactory.java
@@ -27,6 +27,7 @@ public class JobDescFactory {
private static final MRJobDescFactory MR_JOB_DESC_FACTORY = new MRJobDescFactory();
private static final PigJobDescFactory PIG_JOB_DESC_FACTORY = new PigJobDescFactory();
+ private static final SparkJobDescFactory SPARK_JOB_DESC_FACTORY = new SparkJobDescFactory();
private static final ScaldingJobDescFactory SCALDING_JOB_DESC_FACTORY =
new ScaldingJobDescFactory();
@@ -43,6 +44,8 @@ public static JobDescFactoryBase getFrameworkSpecificJobDescFactory(Configuratio
return PIG_JOB_DESC_FACTORY;
case SCALDING:
return SCALDING_JOB_DESC_FACTORY;
+ case SPARK:
+ return SPARK_JOB_DESC_FACTORY;
default:
return MR_JOB_DESC_FACTORY;
}
@@ -69,17 +72,22 @@ public static JobDesc createJobDesc(QualifiedJobId qualifiedJobId,
*/
public static Framework getFramework(Configuration jobConf) {
// Check if this is a pig job
- boolean isPig = jobConf.get(Constants.PIG_CONF_KEY) != null;
- if (isPig) {
+ if (jobConf.get(Constants.PIG_CONF_KEY) != null) {
return Framework.PIG;
- } else {
- String flowId = jobConf.get(Constants.CASCADING_FLOW_ID_CONF_KEY);
- if ((flowId == null) || (flowId.length() == 0)) {
- return Framework.NONE;
- } else {
- return Framework.SCALDING;
- }
}
+
+ if ((jobConf.get(Constants.CASCADING_FLOW_ID_CONF_KEY) != null)
+ && (jobConf.get(Constants.CASCADING_FLOW_ID_CONF_KEY).length() != 0)) {
+ return Framework.SCALDING;
+ }
+
+ if (Constants.FRAMEWORK_CONF_SPARK_VALUE
+ .equalsIgnoreCase(jobConf.get(Constants.FRAMEWORK_CONF_KEY))) {
+ return Framework.SPARK;
+ }
+
+ return Framework.NONE;
+
}
/**
diff --git a/hraven-core/src/main/java/com/twitter/hraven/JobDetails.java b/hraven-core/src/main/java/com/twitter/hraven/JobDetails.java
index ed51a60..5ebf67e 100644
--- a/hraven-core/src/main/java/com/twitter/hraven/JobDetails.java
+++ b/hraven-core/src/main/java/com/twitter/hraven/JobDetails.java
@@ -59,7 +59,7 @@ public class JobDetails implements Comparable {
private String priority;
private String status;
private String version;
- private HadoopVersion hadoopVersion;
+ private HistoryFileType historyFileType;
private String queue;
private long submitTime;
private long launchTime;
@@ -182,13 +182,13 @@ public void setVersion(String version) {
this.version = version;
}
- public HadoopVersion getHadoopVersion() {
- return hadoopVersion;
+ public HistoryFileType getHistoryFileType() {
+ return historyFileType;
}
- public void setHadoopVersion(String hadoopVersion) {
+ public void setHistoryFileType(String historyFileType) {
// the enum.valueOf could throw a NPE or IllegalArgumentException
- this.hadoopVersion = HadoopVersion.valueOf(hadoopVersion);
+ this.historyFileType = HistoryFileType.valueOf(historyFileType);
}
public long getSubmitTime() {
@@ -523,21 +523,22 @@ public static double getValueAsDouble(byte[] key, Map infoValues
}
/**
- * return an enum value from the NavigableMap for hadoop version
+ * return an enum value from the NavigableMap
+ * for history file type
* @param key
* @param infoValues
* @return value as a enum or default of hadoop ONE
*/
- private HadoopVersion getHadoopVersionFromResult(final JobHistoryKeys key,
+ private HistoryFileType getHistoryFileTypeFromResult(final JobHistoryKeys key,
final NavigableMap infoValues) {
byte[] value = infoValues.get(JobHistoryKeys.KEYS_TO_BYTES.get(key));
if (value != null) {
String hv = Bytes.toString(value);
// could throw an NPE or IllegalArgumentException
- return HadoopVersion.valueOf(hv);
+ return HistoryFileType.valueOf(hv);
} else {
// default is hadoop 1
- return HadoopVersion.ONE;
+ return HistoryFileType.ONE;
}
}
@@ -594,7 +595,7 @@ public void populate(Result result) {
this.jobName = getValueAsString(JobHistoryKeys.JOBNAME, infoValues);
this.priority = getValueAsString(JobHistoryKeys.JOB_PRIORITY, infoValues);
this.status = getValueAsString(JobHistoryKeys.JOB_STATUS, infoValues);
- this.hadoopVersion = getHadoopVersionFromResult(JobHistoryKeys.hadoopversion, infoValues);
+ this.historyFileType = getHistoryFileTypeFromResult(JobHistoryKeys.hadoopversion, infoValues);
this.version = getValueAsString(Constants.VERSION_COLUMN_BYTES, infoValues);
this.cost = getValueAsDouble(Constants.JOBCOST_BYTES, infoValues);
@@ -622,8 +623,8 @@ public void populate(Result result) {
infoValues);
// populate stats from counters for this job based on
- // hadoop version
- if (this.hadoopVersion == HadoopVersion.TWO) {
+ // history file type
+ if (this.historyFileType == HistoryFileType.TWO) {
// map file bytes read
this.mapFileBytesRead = getCounterValueAsLong(this.mapCounters,
Constants.FILESYSTEM_COUNTER_HADOOP2, Constants.FILES_BYTES_READ);
diff --git a/hraven-core/src/main/java/com/twitter/hraven/JobId.java b/hraven-core/src/main/java/com/twitter/hraven/JobId.java
index 887a0a5..863a921 100644
--- a/hraven-core/src/main/java/com/twitter/hraven/JobId.java
+++ b/hraven-core/src/main/java/com/twitter/hraven/JobId.java
@@ -15,6 +15,7 @@
*/
package com.twitter.hraven;
+import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.builder.CompareToBuilder;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.codehaus.jackson.annotate.JsonCreator;
@@ -27,6 +28,14 @@
*/
public class JobId implements Comparable {
protected static final String JOB_ID_SEP = "_";
+ public static final String JOB_PREFIX = "job";
+ /**
+ * The prefix of the job id, which is either
+ * "" for a mapreduce job (kept blank to maintain backward
+ * compatibility with existing data) or
+ * "spark" for a spark job
+ */
+ protected String jobPrefix;
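+ // e.g. (illustrative) "job_201306192120_0003" yields jobPrefix "",
+ // while "spark_1413515656084_3051855" yields jobPrefix "spark"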
/**
* The jobtracker start time from the job ID, obtained from parsing the
* center component of the job ID.
@@ -43,27 +52,59 @@ public JobId(@JsonProperty("jobIdString") String jobId) {
if (jobId != null) {
String[] elts = jobId.trim().split(JOB_ID_SEP);
try {
+ this.jobPrefix = elts[0];
+ if (JOB_PREFIX.equalsIgnoreCase(this.jobPrefix)) {
+ // set it to blank since existing data
+ // is being stored without the "job" prefix
+ this.jobPrefix = "";
+ }
this.jobEpoch = Long.parseLong(elts[1]);
this.jobSequence = Long.parseLong(elts[2]);
} catch (Exception e) {
- throw new IllegalArgumentException("Invalid job ID '"+jobId+
- "', must be in the format 'job_[0-9]+_[0-9]+'");
+ throw new IllegalArgumentException("Invalid job ID '" + jobId
+ + "', must be in the format '(job|spark)_[0-9]+_[0-9]+'");
}
+ } else {
+ this.jobPrefix = "";
+ this.jobEpoch = 0L;
+ this.jobSequence = 0L;
}
}
public JobId(long epoch, long seq) {
this.jobEpoch = epoch;
this.jobSequence = seq;
+ // set default prefix of job to be blank
+ this.jobPrefix = "";
+ }
+
+ public JobId(String jobPrefix, long epoch, long seq) {
+ if (JOB_PREFIX.equalsIgnoreCase(jobPrefix)) {
+ // set it to blank since existing data
+ // is being stored without the "job" prefix
+ this.jobPrefix = "";
+ } else {
+ this.jobPrefix = jobPrefix;
+ }
+ this.jobEpoch = epoch;
+ this.jobSequence = seq;
}
public JobId(JobId idToCopy) {
if (idToCopy != null) {
+ this.jobPrefix = idToCopy.getJobPrefix();
this.jobEpoch = idToCopy.getJobEpoch();
this.jobSequence = idToCopy.getJobSequence();
}
}
+ /**
+ * Returns the prefix part of the job id
+ */
+ public String getJobPrefix() {
+ return this.jobPrefix;
+ }
+
/**
* Returns the epoch value from the job ID. The epoch value is generated by simply
* parsing the date formatted jobtracker start time as a long value.
@@ -86,7 +127,11 @@ public long getJobSequence() {
}
public String getJobIdString() {
- return String.format("job_%d_%04d", this.jobEpoch, this.jobSequence);
+ String prefix = this.jobPrefix;
+ if (StringUtils.isBlank(prefix)) {
+ prefix = JOB_PREFIX;
+ }
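+ // e.g. (illustrative) a blank prefix with epoch 1413515656084 and
+ // seq 3051855 formats as "job_1413515656084_3051855"; with the
+ // "spark" prefix it formats as "spark_1413515656084_3051855"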
+ return String.format("%s_%d_%04d", prefix, this.jobEpoch, this.jobSequence);
}
public String toString() {
@@ -95,17 +140,18 @@ public String toString() {
/**
* Compares two JobId objects on the basis of their
+ * jobPrefix (either blank or "spark")
* jobEpoch (jobtracker start time from the job ID)
* and
* jobSequence( jobtracker assigned sequence number for the job,)
*
* @param other
- * @return 0 if this jobEpoch and jobSequence are equal to
- * other jobEpoch and jobSequence,
- * 1 if this jobEpoch and jobSequence are greater than
- * other jobEpoch and jobSequence,
- * -1 if this jobEpoch and jobSequence less than
- * other jobEpoch and jobSequence
+ * @return 0 if this jobPrefix, jobEpoch and jobSequence are equal to
+ * other jobPrefix, jobEpoch and jobSequence,
+ * 1 if this jobPrefix, jobEpoch and jobSequence are greater than
+ * other jobPrefix, jobEpoch and jobSequence,
+ * -1 if this jobPrefix, jobEpoch and jobSequence are less than
+ * other jobPrefix, jobEpoch and jobSequence
*
*/
@Override
@@ -114,8 +160,8 @@ public int compareTo(JobId o) {
// nulls sort last
return -1;
}
-
return new CompareToBuilder()
+ .append(this.jobPrefix, o.getJobPrefix())
.append(this.jobEpoch, o.getJobEpoch())
.append(this.jobSequence, o.getJobSequence())
.toComparison();
@@ -132,6 +178,7 @@ public boolean equals(Object other) {
@Override
public int hashCode(){
return new HashCodeBuilder()
+ .append(this.jobPrefix)
.append(this.jobEpoch)
.append(this.jobSequence)
.toHashCode();
diff --git a/hraven-core/src/main/java/com/twitter/hraven/QualifiedJobId.java b/hraven-core/src/main/java/com/twitter/hraven/QualifiedJobId.java
index 36f762f..9079dc6 100644
--- a/hraven-core/src/main/java/com/twitter/hraven/QualifiedJobId.java
+++ b/hraven-core/src/main/java/com/twitter/hraven/QualifiedJobId.java
@@ -57,4 +57,8 @@ public String getCluster() {
return cluster;
}
+ @Override
+ public String toString() {
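+ // e.g. (illustrative) "c1@local" + SEP + "spark_1413515656084_3051855"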
+ return this.cluster + Constants.SEP + this.getJobIdString();
+ }
}
diff --git a/hraven-core/src/main/java/com/twitter/hraven/SparkJobDescFactory.java b/hraven-core/src/main/java/com/twitter/hraven/SparkJobDescFactory.java
new file mode 100644
index 0000000..26418a4
--- /dev/null
+++ b/hraven-core/src/main/java/com/twitter/hraven/SparkJobDescFactory.java
@@ -0,0 +1,55 @@
+/*
+Copyright 2014 Twitter, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+package com.twitter.hraven;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Used to create {@link JobKey} instances that can deal with
+ * {@link Configuration} file (contents) for {@link Framework#SPARK}
+ */
+public class SparkJobDescFactory extends JobDescFactoryBase {
+
+ /**
+ * Creates a JobDesc object based on the framework
+ */
+ @Override
+ JobDesc create(QualifiedJobId qualifiedJobId, long submitTimeMillis,
+ Configuration jobConf) {
+
+ // for spark jobs, app id is the same as jobname
+ String appId = jobConf.get(Constants.JOB_NAME_HADOOP2_CONF_KEY,
+ Constants.UNKNOWN);
+
+ if (Constants.UNKNOWN.equals(appId)) {
+ // Fall back to job id, should not happen
+ appId = qualifiedJobId.getJobIdString();
+ }
+
+ String version = jobConf.get(Constants.SPARK_VERSION_CONF_KEY,
+ Constants.UNKNOWN);
+
+ return create(qualifiedJobId, jobConf, appId, version,
+ Framework.SPARK, submitTimeMillis);
+ }
+
+ @Override
+ String getAppIdFromJobName(String jobName) {
+ // for spark jobs, app id is the same as jobname
+ return jobName;
+ }
+}
+
diff --git a/hraven-core/src/main/java/com/twitter/hraven/datasource/JobHistoryRawService.java b/hraven-core/src/main/java/com/twitter/hraven/datasource/JobHistoryRawService.java
index 4c6fef7..a992f50 100644
--- a/hraven-core/src/main/java/com/twitter/hraven/datasource/JobHistoryRawService.java
+++ b/hraven-core/src/main/java/com/twitter/hraven/datasource/JobHistoryRawService.java
@@ -209,15 +209,15 @@ public Scan getHistoryRawTableScan(String cluster, String minJobId,
InclusiveStopFilter inclusiveStopFilter = new InclusiveStopFilter(lastRow);
filters.addFilter(inclusiveStopFilter);
LOG.info("Stopping raw table scan (stop filter) at "
- + Bytes.toStringBinary(lastRow) + " " + idConv.fromBytes(lastRow));
+ + Bytes.toStringBinary(lastRow) + " " + idConv.fromBytes(lastRow).toString());
// Add one to the jobSequence of the maximum JobId.
JobId maximumJobId = new JobId(maxJobId);
- JobId oneBiggerThanMaxJobId = new JobId(maximumJobId.getJobEpoch(),
+ JobId oneBiggerThanMaxJobId = new JobId(maximumJobId.getJobPrefix(),
+ maximumJobId.getJobEpoch(),
maximumJobId.getJobSequence() + 1);
stopRow = idConv.toBytes(new QualifiedJobId(cluster,
oneBiggerThanMaxJobId));
-
} else {
char oneBiggerSep = (char) (Constants.SEP_CHAR + 1);
stopRow = Bytes.toBytes(cluster + oneBiggerSep);
diff --git a/hraven-core/src/main/java/com/twitter/hraven/datasource/JobIdConverter.java b/hraven-core/src/main/java/com/twitter/hraven/datasource/JobIdConverter.java
index 1b3436d..fe65f50 100644
--- a/hraven-core/src/main/java/com/twitter/hraven/datasource/JobIdConverter.java
+++ b/hraven-core/src/main/java/com/twitter/hraven/datasource/JobIdConverter.java
@@ -15,8 +15,10 @@
*/
package com.twitter.hraven.datasource;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.util.Bytes;
+import com.twitter.hraven.Constants;
import com.twitter.hraven.JobId;
/**
@@ -24,19 +26,38 @@
public class JobIdConverter implements ByteConverter {
@Override
public byte[] toBytes(JobId jobId) {
- return Bytes.add(Bytes.toBytes(jobId.getJobEpoch()),
- Bytes.toBytes(jobId.getJobSequence()));
+ String prefix = jobId.getJobPrefix();
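+ // packed layouts (illustrative):
+ // [8-byte epoch][8-byte seq] for mapreduce job ids,
+ // ["spark" bytes][8-byte epoch][8-byte seq] for spark job ids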
+ if (StringUtils.isBlank(prefix)) {
+ // do not include "job" prefix in conversion
+ return Bytes.add(Bytes.toBytes(jobId.getJobEpoch()),
+ Bytes.toBytes(jobId.getJobSequence()));
+ } else {
+ return Bytes.add(Bytes.toBytes(jobId.getJobPrefix()),
+ Bytes.toBytes(jobId.getJobEpoch()),
+ Bytes.toBytes(jobId.getJobSequence()));
+ }
}
@Override
public JobId fromBytes(byte[] bytes) {
- if (bytes == null || bytes.length < 16) {
+ int packedBytesEpochSeqSize = Constants.RUN_ID_LENGTH_JOBKEY
+ + Constants.SEQUENCE_NUM_LENGTH_JOBKEY;
+
+ if (bytes == null || bytes.length < packedBytesEpochSeqSize) {
return null;
}
- // expect a packed bytes encoding of [8 bytes epoch][8 bytes seq]
- long epoch = Bytes.toLong(bytes, 0);
- long seq = Bytes.toLong(bytes, 8);
- return new JobId(epoch, seq);
+ if (bytes.length <= packedBytesEpochSeqSize) {
+ // expect a packed bytes encoding of [8 bytes epoch][8 bytes seq]
+ long epoch = Bytes.toLong(bytes, 0);
+ long seq = Bytes.toLong(bytes, Constants.SEQUENCE_NUM_LENGTH_JOBKEY);
+ return new JobId(epoch, seq);
+ } else {
+ // expect a packed bytes encoding of [prefix][8 bytes epoch][8 bytes seq]
+ String prefix = Bytes.toString(bytes, 0, (bytes.length - packedBytesEpochSeqSize));
+ long epoch = Bytes.toLong(bytes, prefix.length());
+ long seq = Bytes.toLong(bytes, (bytes.length - Constants.SEQUENCE_NUM_LENGTH_JOBKEY));
+ return new JobId(prefix, epoch, seq);
+ }
}
}
diff --git a/hraven-core/src/main/java/com/twitter/hraven/datasource/JobKeyConverter.java b/hraven-core/src/main/java/com/twitter/hraven/datasource/JobKeyConverter.java
index 241325c..41ff5ff 100644
--- a/hraven-core/src/main/java/com/twitter/hraven/datasource/JobKeyConverter.java
+++ b/hraven-core/src/main/java/com/twitter/hraven/datasource/JobKeyConverter.java
@@ -15,6 +15,7 @@
*/
package com.twitter.hraven.datasource;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.util.Bytes;
import com.twitter.hraven.Constants;
@@ -105,19 +106,23 @@ static byte[][] splitJobKey(byte[] rawKey) {
byte[] remainder = splits[3];
byte[][] extraComponents = new byte[3][];
- int offset = 0;
- // run ID
- extraComponents[0] = ByteUtil.safeCopy(remainder, offset, 8);
- // followed by sep + job epoch + job seq
- offset += 8+Constants.SEP_BYTES.length;
- extraComponents[1] = ByteUtil.safeCopy(remainder, offset, 16);
- offset += 16+Constants.SEP_BYTES.length;
- // followed by any remainder
- extraComponents[2] = ByteUtil.safeCopy(remainder, offset, remainder.length - offset);
+ // now extract components (runId!jobId!additional)
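+ // e.g. (illustrative) for a spark key the remainder looks like
+ // [8-byte inverted runId][sep]["spark" + epoch + seq, 21 bytes][sep][extra]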
+ // run id occurs at the start and is 8 bytes in length
+ extraComponents[0] = extractRunId(remainder, Constants.RUN_ID_LENGTH_JOBKEY);
+ // job id occurs after the run id and separator
+ int offset = Constants.RUN_ID_LENGTH_JOBKEY + Constants.SEP_BYTES.length;
+ extraComponents[1] = extractJobId(offset, remainder);
+
+ // now extract any additional stuff after the job id
+ if (extraComponents[1] != null) {
+ offset += extraComponents[1].length;
+ }
+ offset += Constants.SEP_BYTES.length;
+ extraComponents[2] = extractRemainder(offset, remainder);
int extraSize = 0;
// figure out the full size of all splits
- for (int i=0; i < extraComponents.length; i++) {
+ for (int i = 0; i < extraComponents.length; i++) {
if (extraComponents[i] != null) {
extraSize++;
} else {
@@ -139,4 +144,47 @@ static byte[][] splitJobKey(byte[] rawKey) {
}
return splits;
}
+
+ private static byte[] extractRemainder(int offset, byte[] remainder) {
+ return ByteUtil.safeCopy(remainder, offset,
+ remainder.length - offset);
+ }
+
+ private static int getLengthJobIdPackedBytes(int offset, byte[] remainder) {
+ int lengthRest = remainder.length - offset;
+ byte[] jobIdOtherStuff = null;
+ if (lengthRest > offset) {
+ jobIdOtherStuff = ByteUtil.safeCopy(remainder, offset,
+ remainder.length - offset);
+ if (StringUtils.startsWith(Bytes.toString(jobIdOtherStuff), Constants.FRAMEWORK_CONF_SPARK_VALUE)) {
+ return Constants.SPARK_JOB_KEY_LENGTH;
+ }
+ }
+ return (Constants.SEQUENCE_NUM_LENGTH_JOBKEY + Constants.RUN_ID_LENGTH_JOBKEY);
+ }
+
+ /**
+ * Extracts the job id from the packed byte array.
+ * The array looks like encodedRunId!jobId!otherStuff
+ * @param remainder
+ * @return
+ */
+ private static byte[] extractJobId(int offset, byte[] remainder) {
+ // since remainder contains runid ! jobid ! possibly other stuff,
+ // the offset for reading the job id is
+ // 8 bytes for the run id + the separator bytes
+ int lengthJobId = getLengthJobIdPackedBytes(offset, remainder);
+
+ return ByteUtil.safeCopy(remainder, offset, lengthJobId);
+ }
+
+ /**
+ * Extracts the encoded run id by reading the first
+ * {@code lengthToRead} bytes (8 for a long)
+ * @param remainder
+ * @param lengthToRead
+ */
+ private static byte[] extractRunId(byte[] remainder, int lengthToRead) {
+ return ByteUtil.safeCopy(remainder, 0, lengthToRead);
+ }
}
diff --git a/hraven-core/src/main/java/com/twitter/hraven/rest/ObjectMapperProvider.java b/hraven-core/src/main/java/com/twitter/hraven/rest/ObjectMapperProvider.java
index 5a4b65f..aa0763e 100644
--- a/hraven-core/src/main/java/com/twitter/hraven/rest/ObjectMapperProvider.java
+++ b/hraven-core/src/main/java/com/twitter/hraven/rest/ObjectMapperProvider.java
@@ -259,13 +259,13 @@ public void serialize(Flow aFlow, JsonGenerator jsonGenerator,
jsonGenerator.writeNumber(aFlow.getRunId());
jsonGenerator.writeFieldName("version");
jsonGenerator.writeString(aFlow.getVersion());
- jsonGenerator.writeFieldName("hadoopVersion");
+ jsonGenerator.writeFieldName("historyFileType");
/**
* unlikely that the next line with .toString
* will throw NPE since Flow class always sets
* default hadoop version in Flow#addJob
*/
- jsonGenerator.writeString(aFlow.getHadoopVersion().toString());
+ jsonGenerator.writeString(aFlow.getHistoryFileType().toString());
jsonGenerator.writeFieldName(Constants.HRAVEN_QUEUE);
jsonGenerator.writeString(aFlow.getQueue());
jsonGenerator.writeFieldName("counters");
diff --git a/hraven-core/src/test/java/com/twitter/hraven/GenerateFlowTestData.java b/hraven-core/src/test/java/com/twitter/hraven/GenerateFlowTestData.java
index 2fae24f..c7f5f06 100644
--- a/hraven-core/src/test/java/com/twitter/hraven/GenerateFlowTestData.java
+++ b/hraven-core/src/test/java/com/twitter/hraven/GenerateFlowTestData.java
@@ -74,7 +74,7 @@ public void loadFlow(String cluster, String user, String app, long runId,
Bytes.toBytes("SUCCESS"));
p.add(Constants.INFO_FAM_BYTES, Constants.VERSION_COLUMN_BYTES, Bytes.toBytes(version));
p.add(Constants.INFO_FAM_BYTES, JobHistoryKeys.KEYS_TO_BYTES.get(JobHistoryKeys.hadoopversion),
- Bytes.toBytes(HadoopVersion.ONE.toString()));
+ Bytes.toBytes(HistoryFileType.ONE.toString()));
p.add(Constants.INFO_FAM_BYTES, JobHistoryKeys.KEYS_TO_BYTES.get(JobHistoryKeys.TOTAL_MAPS),
Bytes.toBytes(baseStats));
p.add(Constants.INFO_FAM_BYTES, JobHistoryKeys.KEYS_TO_BYTES.get(JobHistoryKeys.TOTAL_REDUCES),
diff --git a/hraven-core/src/test/java/com/twitter/hraven/TestFramework.java b/hraven-core/src/test/java/com/twitter/hraven/TestFramework.java
index 503a43d..239db0d 100644
--- a/hraven-core/src/test/java/com/twitter/hraven/TestFramework.java
+++ b/hraven-core/src/test/java/com/twitter/hraven/TestFramework.java
@@ -18,6 +18,7 @@
import static com.twitter.hraven.Framework.NONE;
import static com.twitter.hraven.Framework.PIG;
import static com.twitter.hraven.Framework.SCALDING;
+import static com.twitter.hraven.Framework.SPARK;
import static junit.framework.Assert.assertEquals;
import static junit.framework.Assert.assertNotNull;
import static junit.framework.Assert.assertTrue;
@@ -39,6 +40,7 @@ public void testGetCode() {
assertEquals(PIG, Framework.get(PIG.getCode()));
assertEquals(SCALDING, Framework.get(SCALDING.getCode()));
assertEquals(NONE, Framework.get(NONE.getCode()));
+ assertEquals(SPARK, Framework.get(SPARK.getCode()));
}
/**
@@ -49,10 +51,11 @@ public void getDescription() {
assertNotNull(PIG.getDescription());
assertNotNull(SCALDING.getDescription());
assertNotNull(NONE.getDescription());
-
+ assertNotNull(SPARK.getDescription());
assertTrue("Description is not expected to be empty", PIG.getDescription().length() > 0);
assertTrue("Description is not expected to be empty", SCALDING.getDescription().length() > 0);
assertTrue("Description is not expected to be empty", NONE.getDescription().length() > 0);
+ assertTrue("Description is not expected to be empty", SPARK.getDescription().length() > 0);
}
diff --git a/hraven-core/src/test/java/com/twitter/hraven/TestHadoopVersion.java b/hraven-core/src/test/java/com/twitter/hraven/TestHistoryFileType.java
similarity index 62%
rename from hraven-core/src/test/java/com/twitter/hraven/TestHadoopVersion.java
rename to hraven-core/src/test/java/com/twitter/hraven/TestHistoryFileType.java
index 4cf2ce4..9f62bbb 100644
--- a/hraven-core/src/test/java/com/twitter/hraven/TestHadoopVersion.java
+++ b/hraven-core/src/test/java/com/twitter/hraven/TestHistoryFileType.java
@@ -23,24 +23,24 @@
import org.junit.Test;
/**
- * test class for hadoop versions
+ * test class for hadoop history file type
*/
-public class TestHadoopVersion {
+public class TestHistoryFileType {
- private enum ExpVersions {
- ONE, TWO
+ private enum ExpHistoryFileType {
+ ONE, TWO, SPARK
}
@Test
- public void checkVersions() {
- assertEquals(ExpVersions.values().length, HadoopVersion.values().length);
- for (HadoopVersion hv : HadoopVersion.values()) {
- assertTrue(ExpVersions.valueOf(hv.toString()) != null);
+ public void checkHistoryFileType() {
+ assertEquals(ExpHistoryFileType.values().length, HistoryFileType.values().length);
+ for (HistoryFileType hv : HistoryFileType.values()) {
+ assertTrue(ExpHistoryFileType.valueOf(hv.toString()) != null);
}
}
@Test(expected=IllegalArgumentException.class)
- public void testNonExistentVersion() {
- assertNull(HadoopVersion.valueOf("DOES NOT EXIST"));
+ public void testNonExistentHistoryFileType() {
+ assertNull(HistoryFileType.valueOf("DOES NOT EXIST"));
}
};
diff --git a/hraven-core/src/test/java/com/twitter/hraven/TestJobDescFactory.java b/hraven-core/src/test/java/com/twitter/hraven/TestJobDescFactory.java
index 7313d21..3dd5a36 100644
--- a/hraven-core/src/test/java/com/twitter/hraven/TestJobDescFactory.java
+++ b/hraven-core/src/test/java/com/twitter/hraven/TestJobDescFactory.java
@@ -52,4 +52,26 @@ public void testCluster() {
result = JobDescFactory.getCluster(c);
assertNull(result);
}
+
+ @Test
+ public void testGetFramework() {
+ Configuration c = new Configuration();
+ Framework f = JobDescFactory.getFramework(c);
+ assertEquals(Framework.NONE, f);
+
+ // test spark
+ c.set(Constants.FRAMEWORK_CONF_KEY, Constants.FRAMEWORK_CONF_SPARK_VALUE);
+ f = JobDescFactory.getFramework(c);
+ assertEquals(Framework.SPARK, f);
+
+ // test scalding
+ c.set(Constants.CASCADING_FLOW_ID_CONF_KEY, "some value");
+ f = JobDescFactory.getFramework(c);
+ assertEquals(Framework.SCALDING, f);
+
+ // test pig
+ c.set(Constants.PIG_CONF_KEY, "some random value");
+ f = JobDescFactory.getFramework(c);
+ assertEquals(Framework.PIG, f);
+ }
}
diff --git a/hraven-core/src/test/java/com/twitter/hraven/TestJobDetails.java b/hraven-core/src/test/java/com/twitter/hraven/TestJobDetails.java
index 0718a7a..51d20dd 100644
--- a/hraven-core/src/test/java/com/twitter/hraven/TestJobDetails.java
+++ b/hraven-core/src/test/java/com/twitter/hraven/TestJobDetails.java
@@ -181,7 +181,7 @@ private void addInTotalCounters(NavigableMap infoValues) {
infoValues.put(c4, value);
}
- private NavigableMap getInfoValues(HadoopVersion hv) {
+ private NavigableMap getInfoValues(HistoryFileType hv) {
NavigableMap infoValues = new TreeMap(Bytes.BYTES_COMPARATOR);
infoValues.put(JobHistoryKeys.KEYS_TO_BYTES.get(JobHistoryKeys.JOBID),
@@ -321,8 +321,8 @@ private void confirmHadoop1Counters(JobDetails jd) {
.getValue());
}
- private void confirmHadoopVersion(JobDetails jd, HadoopVersion hv) {
- assertEquals(hv, jd.getHadoopVersion());
+ private void confirmHistoryFileType(JobDetails jd, HistoryFileType hv) {
+ assertEquals(hv, jd.getHistoryFileType());
}
/**
@@ -331,7 +331,7 @@ private void confirmHadoopVersion(JobDetails jd, HadoopVersion hv) {
@Test
public void testPopulateForHadoop2() {
JobDetails jd = new JobDetails(null);
- NavigableMap infoValues = getInfoValues(HadoopVersion.TWO);
+ NavigableMap infoValues = getInfoValues(HistoryFileType.TWO);
Result result = Mockito.mock(Result.class);
when(result.getFamilyMap(Constants.INFO_FAM_BYTES)).thenReturn(infoValues);
@@ -339,7 +339,7 @@ public void testPopulateForHadoop2() {
jd.populate(result);
// confirm hadoop2 first
- confirmHadoopVersion(jd, HadoopVersion.TWO);
+ confirmHistoryFileType(jd, HistoryFileType.TWO);
// confirm job details
confirmSomeJobDeets(jd);
// confirm hadoop2 counters
@@ -449,7 +449,7 @@ public void testGetCounterValueAsLong() {
@Test
public void testPopulateForHadoop1() {
JobDetails jd = new JobDetails(null);
- NavigableMap infoValues = getInfoValues(HadoopVersion.ONE);
+ NavigableMap infoValues = getInfoValues(HistoryFileType.ONE);
Result result1 = mock(Result.class);
when(result1.getFamilyMap(Constants.INFO_FAM_BYTES)).thenReturn(infoValues);
@@ -457,7 +457,7 @@ public void testPopulateForHadoop1() {
jd.populate(result1);
// confirm hadoop 1
- confirmHadoopVersion(jd, HadoopVersion.ONE);
+ confirmHistoryFileType(jd, HistoryFileType.ONE);
// confirm hadoop1 counters
confirmHadoop1Counters(jd);
// confirm job details
diff --git a/hraven-core/src/test/java/com/twitter/hraven/TestJobId.java b/hraven-core/src/test/java/com/twitter/hraven/TestJobId.java
index 670e838..fb990bc 100644
--- a/hraven-core/src/test/java/com/twitter/hraven/TestJobId.java
+++ b/hraven-core/src/test/java/com/twitter/hraven/TestJobId.java
@@ -39,6 +39,9 @@ public void testSerialization() {
String job2 = "job_"+epoch+"_1111";
String job3 = "job_"+epoch+"_2222";
String job4 = "job_"+epoch+"_11111";
+ String job5 = "spark_"+epoch+"_11112";
+ String job6 = "spark_"+epoch+"_11113";
+ String job7 = "spark_1413515656084_305185";
JobId jobId1 = new JobId(job1);
assertEquals(epoch, jobId1.getJobEpoch());
@@ -66,6 +69,23 @@ public void testSerialization() {
// check Comparable implementation
assertTrue(jobId3.compareTo(jobId4) < 0);
+ JobId jobId5 = new JobId(job5);
+ assertEquals(epoch, jobId5.getJobEpoch());
+ assertEquals(11112L, jobId5.getJobSequence());
+ assertEquals(job5, jobId5.getJobIdString());
+
+ JobId jobId6 = new JobId(job6);
+ assertEquals(epoch, jobId6.getJobEpoch());
+ assertEquals(11113L, jobId6.getJobSequence());
+ assertEquals(job6, jobId6.getJobIdString());
+ // check Comparable implementation
+ assertTrue(jobId5.compareTo(jobId6) < 0);
+
+ JobId jobId7 = new JobId(job7);
+ assertEquals(1413515656084L, jobId7.getJobEpoch());
+ assertEquals(305185L, jobId7.getJobSequence());
+ assertEquals(job7, jobId7.getJobIdString());
+
JobIdConverter conv = new JobIdConverter();
JobId tmp = conv.fromBytes( conv.toBytes(jobId1) );
assertEquals(jobId1, tmp);
@@ -80,6 +100,15 @@ public void testSerialization() {
tmp = conv.fromBytes( conv.toBytes(jobId4) );
assertEquals(jobId4, tmp);
assertEquals(jobId4.hashCode(), tmp.hashCode());
+ tmp = conv.fromBytes( conv.toBytes(jobId5) );
+ assertEquals(jobId5, tmp);
+ assertEquals(jobId5.hashCode(), tmp.hashCode());
+ tmp = conv.fromBytes( conv.toBytes(jobId6) );
+ assertEquals(jobId6, tmp);
+ assertEquals(jobId6.hashCode(), tmp.hashCode());
+ tmp = conv.fromBytes( conv.toBytes(jobId7) );
+ assertEquals(jobId7, tmp);
+ assertEquals(jobId7.hashCode(), tmp.hashCode());
}
/**
@@ -93,12 +122,16 @@ public void testJobIdOrdering() {
String job3 = "job_20120101000000_2222";
String job4 = "job_20120101000000_11111";
String job5 = "job_20120201000000_0001";
+ String job6 = "spark_20120101000000_11111";
+ String job7 = "spark_20120201000000_0001";
JobId jobId1 = new JobId(job1);
JobId jobId2 = new JobId(job2);
JobId jobId3 = new JobId(job3);
JobId jobId4 = new JobId(job4);
JobId jobId5 = new JobId(job5);
+ JobId jobId6 = new JobId(job6);
+ JobId jobId7 = new JobId(job7);
JobIdConverter conv = new JobIdConverter();
byte[] job1Bytes = conv.toBytes(jobId1);
@@ -106,10 +139,13 @@ public void testJobIdOrdering() {
byte[] job3Bytes = conv.toBytes(jobId3);
byte[] job4Bytes = conv.toBytes(jobId4);
byte[] job5Bytes = conv.toBytes(jobId5);
+ byte[] job6Bytes = conv.toBytes(jobId6);
+ byte[] job7Bytes = conv.toBytes(jobId7);
assertTrue(Bytes.compareTo(job1Bytes, job2Bytes) < 0);
assertTrue(Bytes.compareTo(job2Bytes, job3Bytes) < 0);
assertTrue(Bytes.compareTo(job3Bytes, job4Bytes) < 0);
assertTrue(Bytes.compareTo(job4Bytes, job5Bytes) < 0);
+ assertTrue(Bytes.compareTo(job6Bytes, job7Bytes) < 0);
}
}
diff --git a/hraven-core/src/test/java/com/twitter/hraven/TestJobKey.java b/hraven-core/src/test/java/com/twitter/hraven/TestJobKey.java
index 5f18288..b28207c 100644
--- a/hraven-core/src/test/java/com/twitter/hraven/TestJobKey.java
+++ b/hraven-core/src/test/java/com/twitter/hraven/TestJobKey.java
@@ -66,11 +66,9 @@ public void testKeySerialization() {
assertEquals(key.getJobId(), key2.getJobId());
// key with no trailing job Id
- keyBytes = ByteUtil.join(Constants.SEP_BYTES,
- Bytes.toBytes("c1@local"),
- Bytes.toBytes("user1"),
- Bytes.toBytes("app1"),
- Bytes.toBytes(Long.MAX_VALUE-15L));
+ keyBytes =
+ ByteUtil.join(Constants.SEP_BYTES, Bytes.toBytes("c1@local"), Bytes.toBytes("user1"),
+ Bytes.toBytes("app1"), Bytes.toBytes(Long.MAX_VALUE - 15L));
key2 = conv.fromBytes(keyBytes);
assertEquals("c1@local", key2.getCluster());
assertEquals("user1", key2.getUserName());
@@ -79,6 +77,7 @@ public void testKeySerialization() {
assertEquals(0L, key2.getJobId().getJobEpoch());
assertEquals(0L, key2.getJobId().getJobSequence());
+
// key with empty appId
key = new JobKey("c1@local", "user1", "", 1234L, "job_201206201718_1941");
keyBytes = conv.toBytes(key);
@@ -86,6 +85,83 @@ public void testKeySerialization() {
assertKey(key, key2);
}
+ /**
+ * Confirm that we can properly serialize and deserialize
+ * a JobKey for spark jobs.
+ * Important since the "spark" prefix will be stored in keys
+ */
+ @Test
+ public void testKeySerializationSpark() {
+ JobKeyConverter conv = new JobKeyConverter();
+ JobKey key = new JobKey("clusterS1@identifierS1",
+ "userS1",
+ "appSpark1",
+ 13,
+ "spark_1413515656084_3051855");
+ byte[] keyBytes = conv.toBytes(key);
+ JobKey key2 = conv.fromBytes(keyBytes);
+ assertEquals(key.getCluster(), key2.getCluster());
+ assertEquals(key.getUserName(), key2.getUserName());
+ assertEquals(key.getAppId(), key2.getAppId());
+ assertEquals(key.getRunId(), key2.getRunId());
+ assertEquals(key.getJobId(), key2.getJobId());
+
+ // also verify that the runId gets inverted in the serialized byte
+ // representation
+ byte[][] keyParts = ByteUtil.split(keyBytes, Constants.SEP_BYTES);
+ assertEquals(5, keyParts.length);
+ long encodedRunId = Bytes.toLong(keyParts[3]);
+ assertEquals(key.getRunId(), Long.MAX_VALUE - encodedRunId);
+
+ // key with empty appId
+ key = new JobKey("c1@local", "user1", "", 1234L, "spark_1413515656084_3051855");
+ keyBytes = conv.toBytes(key);
+ key2 = conv.fromBytes(keyBytes);
+ assertKey(key, key2);
+
+ JobKey jobKey = new JobKey("cluster1",
+ "user1",
+ "com.example.spark_program.simple_example.Main$",
+ 13,
+ // this job id has the separator in its byte representation,
+ // hence check for this id
+ "spark_1413515656084_305185");
+ keyBytes = conv.toBytes(jobKey);
+ key2 = conv.fromBytes(keyBytes);
+ assertEquals(jobKey.getCluster(), key2.getCluster());
+ assertEquals(jobKey.getUserName(), key2.getUserName());
+ assertEquals(jobKey.getAppId(), key2.getAppId());
+ assertEquals(jobKey.getRunId(), key2.getRunId());
+ assertEquals(jobKey.getJobId(), key2.getJobId());
+
+ }
+
+ @Test
+ public void checkOldFormatKey() {
+ // key with old format job id bytes stored, i.e. without the "job_" prefix
+ long epoch = 1415332804102L;
+ long seq = 10223L;
+ byte[] jobidarr = ByteUtil.join(Constants.EMPTY_BYTES,
+ Bytes.toBytes(epoch),
+ Bytes.toBytes(seq));
+ byte[] keyBytes = ByteUtil.join(Constants.SEP_BYTES,
+ Bytes.toBytes("c1@local"),
+ Bytes.toBytes("user1"),
+ Bytes.toBytes("app1"),
+ Bytes.toBytes(Long.MAX_VALUE - 15L),
+ jobidarr);
+ JobKeyConverter conv = new JobKeyConverter();
+
+ JobKey key2 = conv.fromBytes(keyBytes);
+ assertEquals("c1@local", key2.getCluster());
+ assertEquals("user1", key2.getUserName());
+ assertEquals("app1", key2.getAppId());
+ assertEquals(15L, key2.getRunId());
+ assertEquals(epoch, key2.getJobId().getJobEpoch());
+ assertEquals(seq, key2.getJobId().getJobSequence());
+
+ }
+
public void assertKey(JobKey expected, JobKey actual) {
assertEquals(expected.getCluster(), actual.getCluster());
assertEquals(expected.getUserName(), actual.getUserName());
@@ -115,6 +191,23 @@ public void testPlainConstructor() {
assertEquals("appSpace", key.getAppId());
assertEquals(17, key.getRunId());
assertEquals("job_20120101235959_1111", key.getJobId().getJobIdString());
+
+ // test for spark job keys
+ key = new JobKey("clusterS2@identifierS2 ", "userS2 ", "appSpaceS ", 17,
+ " spark_20120101235959_1111 ");
+ keyBytes = conv.toBytes(key);
+ key2 = conv.fromBytes(keyBytes);
+ assertEquals(key.getUserName(), key2.getUserName());
+ assertEquals(key.getAppId(), key2.getAppId());
+ assertEquals(key.getRunId(), key2.getRunId());
+ assertEquals(key.getJobId(), key2.getJobId());
+
+ assertEquals("clusterS2@identifierS2", key.getCluster());
+ assertEquals("userS2", key.getUserName());
+ assertEquals("appSpaceS", key.getAppId());
+ assertEquals(17, key.getRunId());
+ assertEquals("spark_20120101235959_1111", key.getJobId().getJobIdString());
+
}
/**
@@ -140,6 +233,34 @@ public void testJobDescConstructor() {
assertEquals("job_20120101235959_1111", key.getJobId().getJobIdString());
}
+ /**
+ * Confirm that leading and trailing spaces get ripped off for
+ * spark job keys as well
+ */
+ @Test
+ public void testJobDescConstructorSparkJobKeys() {
+ JobKeyConverter conv = new JobKeyConverter();
+ JobDesc jobDesc = new JobDesc("cluster2@identifier3 ",
+ "user3Spark ",
+ "appSpaceSpark ",
+ "spaceVersion3 ", 19,
+ " spark_1413515656084_3051855 ",
+ Framework.SPARK);
+ JobKey key = new JobKey(jobDesc);
+ byte[] keyBytes = conv.toBytes(key);
+ JobKey key2 = conv.fromBytes(keyBytes);
+ assertEquals(key.getUserName(), key2.getUserName());
+ assertEquals(key.getAppId(), key2.getAppId());
+ assertEquals(key.getRunId(), key2.getRunId());
+ assertEquals(key.getJobId(), key2.getJobId());
+
+ assertEquals("user3Spark", key.getUserName());
+ assertEquals("cluster2@identifier3", key.getCluster());
+ assertEquals("appSpaceSpark", key.getAppId());
+ assertEquals(19, key.getRunId());
+ assertEquals("spark_1413515656084_3051855", key.getJobId().getJobIdString());
+ }
+
/**
* Checks for correct parsing of job key when run ID may contain the byte
* representation of the separator character.
@@ -157,18 +278,90 @@ public void testEncodedRunId() {
// assemble a job key with the bad run ID
JobIdConverter idConv = new JobIdConverter();
+ byte[] encodedKey = ByteUtil.join(Constants.SEP_BYTES,
+ Bytes.toBytes("c1S@local"),
+ Bytes.toBytes("userS31"),
+ Bytes.toBytes("spark_app1"),
+ encoded,
+ idConv.toBytes(new JobId("spark_1413515656084_51855")));
+
+ JobKey key = conv.fromBytes(encodedKey);
+ assertEquals("c1S@local", key.getCluster());
+ assertEquals("userS31", key.getUserName());
+ assertEquals("spark_app1", key.getAppId());
+ assertEquals("spark_1413515656084_51855", key.getJobId().getJobIdString());
+
+ // assemble a spark job key with the bad run ID
+ encodedKey = ByteUtil.join(Constants.SEP_BYTES,
+ Bytes.toBytes("c1S@local"),
+ Bytes.toBytes("userS31"),
+ Bytes.toBytes("spark_app1"),
+ encoded,
+ idConv.toBytes(new JobId("spark_1413515656084_51855")));
+
+ key = conv.fromBytes(encodedKey);
+ assertEquals("c1S@local", key.getCluster());
+ assertEquals("userS31", key.getUserName());
+ assertEquals("spark_app1", key.getAppId());
+ assertEquals("spark_1413515656084_51855", key.getJobId().getJobIdString());
+
+ }
+
+ @Test
+ public void testEncodedRunIdDifferentFormats() {
+ JobKeyConverter conv = new JobKeyConverter();
+ long now = System.currentTimeMillis();
+ byte[] encoded = Bytes.toBytes(Long.MAX_VALUE - now);
+ // replace last byte with separator and reconvert to long
+ Bytes.putBytes(encoded, encoded.length-Constants.SEP_BYTES.length,
+ Constants.SEP_BYTES, 0, Constants.SEP_BYTES.length);
+ long badId = Long.MAX_VALUE - Bytes.toLong(encoded);
+ LOG.info("Bad run ID is "+badId);
+
+ // create a packed byte array for
+ // jobid without the "job_" prefix
+ long epoch = 1415332804102L;
+ long seq = 10223L;
+ byte[] jobidarr = ByteUtil.join(Constants.EMPTY_BYTES,
+ Bytes.toBytes(epoch),
+ Bytes.toBytes(seq));
+
+ // assemble a job key with the bad run ID
byte[] encodedKey = ByteUtil.join(Constants.SEP_BYTES,
Bytes.toBytes("c1@local"),
Bytes.toBytes("user1"),
Bytes.toBytes("app1"),
encoded,
- idConv.toBytes(new JobId("job_20120101000000_0001")));
+ jobidarr);
JobKey key = conv.fromBytes(encodedKey);
assertEquals("c1@local", key.getCluster());
assertEquals("user1", key.getUserName());
assertEquals("app1", key.getAppId());
- assertEquals("job_20120101000000_0001", key.getJobId().getJobIdString());
+ assertEquals("job_1415332804102_10223", key.getJobId().getJobIdString());
+
+ // create a packed byte array for
+ // jobid with "spark" prefix
+ epoch = 1415332804102L;
+ seq = 10223L;
+ jobidarr = ByteUtil.join(Constants.EMPTY_BYTES,
+ Bytes.toBytes("spark"),
+ Bytes.toBytes(epoch),
+ Bytes.toBytes(seq));
+
+ // assemble a job key with the bad run ID
+ encodedKey = ByteUtil.join(Constants.SEP_BYTES,
+ Bytes.toBytes("sc1@local"),
+ Bytes.toBytes("spark_user1"),
+ Bytes.toBytes("sparkApp1"),
+ encoded,
+ jobidarr);
+
+ key = conv.fromBytes(encodedKey);
+ assertEquals("sc1@local", key.getCluster());
+ assertEquals("spark_user1", key.getUserName());
+ assertEquals("sparkApp1", key.getAppId());
+ assertEquals("spark_1415332804102_10223", key.getJobId().getJobIdString());
}
@Test
diff --git a/hraven-core/src/test/java/com/twitter/hraven/TestJsonSerde.java b/hraven-core/src/test/java/com/twitter/hraven/TestJsonSerde.java
index 6ebb41d..5b6f029 100644
--- a/hraven-core/src/test/java/com/twitter/hraven/TestJsonSerde.java
+++ b/hraven-core/src/test/java/com/twitter/hraven/TestJsonSerde.java
@@ -200,7 +200,7 @@ private void assertFlowEquals(Flow flow1, Flow flow2) {
assertEquals(flow1.getTotalMaps(), flow2.getTotalMaps());
assertEquals(flow1.getTotalReduces(), flow2.getTotalReduces());
assertEquals(flow1.getVersion(), flow2.getVersion());
- assertEquals(flow1.getHadoopVersion(), flow2.getHadoopVersion());
+ assertEquals(flow1.getHistoryFileType(), flow2.getHistoryFileType());
assertEquals(flow1.getUserName(), flow2.getUserName());
assertJobListEquals(flow1.getJobs(), flow2.getJobs());
}
@@ -222,7 +222,7 @@ private void assertJobListEquals( List job1, List job2)
assertEquals(job1.get(j).getMapSlotMillis(), job2.get(j).getMapSlotMillis());
assertEquals(job1.get(j).getReduceSlotMillis(), job2.get(j).getReduceSlotMillis());
assertEquals(job1.get(j).getMegabyteMillis(), job2.get(j).getMegabyteMillis());
- assertEquals(job1.get(j).getHadoopVersion(), job2.get(j).getHadoopVersion());
+ assertEquals(job1.get(j).getHistoryFileType(), job2.get(j).getHistoryFileType());
assertEquals(job1.get(j).getUser(), job2.get(j).getUser());
}
}
diff --git a/hraven-core/src/test/java/com/twitter/hraven/TestSparkJobDescFactory.java b/hraven-core/src/test/java/com/twitter/hraven/TestSparkJobDescFactory.java
new file mode 100644
index 0000000..b68bd47
--- /dev/null
+++ b/hraven-core/src/test/java/com/twitter/hraven/TestSparkJobDescFactory.java
@@ -0,0 +1,33 @@
+package com.twitter.hraven;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+public class TestSparkJobDescFactory {
+
+ @Test
+ public void testCreation() {
+ String username = "testuser";
+ String jobName1 = "sample spark job";
+
+ Configuration c = new Configuration();
+ c.set("mapreduce.job.name", jobName1);
+ c.set("mapreduce.job.user.name", username);
+
+ SparkJobDescFactory factory = new SparkJobDescFactory();
+ QualifiedJobId jobId = new QualifiedJobId("test@local", "spark_201206010000_0001");
+ long runId = 1415407334L;
+ JobDesc desc = factory.create(jobId, runId, c);
+ assertNotNull(desc);
+ assertEquals(jobId, desc.getQualifiedJobId());
+
+ assertEquals("spark_201206010000_0001", desc.getJobId());
+ assertEquals(username, desc.getUserName());
+ assertEquals(jobName1, desc.getAppId());
+ assertEquals("", desc.getVersion());
+ assertEquals(Framework.SPARK, desc.getFramework());
+ assertEquals(runId, desc.getRunId());
+ }
+}
diff --git a/hraven-core/src/test/java/com/twitter/hraven/datasource/TestJobHistoryService.java b/hraven-core/src/test/java/com/twitter/hraven/datasource/TestJobHistoryService.java
index 1c8905e..56bb4c8 100644
--- a/hraven-core/src/test/java/com/twitter/hraven/datasource/TestJobHistoryService.java
+++ b/hraven-core/src/test/java/com/twitter/hraven/datasource/TestJobHistoryService.java
@@ -38,7 +38,7 @@
import com.twitter.hraven.Constants;
import com.twitter.hraven.Flow;
import com.twitter.hraven.GenerateFlowTestData;
-import com.twitter.hraven.HadoopVersion;
+import com.twitter.hraven.HistoryFileType;
import com.twitter.hraven.JobDetails;
import com.twitter.hraven.JobKey;
import com.twitter.hraven.datasource.JobHistoryByIdService;
@@ -217,7 +217,7 @@ public void testGetJobByJobID() throws Exception {
}
@SuppressWarnings("deprecation")
- private void checkSomeFlowStats(String version, HadoopVersion hv, int numJobs, long baseStats, List flowSeries) {
+ private void checkSomeFlowStats(String version, HistoryFileType hv, int numJobs, long baseStats, List flowSeries) {
assertNotNull(flowSeries);
for ( Flow f : flowSeries ){
assertEquals( numJobs, f.getJobCount());
@@ -232,7 +232,7 @@ private void checkSomeFlowStats(String version, HadoopVersion hv, int numJobs, l
assertEquals( numJobs * baseStats , f.getReduceShuffleBytes());
assertEquals( numJobs * baseStats , f.getReduceSlotMillis());
assertEquals( version , f.getVersion());
- assertEquals( hv, f.getHadoopVersion());
+ assertEquals( hv, f.getHistoryFileType());
assertEquals( numJobs * baseStats , f.getMegabyteMillis());
assertEquals( numJobs * 1000, f.getDuration());
assertEquals( f.getDuration() + GenerateFlowTestData.SUBMIT_LAUCH_DIFF, f.getWallClockTime());
@@ -258,10 +258,10 @@ public void testGetFlowTimeSeriesStats() throws Exception {
try {
// fetch back the entire flow stats
List flowSeries = service.getFlowTimeSeriesStats("c1@local", "buser", "AppOne", "", 0L, 0L, 1000, null);
- checkSomeFlowStats("a", HadoopVersion.ONE, numJobsAppOne, baseStats, flowSeries);
+ checkSomeFlowStats("a", HistoryFileType.ONE, numJobsAppOne, baseStats, flowSeries);
flowSeries = service.getFlowTimeSeriesStats("c1@local", "buser", "AppTwo", "", 0L, 0L, 1000, null);
- checkSomeFlowStats("b", HadoopVersion.ONE, numJobsAppTwo, baseStats, flowSeries);
+ checkSomeFlowStats("b", HistoryFileType.ONE, numJobsAppTwo, baseStats, flowSeries);
} finally {
service.close();
diff --git a/hraven-etl/pom.xml b/hraven-etl/pom.xml
index 9c29444..20e4795 100644
--- a/hraven-etl/pom.xml
+++ b/hraven-etl/pom.xml
@@ -21,12 +21,12 @@
com.twitter.hraven
hraven
- 0.9.17-SNAPSHOT
+ 0.9.17.t01-SNAPSHOT
../
com.twitter.hraven
hraven-etl
- 0.9.17-SNAPSHOT
+ 0.9.17.t01-SNAPSHOT
hRaven - etl
jar
ETL map reduce jobs and supporting components for data loading
@@ -162,13 +162,13 @@
com.twitter.hraven
hraven-core
- 0.9.17-SNAPSHOT
+ 0.9.17.t01-SNAPSHOT
com.twitter.hraven
hraven-core
- 0.9.17-SNAPSHOT
+ 0.9.17.t01-SNAPSHOT
test-jar
test
diff --git a/hraven-etl/src/main/java/com/twitter/hraven/etl/JobFilePreprocessor.java b/hraven-etl/src/main/java/com/twitter/hraven/etl/JobFilePreprocessor.java
index b5b3c0c..021ede1 100644
--- a/hraven-etl/src/main/java/com/twitter/hraven/etl/JobFilePreprocessor.java
+++ b/hraven-etl/src/main/java/com/twitter/hraven/etl/JobFilePreprocessor.java
@@ -276,14 +276,16 @@ public int run(String[] args) throws Exception {
* Reference:
* {@link https://github.com/twitter/hraven/issues/59}
*/
- String maxFileSizeStr = commandLine.getOptionValue("s");
- LOG.info("maxFileSize=" + maxFileSizeStr);
long maxFileSize = DEFAULT_RAW_FILE_SIZE_LIMIT;
- try {
- maxFileSize = Long.parseLong(maxFileSizeStr);
- } catch (NumberFormatException nfe) {
- throw new ProcessingException("Caught NumberFormatException during conversion "
+ if (commandLine.hasOption("s")) {
+ String maxFileSizeStr = commandLine.getOptionValue("s");
+ LOG.info("maxFileSize=" + maxFileSizeStr);
+ try {
+ maxFileSize = Long.parseLong(maxFileSizeStr);
+ } catch (NumberFormatException nfe) {
+ throw new ProcessingException("Caught NumberFormatException during conversion "
+ " of maxFileSize to long", nfe);
+ }
}
ProcessRecordService processRecordService = new ProcessRecordService(
diff --git a/hraven-etl/src/main/java/com/twitter/hraven/etl/JobFileProcessor.java b/hraven-etl/src/main/java/com/twitter/hraven/etl/JobFileProcessor.java
index 1c4ea97..a8add22 100644
--- a/hraven-etl/src/main/java/com/twitter/hraven/etl/JobFileProcessor.java
+++ b/hraven-etl/src/main/java/com/twitter/hraven/etl/JobFileProcessor.java
@@ -40,7 +40,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
@@ -254,7 +253,7 @@ public int run(String[] args) throws Exception {
Path hdfsPath = new Path(costFilePath + Constants.COST_PROPERTIES_FILENAME);
// add to distributed cache
DistributedCache.addCacheFile(hdfsPath.toUri(), hbaseConf);
-
+
// Grab the machine type argument
String machineType = commandLine.getOptionValue("m");
// set it as part of conf so that the
diff --git a/hraven-etl/src/main/java/com/twitter/hraven/etl/JobHistoryFileParserBase.java b/hraven-etl/src/main/java/com/twitter/hraven/etl/JobHistoryFileParserBase.java
index 83e3f42..a0e24a7 100644
--- a/hraven-etl/src/main/java/com/twitter/hraven/etl/JobHistoryFileParserBase.java
+++ b/hraven-etl/src/main/java/com/twitter/hraven/etl/JobHistoryFileParserBase.java
@@ -23,7 +23,7 @@
import org.apache.hadoop.hbase.util.Bytes;
import com.twitter.hraven.Constants;
-import com.twitter.hraven.HadoopVersion;
+import com.twitter.hraven.HistoryFileType;
import com.twitter.hraven.JobHistoryKeys;
import com.twitter.hraven.datasource.ProcessingException;
import com.twitter.hraven.util.ByteUtil;
@@ -32,7 +32,7 @@
* Abstract class for job history file parsing
*
* Implements the interface for history file parsing
- * Adds the implementation for a getHadoopVersionPut function
+ * Adds the implementation for a getHistoryFileTypePut function
* Other methods to be implemented for parsing by sub classes
*
*/
@@ -54,7 +54,7 @@ protected JobHistoryFileParserBase(Configuration conf) {
*
* @return Put
*/
- public Put getHadoopVersionPut(HadoopVersion historyFileVersion, byte[] jobKeyBytes) {
+ public Put getHistoryFileTypePut(HistoryFileType historyFileVersion, byte[] jobKeyBytes) {
Put pVersion = new Put(jobKeyBytes);
byte[] valueBytes = null;
valueBytes = Bytes.toBytes(historyFileVersion.toString());
@@ -139,8 +139,7 @@ public static Long getXmxValue(String javaChildOptsStr) {
retVal /= (1024 * 1024);
}
} catch (NumberFormatException nfe) {
- LOG.error("Unable to get the Xmx value from " + javaChildOptsStr);
- nfe.printStackTrace();
+ LOG.error("Unable to get the Xmx value from " + javaChildOptsStr, nfe);
throw new ProcessingException("Unable to get the Xmx value from " + javaChildOptsStr, nfe);
}
return retVal;
@@ -160,16 +159,19 @@ public static Long getXmxTotal(final long xmx75) {
* @return the job submit time in milliseconds since January 1, 1970 UTC;
* or 0 if no value can be found.
*/
- public static long getSubmitTimeMillisFromJobHistory(byte[] jobHistoryRaw) {
+ public static long getSubmitTimeMillisFromJobHistory(HistoryFileType historyFileType,
+ byte[] jobHistoryRaw) {
+
+ if (historyFileType == null) {
+ throw new IllegalArgumentException("History file type can't be null");
+ }
long submitTimeMillis = 0;
if (null == jobHistoryRaw) {
return submitTimeMillis;
}
- HadoopVersion hv = JobHistoryFileParserFactory.getVersion(jobHistoryRaw);
-
- switch (hv) {
+ switch (historyFileType) {
case TWO:
// look for the job submitted event, since that has the job submit time
int startIndex = ByteUtil.indexOf(jobHistoryRaw, Constants.JOB_SUBMIT_EVENT_BYTES, 0);
@@ -185,7 +187,8 @@ public static long getSubmitTimeMillisFromJobHistory(byte[] jobHistoryRaw) {
try {
submitTimeMillis = Long.parseLong(submitTimeMillisString);
} catch (NumberFormatException nfe) {
- LOG.error(" caught NFE during conversion of submit time " + submitTimeMillisString + " " + nfe.getMessage());
+ LOG.error(" caught NFE during conversion of submit time "
+ + submitTimeMillisString, nfe);
submitTimeMillis = 0;
}
}
@@ -217,13 +220,32 @@ public static long getSubmitTimeMillisFromJobHistory(byte[] jobHistoryRaw) {
try {
submitTimeMillis = Long.parseLong(submitTimeMillisString);
} catch (NumberFormatException nfe) {
- LOG.error(" caught NFE during conversion of submit time " + submitTimeMillisString
- + " " + nfe.getMessage());
+ LOG.error(" caught NFE during conversion of submit time "
+ + submitTimeMillisString, nfe);
submitTimeMillis = 0;
}
}
}
break;
+
+ case SPARK:
+ // look for the spark submit time key
+ startIndex = ByteUtil.indexOf(jobHistoryRaw,
+ Constants.SPARK_SUBMIT_TIME_BYTES, 0);
+ if (startIndex != -1) {
+ // read the string that contains the unix timestamp
+ String submitTimeMillisString = Bytes.toString(jobHistoryRaw,
+ startIndex + Constants.SPARK_SUBMIT_TIME_BYTES.length,
+ Constants.EPOCH_TIMESTAMP_STRING_LENGTH);
+ try {
+ submitTimeMillis = Long.parseLong(submitTimeMillisString);
+ } catch (NumberFormatException nfe) {
+ LOG.error(" caught NFE during conversion of submit time "
+ + submitTimeMillisString, nfe);
+ submitTimeMillis = 0;
+ }
+ }
+ break;
}
return submitTimeMillis;
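Note: the new SPARK branch scans the raw bytes for the literal "submit_time": (Constants.SPARK_SUBMIT_TIME_BYTES) and reads the next 13 characters (Constants.EPOCH_TIMESTAMP_STRING_LENGTH) as a millisecond timestamp. A standalone sketch of that byte scan, with the constants inlined as stand-ins:

    import java.nio.charset.StandardCharsets;

    public class SparkSubmitTimeScan {
      // inlined stand-ins for Constants.SPARK_SUBMIT_TIME_BYTES and
      // Constants.EPOCH_TIMESTAMP_STRING_LENGTH
      static final byte[] SUBMIT_TIME_KEY =
          "\"submit_time\":".getBytes(StandardCharsets.UTF_8);
      static final int EPOCH_TIMESTAMP_STRING_LENGTH = 13;

      static long submitTimeMillis(byte[] raw) {
        // naive indexOf over the raw bytes, standing in for ByteUtil.indexOf
        outer:
        for (int i = 0; i + SUBMIT_TIME_KEY.length <= raw.length; i++) {
          for (int j = 0; j < SUBMIT_TIME_KEY.length; j++) {
            if (raw[i + j] != SUBMIT_TIME_KEY[j]) {
              continue outer;
            }
          }
          int start = i + SUBMIT_TIME_KEY.length;
          if (start + EPOCH_TIMESTAMP_STRING_LENGTH > raw.length) {
            break; // truncated file: fall through to the 0 default
          }
          String ts = new String(raw, start, EPOCH_TIMESTAMP_STRING_LENGTH,
              StandardCharsets.UTF_8);
          try {
            return Long.parseLong(ts);
          } catch (NumberFormatException nfe) {
            return 0L; // same fallback the parser uses
          }
        }
        return 0L;
      }

      public static void main(String[] args) {
        String history = "{\"appid\":\"spark_1412702189634_248930\","
            + "\"submit_time\":1415146248235,\"start_time\":1415146253739}";
        System.out.println(
            submitTimeMillis(history.getBytes(StandardCharsets.UTF_8))); // 1415146248235
      }
    }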
diff --git a/hraven-etl/src/main/java/com/twitter/hraven/etl/JobHistoryFileParserFactory.java b/hraven-etl/src/main/java/com/twitter/hraven/etl/JobHistoryFileParserFactory.java
index adbb744..ec03660 100644
--- a/hraven-etl/src/main/java/com/twitter/hraven/etl/JobHistoryFileParserFactory.java
+++ b/hraven-etl/src/main/java/com/twitter/hraven/etl/JobHistoryFileParserFactory.java
@@ -17,7 +17,10 @@
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
-import com.twitter.hraven.HadoopVersion;
+
+import com.twitter.hraven.Constants;
+import com.twitter.hraven.HistoryFileType;
+import com.twitter.hraven.QualifiedJobId;
/**
* Deal with {@link JobHistoryFileParser} implementations.
@@ -49,24 +52,37 @@ public class JobHistoryFileParserFactory {
*
* @throws IllegalArgumentException if neither match
*/
- public static HadoopVersion getVersion(byte[] historyFileContents) {
+ public static HistoryFileType getHistoryFileType(QualifiedJobId qualifiedJobId,
+ byte[] historyFileContents) {
+ if (historyFileContents == null) {
+ // throw an exception if it's null
+ throw new IllegalArgumentException("Null job history file");
+ }
+
+ // an easy check to see if file type is spark
+ if ((qualifiedJobId != null) && Constants.FRAMEWORK_CONF_SPARK_VALUE
+ .equalsIgnoreCase(qualifiedJobId.getJobPrefix())) {
+ return HistoryFileType.SPARK;
+ }
+
if(historyFileContents.length > HADOOP2_VERSION_LENGTH) {
// the first 10 bytes in a hadoop2.0 history file contain Avro-Json
String version2Part = new String(historyFileContents, 0, HADOOP2_VERSION_LENGTH);
if (StringUtils.equalsIgnoreCase(version2Part, HADOOP2_VERSION_STRING)) {
- return HadoopVersion.TWO;
+ return HistoryFileType.TWO;
} else {
if(historyFileContents.length > HADOOP1_VERSION_LENGTH) {
// the first 18 bytes in a hadoop1.0 history file contain Meta VERSION="1" .
String version1Part = new String(historyFileContents, 0, HADOOP1_VERSION_LENGTH);
if (StringUtils.equalsIgnoreCase(version1Part, HADOOP1_VERSION_STRING)) {
- return HadoopVersion.ONE;
+ return HistoryFileType.ONE;
}
}
}
}
// throw an exception if we did not find any matching version
- throw new IllegalArgumentException(" Unknown format of job history file: " + historyFileContents);
+ throw new IllegalArgumentException(" Unknown format of job history file: "
+ + historyFileContents);
}
/**
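Note: type detection now has two signals, tried in order: the job id prefix ("spark" vs "job") when a QualifiedJobId is available, then the leading magic bytes of the file (Avro-Json for hadoop2, Meta VERSION="1" for hadoop1). A usage sketch, assuming QualifiedJobId exposes a (cluster, jobId) constructor:

    import com.twitter.hraven.HistoryFileType;
    import com.twitter.hraven.QualifiedJobId;
    import com.twitter.hraven.etl.JobHistoryFileParserFactory;

    public class DetectTypeDemo {
      public static void main(String[] args) {
        // hadoop1 history files start with the Meta VERSION="1" magic
        byte[] contents =
            "Meta VERSION=\"1\" .\nJob JOBID=\"job_201301010000_12345\"".getBytes();
        HistoryFileType t1 =
            JobHistoryFileParserFactory.getHistoryFileType(null, contents);
        System.out.println(t1); // ONE

        // a job id with the "spark" prefix short-circuits any byte sniffing
        QualifiedJobId sparkId =
            new QualifiedJobId("cluster1", "spark_1413515656084_3051855");
        HistoryFileType t2 =
            JobHistoryFileParserFactory.getHistoryFileType(sparkId, contents);
        System.out.println(t2); // SPARK
      }
    }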
@@ -80,22 +96,25 @@ public static HadoopVersion getVersion(byte[] historyFileContents) {
* Or return null if either input is null
*/
public static JobHistoryFileParser createJobHistoryFileParser(
- byte[] historyFileContents, Configuration jobConf) throws IllegalArgumentException {
+ byte[] historyFileContents, Configuration jobConf,
+ HistoryFileType historyFileType)
+ throws IllegalArgumentException {
if (historyFileContents == null) {
throw new IllegalArgumentException(
"Job history contents should not be null");
}
- HadoopVersion version = getVersion(historyFileContents);
-
- switch (version) {
+ switch (historyFileType) {
case ONE:
return new JobHistoryFileParserHadoop1(jobConf);
case TWO:
return new JobHistoryFileParserHadoop2(jobConf);
+ case SPARK:
+ return new JobHistoryFileParserSpark(jobConf);
+
default:
throw new IllegalArgumentException(
" Unknown format of job history file ");
@@ -103,16 +122,16 @@ public static JobHistoryFileParser createJobHistoryFileParser(
}
/**
- * @return HISTORY_FILE_VERSION1
+ * @return {@link HistoryFileType#ONE} for hadoop 1.0 history files
*/
- public static HadoopVersion getHistoryFileVersion1() {
- return HadoopVersion.ONE;
+ public static HistoryFileType getHistoryFileVersion1() {
+ return HistoryFileType.ONE;
}
/**
- * @return HISTORY_FILE_VERSION2
+ * @return {@link HistoryFileType#TWO} for hadoop 2.0 history files
*/
- public static HadoopVersion getHistoryFileVersion2() {
- return HadoopVersion.TWO;
+ public static HistoryFileType getHistoryFileVersion2() {
+ return HistoryFileType.TWO;
}
}
diff --git a/hraven-etl/src/main/java/com/twitter/hraven/etl/JobHistoryFileParserHadoop1.java b/hraven-etl/src/main/java/com/twitter/hraven/etl/JobHistoryFileParserHadoop1.java
index 8955175..6436164 100644
--- a/hraven-etl/src/main/java/com/twitter/hraven/etl/JobHistoryFileParserHadoop1.java
+++ b/hraven-etl/src/main/java/com/twitter/hraven/etl/JobHistoryFileParserHadoop1.java
@@ -56,9 +56,9 @@ public void parse(byte[] historyFile, JobKey jobKey) throws ProcessingException
jobHistoryListener = new JobHistoryListener(jobKey);
JobHistoryCopy.parseHistoryFromIS(new ByteArrayInputStream(historyFile), jobHistoryListener);
// set the hadoop version for this record
- Put versionPut = getHadoopVersionPut(JobHistoryFileParserFactory.getHistoryFileVersion1(),
+ Put versionPut = getHistoryFileTypePut(JobHistoryFileParserFactory.getHistoryFileVersion1(),
jobHistoryListener.getJobKeyBytes());
- jobHistoryListener.includeHadoopVersionPut(versionPut);
+ jobHistoryListener.includeHistoryFileTypePut(versionPut);
} catch (IOException ioe) {
LOG.error(" Exception during parsing hadoop 1.0 file ", ioe);
throw new ProcessingException(
diff --git a/hraven-etl/src/main/java/com/twitter/hraven/etl/JobHistoryFileParserHadoop2.java b/hraven-etl/src/main/java/com/twitter/hraven/etl/JobHistoryFileParserHadoop2.java
index 822f6c3..9799e0e 100644
--- a/hraven-etl/src/main/java/com/twitter/hraven/etl/JobHistoryFileParserHadoop2.java
+++ b/hraven-etl/src/main/java/com/twitter/hraven/etl/JobHistoryFileParserHadoop2.java
@@ -298,7 +298,7 @@ public void parse(byte[] historyFileContents, JobKey jobKey)
this.jobPuts.add(jobStatusPut);
// set the hadoop version for this record
- Put versionPut = getHadoopVersionPut(JobHistoryFileParserFactory.getHistoryFileVersion2(), this.jobKeyBytes);
+ Put versionPut = getHistoryFileTypePut(JobHistoryFileParserFactory.getHistoryFileVersion2(), this.jobKeyBytes);
this.jobPuts.add(versionPut);
LOG.info("For " + this.jobKey + " #jobPuts " + jobPuts.size() + " #taskPuts: "
diff --git a/hraven-etl/src/main/java/com/twitter/hraven/etl/JobHistoryFileParserSpark.java b/hraven-etl/src/main/java/com/twitter/hraven/etl/JobHistoryFileParserSpark.java
new file mode 100644
index 0000000..dc74d7c
--- /dev/null
+++ b/hraven-etl/src/main/java/com/twitter/hraven/etl/JobHistoryFileParserSpark.java
@@ -0,0 +1,158 @@
+/*
+Copyright 2014 Twitter, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+ */
+package com.twitter.hraven.etl;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.JsonProcessingException;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import com.twitter.hraven.Constants;
+import com.twitter.hraven.HistoryFileType;
+import com.twitter.hraven.JobHistoryKeys;
+import com.twitter.hraven.JobKey;
+import com.twitter.hraven.datasource.JobKeyConverter;
+import com.twitter.hraven.datasource.ProcessingException;
+import com.twitter.hraven.util.ByteUtil;
+
+public class JobHistoryFileParserSpark extends JobHistoryFileParserBase {
+
+ private List<Put> jobPuts = new LinkedList<Put>();
+ private List<Put> taskPuts = new LinkedList<Put>();
+ private JobKeyConverter jobKeyConv = new JobKeyConverter();
+ private long megabytemillis;
+ private static final Log LOG = LogFactory.getLog(JobHistoryFileParserSpark.class);
+
+ public JobHistoryFileParserSpark(Configuration jobConf) {
+ super(jobConf);
+ }
+
+ @Override
+ public void parse(byte[] historyFile, JobKey jobKey) {
+ byte[] jobKeyBytes = jobKeyConv.toBytes(jobKey);
+ Put sparkJobPuts = new Put(jobKeyBytes);
+
+ ObjectMapper objectMapper = new ObjectMapper();
+ try {
+ JsonNode rootNode = objectMapper.readTree(new ByteArrayInputStream(historyFile));
+ String key;
+ byte[] qualifier;
+ byte[] valueBytes;
+
+ Iterator<Map.Entry<String, JsonNode>> fieldsIterator = rootNode.getFields();
+ while (fieldsIterator.hasNext()) {
+ Map.Entry<String, JsonNode> field = fieldsIterator.next();
+ // job history keys are in upper case
+ key = field.getKey().toUpperCase();
+ try {
+ if (JobHistoryKeys.valueOf(key) != null) {
+ qualifier = Bytes.toBytes(key.toString().toLowerCase());
+ Class<?> clazz = JobHistoryKeys.KEY_TYPES.get(JobHistoryKeys.valueOf(key));
+ if (clazz == null) {
+ throw new IllegalArgumentException(" unknown key " + key
+ + " encountered while parsing " + jobKey);
+ }
+ if (Integer.class.equals(clazz)) {
+ try {
+ valueBytes = Bytes.toBytes(field.getValue().getIntValue());
+ } catch (NumberFormatException nfe) {
+ // use a default value
+ valueBytes = Constants.ZERO_INT_BYTES;
+ }
+ } else if (Long.class.equals(clazz)) {
+ try {
+ valueBytes = Bytes.toBytes(field.getValue().getLongValue());
+ } catch (NumberFormatException nfe) {
+ // use a default value
+ valueBytes = Constants.ZERO_LONG_BYTES;
+ }
+ } else {
+ // keep the string representation by default
+ valueBytes = Bytes.toBytes(field.getValue().getTextValue());
+ }
+ } else {
+ // simply store the key value as is
+ qualifier = Bytes.toBytes(key.toLowerCase());
+ valueBytes = Bytes.toBytes(field.getValue().getTextValue());
+ }
+ } catch (IllegalArgumentException iae) {
+ // job history key does not exist, so
+ // it could be 'queue' or 'megabytemillis' or 'batch.desc' field
+ // store them as per hadoop1/hadoop2 compatible storage
+ // if none of those,
+ // store the key value as is
+ if (StringUtils.equals(key.toLowerCase(), Constants.HRAVEN_QUEUE)) {
+ // queue is stored as part of job conf
+ qualifier =
+ ByteUtil.join(Constants.SEP_BYTES, Constants.JOB_CONF_COLUMN_PREFIX_BYTES,
+ Constants.HRAVEN_QUEUE_BYTES);
+ valueBytes = Bytes.toBytes(field.getValue().getTextValue());
+
+ } else if (StringUtils.equals(key.toLowerCase(), Constants.MEGABYTEMILLIS)) {
+ qualifier = Constants.MEGABYTEMILLIS_BYTES;
+ this.megabytemillis = field.getValue().getLongValue();
+ valueBytes = Bytes.toBytes(this.megabytemillis);
+ } else {
+ // simply store the key as is
+ qualifier = Bytes.toBytes(key.toLowerCase());
+ valueBytes = Bytes.toBytes(field.getValue().getTextValue());
+ }
+ }
+ sparkJobPuts.add(Constants.INFO_FAM_BYTES, qualifier, valueBytes);
+ }
+ } catch (JsonProcessingException e) {
+ throw new ProcessingException("Caught exception during spark history parsing ", e);
+ } catch (IOException e) {
+ throw new ProcessingException("Caught exception during spark history parsing ", e);
+ }
+ this.jobPuts.add(sparkJobPuts);
+ // set the history file type for this record
+ Put historyFileTypePut = getHistoryFileTypePut(HistoryFileType.SPARK, jobKeyBytes);
+ this.jobPuts.add(historyFileTypePut);
+
+ LOG.info("For " + this.jobKeyConv.fromBytes(jobKeyBytes) + " #jobPuts " + jobPuts.size()
+ + " #taskPuts: " + taskPuts.size());
+
+ }
+
+ @Override
+ public Long getMegaByteMillis() {
+ return this.megabytemillis;
+ }
+
+ @Override
+ public List<Put> getJobPuts() {
+ return this.jobPuts;
+ }
+
+ @Override
+ public List<Put> getTaskPuts() {
+ return this.taskPuts;
+ }
+
+}
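Note: end to end, the new parser turns the flat spark JSON into two HBase puts: one carrying the JSON fields (known JobHistoryKeys typed per KEY_TYPES, everything else stored as text), and one carrying the history file type marker. A minimal driver sketch (the JobKey arguments are illustrative, not from a real cluster):

    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.client.Put;
    import com.twitter.hraven.HistoryFileType;
    import com.twitter.hraven.JobKey;
    import com.twitter.hraven.etl.JobHistoryFileParser;
    import com.twitter.hraven.etl.JobHistoryFileParserFactory;

    public class SparkParseDemo {
      static List<Put> parseSpark(byte[] historyBytes, Configuration jobConf) {
        JobKey jobKey = new JobKey("cluster1", "user1",
            "com.example.spark_program.simple_example.Main$",
            1413515656084L, "spark_1413515656084_3051855");
        JobHistoryFileParser parser = JobHistoryFileParserFactory
            .createJobHistoryFileParser(historyBytes, jobConf, HistoryFileType.SPARK);
        parser.parse(historyBytes, jobKey);
        // expect two puts: the JSON fields and the history file type marker
        return parser.getJobPuts();
      }
    }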
diff --git a/hraven-etl/src/main/java/com/twitter/hraven/mapreduce/JobFileTableMapper.java b/hraven-etl/src/main/java/com/twitter/hraven/mapreduce/JobFileTableMapper.java
index eff49d9..deec46e 100644
--- a/hraven-etl/src/main/java/com/twitter/hraven/mapreduce/JobFileTableMapper.java
+++ b/hraven-etl/src/main/java/com/twitter/hraven/mapreduce/JobFileTableMapper.java
@@ -37,6 +37,7 @@
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
import com.twitter.hraven.Constants;
+import com.twitter.hraven.HistoryFileType;
import com.twitter.hraven.JobDesc;
import com.twitter.hraven.JobDescFactory;
import com.twitter.hraven.JobKey;
@@ -137,8 +138,11 @@ protected void map(
context.progress();
byte[] jobhistoryraw = rawService.getJobHistoryRawFromResult(value);
- long submitTimeMillis = JobHistoryFileParserBase.getSubmitTimeMillisFromJobHistory(
- jobhistoryraw);
+ // find out if it's a hadoop1 or hadoop2 file or a spark history file
+ HistoryFileType historyFileType = JobHistoryFileParserFactory
+ .getHistoryFileType(qualifiedJobId, jobhistoryraw);
+ long submitTimeMillis = JobHistoryFileParserBase
+ .getSubmitTimeMillisFromJobHistory(historyFileType, jobhistoryraw);
context.progress();
// explicitly setting the byte array to null to free up memory
jobhistoryraw = null;
@@ -162,7 +166,8 @@ protected void map(
// TODO: remove sysout
String msg = "JobDesc (" + keyCount + "): " + jobDesc
- + " submitTimeMillis: " + submitTimeMillis;
+ + " submitTimeMillis: " + submitTimeMillis
+ + " historyFileType=" + historyFileType;
LOG.info(msg);
List<Put> puts = JobHistoryService.getHbasePuts(jobDesc, jobConf);
@@ -203,7 +208,8 @@ protected void map(
historyFileContents = keyValue.getValue();
}
JobHistoryFileParser historyFileParser = JobHistoryFileParserFactory
- .createJobHistoryFileParser(historyFileContents, jobConf);
+ .createJobHistoryFileParser(historyFileContents, jobConf,
+ historyFileType);
historyFileParser.parse(historyFileContents, jobKey);
context.progress();
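Note: the mapper now classifies each record exactly once and threads the HistoryFileType through both the submit-time extraction and the parser factory, instead of re-sniffing the raw bytes at each step. A condensed sketch of that per-record flow (HBase plumbing and progress reporting elided):

    import org.apache.hadoop.conf.Configuration;
    import com.twitter.hraven.HistoryFileType;
    import com.twitter.hraven.JobKey;
    import com.twitter.hraven.QualifiedJobId;
    import com.twitter.hraven.etl.JobHistoryFileParser;
    import com.twitter.hraven.etl.JobHistoryFileParserBase;
    import com.twitter.hraven.etl.JobHistoryFileParserFactory;

    public class TypeDrivenFlow {
      // condensed per-record flow from JobFileTableMapper.map()
      static JobHistoryFileParser process(QualifiedJobId qualifiedJobId,
          byte[] jobhistoryraw, Configuration jobConf, JobKey jobKey) {
        // classify once: ONE, TWO or SPARK
        HistoryFileType historyFileType = JobHistoryFileParserFactory
            .getHistoryFileType(qualifiedJobId, jobhistoryraw);
        // submit time extraction is driven by the precomputed type
        long submitTimeMillis = JobHistoryFileParserBase
            .getSubmitTimeMillisFromJobHistory(historyFileType, jobhistoryraw);
        System.out.println("submitTimeMillis=" + submitTimeMillis);
        // the same type selects the parser implementation
        JobHistoryFileParser parser = JobHistoryFileParserFactory
            .createJobHistoryFileParser(jobhistoryraw, jobConf, historyFileType);
        parser.parse(jobhistoryraw, jobKey);
        return parser;
      }
    }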
diff --git a/hraven-etl/src/main/java/com/twitter/hraven/mapreduce/JobHistoryListener.java b/hraven-etl/src/main/java/com/twitter/hraven/mapreduce/JobHistoryListener.java
index 320e05e..a0c5783 100644
--- a/hraven-etl/src/main/java/com/twitter/hraven/mapreduce/JobHistoryListener.java
+++ b/hraven-etl/src/main/java/com/twitter/hraven/mapreduce/JobHistoryListener.java
@@ -124,7 +124,7 @@ private void handleJob(Map<JobHistoryKeys, String> values) {
* @param pVersion
* @throws IllegalArgumentException if put is null
*/
- public void includeHadoopVersionPut(Put pVersion) {
+ public void includeHistoryFileTypePut(Put pVersion) {
// set the hadoop version for this record
if (pVersion != null) {
this.jobPuts.add(pVersion);
diff --git a/hraven-etl/src/test/java/com/twitter/hraven/TestJobFile.java b/hraven-etl/src/test/java/com/twitter/hraven/TestJobFile.java
index ba17bcf..2030c97 100644
--- a/hraven-etl/src/test/java/com/twitter/hraven/TestJobFile.java
+++ b/hraven-etl/src/test/java/com/twitter/hraven/TestJobFile.java
@@ -45,6 +45,8 @@ public class TestJobFile {
/** 2.0 job history file name */
final static String VALID_JOB_HISTORY_FILENAME5 =
"job_1374258111572_0003-1374260622449-userName1-TeraGen-1374260635219-2-0-SUCCEEDED-default.jhist";
+ final static String VALID_SPARK_HISTORY_FILENAME =
+ "spark_1413515656084_3051855.json";
final static String INVALID_JOB_FILENAME = "jabbedabbedoo.txt";
@@ -107,6 +109,15 @@ public void testJobConfFile() {
*/
assertEquals("job_1374258111572_0003", jobFile.getJobid());
+ jobFile = new JobFile(VALID_SPARK_HISTORY_FILENAME);
+ assertFalse("This should not be a valid job conf file", jobFile.isJobConfFile());
+ assertTrue("this should be a spark job history file", jobFile.isJobHistoryFile());
+ /*
+ * confirm that the job id was parsed correctly. Note that the history filename here is a
+ * spark job history file name
+ */
+ assertEquals("spark_1413515656084_3051855", jobFile.getJobid());
+
}
}
diff --git a/hraven-etl/src/test/java/com/twitter/hraven/etl/TestJobFilePreprocessor.java b/hraven-etl/src/test/java/com/twitter/hraven/etl/TestJobFilePreprocessor.java
new file mode 100644
index 0000000..03cd567
--- /dev/null
+++ b/hraven-etl/src/test/java/com/twitter/hraven/etl/TestJobFilePreprocessor.java
@@ -0,0 +1,111 @@
+package com.twitter.hraven.etl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.twitter.hraven.datasource.HRavenTestUtil;
+
+public class TestJobFilePreprocessor {
+ private static HBaseTestingUtility UTIL;
+
+ @BeforeClass
+ public static void setupBeforeClass() throws Exception {
+ UTIL = new HBaseTestingUtility();
+ UTIL.startMiniCluster();
+ HRavenTestUtil.createSchema(UTIL);
+ }
+
+
+ @Test
+ public void testSparkProcessRecordCreation() throws IOException {
+
+ final String procDir = "spark_processing_dir";
+ final String inputDir = "spark_input_dir";
+ final String cluster = "cluster1";
+
+ FileSystem hdfs = FileSystem.get(UTIL.getConfiguration());
+ Path inputPath = new Path(inputDir);
+ boolean os = hdfs.mkdirs(inputPath);
+ assertTrue(os);
+ assertTrue(hdfs.exists(inputPath));
+
+ Path procPath = new Path(procDir);
+ os = hdfs.mkdirs(procPath);
+ assertTrue(os);
+ assertTrue(hdfs.exists(procPath));
+ hdfs.getFileStatus(procPath);
+
+ final String SPARK_JOB_HISTORY_FILE_NAME =
+ "src/test/resources/spark_1413515656084_3051855";
+ File jobHistoryfile = new File(SPARK_JOB_HISTORY_FILE_NAME);
+ Path srcPath = new Path(jobHistoryfile.toURI());
+ hdfs.copyFromLocalFile(srcPath, inputPath);
+ Path expPath = new Path(inputPath.toUri() + "/" + srcPath.getName());
+ assertTrue(hdfs.exists(expPath));
+
+ final String JOB_CONF_FILE_NAME =
+ "src/test/resources/spark_1413515656084_3051855_conf.xml";
+ File jobConfFile = new File(JOB_CONF_FILE_NAME);
+ srcPath = new Path(jobConfFile.toURI());
+ hdfs.copyFromLocalFile(srcPath, inputPath);
+ expPath = new Path(inputPath.toUri() + "/" + srcPath.getName());
+ assertTrue(hdfs.exists(expPath));
+
+ JobFilePreprocessor jfpp = new JobFilePreprocessor(UTIL.getConfiguration());
+ String args[] = new String[3];
+ args[0] = "-o" + procDir;
+ args[1] = "-i" + inputDir;
+ args[2] = "-c" + cluster;
+ try {
+ // list the files in processing dir and assert that it's empty
+ FileStatus[] fs = hdfs.listStatus(procPath);
+ assertEquals(0, fs.length);
+ jfpp.run(args);
+ // now check if the seq file exists
+ // can't check for exact filename since it includes the current timestamp
+ fs = hdfs.listStatus(procPath);
+ assertEquals(1, fs.length);
+ // ensure that hbase table contains the process record
+ ProcessRecordService processRecordService = new ProcessRecordService(
+ UTIL.getConfiguration());
+ ProcessRecord pr = processRecordService.getLastSuccessfulProcessRecord(cluster);
+ assertNotNull(pr);
+ assertEquals(pr.getMaxJobId(), pr.getMinJobId());
+ assertEquals("spark_1413515656084_3051855", pr.getMaxJobId());
+ ProcessRecordKey pk = pr.getKey();
+ assertNotNull(pk);
+ assertEquals(cluster, pk.getCluster());
+ assertEquals(2, pr.getProcessedJobFiles());
+ assertEquals(ProcessState.PREPROCESSED, pr.getProcessState());
+ assertEquals(fs[0].getPath().getParent().getName()
+ + "/" + fs[0].getPath().getName(), pr.getProcessFile());
+
+ // run raw loader as well
+ args = new String[2];
+ args[0] = "-p" + procDir;
+ args[1] = "-c" + cluster;
+ JobFileRawLoader jr = new JobFileRawLoader(UTIL.getConfiguration());
+ assertNotNull(jr);
+ ToolRunner.run(jr, args);
+ pr = processRecordService.getLastSuccessfulProcessRecord(cluster);
+ assertEquals(ProcessState.LOADED, pr.getProcessState());
+
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+}
diff --git a/hraven-etl/src/test/java/com/twitter/hraven/etl/TestJobHistoryFileParserBase.java b/hraven-etl/src/test/java/com/twitter/hraven/etl/TestJobHistoryFileParserBase.java
index 3201409..79b5d41 100644
--- a/hraven-etl/src/test/java/com/twitter/hraven/etl/TestJobHistoryFileParserBase.java
+++ b/hraven-etl/src/test/java/com/twitter/hraven/etl/TestJobHistoryFileParserBase.java
@@ -23,6 +23,7 @@
import com.google.common.io.Files;
import com.twitter.hraven.Constants;
+import com.twitter.hraven.HistoryFileType;
import com.twitter.hraven.datasource.ProcessingException;
public class TestJobHistoryFileParserBase {
@@ -128,7 +129,8 @@ public void testGetSubmitTimeMillisFromJobHistory2() throws IOException {
// hadoop2 file
File jobHistoryfile = new File(JOB_HISTORY_FILE_NAME);
byte[] contents = Files.toByteArray(jobHistoryfile);
- long actualts = JobHistoryFileParserBase.getSubmitTimeMillisFromJobHistory(contents);
+ long actualts = JobHistoryFileParserBase
+ .getSubmitTimeMillisFromJobHistory(HistoryFileType.TWO, contents);
long expts = 1329348443227L;
assertEquals(expts, actualts);
@@ -137,7 +139,8 @@ public void testGetSubmitTimeMillisFromJobHistory2() throws IOException {
"src/test/resources/job_1329348432999_0003-1329348443227-user-Sleep+job-1329348468601-10-1-SUCCEEDED-default.jhist";
jobHistoryfile = new File(JOB_HISTORY_FILE_NAME);
contents = Files.toByteArray(jobHistoryfile);
- actualts = JobHistoryFileParserBase.getSubmitTimeMillisFromJobHistory(contents);
+ actualts = JobHistoryFileParserBase
+ .getSubmitTimeMillisFromJobHistory(HistoryFileType.TWO, contents);
expts = 1328218696000L;
assertEquals(expts, actualts);
@@ -146,7 +149,8 @@ public void testGetSubmitTimeMillisFromJobHistory2() throws IOException {
"src/test/resources/job_201311192236_3583_1386370578196_user1_Sleep+job";
jobHistoryfile = new File(JOB_HISTORY_FILE_NAME);
contents = Files.toByteArray(jobHistoryfile);
- actualts = JobHistoryFileParserBase.getSubmitTimeMillisFromJobHistory(contents);
+ actualts = JobHistoryFileParserBase
+ .getSubmitTimeMillisFromJobHistory(HistoryFileType.ONE, contents);
expts = 1386370578196L;
assertEquals(expts, actualts);
}
@@ -191,40 +195,80 @@ public void testGetSubmitTimeMillisFromJobHistory() {
byte[] jobHistoryBytes = Bytes.toBytes(JOB_HISTORY);
long submitTimeMillis = JobHistoryFileParserBase
- .getSubmitTimeMillisFromJobHistory(jobHistoryBytes);
+ .getSubmitTimeMillisFromJobHistory(HistoryFileType.ONE, jobHistoryBytes);
assertEquals(1339063492288L, submitTimeMillis);
jobHistoryBytes = Bytes.toBytes(JOB_HISTORY2);
submitTimeMillis = JobHistoryFileParserBase
- .getSubmitTimeMillisFromJobHistory(jobHistoryBytes);
+ .getSubmitTimeMillisFromJobHistory(HistoryFileType.ONE, jobHistoryBytes);
assertEquals(1339063492288L, submitTimeMillis);
jobHistoryBytes = Bytes.toBytes(BAD_JOB_HISTORY);
submitTimeMillis = JobHistoryFileParserBase
- .getSubmitTimeMillisFromJobHistory(jobHistoryBytes);
+ .getSubmitTimeMillisFromJobHistory(HistoryFileType.ONE, jobHistoryBytes);
assertEquals(0L, submitTimeMillis);
jobHistoryBytes = Bytes.toBytes(BAD_JOB_HISTORY2);
submitTimeMillis = JobHistoryFileParserBase
- .getSubmitTimeMillisFromJobHistory(jobHistoryBytes);
+ .getSubmitTimeMillisFromJobHistory(HistoryFileType.ONE, jobHistoryBytes);
assertEquals(0L, submitTimeMillis);
jobHistoryBytes = Bytes.toBytes(BAD_JOB_HISTORY3);
submitTimeMillis = JobHistoryFileParserBase
- .getSubmitTimeMillisFromJobHistory(jobHistoryBytes);
+ .getSubmitTimeMillisFromJobHistory(HistoryFileType.ONE, jobHistoryBytes);
assertEquals(0L, submitTimeMillis);
jobHistoryBytes = Bytes.toBytes(BAD_JOB_HISTORY4);
submitTimeMillis = JobHistoryFileParserBase
- .getSubmitTimeMillisFromJobHistory(jobHistoryBytes);
+ .getSubmitTimeMillisFromJobHistory(HistoryFileType.ONE, jobHistoryBytes);
assertEquals(0L, submitTimeMillis);
}
+ /**
+ * Confirm that we can properly find the submit timestamp for Spark.
+ */
+ @Test
+ public void testGetSubmitTimeMillisFromJobHistorySpark() {
+ /**
+ * Normal example
+ */
+ final String JOB_HISTORY =
+ "{\"appname\":" + "\"com.example.spark_history.simple_example.Main$\","
+ + "\"appid\":\"spark_1412702189634_248930\","
+ + "\"username\":\"userName1\", "
+ + "\"submit_time\":1415146248235,"
+ + "\"start_time\":1415146253739,"
+ + "\"finish_time\":1415146273227,"
+ + "\"queue\":\"defaultQueueName\","
+ + "\"megabytemillis\":1528224768,"
+ + "\"job_status\":\"SUCCEEDED\","
+ + "\"batch.desc\":\"\"}";
+
+ final String BAD_JOB_HISTORY = "{\"appname\":"
+ + "\"com.example.spark_history.simple_example.Main$\","
+ + "\"appid\":\"spark_1412702189634_248930\","
+ + "\"username\":\"userName1\", ";
+
+ byte[] jobHistoryBytes = Bytes.toBytes(JOB_HISTORY);
+ long submitTimeMillis = JobHistoryFileParserBase
+ .getSubmitTimeMillisFromJobHistory(HistoryFileType.SPARK,
+ jobHistoryBytes);
+ assertEquals(1415146248235L, submitTimeMillis);
+
+ jobHistoryBytes = Bytes.toBytes(BAD_JOB_HISTORY);
+ submitTimeMillis = JobHistoryFileParserBase
+ .getSubmitTimeMillisFromJobHistory(HistoryFileType.SPARK,
+ jobHistoryBytes);
+ assertEquals(0L, submitTimeMillis);
+
+ }
+
@Test(expected=IllegalArgumentException.class)
public void testIncorrectSubmitTime() {
// Now some cases where we should not be able to find any timestamp.
byte[] jobHistoryBytes = Bytes.toBytes("");
- JobHistoryFileParserBase.getSubmitTimeMillisFromJobHistory(jobHistoryBytes);
+ JobHistoryFileParserBase.getSubmitTimeMillisFromJobHistory(null,
+ jobHistoryBytes);
}
@Test
diff --git a/hraven-etl/src/test/java/com/twitter/hraven/etl/TestJobHistoryFileParserFactory.java b/hraven-etl/src/test/java/com/twitter/hraven/etl/TestJobHistoryFileParserFactory.java
index 6afc95c..c812ecc 100644
--- a/hraven-etl/src/test/java/com/twitter/hraven/etl/TestJobHistoryFileParserFactory.java
+++ b/hraven-etl/src/test/java/com/twitter/hraven/etl/TestJobHistoryFileParserFactory.java
@@ -21,7 +21,7 @@
import static org.junit.Assert.assertTrue;
import org.junit.Test;
-import com.twitter.hraven.HadoopVersion;
+import com.twitter.hraven.HistoryFileType;
/**
* Test {@link JobHistoryFileParserFactory}
@@ -35,7 +35,7 @@ public void testCreateJobHistoryFileParserCorrectCreation() {
String jHist = "Meta VERSION=\"1\" .\n"
+ "Job JOBID=\"job_201301010000_12345\"";
JobHistoryFileParser historyFileParser = JobHistoryFileParserFactory
- .createJobHistoryFileParser(jHist.getBytes(), null);
+ .createJobHistoryFileParser(jHist.getBytes(), null, HistoryFileType.ONE);
assertNotNull(historyFileParser);
@@ -52,15 +52,17 @@ public void testCreateJobHistoryFileParserCorrectCreation() {
@Test
public void testGetVersion() {
String jHist1 = "Meta VERSION=\"1\" .\n" + "Job JOBID=\"job_201301010000_12345\"";
- HadoopVersion version1 = JobHistoryFileParserFactory.getVersion(jHist1.getBytes());
- // confirm that we get back hadoop 1.0 version
+ HistoryFileType version1 = JobHistoryFileParserFactory.getHistoryFileType(null,
+ jHist1.getBytes());
+ // confirm that we get back hadoop 1.0 version as history file type
assertEquals(JobHistoryFileParserFactory.getHistoryFileVersion1(), version1);
String jHist2 = "Avro-Json\n"
+ "{\"type\":\"record\",\"name\":\"Event\", "
+ "\"namespace\":\"org.apache.hadoop.mapreduce.jobhistory\",\"fields\":[]\"";
- HadoopVersion version2 = JobHistoryFileParserFactory.getVersion(jHist2.getBytes());
- // confirm that we get back hadoop 2.0 version
+ HistoryFileType version2 = JobHistoryFileParserFactory.getHistoryFileType(null,
+ jHist2.getBytes());
+ // confirm that we get back hadoop 2.0 version as history file type
assertEquals(JobHistoryFileParserFactory.getHistoryFileVersion2(), version2);
}
@@ -68,20 +70,20 @@ public void testGetVersion() {
* confirm that exception is thrown on incorrect input
*/
@Test(expected = IllegalArgumentException.class)
- public void testGetVersionIncorrect2() {
+ public void testGetHistoryFileTypeIncorrect2() {
String jHist2 =
"Avro-HELLO-Json\n" + "{\"type\":\"record\",\"name\":\"Event\", "
+ "\"namespace\":\"org.apache.hadoop.mapreduce.jobhistory\",\"fields\":[]\"";
- JobHistoryFileParserFactory.getVersion(jHist2.getBytes());
+ JobHistoryFileParserFactory.getHistoryFileType(null, jHist2.getBytes());
}
/**
* confirm that exception is thrown on incorrect input
*/
@Test(expected = IllegalArgumentException.class)
- public void testGetVersionIncorrect1() {
+ public void testGetHistoryFileTypeIncorrect1() {
String jHist1 = "Meta HELLO VERSION=\"1\" .\n" + "Job JOBID=\"job_201301010000_12345\"";
- JobHistoryFileParserFactory.getVersion(jHist1.getBytes());
+ JobHistoryFileParserFactory.getHistoryFileType(null, jHist1.getBytes());
}
/**
@@ -90,7 +92,7 @@ public void testGetVersionIncorrect1() {
@Test(expected = IllegalArgumentException.class)
public void testCreateJobHistoryFileParserNullCreation() {
JobHistoryFileParser historyFileParser = JobHistoryFileParserFactory
- .createJobHistoryFileParser(null, null);
+ .createJobHistoryFileParser(null, null, null);
assertNull(historyFileParser);
}
}
diff --git a/hraven-etl/src/test/java/com/twitter/hraven/etl/TestJobHistoryFileParserHadoop1.java b/hraven-etl/src/test/java/com/twitter/hraven/etl/TestJobHistoryFileParserHadoop1.java
index 411b0c4..06cb6b6 100644
--- a/hraven-etl/src/test/java/com/twitter/hraven/etl/TestJobHistoryFileParserHadoop1.java
+++ b/hraven-etl/src/test/java/com/twitter/hraven/etl/TestJobHistoryFileParserHadoop1.java
@@ -30,6 +30,7 @@
import com.google.common.io.Files;
import com.twitter.hraven.Constants;
+import com.twitter.hraven.HistoryFileType;
import com.twitter.hraven.JobKey;
public class TestJobHistoryFileParserHadoop1 {
@@ -55,7 +56,8 @@ public void testMegaByteMillis() throws IOException {
jobConf.addResource(new Path(JOB_CONF_FILENAME));
JobHistoryFileParser historyFileParser =
- JobHistoryFileParserFactory.createJobHistoryFileParser(contents, jobConf);
+ JobHistoryFileParserFactory.createJobHistoryFileParser(contents, jobConf,
+ HistoryFileType.ONE);
assertNotNull(historyFileParser);
// confirm that we get back an object that can parse hadoop 1.0 files
diff --git a/hraven-etl/src/test/java/com/twitter/hraven/etl/TestJobHistoryFileParserHadoop2.java b/hraven-etl/src/test/java/com/twitter/hraven/etl/TestJobHistoryFileParserHadoop2.java
index 486a186..891df27 100644
--- a/hraven-etl/src/test/java/com/twitter/hraven/etl/TestJobHistoryFileParserHadoop2.java
+++ b/hraven-etl/src/test/java/com/twitter/hraven/etl/TestJobHistoryFileParserHadoop2.java
@@ -21,7 +21,7 @@
import org.junit.Test;
import com.google.common.io.Files;
import com.twitter.hraven.Constants;
-import com.twitter.hraven.HadoopVersion;
+import com.twitter.hraven.HistoryFileType;
import com.twitter.hraven.JobHistoryKeys;
import com.twitter.hraven.JobKey;
import com.twitter.hraven.datasource.JobKeyConverter;
@@ -55,7 +55,8 @@ public void testCreateJobHistoryFileParserCorrectCreation() throws IOException {
jobConf.addResource(new Path(JOB_CONF_FILE_NAME));
JobHistoryFileParser historyFileParser =
- JobHistoryFileParserFactory.createJobHistoryFileParser(contents, jobConf);
+ JobHistoryFileParserFactory.createJobHistoryFileParser(contents, jobConf,
+ HistoryFileType.TWO);
assertNotNull(historyFileParser);
// confirm that we get back an object that can parse hadoop 2.0 files
@@ -87,7 +88,7 @@ public void testCreateJobHistoryFileParserCorrectCreation() throws IOException {
for (KeyValue kv : lkv) {
// ensure we have a hadoop2 version as the value
assertEquals(Bytes.toString(kv.getValue()),
- HadoopVersion.TWO.toString());
+ HistoryFileType.TWO.toString());
// ensure we don't see the same put twice
assertFalse(foundVersion2);
@@ -180,7 +181,8 @@ public void testCreateJobHistoryFileParserNullConf() throws IOException {
File jobHistoryfile = new File(JOB_HISTORY_FILE_NAME);
byte[] contents = Files.toByteArray(jobHistoryfile);
JobHistoryFileParser historyFileParser =
- JobHistoryFileParserFactory.createJobHistoryFileParser(contents, null);
+ JobHistoryFileParserFactory.createJobHistoryFileParser(contents, null,
+ HistoryFileType.TWO);
assertNotNull(historyFileParser);
// confirm that we get back an object that can parse hadoop 2.0 files
@@ -204,7 +206,8 @@ public void testMissingSlotMillis() throws IOException {
jobConf.addResource(new Path(JOB_CONF_FILE_NAME));
JobHistoryFileParser historyFileParser =
- JobHistoryFileParserFactory.createJobHistoryFileParser(contents, jobConf);
+ JobHistoryFileParserFactory.createJobHistoryFileParser(contents, jobConf,
+ HistoryFileType.TWO);
assertNotNull(historyFileParser);
// confirm that we get back an object that can parse hadoop 2.0 files
@@ -269,7 +272,8 @@ public void testVersion2_4() throws IOException {
jobConf.addResource(new Path(JOB_CONF_FILE_NAME));
JobHistoryFileParser historyFileParser =
- JobHistoryFileParserFactory.createJobHistoryFileParser(contents, jobConf);
+ JobHistoryFileParserFactory.createJobHistoryFileParser(contents, jobConf,
+ HistoryFileType.TWO);
assertNotNull(historyFileParser);
// confirm that we get back an object that can parse hadoop 2.x files
diff --git a/hraven-etl/src/test/java/com/twitter/hraven/etl/TestJobHistoryFileParserSpark.java b/hraven-etl/src/test/java/com/twitter/hraven/etl/TestJobHistoryFileParserSpark.java
new file mode 100644
index 0000000..0e336e2
--- /dev/null
+++ b/hraven-etl/src/test/java/com/twitter/hraven/etl/TestJobHistoryFileParserSpark.java
@@ -0,0 +1,140 @@
+/*
+ * Copyright 2014 Twitter, Inc. Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You may obtain a copy of the License
+ * at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in
+ * writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific
+ * language governing permissions and limitations under the License.
+ */
+package com.twitter.hraven.etl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+import com.google.common.io.Files;
+import com.twitter.hraven.Constants;
+import com.twitter.hraven.HistoryFileType;
+import com.twitter.hraven.JobHistoryKeys;
+import com.twitter.hraven.JobKey;
+import com.twitter.hraven.datasource.JobKeyConverter;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Test {@link JobHistoryFileParserSpark}
+ */
+public class TestJobHistoryFileParserSpark {
+
+ @Test
+ public void testCreateJobHistoryFileParserCorrectCreation() throws IOException {
+
+ final String JOB_HISTORY_FILE_NAME =
+ "src/test/resources/spark_1413515656084_3051855";
+
+ File jobHistoryfile = new File(JOB_HISTORY_FILE_NAME);
+ byte[] contents = Files.toByteArray(jobHistoryfile);
+ // now load the conf file and check
+ final String JOB_CONF_FILE_NAME =
+ "src/test/resources/spark_1413515656084_3051855_conf.xml";
+ Configuration jobConf = new Configuration();
+ jobConf.addResource(new Path(JOB_CONF_FILE_NAME));
+
+ JobHistoryFileParser historyFileParser =
+ JobHistoryFileParserFactory.createJobHistoryFileParser(contents, jobConf,
+ HistoryFileType.SPARK);
+ assertNotNull(historyFileParser);
+
+ // confirm that we get back an object that can parse spark files
+ assertTrue(historyFileParser instanceof JobHistoryFileParserSpark);
+
+ JobKey jobKey = new JobKey("cluster1",
+ "user1",
+ "com.example.spark_program.simple_example.Main$",
+ 1413515656084L,
+ "spark_1413515656084_305185");
+ historyFileParser.parse(contents, jobKey);
+
+ List<Put> jobPuts = historyFileParser.getJobPuts();
+ assertNotNull(jobPuts);
+ assertEquals(2, jobPuts.size());
+
+ JobKeyConverter jobKeyConv = new JobKeyConverter();
+ System.out.println(" rowkey " +jobPuts.get(0).getRow().toString());
+ assertEquals(jobKey.toString(), jobKeyConv.fromBytes(jobPuts.get(0).getRow()).toString());
+
+ // check history file type
+ boolean foundVersion2 = false;
+ for (Put p : jobPuts) {
+ List<KeyValue> kv2 = p.get(Constants.INFO_FAM_BYTES,
+ Bytes.toBytes(JobHistoryKeys.hadoopversion.toString()));
+ if (kv2.size() == 0) {
+ // we are interested in the history file type put only
+ // hence continue
+ continue;
+ }
+ assertEquals(1, kv2.size());
+ Map<byte[], List<KeyValue>> d = p.getFamilyMap();
+ for (List<KeyValue> lkv : d.values()) {
+ for (KeyValue kv : lkv) {
+ // ensure we have SPARK as the history file type value
+ assertEquals(Bytes.toString(kv.getValue()),
+ HistoryFileType.SPARK.toString());
+
+ // ensure we don't see the same put twice
+ assertFalse(foundVersion2);
+ // now set this to true
+ foundVersion2 = true;
+ }
+ }
+ }
+ // ensure that we got the history file type put
+ assertTrue(foundVersion2);
+
+ // check job status
+ boolean foundJobStatus = false;
+ for (Put p : jobPuts) {
+ List<KeyValue> kv2 =
+ p.get(Constants.INFO_FAM_BYTES,
+ Bytes.toBytes(JobHistoryKeys.JOB_STATUS.toString().toLowerCase()));
+ if (kv2.size() == 0) {
+ // we are interested in JobStatus put only
+ // hence continue
+ continue;
+ }
+ assertEquals(1, kv2.size());
+
+ for (KeyValue kv : kv2) {
+ // ensure we have a job status value as the value
+ assertEquals(Bytes.toString(kv.getValue()),
+ JobHistoryFileParserHadoop2.JOB_STATUS_SUCCEEDED);
+
+ // ensure we don't see the same put twice
+ assertFalse(foundJobStatus);
+ // now set this to true
+ foundJobStatus = true;
+ }
+ }
+ // ensure that we got the JobStatus put
+ assertTrue(foundJobStatus);
+
+ List<Put> taskPuts = historyFileParser.getTaskPuts();
+ assertEquals(taskPuts.size(), 0);
+
+ // check post processing for megabytemillis
+ // the value comes straight from the spark history file
+ Long mbMillis = historyFileParser.getMegaByteMillis();
+ assertNotNull(mbMillis);
+ Long expValue = 1528224768L;
+ assertEquals(expValue, mbMillis);
+ }
+}
diff --git a/hraven-etl/src/test/java/com/twitter/hraven/etl/TestJobHistoryListener.java b/hraven-etl/src/test/java/com/twitter/hraven/etl/TestJobHistoryListener.java
index 90af968..6e74295 100644
--- a/hraven-etl/src/test/java/com/twitter/hraven/etl/TestJobHistoryListener.java
+++ b/hraven-etl/src/test/java/com/twitter/hraven/etl/TestJobHistoryListener.java
@@ -31,7 +31,7 @@
import com.twitter.hraven.Constants;
import com.twitter.hraven.JobHistoryKeys;
import com.twitter.hraven.JobKey;
-import com.twitter.hraven.HadoopVersion;
+import com.twitter.hraven.HistoryFileType;
import com.twitter.hraven.mapreduce.JobHistoryListener;
@@ -54,10 +54,10 @@ public void checkHadoopVersionSet() {
JobHistoryFileParserHadoop1 jh = new JobHistoryFileParserHadoop1(null);
- Put versionPut = jh.getHadoopVersionPut(
- HadoopVersion.ONE,
+ Put versionPut = jh.getHistoryFileTypePut(
+ HistoryFileType.ONE,
jobHistoryListener.getJobKeyBytes());
- jobHistoryListener.includeHadoopVersionPut(versionPut);
+ jobHistoryListener.includeHistoryFileTypePut(versionPut);
assertEquals(jobHistoryListener.getJobPuts().size(), 1);
// check hadoop version
@@ -76,7 +76,7 @@ public void checkHadoopVersionSet() {
for (KeyValue kv : lkv) {
// ensure we have a hadoop2 version as the value
assertEquals(Bytes.toString(kv.getValue()),
- HadoopVersion.ONE.toString() );
+ HistoryFileType.ONE.toString() );
// ensure we don't see the same put twice
assertFalse(foundVersion1);
// now set this to true
diff --git a/hraven-etl/src/test/resources/spark_1413515656084_3051855 b/hraven-etl/src/test/resources/spark_1413515656084_3051855
new file mode 100644
index 0000000..43c47e6
--- /dev/null
+++ b/hraven-etl/src/test/resources/spark_1413515656084_3051855
@@ -0,0 +1,4 @@
+{
+ "jobname":"com.example.spark_program.simple_example.Main$","jobid":"spark_1412702189634_248930","user":"user1","submit_time":1415146248235,"start_time":1415146253739,"finish_time":1415146273227,"queue":"default","megabytemillis":1528224768,"job_status":"SUCCEEDED","batch.desc":""}
+}
+
diff --git a/hraven-etl/src/test/resources/spark_1413515656084_3051855_conf.xml b/hraven-etl/src/test/resources/spark_1413515656084_3051855_conf.xml
new file mode 100644
index 0000000..2231f9e
--- /dev/null
+++ b/hraven-etl/src/test/resources/spark_1413515656084_3051855_conf.xml
@@ -0,0 +1,14 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
+<property><name>mapreduce.job.name</name><value>Sleep job</value></property>
+<property><name>batch.desc</name><value>Any descriptive string that identifies this batch of jobs- hraven will group individual MR jobs by this name</value></property>
+<property><name>mapreduce.framework.name</name><value>spark</value></property>
+<property><name>mapreduce.job.user.name</name><value>SomeUserName</value></property>
+<property><name>mapreduce.job.queuename</name><value>someQueueName</value></property>
+<property><name>yarn.resourcemanager.address</name><value>0.0.0.0:8030</value></property>
+<property><name>yarn.app.mapreduce.am.command-opts</name><value>-Xmx500m</value></property>
+<property><name>yarn.nodemanager.process-kill-wait.ms</name><value>2000</value></property>
+<property><name>yarn.nodemanager.log-aggregation.compression-type</name><value>gz</value></property>
+<property><name>mapreduce.map.memory.mb</name><value>512</value></property>
+<property><name>mapreduce.reduce.memory.mb</name><value>512</value></property>
+<property><name>yarn.nodemanager.resource.memory-mb</name><value>8192</value></property>
+</configuration>
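Note: this conf file is what lets hRaven recognize a spark job from configuration alone; the mapreduce.framework.name property carries the value named by FRAMEWORK_CONF_SPARK_VALUE. An illustrative check (file path as in the test resources):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    public class SparkConfCheck {
      public static void main(String[] args) {
        Configuration conf = new Configuration(false);
        conf.addResource(
            new Path("src/test/resources/spark_1413515656084_3051855_conf.xml"));
        // matches FRAMEWORK_CONF_SPARK_VALUE ("spark") in Constants
        boolean isSpark =
            "spark".equalsIgnoreCase(conf.get("mapreduce.framework.name"));
        System.out.println("spark job conf? " + isSpark);
      }
    }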
diff --git a/pom.xml b/pom.xml
index e15cd5e..464a486 100644
--- a/pom.xml
+++ b/pom.xml
@@ -16,7 +16,7 @@
4.0.0
com.twitter.hraven
hraven
- 0.9.17-SNAPSHOT
+ 0.9.17.t01-SNAPSHOT
hRaven Project
https://github.com/twitter/hraven