Commit 2f3f4cd: Merge 9367423 into ff97b69
Vrushali Channapattan committed Dec 24, 2014
2 parents: ff97b69 + 9367423

Showing 4 changed files with 26 additions and 20 deletions.
File 1 of 4:
@@ -495,6 +495,7 @@ public boolean isJobAlreadyProcessed(byte[] row) throws ProcessingException {
     } catch (IOException e) {
       // safer to return false on account of the exception caught
       // while trying to get the value from the raw table
+      LOG.info("returning false, error in get", e);
       return false;
     }
     if (result != null && !result.isEmpty()) {
@@ -588,4 +589,8 @@ public byte[] getJobHistoryRawFromResult(Result value) throws MissingColumnInRes
     byte[] jobHistoryRaw = keyValue.getValue();
     return jobHistoryRaw;
   }
+
+  public void doJobProcessedSuccessPut(byte[] row, boolean status) throws IOException {
+    rawTable.put(getJobProcessedSuccessPut(row, status));
+  }
 }
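
The new doJobProcessedSuccessPut helper builds the processed-success marker Put and writes it to the raw table in a single call, so callers no longer have to emit the Put themselves (see the mapper change in the last file of this commit). A minimal sketch of what the pre-existing getJobProcessedSuccessPut it delegates to might look like; the column family and qualifier names here are illustrative assumptions, not the project's actual constants:

    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    // Hypothetical sketch; "i" and "job_processed_success" are assumed names.
    public Put getJobProcessedSuccessPut(byte[] row, boolean status) {
      Put put = new Put(row);
      // Record whether post-processing of this RAW row succeeded.
      put.add(Bytes.toBytes("i"), Bytes.toBytes("job_processed_success"),
          Bytes.toBytes(status));
      return put;
    }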
File 2 of 4:
@@ -275,7 +275,8 @@ public int run(String[] args) throws Exception {
 
     boolean success = false;
     if (reprocess) {
-      success = reProcessRecords(hbaseConf, cluster, batchSize, threadCount);
+      success = reProcessRecords(hbaseConf, cluster, batchSize, threadCount,
+          processFileSubstring);
     } else {
       success = processRecords(hbaseConf, cluster, batchSize, threadCount,
           processFileSubstring);
@@ -335,7 +336,8 @@ boolean processRecords(Configuration conf, String cluster, int batchSize,
     }
 
     List<JobRunner> jobRunners = getJobRunners(conf, cluster, false, batchSize,
-        minMaxJobFileTracker.getMinJobId(), minMaxJobFileTracker.getMaxJobId());
+        minMaxJobFileTracker.getMinJobId(), minMaxJobFileTracker.getMaxJobId(),
+        processFileSubstring);
 
     boolean success = runJobs(threadCount, jobRunners);
     if (success) {
@@ -367,11 +369,11 @@ boolean processRecords(Configuration conf, String cluster, int batchSize,
    * @throws RowKeyParseException
    */
   boolean reProcessRecords(Configuration conf, String cluster, int batchSize,
-      int threadCount) throws IOException, InterruptedException,
+      int threadCount, String processFileSubstring) throws IOException, InterruptedException,
       ClassNotFoundException, ExecutionException, RowKeyParseException {
 
     List<JobRunner> jobRunners = getJobRunners(conf, cluster, true, batchSize,
-        null, null);
+        null, null, processFileSubstring);
 
     boolean success = runJobs(threadCount, jobRunners);
     return success;
@@ -537,7 +539,8 @@ private void updateProcessRecords(Configuration conf,
    * @throws RowKeyParseException
    */
   private List<JobRunner> getJobRunners(Configuration conf, String cluster,
-      boolean reprocess, int batchSize, String minJobId, String maxJobId)
+      boolean reprocess, int batchSize, String minJobId, String maxJobId,
+      String processFileSubstring)
       throws IOException, InterruptedException, ClassNotFoundException,
       RowKeyParseException {
     List<JobRunner> jobRunners = new LinkedList<JobRunner>();
@@ -553,7 +556,7 @@ private List<JobRunner> getJobRunners(Configuration conf, String cluster,
         cluster, minJobId, maxJobId, reprocess, batchSize);
 
     for (Scan scan : scanList) {
-      Job job = getProcessingJob(conf, scan, scanList.size());
+      Job job = getProcessingJob(conf, scan, scanList.size(), processFileSubstring);
 
       JobRunner jobRunner = new JobRunner(job, null);
       jobRunners.add(jobRunner);
@@ -588,7 +591,8 @@ private List<JobRunner> getJobRunners(Configuration conf, String cluster,
    * @throws InterruptedException
    * @throws ClassNotFoundException
    */
-  private Job getProcessingJob(Configuration conf, Scan scan, int totalJobCount)
+  private Job getProcessingJob(Configuration conf, Scan scan, int totalJobCount,
+      String processFileSubstring)
       throws IOException {
 
     Configuration confClone = new Configuration(conf);
@@ -598,7 +602,7 @@ private Job getProcessingJob(Configuration conf, Scan scan, int totalJobCount)
     confClone.setBoolean("mapred.map.tasks.speculative.execution", false);
 
     // Set up job
-    Job job = new Job(confClone, getJobName(totalJobCount));
+    Job job = new Job(confClone, getJobName(totalJobCount, processFileSubstring));
 
     // This is a map-only class, skip reduce step
     job.setNumReduceTasks(0);
@@ -618,8 +622,8 @@ private Job getProcessingJob(Configuration conf, Scan scan, int totalJobCount)
    * name how far along this job is.
    * @return the name to use for each consecutive Hadoop job to launch.
    */
-  private synchronized String getJobName(int totalJobCount) {
-    String jobName = NAME + " [" + startTimestamp + " "
+  private synchronized String getJobName(int totalJobCount, String processFileSubstring) {
+    String jobName = NAME + " [" + processFileSubstring + " " + startTimestamp + " "
         + jobCounter.incrementAndGet() + "/" + totalJobCount + "]";
     return jobName;
   }
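
With processFileSubstring threaded through run, processRecords/reProcessRecords, getJobRunners, and getProcessingJob down to getJobName, each launched MapReduce job's name now records which file-name filter it was run with. A worked example of the resulting name; NAME, startTimestamp, and jobCounter are the class fields used above, and the concrete values here are made up:

    public class JobNameExample {
      public static void main(String[] args) {
        String NAME = "jobFileProcessor";     // assumed value of the NAME constant
        long startTimestamp = 1419379200000L; // example start time, epoch millis
        String processFileSubstring = "job_201412";
        int jobSeq = 1, totalJobCount = 3;    // jobCounter.incrementAndGet() -> 1
        String jobName = NAME + " [" + processFileSubstring + " " + startTimestamp
            + " " + jobSeq + "/" + totalJobCount + "]";
        // Prints: jobFileProcessor [job_201412 1419379200000 1/3]
        System.out.println(jobName);
      }
    }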
File 3 of 4:
@@ -146,10 +146,13 @@ public static enum Hadoop2RecordType {
     JobStatusChanged("JOB_STATUS_CHANGED"),
     JobQueueChange("JOB_QUEUE_CHANGED"),
     JobSubmitted("JOB_SUBMITTED"),
-    JobUnsuccessfulCompletion("JOB_KILLED","JOB_FAILED"),
+    JobUnsuccessfulCompletion("JOB_KILLED",
+        "JOB_FAILED",
+        "JOB_ERROR"),
     MapAttemptFinished("MAP_ATTEMPT_FINISHED"),
     ReduceAttemptFinished("REDUCE_ATTEMPT_FINISHED"),
-    TaskAttemptFinished("CLEANUP_ATTEMPT_FINISHED"),
+    TaskAttemptFinished("CLEANUP_ATTEMPT_FINISHED",
+        "SETUP_ATTEMPT_FINISHED"),
     TaskAttemptStarted("CLEANUP_ATTEMPT_STARTED",
         "SETUP_ATTEMPT_STARTED",
         "REDUCE_ATTEMPT_STARTED",
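
The enum maps one logical record type to several raw history event-name strings via a varargs constructor; this change adds JOB_ERROR and SETUP_ATTEMPT_FINISHED to the existing groupings. A self-contained sketch of the pattern, with names abbreviated (the real enum has more members and supporting code):

    import java.util.HashMap;
    import java.util.Map;

    enum RecordType {
      JobUnsuccessfulCompletion("JOB_KILLED", "JOB_FAILED", "JOB_ERROR"),
      TaskAttemptFinished("CLEANUP_ATTEMPT_FINISHED", "SETUP_ATTEMPT_FINISHED");

      // Index every raw event name back to its record type.
      private static final Map<String, RecordType> BY_EVENT =
          new HashMap<String, RecordType>();
      static {
        for (RecordType t : values()) {
          for (String e : t.eventNames) {
            BY_EVENT.put(e, t);
          }
        }
      }

      private final String[] eventNames;

      RecordType(String... eventNames) {
        this.eventNames = eventNames;
      }

      /** Look up the record type for a raw event name, or null if unknown. */
      static RecordType forEvent(String eventName) {
        return BY_EVENT.get(eventName);
      }
    }

With this in place, RecordType.forEvent("JOB_ERROR") resolves to JobUnsuccessfulCompletion, which is why adding a string to a constructor is all the parser needs to recognize a new event variant.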
File 4 of 4:
@@ -129,6 +129,7 @@ protected void setup(
      * the last character if 0 indicates that this is the first task attempt
      */
     isThisAttemptTheFirst = attemptid.endsWith("0");
+    LOG.info("isThisAttemptTheFirst=" + isThisAttemptTheFirst);
   }
 }
 
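The new log line records the outcome of the endsWith("0") check. As a worked example, with made-up ids following the attempt_<jobId>_<type>_<taskId>_<attemptNum> format:

    String first = "attempt_201412240000_0001_m_000000_0";
    String retry = "attempt_201412240000_0001_m_000000_1";
    boolean a = first.endsWith("0"); // true  -> treated as the first attempt
    boolean b = retry.endsWith("0"); // false -> a retry
    // Caveat: endsWith("0") would also match attempt numbers 10, 20, ...,
    // though attempt numbers that high are rare in practice.
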
@@ -327,14 +328,7 @@ protected void map(
 
     // Indicate that we processed the RAW successfully so that we can skip it
     // on the next scan (or not).
-    Put successPut = rawService.getJobProcessedSuccessPut(value.getRow(),
-        success);
-    // TODO: In the unlikely event of multiple mappers running against one RAW
-    // row, with one succeeding and one failing, there could be a race where the
-    // raw does not properly indicate the true status (which is questionable in
-    // any case with multiple simultaneous runs with different outcome).
-    context.write(RAW_TABLE, successPut);
-
+    rawService.doJobProcessedSuccessPut(value.getRow(), success);
   }
 
   /**
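
For contrast, the two call patterns side by side; success is the boolean outcome computed earlier in map, and value is the mapper's input Result:

    // Old: build the Put, then emit it through the MapReduce context and
    // leave the actual write to the configured output format.
    Put successPut = rawService.getJobProcessedSuccessPut(value.getRow(), success);
    context.write(RAW_TABLE, successPut);

    // New: build and write the Put in one synchronous call against the raw
    // table, via the doJobProcessedSuccessPut helper added in the first file.
    rawService.doJobProcessedSuccessPut(value.getRow(), success);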
