Code-maturation for cdx-indexing

csrster committed Aug 24, 2020
1 parent b227515 commit c87a69b
Showing 2 changed files with 82 additions and 50 deletions.
@@ -578,6 +578,14 @@ public class CommonSettings {
*/
public static String HADOOP_MAPRED_OUTPUT_DIR = "settings.common.hadoop.mapred.outputDir";

/**
* Path on the client machine (wayback indexer machine) where the uber-jar file containing the
* indexing map-reduce job and dependencies is to be found
*/
public static String HADOOP_MAPRED_WAYBACK_UBER_JAR = "settings.common.hadoop.mapred.wayback_uber_jar";

public static String HADOOP_USER_NAME = "settings.common.hadoop.username";

/**
* Specifies if hadoop is used for mass processing jobs.
*/
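For reference, a minimal sketch of how a jar-path setting like the new HADOOP_MAPRED_WAYBACK_UBER_JAR is typically wired into a Hadoop job configuration. The class name and the fail-fast exception here are illustrative only; the commit below logs a warning and continues instead.

import java.io.File;

import org.apache.hadoop.conf.Configuration;

public class UberJarConfigSketch {
    /**
     * Points "mapreduce.job.jar" at the client-side uber-jar so Hadoop ships it
     * to the cluster together with the submitted job.
     */
    static void configureJobJar(Configuration conf, String jarPath) {
        // Illustrative assumption: treat a missing jar as fatal rather than logging a warning.
        if (jarPath == null || !new File(jarPath).exists()) {
            throw new IllegalStateException("Uber-jar not found at " + jarPath);
        }
        conf.set("mapreduce.job.jar", jarPath);
    }
}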
@@ -26,6 +26,7 @@
import java.io.IOException;
import java.nio.file.Files;
import java.util.Date;
import java.util.Set;
import java.util.UUID;

import javax.persistence.Entity;
@@ -36,6 +37,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.ToolRunner;
import org.hibernate.id.GUIDGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -198,7 +200,7 @@ public void index() throws IllegalState {
if (Settings.getBoolean(CommonSettings.USING_HADOOP) && WARCUtils.isWarc(filename)) {
// Start a hadoop indexing job.
// But should this be done on the files individually? The job operates on a list of filenames.
hadoopIndex();
} else {
batchIndex();
}
@@ -208,64 +210,88 @@ public void index() throws IllegalState {
* Runs a map-only (no reduce) job to index this file.
*/
private void hadoopIndex() {
// For now only handles WARC files
String hadoopInputDir = Settings.get(CommonSettings.HADOOP_MAPRED_INPUT_DIR);
Path hadoopInputNameFile = new Path(hadoopInputDir,
filename.substring(0, filename.indexOf('.')) + "__map_input.txt");
System.setProperty("HADOOP_USER_NAME", Settings.get(CommonSettings.HADOOP_USER_NAME));
Configuration conf = HadoopUtils.getConfFromSettings();
conf.set("mapreduce.job.jar", "/nas/lib/wayback-indexer.jar");
java.nio.file.Path localInputTempFile = null;

try {
// Make temp input file containing path for this file
localInputTempFile = Files.createTempFile(null, null);
// TODO use file resolver here to figure out the file path
Files.write(localInputTempFile, ("file:///kbhpillar/collection-netarkivet/" + filename).getBytes());
} catch (IOException e) {
log.warn("Couldn't write input paths file");
return;
final String jarPath = Settings.get(CommonSettings.HADOOP_MAPRED_WAYBACK_UBER_JAR);
if (jarPath == null || !(new File(jarPath)).exists()) {
log.warn("Specified jar file {} does not exist.", jarPath);
}

FileSystem fs = null;
try {
fs = FileSystem.get(conf);
// Delete the old output folder if it exists TODO maybe check for existence
fs.delete(new Path(Settings.get(CommonSettings.HADOOP_MAPRED_OUTPUT_DIR)), true);
// Write the input file to hdfs
log.info("Copying file with input paths to hdfs");
try {
fs.copyFromLocalFile(false, new Path(localInputTempFile.toAbsolutePath().toString()),
hadoopInputNameFile);
} catch (IOException e) {
log.warn("Failed to upload '{}' to hdfs", localInputTempFile.toString(), e);
conf.set("mapreduce.job.jar", jarPath);
UUID uuid = UUID.randomUUID();
log.info("File {} indexed with job uuid for i/o {}.", this.filename, uuid);
try (FileSystem fileSystem = FileSystem.get(conf)) {
String hadoopInputDir = Settings.get(CommonSettings.HADOOP_MAPRED_INPUT_DIR);
if (hadoopInputDir == null) {
log.error("Parent output dir specified by {} must not be null.", CommonSettings.HADOOP_MAPRED_INPUT_DIR);
return;
}
initInputDir(fileSystem, hadoopInputDir);
Path hadoopInputNameFile = new Path(hadoopInputDir, uuid.toString());
log.info("Hadoop input file will be {}", hadoopInputNameFile);

String parentOutputDir = Settings.get(CommonSettings.HADOOP_MAPRED_OUTPUT_DIR);
if (parentOutputDir == null) {
log.error("Parent output dir specified by {} must not be null.", CommonSettings.HADOOP_MAPRED_OUTPUT_DIR);
return;
}
initOutputDir(fileSystem, parentOutputDir);
Path jobOutputDir = new Path(new Path(parentOutputDir), uuid.toString());
log.info("Output directory for job is {}", jobOutputDir);
java.nio.file.Path localInputTempFile = Files.createTempFile(null, null);
// TODO use file resolver here to figure out the file path
final String s = "file:///kbhpillar/collection-netarkivet/" + filename;
log.info("Inserting {} in {}.", s, localInputTempFile);
Files.write(localInputTempFile, s.getBytes());
// Write the input file to hdfs
log.info("Copying file with input paths {} to hdfs {}.", localInputTempFile, hadoopInputNameFile);
fileSystem.copyFromLocalFile(false, new Path(localInputTempFile.toAbsolutePath().toString()),
hadoopInputNameFile);
log.info("Starting CDXJob on file '{}'", filename);
int exitCode = 0;
try {
// TODO Conditioning on which kind of file this is should probably be handled here by designating different mapper classes
int exitCode = ToolRunner.run(new CDXJob(conf),
log.info("Starting hadoop job with input {} and output {}.", hadoopInputNameFile, jobOutputDir);
exitCode = ToolRunner.run(new CDXJob(conf),
new String[] {
hadoopInputNameFile.toString(), Settings.get(CommonSettings.HADOOP_MAPRED_OUTPUT_DIR)});

hadoopInputNameFile.toString(), jobOutputDir.toString()});
if (exitCode == 0) {
collectHadoopResults(fs);
collectHadoopResults(fileSystem, jobOutputDir);
} else {
log.warn("Hadoop job failed with exit code '{}'", exitCode);
}
} catch (Exception e) {
log.warn("Running hadoop job threw exception", e);
} catch (Exception exception) {
log.error("Hadoop indexing job failed to run normally.", exception);
}
} catch (IOException e) {
log.warn("Couldn't get FileSystem from configuration", e);
} finally {
try {
if (fs != null) {
fs.close();
}
} catch (IOException e) {
log.warn("Problem closing FileSystem: ", e);
log.error("Error on hadoop filesystem.", e);
}

}

private void initOutputDir(FileSystem fileSystem, String parentOutputDir) throws IOException {
Path parentOutputDirPath = new Path(parentOutputDir);
if (fileSystem.exists(parentOutputDirPath)) {
if (!fileSystem.isDirectory(parentOutputDirPath)) {
log.warn("{} exists and is not a directory.", parentOutputDirPath);
fileSystem.delete(parentOutputDirPath);
fileSystem.mkdirs(parentOutputDirPath);
}
} else {
log.info("Creating parent output dir {}.", parentOutputDirPath);
fileSystem.mkdirs(parentOutputDirPath);
}
}

private void initInputDir(FileSystem fileSystem, String hadoopInputDir) throws IOException {
log.info("Hadoop input files will be placed under {}.", hadoopInputDir);
Path hadoopInputDirPath = new Path(hadoopInputDir);
if (fileSystem.exists(hadoopInputDirPath) && !fileSystem.isDirectory(hadoopInputDirPath)) {
log.warn("{} already exists and is a file. Deleting and creating directory.", hadoopInputDirPath);
fileSystem.delete(hadoopInputDirPath, false);
fileSystem.mkdirs(hadoopInputDirPath);
}
else if (!fileSystem.exists(hadoopInputDirPath)) {
fileSystem.mkdirs(hadoopInputDirPath);
}
}
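
Taken together, the UUID naming and the two init methods above give every indexing job disjoint input and output paths under the configured parent directories. A minimal sketch of the resulting layout; the directory values below are hypothetical, in the indexer they come from HADOOP_MAPRED_INPUT_DIR and HADOOP_MAPRED_OUTPUT_DIR.

import java.util.UUID;

import org.apache.hadoop.fs.Path;

public class JobPathsSketch {
    public static void main(String[] args) {
        String inputDir = "/netarkivet/input";   // hypothetical value of HADOOP_MAPRED_INPUT_DIR
        String outputDir = "/netarkivet/output"; // hypothetical value of HADOOP_MAPRED_OUTPUT_DIR
        UUID uuid = UUID.randomUUID();
        // One input file per job, holding the file:// path(s) of the WARCs to index.
        Path inputFile = new Path(inputDir, uuid.toString());
        // A per-job output directory, so concurrent jobs never collide.
        Path jobOutputDir = new Path(new Path(outputDir), uuid.toString());
        System.out.println("input:  " + inputFile);
        System.out.println("output: " + jobOutputDir);
    }
}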

@@ -327,7 +353,7 @@ private void hadoopHDFSIndex() {
log.warn("Problem closing FileSystem: ", e);
}
} else {
collectHadoopResults(fs);
collectHadoopResults(fs, new Path(Settings.get(CommonSettings.HADOOP_MAPRED_OUTPUT_DIR)));
}
} catch (Exception e) {
log.warn("Running hadoop job threw exception", e);
@@ -351,12 +377,10 @@ private void hadoopHDFSIndex() {
* object has been indexed.
* @param fs The Hadoop FileSystem that is used
* @param jobOutputDir The output directory of the Hadoop job, from which results are collected
*/
private void collectHadoopResults(FileSystem fs) {
Path jobResultFilePath = new Path(
Settings.get(CommonSettings.HADOOP_MAPRED_OUTPUT_DIR) + "/part-m-00000"); //TODO: Make non-hardcoded - should eventually run through all files named 'part-m-XXXXX'

private void collectHadoopResults(FileSystem fs, Path jobOutputDir) {
Path jobResultFilePath = new Path(jobOutputDir, "part-m-00000"); //TODO: Make non-hardcoded - should eventually run through all files named 'part-m-XXXXX'
File outputFile = makeNewFileInWaybackTempDir();
log.info("Collecting index for '{}' to '{}'", this.getFilename(), outputFile.getAbsolutePath());
log.info("Collecting index for '{}' from {} to '{}'", this.getFilename(), jobResultFilePath, outputFile.getAbsolutePath());
try {
if (fs.exists(jobResultFilePath)) {
fs.copyToLocalFile(jobResultFilePath, new Path(outputFile.getAbsolutePath()));
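
On the TODO above: one way to eventually run through all files named 'part-m-XXXXX' is to list the job output directory with a path filter and copy each part file down. A hedged sketch; the helper name and the localDir parameter are not part of this commit.

import java.io.File;
import java.io.IOException;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

public class PartFileCollectorSketch {
    /** Copies every map output file (part-m-*) from the job output dir into localDir. */
    static void collectAllParts(FileSystem fs, Path jobOutputDir, File localDir) throws IOException {
        PathFilter partFilter = path -> path.getName().startsWith("part-m-");
        for (FileStatus status : fs.listStatus(jobOutputDir, partFilter)) {
            Path remote = status.getPath();
            File local = new File(localDir, remote.getName());
            fs.copyToLocalFile(remote, new Path(local.getAbsolutePath()));
        }
    }
}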
