Skip to content

Commit

Permalink
Merge pull request apache#33 from InMobi/729
Browse files Browse the repository at this point in the history
FALCON-729 When feed from multiple colos are replicated, the colo fo…
  • Loading branch information
pallavi-rao committed Oct 25, 2016
2 parents ad5ad64 + 14649a5 commit 4f85c89
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.falcon.replication;

import java.util.Arrays;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.Option;
Expand Down Expand Up @@ -67,7 +68,6 @@ public static void main(String[] args) throws Exception {
@Override
public int run(String[] args) throws Exception {
CommandLine cmd = getCommand(args);
DistCpOptions options = getDistCpOptions(cmd);

Configuration conf = this.getConf();
// inject wf configs
Expand All @@ -81,6 +81,8 @@ public int run(String[] args) throws Exception {
final boolean includePathSet = (includePathConf != null)
&& !IGNORE.equalsIgnoreCase(includePathConf);

DistCpOptions options = getDistCpOptions(cmd, includePathSet);

String availabilityFlagOpt = cmd.getOptionValue("availabilityFlag");
if (StringUtils.isEmpty(availabilityFlagOpt)) {
availabilityFlagOpt = "NA";
Expand All @@ -95,7 +97,7 @@ public int run(String[] args) throws Exception {
DistCp distCp = (includePathSet)
? new CustomReplicator(conf, options)
: new DistCp(conf, options);
LOG.info("Started DistCp");
LOG.info("Started DistCp with options :" + options);
Job job = distCp.execute();

if (cmd.hasOption("counterLogDir")
Expand Down Expand Up @@ -220,12 +222,32 @@ protected CommandLine getCommand(String[] args) throws ParseException {
return new GnuParser().parse(options, args);
}

protected DistCpOptions getDistCpOptions(CommandLine cmd) throws FalconException, IOException {
/**
 * Builds the DistCp options for a replication run from the parsed command line.
 *
 * <p>The "sourcePaths" option is read as a comma-separated list and "targetPath" as a single
 * path. When {@code includePathSet} is true, the path configured under
 * {@code falcon.include.path} must start with the single source path; the remainder is passed
 * through {@code getFixedPath} and, if non-empty, appended to both the source and the target
 * path so that only that sub-tree is copied.
 *
 * @param cmd parsed command line carrying "sourcePaths" and "targetPath"
 * @param includePathSet whether the caller determined that an include path is configured
 * @return the options produced by {@code DistCPOptionsUtil.getDistCpOptions}
 * @throws FalconException if {@code includePathSet} is true and there is not exactly one
 *         source path, the include path is not configured, or the include path does not
 *         start with the source path
 * @throws IOException declared by the signature; presumably propagated from the
 *         options helper (confirm against DistCPOptionsUtil)
 */
protected DistCpOptions getDistCpOptions(CommandLine cmd, boolean includePathSet)
    throws FalconException, IOException {
    String[] paths = cmd.getOptionValue("sourcePaths").trim().split(",");
    List<Path> srcPaths = getPaths(paths);
    String targetPathString = cmd.getOptionValue("targetPath").trim();
    Path targetPath = new Path(targetPathString);

    if (includePathSet) {
        // Explicit checks instead of `assert`: assertions are disabled by default at runtime,
        // so the preconditions would otherwise be skipped silently in production.
        if (srcPaths.size() != 1) {
            throw new FalconException("Exactly one source path can be handled when an include path"
                + " is set, but got " + srcPaths.size());
        }

        Path sourcePath = srcPaths.get(0);
        String includePathConf = getConf().get("falcon.include.path");
        if (includePathConf == null) {
            throw new FalconException("falcon.include.path is not set in the configuration");
        }

        String sourcePathString = sourcePath.toString();
        String includePathString = new Path(includePathConf).toString();
        // startsWith cannot run off the end of a shorter include path the way
        // substring(0, sourcePathString.length()) would.
        if (!includePathString.startsWith(sourcePathString)) {
            throw new FalconException("Include path " + includePathString
                + " does not start with source path " + sourcePathString);
        }

        String relativePath = includePathString.substring(sourcePathString.length());
        String fixedPath = StringUtils.stripStart(getFixedPath(relativePath), "/");

        if (StringUtils.isNotEmpty(fixedPath)) {
            // Narrow source and target by the same suffix so the layout is mirrored.
            srcPaths = Arrays.asList(new Path(sourcePath, fixedPath));
            targetPath = new Path(targetPath, fixedPath);
        }
    }

    return DistCPOptionsUtil.getDistCpOptions(cmd, srcPaths, targetPath, false, getConf());
}

Expand All @@ -237,40 +259,23 @@ private List<Path> getPaths(String[] paths) {
return listPaths;
}

private void executePostProcessing(Configuration conf, DistCpOptions options) throws IOException, FalconException {
private void executePostProcessing(Configuration conf, DistCpOptions options)
throws IOException, FalconException {
Path targetPath = options.getTargetPath();
FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
targetPath.toUri(), getConf());
List<Path> inPaths = options.getSourcePaths();
assert inPaths.size() == 1 : "Source paths more than 1 can't be handled";

Path sourcePath = inPaths.get(0);
Path includePath = new Path(getConf().get("falcon.include.path"));
assert includePath.toString().substring(0, sourcePath.toString().length()).
equals(sourcePath.toString()) : "Source path is not a subset of include path";

String relativePath = includePath.toString().substring(sourcePath.toString().length());
String fixedPath = getFixedPath(relativePath);

fixedPath = StringUtils.stripStart(fixedPath, "/");
Path finalOutputPath;
if (StringUtils.isNotEmpty(fixedPath)) {
finalOutputPath = new Path(targetPath, fixedPath);
} else {
finalOutputPath = targetPath;
}

final String availabilityFlag = conf.get("falcon.feed.availability.flag");
FileStatus[] files = fs.globStatus(finalOutputPath);
FileStatus[] files = fs.globStatus(targetPath);
if (files != null) {
for (FileStatus file : files) {
fs.create(new Path(file.getPath(), availabilityFlag)).close();
LOG.info("Created {}", new Path(file.getPath(), availabilityFlag));
}
} else {
// As distcp is not copying empty directories we are creating availabilityFlag file here
fs.create(new Path(finalOutputPath, availabilityFlag)).close();
LOG.info("No files present in path: {}", finalOutputPath);
fs.create(new Path(targetPath, availabilityFlag)).close();
LOG.info("No files present in path: {}", targetPath);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,17 @@
*/
package org.apache.falcon.replication;

import org.apache.falcon.cluster.util.EmbeddedCluster;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.cli.CommandLine;
import org.apache.falcon.cluster.util.EmbeddedCluster;
import org.apache.falcon.entity.Storage;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.tools.DistCpOptions;
import org.testng.Assert;
import org.testng.annotations.Test;

import java.util.ArrayList;
import java.util.List;

/**
* Test class for FeedReplicator.
*/
Expand Down Expand Up @@ -61,7 +61,7 @@ public void testArguments() throws Exception {
FeedReplicator replicator = new FeedReplicator();
CommandLine cmd = replicator.getCommand(args);
replicator.setConf(cluster.getConf());
DistCpOptions options = replicator.getDistCpOptions(cmd);
DistCpOptions options = replicator.getDistCpOptions(cmd, false);

List<Path> srcPaths = new ArrayList<Path>();
srcPaths.add(new Path(defaultPath));
Expand Down Expand Up @@ -116,14 +116,40 @@ public void testOptionalArguments() throws Exception {

FeedReplicator replicator = new FeedReplicator();
CommandLine cmd = replicator.getCommand(optionalArgs);
DistCpOptions options = replicator.getDistCpOptions(cmd);
DistCpOptions options = replicator.getDistCpOptions(cmd, false);

List<Path> srcPaths = new ArrayList<Path>();
srcPaths.add(new Path(defaultPath));
validateMandatoryArguments(options, srcPaths, false);
validateOptionalArguments(options);
}

/**
 * Checks that requesting include-path handling rewrites both the source path and the
 * target path of the resulting DistCp options to the configured include path.
 */
@Test
public void testIncludePath() throws Exception {
    // The include path is a child of the path used as both source and target below.
    final String expectedPath = defaultPath + "/test-colo";

    // Jailed cluster whose configuration is handed to the replicator under test.
    EmbeddedCluster embeddedCluster = EmbeddedCluster.newCluster("FeedReplicatorTest");

    final String[] replicatorArgs = {
        "true",
        "-maxMaps", "3",
        "-mapBandwidth", "4",
        "-sourcePaths", defaultPath,
        "-targetPath", defaultPath,
        "-falconFeedStorageType", Storage.TYPE.FILESYSTEM.name(),
    };

    FeedReplicator feedReplicator = new FeedReplicator();
    CommandLine commandLine = feedReplicator.getCommand(replicatorArgs);

    Configuration clusterConf = embeddedCluster.getConf();
    clusterConf.set("falcon.include.path", expectedPath);
    feedReplicator.setConf(clusterConf);

    // Pass includePathSet = true so the source and target paths get adjusted.
    DistCpOptions distCpOptions = feedReplicator.getDistCpOptions(commandLine, true);

    String actualTarget = distCpOptions.getTargetPath().toString();
    Assert.assertEquals(actualTarget, expectedPath);

    String actualSource = distCpOptions.getSourcePaths().get(0).toString();
    Assert.assertEquals(actualSource, expectedPath);
}

private void validateMandatoryArguments(DistCpOptions options, List<Path> srcPaths, boolean shouldSyncFolder) {
Assert.assertEquals(options.getMaxMaps(), 3);
Assert.assertEquals(options.getMapBandwidth(), 4);
Expand Down

0 comments on commit 4f85c89

Please sign in to comment.