
Commit

Simple cache for metadata cdx records
csrster committed May 19, 2021
1 parent efb441d commit ad347f1
Showing 1 changed file with 50 additions and 20 deletions.
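
The change puts a simple file-backed cache in front of the Hadoop CDX-extraction job: getRecordsUsingHadoop() now returns cached records when a cache file for the job id already exists, and otherwise runs the Hadoop job, writes the collected output lines to the cache file, and reads the records back through the cache (a minimal sketch of the pattern follows the diff below).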
@@ -170,35 +170,65 @@ public static List<CDXRecord> getMetadataCDXRecordsForJob(long jobid) {
         }
     }
 
+    /** Returns the file used to cache CDX records for the given job. */
+    private static File getCDXCache(long jobid) {
+        String cacheDir = "metadata_cache";
+        String cdxcache = "cdxcache";
+        File cacheFile = new File(new File(new File(cacheDir), cdxcache), "" + jobid);
+        // Create the parent directories, not the cache file itself; calling mkdirs() on the
+        // full path would turn the per-job cache file into a directory and break reads/writes.
+        cacheFile.getParentFile().mkdirs();
+        return cacheFile;
+    }
+
+    /** Returns the cached CDX records for the given job, or null if no cache entry exists yet. */
+    private static List<CDXRecord> getCachedCDX(long jobid) {
+        List<String> cdxLines;
+        File cacheFile = getCDXCache(jobid);
+        if (cacheFile.exists()) {
+            try {
+                cdxLines = org.apache.commons.io.FileUtils.readLines(cacheFile);
+                return HadoopJobUtils.getCDXRecordListFromCDXLines(cdxLines);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        } else {
+            return null;
+        }
+    }

     /**
      * Submits a Hadoop job to generate CDX for all metadata files for a jobID and returns the resulting list of records.
      *
      * @param jobid The job to get CDX for.
      * @return A list of CDX records.
      */
     private static List<CDXRecord> getRecordsUsingHadoop(long jobid) {
-        Configuration hadoopConf = HadoopJobUtils.getConf();
-        String metadataFileSearchPattern = getMetadataFilePatternForJobId(jobid);
-
-        try (FileSystem fileSystem = FileSystem.newInstance(hadoopConf)) {
-            HadoopJobStrategy jobStrategy = new MetadataCDXExtractionStrategy(jobid, fileSystem);
-            HadoopJob job = new HadoopJob(jobid, jobStrategy);
-            job.processOnlyFilesMatching(metadataFileSearchPattern);
-            job.prepareJobInputOutput(fileSystem);
-            job.run();
+        List<CDXRecord> cdxRecords = getCachedCDX(jobid);
+        if (cdxRecords != null) {
+            return cdxRecords;
+        } else {
+            File cacheFile = getCDXCache(jobid);
+            Configuration hadoopConf = HadoopJobUtils.getConf();
+            String metadataFileSearchPattern = getMetadataFilePatternForJobId(jobid);
 
-            List<String> cdxLines;
-            try {
-                cdxLines = HadoopJobUtils.collectOutputLines(fileSystem, job.getJobOutputDir());
-            } catch (IOException e) {
-                log.error("Failed getting CDX lines output for Hadoop job with ID: {}", jobid);
-                throw new IOFailure("Failed getting " + job.getJobType() + " job results");
-            }
-            return HadoopJobUtils.getCDXRecordListFromCDXLines(cdxLines);
-        } catch (IOException e) {
-            log.error("Error instantiating Hadoop filesystem for job {}.", jobid, e);
-            throw new IOFailure("Failed instantiating Hadoop filesystem.");
-        }
+            try (FileSystem fileSystem = FileSystem.newInstance(hadoopConf)) {
+                HadoopJobStrategy jobStrategy = new MetadataCDXExtractionStrategy(jobid, fileSystem);
+                HadoopJob job = new HadoopJob(jobid, jobStrategy);
+                job.processOnlyFilesMatching(metadataFileSearchPattern);
+                job.prepareJobInputOutput(fileSystem);
+                job.run();
+
+                try {
+                    List<String> cdxLines = HadoopJobUtils.collectOutputLines(fileSystem, job.getJobOutputDir());
+                    // Persist the fresh job output so later calls can skip the Hadoop job.
+                    org.apache.commons.io.FileUtils.writeLines(cacheFile, cdxLines);
+                } catch (IOException e) {
+                    log.error("Failed getting CDX lines output for Hadoop job with ID: {}", jobid);
+                    throw new IOFailure("Failed getting " + job.getJobType() + " job results");
+                }
+                // Re-read through the cache so the returned records come from the persisted file.
+                return getCachedCDX(jobid);
+            } catch (IOException e) {
+                log.error("Error instantiating Hadoop filesystem for job {}.", jobid, e);
+                throw new IOFailure("Failed instantiating Hadoop filesystem.");
+            }
+        }
     }

     /**
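
For readers who want the shape of the change without the diff markers, here is a minimal, self-contained sketch of the read-through pattern the commit follows. LineFileCache, getOrCompute, and the cacheRoot layout are hypothetical illustrations, not part of the NetarchiveSuite code; the real implementation keys the cache on the job id and computes the lines by running the Hadoop job.

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.List;
import java.util.function.Supplier;

/** Read-through cache of text lines keyed by a numeric id (hypothetical illustration). */
public class LineFileCache {
    private final File cacheRoot;

    public LineFileCache(File cacheRoot) {
        this.cacheRoot = cacheRoot;
    }

    /** Returns cached lines for the key, computing and persisting them on a miss. */
    public List<String> getOrCompute(long key, Supplier<List<String>> compute) {
        File cacheFile = new File(cacheRoot, Long.toString(key));
        try {
            if (cacheFile.exists()) {
                // Cache hit: serve the previously persisted lines.
                return Files.readAllLines(cacheFile.toPath(), StandardCharsets.UTF_8);
            }
            cacheFile.getParentFile().mkdirs(); // create the parent directories, not the file itself
            List<String> lines = compute.get(); // the expensive step; in the commit, the Hadoop job
            Files.write(cacheFile.toPath(), lines, StandardCharsets.UTF_8);
            return lines;
        } catch (IOException e) {
            throw new RuntimeException("Cache I/O failed for key " + key, e);
        }
    }
}

With this shape, getRecordsUsingHadoop(jobid) reduces to cache.getOrCompute(jobid, () -> runHadoopJobAndCollectLines(jobid)) followed by HadoopJobUtils.getCDXRecordListFromCDXLines(...); runHadoopJobAndCollectLines is likewise a hypothetical name for the job-submission code in the diff above.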
