
Fixed handling of multiple replicas, plus logging and special cases.

csrster committed Jan 26, 2017
1 parent d42dabd commit 5a45eb18ff0e7bfd00d22af8c6afd5ee087c1a87
@@ -25,6 +25,7 @@
import java.io.File;
import java.io.IOException;
import java.util.Hashtable;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

@@ -134,67 +135,11 @@ protected Long cacheData(Long id) {
final String specifiedPattern = ".*" + id + ".*" + metadataFilePatternSuffix;
job.processOnlyFilesMatching(specifiedPattern);
BatchStatus b = arcrep.batch(job, replicaUsed);

//TODO for crawl-log data, call a second batch job here that looks for the ifile data
//and applies it to modify the data in b.


// This check ensures that we got data from at least one file.
// Mind you, the data may be empty, but at least one file was
// successfully processed.
Pattern duplicatePattern = Pattern.compile(".*duplicate:\"([^,]+),([0-9]+),.*");
if (b.hasResultFile() && b.getNoOfFilesProcessed() > b.getFilesFailed().size()) {
File cacheFileName = getCacheFile(id);
if (urlPattern.pattern().equals(MetadataFile.CRAWL_LOG_PATTERN)) {
File crawllog = null;
try {
crawllog = File.createTempFile("dedup","txt");
} catch (IOException e) {
throw new IOFailure("Could not create temprary output file.");
}
b.copyResults(crawllog);
GetMetadataArchiveBatchJob job2 = new GetMetadataArchiveBatchJob(Pattern.compile(".*duplicationmigration.*"), Pattern.compile("text/plain"));
job2.processOnlyFilesMatching(specifiedPattern);
BatchStatus b2 = arcrep.batch(job2, replicaUsed);
File migration = null;
try {
migration = File.createTempFile("migration", "txt");
} catch (IOException e) {
throw new IOFailure("Could not create temprary output file.");
}
b2.copyResults(migration);
Hashtable<Pair<String,Long>, Long> lookup = new Hashtable<>();
try {
for (String line : org.apache.commons.io.FileUtils.readLines(migration)) {
String[] splitLine = StringUtils.split(line);
lookup.put(new Pair<String,Long>(splitLine[0], Long.parseLong(splitLine[1])), Long.parseLong(splitLine[2]));
}
} catch (IOException e) {
throw new IOFailure("Could not read " + migration.getAbsolutePath());
} finally {
migration.delete();
}
try {
for (String line : org.apache.commons.io.FileUtils.readLines(crawllog)) {
Matcher m = duplicatePattern.matcher(line);
if (m.matches()) {
Long newOffset = lookup.get(new Pair<String, Long>(m.group(1), Long.parseLong(m.group(2))));
String newLine = line.substring(0, m.start(2)) + newOffset + line.substring(m.end(2));
newLine = newLine.replace(m.group(1), m.group(1)+".gz");
FileUtils.appendToFile(cacheFileName, newLine);
} else {
FileUtils.appendToFile(cacheFileName, line);
}
}
} catch (IOException e) {
throw new IOFailure("Could not read " + crawllog.getAbsolutePath());
} finally {
crawllog.delete();
}
} else {
b.copyResults(cacheFileName);
}
log.debug("Cached data for job '{}' for '{}'", id, prefix);
migrateDuplicates(id, replicaUsed, specifiedPattern, b);
return id;
} else {
// Look for data in other bitarchive replicas, if this option is enabled
@@ -209,14 +154,10 @@ protected Long cacheData(Long id) {
if (rep.getType().equals(ReplicaType.BITARCHIVE) && !rep.getId().equals(replicaUsed)) {
log.debug("Trying to retrieve index data for job '{}' from '{}'.", id, rep.getId());
b = arcrep.batch(job, rep.getId());

// Perform the same check as for the batch results from
// the default replica.
if (b.hasResultFile() && (b.getNoOfFilesProcessed() > b.getFilesFailed().size())) {
File cacheFileName = getCacheFile(id);
b.copyResults(cacheFileName);
log.info("Cached data for job '{}' for '{}' from '{}' instead of '{}'", id, prefix, rep,
replicaUsed);
migrateDuplicates(id, rep.getId(), specifiedPattern, b);
return id;
} else {
log.trace("No data found for job '{}' for '{}' in bitarchive '{}'. ", id, prefix, rep);
@@ -229,4 +170,87 @@ protected Long cacheData(Long id) {
}
}

/**
* If this cache represents a crawl-log cache, this method attempts to migrate any duplicate annotations in
* the crawl log using data in the duplicationmigration metadata record. This migrates filename/offset
* pairs from uncompressed to compressed (w)arc files. As a side effect, the method copies the index
* cache (whether migrated or not) into the cache file whose name is generated from the id. (A worked
* sketch of this rewriting follows the class below.)
* @param id the id of the cache
* @param replicaUsed which replica to look the file up in
* @param specifiedPattern the pattern specifying the files to be found
* @param originalBatchJob the original batch job which returned the unmigrated data.
*/
private void migrateDuplicates(Long id, String replicaUsed, String specifiedPattern, BatchStatus originalBatchJob) {
File cacheFileName = getCacheFile(id);
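// Matches duplicate annotations of the form duplicate:"<filename>,<offset>,..., capturing the filename and the offset.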
Pattern duplicatePattern = Pattern.compile(".*duplicate:\"([^,]+),([0-9]+),.*");
if (urlPattern.pattern().equals(MetadataFile.CRAWL_LOG_PATTERN)) {
GetMetadataArchiveBatchJob job2 = new GetMetadataArchiveBatchJob(Pattern.compile(".*duplicationmigration.*"), Pattern.compile("text/plain"));
job2.processOnlyFilesMatching(specifiedPattern);
BatchStatus b2 = arcrep.batch(job2, replicaUsed);
File migration = null;
try {
migration = File.createTempFile("migration", "txt");
} catch (IOException e) {
throw new IOFailure("Could not create temporary output file.");
}
if (b2.hasResultFile()) {
b2.copyResults(migration);
}
boolean doMigration = migration.exists() && migration.length() > 0;
Hashtable<Pair<String, Long>, Long> lookup = new Hashtable<>();
if (doMigration) {
log.info("Doing migration for {}", id);
try {
final List<String> migrationLines = org.apache.commons.io.FileUtils.readLines(migration);
log.info("{} migration records found.", migrationLines.size());
for (String line : migrationLines) {
String[] splitLine = StringUtils.split(line);
lookup.put(new Pair<String, Long>(splitLine[0], Long.parseLong(splitLine[1])),
Long.parseLong(splitLine[2]));
}
} catch (IOException e) {
throw new IOFailure("Could not read " + migration.getAbsolutePath());
} finally {
migration.delete();
}
}
if (doMigration) {
File crawllog = null;
try {
crawllog = File.createTempFile("dedup", "txt");
} catch (IOException e) {
throw new IOFailure("Could not create temporary output file.");
}
originalBatchJob.copyResults(crawllog);
try {
for (String line : org.apache.commons.io.FileUtils.readLines(crawllog)) {
Matcher m = duplicatePattern.matcher(line);
if (m.matches()) {
Long newOffset = lookup.get(new Pair<String, Long>(m.group(1), Long.parseLong(m.group(2))));
if (newOffset == null) {
log.warn("Could not migrate duplicate in " + line);
FileUtils.appendToFile(cacheFileName, line);
} else {
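// Splice the migrated offset into the line, then point the filename at the compressed (.gz) file.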
String newLine = line.substring(0, m.start(2)) + newOffset + line.substring(m.end(2));
newLine = newLine.replace(m.group(1), m.group(1) + ".gz");
FileUtils.appendToFile(cacheFileName, newLine);
}
} else {
FileUtils.appendToFile(cacheFileName, line);
}
}
} catch (IOException e) {
throw new IOFailure("Could not read " + crawllog.getAbsolutePath());
} finally {
crawllog.delete();
}
} else {
originalBatchJob.copyResults(cacheFileName);
}
} else {
originalBatchJob.copyResults(cacheFileName);
}
log.debug("Cached data for job '{}' for '{}'", id, prefix);
}

}
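
For illustration only, a minimal, self-contained sketch of the line rewriting that migrateDuplicates performs. The crawl-log line, the record "file.arc 1234 5678", and the plain String lookup key (standing in for the Pair<String, Long> used in the commit) are all hypothetical; the regular expression and the splice-and-rename steps mirror the code above.

import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class DuplicateMigrationSketch {
    public static void main(String[] args) {
        // Same pattern as in the commit: captures the filename and the offset
        // inside a duplicate:"<filename>,<offset>,... annotation.
        Pattern duplicatePattern = Pattern.compile(".*duplicate:\"([^,]+),([0-9]+),.*");

        // Stand-in for the duplicationmigration lookup table; a hypothetical
        // record "file.arc 1234 5678" maps (filename, old offset) to the offset
        // of the same entry in the compressed file.
        Map<String, Long> lookup = new HashMap<>();
        lookup.put("file.arc,1234", 5678L);

        String line = "2017-01-26 ... duplicate:\"file.arc,1234,example\" ...";
        Matcher m = duplicatePattern.matcher(line);
        if (m.matches()) {
            Long newOffset = lookup.get(m.group(1) + "," + m.group(2));
            if (newOffset != null) {
                // Splice in the migrated offset, then point the filename at the
                // compressed file, as migrateDuplicates does.
                String newLine = line.substring(0, m.start(2)) + newOffset + line.substring(m.end(2));
                newLine = newLine.replace(m.group(1), m.group(1) + ".gz");
                System.out.println(newLine);
                // Prints: 2017-01-26 ... duplicate:"file.arc.gz,5678,example" ...
            }
        }
    }
}

As in the commit, a record is only applied when the (filename, offset) pair from the annotation is found in the lookup table; otherwise the line passes through unchanged with a warning.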
