[GOBBLIN-303] Remove empty avro files during compaction
Closes apache#2158 from yukuai518/empty
yukuai518 authored and zxliucmu committed Nov 16, 2017
1 parent 65398ff commit 60ca454
Showing 4 changed files with 103 additions and 17 deletions.
@@ -40,10 +40,13 @@
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskCompletionEvent;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;


/**
@@ -83,30 +86,36 @@ public void onCompactionJobComplete (FileSystemDataset dataset) throws IOException
boolean appendDeltaOutput = this.state.getPropAsBoolean(MRCompactor.COMPACTION_RENAME_SOURCE_DIR_ENABLED,
MRCompactor.DEFAULT_COMPACTION_RENAME_SOURCE_DIR_ENABLED);

// Obtain record count from input file names
// We are not getting record count from the map-reduce counter because, in the next run, the threshold (delta record)
// calculation is based on the input file names.
Job job = this.configurator.getConfiguredJob();

long newTotalRecords = 0;
long oldTotalRecords = helper.readRecordCount(new Path (result.getDstAbsoluteDir()));
long executeCount = helper.readExecutionCount (new Path (result.getDstAbsoluteDir()));

List<Path> goodPaths = CompactionAvroJobConfigurator.removeFailedPaths(job, tmpPath, this.fs);

if (appendDeltaOutput) {
FsPermission permission = HadoopUtils.deserializeFsPermission(this.state,
MRCompactorJobRunner.COMPACTION_JOB_OUTPUT_DIR_PERMISSION,
FsPermission.getDefault());
WriterUtils.mkdirsWithRecursivePermission(this.fs, dstPath, permission);
// append files under mr output to destination
List<Path> paths = DatasetHelper.getApplicableFilePaths(fs, tmpPath, Lists.newArrayList("avro"));
for (Path path: paths) {
String fileName = path.getName();
log.info(String.format("Adding %s to %s", path.toString(), dstPath));
for (Path filePath: goodPaths) {
String fileName = filePath.getName();
log.info(String.format("Adding %s to %s", filePath.toString(), dstPath));
Path outPath = new Path (dstPath, fileName);

if (!this.fs.rename(path, outPath)) {
if (!this.fs.rename(filePath, outPath)) {
throw new IOException(
String.format("Unable to move %s to %s", path.toString(), outPath.toString()));
String.format("Unable to move %s to %s", filePath.toString(), outPath.toString()));
}
}

// Obtain record count from input file names.
// We don't get the record count from the map-reduce counter because, in the next run, the threshold (delta record)
// calculation is based on the input file names. By pre-defining which input folders are involved in the
// MR execution, it is easy to track how many files have been involved in MR so far, so calculating the total number of records
// (all previous runs + the current run) is possible.
newTotalRecords = this.configurator.getFileNameRecordCount();
} else {
this.fs.delete(dstPath, true);
@@ -120,15 +129,18 @@ public void onCompactionJobComplete (FileSystemDataset dataset) throws IOException
String.format("Unable to move %s to %s", tmpPath, dstPath));
}

// get record count from map reduce job counter
Job job = this.configurator.getConfiguredJob();
// Obtain record count from the map-reduce job counter.
// We don't get the record count from file names because tracking which files are actually involved in the MR execution can
// be hard. This is because new minutely data is rolled up into hourly folders, but from the daily compaction perspective we are not
// able to tell which files are newly added (we simply pass all hourly folders to the MR job instead of individual files).
Counter counter = job.getCounters().findCounter(AvroKeyMapper.EVENT_COUNTER.RECORD_COUNT);
newTotalRecords = counter.getValue();
}

State compactState = helper.loadState(new Path (result.getDstAbsoluteDir()));
compactState.setProp(CompactionSlaEventHelper.RECORD_COUNT_TOTAL, Long.toString(newTotalRecords));
compactState.setProp(CompactionSlaEventHelper.EXEC_COUNT_TOTAL, Long.toString(executeCount + 1));
compactState.setProp(CompactionSlaEventHelper.MR_JOB_ID, this.configurator.getConfiguredJob().getJobID().toString());
helper.saveState(new Path (result.getDstAbsoluteDir()), compactState);

log.info("Updating record count from {} to {} in {} [{}]", oldTotalRecords, newTotalRecords, dstPath, executeCount + 1);
@@ -138,12 +150,15 @@ public void onCompactionJobComplete (FileSystemDataset dataset) throws IOException
Map<String, String> eventMetadataMap = ImmutableMap.of(CompactionSlaEventHelper.DATASET_URN, dataset.datasetURN(),
CompactionSlaEventHelper.RECORD_COUNT_TOTAL, Long.toString(newTotalRecords),
CompactionSlaEventHelper.PREV_RECORD_COUNT_TOTAL, Long.toString(oldTotalRecords),
CompactionSlaEventHelper.EXEC_COUNT_TOTAL, Long.toString(executeCount + 1));
CompactionSlaEventHelper.EXEC_COUNT_TOTAL, Long.toString(executeCount + 1),
CompactionSlaEventHelper.MR_JOB_ID, this.configurator.getConfiguredJob().getJobID().toString());
this.eventSubmitter.submit(CompactionSlaEventHelper.COMPACTION_RECORD_COUNT_EVENT, eventMetadataMap);
}
}
}



public void addEventSubmitter(EventSubmitter eventSubmitter) {
this.eventSubmitter = eventSubmitter;
}
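The else-branch above reads the total record count back from the AvroKeyMapper RECORD_COUNT counter of the completed job. As a rough illustration of how such a counter is typically populated on the mapper side, here is a minimal sketch using plain Hadoop and Avro APIs; RecordCountingMapper is a hypothetical name, and the real AvroKeyMapper in Gobblin has different generics and extra logic:

import java.io.IOException;

import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroKey;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Mapper;

public class RecordCountingMapper
    extends Mapper<AvroKey<GenericRecord>, NullWritable, AvroKey<GenericRecord>, NullWritable> {

  // Mirrors AvroKeyMapper.EVENT_COUNTER.RECORD_COUNT referenced in the code above.
  public enum EVENT_COUNTER { RECORD_COUNT }

  @Override
  protected void map(AvroKey<GenericRecord> key, NullWritable value, Context context)
      throws IOException, InterruptedException {
    // Every record seen by the mapper bumps the job-level counter, which
    // onCompactionJobComplete later reads via job.getCounters().findCounter(...).
    context.getCounter(EVENT_COUNTER.RECORD_COUNT).increment(1);
    context.write(key, value);
  }
}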
@@ -53,6 +53,7 @@ public class CompactionSlaEventHelper {
public static final String NEED_RECOMPACT = "needRecompact";
public static final String PREV_RECORD_COUNT_TOTAL = "prevRecordCountTotal";
public static final String EXEC_COUNT_TOTAL = "executionCountTotal";
public static final String MR_JOB_ID = "mrJobId";
public static final String RECORD_COUNT_TOTAL = "recordCountTotal";
public static final String HIVE_REGISTRATION_PATHS = "hiveRegistrationPaths";
public static final String RENAME_DIR_PATHS = "renameDirPaths";
@@ -19,8 +19,11 @@

import com.google.common.base.Enums;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;

import org.apache.gobblin.compaction.dataset.DatasetHelper;
import org.apache.gobblin.compaction.mapreduce.avro.*;
import org.apache.gobblin.compaction.parser.CompactionPathParser;
import org.apache.gobblin.compaction.verify.InputRecordCountHelper;
@@ -38,20 +41,27 @@
import org.apache.avro.mapred.AvroValue;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.commons.math3.primes.Primes;
import org.apache.gobblin.writer.WriterOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskCompletionEvent;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/**
* A configurator that focuses on creating avro compaction map-reduce jobs
@@ -331,5 +341,61 @@ protected Collection<Path> getGranularInputPaths (Path path) throws IOException

return uncompacted;
}

private static List<TaskCompletionEvent> getAllTaskCompletionEvent(Job completedJob) {
List<TaskCompletionEvent> completionEvents = new LinkedList<>();

while (true) {
try {
TaskCompletionEvent[] bunchOfEvents;
bunchOfEvents = completedJob.getTaskCompletionEvents(completionEvents.size());
if (bunchOfEvents == null || bunchOfEvents.length == 0) {
break;
}
completionEvents.addAll(Arrays.asList(bunchOfEvents));
} catch (IOException e) {
break;
}
}

return completionEvents;
}

private static List<TaskCompletionEvent> getUnsuccessfulTaskCompletionEvent(Job completedJob) {
return getAllTaskCompletionEvent(completedJob).stream().filter(te->te.getStatus() != TaskCompletionEvent.Status.SUCCEEDED).collect(
Collectors.toList());
}

private static boolean isFailedPath(Path path, List<TaskCompletionEvent> failedEvents) {
return failedEvents.stream()
.anyMatch(event -> path.toString().contains(Path.SEPARATOR + event.getTaskAttemptId().toString() + Path.SEPARATOR));
}

/**
* Remove all bad paths caused by speculative execution.
* The problem happens when a speculative task attempt is initialized but then killed in the middle of processing.
* A partial file is generated at {tmp_output}/_temporary/1/_temporary/attempt_xxx_xxx/part-m-xxxx.avro,
* without being committed to its final destination at {tmp_output}/part-m-xxxx.avro.
*
* @param job Completed MR job
* @param tmpPath Temporary output directory of the MR job
* @param fs File system where the temporary output lives
* @return all successful paths
*/
public static List<Path> removeFailedPaths(Job job, Path tmpPath, FileSystem fs) throws IOException {
List<TaskCompletionEvent> failedEvents = CompactionAvroJobConfigurator.getUnsuccessfulTaskCompletionEvent(job);

List<Path> allFilePaths = DatasetHelper.getApplicableFilePaths(fs, tmpPath, Lists.newArrayList("avro"));
List<Path> goodPaths = new ArrayList<>();
for (Path filePath: allFilePaths) {
if (CompactionAvroJobConfigurator.isFailedPath(filePath, failedEvents)) {
fs.delete(filePath, false);
log.error("{} is a bad path so it was deleted", filePath);
} else {
goodPaths.add(filePath);
}
}

return goodPaths;
}
}
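The isFailedPath/removeFailedPaths pair above keys off the task attempt id embedded in the temporary output path. A self-contained sketch of that matching; the attempt id, paths, and class name are made-up examples, not values from this commit:

import org.apache.hadoop.fs.Path;

public class FailedPathMatchExample {
  public static void main(String[] args) {
    // A killed speculative attempt leaves its partial output under a directory
    // named after the attempt id; committed output has no such path component.
    String killedAttemptId = "attempt_1510000000000_0001_m_000003_1";
    Path partial = new Path("/data/tmp_output/_temporary/1/_temporary/"
        + killedAttemptId + "/part-m-00003.avro");
    Path committed = new Path("/data/tmp_output/part-m-00003.avro");

    System.out.println(isFromAttempt(partial, killedAttemptId));   // true  -> would be deleted
    System.out.println(isFromAttempt(committed, killedAttemptId)); // false -> kept as a good path
  }

  private static boolean isFromAttempt(Path path, String attemptId) {
    // Same substring test used by isFailedPath above.
    return path.toString().contains(Path.SEPARATOR + attemptId + Path.SEPARATOR);
  }
}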

@@ -19,6 +19,7 @@

import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -28,6 +29,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.apache.commons.io.FilenameUtils;
import org.apache.commons.math3.primes.Primes;
@@ -40,6 +42,7 @@
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskCompletionEvent;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.joda.time.DateTime;
@@ -321,9 +324,12 @@ public void run() {
this.configureJob(job);
this.submitAndWait(job);
if (shouldPublishData(compactionTimestamp)) {
// remove all invalid empty files due to speculative task execution
List<Path> goodPaths = CompactionAvroJobConfigurator.removeFailedPaths(job, this.dataset.outputTmpPath(), this.tmpFs);

if (!this.recompactAllData && this.recompactFromDestPaths) {
// append new files without deleting output directory
addFilesInTmpPathToOutputPath();
addGoodFilesToOutputPath(goodPaths);
// clean up late data from outputLateDirectory, which has been set to inputPath
deleteFilesByPaths(this.dataset.inputPaths());
} else {
@@ -352,7 +358,6 @@ public void run() {
}
}


/**
* For regular compactions, compaction timestamp is the time the compaction job starts.
*
@@ -603,9 +608,8 @@ private void moveTmpPathToOutputPath() throws IOException {
HadoopUtils.movePath (MRCompactorJobRunner.this.tmpFs, this.dataset.outputTmpPath(), FileSystem.get(this.dataset.outputPath().getParent().toUri(), this.fs.getConf()), this.dataset.outputPath(), false, this.fs.getConf()) ;
}

private void addFilesInTmpPathToOutputPath () throws IOException {
List<Path> paths = this.getApplicableFilePaths(this.dataset.outputTmpPath(), this.tmpFs);
for (Path path: paths) {
private void addGoodFilesToOutputPath (List<Path> goodPaths) throws IOException {
for (Path path: goodPaths) {
String fileName = path.getName();
LOG.info(String.format("Adding %s to %s", path.toString(), this.dataset.outputPath()));
Path outPath = MRCompactorJobRunner.this.lateOutputRecordCountProvider.constructLateFilePath(fileName,
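In run() above, only goodPaths survive to addGoodFilesToOutputPath, so after publishing, no partial files from killed speculative attempts should remain under the temporary output. A rough sanity-check sketch; the class name, method name, and glob pattern are illustrative assumptions, not part of this commit:

import java.io.IOException;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SpeculativeLeftoverCheck {

  // Fails fast if any avro file is still sitting under an attempt_* directory,
  // i.e. a partial output that removeFailedPaths should have deleted.
  public static void assertNoAttemptLeftovers(FileSystem fs, Path tmpOutputPath) throws IOException {
    FileStatus[] leftovers =
        fs.globStatus(new Path(tmpOutputPath, "_temporary/*/_temporary/attempt_*/*.avro"));
    if (leftovers != null && leftovers.length > 0) {
      throw new IllegalStateException(
          leftovers.length + " partial avro files left under " + tmpOutputPath);
    }
  }
}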
