Commit

Added settings for new job and finished last refactoring parts
Bohlski committed Dec 2, 2020
1 parent 987c230 commit dcee3b4
Showing 9 changed files with 412 additions and 269 deletions.
@@ -596,16 +596,26 @@ public class CommonSettings {
public static String HADOOP_MAPRED_METADATA_EXTRACTIONJOB_OUTPUT_DIR = "settings.common.hadoop.mapred.metadataExtractionJob.outputDir";

/**
* The directory/path for Hadoop to use as input path in metadata cdx-indexing map-/reduce jobs
* The directory/path for Hadoop to use as input path in metadata CDX-indexing map-/reduce jobs
* (Hadoop equivalent of ArchiveExtractCDXJob).
*/
public static String HADOOP_MAPRED_METADATA_CDXJOB_INPUT_DIR = "settings.common.hadoop.mapred.metadataCDXJob.inputDir";
public static String HADOOP_MAPRED_METADATA_CDX_EXTRACTIONJOB_INPUT_DIR = "settings.common.hadoop.mapred.metadataCDXExtractionJob.inputDir";

/**
* The directory/path for Hadoop to use as output path in metadata cdx-indexing map-/reduce jobs
* The directory/path for Hadoop to use as output path in metadata CDX-indexing map-/reduce jobs
* (Hadoop equivalent of ArchiveExtractCDXJob).
*/
public static String HADOOP_MAPRED_METADATA_CDXJOB_OUTPUT_DIR = "settings.common.hadoop.mapred.metadataCDXJob.outputDir";
public static String HADOOP_MAPRED_METADATA_CDX_EXTRACTIONJOB_OUTPUT_DIR = "settings.common.hadoop.mapred.metadataCDXExtractionJob.outputDir";

/**
* The directory/path for Hadoop to use as input path in crawl log extraction map-/reduce jobs.
*/
public static String HADOOP_MAPRED_CRAWLLOG_EXTRACTIONJOB_INPUT_DIR = "settings.common.hadoop.mapred.crawlLogExtractionJob.inputDir";

/**
* The directory/path for Hadoop to use as output path in crawl log extraction map-/reduce jobs.
*/
public static String HADOOP_MAPRED_CRAWLLOG_EXTRACTIONJOB_OUTPUT_DIR = "settings.common.hadoop.mapred.crawlLogExtractionJob.outputDir";

/**
* Path on the client machine where the uber-jar file containing the map-/reduce job and dependencies
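The two new crawl log keys mirror the existing metadata extraction keys, so a future strategy can resolve its directories the same way the metadata strategies do. A minimal sketch (assuming the settings are defined in the deployed configuration):

// Resolving the new job's directories from settings, mirroring how
// MetadataExtractionStrategy resolves its own further below.
String inputDir = Settings.get(CommonSettings.HADOOP_MAPRED_CRAWLLOG_EXTRACTIONJOB_INPUT_DIR);
String outputDir = Settings.get(CommonSettings.HADOOP_MAPRED_CRAWLLOG_EXTRACTIONJOB_OUTPUT_DIR);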
@@ -3,113 +3,130 @@
import java.io.IOException;
import java.nio.file.Paths;
import java.util.List;
import java.util.UUID;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import dk.netarkivet.common.CommonSettings;
import dk.netarkivet.common.exceptions.IOFailure;
import dk.netarkivet.common.utils.FileResolver;
import dk.netarkivet.common.utils.Settings;
import dk.netarkivet.common.utils.SettingsFactory;
import dk.netarkivet.common.utils.SimpleFileResolver;

/**
* Wrapper that prepares and runs a Hadoop job according to a given {@link HadoopJobStrategy}.
*/
public class HadoopJob {
private static final Logger log = LoggerFactory.getLogger(HadoopJob.class);

private final Configuration hadoopConf;
private final Path jobInputFile;
private final Path jobOutputDir;
private final JobType jobType;
private final HadoopJobStrategy jobStrategy;
private final String jobType;
private final long jobID;
private Path jobInputFile;
private Path jobOutputDir;
private String filenamePattern = ".*";
private boolean setupFailed = false;
private int fileCount = 0;

public HadoopJob(Configuration hadoopConf, Path jobInputPath,
Path jobOutputDir, JobType jobType) {
this.hadoopConf = hadoopConf;
this.jobInputFile = jobInputPath;
this.jobOutputDir = jobOutputDir;
this.jobType = jobType;
/**
* Constructor.
*
* @param jobID The id of the current job.
* @param jobStrategy Strategy specifying how the job is prepared and run.
*/
public HadoopJob(long jobID, HadoopJobStrategy jobStrategy) {
this.jobID = jobID;
this.jobStrategy = jobStrategy;
jobType = jobStrategy.getJobType();
}

/**
* Prepare the job input by getting the relevant files to process from the fileresolver, writing their paths to a
* temp file, and copying this file to the input path.
* @param jobID The ID of the job that is being run.
* Prepare the job input and output by getting the relevant files to process from the file resolver,
* writing their paths to a temp file, and copying this file to the input path.
* By default uses an all-matching pattern for the file resolver, so use {@link #processOnlyFilesMatching(String)}
* first to get files matching a specific pattern.
*
* @param fileSystem The Hadoop FileSystem used.
*/
public void prepareJobInput(long jobID, FileSystem fileSystem) {
public void prepareJobInputOutput(FileSystem fileSystem) {
UUID uuid = UUID.randomUUID();
jobInputFile = jobStrategy.createJobInputFile(uuid);
jobOutputDir = jobStrategy.createJobOutputDir(uuid);
if (jobInputFile == null || jobOutputDir == null) {
log.error("Failed initializing input/output for {} job '{}' with uuid '{}'",
jobType, jobID, uuid);
throw new IOFailure("Failed preparing job: failed initializing job input/output directory");
}

java.nio.file.Path localInputTempFile = HadoopFileUtils.makeLocalInputTempFile();
FileResolver fileResolver = SettingsFactory.getInstance(CommonSettings.FILE_RESOLVER_CLASS);
if (fileResolver instanceof SimpleFileResolver) {
String pillarParentDir = Settings.get(CommonSettings.HADOOP_MAPRED_INPUT_FILES_PARENT_DIR);
((SimpleFileResolver) fileResolver).setDirectory(Paths.get(pillarParentDir));
}
List<java.nio.file.Path> filePaths = fileResolver.getPaths(Pattern.compile(filenamePattern));
fileCount = filePaths.size();
try {
HadoopJobUtils.writeHadoopInputFileLinesToInputFile(filePaths, localInputTempFile);
} catch (IOException e) {
log.error("Failed writing filepaths to '{}' for {} job '{}'", localInputTempFile, jobType, jobID);
setupFailed = true;
return;
log.error("Failed writing filepaths to '{}' for {} job '{}'",
localInputTempFile, jobType, jobID);
throw new IOFailure("Failed preparing job: failed to write job input to input file");
}
log.info("Copying file with input paths '{}' to hdfs path '{}' for {} job '{}'.",
log.info("Copying file with input paths '{}' to job input path '{}' for {} job '{}'.",
localInputTempFile, jobInputFile, jobType, jobID);
try {
fileSystem.copyFromLocalFile(true, new Path(localInputTempFile.toAbsolutePath().toString()),
jobInputFile);
} catch (IOException e) {
log.error("Failed copying local input '{}' to '{}' for job '{}'", localInputTempFile, jobInputFile, jobID);
setupFailed = true;
log.error("Failed copying local input '{}' to job input path '{}' for job '{}'",
localInputTempFile, jobInputFile, jobID);
throw new IOFailure("Failed preparing job: failed copying input to job input path");
}
}

public int run(long jobID, Mapper<LongWritable, Text, NullWritable, Text> mapper) {
int exitCode;
try {
log.info("Starting {} job for jobID {}", jobType, jobID);
exitCode = ToolRunner.run(new HadoopJobTool(hadoopConf, mapper),
new String[] {jobInputFile.toString(), jobOutputDir.toString()});
} catch (Exception e) {
log.warn("{} job with ID {} failed to run normally.", jobType, jobID, e);
exitCode = 1;
/**
* Runs a Hadoop job according to the used strategy, configuration, and the settings in
* {@link dk.netarkivet.common.utils.hadoop.HadoopJobTool}.
*/
public void run() {
log.info("Starting {} job for jobID {} on {} file(s) matching pattern '{}'",
jobType, jobID, fileCount, filenamePattern);
int exitCode = jobStrategy.runJob(jobInputFile, jobOutputDir);
if (exitCode == 0) {
log.info("{} job with jobID {} was a success!", jobType, jobID);
} else {
log.warn("{} job with ID {} failed with exit code '{}'", jobType, jobID, exitCode);
throw new IOFailure("Hadoop job failed with exit code {}");
}
return exitCode;
}

public Configuration getHadoopConf() {
return hadoopConf;
}

public JobType getJobType() {
return jobType;
}

public Path getJobInputFile() {
return jobInputFile;
/**
* Changes the pattern used when getting the files for the job's input.
*
* @param filenamePattern Pattern to use when matching filenames.
*/
public void processOnlyFilesMatching(String filenamePattern) {
this.filenamePattern = filenamePattern;
}

/**
* Get the output directory for the job.
* @return Path representing output directory.
*/
public Path getJobOutputDir() {
return jobOutputDir;
}

public void processOnlyFilesMatching(String fileSearchPattern) {
this.filenamePattern = fileSearchPattern;
}

public String getFilenamePattern() {
return filenamePattern;
}

public boolean hasJobsetupFailed() {
return setupFailed;
/**
* Get what type of job is being run.
* @return The job type set by the job strategy used.
*/
public String getJobType() {
return jobType;
}
}
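After the refactoring, HadoopJob no longer builds its own configuration or paths; everything job-specific comes from the strategy. A minimal usage sketch, assuming a configured Hadoop environment (the job ID and filename pattern below are illustrative, not from this commit):

package dk.netarkivet.common.utils.hadoop;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class HadoopJobUsageSketch {
    public static void main(String[] args) throws IOException {
        long jobID = 42L; // illustrative job ID
        FileSystem fileSystem = FileSystem.newInstance(new Configuration());
        HadoopJobStrategy strategy = new MetadataExtractionStrategy(jobID, fileSystem);
        HadoopJob job = new HadoopJob(jobID, strategy);
        job.processOnlyFilesMatching(".*-metadata-.*\\.warc"); // hypothetical pattern
        job.prepareJobInputOutput(fileSystem); // resolves files, writes input file, creates output dir
        job.run(); // throws IOFailure on a non-zero exit code
    }
}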
@@ -0,0 +1,43 @@
package dk.netarkivet.common.utils.hadoop;

import java.util.UUID;

import org.apache.hadoop.fs.Path;

/**
* Interface for a HadoopJob's strategy of how to perform the job.
*/
public interface HadoopJobStrategy {

/**
* Runs a Hadoop job (HadoopJobTool) according to the specification of the used strategy.
*
* @param jobInputFile The Path specifying the job's input file.
* @param jobOutputDir The Path specifying the job's output directory.
* @return The job's exit code (0 on success).
*/
int runJob(Path jobInputFile, Path jobOutputDir);

/**
* Create the job input file with name from a uuid.
*
* @param uuid The UUID to create a unique name from.
* @return Path specifying where the input file is located.
*/
Path createJobInputFile(UUID uuid);

/**
* Create the job output directory with name from a uuid.
*
* @param uuid The UUID to create a unique name from.
* @return Path specifying where the output directory is located.
*/
Path createJobOutputDir(UUID uuid);

/**
* Return a string specifying which kind of job is being run.
*
* @return String specifying the job's type.
*/
String getJobType();
}
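The new crawl log settings in CommonSettings suggest a matching implementation of this interface. A sketch of what one might look like, modeled on the MetadataExtractionStrategy below; the CrawlLogExtractionMapper class is a hypothetical placeholder, as this commit does not include a crawl log mapper:

package dk.netarkivet.common.utils.hadoop;

import java.util.UUID;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.ToolRunner;

import dk.netarkivet.common.CommonSettings;
import dk.netarkivet.common.utils.Settings;

/** Hypothetical strategy for the new crawl log extraction job. */
public class CrawlLogExtractionStrategy implements HadoopJobStrategy {
    private final long jobID;
    private final FileSystem fileSystem;
    private final Configuration hadoopConf;

    public CrawlLogExtractionStrategy(long jobID, FileSystem fileSystem) {
        this.jobID = jobID;
        this.fileSystem = fileSystem;
        hadoopConf = fileSystem.getConf();
    }

    @Override public int runJob(Path jobInputFile, Path jobOutputDir) {
        try {
            // CrawlLogExtractionMapper is a placeholder mapper, not part of this commit.
            return ToolRunner.run(new HadoopJobTool(hadoopConf, new CrawlLogExtractionMapper()),
                    new String[] {jobInputFile.toString(), jobOutputDir.toString()});
        } catch (Exception e) {
            return 1;
        }
    }

    @Override public Path createJobInputFile(UUID uuid) {
        return HadoopFileUtils.createUniquePathInDir(
                fileSystem, Settings.get(CommonSettings.HADOOP_MAPRED_CRAWLLOG_EXTRACTIONJOB_INPUT_DIR), uuid);
    }

    @Override public Path createJobOutputDir(UUID uuid) {
        return HadoopFileUtils.createUniquePathInDir(
                fileSystem, Settings.get(CommonSettings.HADOOP_MAPRED_CRAWLLOG_EXTRACTIONJOB_OUTPUT_DIR), uuid);
    }

    @Override public String getJobType() {
        return "CRAWL LOG EXTRACTION";
    }
}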

This file was deleted.

@@ -0,0 +1,77 @@
package dk.netarkivet.common.utils.hadoop;

import java.util.UUID;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import dk.netarkivet.common.CommonSettings;
import dk.netarkivet.common.utils.Settings;

/**
* Strategy given to a HadoopJob for extracting selected content from metadata files matching specific
* URL and MIME patterns. The mapper expects the used Configuration to have these patterns set before use;
* otherwise, it falls back to all-matching patterns.
*
* This type of job is the Hadoop counterpart to running
* {@link dk.netarkivet.common.utils.archive.GetMetadataArchiveBatchJob}.
*/
public class MetadataExtractionStrategy implements HadoopJobStrategy {
private final Logger log = LoggerFactory.getLogger(MetadataExtractionStrategy.class);
private final long jobID;
private final FileSystem fileSystem;
private final Configuration hadoopConf;
private final Pattern urlPattern;
private final Pattern mimePattern;

/**
* Constructor.
*
* @param jobID The ID for the job.
* @param fileSystem The Hadoop FileSystem used.
*/
public MetadataExtractionStrategy(long jobID, FileSystem fileSystem) {
this.jobID = jobID;
this.fileSystem = fileSystem;
hadoopConf = fileSystem.getConf();
urlPattern = hadoopConf.getPattern(GetMetadataMapper.URL_PATTERN, Pattern.compile(".*"));
mimePattern = hadoopConf.getPattern(GetMetadataMapper.MIME_PATTERN, Pattern.compile(".*"));
}

@Override public int runJob(Path jobInputFile, Path jobOutputDir) {
int exitCode;
try {
log.info("URL/MIME patterns used for metadata extraction job {} are '{}' and '{}'",
jobID, urlPattern, mimePattern);
exitCode = ToolRunner.run(new HadoopJobTool(hadoopConf, new GetMetadataMapper()),
new String[] {jobInputFile.toString(), jobOutputDir.toString()});
} catch (Exception e) {
log.warn("Metadata extraction job with ID {} failed to run normally.", jobID, e);
exitCode = 1;
}
return exitCode;
}

@Override public Path createJobInputFile(UUID uuid) {
Path jobInputFile = HadoopFileUtils.createUniquePathInDir(
fileSystem, Settings.get(CommonSettings.HADOOP_MAPRED_METADATA_EXTRACTIONJOB_INPUT_DIR), uuid);
log.info("Input file for metadata extraction job '{}' will be '{}'", jobID, jobInputFile);
return jobInputFile;
}

@Override public Path createJobOutputDir(UUID uuid) {
Path jobOutputDir = HadoopFileUtils.createUniquePathInDir(
fileSystem, Settings.get(CommonSettings.HADOOP_MAPRED_METADATA_EXTRACTIONJOB_OUTPUT_DIR), uuid);
log.info("Output directory for metadata extraction job '{}' is '{}'", jobID, jobOutputDir);
return jobOutputDir;
}

@Override public String getJobType() {
return "METADATA EXTRACTION";
}
}
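Because the patterns are read from the FileSystem's Configuration in the constructor, a caller has to set them before creating the strategy, using the same keys GetMetadataMapper reads. A short sketch with illustrative pattern values:

// Configure extraction patterns before constructing the strategy;
// unset keys fall back to the all-matching ".*" defaults above.
Configuration conf = fileSystem.getConf();
conf.set(GetMetadataMapper.URL_PATTERN, "metadata://.*"); // illustrative URL pattern
conf.set(GetMetadataMapper.MIME_PATTERN, "text/plain");   // illustrative MIME pattern
HadoopJobStrategy strategy = new MetadataExtractionStrategy(jobID, fileSystem);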
