// MetadataIndexingApplication.java
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import dk.netarkivet.common.CommonSettings;
import dk.netarkivet.common.utils.LoggingOutputStream;
import dk.netarkivet.common.utils.Settings;
import dk.netarkivet.common.utils.hadoop.GetMetadataMapper;
import dk.netarkivet.common.utils.hadoop.HadoopFileUtils;
import dk.netarkivet.common.utils.hadoop.HadoopJob;
import dk.netarkivet.common.utils.hadoop.HadoopJobStrategy;
import dk.netarkivet.common.utils.hadoop.HadoopJobTool;
import dk.netarkivet.common.utils.hadoop.MetadataExtractionStrategy;
/**
 * Command-line application that runs a Hadoop metadata-extraction job over the
 * metadata WARC files given as program arguments.
 *
 * <p>The application logs in via Kerberos (principal/keytab from settings), builds a
 * YARN job configuration that extracts CDX index records from the metadata files, writes
 * the argument paths to a local input file, copies it into HDFS, and runs the job via
 * {@link ToolRunner} with a {@link GetMetadataMapper}.
 */
public class MetadataIndexingApplication {

    private static final Logger log = LoggerFactory.getLogger(MetadataIndexingApplication.class);

    // Example input path: /user/nat-csr/9385-metadata-1.warc.gz

    /**
     * Entry point. Each argument is a local path to a metadata file to index.
     *
     * @param args local filesystem paths of the metadata files to process
     * @throws IOException if Kerberos login or filesystem access fails
     * @throws InterruptedException if the privileged action is interrupted
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        String principal = Settings.get(CommonSettings.HADOOP_KERBEROS_PRINCIPAL);
        String keytab = Settings.get(CommonSettings.HADOOP_KERBEROS_KEYTAB);
        // Authenticate with Kerberos and run the whole job as that user.
        UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab);
        ugi.doAs((PrivilegedExceptionAction<Integer>) () -> {
            Configuration conf = buildConfiguration();
            try (FileSystem fileSystem = FileSystem.newInstance(conf)) {
                runJob(conf, fileSystem, args);
            }
            return 0;
        });
    }

    /**
     * Builds the YARN/HDFS job configuration, including the CDX-record URL filter and
     * the uber-jar containing the mapper classes.
     *
     * @return a fully populated Hadoop job configuration
     * @throws RuntimeException if the configured uber-jar does not exist
     */
    private static Configuration buildConfiguration() {
        Configuration conf = new JobConf(new YarnConfiguration(new HdfsConfiguration()));
        conf.set("mapreduce.job.am-access-disabled", "true");
        conf.set("yarn.timeline-service.enabled", "false");
        conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
        // Only records whose metadata URL is a crawl CDX index are extracted; any mime type.
        conf.setPattern(GetMetadataMapper.URL_PATTERN, Pattern.compile("metadata://[^/]*/crawl/index/cdx.*"));
        conf.setPattern(GetMetadataMapper.MIME_PATTERN, Pattern.compile(".*"));
        final String jarPath = Settings.get(CommonSettings.HADOOP_MAPRED_UBER_JAR);
        if (jarPath == null || !new File(jarPath).exists()) {
            log.warn("Specified jar file {} does not exist.", jarPath);
            throw new RuntimeException("Jar file " + jarPath + " does not exist.");
        }
        conf.set("mapreduce.job.jar", jarPath);
        conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
        return conf;
    }

    /**
     * Prepares the job input/output locations and runs the metadata-extraction job.
     *
     * @param conf the Hadoop job configuration
     * @param fileSystem the (HDFS) filesystem the job reads from and writes to
     * @param args local filesystem paths of the metadata files to process
     * @throws Exception if writing the input file or running the job fails
     */
    private static void runJob(Configuration conf, FileSystem fileSystem, String[] args) throws Exception {
        long id = 0L;
        HadoopJobStrategy jobStrategy = new MetadataExtractionStrategy(id, fileSystem);
        HadoopJob job = new HadoopJob(id, jobStrategy);
        UUID uuid = UUID.randomUUID();
        Path jobInputFile = jobStrategy.createJobInputFile(uuid);
        job.setJobInputFile(jobInputFile);
        java.nio.file.Path localInputTempFile = HadoopFileUtils.makeLocalInputTempFile();
        log.info("Local input path is {}", localInputTempFile);
        List<java.nio.file.Path> filePaths = new ArrayList<>();
        for (String filepath : args) {
            log.info("Adding file {} to input.", filepath);
            filePaths.add(localInputTempFile.getFileSystem().getPath(filepath));
        }
        writeFileInputFileLinesToInputFile(filePaths, localInputTempFile);
        log.info("Putting local input file in hdfs at {}", jobInputFile);
        // Move (delSrc=true) the local input listing into HDFS where the job expects it.
        fileSystem.copyFromLocalFile(true, new Path(localInputTempFile.toAbsolutePath().toString()),
                jobInputFile);
        Path jobOutputDir = jobStrategy.createJobOutputDir(uuid);
        job.setJobOutputDir(jobOutputDir);
        ToolRunner.run(new HadoopJobTool(conf, new GetMetadataMapper()),
                new String[] {jobInputFile.toString(), jobOutputDir.toString()});
    }

    /**
     * Appends each path in {@code files} to {@code inputFilePath} as an {@code hdfs://} URI,
     * one per line. Does nothing for an empty list.
     *
     * @param files paths to write
     * @param inputFilePath existing file to append to
     * @throws IOException if the append fails
     */
    public static void writeHdfsInputFileLinesToInputFile(List<java.nio.file.Path> files,
            java.nio.file.Path inputFilePath) throws IOException {
        appendUriLines("hdfs://", files, inputFilePath);
    }

    /**
     * Appends each path in {@code files} to {@code inputFilePath} as a {@code file://} URI,
     * one per line. Does nothing for an empty list.
     *
     * @param files paths to write
     * @param inputFilePath existing file to append to
     * @throws IOException if the append fails
     */
    public static void writeFileInputFileLinesToInputFile(List<java.nio.file.Path> files,
            java.nio.file.Path inputFilePath) throws IOException {
        appendUriLines("file://", files, inputFilePath);
    }

    /**
     * Shared implementation for the two public write methods: prefixes every path with
     * {@code scheme}, joins them newline-terminated, and appends in a single write.
     *
     * @param scheme URI scheme prefix, e.g. {@code "hdfs://"} or {@code "file://"}
     * @param files paths to write
     * @param inputFilePath existing file to append to
     * @throws IOException if the append fails
     */
    private static void appendUriLines(String scheme, List<java.nio.file.Path> files,
            java.nio.file.Path inputFilePath) throws IOException {
        if (files.isEmpty()) {
            return;
        }
        StringBuilder lines = new StringBuilder();
        for (java.nio.file.Path file : files) {
            lines.append(scheme).append(file).append('\n');
        }
        // Single batched write with an explicit charset (original used the platform default).
        Files.write(inputFilePath, lines.toString().getBytes(StandardCharsets.UTF_8),
                StandardOpenOption.APPEND);
    }
}