Possible fix for NARK-2259 oom (?) in hadoop mapper
csrster committed Dec 13, 2022
1 parent 9bd6e21 commit ea28151
Showing 2 changed files with 31 additions and 28 deletions.
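
The change below addresses the suspected OOM by writing each matching crawl-log line straight to the Hadoop mapper context instead of first collecting every line in an in-memory List. A minimal sketch of that streaming pattern, assuming a mapper whose input values are file paths; the class name, field names, and file-reading details here are illustrative, not taken from the repository:

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.regex.Pattern;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class StreamingLineMapper extends Mapper<LongWritable, Text, NullWritable, Text> {

    private final Pattern pattern = Pattern.compile(".*");

    @Override
    protected void map(LongWritable lineNumber, Text filePath, Context context)
            throws IOException, InterruptedException {
        // Emit each matching line as soon as it is read. Because nothing is accumulated
        // in a List, heap usage stays flat regardless of how large the crawl log is.
        try (BufferedReader reader = new BufferedReader(new FileReader(filePath.toString()))) {
            String line;
            while ((line = reader.readLine()) != null) {
                if (pattern.matcher(line).matches()) {
                    context.write(NullWritable.get(), new Text(line));
                }
            }
        }
    }
}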
@@ -1,14 +1,9 @@
package dk.netarkivet.viewerproxy.webinterface.hadoop;

import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.io.*;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.util.*;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
@@ -59,37 +54,32 @@ protected void map(LongWritable linenumber, Text archiveFilePath, Context contex
}
Path path = new Path(archiveFilePath.toString());
Configuration conf = context.getConfiguration();
List<String> crawlLogLines;
Pattern crawlLogRegex = conf.getPattern("regex", Pattern.compile(".*"));

log.info("Extracting crawl log lines matching regex: {}", crawlLogRegex);
log.info("Extracting directly to hadoop context.");
final FileSystem fileSystem = path.getFileSystem(conf);
if (!(fileSystem instanceof LocalFileSystem)) {
final String status = "Crawl log extraction only implemented for LocalFileSystem. Cannot extract from " + path;
context.setStatus(status);
System.err.println(status);
crawlLogLines = new ArrayList<>();
} else {
LocalFileSystem localFileSystem = ((LocalFileSystem) fileSystem);
if (cacheHdfs) {
try {
crawlLogLines = extractCrawlLogLinesWithHdfs(localFileSystem.pathToFile(path), crawlLogRegex, context);
extractCrawlLogLinesWithHdfs(localFileSystem.pathToFile(path), crawlLogRegex, context);
} catch (IOException e) {
log.warn("Extracting crawl log via hdfs failed for {} so trying with local file.", path, e);
crawlLogLines = extractCrawlLogLines(localFileSystem.pathToFile(path), crawlLogRegex);
extractCrawlLogLines(localFileSystem.pathToFile(path), crawlLogRegex, context);
}
} else {
crawlLogLines = extractCrawlLogLines(localFileSystem.pathToFile(path), crawlLogRegex);
extractCrawlLogLines(localFileSystem.pathToFile(path), crawlLogRegex, context);
}
}
for (String crawlLog : crawlLogLines) {
context.write(NullWritable.get(), new Text(crawlLog));
}
}

private List<String> extractCrawlLogLinesWithHdfs(File file, Pattern regex, Context context) throws IOException {
private void extractCrawlLogLinesWithHdfs(File file, Pattern regex, Context context) throws IOException, InterruptedException {
log.info("Executing experimental copy to hdfs.");
ArrayList<String> output = new ArrayList<>();
Path dst = HadoopFileUtils.cacheFile(file, context.getConfiguration(), context);
FileSystem hdfsFileSystem = FileSystem.get(context.getConfiguration());
try (FSDataInputStream inputStream = hdfsFileSystem.open(dst)) {
@@ -110,32 +100,45 @@ private List<String> extractCrawlLogLinesWithHdfs(File file, Pattern regex, Cont
String line;
while ((line = bufferedReader.readLine()) != null) {
if (regex.pattern().equals(".*") || regex.matcher(line).matches()) {
output.add(line);
context.write(NullWritable.get(), new Text(line));
}
}
return output; //Just return here as there is only one crawl log
}
}
}
}
return output;
}

/**
* Extract the crawl logs from a file matching the provided regex
* @param file File to look for crawl logs in.
* @param regex The regex to match lines with.
* @param context The Hadoop context that matching crawl log lines are written to.
* @return A list of crawl log lines extracted from the file.
*/
private List<String> extractCrawlLogLines(File file, Pattern regex) {
private void extractCrawlLogLines(File file, Pattern regex, Context context) throws IOException, InterruptedException {
FileBatchJob batchJob = new CrawlLogLinesMatchingRegexp(regex.pattern());
ByteArrayOutputStream baos = new ByteArrayOutputStream();
batchJob.processFile(file, baos);
byte[] array = new byte[7]; // length is bounded by 7
new Random().nextBytes(array);
String generatedString = new String(array, Charset.forName("UTF-8"));
java.nio.file.Path tempOutput = Files.createTempFile(generatedString, "crawllog");
log.info("Extracting crawl log from {} to {}.", file.getAbsolutePath(), tempOutput);
OutputStream outputStream = Files.newOutputStream(tempOutput);
batchJob.processFile(file, outputStream);
try {
baos.flush();
outputStream.flush();
} catch (IOException e) {
log.warn("Error when trying to flush batch job output stream", e);
}
return Arrays.asList(baos.toString().split("\\n"));
log.info("Buffering output from {} of size {} bytes.", tempOutput, Files.size(tempOutput));
Files.lines(tempOutput).forEach(line -> {
try {
context.write(NullWritable.get(), new Text(line));
} catch (IOException | InterruptedException e) {
throw new RuntimeException(e);
}
}
);
Files.delete(tempOutput);
}
}
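
For reference, a minimal, self-contained sketch of the spool-to-a-temp-file-and-stream approach the new extractCrawlLogLines takes. The fixed temp-file prefix, the println sink standing in for context.write, and the try-with-resources around Files.lines (whose returned Stream holds an open file handle until closed) are choices made for this illustration, not details of the commit:

import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.regex.Pattern;
import java.util.stream.Stream;

public class SpoolAndStreamSketch {

    /** Spools (potentially large) output to disk, then streams matching lines to a sink. */
    public static void spoolAndEmit(byte[] producerOutput, Pattern regex) throws IOException {
        Path tempOutput = Files.createTempFile("crawllog-", ".tmp");
        try {
            // Write the full output to a temp file instead of holding it in memory.
            try (OutputStream out = Files.newOutputStream(tempOutput)) {
                out.write(producerOutput);
            }
            // Files.lines() reads lazily, so only one line is in memory at a time;
            // closing the Stream releases the underlying file handle.
            try (Stream<String> lines = Files.lines(tempOutput)) {
                lines.filter(line -> regex.matcher(line).matches())
                     .forEach(System.out::println);
            }
        } finally {
            Files.deleteIfExists(tempOutput);
        }
    }
}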
pom.xml: 1 addition & 1 deletion
@@ -975,7 +975,7 @@
<executions>
<execution>
<id>attach-sources</id>
<phase>none</phase><!--Disable the release-profile broken source plugin. See http://blog.peterlynch.ca/2010/05/maven-how-to-prevent-generate-sources.html-->
<!--Disable the release-profile broken source plugin. See http:/<phase>none</phase>/blog.peterlynch.ca/2010/05/maven-how-to-prevent-generate-sources.html-->
</execution>
<execution>
<id>attach-sources-no-fork</id>
