
Spark history file processing #124

Merged: 9 commits, Dec 5, 2014
4 changes: 2 additions & 2 deletions hraven-assembly/pom.xml
@@ -17,12 +17,12 @@
<parent>
<groupId>com.twitter.hraven</groupId>
<artifactId>hraven</artifactId>
- <version>0.9.17-SNAPSHOT</version>
+ <version>0.9.17.t01-SNAPSHOT</version>
<relativePath>../</relativePath>
</parent>
<groupId>com.twitter.hraven</groupId>
<artifactId>hraven-assembly</artifactId>
- <version>0.9.17-SNAPSHOT</version>
+ <version>0.9.17.t01-SNAPSHOT</version>
<name>hRaven-assembly</name>
<description>hRaven - Assembly artifacts</description>
<packaging>pom</packaging>
4 changes: 2 additions & 2 deletions hraven-core/pom.xml
@@ -21,12 +21,12 @@
<parent>
<groupId>com.twitter.hraven</groupId>
<artifactId>hraven</artifactId>
- <version>0.9.17-SNAPSHOT</version>
+ <version>0.9.17.t01-SNAPSHOT</version>
<relativePath>../</relativePath>
</parent>
<groupId>com.twitter.hraven</groupId>
<artifactId>hraven-core</artifactId>
- <version>0.9.17-SNAPSHOT</version>
+ <version>0.9.17.t01-SNAPSHOT</version>
<name>hRaven - core</name>
<description>Core components of hRaven, including model classes and data access layer</description>

42 changes: 40 additions & 2 deletions hraven-core/src/main/java/com/twitter/hraven/Constants.java
@@ -242,6 +242,17 @@ public class Constants {
public static final byte[] SUBMIT_TIME_PREFIX_HADOOP2_BYTES = Bytes
.toBytes(SUBMIT_TIME_PREFIX_HADOOP2);

+ /**
+  * The string representing the submit time in a spark job history file.
+  */
+ public static final String SPARK_SUBMIT_TIME = "\"submit_time\":";
+
+ /**
+  * Raw bytes representation of {@link #SPARK_SUBMIT_TIME}.
+  */
+ public static final byte[] SPARK_SUBMIT_TIME_BYTES = Bytes
+     .toBytes(SPARK_SUBMIT_TIME);

/**
* length of string that contains a unix timestamp in milliseconds
* this length will be correct till Sat, 20 Nov 2286 which is
@@ -267,6 +278,16 @@ public class Constants {
public static final int MAX_LONG_LENGTH = Long.toString(Long.MAX_VALUE)
.length();

+ /**
+  * length of run id in bytes in the job key
+  */
+ public static final int RUN_ID_LENGTH_JOBKEY = 8;
+
+ /**
+  * length of sequence number in bytes in the job key
+  */
+ public static final int SEQUENCE_NUM_LENGTH_JOBKEY = 8;

public static final String USER_CONF_KEY = "user.name";
public static final String USER_CONF_KEY_HADOOP2 = "mapreduce.job.user.name";

@@ -322,6 +343,9 @@ public class Constants {

public static final String MR_RUN_CONF_KEY = "mapred.app.submitted.timestamp";

+ public static final String FRAMEWORK_CONF_KEY = "mapreduce.framework.name";
+ public static final String FRAMEWORK_CONF_SPARK_VALUE = "spark";
+ public static final String SPARK_VERSION_CONF_KEY = "spark.signature";
/**
* hadoop2 memory mb for container size
* for map, reduce and AM containers
@@ -358,7 +382,9 @@ public class Constants {
}

/**
-  * Regex to parse a job file name. First back-ref is JT name, second one is
+  * Regex to parse a job file name.
+  * It could be a job history file from a mapreduce job or a spark job.
+  * First back-ref is JT name, second one is
* job-id
* <p>
* For example,
@@ -372,8 +398,12 @@
* job_201306192120_0003_1371677828795_hadoop_conf.xml
* in hadoop 2.0, job history file names are named as
* job_1374258111572_0003-1374260622449-userName1-TeraGen-1374260635219-2-0-SUCCEEDED-default.jhist
+  * in spark, the file name looks like:
+  * spark_1413515656084_3051855
+  * and the spark conf file name:
+  * spark_1413515656084_3051855_conf.xml
*/
public static final String JOB_FILENAME_PATTERN_REGEX = ".*(job_[0-9]*_[0-9]*)(-|_)([0-9]*[aA-zZ]*)*(.*)$";
public static final String JOB_FILENAME_PATTERN_REGEX = ".*?((job|spark)_[0-9]*_[0-9]*)(-|_)*([0-9]*[aA-zZ]*)*(.*)$";

// JobHistory file name parsing related
public static final String JOB_CONF_FILE_END = "(.*)(conf.xml)$";
@@ -426,4 +456,12 @@ public class Constants {

/** name of the properties file used for cluster to cluster identifier mapping */
public static final String HRAVEN_CLUSTER_PROPERTIES_FILENAME = "hRavenClusters.properties";

+ /** spark job keys have a prefix of "spark"
+  * hence spark job key length is calculated as
+  * 5
+  * +
+  * regular job key length which is 16 (epoch and sequence number)
+  */
+ public static final int SPARK_JOB_KEY_LENGTH = 21;
Collaborator:
Out of curiosity, how is this number (21) derived?

Collaborator (Author):
(Will add a comment in the code as well.) Since spark job keys have a prefix of "spark", the length is 5 plus the regular job key length of 16 (epoch and sequence number), hence the spark job key length is 21.

}
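
To make the new filename pattern and the key-length arithmetic concrete, here is a minimal, self-contained sketch (not code from this PR; the regex is inlined so the snippet runs on its own) that exercises the new JOB_FILENAME_PATTERN_REGEX against the example names from the javadoc above and re-derives the 21-byte SPARK_JOB_KEY_LENGTH discussed in the review thread:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class JobFilenamePatternDemo {
  // Inlined copy of the new Constants.JOB_FILENAME_PATTERN_REGEX.
  static final String JOB_FILENAME_PATTERN_REGEX =
      ".*?((job|spark)_[0-9]*_[0-9]*)(-|_)*([0-9]*[aA-zZ]*)*(.*)$";

  public static void main(String[] args) {
    Pattern pattern = Pattern.compile(JOB_FILENAME_PATTERN_REGEX);
    String[] names = {
        // hadoop1 conf file name from the javadoc examples
        "job_201306192120_0003_1371677828795_hadoop_conf.xml",
        // hadoop2 history file name
        "job_1374258111572_0003-1374260622449-userName1-TeraGen-1374260635219-2-0-SUCCEEDED-default.jhist",
        // spark history and conf file names
        "spark_1413515656084_3051855",
        "spark_1413515656084_3051855_conf.xml"
    };
    for (String name : names) {
      Matcher m = pattern.matcher(name);
      if (m.matches()) {
        // group(1) captures the job id for both mapreduce and spark names.
        System.out.println(name + " -> " + m.group(1));
      }
    }

    // The arithmetic behind SPARK_JOB_KEY_LENGTH from the review thread:
    // the 5-byte "spark" prefix plus the regular 16-byte job key, i.e.
    // RUN_ID_LENGTH_JOBKEY (8) + SEQUENCE_NUM_LENGTH_JOBKEY (8).
    int sparkJobKeyLength = "spark".length() + 8 + 8;
    System.out.println("spark job key length = " + sparkJobKeyLength); // 21
  }
}
```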
20 changes: 10 additions & 10 deletions hraven-core/src/main/java/com/twitter/hraven/Flow.java
@@ -149,8 +149,8 @@ public byte[] code() {
/** app Version for this flow */
private String version ;

- /** hadoop Version for this flow */
- private HadoopVersion hadoopVersion ;
+ /** hadoop history file type for this flow */
+ private HistoryFileType historyFileType ;

/** hadoop pool/queue for this flow */
private String queue ;
@@ -281,13 +281,13 @@ public void addJob(JobDetails job) {
// set the submit time of the flow to the submit time of the first job
if (( this.submitTime == 0L ) || (job.getSubmitTime() < this.submitTime)) {
this.submitTime = job.getSubmitTime();
-     // set the hadoop version once for this job
-     this.hadoopVersion = job.getHadoopVersion();
+     // set the hadoop history file type once for this job
+     this.historyFileType = job.getHistoryFileType();
// set the queue/pool once for this flow
this.queue = job.getQueue();
-     if (this.hadoopVersion == null) {
+     if (this.historyFileType == null) {
// set it to default so that we don't run into NPE during json serialization
-       this.hadoopVersion = HadoopVersion.ONE;
+       this.historyFileType = HistoryFileType.ONE;
}
}

@@ -430,12 +430,12 @@ public void setCost(double cost) {
this.cost = cost;
}

- public HadoopVersion getHadoopVersion() {
-   return hadoopVersion;
+ public HistoryFileType getHistoryFileType() {
+   return historyFileType;
}

- public void setHadoopVersion(HadoopVersion hadoopVersion) {
-   this.hadoopVersion = hadoopVersion;
+ public void setHistoryFileType(HistoryFileType hadoopFileType) {
+   this.historyFileType = hadoopFileType;
}

public long getReduceShuffleBytes() {
4 changes: 4 additions & 0 deletions hraven-core/src/main/java/com/twitter/hraven/Framework.java
@@ -28,6 +28,10 @@ public enum Framework {
*
*/
SCALDING("s", "scalding"),
+ /**
+  * Identifies Spark applications
+  */
+ SPARK("sp", "spark"),
/**
*
*/
hraven-core/src/main/java/com/twitter/hraven/{HadoopVersion.java → HistoryFileType.java}
@@ -17,8 +17,11 @@
package com.twitter.hraven;

/**
-  * hadoop versions
+  * hadoop file type being processed in hRaven
+  * ONE: hadoop v1 file
+  * TWO: hadoop v2 file
+  * SPARK: spark history file
*/
- public enum HadoopVersion {
-   ONE, TWO
+ public enum HistoryFileType {
+   ONE, TWO, SPARK
};
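
Since HistoryFileType values are stored as strings and read back with valueOf, which (as the JobDetails changes later in this diff note) throws IllegalArgumentException for unknown values and NullPointerException for null, here is a small sketch of the defaulting behaviour. The parseOrDefault helper is hypothetical, not part of this PR; the enum is mirrored locally so the snippet compiles standalone:

```java
public class HistoryFileTypeDemo {
  // Local mirror of the enum introduced above.
  enum HistoryFileType { ONE, TWO, SPARK }

  // Hypothetical helper (not in the PR): parse a stored string, falling
  // back to ONE, the hadoop v1 default that getHistoryFileTypeFromResult
  // uses when the HBase cell is missing.
  static HistoryFileType parseOrDefault(String stored) {
    if (stored == null) {
      return HistoryFileType.ONE;
    }
    try {
      return HistoryFileType.valueOf(stored);
    } catch (IllegalArgumentException e) {
      return HistoryFileType.ONE;
    }
  }

  public static void main(String[] args) {
    System.out.println(parseOrDefault("SPARK")); // SPARK
    System.out.println(parseOrDefault(null));    // ONE (missing value)
    System.out.println(parseOrDefault("THREE")); // ONE (unknown value)
  }
}
```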
26 changes: 17 additions & 9 deletions hraven-core/src/main/java/com/twitter/hraven/JobDescFactory.java
@@ -27,6 +27,7 @@ public class JobDescFactory {

private static final MRJobDescFactory MR_JOB_DESC_FACTORY = new MRJobDescFactory();
private static final PigJobDescFactory PIG_JOB_DESC_FACTORY = new PigJobDescFactory();
+ private static final SparkJobDescFactory SPARK_JOB_DESC_FACTORY = new SparkJobDescFactory();
private static final ScaldingJobDescFactory SCALDING_JOB_DESC_FACTORY =
new ScaldingJobDescFactory();

@@ -43,6 +44,8 @@ public static JobDescFactoryBase getFrameworkSpecificJobDescFactory(Configuration
return PIG_JOB_DESC_FACTORY;
case SCALDING:
return SCALDING_JOB_DESC_FACTORY;
+     case SPARK:
+       return SPARK_JOB_DESC_FACTORY;
default:
return MR_JOB_DESC_FACTORY;
}
@@ -69,17 +72,22 @@ public static JobDesc createJobDesc(QualifiedJobId qualifiedJobId,
*/
public static Framework getFramework(Configuration jobConf) {
// Check if this is a pig job
-     boolean isPig = jobConf.get(Constants.PIG_CONF_KEY) != null;
-     if (isPig) {
+     if (jobConf.get(Constants.PIG_CONF_KEY) != null) {
        return Framework.PIG;
-     } else {
-       String flowId = jobConf.get(Constants.CASCADING_FLOW_ID_CONF_KEY);
-       if ((flowId == null) || (flowId.length() == 0)) {
-         return Framework.NONE;
-       } else {
-         return Framework.SCALDING;
-       }
-     }
      }

+     if ((jobConf.get(Constants.CASCADING_FLOW_ID_CONF_KEY) != null)
+         && (jobConf.get(Constants.CASCADING_FLOW_ID_CONF_KEY).length() != 0)) {
+       return Framework.SCALDING;
+     }
+
+     if (Constants.FRAMEWORK_CONF_SPARK_VALUE
+         .equalsIgnoreCase(jobConf.get(Constants.FRAMEWORK_CONF_KEY))) {
+       return Framework.SPARK;
+     }
+
+     return Framework.NONE;

    }

25 changes: 13 additions & 12 deletions hraven-core/src/main/java/com/twitter/hraven/JobDetails.java
@@ -59,7 +59,7 @@ public class JobDetails implements Comparable<JobDetails> {
private String priority;
private String status;
private String version;
- private HadoopVersion hadoopVersion;
+ private HistoryFileType historyFileType;
private String queue;
private long submitTime;
private long launchTime;
@@ -182,13 +182,13 @@ public void setVersion(String version) {
this.version = version;
}

- public HadoopVersion getHadoopVersion() {
-   return hadoopVersion;
+ public HistoryFileType getHistoryFileType() {
+   return historyFileType;
}

- public void setHadoopVersion(String hadoopVersion) {
+ public void setHistoryFileType(String historyFileType) {
// the enum.valueOf could throw a NPE or IllegalArgumentException
-   this.hadoopVersion = HadoopVersion.valueOf(hadoopVersion);
+   this.historyFileType = HistoryFileType.valueOf(historyFileType);
}

public long getSubmitTime() {
@@ -523,21 +523,22 @@ public static double getValueAsDouble(byte[] key, Map<byte[], byte[]> infoValues
}

/**
-  * return an enum value from the NavigableMap for hadoop version
+  * return an enum value from the NavigableMap
+  * for history file type
* @param key
* @param infoValues
* @return value as a enum or default of hadoop ONE
*/
- private HadoopVersion getHadoopVersionFromResult(final JobHistoryKeys key,
+ private HistoryFileType getHistoryFileTypeFromResult(final JobHistoryKeys key,
final NavigableMap<byte[], byte[]> infoValues) {
byte[] value = infoValues.get(JobHistoryKeys.KEYS_TO_BYTES.get(key));
if (value != null) {
String hv = Bytes.toString(value);
// could throw an NPE or IllegalArgumentException
-     return HadoopVersion.valueOf(hv);
+     return HistoryFileType.valueOf(hv);
} else {
// default is hadoop 1
-     return HadoopVersion.ONE;
+     return HistoryFileType.ONE;
}
}

@@ -594,7 +595,7 @@ public void populate(Result result) {
this.jobName = getValueAsString(JobHistoryKeys.JOBNAME, infoValues);
this.priority = getValueAsString(JobHistoryKeys.JOB_PRIORITY, infoValues);
this.status = getValueAsString(JobHistoryKeys.JOB_STATUS, infoValues);
-   this.hadoopVersion = getHadoopVersionFromResult(JobHistoryKeys.hadoopversion, infoValues);
+   this.historyFileType = getHistoryFileTypeFromResult(JobHistoryKeys.hadoopversion, infoValues);
this.version = getValueAsString(Constants.VERSION_COLUMN_BYTES, infoValues);
this.cost = getValueAsDouble(Constants.JOBCOST_BYTES, infoValues);

@@ -622,8 +623,8 @@ public void populate(Result result) {
infoValues);

// populate stats from counters for this job based on
-   // hadoop version
-   if (this.hadoopVersion == HadoopVersion.TWO) {
+   // history file type
+   if (this.historyFileType == HistoryFileType.TWO) {
// map file bytes read
this.mapFileBytesRead = getCounterValueAsLong(this.mapCounters,
Constants.FILESYSTEM_COUNTER_HADOOP2, Constants.FILES_BYTES_READ);