
Hadoop2.0 #18

Merged: 14 commits, Aug 27, 2013
4 changes: 3 additions & 1 deletion hraven-core/src/main/java/com/twitter/hraven/Constants.java
@@ -263,8 +263,10 @@ public class Constants {
* job_201306192120_0003_1371677828795_hadoop_word+count
* and conf file name is
* job_201306192120_0003_1371677828795_hadoop_conf.xml
* In Hadoop 2.0, job history files are named like
* job_1374258111572_0003-1374260622449-userName1-TeraGen-1374260635219-2-0-SUCCEEDED-default.jhist
*/
public static final String JOB_FILENAME_PATTERN_REGEX = ".*(job_[0-9]*_[0-9]*)_([0-9]*[aA-zZ]_)*(.*)$";
public static final String JOB_FILENAME_PATTERN_REGEX = ".*(job_[0-9]*_[0-9]*)(-|_)([0-9]*[aA-zZ]*)*(.*)$";

// JobHistory file name parsing related
public static final String JOB_CONF_FILE_END = "(.*)(conf.xml)$";
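To illustrate the regex change above, here is a minimal standalone sketch (not part of this PR; the class and method are invented for illustration) that applies the updated `JOB_FILENAME_PATTERN_REGEX` to the two filename styles quoted in the comment and extracts the job id from each.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Minimal sketch: check that the updated JOB_FILENAME_PATTERN_REGEX accepts both
// the Hadoop 1.x and the Hadoop 2.0 history file names quoted in the comment above.
public class JobFilenameRegexCheck {
  // Copied from Constants.JOB_FILENAME_PATTERN_REGEX as changed in this PR.
  static final String REGEX = ".*(job_[0-9]*_[0-9]*)(-|_)([0-9]*[aA-zZ]*)*(.*)$";

  public static void main(String[] args) {
    Pattern p = Pattern.compile(REGEX);
    String hadoop1 = "job_201306192120_0003_1371677828795_hadoop_word+count";
    String hadoop2 = "job_1374258111572_0003-1374260622449-userName1-TeraGen-"
        + "1374260635219-2-0-SUCCEEDED-default.jhist";
    for (String name : new String[] { hadoop1, hadoop2 }) {
      Matcher m = p.matcher(name);
      if (m.matches()) {
        // Group 1 captures the job id portion, e.g. job_201306192120_0003.
        System.out.println(name + " -> " + m.group(1));
      } else {
        System.out.println(name + " -> no match");
      }
    }
  }
}
```

With the old pattern only the underscore-delimited Hadoop 1.x name would match; the `(-|_)` alternation is what lets the dash-delimited .jhist name through.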
166 changes: 105 additions & 61 deletions hraven-core/src/main/java/com/twitter/hraven/JobHistoryKeys.java
@@ -3,8 +3,11 @@
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.util.Bytes;

import com.google.common.collect.Maps;

/**
* Contains the extract of the keys enum from
* {@link org.apache.hadoop.mapreduce.JobHistoryCopy} class
@@ -24,19 +27,108 @@
*/

public enum JobHistoryKeys {
JOBTRACKERID, START_TIME, FINISH_TIME,
JOBID, JOBNAME, USER, JOBCONF, SUBMIT_TIME,
LAUNCH_TIME, TOTAL_MAPS, TOTAL_REDUCES,
FAILED_MAPS, FAILED_REDUCES,
FINISHED_MAPS, FINISHED_REDUCES,
JOB_STATUS, TASKID, HOSTNAME, TASK_TYPE,
ERROR, TASK_ATTEMPT_ID, TASK_STATUS,
COPY_PHASE, SORT_PHASE, REDUCE_PHASE,
SHUFFLE_FINISHED, SORT_FINISHED, COUNTERS,
SPLITS, JOB_PRIORITY, HTTP_PORT,
TRACKER_NAME, STATE_STRING, VERSION,
MAP_COUNTERS, REDUCE_COUNTERS,
VIEW_JOB, MODIFY_JOB, JOB_QUEUE;
JOBTRACKERID(String.class, null),
START_TIME(Long.class, "startTime"),
FINISH_TIME(Long.class, "finishTime"),
JOBID(String.class, "jobid"),
JOBNAME(String.class, "jobName"),
USER(String.class, "userName"),
JOBCONF(String.class, "jobConfPath"),
SUBMIT_TIME(Long.class, "submitTime"),
LAUNCH_TIME(Long.class, "launchTime"),
TOTAL_MAPS(Long.class, "totalMaps"),
TOTAL_REDUCES(Long.class, "totalReduces"),
FAILED_MAPS(Long.class, "failedMaps"),
FAILED_REDUCES(Long.class, "failedReduces"),
FINISHED_MAPS(Long.class, "finishedMaps"),
FINISHED_REDUCES(Long.class, "finishedReduces"),
JOB_STATUS(String.class, "jobStatus"),
TASKID(String.class, "taskid"),
HOSTNAME(String.class, "hostname"),
TASK_TYPE(String.class, "taskType"),
ERROR(String.class, "error"),
TASK_ATTEMPT_ID(String.class, "attemptId"),
TASK_STATUS(String.class, "taskStatus"),
COPY_PHASE(String.class, null),
SORT_PHASE(String.class, null),
REDUCE_PHASE(String.class, null),
SHUFFLE_FINISHED(Long.class, "shuffleFinishTime"),
SORT_FINISHED(Long.class, "sortFinishTime"),
COUNTERS(String.class, null),
SPLITS(String.class, "splitLocations"),
JOB_PRIORITY(String.class, null),
HTTP_PORT(Integer.class, "httpPort"),
TRACKER_NAME(String.class, "trackerName"),
STATE_STRING(String.class, "state"),
VERSION(String.class, null),
MAP_COUNTERS(String.class, null),
REDUCE_COUNTERS(String.class, null),
VIEW_JOB(String.class, null),
MODIFY_JOB(String.class, null),
JOB_QUEUE(String.class, "jobQueueName"),
// hadoop 2.0 related keys {@link JobHistoryParser}
applicationAttemptId(String.class, null),
containerId(String.class, null),
successfulAttemptId(String.class, null),
failedDueToAttempt(String.class, null),
workflowId(String.class, null),
workflowName(String.class, null),
workflowNodeName(String.class, null),
workflowAdjacencies(String.class, null),
locality(String.class, null),
avataar(String.class, null),
nodeManagerHost(String.class, null),
nodeManagerPort(Integer.class, null),
nodeManagerHttpPort(Integer.class, null),
acls(String.class, null),
uberized(String.class, null),
shufflePort(Integer.class, null),
mapFinishTime(Long.class, null),
port(Integer.class, null),
rackname(String.class, null),
clockSplits(String.class, null),
cpuUsages(String.class, null),
physMemKbytes(String.class, null),
vMemKbytes(String.class, null),
status(String.class, null),
TOTAL_COUNTERS(String.class, null),
TASK_COUNTERS(String.class, null),
TASK_ATTEMPT_COUNTERS(String.class, null);

private final Class<?> className;
private final String mapping;

private JobHistoryKeys(Class<?> className, String mapping) {
this.className = className;
this.mapping = mapping;
}

public Class<?> getClassName() {
return className;
}

/**
* Data types represented by each of the defined job history field names
*/
public static Map<JobHistoryKeys, Class<?>> KEY_TYPES = Maps.newHashMap();
static {
for (JobHistoryKeys t : JobHistoryKeys.values()) {
KEY_TYPES.put(t, t.getClassName());
}
}

/**
* Mapping the keys in 2.0 job history files to 1.0 key names
*/
public static Map<String, String> HADOOP2_TO_HADOOP1_MAPPING = Maps.newHashMap();
static {
for (JobHistoryKeys t : JobHistoryKeys.values()) {
if (StringUtils.isNotEmpty(t.mapping)) {
HADOOP2_TO_HADOOP1_MAPPING.put(t.mapping, t.toString());
}
}
}


/**
* Job history key names as bytes
@@ -48,52 +140,4 @@ public enum JobHistoryKeys {
KEYS_TO_BYTES.put(k, Bytes.toBytes(k.toString().toLowerCase()));
}
}

/**
* Data types represented by each of the defined job history field names
*/
@SuppressWarnings("rawtypes")
public static Map<JobHistoryKeys, Class> KEY_TYPES = new HashMap<JobHistoryKeys, Class>();
static {
KEY_TYPES.put(JOBTRACKERID, String.class);
KEY_TYPES.put(START_TIME, Long.class);
KEY_TYPES.put(FINISH_TIME, Long.class);
KEY_TYPES.put(JOBID, String.class);
KEY_TYPES.put(JOBNAME, String.class);
KEY_TYPES.put(USER, String.class);
KEY_TYPES.put(JOBCONF, String.class);
KEY_TYPES.put(SUBMIT_TIME, Long.class);
KEY_TYPES.put(LAUNCH_TIME, Long.class);
KEY_TYPES.put(TOTAL_MAPS, Long.class);
KEY_TYPES.put(TOTAL_REDUCES, Long.class);
KEY_TYPES.put(FAILED_MAPS, Long.class);
KEY_TYPES.put(FAILED_REDUCES, Long.class);
KEY_TYPES.put(FINISHED_MAPS, Long.class);
KEY_TYPES.put(FINISHED_REDUCES, Long.class);
KEY_TYPES.put(JOB_STATUS, String.class);
KEY_TYPES.put(TASKID, String.class);
KEY_TYPES.put(HOSTNAME, String.class);
KEY_TYPES.put(TASK_TYPE, String.class);
KEY_TYPES.put(ERROR, String.class);
KEY_TYPES.put(TASK_ATTEMPT_ID, String.class);
KEY_TYPES.put(TASK_STATUS, String.class);
KEY_TYPES.put(COPY_PHASE, String.class);
KEY_TYPES.put(SORT_PHASE, String.class);
KEY_TYPES.put(REDUCE_PHASE, String.class);
KEY_TYPES.put(SHUFFLE_FINISHED, Long.class);
KEY_TYPES.put(SORT_FINISHED, Long.class);
KEY_TYPES.put(COUNTERS, String.class);
KEY_TYPES.put(SPLITS, String.class);
KEY_TYPES.put(JOB_PRIORITY, String.class);
KEY_TYPES.put(HTTP_PORT, Integer.class);
KEY_TYPES.put(TRACKER_NAME, String.class);
KEY_TYPES.put(STATE_STRING, String.class);
KEY_TYPES.put(VERSION, String.class);
KEY_TYPES.put(MAP_COUNTERS, String.class);
KEY_TYPES.put(REDUCE_COUNTERS, String.class);
KEY_TYPES.put(VIEW_JOB, String.class);
KEY_TYPES.put(MODIFY_JOB, String.class);
KEY_TYPES.put(JOB_QUEUE, String.class);
}

}
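As a usage sketch of the new mappings (hypothetical code, not from this PR): a Hadoop 2.0 history reader can translate a `.jhist` field name back to the existing enum constant via `HADOOP2_TO_HADOOP1_MAPPING` and then look up its value type in `KEY_TYPES`. The class and variable names below are invented for illustration.

```java
import com.twitter.hraven.JobHistoryKeys;

// Hypothetical translation step: given a field name from a Hadoop 2.0 .jhist
// event (e.g. "finishTime"), recover the Hadoop 1.0 style key and its type.
public class KeyTranslationSketch {
  public static void main(String[] args) {
    String hadoop2Field = "finishTime"; // field name as it appears in a .jhist file
    String hadoop1Name = JobHistoryKeys.HADOOP2_TO_HADOOP1_MAPPING.get(hadoop2Field);
    if (hadoop1Name != null) {
      JobHistoryKeys key = JobHistoryKeys.valueOf(hadoop1Name); // FINISH_TIME
      Class<?> type = JobHistoryKeys.KEY_TYPES.get(key);        // Long.class
      System.out.println(hadoop2Field + " -> " + key + " (" + type.getSimpleName() + ")");
    }
  }
}
```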
@@ -193,7 +193,7 @@ public Scan getHistoryRawTableScan(String cluster, String minJobId,
}
scan.setStartRow(startRow);

LOG.info("Starting raw table scan at " + Bytes.toStringBinary(startRow));
LOG.info("Starting raw table scan at " + Bytes.toStringBinary(startRow) + " " + idConv.fromBytes(startRow));

FilterList filters = new FilterList(FilterList.Operator.MUST_PASS_ALL);

@@ -210,7 +210,7 @@ public Scan getHistoryRawTableScan(String cluster, String minJobId,
InclusiveStopFilter inclusiveStopFilter = new InclusiveStopFilter(lastRow);
filters.addFilter(inclusiveStopFilter);
LOG.info("Stopping raw table scan (stop filter) at "
+ Bytes.toStringBinary(lastRow));
+ Bytes.toStringBinary(lastRow) + " " + idConv.fromBytes(lastRow));

// Add one to the jobSequence of the maximum JobId.
JobId maximumJobId = new JobId(maxJobId);
@@ -323,6 +323,24 @@ public String getRawJobHistory(QualifiedJobId jobId) throws IOException {
return historyData;
}

/**
* Returns the raw job history file contents, as a byte array, for the given cluster and job ID.
* @param jobId the cluster and job ID to look up
* @return the stored job history file contents, or {@code null} if no corresponding record was found
* @throws IOException if the underlying HBase read fails
*/
public byte[] getRawJobHistoryBytes(QualifiedJobId jobId) throws IOException {
byte[] historyData = null;
byte[] rowKey = idConv.toBytes(jobId);
Get get = new Get(rowKey);
get.addColumn(Constants.RAW_FAM_BYTES, Constants.JOBHISTORY_COL_BYTES);
Result result = rawTable.get(get);
if (result != null && !result.isEmpty()) {
historyData = result.getValue(Constants.RAW_FAM_BYTES, Constants.JOBHISTORY_COL_BYTES);
}
return historyData;
}

/**
* @param result
* from the {@link Scan} from
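A possible caller-side sketch of the new `getRawJobHistoryBytes` method (hypothetical glue code, not in this diff; the package locations in the imports are assumed from the repository layout), feeding the raw bytes to the `byte[]`-based parser interface shown further down.

```java
import java.io.IOException;

import com.twitter.hraven.JobKey;
import com.twitter.hraven.QualifiedJobId;
import com.twitter.hraven.datasource.JobHistoryRawService; // package assumed
import com.twitter.hraven.etl.JobHistoryFileParser;        // package assumed

// Hypothetical glue code: read the stored history file bytes for one job and
// feed them to a parser that now accepts byte[] (see the interface change below).
public class RawHistorySketch {
  public static void processOneJob(JobHistoryRawService rawService,
                                   JobHistoryFileParser parser,
                                   QualifiedJobId jobId,
                                   JobKey jobKey) throws IOException {
    byte[] rawHistory = rawService.getRawJobHistoryBytes(jobId);
    if (rawHistory != null) {
      // parse() now takes the history file contents directly as a byte array.
      parser.parse(rawHistory, jobKey);
    }
  }
}
```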
@@ -8,19 +8,29 @@
public class TestJobHistoryKeys {

private enum test_keys {
JOBTRACKERID, START_TIME, FINISH_TIME,
JOBID, JOBNAME, USER, JOBCONF, SUBMIT_TIME,
LAUNCH_TIME, TOTAL_MAPS, TOTAL_REDUCES,
FAILED_MAPS, FAILED_REDUCES,
FINISHED_MAPS, FINISHED_REDUCES,
JOB_STATUS, TASKID, HOSTNAME, TASK_TYPE,
ERROR, TASK_ATTEMPT_ID, TASK_STATUS,
COPY_PHASE, SORT_PHASE, REDUCE_PHASE,
JOBTRACKERID, START_TIME, FINISH_TIME,
JOBID, JOBNAME, USER, JOBCONF, SUBMIT_TIME,
LAUNCH_TIME, TOTAL_MAPS, TOTAL_REDUCES,
FAILED_MAPS, FAILED_REDUCES,
FINISHED_MAPS, FINISHED_REDUCES,
JOB_STATUS, TASKID, HOSTNAME, TASK_TYPE,
ERROR, TASK_ATTEMPT_ID, TASK_STATUS,
COPY_PHASE, SORT_PHASE, REDUCE_PHASE,
SHUFFLE_FINISHED, SORT_FINISHED, COUNTERS,
SPLITS, JOB_PRIORITY, HTTP_PORT,
TRACKER_NAME, STATE_STRING, VERSION,
MAP_COUNTERS, REDUCE_COUNTERS,
VIEW_JOB, MODIFY_JOB, JOB_QUEUE;
SPLITS, JOB_PRIORITY, HTTP_PORT,
TRACKER_NAME, STATE_STRING, VERSION,
MAP_COUNTERS, REDUCE_COUNTERS,
VIEW_JOB, MODIFY_JOB, JOB_QUEUE,
// hadoop 2.0 related keys {@link JobHistoryParser}
applicationAttemptId, containerId, nodeManagerHost,
successfulAttemptId, failedDueToAttempt,
workflowId, workflowName, workflowNodeName,
workflowAdjacencies, locality, avataar,
nodeManagerPort, nodeManagerHttpPort,
acls, uberized, shufflePort, mapFinishTime,
port, rackname, clockSplits, cpuUsages,
physMemKbytes, vMemKbytes, status, TOTAL_COUNTERS,
TASK_COUNTERS, TASK_ATTEMPT_COUNTERS;
}

@Test
4 changes: 2 additions & 2 deletions hraven-etl/src/main/java/com/twitter/hraven/etl/JobFile.java
@@ -85,10 +85,10 @@ private void parseFilename() {
Matcher confMatcher = CONF_PATTERN.matcher(filename);
if (confMatcher.matches()) {
isJobConfFile = true;
LOG.debug("Job Conf file " + filename);
LOG.debug("Job Conf file " + filename + " with job id: " + jobid);
} else {
isJobHistoryFile = true;
LOG.debug("Job History file " + filename);
LOG.debug("Job History file " + filename + " with job id: " + jobid);
}
}
else {
@@ -17,6 +17,8 @@

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -45,6 +47,7 @@ public class JobFileModifiedRangePathFilter extends JobFilePathFilter {
* The configuration of this processing job (not the files we are processing).
*/
private final Configuration myConf;
private static final Log LOG = LogFactory.getLog(JobFileModifiedRangePathFilter.class);

/**
* Constructs a filter that accepts only JobFiles with lastModification time
@@ -100,14 +103,14 @@ public boolean accept(Path path) {
FileSystem fs = path.getFileSystem(myConf);
FileStatus fileStatus = fs.getFileStatus(path);
long fileModificationTimeMillis = fileStatus.getModificationTime();

return accept(fileModificationTimeMillis);
} catch (IOException e) {
throw new ImportException("Cannot determine file modification time of "
+ path.getName(), e);
}
} else {
// Reject anything that does not match a job conf filename.
LOG.info(" Not a valid job conf / job history file "+ path.getName());
return false;
}
}
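For context, a hedged usage sketch of the path filter (hypothetical; the constructor arguments and package are assumptions, not taken from this diff): the filter is typically handed to `FileSystem.listStatus` so only job conf / history files modified inside the configured time range are picked up, and with this PR rejected filenames are now logged.

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import com.twitter.hraven.etl.JobFileModifiedRangePathFilter; // package assumed

// Hypothetical usage: list only job conf / history files whose modification time
// falls within [minTimeMillis, maxTimeMillis); other names are rejected and logged.
public class FilterUsageSketch {
  public static FileStatus[] listRecentJobFiles(Configuration conf, Path historyDir,
      long minTimeMillis, long maxTimeMillis) throws IOException {
    FileSystem fs = historyDir.getFileSystem(conf);
    // The (conf, min, max) constructor shape is an assumption for this sketch.
    JobFileModifiedRangePathFilter filter =
        new JobFileModifiedRangePathFilter(conf, minTimeMillis, maxTimeMillis);
    return fs.listStatus(historyDir, filter);
  }
}
```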
@@ -35,7 +35,7 @@ public interface JobHistoryFileParser {
*
* @throws ProcessingException
*/
public void parse(InputStream historyFile, JobKey jobKey);
public void parse(byte[] historyFile, JobKey jobKey);

/**
* Return the generated list of job puts assembled when history file is
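A minimal implementer's sketch for the new signature (hypothetical, not part of this PR): since `parse()` now receives a byte array instead of an `InputStream`, an implementation that still relies on stream-based parsing can simply wrap the bytes.

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;

import com.twitter.hraven.JobKey;
import com.twitter.hraven.etl.JobHistoryFileParser; // package assumed

// Hypothetical partial implementation: adapt existing stream-oriented parsing
// logic to the new byte[]-based parse() method.
public abstract class StreamAdaptingParser implements JobHistoryFileParser {
  @Override
  public void parse(byte[] historyFile, JobKey jobKey) {
    InputStream in = new ByteArrayInputStream(historyFile);
    parseStream(in, jobKey); // delegate to the old stream-oriented logic
  }

  /** Old-style parsing logic, kept abstract here for illustration only. */
  protected abstract void parseStream(InputStream historyFile, JobKey jobKey);
}
```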