Added direct output streaming from HDFS
csrster committed Jan 25, 2022
1 parent 2ab46b4 commit 480e741
Showing 4 changed files with 40 additions and 6 deletions.

HadoopJobTool.java
@@ -38,6 +38,7 @@ public HadoopJobTool(Configuration conf, Mapper<LongWritable, Text, NullWritable
*/
@Override
public int run(String[] args) throws InterruptedException, IOException, ClassNotFoundException {
log.info("Entered run method of HadoopJobTool");
Path inputPath = new Path(args[0]);
Path outputPath = new Path(args[1]);
Configuration conf = getConf();
@@ -61,6 +62,7 @@ public int run(String[] args) throws InterruptedException, IOException, ClassNotFoundException {
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);

log.info("Calling waitForCompletion");
boolean success = job.waitForCompletion(true);
if (!success){
log.error("Job {} failed, state is {}."
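
For orientation: a driver normally hands this tool to Hadoop's ToolRunner, which parses generic options and injects the Configuration before run(String[]) is entered. Below is a minimal sketch of such a driver, assuming HadoopJobTool extends Configured and implements Tool (implied by the @Override on run) and that CDXMapper (imported in ArchiveFile below) fits the tool's mapper parameter; the driver class itself is hypothetical, and the two arguments follow the args[0]/args[1] convention used in run above.

import dk.netarkivet.wayback.hadoop.CDXMapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;

// Hypothetical driver for HadoopJobTool. ToolRunner strips generic Hadoop
// options from args before the tool's run(String[]) sees them. The choice of
// CDXMapper here is an assumption, not taken from this commit.
public class HadoopJobToolDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // args[0] = HDFS file listing the job's inputs, args[1] = HDFS output folder
        int exitCode = ToolRunner.run(conf, new HadoopJobTool(conf, new CDXMapper()), args);
        System.exit(exitCode);
    }
}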

HadoopJobUtils.java
@@ -5,13 +5,15 @@
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.Optional;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
@@ -69,6 +71,7 @@ public static void doKerberosLogin() throws KrbException, IOException {
String krb5_conf = Settings.get(CommonSettings.HADOOP_KERBEROS_CONF);
System.setProperty("java.security.krb5.conf", krb5_conf);
sun.security.krb5.Config.refresh();
log.info("Kerberos: {}, {}, {}", krb5_conf, keytab, principal);
UserGroupInformation.loginUserFromKeytab(principal, keytab);
}

@@ -202,6 +205,8 @@ public static void writeHadoopInputFileLinesToInputFile(List<java.nio.file.Path>
* @param outputFolder The output folder to find the job result files in.
* @return A list of lines collected from all the output files.
* @throws IOException If the output folder or its contents cannot be read.
* @deprecated Use {@link #collectOutputLines(FileSystem, Path, OutputStream)} instead; it streams
* the job output rather than collecting every line in memory.
*/
public static List<String> collectOutputLines(FileSystem fileSystem, Path outputFolder) throws IOException {
List<String> resultLines = new ArrayList<>();
@@ -223,6 +228,24 @@ public static List<String> collectOutputLines(FileSystem fileSystem, Path outputFolder) throws IOException {
return resultLines;
}

    /**
     * Copies the output of a job directly to the given OutputStream, without collecting it in memory.
     * @param fileSystem The filesystem on which the job output resides.
     * @param outputFolder The output folder to find the job result files in.
     * @param outputStream The stream to which the content of all "part-m" result files is copied.
     * @throws IOException If the output folder or its contents cannot be read.
     */
public static void collectOutputLines(FileSystem fileSystem, Path outputFolder, OutputStream outputStream)
throws IOException {
RemoteIterator<LocatedFileStatus> iterator = fileSystem.listFiles(outputFolder, false);
while (iterator.hasNext()) {
Path subPath = iterator.next().getPath();
if (subPath.getName().startsWith("part-m")) {
                // Close each part file after copying it, so handles don't leak across iterations.
                try (InputStream in = fileSystem.open(subPath)) {
                    IOUtils.copy(in, outputStream);
                }
}
}
}

/**
* TODO now here's some code that would look better with streams
* Converts a list of CDX line strings to a list of CDXRecords
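
The point of the new overload is that, unlike the deprecated list-returning variant, the job output never has to fit in memory: callers can stream the concatenated "part-m" files to any destination. A minimal usage sketch, assuming a default Configuration; both paths are placeholders.

import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Paths;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Streams a finished job's "part-m" files straight into a local file, holding
// no more than a copy buffer in memory at any point.
public class StreamJobOutput {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        try (FileSystem fs = FileSystem.get(conf);
                OutputStream out = Files.newOutputStream(Paths.get("/tmp/job-result.cdx"))) {
            HadoopJobUtils.collectOutputLines(fs, new Path("/user/nat/output"), out);
        }
    }
}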

ArchiveFile.java
@@ -71,6 +71,7 @@
import dk.netarkivet.wayback.batch.WaybackCDXExtractionWARCBatchJob;
import dk.netarkivet.wayback.hadoop.CDXMapper;
import dk.netarkivet.wayback.hadoop.CDXStrategy;
import sun.security.krb5.KrbException;

/**
* This class represents a file in the arcrepository which may be indexed by the indexer.
@@ -247,6 +248,13 @@ private void hadoopIndex() {
}
}

    /** Entry point for manually indexing a single file, e.g. to smoke-test the Hadoop job. */
    public static void main(String[] args) throws KrbException, IOException {
HadoopJobUtils.doKerberosLogin();
ArchiveFile archiveFile = new ArchiveFile();
archiveFile.setFilename(args[0]);
archiveFile.hadoopIndex();
}

private void createJobInputFile(String filename, Path jobInputFile, FileSystem fileSystem) throws IOException {
//Create the input file locally
File localInputTempFile = File.createTempFile("cdxextract", ".txt",
@@ -269,17 +277,17 @@ private void createJobInputFile(String filename, Path jobInputFile, FileSystem fileSystem) throws IOException {
}

// Write the input file to hdfs
/*log.info("Copying file with input paths {} to hdfs filesystem {}, {}.", localInputTempFile, fileSystem, jobInputFile);
log.info("Copying file with input paths {} to hdfs filesystem {}, {}.", localInputTempFile, fileSystem, jobInputFile);
Path src = new Path(localInputTempFile.getAbsolutePath());
log.info("Copying from {}", src);*/
try (FSDataOutputStream fsDataOutputStream = fileSystem.create(jobInputFile)) {
log.info("Copying from {}", src);
/* try (FSDataOutputStream fsDataOutputStream = fileSystem.create(jobInputFile)) {
log.info("Writing data to input file.");
fsDataOutputStream.writeUTF("file://" + filePath.toString());
}
/*fileSystem.copyFromLocalFile(
}*/
fileSystem.copyFromLocalFile(
src,
jobInputFile
);*/
);


}
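
The last hunk above also changes how the job input file reaches HDFS: the now commented-out code wrote the path with FSDataOutputStream.writeUTF, which prepends a two-byte length header to the string and so leaves binary junk at the start of what should be a plain-text input file, whereas copyFromLocalFile pushes the already-written local temp file to HDFS verbatim. A condensed sketch of the resulting pattern, with a hypothetical method name and a placeholder input URI:

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.Collections;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class JobInputFileSketch {
    // Write the job's input listing locally, then copy the file to HDFS unchanged.
    static void writeJobInput(FileSystem fileSystem, Path jobInputFile) throws IOException {
        File localInputTempFile = File.createTempFile("cdxextract", ".txt");
        Files.write(localInputTempFile.toPath(),
                Collections.singletonList("file:///data/example.warc.gz"), // placeholder URI
                StandardCharsets.UTF_8);
        fileSystem.copyFromLocalFile(new Path(localInputTempFile.getAbsolutePath()), jobInputFile);
    }
}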

WaybackIndexer.java
@@ -91,6 +91,7 @@ private WaybackIndexer() {
if (Settings.getBoolean(CommonSettings.USE_BITMAG_HADOOP_BACKEND)) {
BitmagUtils.initialize();
try {
log.info("Logging in to Kerberos");
HadoopJobUtils.doKerberosLogin();
} catch (KrbException | IOException e) {
log.error("Fatal error starting WaybackIndexer - could not connect to Hadoop. " + e.getMessage());
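
For reference, doKerberosLogin() (in HadoopJobUtils above) boils down to three steps: point the JVM at a krb5.conf, refresh the in-process Kerberos configuration, and log in from a keytab. A standalone sketch of that sequence, with the values that the real method reads from Settings replaced by placeholder literals:

import org.apache.hadoop.security.UserGroupInformation;

public class KerberosLoginSketch {
    public static void main(String[] args) throws Exception {
        // Placeholder conf path, principal, and keytab; the real code reads these from Settings.
        System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
        sun.security.krb5.Config.refresh(); // re-read the conf set above
        UserGroupInformation.loginUserFromKeytab("nat@EXAMPLE.ORG", "/path/to/nat.keytab");
        System.out.println("Logged in as " + UserGroupInformation.getLoginUser());
    }
}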
