Fixed some issues with holding large hadoop result sets in memory
csrster committed Jan 25, 2022
1 parent d354945 commit c5d6c5d
Showing 4 changed files with 52 additions and 33 deletions.
@@ -237,13 +237,16 @@ public static List<String> collectOutputLines(FileSystem fileSystem, Path output
*/
public static void collectOutputLines(FileSystem fileSystem, Path outputFolder, OutputStream outputStream)
throws IOException {
log.info("Starting collection of hadoop output from {}", outputFolder);
RemoteIterator<LocatedFileStatus> iterator = fileSystem.listFiles(outputFolder, false);
while (iterator.hasNext()) {
Path subPath = iterator.next().getPath();
if (subPath.getName().startsWith("part-m")) {
log.info("Collection output from {}", subPath);
IOUtils.copy(fileSystem.open(subPath), outputStream);
}
}
log.info("Finished collection of hadoop output from {}", outputFolder);
}

/**
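The first file above concerns the OutputStream-based overload of HadoopJobUtils.collectOutputLines; the three files that follow switch their callers from the List<String>-returning variant, which materialised the whole Hadoop result set in memory, to this streaming overload. A minimal, hypothetical caller-side sketch of the new pattern (a local FileSystem stands in for the HDFS instance used in production, the paths are illustrative, and HadoopJobUtils must be imported from wherever it lives in this repository):

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
// import of HadoopJobUtils omitted; use its package in this repository

public class CollectOutputSketch {
    public static void main(String[] args) throws IOException {
        FileSystem fs = FileSystem.getLocal(new Configuration()); // stand-in for the production HDFS filesystem
        Path jobOutputDir = new Path("/tmp/hadoop-job-output");   // illustrative job output directory
        File cacheFile = new File("/tmp/job-output.cache");       // illustrative local cache file

        // Streaming overload: each part-m-* file is copied straight into the stream,
        // so the result set is never materialised as a List<String>.
        try (OutputStream os = new FileOutputStream(cacheFile)) {
            HadoopJobUtils.collectOutputLines(fs, jobOutputDir, os);
        }
        System.out.printf("Collected %d bytes to %s%n", cacheFile.length(), cacheFile.getAbsolutePath());
    }
}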
@@ -23,7 +23,9 @@
package dk.netarkivet.harvester.indexserver;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Hashtable;
@@ -169,24 +171,24 @@ private Long cacheDataHadoop(Long id) {
job.prepareJobInputOutput(fileSystem);
job.run();
// If no error is thrown, job was success

List<String> metadataLines = HadoopJobUtils.collectOutputLines(fileSystem, job.getJobOutputDir());
if (metadataLines.size() > 0) {
File cacheFileName = getCacheFile(id);
if (tryToMigrateDuplicationRecords) {
migrateDuplicatesHadoop(id, fileSystem, specifiedPattern, metadataLines, cacheFileName);
} else {
copyResults(id, metadataLines, cacheFileName);
}
log.debug("Cached data for job '{}' for '{}'", id, prefix);
return id;
File cacheFileName = getCacheFile(id);
if (tryToMigrateDuplicationRecords) {
log.warn("Attempting to migrate duplication records via hadoop. This operation is not well tested.");
List<String> metadataLines = HadoopJobUtils.collectOutputLines(fileSystem, job.getJobOutputDir());
migrateDuplicatesHadoop(id, fileSystem, specifiedPattern, metadataLines, cacheFileName);
} else {
log.info("No data found for job '{}' for '{}' in local bitarchive. ", id, prefix);
log.info("Collecting hadoop output to {}", cacheFileName.getAbsolutePath());
try (OutputStream os = new FileOutputStream(cacheFileName)) {
HadoopJobUtils.collectOutputLines(fileSystem, job.getJobOutputDir(), os);
}
log.info("Collected {} bytes hadoop output to {}", cacheFileName.length(), cacheFileName.getAbsolutePath());
}
log.debug("Cached data for job '{}' for '{}'", id, prefix);
return id;
} catch (IOException e) {
log.error("Error instantiating Hadoop filesystem for job {}.", id, e);
return null;
}
return null;
}

/**
@@ -24,8 +24,10 @@

import java.io.BufferedReader;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.OutputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.file.Path;
@@ -218,14 +220,11 @@ private static List<CDXRecord> getRecordsUsingHadoop(long jobid) {
job.processOnlyFilesMatching(metadataFileSearchPattern);
job.prepareJobInputOutput(fileSystem);
job.run();

try {
List<String> cdxLines = HadoopJobUtils.collectOutputLines(fileSystem, job.getJobOutputDir());
org.apache.commons.io.FileUtils.writeLines(cacheFile, cdxLines);
} catch (IOException e) {
log.error("Failed getting CDX lines output for Hadoop job with ID: {}", jobid);
throw new IOFailure("Failed getting " + job.getJobType() + " job results");
log.info("Collecting hadoop output from {} to {}", job.getJobOutputDir(), cacheFile.getAbsolutePath());
try (OutputStream os = new FileOutputStream(cacheFile)) {
HadoopJobUtils.collectOutputLines(fileSystem, job.getJobOutputDir(), os);
}
log.info("Collected {} bytes output to {}", cacheFile.length(), cacheFile.getAbsolutePath());
return getCachedCDXRecords(jobid);
} catch (IOException e) {
log.error("Error instantiating Hadoop filesystem for job {}.", jobid, e);
@@ -437,7 +436,13 @@ private static File getCrawlLogFromCacheOrHdfs(long jobID) {
if (cacheFile.length()==0 || !cacheFile.exists()) { //The || part of this is strictly unnecessary
File outputFile = getCrawlLogUsingHadoop(jobID);
try {
log.info("Copying {} to {}", outputFile.getAbsolutePath(), cacheFile.getAbsolutePath());
org.apache.commons.io.FileUtils.copyFile(outputFile, cacheFile);
if (outputFile.delete()) {
log.info("Deleted {}", outputFile.getAbsolutePath());
} else {
log.warn("Could not delete {}", outputFile.getAbsolutePath());
}
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -455,14 +460,22 @@ private static File getCrawlLogUsingHadoop(long jobID) {
job.processOnlyFilesMatching(metadataFileSearchPattern);
job.prepareJobInputOutput(fileSystem);
job.run();
List<String> crawlLogLines;
try {
crawlLogLines = HadoopJobUtils.collectOutputLines(fileSystem, job.getJobOutputDir());
} catch (IOException e) {
log.error("Failed getting crawl log lines output for job with ID: {}", jobID);
throw new IOFailure("Failed getting " + job.getJobType() + " job results");
File tempOutputFile1 = File.createTempFile("unsorted_crawl", "log");
File tempOutputFile2 = File.createTempFile("unsorted_crawl", "log");
log.info("Collecting output from {} to {}", job.getJobOutputDir(), tempOutputFile1.getAbsolutePath());
try (OutputStream os = new FileOutputStream(tempOutputFile1)) {
HadoopJobUtils.collectOutputLines(fileSystem, job.getJobOutputDir(), os);
}
log.info("Collected {} bytes to {}", tempOutputFile1.length(), tempOutputFile1.getAbsolutePath());
log.info("Sorting {} to {}", tempOutputFile1.getAbsolutePath(), tempOutputFile2.getAbsolutePath());
FileUtils.sortCrawlLogOnTimestamp(tempOutputFile1, tempOutputFile2);
log.info("Collected {} bytes to {}", tempOutputFile2.length(), tempOutputFile2.getAbsolutePath());
if (tempOutputFile1.delete()) {
log.info("Deleted {}", tempOutputFile1.getAbsolutePath());
} else {
log.warn("Could not delete {}", tempOutputFile1.getAbsolutePath());
}
return createSortedResultFile(crawlLogLines);
return tempOutputFile2;
} catch (IOException e) {
log.error("Error instantiating Hadoop filesystem for job {}.", jobID, e);
throw new IOFailure("Failed instantiating Hadoop filesystem.");
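For reference, here is the temp-file flow that the new getCrawlLogUsingHadoop follows, condensed into one hypothetical helper: stream the raw job output into a first temporary file, sort it into a second with the project's FileUtils.sortCrawlLogOnTimestamp(File, File) (signature as used in the diff), drop the unsorted copy, and return the sorted file. Imports of HadoopJobUtils and the project's FileUtils are omitted, and the cleanup fallback is illustrative:

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CrawlLogCollectSketch {
    // Stage the raw job output in one temp file, sort it into a second, drop the unsorted copy.
    static File collectAndSortCrawlLog(FileSystem fs, Path jobOutputDir) throws IOException {
        File unsorted = File.createTempFile("unsorted_crawl", "log");
        File sorted = File.createTempFile("sorted_crawl", "log");
        try (OutputStream os = new FileOutputStream(unsorted)) {
            HadoopJobUtils.collectOutputLines(fs, jobOutputDir, os); // streams the part-m-* files
        }
        FileUtils.sortCrawlLogOnTimestamp(unsorted, sorted);         // project helper, signature as used above
        if (!unsorted.delete()) {
            unsorted.deleteOnExit();                                 // best-effort cleanup (illustrative fallback)
        }
        return sorted;
    }
}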
@@ -25,8 +25,10 @@
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Date;
@@ -301,16 +303,15 @@ private void createJobInputFile(String filename, Path jobInputFile, FileSystem f
*/
private void collectHadoopResults(FileSystem fs, Path jobOutputDir) {
File outputFile = makeNewFileInWaybackTempDir();
log.info("Collecting index for '{}' from parts in '{}' to '{}'", this.getFilename(), jobOutputDir, outputFile.getAbsolutePath());
try {
List<String> cdxLines = HadoopJobUtils.collectOutputLines(fs, jobOutputDir);
FileUtils.writeCollectionToFile(outputFile, cdxLines);
log.info("Finished collecting index for '{}' to '{}'", this.getFilename(), outputFile.getAbsolutePath());
log.info("Collecting results for {} from {} to {}", this.getFilename(), jobOutputDir, outputFile.getAbsolutePath());
try (OutputStream os = new FileOutputStream(outputFile)) {
HadoopJobUtils.collectOutputLines(fs, jobOutputDir, os);
} catch (IOException e) {
log.warn("Could not collect index results from '{}'", jobOutputDir.toString(), e);
}
log.info("Collected {} bytes of index for {} from {} to {}", outputFile.length(), this.getFilename(), jobOutputDir, outputFile.getAbsolutePath());
File finalFile = moveFileToWaybackOutputDir(outputFile);

log.info("Moved index for {} to {}", this.getFilename(), finalFile.getAbsolutePath());
// Update the file status in the object store
originalIndexFileName = outputFile.getName();
isIndexed = true;
