RawMetadataCache.java
/*
* #%L
* Netarchivesuite - harvester
* %%
* Copyright (C) 2005 - 2018 The Royal Danish Library,
* the National Library of France and the Austrian National Library.
* %%
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation, either version 2.1 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Lesser General Public License for more details.
*
 * You should have received a copy of the GNU Lesser General Public
 * License along with this program. If not, see
* <http://www.gnu.org/licenses/lgpl-2.1.html>.
* #L%
*/
package dk.netarkivet.harvester.indexserver;
import java.io.File;
import java.io.IOException;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.List;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.math3.util.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import dk.netarkivet.common.CommonSettings;
import dk.netarkivet.common.distribute.arcrepository.ArcRepositoryClientFactory;
import dk.netarkivet.common.distribute.arcrepository.BatchStatus;
import dk.netarkivet.common.distribute.arcrepository.Replica;
import dk.netarkivet.common.distribute.arcrepository.ReplicaType;
import dk.netarkivet.common.distribute.arcrepository.ViewerArcRepositoryClient;
import dk.netarkivet.common.exceptions.ArgumentNotValid;
import dk.netarkivet.common.exceptions.IOFailure;
import dk.netarkivet.common.utils.FileResolver;
import dk.netarkivet.common.utils.FileUtils;
import dk.netarkivet.common.utils.hadoop.HadoopFileUtils;
import dk.netarkivet.common.utils.hadoop.HadoopJobUtils;
import dk.netarkivet.common.utils.Settings;
import dk.netarkivet.common.utils.SimpleFileResolver;
import dk.netarkivet.common.utils.archive.ArchiveBatchJob;
import dk.netarkivet.common.utils.archive.GetMetadataArchiveBatchJob;
import dk.netarkivet.common.utils.hadoop.GetMetadataArchiveMapper;
import dk.netarkivet.common.utils.hadoop.HadoopJob;
import dk.netarkivet.harvester.HarvesterSettings;
import dk.netarkivet.harvester.harvesting.metadata.MetadataFile;
/**
 * This is an implementation of the RawDataCache specialized for data out of metadata files. It uses regular
 * expressions to match the URL and MIME type of (W)ARC entries for the kind of metadata we want.
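 * <p>
 * A minimal usage sketch (illustrative: the cache name and patterns are examples; caching goes through the
 * inherited {@link FileBasedCache#cache} method):
 *
 * <pre>{@code
 * RawMetadataCache crawlLogCache = new RawMetadataCache("crawllog",
 *         Pattern.compile(MetadataFile.CRAWL_LOG_PATTERN), Pattern.compile("text/plain"));
 * Long cached = crawlLogCache.cache(42L);          // extract and cache matching records for job 42
 * File dataFile = crawlLogCache.getCacheFile(42L); // file holding the cached records
 * }</pre>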
*/
public class RawMetadataCache extends FileBasedCache<Long> implements RawDataCache {
/** The logger for this class. */
private static final Logger log = LoggerFactory.getLogger(RawMetadataCache.class);
/** A regular expression object that matches everything. */
public static final Pattern MATCH_ALL_PATTERN = Pattern.compile(".*");
/** The prefix (cache name) that this cache uses. */
private final String prefix;
/**
* The arc repository interface. This does not need to be closed, it is a singleton.
*/
private ViewerArcRepositoryClient arcrep = ArcRepositoryClientFactory.getViewerInstance();
/** The actual pattern to be used for matching the URL in the metadata records. */
private Pattern urlPattern;
/** The actual pattern to be used for matching the MIME type in the metadata records. */
private Pattern mimePattern;
/** Whether to try to migrate jobs with a duplicationmigration metadata record. */
private boolean tryToMigrateDuplicationRecords;
/**
* Create a new RawMetadataCache. For a given job ID, this will fetch and cache selected content from metadata files
* (<ID>-metadata-[0-9]+.arc). Any entry in a metadata file that matches both patterns will be returned. The
* returned data does not directly indicate which file they were from, though parts intrinsic to the particular
* format might.
*
* @param prefix A prefix that will be used to distinguish this cache's files from other caches'. It will be used
* for creating a directory, so it must not contain characters not legal in directory names.
* @param urlMatcher A pattern for matching URLs of the desired entries. If null, a .* pattern will be used.
* @param mimeMatcher A pattern for matching mime-types of the desired entries. If null, a .* pattern will be used.
*/
public RawMetadataCache(String prefix, Pattern urlMatcher, Pattern mimeMatcher) {
super(prefix);
this.prefix = prefix;
urlPattern = (urlMatcher != null) ? urlMatcher : MATCH_ALL_PATTERN;
mimePattern = (mimeMatcher != null) ? mimeMatcher : MATCH_ALL_PATTERN;
// Should we try to migrate duplicate records, yes or no.
tryToMigrateDuplicationRecords = Settings.getBoolean(HarvesterSettings.INDEXSERVER_INDEXING_TRY_TO_MIGRATE_DUPLICATION_RECORDS);
log.info("Metadata cache for '{}' is fetching metadata with urls matching '{}' and mimetype matching '{}'. "
+ "Migration of duplicate records is " + (tryToMigrateDuplicationRecords ? "enabled" : "disabled") + ".",
prefix, urlPattern, mimePattern);
}
/**
* Get the file potentially containing (cached) data for a single job.
*
* @param id The job to find data for.
* @return The file where cache data for the job can be stored.
* @see FileBasedCache#getCacheFile(Object)
*/
@Override
public File getCacheFile(Long id) {
ArgumentNotValid.checkNotNull(id, "job ID");
ArgumentNotValid.checkNotNegative(id, "job ID");
return new File(getCacheDir(), prefix + "-" + id + "-cache");
}
/**
 * Actually cache data for the given ID.
 *
 * @param id A job ID to cache data for.
 * @return The given ID if data was successfully cached, or null if no data could be cached.
 * @see FileBasedCache#cacheData(Object)
 */
@Override
protected Long cacheData(Long id) {
if (Settings.getBoolean(CommonSettings.USING_HADOOP)) {
return cacheDataHadoop(id);
} else {
return cacheDataBatch(id);
}
}
/**
 * Cache data for the given ID using Hadoop.
 *
 * @param id A job ID to cache data for.
 * @return The given ID if data was successfully cached, or null if the job failed or found no data.
 * @see FileBasedCache#cacheData(Object)
 */
private Long cacheDataHadoop(Long id) {
final String metadataFilePatternSuffix = Settings.get(CommonSettings.METADATAFILE_REGEX_SUFFIX);
final String specifiedPattern = "(.*-)?" + id + "(-.*)?" + metadataFilePatternSuffix;
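// With the default metadata suffix this matches file names such as "42-metadata-1.warc" or
// "prefix-42-metadata-2.arc.gz" for id 42 (illustrative names).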
PathMatcher pathMatcher = FileSystems.getDefault().getPathMatcher("regex:" + specifiedPattern);
System.setProperty("HADOOP_USER_NAME", Settings.get(CommonSettings.HADOOP_USER_NAME));
Configuration conf = HadoopJobUtils.getConfFromSettings();
conf.set("url.pattern", urlPattern.toString());
conf.set("mime.pattern", mimePattern.toString());
UUID uuid = UUID.randomUUID();
try (FileSystem fileSystem = FileSystem.get(conf)) {
Path hadoopInputNameFile = HadoopFileUtils.initExtractionJobInput(fileSystem, uuid);
log.info("Hadoop input file will be {}", hadoopInputNameFile);
Path jobOutputDir = HadoopFileUtils.initExtractionJobOutput(fileSystem, uuid);
log.info("Output directory for job is {}", jobOutputDir);
if (hadoopInputNameFile == null || jobOutputDir == null) {
log.error("Failed initializing input/output for job {} with uuid {}", id, uuid);
return null;
}
java.nio.file.Path localInputTempFile = Files.createTempFile(null, null);
String pillarParentDir = Settings.get(CommonSettings.HADOOP_MAPRED_INPUT_FILES_PARENT_DIR);
FileResolver fileResolver = new SimpleFileResolver(Paths.get(pillarParentDir));
List<java.nio.file.Path> filePaths = fileResolver.getPaths(pathMatcher);
HadoopJobUtils.writeHadoopInputFileLinesToInputFile(filePaths, localInputTempFile);
log.info("Copying file with input paths {} to hdfs {}.", localInputTempFile, hadoopInputNameFile);
fileSystem.copyFromLocalFile(false, new Path(localInputTempFile.toAbsolutePath().toString()),
hadoopInputNameFile);
log.info("Starting metadata extraction job on {} file(s) matching pattern '{}'",
filePaths.size(), specifiedPattern);
int exitCode = 0;
try {
exitCode = ToolRunner.run(new HadoopJob(conf, new GetMetadataArchiveMapper()),
new String[] {hadoopInputNameFile.toString(), jobOutputDir.toString()});
if (exitCode == 0) {
List<String> metadataLines = HadoopJobUtils.collectOutputLines(fileSystem, jobOutputDir);
if (metadataLines.size() > 0) {
File cacheFileName = getCacheFile(id);
if (tryToMigrateDuplicationRecords) {
migrateDuplicatesHadoop(id, fileSystem, specifiedPattern, metadataLines, cacheFileName);
} else {
try {
Files.write(Paths.get(cacheFileName.getAbsolutePath()), metadataLines);
} catch (IOException e) {
log.error("Failed writing cached data for job {} to {}.", id, cacheFileName.getAbsolutePath(), e);
return null;
}
}
log.debug("Cached data for job '{}' for '{}'", id, prefix);
return id;
} else {
log.info("No data found for job '{}' for '{}' in local bitarchive. ", id, prefix);
}
} else {
log.warn("Hadoop job failed with exit code '{}'", exitCode);
}
} catch (Exception e) {
log.error("Hadoop indexing job failed to run normally.", e);
return null;
}
} catch (IOException e) {
log.error("Error on hadoop filesystem.", e);
}
return null;
}
private void migrateDuplicatesHadoop(Long id, FileSystem fileSystem, String specifiedPattern, List<String> originalJobResults, File cacheFileName) {
PathMatcher pathMatcher = FileSystems.getDefault().getPathMatcher("regex:" + specifiedPattern);
Configuration conf = fileSystem.getConf();
conf.set("url.pattern", ".*duplicationmigration.*");
conf.set("mime.pattern", "text/plain");
log.debug("Looking for a duplicationmigration record for id {}", id);
if (urlPattern.pattern().equals(MetadataFile.CRAWL_LOG_PATTERN)) {
UUID uuid = UUID.randomUUID();
Path hadoopInputNameFile = HadoopFileUtils.initExtractionJobInput(fileSystem, uuid);
log.info("Hadoop input file will be {}", hadoopInputNameFile);
Path jobOutputDir = HadoopFileUtils.initExtractionJobOutput(fileSystem, uuid);
log.info("Output directory for job is {}", jobOutputDir);
if (hadoopInputNameFile == null || jobOutputDir == null) {
log.error("Failed initializing input/output for job {} with uuid {}", id, uuid);
return;
}
try {
java.nio.file.Path localInputTempFile = Files.createTempFile(null, null);
String pillarParentDir = Settings.get(CommonSettings.HADOOP_MAPRED_INPUT_FILES_PARENT_DIR);
FileResolver fileResolver = new SimpleFileResolver(Paths.get(pillarParentDir));
List<java.nio.file.Path> filePaths = fileResolver.getPaths(pathMatcher);
HadoopJobUtils.writeHadoopInputFileLinesToInputFile(filePaths, localInputTempFile);
log.info("Copying file with input paths {} to hdfs {}.", localInputTempFile, hadoopInputNameFile);
fileSystem.copyFromLocalFile(false, new Path(localInputTempFile.toAbsolutePath().toString()),
hadoopInputNameFile);
log.info("Starting metadata extraction job on {} file(s) matching pattern '{}'",
filePaths.size(), specifiedPattern);
List<String> metadataLines = new ArrayList<>();
int exitCode = 0;
try {
exitCode = ToolRunner.run(new HadoopJob(conf, new GetMetadataArchiveMapper()),
new String[] {hadoopInputNameFile.toString(), jobOutputDir.toString()});
if (exitCode == 0) {
metadataLines.addAll(HadoopJobUtils.collectOutputLines(fileSystem, jobOutputDir));
} else {
log.warn("Hadoop job failed with exit code '{}'", exitCode);
return;
}
} catch (Exception e) {
log.error("Hadoop indexing job failed to run normally.", e);
}
try {
handleMigrationHadoop(id, metadataLines, originalJobResults, cacheFileName);
} catch (IOFailure e) {
log.error("Failed migrating duplicates for job {}.", id, e);
}
} catch (IOException e) {
log.error("Failed writing to/creating file.", e);
}
} else {
try {
Files.write(Paths.get(cacheFileName.getAbsolutePath()), originalJobResults);
} catch (IOException e) {
log.error("Failed to write job results for job {} to {}", id, cacheFileName.getAbsolutePath());
}
}
}
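/**
 * Writes the duplicationmigration lines found by the Hadoop job to a temporary file and, if any were found,
 * uses them to rewrite the filename/offset pairs in the original crawl-log results before writing the cache
 * file. If no migration data was found, the original results are written unchanged.
 *
 * @param id the id of the cache
 * @param metadataLines the lines of the duplicationmigration record, possibly empty
 * @param originalJobResults the unmigrated crawl-log lines from the original extraction job
 * @param cacheFileName the file to write the (possibly migrated) results to
 */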
private void handleMigrationHadoop(Long id, List<String> metadataLines, List<String> originalJobResults,
File cacheFileName) {
File migration = null;
try {
migration = File.createTempFile("migration", "txt");
} catch (IOException e) {
throw new IOFailure("Could not create temporary output file.");
}
if (metadataLines.size() > 0) {
try {
Files.write(Paths.get(migration.getAbsolutePath()), metadataLines);
} catch (IOException e) {
throw new IOFailure("Failed writing to file " + migration.getAbsolutePath());
}
}
boolean doMigration = migration.exists() && migration.length() > 0;
if (doMigration) {
Hashtable<Pair<String, Long>, Long> lookup = createLookupTableFromMigrationLines(id, migration);
File crawllog = null;
try {
crawllog = File.createTempFile("dedup", "txt");
} catch (IOException e) {
throw new IOFailure("Could not create temporary output file.");
}
try {
Files.write(Paths.get(crawllog.getAbsolutePath()), originalJobResults);
} catch (IOException e) {
throw new IOFailure("Failed writing to file " + crawllog.getAbsolutePath());
}
migrateFilenameOffsetPairs(id, cacheFileName, crawllog, lookup);
} else {
try {
Files.write(Paths.get(cacheFileName.getAbsolutePath()), originalJobResults);
} catch (IOException e) {
throw new IOFailure("Failed writing to file " + cacheFileName.getAbsolutePath());
}
}
}
/**
 * Cache data for the given ID using a batch job against the archive repository.
 *
 * @param id A job ID to cache data for.
 * @return The given ID if data was successfully cached, or null if no data was found in any replica.
 * @see FileBasedCache#cacheData(Object)
 */
private Long cacheDataBatch(Long id) {
final String replicaUsed = Settings.get(CommonSettings.USE_REPLICA_ID);
final String metadataFilePatternSuffix = Settings.get(CommonSettings.METADATAFILE_REGEX_SUFFIX);
// Same pattern here as defined in class dk.netarkivet.viewerproxy.webinterface.Reporting
final String specifiedPattern = "(.*-)?" + id + "(-.*)?" + metadataFilePatternSuffix;
final ArchiveBatchJob job = new GetMetadataArchiveBatchJob(urlPattern, mimePattern);
log.debug("Extract using a batchjob of type '{}' cachedata from files matching '{}' on replica '{}'. Url pattern is '{}' and mimepattern is '{}'", job
.getClass().getName(), specifiedPattern, replicaUsed, urlPattern, mimePattern);
job.processOnlyFilesMatching(specifiedPattern);
BatchStatus b = arcrep.batch(job, replicaUsed);
// This check ensures that we got data from at least one file.
// Mind you, the data may be empty, but at least one file was
// successfully processed.
if (b.hasResultFile() && b.getNoOfFilesProcessed() > b.getFilesFailed().size()) {
File cacheFileName = getCacheFile(id);
if (tryToMigrateDuplicationRecords) {
migrateDuplicatesBatch(id, replicaUsed, specifiedPattern, b, cacheFileName);
} else {
b.copyResults(cacheFileName);
}
log.debug("Cached data for job '{}' for '{}'", id, prefix);
return id;
} else {
// Look for data in other bitarchive replicas, if this option is enabled
if (!Settings.getBoolean(HarvesterSettings.INDEXSERVER_INDEXING_LOOKFORDATAINOTHERBITARCHIVEREPLICAS)) {
log.info("No data found for job '{}' for '{}' in local bitarchive '{}'. ", id, prefix, replicaUsed);
} else {
log.info("No data found for job '{}' for '{}' in local bitarchive '{}'. Trying other replicas.", id,
prefix, replicaUsed);
for (Replica rep : Replica.getKnown()) {
// Only use different bitarchive replicas than replicaUsed
if (rep.getType().equals(ReplicaType.BITARCHIVE) && !rep.getId().equals(replicaUsed)) {
log.debug("Trying to retrieve index data for job '{}' from '{}'.", id, rep.getId());
b = arcrep.batch(job, rep.getId());
// Perform same check as for the batchresults from
// the default replica.
if (b.hasResultFile() && (b.getNoOfFilesProcessed() > b.getFilesFailed().size())) {
File cacheFileName = getCacheFile(id);
if (tryToMigrateDuplicationRecords) {
migrateDuplicatesBatch(id, rep.getId(), specifiedPattern, b, cacheFileName);
} else {
b.copyResults(cacheFileName);
}
log.debug("Cached data for job '{}' for '{}'", id, prefix);
return id;
} else {
log.trace("No data found for job '{}' for '{}' in bitarchive '{}'. ", id, prefix, rep);
}
}
}
log.info("No data found for job '{}' for '{}' in all bitarchive replicas", id, prefix);
}
return null;
}
}
/**
* If this cache represents a crawllog cache then this method will attempt to migrate any duplicate annotations in
* the crawl log using data in the duplicationmigration metadata record. This migrates filename/offset
* pairs from uncompressed to compressed (w)arc files. This method has the side effect of copying the index
* cache (whether migrated or not) into the cache file whose name is generated from the id.
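 * For example (illustrative values), an annotation {@code duplicate:"42-foo.arc,496812"} is rewritten to
 * {@code duplicate:"42-foo.arc.gz,393343"} when the duplicationmigration record maps (42-foo.arc, 496812)
 * to 393343.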
* @param id the id of the cache
* @param replicaUsed which replica to look the file up in
* @param specifiedPattern the pattern specifying the files to be found
 * @param originalBatchJob the original batch job which returned the unmigrated data.
 * @param cacheFileName the file to which the results (migrated or not) are written.
*/
private void migrateDuplicatesBatch(Long id, String replicaUsed, String specifiedPattern, BatchStatus originalBatchJob, File cacheFileName) {
log.debug("Looking for a duplicationmigration record for id {}", id);
if (urlPattern.pattern().equals(MetadataFile.CRAWL_LOG_PATTERN)) {
GetMetadataArchiveBatchJob job2 = new GetMetadataArchiveBatchJob(Pattern.compile(".*duplicationmigration.*"), Pattern.compile("text/plain"));
job2.processOnlyFilesMatching(specifiedPattern);
BatchStatus b2 = arcrep.batch(job2, replicaUsed);
File migration = null;
try {
migration = File.createTempFile("migration", "txt");
} catch (IOException e) {
throw new IOFailure("Could not create temporary output file.");
}
if (b2.hasResultFile()) {
b2.copyResults(migration);
}
boolean doMigration = migration.exists() && migration.length() > 0;
if (doMigration) {
Hashtable<Pair<String, Long>, Long> lookup = createLookupTableFromMigrationLines(id, migration);
File crawllog = null;
try {
crawllog = File.createTempFile("dedup", "txt");
} catch (IOException e) {
throw new IOFailure("Could not create temporary output file.");
}
originalBatchJob.copyResults(crawllog);
migrateFilenameOffsetPairs(id, cacheFileName, crawllog, lookup);
} else {
originalBatchJob.copyResults(cacheFileName);
}
} else {
originalBatchJob.copyResults(cacheFileName);
}
}
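/**
 * Rewrites the duplicate annotations in the given crawl log using the lookup table and appends the result
 * to the cache file. Lines whose (filename, offset) pair has a mapping get the new offset and a ".gz"
 * suffix on the filename; unmatched duplicate lines and all other lines are copied unchanged. The temporary
 * crawl-log file is deleted when done.
 *
 * @param id the id of the cache (used for logging)
 * @param cacheFileName the cache file to which the (possibly rewritten) lines are appended
 * @param crawllog temporary file holding the original crawl-log lines
 * @param lookup mapping from (filename, uncompressed offset) to offset in the compressed file
 */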
private void migrateFilenameOffsetPairs(Long id, File cacheFileName, File crawllog, Hashtable<Pair<String, Long>, Long> lookup) {
Pattern duplicatePattern = Pattern.compile(".*duplicate:\"([^,]+),([0-9]+).*");
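// Illustrative match: in a line containing duplicate:"25-foo-1.arc,12345" the pattern yields
// group(1) = "25-foo-1.arc" (filename) and group(2) = "12345" (offset).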
try {
int matches = 0;
int errors = 0;
for (String line : org.apache.commons.io.FileUtils.readLines(crawllog)) {
Matcher m = duplicatePattern.matcher(line);
if (m.matches()) {
matches++;
Long newOffset = lookup.get(new Pair<String, Long>(m.group(1), Long.parseLong(m.group(2))));
if (newOffset == null) {
log.warn("Could not migrate duplicate in " + line);
FileUtils.appendToFile(cacheFileName, line);
errors++;
} else {
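// Replace the offset with its value in the compressed (w)arc and append ".gz" to the filename.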
String newLine = line.substring(0, m.start(2)) + newOffset + line.substring(m.end(2));
newLine = newLine.replace(m.group(1), m.group(1) + ".gz");
FileUtils.appendToFile(cacheFileName, newLine);
}
} else {
FileUtils.appendToFile(cacheFileName, line);
}
}
log.info("Found and migrated {} duplicate lines for job {} with {} errors", matches, id, errors);
} catch (IOException e) {
throw new IOFailure("Could not read " + crawllog.getAbsolutePath());
} finally {
crawllog.delete();
}
}
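/**
 * Builds a lookup table from the lines of a duplicationmigration record, mapping each (filename,
 * uncompressed offset) pair to the corresponding offset in the compressed file. The temporary migration
 * file is deleted when done.
 *
 * @param id the id of the cache (used for logging)
 * @param migration temporary file holding the duplicationmigration lines
 * @return the lookup table from (filename, old offset) to new offset
 */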
private Hashtable<Pair<String, Long>, Long> createLookupTableFromMigrationLines(Long id, File migration) {
Hashtable<Pair<String, Long>, Long> lookup = new Hashtable<>();
log.info("Found a nonempty duplicationmigration record. Now we do the migration for job {}", id);
try {
final List<String> migrationLines = org.apache.commons.io.FileUtils.readLines(migration);
log.info("{} migration records found for job {}", migrationLines.size(), id);
for (String line : migrationLines) {
// duplicationmigration lines look like this: "FILENAME 496812 393343 1282069269000"
// But only the first 3 entries are used.
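// For the example line above the table maps (FILENAME, 496812) -> 393343, i.e. from the offset in the
// uncompressed file to the offset in the compressed file.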
String[] splitLine = StringUtils.split(line);
if (splitLine.length >= 3) {
lookup.put(new Pair<String, Long>(splitLine[0], Long.parseLong(splitLine[1])),
Long.parseLong(splitLine[2]));
} else {
log.warn("Line '" + line + "' has a wrong format. Ignoring line");
}
}
} catch (IOException e) {
throw new IOFailure("Could not read " + migration.getAbsolutePath());
} finally {
migration.delete();
}
return lookup;
}
}