
Spark history file processing #124

Merged
9 commits merged from vrushalic:preprocess_spark into twitter:twitter_only on Dec 5, 2014

Conversation

vrushalic
Collaborator

Enables the preprocessing, rawloader and processing steps to process Spark history/conf files.

Ensures the "spark" prefix is applied to Spark jobs, while backwards compatibility is maintained for Hadoop jobs. Until now, hRaven presumed the prefix to be "job" for all job ids, so JobIds, JobKeys, and row keys did not store this prefix; we now need to account for the existence of such a prefix in order to include Spark files.
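For illustration, a hypothetical sketch of how such a prefixed id decomposes (the id value and the string split are made up for this example; the accessors mirror JobId's getJobPrefix(), getJobEpoch(), and getJobSequence() seen in the diff below):

    // Hypothetical example id: Spark apps carry a "spark" prefix,
    // while classic Hadoop jobs carry the traditional "job" prefix.
    String idString = "spark_1415725864000_0005";
    String[] parts = idString.split("_");
    String prefix = parts[0];                  // "spark" (or "job" for hadoop)
    long epoch = Long.parseLong(parts[1]);     // epoch portion of the id
    long sequence = Long.parseLong(parts[2]);  // per-epoch sequence number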

Adds the SparkJobDescFactory and JobHistoryFileParserSpark classes.

If the processing step is going to be run on a hadoop1 cluster, then the jackson-1.8.8 libraries need to occur on the classpath before the jackson-1.5.2 libraries, since the jackson-1.5.2 libraries ship with hadoop.
Hence, for hadoop1 clusters, we need to set the -Dmapreduce.task.classpath.user.precedence=true parameter at the processing step and add $(ls /usr/local/hbase/lib/jackson-core-asl-*.jar),$(ls /usr/local/hbase/lib/jackson-mapper-asl-*.jar) to the libjars.
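Putting that together, the hadoop1 invocation of the processing step might look like the following sketch (the jar name, driver class, and trailing arguments are illustrative placeholders, not an exact command from this PR):

    hadoop jar hraven-etl.jar com.twitter.hraven.etl.JobFileProcessor \
      -Dmapreduce.task.classpath.user.precedence=true \
      -libjars $(ls /usr/local/hbase/lib/jackson-core-asl-*.jar),$(ls /usr/local/hbase/lib/jackson-mapper-asl-*.jar) \
      <other processing-step arguments>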

@coveralls

Coverage Status

Coverage increased (+7.76%) when pulling 34f6e78 on vrushalic:preprocess_spark into 0dd1bef on twitter:twitter_only.

Vrushali Channapattan added 2 commits November 11, 2014 12:01
@coveralls

Coverage Status

Coverage increased (+8.0%) when pulling a2dbc93 on vrushalic:preprocess_spark into 0dd1bef on twitter:twitter_only.


return Bytes.add(Bytes.toBytes(jobId.getJobEpoch()),
Bytes.toBytes(jobId.getJobSequence()));
String prefix = jobId.getJobPrefix();
if ((StringUtils.isNotBlank(prefix) && (JobId.JOB_PREFIX.equalsIgnoreCase(prefix)))
Collaborator


I thought that JobId.JOB_PREFIX.equalsIgnoreCase(prefix) was not needed?

Collaborator


Given that JobId never has prefix = JOB_PREFIX, can't we remove the latter check?

@vrushalic changed the title from "Preprocessing step for Spark history file processing" to "Spark history file processing" on Nov 18, 2014
@coveralls

Coverage Status

Coverage increased (+8.22%) when pulling 5176251 on vrushalic:preprocess_spark into 0dd1bef on twitter:twitter_only.


@@ -426,4 +456,6 @@

/** name of the properties file used for cluster to cluster identifier mapping */
public static final String HRAVEN_CLUSTER_PROPERTIES_FILENAME = "hRavenClusters.properties";

public static final int SPARK_JOB_KEY_LENGTH = 21;
Collaborator


Out of curiosity, how is this number (21) derived?

Collaborator Author


(Will add a comment in the code as well.) Since spark job keys have a prefix of "spark", the length is 5 plus the regular job key length, which is 16 (epoch and sequence); hence the spark job key length is 21.
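A minimal sketch of that arithmetic (assuming the epoch and sequence are each serialized as 8-byte longs, as the Bytes.toBytes calls in the diff above suggest):

    import org.apache.hadoop.hbase.util.Bytes;

    // "spark" prefix (5 bytes) + epoch long (8 bytes) + sequence long (8 bytes) = 21
    int sparkJobKeyLength = "spark".length() + Bytes.SIZEOF_LONG + Bytes.SIZEOF_LONG;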

byte[] qualifier;
byte[] valueBytes;

Iterator<Map.Entry<String, JsonNode>> fieldsIterator = rootNode.getFields();
Collaborator


Nit: it can be expressed more concisely as

for (Map.Entry<String,JsonNode> field: rootNode.getFields()) {
// job history keys are in upper case
...

Collaborator Author


Yes, I think so too. But rootNode is a JsonNode object, and its .getFields returns an iterator. The two methods the entries have are .getKey and .getValue. JsonNode does have .get, but that needs an index number to be passed in; it does not have a get function that returns a Map.Entry.
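For reference, a minimal sketch of consuming that iterator with an explicit loop (assuming Jackson 1.x, where JsonNode.getFields() returns an Iterator<Map.Entry<String, JsonNode>> rather than an Iterable):

    import java.util.Iterator;
    import java.util.Map;
    import org.codehaus.jackson.JsonNode;

    Iterator<Map.Entry<String, JsonNode>> fieldsIterator = rootNode.getFields();
    while (fieldsIterator.hasNext()) {
      Map.Entry<String, JsonNode> field = fieldsIterator.next();
      String key = field.getKey();        // job history keys are in upper case
      JsonNode value = field.getValue();
      // process the field here
    }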

Collaborator


Ah, I thought (mistakenly) that getFields returned an Iterable, in which case the enhanced for syntax applies. I should have looked more carefully. NVM.

@coveralls

Coverage Status

Coverage increased (+8.22%) when pulling 2680bc3 on vrushalic:preprocess_spark into 0dd1bef on twitter:twitter_only.

@sjlee
Collaborator

sjlee commented Nov 21, 2014

LGTM. Thanks for your patience @vrushalic! We can merge this after @jrottinghuis gives his +1.

@coveralls

Coverage Status

Coverage increased (+8.22%) when pulling 268305a on vrushalic:preprocess_spark into 0dd1bef on twitter:twitter_only.

jrottinghuis added a commit that referenced this pull request Dec 5, 2014
@jrottinghuis jrottinghuis merged commit 4675812 into twitter:twitter_only Dec 5, 2014