This repository has been archived by the owner on Jan 15, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 77
Spark history file processing #124
Merged
Merged
Changes from 5 commits
Commits
Show all changes
9 commits
Select commit
Hold shift + click to select a range
34f6e78
Preprocessing step for Spark history file processing
ee3e572
Updating comments (hadoop version to history file type), updating var…
a2dbc93
Updating formatting
ed6f6c3
Adding in the processing step and unit tests, changes to JobDescFactory
5176251
Update spark_1413515656084_3051855
06977f0
Updating exception logging and if check styles as per review comments
cc16080
adding in comments for explaining constant values
2680bc3
Merge branch 'preprocess_spark' of https://github.com/vrushalic/hrave…
268305a
Updating one field in rest response
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -149,8 +149,8 @@ public byte[] code() { | |
/** app Version for this flow */ | ||
private String version ; | ||
|
||
/** hadoop Version for this flow */ | ||
private HadoopVersion hadoopVersion ; | ||
/** hadoop history file type for this flow */ | ||
private HistoryFileType historyFileType ; | ||
|
||
/** hadoop pool/queue for this flow */ | ||
private String queue ; | ||
|
@@ -281,13 +281,13 @@ public void addJob(JobDetails job) { | |
// set the submit time of the flow to the submit time of the first job | ||
if (( this.submitTime == 0L ) || (job.getSubmitTime() < this.submitTime)) { | ||
this.submitTime = job.getSubmitTime(); | ||
// set the hadoop version once for this job | ||
this.hadoopVersion = job.getHadoopVersion(); | ||
// set the hadoop history file type once for this job | ||
this.historyFileType = job.getHistoryFileType(); | ||
// set the queue/pool once for this flow | ||
this.queue = job.getQueue(); | ||
if (this.hadoopVersion == null) { | ||
if (this.historyFileType == null) { | ||
// set it to default so that we don't run into NPE during json serialization | ||
this.hadoopVersion = HadoopVersion.ONE; | ||
this.historyFileType = HistoryFileType.ONE; | ||
} | ||
} | ||
|
||
|
@@ -430,12 +430,12 @@ public void setCost(double cost) { | |
this.cost = cost; | ||
} | ||
|
||
public HadoopVersion getHadoopVersion() { | ||
return hadoopVersion; | ||
public HistoryFileType getHistoryFileType() { | ||
return historyFileType; | ||
} | ||
|
||
public void setHadoopVersion(HadoopVersion hadoopVersion) { | ||
this.hadoopVersion = hadoopVersion; | ||
public void setHistoryFileType(HistoryFileType hadoopVersion) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. hadoopVersion -> fileType maybe? :) |
||
this.historyFileType = hadoopVersion; | ||
} | ||
|
||
public long getReduceShuffleBytes() { | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,6 +15,8 @@ | |
*/ | ||
package com.twitter.hraven; | ||
|
||
import org.apache.commons.logging.Log; | ||
import org.apache.commons.logging.LogFactory; | ||
import org.apache.hadoop.conf.Configuration; | ||
|
||
/** | ||
|
@@ -27,6 +29,7 @@ public class JobDescFactory { | |
|
||
private static final MRJobDescFactory MR_JOB_DESC_FACTORY = new MRJobDescFactory(); | ||
private static final PigJobDescFactory PIG_JOB_DESC_FACTORY = new PigJobDescFactory(); | ||
private static final SparkJobDescFactory SPARK_JOB_DESC_FACTORY = new SparkJobDescFactory(); | ||
private static final ScaldingJobDescFactory SCALDING_JOB_DESC_FACTORY = | ||
new ScaldingJobDescFactory(); | ||
|
||
|
@@ -43,6 +46,8 @@ public static JobDescFactoryBase getFrameworkSpecificJobDescFactory(Configuratio | |
return PIG_JOB_DESC_FACTORY; | ||
case SCALDING: | ||
return SCALDING_JOB_DESC_FACTORY; | ||
case SPARK: | ||
return SPARK_JOB_DESC_FACTORY; | ||
default: | ||
return MR_JOB_DESC_FACTORY; | ||
} | ||
|
@@ -69,17 +74,23 @@ public static JobDesc createJobDesc(QualifiedJobId qualifiedJobId, | |
*/ | ||
public static Framework getFramework(Configuration jobConf) { | ||
// Check if this is a pig job | ||
boolean isPig = jobConf.get(Constants.PIG_CONF_KEY) != null; | ||
if (isPig) { | ||
if (jobConf.get(Constants.PIG_CONF_KEY) != null) { | ||
return Framework.PIG; | ||
} else { | ||
String flowId = jobConf.get(Constants.CASCADING_FLOW_ID_CONF_KEY); | ||
if ((flowId == null) || (flowId.length() == 0)) { | ||
return Framework.NONE; | ||
} else { | ||
return Framework.SCALDING; | ||
} | ||
} | ||
|
||
if ((jobConf.get(Constants.CASCADING_FLOW_ID_CONF_KEY) != null) | ||
&& (jobConf.get(Constants.CASCADING_FLOW_ID_CONF_KEY).length() != 0)) { | ||
return Framework.SCALDING; | ||
} | ||
|
||
if ((jobConf.get(Constants.FRAMEWORK_CONF_KEY) != null) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This first null check is not necessary, as String.equalsIgnoreCase() checks for null and returns false if it is false. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fixed |
||
&& (Constants.FRAMEWORK_CONF_SPARK_VALUE | ||
.equalsIgnoreCase(jobConf.get(Constants.FRAMEWORK_CONF_KEY)))) { | ||
return Framework.SPARK; | ||
} | ||
|
||
return Framework.NONE; | ||
|
||
} | ||
|
||
/** | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Out of curiosity, how is this number (21) derived?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(Will add a comment in the code as well.) Since Spark job keys have a prefix of "spark", the length is 5 (the prefix) plus the regular job key length of 16 (epoch and sequence number); hence the Spark job key length is 21.