From dcee3b48afde25b3ab1ac42fa65adc64f672d91e Mon Sep 17 00:00:00 2001 From: bohlski Date: Wed, 2 Dec 2020 14:07:18 +0100 Subject: [PATCH] Added settings for new job and finished last refactoring parts --- .../dk/netarkivet/common/CommonSettings.java | 18 +- .../common/utils/hadoop/HadoopJob.java | 131 ++++++++------- .../utils/hadoop/HadoopJobStrategy.java | 43 +++++ .../common/utils/hadoop/JobType.java | 19 --- .../hadoop/MetadataExtractionStrategy.java | 77 +++++++++ .../indexserver/RawMetadataCache.java | 154 ++++-------------- .../viewerproxy/webinterface/Reporting.java | 97 ++++------- .../hadoop/CrawlLogExtractionStrategy.java | 72 ++++++++ .../hadoop/MetadataCDXExtractionStrategy.java | 70 ++++++++ 9 files changed, 412 insertions(+), 269 deletions(-) create mode 100644 common/common-core/src/main/java/dk/netarkivet/common/utils/hadoop/HadoopJobStrategy.java delete mode 100644 common/common-core/src/main/java/dk/netarkivet/common/utils/hadoop/JobType.java create mode 100644 common/common-core/src/main/java/dk/netarkivet/common/utils/hadoop/MetadataExtractionStrategy.java create mode 100644 harvester/harvester-core/src/main/java/dk/netarkivet/viewerproxy/webinterface/hadoop/CrawlLogExtractionStrategy.java create mode 100644 harvester/harvester-core/src/main/java/dk/netarkivet/viewerproxy/webinterface/hadoop/MetadataCDXExtractionStrategy.java diff --git a/common/common-core/src/main/java/dk/netarkivet/common/CommonSettings.java b/common/common-core/src/main/java/dk/netarkivet/common/CommonSettings.java index 08e7274d96..fd7d3c0e30 100644 --- a/common/common-core/src/main/java/dk/netarkivet/common/CommonSettings.java +++ b/common/common-core/src/main/java/dk/netarkivet/common/CommonSettings.java @@ -596,16 +596,26 @@ public class CommonSettings { public static String HADOOP_MAPRED_METADATA_EXTRACTIONJOB_OUTPUT_DIR = "settings.common.hadoop.mapred.metadataExtractionJob.outputDir"; /** - * The directory/path for Hadoop to use as input path in metadata cdx-indexing map-/reduce jobs + * The directory/path for Hadoop to use as input path in metadata CDX-indexing map-/reduce jobs * (Hadoop equivalent of ArchiveExtractCDXJob). */ - public static String HADOOP_MAPRED_METADATA_CDXJOB_INPUT_DIR = "settings.common.hadoop.mapred.metadataCDXJob.inputDir"; + public static String HADOOP_MAPRED_METADATA_CDX_EXTRACTIONJOB_INPUT_DIR = "settings.common.hadoop.mapred.metadataCDXExtractionJob.inputDir"; /** - * The directory/path for Hadoop to use as output path in metadata cdx-indexing map-/reduce jobs + * The directory/path for Hadoop to use as output path in metadata CDX-indexing map-/reduce jobs * (Hadoop equivalent of ArchiveExtractCDXJob). 
     */
-    public static String HADOOP_MAPRED_METADATA_CDXJOB_OUTPUT_DIR = "settings.common.hadoop.mapred.metadataCDXJob.outputDir";
+    public static String HADOOP_MAPRED_METADATA_CDX_EXTRACTIONJOB_OUTPUT_DIR = "settings.common.hadoop.mapred.metadataCDXExtractionJob.outputDir";
+
+    /**
+     * The directory/path for Hadoop to use as input path in crawl log extraction map-/reduce jobs.
+     */
+    public static String HADOOP_MAPRED_CRAWLLOG_EXTRACTIONJOB_INPUT_DIR = "settings.common.hadoop.mapred.crawlLogExtractionJob.inputDir";
+
+    /**
+     * The directory/path for Hadoop to use as output path in crawl log extraction map-/reduce jobs.
+     */
+    public static String HADOOP_MAPRED_CRAWLLOG_EXTRACTIONJOB_OUTPUT_DIR = "settings.common.hadoop.mapred.crawlLogExtractionJob.outputDir";
 
     /**
      * Path on the client machine where the uber-jar file containing the map-/reduce job and dependencies
diff --git a/common/common-core/src/main/java/dk/netarkivet/common/utils/hadoop/HadoopJob.java b/common/common-core/src/main/java/dk/netarkivet/common/utils/hadoop/HadoopJob.java
index 11053bdf05..f02c7dd998 100644
--- a/common/common-core/src/main/java/dk/netarkivet/common/utils/hadoop/HadoopJob.java
+++ b/common/common-core/src/main/java/dk/netarkivet/common/utils/hadoop/HadoopJob.java
@@ -3,50 +3,65 @@
 import java.io.IOException;
 import java.nio.file.Paths;
 import java.util.List;
+import java.util.UUID;
 import java.util.regex.Pattern;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.util.ToolRunner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import dk.netarkivet.common.CommonSettings;
+import dk.netarkivet.common.exceptions.IOFailure;
 import dk.netarkivet.common.utils.FileResolver;
 import dk.netarkivet.common.utils.Settings;
 import dk.netarkivet.common.utils.SettingsFactory;
 import dk.netarkivet.common.utils.SimpleFileResolver;
 
+/**
+ * Wrapper for preparing and running a Hadoop job through a given {@link HadoopJobStrategy}.
+ */
 public class HadoopJob {
     private static final Logger log = LoggerFactory.getLogger(HadoopJob.class);
 
-    private final Configuration hadoopConf;
-    private final Path jobInputFile;
-    private final Path jobOutputDir;
-    private final JobType jobType;
+    private final HadoopJobStrategy jobStrategy;
+    private final String jobType;
+    private final long jobID;
+    private Path jobInputFile;
+    private Path jobOutputDir;
     private String filenamePattern = ".*";
-    private boolean setupFailed = false;
+    private int fileCount = 0;
 
-    public HadoopJob(Configuration hadoopConf, Path jobInputPath,
-            Path jobOutputDir, JobType jobType) {
-        this.hadoopConf = hadoopConf;
-        this.jobInputFile = jobInputPath;
-        this.jobOutputDir = jobOutputDir;
-        this.jobType = jobType;
+    /**
+     * Constructor.
+     *
+     * @param jobID The ID of the current job.
+     * @param jobStrategy Strategy specifying how the job is prepared and run.
+     */
+    public HadoopJob(long jobID, HadoopJobStrategy jobStrategy) {
+        this.jobID = jobID;
+        this.jobStrategy = jobStrategy;
+        jobType = jobStrategy.getJobType();
     }
 
     /**
-     * Prepare the job input by getting the relevant files to process from the fileresolver, writing their paths to a
-     * temp file, and copying this file to the input path.
-     * @param jobID The ID of the job that is being run.
+     * Prepare the job output and input by getting the relevant files to process from the fileresolver,
+     * writing their paths to a temp file, and copying this file to the input path.
+ * By default uses an all-matching pattern for the file resolver, so use {@link #processOnlyFilesMatching(String)} + * first to get files matching a specific pattern. + * * @param fileSystem The Hadoop FileSystem used. */ - public void prepareJobInput(long jobID, FileSystem fileSystem) { + public void prepareJobInputOutput(FileSystem fileSystem) { + UUID uuid = UUID.randomUUID(); + jobInputFile = jobStrategy.createJobInputFile(uuid); + jobOutputDir = jobStrategy.createJobOutputDir(uuid); + if (jobInputFile == null || jobOutputDir == null) { + log.error("Failed initializing input/output for {} job '{}' with uuid '{}'", + jobType, jobID, uuid); + throw new IOFailure("Failed preparing job: failed initializing job input/output directory"); + } + java.nio.file.Path localInputTempFile = HadoopFileUtils.makeLocalInputTempFile(); FileResolver fileResolver = SettingsFactory.getInstance(CommonSettings.FILE_RESOLVER_CLASS); if (fileResolver instanceof SimpleFileResolver) { @@ -54,62 +69,64 @@ public void prepareJobInput(long jobID, FileSystem fileSystem) { ((SimpleFileResolver) fileResolver).setDirectory(Paths.get(pillarParentDir)); } List filePaths = fileResolver.getPaths(Pattern.compile(filenamePattern)); + fileCount = filePaths.size(); try { HadoopJobUtils.writeHadoopInputFileLinesToInputFile(filePaths, localInputTempFile); } catch (IOException e) { - log.error("Failed writing filepaths to '{}' for {} job '{}'", localInputTempFile, jobType, jobID); - setupFailed = true; - return; + log.error("Failed writing filepaths to '{}' for {} job '{}'", + localInputTempFile, jobType, jobID); + throw new IOFailure("Failed preparing job: failed to write job input to input file"); } - log.info("Copying file with input paths '{}' to hdfs path '{}' for {} job '{}'.", + log.info("Copying file with input paths '{}' to job input path '{}' for {} job '{}'.", localInputTempFile, jobInputFile, jobType, jobID); try { fileSystem.copyFromLocalFile(true, new Path(localInputTempFile.toAbsolutePath().toString()), jobInputFile); } catch (IOException e) { - log.error("Failed copying local input '{}' to '{}' for job '{}'", localInputTempFile, jobInputFile, jobID); - setupFailed = true; + log.error("Failed copying local input '{}' to job input path '{}' for job '{}'", + localInputTempFile, jobInputFile, jobID); + throw new IOFailure("Failed preparing job: failed copying input to job input path"); } } - public int run(long jobID, Mapper mapper) { - int exitCode; - try { - log.info("Starting {} job for jobID {}", jobType, jobID); - exitCode = ToolRunner.run(new HadoopJobTool(hadoopConf, mapper), - new String[] {jobInputFile.toString(), jobOutputDir.toString()}); - } catch (Exception e) { - log.warn("{} job with ID {} failed to run normally.", jobType, jobID, e); - exitCode = 1; + /** + * Runs a Hadoop job according to the used strategy, configuration, and the settings in + * {@link dk.netarkivet.common.utils.hadoop.HadoopJobTool}. 
+     */
+    public void run() {
+        log.info("Starting {} job for jobID {} on {} file(s) matching pattern '{}'",
+                jobType, jobID, fileCount, filenamePattern);
+        int exitCode = jobStrategy.runJob(jobInputFile, jobOutputDir);
+        if (exitCode == 0) {
+            log.info("{} job with jobID {} was a success!", jobType, jobID);
+        } else {
+            log.warn("{} job with ID {} failed with exit code '{}'", jobType, jobID, exitCode);
+            throw new IOFailure("Hadoop job failed with exit code " + exitCode);
         }
-        return exitCode;
-    }
-
-    public Configuration getHadoopConf() {
-        return hadoopConf;
-    }
-
-    public JobType getJobType() {
-        return jobType;
     }
 
-    public Path getJobInputFile() {
-        return jobInputFile;
+    /**
+     * Changes the pattern used when getting the files for the job's input.
+     *
+     * @param filenamePattern Pattern to use when matching filenames.
+     */
+    public void processOnlyFilesMatching(String filenamePattern) {
+        this.filenamePattern = filenamePattern;
     }
 
+    /**
+     * Get the output directory for the job.
+     * @return Path representing output directory.
+     */
     public Path getJobOutputDir() {
         return jobOutputDir;
     }
 
-    public void processOnlyFilesMatching(String fileSearchPattern) {
-        this.filenamePattern = fileSearchPattern;
-    }
-
-    public String getFilenamePattern() {
-        return filenamePattern;
-    }
-
-    public boolean hasJobsetupFailed() {
-        return setupFailed;
+    /**
+     * Get what type of job is being run.
+     * @return The job type set by the job strategy used.
+     */
+    public String getJobType() {
+        return jobType;
     }
 }
diff --git a/common/common-core/src/main/java/dk/netarkivet/common/utils/hadoop/HadoopJobStrategy.java b/common/common-core/src/main/java/dk/netarkivet/common/utils/hadoop/HadoopJobStrategy.java
new file mode 100644
index 0000000000..9fc4ce629b
--- /dev/null
+++ b/common/common-core/src/main/java/dk/netarkivet/common/utils/hadoop/HadoopJobStrategy.java
@@ -0,0 +1,43 @@
+package dk.netarkivet.common.utils.hadoop;
+
+import java.util.UUID;
+
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Interface for a HadoopJob's strategy of how to perform the job.
+ */
+public interface HadoopJobStrategy {
+
+    /**
+     * Runs a Hadoop job (HadoopJobTool) according to the specification of the used strategy.
+     *
+     * @param jobInputFile The Path specifying the job's input file.
+     * @param jobOutputDir The Path specifying the job's output directory.
+     * @return The job's exit code (0 on success, non-zero on failure).
+     */
+    int runJob(Path jobInputFile, Path jobOutputDir);
+
+    /**
+     * Create the job input file with name from a uuid.
+     *
+     * @param uuid The UUID to create a unique name from.
+     * @return Path specifying where the input file is located.
+     */
+    Path createJobInputFile(UUID uuid);
+
+    /**
+     * Create the job output directory with name from a uuid.
+     *
+     * @param uuid The UUID to create a unique name from.
+     * @return Path specifying where the output directory is located.
+     */
+    Path createJobOutputDir(UUID uuid);
+
+    /**
+     * Return a string specifying which kind of job is being run.
+     *
+     * @return String specifying the job's type.
+     */
+    String getJobType();
+}
diff --git a/common/common-core/src/main/java/dk/netarkivet/common/utils/hadoop/JobType.java b/common/common-core/src/main/java/dk/netarkivet/common/utils/hadoop/JobType.java
deleted file mode 100644
index 12643c9bff..0000000000
--- a/common/common-core/src/main/java/dk/netarkivet/common/utils/hadoop/JobType.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package dk.netarkivet.common.utils.hadoop;
-
-/**
- * Enum specifying which type of Hadoop job is being run - mostly for logging purposes.
- */ -public enum JobType { - STANDARD_CDX, - METADATA_CDX, - CRAWL_LOG_EXTRACTION; - - public String toString() { - switch(this) { - case STANDARD_CDX: return "standard CDX"; - case METADATA_CDX: return "metadata CDX"; - case CRAWL_LOG_EXTRACTION: return "crawl log extraction"; - default: return ""; // TODO what do? - } - } -} \ No newline at end of file diff --git a/common/common-core/src/main/java/dk/netarkivet/common/utils/hadoop/MetadataExtractionStrategy.java b/common/common-core/src/main/java/dk/netarkivet/common/utils/hadoop/MetadataExtractionStrategy.java new file mode 100644 index 0000000000..b2697a0022 --- /dev/null +++ b/common/common-core/src/main/java/dk/netarkivet/common/utils/hadoop/MetadataExtractionStrategy.java @@ -0,0 +1,77 @@ +package dk.netarkivet.common.utils.hadoop; + +import java.util.UUID; +import java.util.regex.Pattern; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.util.ToolRunner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import dk.netarkivet.common.CommonSettings; +import dk.netarkivet.common.utils.Settings; + +/** + * Strategy to give a HadoopJob when wanting to extract selected content from metadata files matching specific + * URL- and MIME-patterns. The mapper expects the used Configuration to have these patterns set before use. + * Otherwise, it will use all-matching patterns. + * + * This type of job is the Hadoop counterpart to running + * {@link dk.netarkivet.common.utils.archive.GetMetadataArchiveBatchJob}. + */ +public class MetadataExtractionStrategy implements HadoopJobStrategy { + private final Logger log = LoggerFactory.getLogger(MetadataExtractionStrategy.class); + private final long jobID; + private final FileSystem fileSystem; + private final Configuration hadoopConf; + private final Pattern urlPattern; + private final Pattern mimePattern; + + /** + * Constructor. + * + * @param jobID The ID for the job. + * @param fileSystem The Hadoop FileSystem used. 
+ */ + public MetadataExtractionStrategy(long jobID, FileSystem fileSystem) { + this.jobID = jobID; + this.fileSystem = fileSystem; + hadoopConf = fileSystem.getConf(); + urlPattern = hadoopConf.getPattern(GetMetadataMapper.URL_PATTERN, Pattern.compile(".*")); + mimePattern = hadoopConf.getPattern(GetMetadataMapper.MIME_PATTERN, Pattern.compile(".*")); + } + + @Override public int runJob(Path jobInputFile, Path jobOutputDir) { + int exitCode; + try { + log.info("URL/MIME patterns used for metadata extraction job {} are '{}' and '{}'", + jobID, urlPattern, mimePattern); + exitCode = ToolRunner.run(new HadoopJobTool(hadoopConf, new GetMetadataMapper()), + new String[] {jobInputFile.toString(), jobOutputDir.toString()}); + } catch (Exception e) { + log.warn("Metadata extraction job with ID {} failed to run normally.", jobID, e); + exitCode = 1; + } + return exitCode; + } + + @Override public Path createJobInputFile(UUID uuid) { + Path jobInputFile = HadoopFileUtils.createUniquePathInDir( + fileSystem, Settings.get(CommonSettings.HADOOP_MAPRED_METADATA_EXTRACTIONJOB_INPUT_DIR), uuid); + log.info("Input file for metadata extraction job '{}' will be '{}'", jobID, jobInputFile); + return jobInputFile; + } + + @Override public Path createJobOutputDir(UUID uuid) { + Path jobOutputDir = HadoopFileUtils.createUniquePathInDir( + fileSystem, Settings.get(CommonSettings.HADOOP_MAPRED_METADATA_EXTRACTIONJOB_OUTPUT_DIR), uuid); + log.info("Output directory for metadata extraction job '{}' is '{}'", jobID, jobOutputDir); + return jobOutputDir; + } + + @Override public String getJobType() { + return "METADATA EXTRACTION"; + } +} diff --git a/harvester/harvester-core/src/main/java/dk/netarkivet/harvester/indexserver/RawMetadataCache.java b/harvester/harvester-core/src/main/java/dk/netarkivet/harvester/indexserver/RawMetadataCache.java index 28185dec1d..a8c4884567 100644 --- a/harvester/harvester-core/src/main/java/dk/netarkivet/harvester/indexserver/RawMetadataCache.java +++ b/harvester/harvester-core/src/main/java/dk/netarkivet/harvester/indexserver/RawMetadataCache.java @@ -28,7 +28,6 @@ import java.nio.file.Paths; import java.util.Hashtable; import java.util.List; -import java.util.UUID; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -36,8 +35,6 @@ import org.apache.commons.math3.util.Pair; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.util.ToolRunner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,17 +46,15 @@ import dk.netarkivet.common.distribute.arcrepository.ViewerArcRepositoryClient; import dk.netarkivet.common.exceptions.ArgumentNotValid; import dk.netarkivet.common.exceptions.IOFailure; -import dk.netarkivet.common.utils.FileResolver; import dk.netarkivet.common.utils.FileUtils; -import dk.netarkivet.common.utils.SettingsFactory; -import dk.netarkivet.common.utils.hadoop.HadoopFileUtils; -import dk.netarkivet.common.utils.hadoop.HadoopJobUtils; import dk.netarkivet.common.utils.Settings; -import dk.netarkivet.common.utils.SimpleFileResolver; import dk.netarkivet.common.utils.archive.ArchiveBatchJob; import dk.netarkivet.common.utils.archive.GetMetadataArchiveBatchJob; import dk.netarkivet.common.utils.hadoop.GetMetadataMapper; -import dk.netarkivet.common.utils.hadoop.HadoopJobTool; +import dk.netarkivet.common.utils.hadoop.HadoopJob; +import dk.netarkivet.common.utils.hadoop.HadoopJobStrategy; +import dk.netarkivet.common.utils.hadoop.HadoopJobUtils; +import 
dk.netarkivet.common.utils.hadoop.MetadataExtractionStrategy; import dk.netarkivet.harvester.HarvesterSettings; import dk.netarkivet.harvester.harvesting.metadata.MetadataFile; @@ -167,74 +162,29 @@ private Long cacheDataHadoop(Long id) { Configuration conf = HadoopJobUtils.getConfFromSettings(); conf.setPattern(GetMetadataMapper.URL_PATTERN, urlPattern); conf.setPattern(GetMetadataMapper.MIME_PATTERN, mimePattern); - UUID uuid = UUID.randomUUID(); try (FileSystem fileSystem = FileSystem.newInstance(conf)) { - Path jobInputNameFile = HadoopFileUtils.createUniquePathInDir( - fileSystem, Settings.get(CommonSettings.HADOOP_MAPRED_METADATA_EXTRACTIONJOB_INPUT_DIR), uuid); - log.info("Input file for metadata extraction job '{}' will be '{}'", id, jobInputNameFile); - Path jobOutputDir = HadoopFileUtils.createUniquePathInDir( - fileSystem, Settings.get(CommonSettings.HADOOP_MAPRED_METADATA_EXTRACTIONJOB_OUTPUT_DIR), uuid); - log.info("Output directory for metadata extraction job '{}' is '{}'", id, jobOutputDir); - if (jobInputNameFile == null || jobOutputDir == null) { - log.error("Failed initializing input/output for metadata extraction job '{}' with uuid '{}'", id, uuid); - return null; - } - - java.nio.file.Path localInputTempFile = HadoopFileUtils.makeLocalInputTempFile(); - String pillarParentDir = Settings.get(CommonSettings.HADOOP_MAPRED_INPUT_FILES_PARENT_DIR); - FileResolver fileResolver = SettingsFactory.getInstance(CommonSettings.FILE_RESOLVER_CLASS); - if (fileResolver instanceof SimpleFileResolver) { - ((SimpleFileResolver) fileResolver).setDirectory(Paths.get(pillarParentDir)); - } - List filePaths = fileResolver.getPaths(Pattern.compile(specifiedPattern)); - try { - HadoopJobUtils.writeHadoopInputFileLinesToInputFile(filePaths, localInputTempFile); - } catch (IOException e) { - log.error("Failed writing filepaths to '{}' for metadata extraction job '{}'", localInputTempFile, id); - return null; - } - log.info("Copying file with input paths '{}' to hdfs '{}' for metadata extraction job '{}'.", - localInputTempFile, jobInputNameFile, id); - try { - fileSystem.copyFromLocalFile(true, new Path(localInputTempFile.toAbsolutePath().toString()), - jobInputNameFile); - } catch (IOException e) { - log.error("Failed copying local input '{}' to '{}' for metadata extraction job '{}'", - localInputTempFile.toAbsolutePath(), jobInputNameFile, id); - return null; - } - - int exitCode = 0; - try { - log.info("Starting metadata extraction job on {} file(s) matching pattern '{}'. URL/MIME patterns used for" - + " job are '{}' and '{}'", filePaths.size(), specifiedPattern, urlPattern, mimePattern); - log.info("Starting hadoop job '{}' with input {} and output {}.", id, jobInputNameFile, jobOutputDir); - exitCode = ToolRunner.run(new HadoopJobTool(conf, new GetMetadataMapper()), - new String[] {jobInputNameFile.toString(), jobOutputDir.toString()}); - - if (exitCode == 0) { - List metadataLines = HadoopJobUtils.collectOutputLines(fileSystem, jobOutputDir); - if (metadataLines.size() > 0) { - File cacheFileName = getCacheFile(id); - if (tryToMigrateDuplicationRecords) { - migrateDuplicatesHadoop(id, fileSystem, specifiedPattern, metadataLines, cacheFileName); - } else { - copyResults(id, metadataLines, cacheFileName); - } - log.debug("Cached data for job '{}' for '{}'", id, prefix); - return id; - } else { - log.info("No data found for job '{}' for '{}' in local bitarchive. 
", id, prefix); - } + HadoopJobStrategy jobStrategy = new MetadataExtractionStrategy(id, fileSystem); + HadoopJob job = new HadoopJob(id, jobStrategy); + job.processOnlyFilesMatching(specifiedPattern); + job.prepareJobInputOutput(fileSystem); + job.run(); + // If no error is thrown, job was success + + List metadataLines = HadoopJobUtils.collectOutputLines(fileSystem, job.getJobOutputDir()); + if (metadataLines.size() > 0) { + File cacheFileName = getCacheFile(id); + if (tryToMigrateDuplicationRecords) { + migrateDuplicatesHadoop(id, fileSystem, specifiedPattern, metadataLines, cacheFileName); } else { - log.warn("Metadata extraction job '{}' failed with exit code '{}'", id, exitCode); + copyResults(id, metadataLines, cacheFileName); } - } catch (Exception e) { - log.error("Metadata extraction job '{}' failed to run normally.", id, e); - return null; + log.debug("Cached data for job '{}' for '{}'", id, prefix); + return id; + } else { + log.info("No data found for job '{}' for '{}' in local bitarchive. ", id, prefix); } } catch (IOException e) { - log.error("Error on hadoop filesystem for job '{}'.", id, e); + log.error("Error instantiating Hadoop filesystem for job {}.", id, e); } return null; } @@ -256,61 +206,17 @@ private void migrateDuplicatesHadoop(Long id, FileSystem fileSystem, String spec Configuration conf = fileSystem.getConf(); conf.setPattern(GetMetadataMapper.URL_PATTERN, Pattern.compile(".*duplicationmigration.*")); conf.setPattern(GetMetadataMapper.MIME_PATTERN, Pattern.compile("text/plain")); - UUID uuid = UUID.randomUUID(); - Path hadoopInputNameFile = HadoopFileUtils.createUniquePathInDir( - fileSystem, Settings.get(CommonSettings.HADOOP_MAPRED_METADATA_EXTRACTIONJOB_INPUT_DIR), uuid); - log.info("Hadoop input file will be '{}'", hadoopInputNameFile); - - Path jobOutputDir = HadoopFileUtils.createUniquePathInDir( - fileSystem, Settings.get(CommonSettings.HADOOP_MAPRED_METADATA_EXTRACTIONJOB_OUTPUT_DIR), uuid); - log.info("Output directory for job is '{}'", jobOutputDir); - - if (hadoopInputNameFile == null || jobOutputDir == null) { - log.error("Failed initializing input/output for job '{}' with uuid '{}'", id, uuid); - return; - } - java.nio.file.Path localInputTempFile = HadoopFileUtils.makeLocalInputTempFile(); + HadoopJobStrategy jobStrategy = new MetadataExtractionStrategy(id, fileSystem); + HadoopJob job = new HadoopJob(id, jobStrategy); + job.processOnlyFilesMatching(specifiedPattern); + job.prepareJobInputOutput(fileSystem); + job.run(); - FileResolver fileResolver = SettingsFactory.getInstance(CommonSettings.FILE_RESOLVER_CLASS); - if (fileResolver instanceof SimpleFileResolver) { - String pillarParentDir = Settings.get(CommonSettings.HADOOP_MAPRED_INPUT_FILES_PARENT_DIR); - ((SimpleFileResolver) fileResolver).setDirectory(Paths.get(pillarParentDir)); - } - - - List filePaths = fileResolver.getPaths(Pattern.compile(specifiedPattern)); try { - HadoopJobUtils.writeHadoopInputFileLinesToInputFile(filePaths, localInputTempFile); + List metadataLines = HadoopJobUtils.collectOutputLines(fileSystem, job.getJobOutputDir()); + handleMigrationHadoop(id, metadataLines, originalJobResults, cacheFileName); } catch (IOException e) { - log.error("Failed writing filepaths to '{}' for job '{}'", localInputTempFile, id); - return; - } - log.info("Copying file with input paths '{}' to hdfs '{}'.", localInputTempFile, hadoopInputNameFile); - try { - fileSystem.copyFromLocalFile(false, new Path(localInputTempFile.toAbsolutePath().toString()), - hadoopInputNameFile); - } catch 
(IOException e) { - log.error("Failed copying local input '{}' to '{}' for job '{}'", - localInputTempFile.toAbsolutePath(), hadoopInputNameFile, id); - return; - } - - log.info("Starting metadata extraction job on {} file(s) matching pattern '{}'. URL/MIME patterns used for" - + " job are '{}' and '{}'", filePaths.size(), specifiedPattern, urlPattern, mimePattern); - int exitCode = 0; - try { - log.info("Starting hadoop job with input {} and output {}.", hadoopInputNameFile, jobOutputDir); - exitCode = ToolRunner.run(new HadoopJobTool(conf, new GetMetadataMapper()), - new String[] {hadoopInputNameFile.toString(), jobOutputDir.toString()}); - - if (exitCode == 0) { - List metadataLines = HadoopJobUtils.collectOutputLines(fileSystem, jobOutputDir); - handleMigrationHadoop(id, metadataLines, originalJobResults, cacheFileName); - } else { - log.warn("Hadoop job failed with exit code '{}'", exitCode); - } - } catch (Exception e) { - log.error("Hadoop indexing job failed to run normally.", e); + log.error("Failed getting duplicationmigration lines output from Hadoop job with ID: {}", id); } } else { copyResults(id, originalJobResults, cacheFileName); diff --git a/harvester/harvester-core/src/main/java/dk/netarkivet/viewerproxy/webinterface/Reporting.java b/harvester/harvester-core/src/main/java/dk/netarkivet/viewerproxy/webinterface/Reporting.java index 4935de3cdd..61be3f3188 100644 --- a/harvester/harvester-core/src/main/java/dk/netarkivet/viewerproxy/webinterface/Reporting.java +++ b/harvester/harvester-core/src/main/java/dk/netarkivet/viewerproxy/webinterface/Reporting.java @@ -36,7 +36,6 @@ import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,12 +51,11 @@ import dk.netarkivet.common.utils.batch.FileListJob; import dk.netarkivet.common.utils.cdx.ArchiveExtractCDXJob; import dk.netarkivet.common.utils.cdx.CDXRecord; -import dk.netarkivet.common.utils.hadoop.HadoopFileUtils; -import dk.netarkivet.common.utils.hadoop.HadoopJobUtils; import dk.netarkivet.common.utils.hadoop.HadoopJob; -import dk.netarkivet.common.utils.hadoop.JobType; -import dk.netarkivet.viewerproxy.webinterface.hadoop.CrawlLogExtractionMapper; -import dk.netarkivet.viewerproxy.webinterface.hadoop.MetadataCDXMapper; +import dk.netarkivet.common.utils.hadoop.HadoopJobStrategy; +import dk.netarkivet.common.utils.hadoop.HadoopJobUtils; +import dk.netarkivet.viewerproxy.webinterface.hadoop.CrawlLogExtractionStrategy; +import dk.netarkivet.viewerproxy.webinterface.hadoop.MetadataCDXExtractionStrategy; /** * Methods for generating the batch results needed by the QA pages. @@ -139,45 +137,31 @@ public static List getMetadataCDXRecordsForJob(long jobid) { /** * Submits a Hadoop job to generate cdx for all metadata files for a jobID and returns the resulting list of records. * - * @param jobid The job to get cdx for. - * @return A list of cdx records. + * @param jobid The job to get CDX for. + * @return A list of CDX records. 
*/ private static List getRecordsUsingHadoop(long jobid) { - JobType jobType = JobType.METADATA_CDX; Configuration hadoopConf = HadoopJobUtils.getConfFromSettings(); String metadataFileSearchPattern = getMetadataFilePatternForJobId(jobid); - UUID uuid = UUID.randomUUID(); try (FileSystem fileSystem = FileSystem.newInstance(hadoopConf)) { - Path jobInputNameFile = HadoopFileUtils.createUniquePathInDir( - fileSystem, Settings.get(CommonSettings.HADOOP_MAPRED_METADATA_CDXJOB_INPUT_DIR), uuid); - log.info("Input file for {} job '{}' will be '{}'", jobType, jobid, jobInputNameFile); - Path jobOutputDir = HadoopFileUtils.createUniquePathInDir( - fileSystem, Settings.get(CommonSettings.HADOOP_MAPRED_METADATA_CDXJOB_OUTPUT_DIR), uuid); - log.info("Output directory for {} job '{}' is '{}'", jobType, jobid, jobOutputDir); - if (jobInputNameFile == null || jobOutputDir == null) { - log.error("Failed initializing input/output for {} job '{}' with uuid '{}'", jobType, jobid, uuid); - throw new IOFailure("Failed initializing input/output directory"); - } - - HadoopJob hadoopJob = new HadoopJob(hadoopConf, jobInputNameFile, jobOutputDir, jobType); - hadoopJob.processOnlyFilesMatching(metadataFileSearchPattern); - hadoopJob.prepareJobInput(jobid, fileSystem); - if (hadoopJob.hasJobsetupFailed()) { - throw new IOFailure("Failed writing job input"); - } + HadoopJobStrategy jobStrategy = new MetadataCDXExtractionStrategy(jobid, fileSystem); + HadoopJob job = new HadoopJob(jobid, jobStrategy); + job.processOnlyFilesMatching(metadataFileSearchPattern); + job.prepareJobInputOutput(fileSystem); + job.run(); - int exitCode = hadoopJob.run(jobid, new MetadataCDXMapper()); - if (exitCode == 0) { - log.info("{} job with jobID {} was a success!", jobType, jobid); - List cdxLines = HadoopJobUtils.collectOutputLines(fileSystem, jobOutputDir); - return HadoopJobUtils.getCDXRecordListFromCDXLines(cdxLines); - } else { - log.warn("{} job with ID {} failed with exit code '{}'", jobType, jobid, exitCode); + List cdxLines; + try { + cdxLines = HadoopJobUtils.collectOutputLines(fileSystem, job.getJobOutputDir()); + } catch (IOException e) { + log.error("Failed getting CDX lines output for Hadoop job with ID: {}", jobid); + throw new IOFailure("Failed getting " + job.getJobType() + " job results"); } + return HadoopJobUtils.getCDXRecordListFromCDXLines(cdxLines); } catch (IOException e) { log.error("Error instantiating Hadoop filesystem for job {}.", jobid, e); + throw new IOFailure("Failed instantiating Hadoop filesystem."); } - return null; } /** @@ -313,7 +297,7 @@ public static File getCrawlLoglinesMatchingRegexp(long jobid, String regexp) { } /** - * Using Hadoop, gets crawllog lines for a given jobID matching a given regular expression + * Using Hadoop, gets crawllog lines for a given jobID matching a given regular expression. * * @param jobID The ID for the job. * @param regex The regular expression specifying files to process. 
@@ -321,43 +305,26 @@ public static File getCrawlLoglinesMatchingRegexp(long jobid, String regexp) { */ private static File getCrawlLogLinesUsingHadoop(long jobID, String regex) { String metadataFileSearchPattern = getMetadataFilePatternForJobId(jobID); - JobType jobType = JobType.CRAWL_LOG_EXTRACTION; Configuration hadoopConf = HadoopJobUtils.getConfFromSettings(); hadoopConf.set("regex", regex); - UUID uuid = UUID.randomUUID(); try (FileSystem fileSystem = FileSystem.newInstance(hadoopConf)) { - Path jobInputNameFile = HadoopFileUtils.createUniquePathInDir( - fileSystem, Settings.get(CommonSettings.HADOOP_MAPRED_METADATA_CDXJOB_INPUT_DIR), uuid); - log.info("Input file for {} job '{}' will be '{}'", jobType, jobID, jobInputNameFile); - Path jobOutputDir = HadoopFileUtils.createUniquePathInDir( - fileSystem, Settings.get(CommonSettings.HADOOP_MAPRED_METADATA_CDXJOB_OUTPUT_DIR), uuid); - log.info("Output directory for {} job '{}' is '{}'", jobType, jobID, jobOutputDir); - if (jobInputNameFile == null || jobOutputDir == null) { - log.error("Failed initializing input/output for {} job '{}' with uuid '{}'", jobType, jobID, uuid); - throw new IOFailure("Failed initializing input/output directory"); - } - - HadoopJob job = new HadoopJob(hadoopConf, jobInputNameFile, jobOutputDir, jobType); + HadoopJobStrategy jobStrategy = new CrawlLogExtractionStrategy(jobID, fileSystem); + HadoopJob job = new HadoopJob(jobID, jobStrategy); job.processOnlyFilesMatching(metadataFileSearchPattern); - job.prepareJobInput(jobID, fileSystem); - - if (job.hasJobsetupFailed()) { - throw new IOFailure("Failed writing job input"); + job.prepareJobInputOutput(fileSystem); + job.run(); + List crawlLogLines; + try { + crawlLogLines = HadoopJobUtils.collectOutputLines(fileSystem, job.getJobOutputDir()); + } catch (IOException e) { + log.error("Failed getting crawl log lines output for job with ID: {}", jobID); + throw new IOFailure("Failed getting " + job.getJobType() + " job results"); } - - int exitCode = job.run(jobID, new CrawlLogExtractionMapper()); - if (exitCode == 0) { - log.info("{} job with jobID {} was a success!", jobType, jobID); - List crawlLogLines = HadoopJobUtils.collectOutputLines(fileSystem, jobOutputDir); - return getResultFile(crawlLogLines); - } else { - log.warn("{} job with ID {} failed with exit code '{}'", jobType, jobID, exitCode); - } - + return getResultFile(crawlLogLines); } catch (IOException e) { log.error("Error instantiating Hadoop filesystem for job {}.", jobID, e); + throw new IOFailure("Failed instantiating Hadoop filesystem."); } - return null; } /** diff --git a/harvester/harvester-core/src/main/java/dk/netarkivet/viewerproxy/webinterface/hadoop/CrawlLogExtractionStrategy.java b/harvester/harvester-core/src/main/java/dk/netarkivet/viewerproxy/webinterface/hadoop/CrawlLogExtractionStrategy.java new file mode 100644 index 0000000000..d8932e1abd --- /dev/null +++ b/harvester/harvester-core/src/main/java/dk/netarkivet/viewerproxy/webinterface/hadoop/CrawlLogExtractionStrategy.java @@ -0,0 +1,72 @@ +package dk.netarkivet.viewerproxy.webinterface.hadoop; + +import java.util.UUID; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.util.ToolRunner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import dk.netarkivet.common.CommonSettings; +import dk.netarkivet.common.utils.Settings; +import dk.netarkivet.common.utils.hadoop.HadoopFileUtils; +import 
dk.netarkivet.common.utils.hadoop.HadoopJobStrategy; +import dk.netarkivet.common.utils.hadoop.HadoopJobTool; + +/** + * Strategy to give a HadoopJob when wanting to extract crawl log lines matching some regex from metadata files. + * The mapper expects the used Configuration to have this regex set. Otherwise, an all-matching pattern will be used. + * + * This type of job is the Hadoop counterpart to running + * {@link dk.netarkivet.viewerproxy.webinterface.CrawlLogLinesMatchingRegexp}. + */ +public class CrawlLogExtractionStrategy implements HadoopJobStrategy { + private final Logger log = LoggerFactory.getLogger(CrawlLogExtractionStrategy.class); + private final long jobID; + private final FileSystem fileSystem; + private final Configuration hadoopConf; + + /** + * Constructor. + * + * @param jobID The ID for the job. + * @param fileSystem The Hadoop FileSystem used. + */ + public CrawlLogExtractionStrategy(long jobID, FileSystem fileSystem) { + this.jobID = jobID; + this.fileSystem = fileSystem; + hadoopConf = fileSystem.getConf(); + } + + @Override public int runJob(Path jobInputFile, Path jobOutputDir) { + int exitCode; + try { + exitCode = ToolRunner.run(new HadoopJobTool(hadoopConf, new CrawlLogExtractionMapper()), + new String[] {jobInputFile.toString(), jobOutputDir.toString()}); + } catch (Exception e) { + log.warn("Crawl log extraction job with ID {} failed to run normally.", jobID, e); + exitCode = 1; + } + return exitCode; + } + + @Override public Path createJobInputFile(UUID uuid) { + Path jobInputFile = HadoopFileUtils.createUniquePathInDir( + fileSystem, Settings.get(CommonSettings.HADOOP_MAPRED_CRAWLLOG_EXTRACTIONJOB_INPUT_DIR), uuid); + log.info("Input file for crawl log extraction job '{}' will be '{}'", jobID, jobInputFile); + return jobInputFile; + } + + @Override public Path createJobOutputDir(UUID uuid) { + Path jobOutputDir = HadoopFileUtils.createUniquePathInDir( + fileSystem, Settings.get(CommonSettings.HADOOP_MAPRED_CRAWLLOG_EXTRACTIONJOB_OUTPUT_DIR), uuid); + log.info("Output directory for crawl log extraction job '{}' is '{}'", jobID, jobOutputDir); + return jobOutputDir; + } + + @Override public String getJobType() { + return "CRAWL LOG EXTRACTION"; + } +} diff --git a/harvester/harvester-core/src/main/java/dk/netarkivet/viewerproxy/webinterface/hadoop/MetadataCDXExtractionStrategy.java b/harvester/harvester-core/src/main/java/dk/netarkivet/viewerproxy/webinterface/hadoop/MetadataCDXExtractionStrategy.java new file mode 100644 index 0000000000..dbe93fceb4 --- /dev/null +++ b/harvester/harvester-core/src/main/java/dk/netarkivet/viewerproxy/webinterface/hadoop/MetadataCDXExtractionStrategy.java @@ -0,0 +1,70 @@ +package dk.netarkivet.viewerproxy.webinterface.hadoop; + +import java.util.UUID; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.util.ToolRunner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import dk.netarkivet.common.CommonSettings; +import dk.netarkivet.common.utils.Settings; +import dk.netarkivet.common.utils.hadoop.HadoopFileUtils; +import dk.netarkivet.common.utils.hadoop.HadoopJobStrategy; +import dk.netarkivet.common.utils.hadoop.HadoopJobTool; + +/** + * Strategy to extract CDX lines from metadata files. + * + * This type of job is the Hadoop counterpart to running {@link dk.netarkivet.common.utils.cdx.ArchiveExtractCDXJob}. 
+ */ +public class MetadataCDXExtractionStrategy implements HadoopJobStrategy { + private final Logger log = LoggerFactory.getLogger(MetadataCDXExtractionStrategy.class); + private final long jobID; + private final FileSystem fileSystem; + private final Configuration hadoopConf; + + /** + * Constructor. + * + * @param jobID The ID for the job. + * @param fileSystem The Hadoop FileSystem used. + */ + public MetadataCDXExtractionStrategy(long jobID, FileSystem fileSystem) { + this.jobID = jobID; + this.fileSystem = fileSystem; + hadoopConf = fileSystem.getConf(); + } + + @Override public int runJob(Path jobInputFile, Path jobOutputDir) { + int exitCode; + try { + exitCode = ToolRunner.run(new HadoopJobTool(hadoopConf, new MetadataCDXMapper()), + new String[] {jobInputFile.toString(), jobOutputDir.toString()}); + } catch (Exception e) { + log.warn("Metadata CDX extraction job with ID {} failed to run normally.", jobID, e); + exitCode = 1; + } + return exitCode; + } + + @Override public Path createJobInputFile(UUID uuid) { + Path jobInputFile = HadoopFileUtils.createUniquePathInDir( + fileSystem, Settings.get(CommonSettings.HADOOP_MAPRED_METADATA_CDX_EXTRACTIONJOB_INPUT_DIR), uuid); + log.info("Input file for metadata CDX extraction job '{}' will be '{}'", jobID, jobInputFile); + return jobInputFile; + } + + @Override public Path createJobOutputDir(UUID uuid) { + Path jobOutputDir = HadoopFileUtils.createUniquePathInDir( + fileSystem, Settings.get(CommonSettings.HADOOP_MAPRED_METADATA_CDX_EXTRACTIONJOB_OUTPUT_DIR), uuid); + log.info("Output directory for metadata CDX extraction job '{}' is '{}'", jobID, jobOutputDir); + return jobOutputDir; + } + + @Override public String getJobType() { + return "METADATA CDX EXTRACTION"; + } +}
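
Reviewer note (not part of the patch): the sketch below shows how the refactored API introduced here is meant to be used end to end -- pick a HadoopJobStrategy, hand it to HadoopJob together with the job ID, and let the job prepare its own input/output before running. It mirrors the call sequence in Reporting.getRecordsUsingHadoop; the class name, job ID handling and filename pattern are made up for illustration, and the generic List types are assumed from the surrounding code.

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

import dk.netarkivet.common.exceptions.IOFailure;
import dk.netarkivet.common.utils.cdx.CDXRecord;
import dk.netarkivet.common.utils.hadoop.HadoopJob;
import dk.netarkivet.common.utils.hadoop.HadoopJobStrategy;
import dk.netarkivet.common.utils.hadoop.HadoopJobUtils;
import dk.netarkivet.viewerproxy.webinterface.hadoop.MetadataCDXExtractionStrategy;

/** Illustrative usage sketch of the new strategy-based Hadoop job API. */
public class HadoopJobUsageSketch {

    /** Runs a metadata CDX extraction for the given job ID and returns the resulting records. */
    public static List<CDXRecord> extractMetadataCDX(long jobID) {
        Configuration hadoopConf = HadoopJobUtils.getConfFromSettings();
        try (FileSystem fileSystem = FileSystem.newInstance(hadoopConf)) {
            // The strategy decides which mapper to run and where the job's input/output live.
            HadoopJobStrategy strategy = new MetadataCDXExtractionStrategy(jobID, fileSystem);
            HadoopJob job = new HadoopJob(jobID, strategy);
            // Narrow the FileResolver lookup to the relevant metadata files (pattern is illustrative).
            job.processOnlyFilesMatching(".*" + jobID + ".*metadata.*\\.warc(\\.gz)?");
            job.prepareJobInputOutput(fileSystem); // resolves files and copies the input listing to the job input path
            job.run();                             // throws IOFailure on a non-zero exit code
            List<String> cdxLines = HadoopJobUtils.collectOutputLines(fileSystem, job.getJobOutputDir());
            return HadoopJobUtils.getCDXRecordListFromCDXLines(cdxLines);
        } catch (IOException e) {
            throw new IOFailure("Failed running metadata CDX extraction for job " + jobID, e);
        }
    }
}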