Added rethrows for better error handling from hadoop.
Corrected path for using cached data.

Added settings for hadoop queues

Code cleanup and javadoc

Tidies up the UX for domain matching

Experiment with url-parsing for domain matching on source tag

Experiment with url-parsing for domain matching

Experiment with re2j pattern-matching for crawl logs.

Added a comment on a needed improvement in the QA.

Found additional places where hadoop code was trying to read from netarkivet settings.

Optimised local regex'ing of crawl log

Ensure hdfs caching for all interactive jobs.

Fixed configuration logic + minor optimisation

Mapping metadata now uses caching

First attempt at caching for MetadataCDXMapper

Make sure cache dir is created

Removed hard-codes and enabled caching utility

Optimised crawl log extraction

Fixed NPE

Fixed cache path to be writable.

Experimental hdfs caching processor for crawl logs only
csrster committed Jun 18, 2021
1 parent 057052a commit 42fe34d
Showing 13 changed files with 325 additions and 39 deletions.
@@ -639,6 +639,53 @@ public class CommonSettings {
*/
public static String HADOOP_MAPRED_UBER_JAR = "settings.common.hadoop.mapred.hadoopUberJar";

/**
* The amount of memory (in MB) to assign to mapper tasks in hadoop jobs.
*/
public static String HADOOP_MAP_MEMORY_MB = "settings.common.hadoop.mapred.mapMemoryMb";

/**
* The number of cores to assign to mapper tasks in hadoop.
*/
public static String HADOOP_MAP_MEMORY_CORES = "settings.common.hadoop.mapred.mapMemoryCores";

/**
* Whether to enable caching of "local" (i.e. non-hdfs) warcfiles to hdfs before processing. This should
* speed up future jobs working on the same files.
*/
public static String HADOOP_ENABLE_HDFS_CACHE = "settings.common.hadoop.mapred.hdfsCacheEnabled";

/**
* Hdfs directory in which to cache warcfiles. (The "hdfs://" prefix is not needed.)
*/
public static String HADOOP_HDFS_CACHE_DIR = "settings.common.hadoop.mapred.hdfsCacheDir";

/**
* The number of days for which to retain files in the hdfs cache if enabled. If set to zero, files
* will be copied to hdfs before processing but not retained for future jobs.
*/
public static String HADOOP_CACHE_DAYS = "settings.common.hadoop.mapred.hdfsCacheDays";

/**
* Enable/disable ubertasking - a hadoop optimisation for efficient running of small jobs.
*/
public static String HADOOP_MAPRED_ENABLE_UBERTASK = "settings.common.hadoop.mapred.enableUbertask";

/**
* The hadoop queue name for interactive jobs (started by clicking in the NAS GUI). Defining separate
* queues for interactive and batch jobs ensures that there is always some minimum capacity reserved
* for interactive jobs. The value must be a valid queue name on the hadoop cluster.
*/
public static String HADOOP_MAPRED_QUEUENAME_INTERACTIVE = "settings.common.hadoop.mapred.queue.interactive";

/**
* The hadoop queue name for batch jobs. See HADOOP_MAPRED_QUEUENAME_INTERACTIVE for why interactive
* and batch jobs are routed to separate queues.
*/
public static String HADOOP_MAPRED_QUEUENAME_BATCH = "settings.common.hadoop.mapred.queue.batch";


/**
* Username to start map-/reduce jobs under on the Hadoop cluster.
* In production we now use kerberos so this is
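
The settings added above are consumed when a Hadoop job configuration is built. Below is a minimal sketch of the intended wiring (the wrapper class name and import paths are assumptions for illustration; the actual helpers are HadoopJobUtils.configureCaching(), setInteractiveQueue() and setBatchQueue(), added further down in this commit):

import org.apache.hadoop.conf.Configuration;

import dk.netarkivet.common.CommonSettings;
import dk.netarkivet.common.utils.Settings;

/** Illustrative sketch only; not part of the commit. */
public class HadoopCacheAndQueueSketch {
    public static Configuration buildJobConfiguration(boolean interactive) {
        Configuration conf = new Configuration();
        // Copy the NAS hdfs-cache settings into the Hadoop configuration so mappers can read them.
        conf.setBoolean(CommonSettings.HADOOP_ENABLE_HDFS_CACHE,
                Settings.getBoolean(CommonSettings.HADOOP_ENABLE_HDFS_CACHE));
        conf.set(CommonSettings.HADOOP_HDFS_CACHE_DIR, Settings.get(CommonSettings.HADOOP_HDFS_CACHE_DIR));
        conf.setInt(CommonSettings.HADOOP_CACHE_DAYS, Settings.getInt(CommonSettings.HADOOP_CACHE_DAYS));
        // Route the job to the interactive or batch queue; a separate interactive queue is what
        // guarantees GUI-triggered jobs some minimum capacity.
        String queue = interactive
                ? Settings.get(CommonSettings.HADOOP_MAPRED_QUEUENAME_INTERACTIVE)
                : Settings.get(CommonSettings.HADOOP_MAPRED_QUEUENAME_BATCH);
        conf.set("mapreduce.job.queuename", queue);
        return conf;
    }
}
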
@@ -64,16 +64,16 @@ protected void setup(Context context) throws IOException, InterruptedException {
* @param context Context used for writing output.
*/
@Override
protected void map(LongWritable lineNumber, Text filePath, Context context) {
protected void map(LongWritable lineNumber, Text filePath, Context context) throws IOException {
log.info("Mapper processing line number {}", lineNumber.toString());
// reject empty or null file paths.
if (filePath == null || filePath.toString().trim().isEmpty()) {
return;
}

Path path = new Path(filePath.toString());
path = HadoopFileUtils.replaceWithCachedPathIfEnabled(context, path);
log.info("Mapper processing {}.", path);
try (FileSystem fs = FileSystem.newInstance(new URI(filePath.toString()), context.getConfiguration());){
try (FileSystem fs = FileSystem.newInstance(new URI(path.toString()), context.getConfiguration());){
try (InputStream in = new BufferedInputStream(fs.open(path))) {
try (ArchiveReader archiveReader = ArchiveReaderFactory.get(filePath.toString(), in, true)) {
for (ArchiveRecord archiveRecord : archiveReader) {
@@ -93,15 +93,19 @@ protected void map(LongWritable lineNumber, Text filePath, Context context) {
}
}
} catch (IOException e) {
log.warn("Failed creating archiveReader from archive file located at '{}'", filePath.toString());
log.warn("Failed creating archiveReader from archive file located at '{}'", filePath.toString(), e);
throw e;
}
} catch (IOException e) {
log.error("Could not read input file at '{}'.", path.toString());
log.error("Could not read input file at '{}'.", path.toString(), e);
throw e;
}
} catch (IOException e) {
log.error("Could not get FileSystem from configuration", e);
throw e;
} catch (URISyntaxException e) {
log.error("Not a URI:", e);
throw new IOException(e);
}
}

@@ -112,7 +116,8 @@ protected void map(LongWritable lineNumber, Text filePath, Context context) {
* @param path Path for the input file the job is run on.
* @param context The mapping context.
*/
private void writeRecordMetadataLinesToContext(ArchiveRecordBase record, Path path, Context context) {
private void writeRecordMetadataLinesToContext(ArchiveRecordBase record, Path path, Context context)
throws IOException {
int lineCount = 0;
try (BufferedReader reader = new BufferedReader(new InputStreamReader(record.getInputStream()))) {
for (String metadataLine = reader.readLine(); metadataLine != null; metadataLine = reader.readLine()) {
@@ -121,7 +126,8 @@ private void writeRecordMetadataLinesToContext(ArchiveRecordBase record, Path pa
}
log.info("Mapper written {} lines to output.", lineCount);
} catch (Exception e) {
log.warn("Failed writing metadata line #{} for input file '{}'.", lineCount, path.toString());
log.warn("Failed writing metadata line #{} for input file '{}'.", lineCount, path.toString(), e);
throw new IOException(e);
}
}
}
@@ -1,11 +1,19 @@
package dk.netarkivet.common.utils.hadoop;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.UUID;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.InvalidRequestException;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.mapreduce.Mapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -16,6 +24,55 @@
public class HadoopFileUtils {
private static final Logger log = LoggerFactory.getLogger(HadoopFileUtils.class);

/**
* Given a file on a local file system, return a cached version of the same file on
* an hdfs file system.
* @param file The local file to cache.
* @param conf The Hadoop configuration, which must have hdfs caching enabled and a cache directory set.
* @return an hdfs path to the cached copy of the file
* @throws IOException if caching is not enabled or otherwise fails
*/
public static Path cacheFile(File file, Configuration conf) throws IOException {
if (!conf.getBoolean(CommonSettings.HADOOP_ENABLE_HDFS_CACHE, false)) {
throw new InvalidRequestException("Hdfs caching not enabled.");
}
FileSystem hdfsFileSystem = FileSystem.get(conf);
Path cachePath = new Path(conf.get(CommonSettings.HADOOP_HDFS_CACHE_DIR));
log.info("Creating the cache directory at {} if necessary.", cachePath);
hdfsFileSystem.mkdirs(cachePath);
cleanCache(conf);
Path dst = new Path(cachePath, file.getName());
log.info("Caching {} to {}.", file.getAbsolutePath(), dst);
if (!hdfsFileSystem.exists(dst)) {
FileUtil.copy(file, hdfsFileSystem, dst, false, conf);
} else {
log.info("Cached copy found - copying not necessary.");
}
return dst;
}

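/**
* Deletes files in the hdfs cache directory that are older than the configured number of cache days
* (settings.common.hadoop.mapred.hdfsCacheDays). If the setting is zero, all cached files are deleted,
* so cached input is only kept for the duration of the current job.
*
* @param configuration The Hadoop configuration specifying the cache directory and retention period.
* @throws IOException if the cache directory cannot be listed or a file cannot be deleted.
*/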
public static void cleanCache(Configuration configuration) throws IOException {
log.info("Cleaning hdfs cache");
long currentTime = System.currentTimeMillis();
int days = configuration.getInt(CommonSettings.HADOOP_CACHE_DAYS, 0);
long maxAgeMillis = days * 24L * 3600L * 1000L;
Path cachePath = new Path(configuration.get(CommonSettings.HADOOP_HDFS_CACHE_DIR));
log.info("Scanning {} for files older than {} days.", cachePath, days);
FileSystem fileSystem = FileSystem.get(configuration);
fileSystem.mkdirs(cachePath);
RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fileSystem
.listFiles(cachePath, false);
while (locatedFileStatusRemoteIterator.hasNext()) {
LocatedFileStatus locatedFileStatus = locatedFileStatusRemoteIterator.next();
long modTime = locatedFileStatus.getModificationTime();
if (days == 0 || (currentTime - modTime) > maxAgeMillis) {
log.info("Deleting {}.", locatedFileStatus.getPath());
fileSystem.delete(locatedFileStatus.getPath(), false);
} else {
log.info("Not deleting {}.", locatedFileStatus.getPath());
}
}
}

/**
* Creates and returns a unique path under a given directory.
* @param fileSystem The used filesystem
@@ -60,4 +117,17 @@ public static java.nio.file.Path makeLocalInputTempFile() {
}
return localInputTempFile;
}

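/**
* If hdfs caching is enabled and the given path refers to a local (non-hdfs) file, replaces it with
* the path of a cached copy in hdfs; otherwise the path is returned unchanged.
*
* @param context The mapper context holding the job configuration.
* @param path The input path to (possibly) replace.
* @return The hdfs path of the cached copy, or the original path if caching does not apply.
* @throws IOException if caching fails.
*/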
public static Path replaceWithCachedPathIfEnabled(Mapper.Context context, Path path)
throws IOException {
boolean cachingEnabled = context.getConfiguration().getBoolean(CommonSettings.HADOOP_ENABLE_HDFS_CACHE, false);
boolean isLocal = path.getFileSystem(context.getConfiguration()) instanceof LocalFileSystem;
if (isLocal && cachingEnabled) {
log.info("Replacing {} with hdfs cached version.", path);
File localFile = ((LocalFileSystem) path.getFileSystem(context.getConfiguration())).pathToFile(path);
path = cacheFile(localFile, context.getConfiguration());
log.info("New input path is {}.", path);
}
return path;
}
}
@@ -126,7 +126,9 @@ public static Configuration enableMapOnlyUberTask(Configuration configuration, I
configuration.getInt(MRJobConfig.MAP_MEMORY_MB, MRJobConfig.DEFAULT_MAP_MEMORY_MB)
+ Optional.ofNullable(appMasterMemory).orElse(MRJobConfig.DEFAULT_MR_AM_VMEM_MB));

configuration.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, true);
if (Settings.getBoolean(CommonSettings.HADOOP_MAPRED_ENABLE_UBERTASK)) {
configuration.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, true);
}

setReducerMemory(configuration, 0);
setReduceCoresPerTask(configuration, 0);
@@ -243,4 +245,19 @@ public static List<CDXRecord> getCDXRecordListFromCDXLines(List<String> cdxLines
}
return recordsForJob;
}

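/**
* Copies the hdfs cache settings (enabled flag, cache directory and retention days) from the NAS
* settings into the given Hadoop configuration, so that mappers running on the cluster can read them.
*
* @param configuration The Hadoop job configuration to update.
*/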
public static void configureCaching(Configuration configuration) {
configuration.setBoolean(CommonSettings.HADOOP_ENABLE_HDFS_CACHE, Settings.getBoolean(CommonSettings.HADOOP_ENABLE_HDFS_CACHE));
configuration.set(CommonSettings.HADOOP_HDFS_CACHE_DIR, Settings.get(CommonSettings.HADOOP_HDFS_CACHE_DIR));
configuration.setInt(CommonSettings.HADOOP_CACHE_DAYS, Settings.getInt(CommonSettings.HADOOP_CACHE_DAYS));
}

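/** Assigns the job to the batch queue configured in settings.common.hadoop.mapred.queue.batch. */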
public static void setBatchQueue(Configuration conf) {
conf.set("mapreduce.job.queuename", Settings.get(CommonSettings.HADOOP_MAPRED_QUEUENAME_BATCH));
}

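/** Assigns the job to the interactive queue configured in settings.common.hadoop.mapred.queue.interactive. */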
public static void setInteractiveQueue(Configuration conf) {
conf.set("mapreduce.job.queuename", Settings.get(CommonSettings.HADOOP_MAPRED_QUEUENAME_INTERACTIVE));
}

}
@@ -39,11 +39,15 @@ public MetadataExtractionStrategy(long jobID, FileSystem fileSystem) {
this.jobID = jobID;
this.fileSystem = fileSystem;
hadoopConf = fileSystem.getConf();
HadoopJobUtils.setMapMemory(hadoopConf, 4096);
HadoopJobUtils.setMapCoresPerTask(hadoopConf, 2);
HadoopJobUtils.enableMapOnlyUberTask(hadoopConf, 4096, 2);
int totalMemory = Settings.getInt(CommonSettings.HADOOP_MAP_MEMORY_MB);
int totalCores = Settings.getInt(CommonSettings.HADOOP_MAP_MEMORY_CORES);
HadoopJobUtils.setMapMemory(hadoopConf, totalMemory);
HadoopJobUtils.setMapCoresPerTask(hadoopConf, totalCores);
HadoopJobUtils.enableMapOnlyUberTask(hadoopConf, totalMemory, totalCores);
HadoopJobUtils.configureCaching(hadoopConf);
urlPattern = hadoopConf.getPattern(GetMetadataMapper.URL_PATTERN, Pattern.compile(".*"));
mimePattern = hadoopConf.getPattern(GetMetadataMapper.MIME_PATTERN, Pattern.compile(".*"));
HadoopJobUtils.setBatchQueue(hadoopConf);
}

@Override
@@ -234,6 +234,16 @@ National Library.
<framework>yarn</framework>
<inputDir>nas_input</inputDir>
<outputDir>nas_output</outputDir>
<mapMemoryMb>4096</mapMemoryMb>
<mapMemoryCores>2</mapMemoryCores>
<hdfsCacheEnabled>true</hdfsCacheEnabled>
<hdfsCacheDir>ChangeMeToSomethingYouCanWriteTo</hdfsCacheDir>
<hdfsCacheDays>7</hdfsCacheDays>
<enableUbertask>true</enableUbertask>
<queue> <!--These values will always be cluster- and user-dependent -->
<interactive></interactive>
<batch></batch>
</queue>
</mapred>
</hadoop>
<useHadoopAsMassProcessor>true</useHadoopAsMassProcessor>
@@ -26,6 +26,8 @@
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
@@ -291,7 +293,7 @@ public static File getCrawlLogForDomainInJob(String domain, long jobid) {
ArgumentNotValid.checkNotNullOrEmpty(domain, "String domain");
FileBatchJob urlsForDomainBatchJob = new HarvestedUrlsForDomainBatchJob(domain);
urlsForDomainBatchJob.processOnlyFilesMatching(getMetadataFilePatternForJobId(jobid));
return getResultFile(urlsForDomainBatchJob);
return createSortedResultFile(urlsForDomainBatchJob);
}

/**
@@ -317,7 +319,7 @@ private static File createTempResultFile(String uuidSuffix) {
* @param crawlLogLines The crawllog lines output from a job.
* @return A File containing the sorted lines.
*/
private static File getResultFile(List<String> crawlLogLines) {
private static File createSortedResultFile(List<String> crawlLogLines) {
final String uuid = UUID.randomUUID().toString();
File tempFile = createTempResultFile(uuid);
File sortedTempFile = createTempResultFile(uuid + "-sorted");
@@ -333,7 +335,7 @@ private static File getResultFile(List<String> crawlLogLines) {
* @param batchJob a certain FileBatchJob
* @return a file with the result.
*/
private static File getResultFile(FileBatchJob batchJob) {
private static File createSortedResultFile(FileBatchJob batchJob) {
final String uuid = UUID.randomUUID().toString();
File tempFile = createTempResultFile(uuid);
File sortedTempFile = createTempResultFile(uuid);
@@ -360,7 +362,7 @@ public static File getCrawlLoglinesMatchingRegexp(long jobid, String regexp) {
} else {
FileBatchJob crawlLogBatchJob = new CrawlLogLinesMatchingRegexp(regexp);
crawlLogBatchJob.processOnlyFilesMatching(getMetadataFilePatternForJobId(jobid));
return getResultFile(crawlLogBatchJob);
return createSortedResultFile(crawlLogBatchJob);
}
}

@@ -383,6 +385,51 @@ private static File getCrawlLogCache(long jobid) {
* @return a File with the matching lines.
*/
private static File getCrawlLogLinesUsingHadoop(long jobID, String regex) {
File cacheFile = getCrawlLogFromCacheOrHdfs(jobID);
List<String> matches = getMatchingStringsFromFile(cacheFile, regex);
return createSortedResultFile(matches);
}

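/**
* Retrieves the crawl log for the given job and returns, as a sorted temporary file, the lines whose
* source-tag URL belongs to the given domain.
*
* @param jobID The ID of the job.
* @param domain The domain to match, e.g. "netarkivet.dk".
* @return A sorted file containing the matching crawl log lines.
*/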
public static File getCrawlLogLinesMatchingDomain(long jobID, String domain) {
File cacheFile = getCrawlLogFromCacheOrHdfs(jobID);
List<String> matches = getMatchingDomainStringsFromFile(cacheFile, domain);
return createSortedResultFile(matches);
}

private static List<String> getMatchingDomainStringsFromFile(File cacheFile, String domain) {
try {
return org.apache.commons.io.FileUtils.readLines(cacheFile).stream()
.filter(line -> lineMatchesDomain(line, domain)).collect(Collectors.toList());
} catch (IOException e) {
log.warn("Could not read cached crawl log file {}.", cacheFile.getAbsolutePath(), e);
return new ArrayList<>();
}
}

private static boolean lineMatchesDomain(String crawlLine, String domain) {
try {
String urlS = crawlLine.split("\\s+")[10];
URL url = new URL(urlS);
return url.getHost().equals(domain) || url.getHost().endsWith("."+domain);
} catch (Exception e) {
return false;
}
}
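// Illustrative behaviour (example values only): for domain "example.com", a crawl log line whose
// eleventh whitespace-separated field is "http://sub.example.com/page" or "http://example.com/"
// matches, while "http://notexample.com/" does not; lines whose field cannot be parsed as a URL
// are silently skipped.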

private static List<String> getMatchingStringsFromFile(File cacheFile,
String regex) {
List<String> matches = null;
Pattern regexp = Pattern.compile(regex);
try {
matches = org.apache.commons.io.FileUtils.readLines(cacheFile).stream().filter(s -> regexp.matcher(s).matches() ).collect(
Collectors.toList());
} catch (IOException e) {
throw new RuntimeException(e);
}
return matches;
}

private static File getCrawlLogFromCacheOrHdfs(long jobID) {
File cacheFile = getCrawlLogCache(jobID);
if (cacheFile.exists() && cacheFile.length() == 0) {
log.info("Overwriting empty cache file {}.", cacheFile.getAbsolutePath());
@@ -395,14 +442,7 @@ private static File getCrawlLogLinesUsingHadoop(long jobID, String regex) {
throw new RuntimeException(e);
}
}
List<String> matches = null;
try {
matches = org.apache.commons.io.FileUtils.readLines(cacheFile).stream().filter(s -> s.matches(regex)).collect(
Collectors.toList());
} catch (IOException e) {
throw new RuntimeException(e);
}
return getResultFile(matches);
return cacheFile;
}

private static File getCrawlLogUsingHadoop(long jobID) {
@@ -422,7 +462,7 @@ private static File getCrawlLogUsingHadoop(long jobID) {
log.error("Failed getting crawl log lines output for job with ID: {}", jobID);
throw new IOFailure("Failed getting " + job.getJobType() + " job results");
}
return getResultFile(crawlLogLines);
return createSortedResultFile(crawlLogLines);
} catch (IOException e) {
log.error("Error instantiating Hadoop filesystem for job {}.", jobID, e);
throw new IOFailure("Failed instantiating Hadoop filesystem.");
