/*
 * #%L
 * Netarchivesuite - harvester
 * %%
 * Copyright (C) 2005 - 2018 The Royal Danish Library,
 * the National Library of France and the Austrian National Library.
 * %%
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation, either version 2.1 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this program. If not, see
 * <http://www.gnu.org/licenses/lgpl-2.1.html>.
 * #L%
 */
package dk.netarkivet.viewerproxy.webinterface;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.UUID;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import dk.netarkivet.common.CommonSettings;
import dk.netarkivet.common.distribute.arcrepository.ArcRepositoryClientFactory;
import dk.netarkivet.common.distribute.arcrepository.BatchStatus;
import dk.netarkivet.common.exceptions.ArgumentNotValid;
import dk.netarkivet.common.exceptions.IOFailure;
import dk.netarkivet.common.utils.FileUtils;
import dk.netarkivet.common.utils.Settings;
import dk.netarkivet.common.utils.SettingsFactory;
import dk.netarkivet.common.utils.batch.ArchiveBatchFilter;
import dk.netarkivet.common.utils.batch.FileBatchJob;
import dk.netarkivet.common.utils.batch.FileListJob;
import dk.netarkivet.common.utils.cdx.ArchiveExtractCDXJob;
import dk.netarkivet.common.utils.cdx.CDXRecord;
import dk.netarkivet.common.utils.hadoop.HadoopJob;
import dk.netarkivet.common.utils.hadoop.HadoopJobStrategy;
import dk.netarkivet.common.utils.hadoop.HadoopJobUtils;
import dk.netarkivet.common.utils.service.FileResolver;
import dk.netarkivet.viewerproxy.webinterface.hadoop.CrawlLogExtractionStrategy;
import dk.netarkivet.viewerproxy.webinterface.hadoop.MetadataCDXExtractionStrategy;

/**
 * Methods for generating the batch results needed by the QA pages.
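 *
 * <p>Illustrative use from a QA page (the job ID and harvest prefix below are hypothetical):
 * <pre>{@code
 * List<String> files = Reporting.getFilesForJob(42L, "42-1");
 * List<CDXRecord> cdxRecords = Reporting.getMetadataCDXRecordsForJob(42L);
 * }</pre>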
 */
@SuppressWarnings({"serial"})
public class Reporting {

    /**
     * Utility class, do not initialise.
     */
    private Reporting() {
    }

    /** Logger for this class. */
    private static final Logger log = LoggerFactory.getLogger(Reporting.class);

    /**
     * The suffix for the data arc/warc files produced by Heritrix.
     * TODO This should be configurable.
     */
    static final String archivefile_suffix = ".*\\.(w)?arc(\\.gz)?";

    /**
     * The suffix for the arc/warc metadata files created by NetarchiveSuite.
     * Should probably be replaced by {@code Settings.get(CommonSettings.METADATAFILE_REGEX_SUFFIX)}.
     */
    static final String metadatafile_suffix = "-metadata-[0-9]+\\.(w)?arc(\\.gz)?";
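
    /**
     * List the names of all files belonging to a given harvest job. Depending on settings, this
     * delegates either to a batch job or to the configured FileResolver.
     *
     * @param jobid The job to get files for.
     * @param harvestprefix The harvestprefix for the files produced by Heritrix.
     * @return A list of file names.
     */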
    public static List<String> getFilesForJob(long jobid, String harvestprefix) {
        if (!Settings.getBoolean(CommonSettings.USE_BITMAG_HADOOP_BACKEND)) {
            return getFilesForJobBatch(jobid, harvestprefix);
        } else {
            return getFilesForJobFileResolver(jobid, harvestprefix);
        }
    }
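
    /**
     * List the names of all files belonging to a given harvest job by querying the configured
     * FileResolver, keeping only files that can be resolved in the current collection.
     *
     * @param jobid The job to get files for.
     * @param harvestprefix The harvestprefix for the files produced by Heritrix.
     * @return A list of file names.
     */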
    public static List<String> getFilesForJobFileResolver(long jobid, String harvestprefix) {
        FileResolver fileResolver = SettingsFactory.getInstance(CommonSettings.FILE_RESOLVER_CLASS);
        String metadataFilePatternForJobId = getMetadataFilePatternForJobId(jobid);
        log.debug("Looking for metadata files matching {}.", metadataFilePatternForJobId);
        List<Path> metadataPaths = fileResolver.getPaths(Pattern.compile(metadataFilePatternForJobId));
        log.debug("Initially found metadata files: {}", metadataPaths);
        String archiveFilePatternForJobId = harvestprefix + archivefile_suffix;
        log.debug("Looking for archive files matching {}.", archiveFilePatternForJobId);
        List<Path> archivePaths = fileResolver.getPaths(Pattern.compile(archiveFilePatternForJobId));
        log.debug("Initially found archive files: {}", archivePaths);
        metadataPaths.addAll(archivePaths);
        List<String> filteredFiles = metadataPaths.stream()
                .filter(path -> fileResolver.getPath(path.getFileName().toString()) != null)
                .map(path -> path.getFileName().toString())
                .collect(Collectors.toList());
        log.debug("After filtering by collection we have the following files: {}", filteredFiles);
        return filteredFiles;
    }

    /**
     * Submit a batch job to list all files for a job, and report the result as a sorted list.
     *
     * @param jobid The job to get files for.
     * @param harvestprefix The harvestprefix for the files produced by Heritrix.
     * @return A sorted list of files.
     * @throws ArgumentNotValid If jobid is 0 or negative.
     * @throws IOFailure On trouble generating the file list.
     */
    public static List<String> getFilesForJobBatch(long jobid, String harvestprefix) {
        ArgumentNotValid.checkPositive(jobid, "jobid");
        FileBatchJob fileListJob = new FileListJob();
        List<String> acceptedPatterns = new ArrayList<>();
        acceptedPatterns.add(getMetadataFilePatternForJobId(jobid));
        acceptedPatterns.add(harvestprefix + archivefile_suffix);
        fileListJob.processOnlyFilesMatching(acceptedPatterns);
        File f;
        try {
            f = File.createTempFile(jobid + "-files", ".txt", FileUtils.getTempDir());
        } catch (IOException e) {
            throw new IOFailure("Could not create temporary file", e);
        }
        BatchStatus status = ArcRepositoryClientFactory.getViewerInstance().batch(fileListJob,
                Settings.get(CommonSettings.USE_REPLICA_ID));
        status.getResultFile().copyTo(f);
        List<String> lines = new ArrayList<>(FileUtils.readListFromFile(f));
        FileUtils.remove(f);
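        // Deduplicate the file names (the same file may be reported more than once) before sorting.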
        lines = new ArrayList<>(new HashSet<>(lines));
        Collections.sort(lines);
        return lines;
    }

    /**
     * Depending on settings, submits either a Hadoop job or a batch job to generate CDX records
     * for all metadata files for a job, and returns the results in a list.
     *
     * @param jobid The job to get CDX records for.
     * @return A list of CDX records.
     * @throws ArgumentNotValid If jobid is 0 or negative.
     */
    public static List<CDXRecord> getMetadataCDXRecordsForJob(long jobid) {
        ArgumentNotValid.checkPositive(jobid, "jobid");
        if (Settings.getBoolean(CommonSettings.USE_BITMAG_HADOOP_BACKEND)) {
            return getRecordsUsingHadoop(jobid);
        } else {
            return getRecordsUsingBatch(jobid);
        }
    }

    /**
     * Submits a Hadoop job to generate CDX records for all metadata files for a job ID and returns
     * the resulting list of records.
     *
     * @param jobid The job to get CDX records for.
     * @return A list of CDX records.
     */
    private static List<CDXRecord> getRecordsUsingHadoop(long jobid) {
        Configuration hadoopConf = HadoopJobUtils.getConf();
        String metadataFileSearchPattern = getMetadataFilePatternForJobId(jobid);
        try (FileSystem fileSystem = FileSystem.newInstance(hadoopConf)) {
            HadoopJobStrategy jobStrategy = new MetadataCDXExtractionStrategy(jobid, fileSystem);
            HadoopJob job = new HadoopJob(jobid, jobStrategy);
            job.processOnlyFilesMatching(metadataFileSearchPattern);
            job.prepareJobInputOutput(fileSystem);
            job.run();
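            // The finished job has written its result to the job output directory; read the CDX lines back from there.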
            List<String> cdxLines;
            try {
                cdxLines = HadoopJobUtils.collectOutputLines(fileSystem, job.getJobOutputDir());
            } catch (IOException e) {
                log.error("Failed getting CDX lines output for Hadoop job with ID: {}", jobid, e);
                throw new IOFailure("Failed getting " + job.getJobType() + " job results", e);
            }
            return HadoopJobUtils.getCDXRecordListFromCDXLines(cdxLines);
        } catch (IOException e) {
            log.error("Error instantiating Hadoop filesystem for job {}.", jobid, e);
            throw new IOFailure("Failed instantiating Hadoop filesystem.", e);
        }
    }

    /**
     * Submits a batch job to generate CDX records for all metadata files for a job, and reports
     * the result in a list.
     *
     * @param jobid The job to get CDX records for.
     * @return A list of CDX records.
     * @throws IOFailure On trouble generating the CDX records.
     */
    private static List<CDXRecord> getRecordsUsingBatch(long jobid) {
        FileBatchJob cdxJob = new ArchiveExtractCDXJob(false) {
            @Override
            public ArchiveBatchFilter getFilter() {
                return ArchiveBatchFilter.EXCLUDE_NON_WARCINFO_RECORDS;
            }
        };
        String metadataFileSearchPattern = getMetadataFilePatternForJobId(jobid);
        cdxJob.processOnlyFilesMatching(metadataFileSearchPattern);
        File f;
        try {
            f = File.createTempFile(jobid + "-reports", ".cdx", FileUtils.getTempDir());
        } catch (IOException e) {
            throw new IOFailure("Could not create temporary file", e);
        }
        BatchStatus status = ArcRepositoryClientFactory.getViewerInstance().batch(cdxJob,
                Settings.get(CommonSettings.USE_REPLICA_ID));
        status.getResultFile().copyTo(f);
        List<CDXRecord> records = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(new FileReader(f))) {
            for (String line = reader.readLine(); line != null; line = reader.readLine()) {
                String[] parts = line.split("\\s+");
                records.add(new CDXRecord(parts));
            }
        } catch (IOException e) {
            throw new IOFailure("Unable to read results from file '" + f + "'", e);
        } finally {
            FileUtils.remove(f);
        }
        return records;
    }

    /**
     * Submit a batch job to extract the part of a crawl log that is associated with the given domain and job.
     *
     * @param domain The domain to get crawl.log lines for.
     * @param jobid The jobid to get the crawl.log lines for.
     * @return A file containing the crawl.log lines. This file is temporary, and should be deleted after use.
     * @throws ArgumentNotValid If jobid is 0 or negative, or if domain is null or the empty string.
     */
    public static File getCrawlLogForDomainInJob(String domain, long jobid) {
        ArgumentNotValid.checkPositive(jobid, "jobid");
        ArgumentNotValid.checkNotNullOrEmpty(domain, "String domain");
        FileBatchJob urlsForDomainBatchJob = new HarvestedUrlsForDomainBatchJob(domain);
        urlsForDomainBatchJob.processOnlyFilesMatching(getMetadataFilePatternForJobId(jobid));
        return getResultFile(urlsForDomainBatchJob);
    }

    /**
     * Helper method to create a temporary file for storing a result.
     *
     * @param uuidSuffix Suffix of the temporary file.
     * @return a new temporary File, marked for deletion on exit.
     */
    private static File createTempResultFile(String uuidSuffix) {
        File tempFile;
        try {
            tempFile = File.createTempFile("temp", uuidSuffix + ".txt", FileUtils.getTempDir());
            tempFile.deleteOnExit();
        } catch (IOException e) {
            throw new IOFailure("Unable to create temporary file", e);
        }
        return tempFile;
    }

    /**
     * Helper method to get a sorted file of crawl log lines.
     *
     * @param crawlLogLines The crawl log lines output from a job.
     * @return A File containing the sorted lines.
     */
    private static File getResultFile(List<String> crawlLogLines) {
        final String uuid = UUID.randomUUID().toString();
        File tempFile = createTempResultFile(uuid);
        File sortedTempFile = createTempResultFile(uuid + "-sorted");
        FileUtils.writeCollectionToFile(tempFile, crawlLogLines);
        FileUtils.sortCrawlLogOnTimestamp(tempFile, sortedTempFile);
        FileUtils.remove(tempFile);
        return sortedTempFile;
    }

    /**
     * Helper method to get the result from a batch job, sorted by crawl log timestamp.
     *
     * @param batchJob a certain FileBatchJob.
     * @return a file with the sorted result.
     */
    private static File getResultFile(FileBatchJob batchJob) {
        final String uuid = UUID.randomUUID().toString();
        File tempFile = createTempResultFile(uuid);
        File sortedTempFile = createTempResultFile(uuid + "-sorted");
        BatchStatus status = ArcRepositoryClientFactory.getViewerInstance().batch(batchJob,
                Settings.get(CommonSettings.USE_REPLICA_ID));
        status.getResultFile().copyTo(tempFile);
        FileUtils.sortCrawlLogOnTimestamp(tempFile, sortedTempFile);
        FileUtils.remove(tempFile);
        return sortedTempFile;
    }

    /**
     * Return any crawl log lines for a given jobid matching the given regular expression.
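     *
     * <p>Illustrative call, returning all crawl log lines for the (hypothetical) job 42 that
     * mention robots.txt:
     * <pre>{@code
     * File matches = Reporting.getCrawlLoglinesMatchingRegexp(42L, ".*robots\\.txt.*");
     * }</pre>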
     *
     * @param jobid The jobid.
     * @param regexp A regular expression.
     * @return a File with the matching lines.
     */
    public static File getCrawlLoglinesMatchingRegexp(long jobid, String regexp) {
        ArgumentNotValid.checkPositive(jobid, "jobid");
        ArgumentNotValid.checkNotNullOrEmpty(regexp, "String regexp");
        if (Settings.getBoolean(CommonSettings.USE_BITMAG_HADOOP_BACKEND)) {
            return getCrawlLogLinesUsingHadoop(jobid, regexp);
        } else {
            FileBatchJob crawlLogBatchJob = new CrawlLogLinesMatchingRegexp(regexp);
            crawlLogBatchJob.processOnlyFilesMatching(getMetadataFilePatternForJobId(jobid));
            return getResultFile(crawlLogBatchJob);
        }
    }

    /**
     * Using Hadoop, gets crawl log lines for a given job ID matching a given regular expression.
     *
     * @param jobID The ID of the job.
     * @param regex The regular expression that crawl log lines must match.
     * @return a File with the matching lines.
     */
    private static File getCrawlLogLinesUsingHadoop(long jobID, String regex) {
        String metadataFileSearchPattern = getMetadataFilePatternForJobId(jobID);
        Configuration hadoopConf = HadoopJobUtils.getConf();
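        // Make the line-matching regexp available to the Hadoop job through its configuration.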
        hadoopConf.setPattern("regex", Pattern.compile(regex));
        try (FileSystem fileSystem = FileSystem.newInstance(hadoopConf)) {
            HadoopJobStrategy jobStrategy = new CrawlLogExtractionStrategy(jobID, fileSystem);
            HadoopJob job = new HadoopJob(jobID, jobStrategy);
            job.processOnlyFilesMatching(metadataFileSearchPattern);
            job.prepareJobInputOutput(fileSystem);
            job.run();
            List<String> crawlLogLines;
            try {
                crawlLogLines = HadoopJobUtils.collectOutputLines(fileSystem, job.getJobOutputDir());
            } catch (IOException e) {
                log.error("Failed getting crawl log lines output for job with ID: {}", jobID, e);
                throw new IOFailure("Failed getting " + job.getJobType() + " job results", e);
            }
            return getResultFile(crawlLogLines);
        } catch (IOException e) {
            log.error("Error instantiating Hadoop filesystem for job {}.", jobID, e);
            throw new IOFailure("Failed instantiating Hadoop filesystem.", e);
        }
    }

    /**
     * Construct the metadata file name pattern for a given job ID.
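     *
     * <p>For example, for job ID 42 the returned pattern matches file names such as
     * {@code 42-metadata-1.warc.gz} or {@code 1-42-metadata-2.arc}.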
     *
     * @param jobid a given harvest job ID.
     * @return the metadata file pattern for the given jobid.
     */
    private static String getMetadataFilePatternForJobId(long jobid) {
        return "(.*-)?" + jobid + "(-.*)?" + metadatafile_suffix;
    }
}