diff --git a/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java b/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java index 9c2c522c8..39b79e0a5 100644 --- a/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java +++ b/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java @@ -17,6 +17,7 @@ */ package org.apache.falcon.replication; +import java.util.Arrays; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.Option; @@ -67,7 +68,6 @@ public static void main(String[] args) throws Exception { @Override public int run(String[] args) throws Exception { CommandLine cmd = getCommand(args); - DistCpOptions options = getDistCpOptions(cmd); Configuration conf = this.getConf(); // inject wf configs @@ -81,6 +81,8 @@ public int run(String[] args) throws Exception { final boolean includePathSet = (includePathConf != null) && !IGNORE.equalsIgnoreCase(includePathConf); + DistCpOptions options = getDistCpOptions(cmd, includePathSet); + String availabilityFlagOpt = cmd.getOptionValue("availabilityFlag"); if (StringUtils.isEmpty(availabilityFlagOpt)) { availabilityFlagOpt = "NA"; @@ -95,7 +97,7 @@ public int run(String[] args) throws Exception { DistCp distCp = (includePathSet) ? 
new CustomReplicator(conf, options) : new DistCp(conf, options); - LOG.info("Started DistCp"); + LOG.info("Started DistCp with options :" + options); Job job = distCp.execute(); if (cmd.hasOption("counterLogDir") @@ -220,12 +222,32 @@ protected CommandLine getCommand(String[] args) throws ParseException { return new GnuParser().parse(options, args); } - protected DistCpOptions getDistCpOptions(CommandLine cmd) throws FalconException, IOException { + protected DistCpOptions getDistCpOptions(CommandLine cmd, boolean includePathSet) + throws FalconException, IOException { String[] paths = cmd.getOptionValue("sourcePaths").trim().split(","); List srcPaths = getPaths(paths); String targetPathString = cmd.getOptionValue("targetPath").trim(); Path targetPath = new Path(targetPathString); + if (includePathSet) { + assert srcPaths.size() == 1 : "Source paths more than 1 can't be handled"; + + Path sourcePath = srcPaths.get(0); + Path includePath = new Path(getConf().get("falcon.include.path")); + assert includePath.toString().substring(0, sourcePath.toString().length()). 
+ equals(sourcePath.toString()) : "Source path is not a subset of include path"; + + String relativePath = includePath.toString().substring(sourcePath.toString().length()); + String fixedPath = getFixedPath(relativePath); + + fixedPath = StringUtils.stripStart(fixedPath, "/"); + if (StringUtils.isNotEmpty(fixedPath)) { + sourcePath = new Path(sourcePath, fixedPath); + srcPaths = Arrays.asList(new Path[]{sourcePath}); + targetPath = new Path(targetPath, fixedPath); + } + } + return DistCPOptionsUtil.getDistCpOptions(cmd, srcPaths, targetPath, false, getConf()); } @@ -237,31 +259,14 @@ private List getPaths(String[] paths) { return listPaths; } - private void executePostProcessing(Configuration conf, DistCpOptions options) throws IOException, FalconException { + private void executePostProcessing(Configuration conf, DistCpOptions options) + throws IOException, FalconException { Path targetPath = options.getTargetPath(); FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem( targetPath.toUri(), getConf()); - List inPaths = options.getSourcePaths(); - assert inPaths.size() == 1 : "Source paths more than 1 can't be handled"; - - Path sourcePath = inPaths.get(0); - Path includePath = new Path(getConf().get("falcon.include.path")); - assert includePath.toString().substring(0, sourcePath.toString().length()). 
- equals(sourcePath.toString()) : "Source path is not a subset of include path"; - - String relativePath = includePath.toString().substring(sourcePath.toString().length()); - String fixedPath = getFixedPath(relativePath); - - fixedPath = StringUtils.stripStart(fixedPath, "/"); - Path finalOutputPath; - if (StringUtils.isNotEmpty(fixedPath)) { - finalOutputPath = new Path(targetPath, fixedPath); - } else { - finalOutputPath = targetPath; - } final String availabilityFlag = conf.get("falcon.feed.availability.flag"); - FileStatus[] files = fs.globStatus(finalOutputPath); + FileStatus[] files = fs.globStatus(targetPath); if (files != null) { for (FileStatus file : files) { fs.create(new Path(file.getPath(), availabilityFlag)).close(); @@ -269,8 +274,8 @@ private void executePostProcessing(Configuration conf, DistCpOptions options) th } } else { // As distcp is not copying empty directories we are creating availabilityFlag file here - fs.create(new Path(finalOutputPath, availabilityFlag)).close(); - LOG.info("No files present in path: {}", finalOutputPath); + fs.create(new Path(targetPath, availabilityFlag)).close(); + LOG.info("No files present in path: {}", targetPath); } } diff --git a/replication/src/test/java/org/apache/falcon/replication/FeedReplicatorTest.java b/replication/src/test/java/org/apache/falcon/replication/FeedReplicatorTest.java index b9b383dac..421976727 100644 --- a/replication/src/test/java/org/apache/falcon/replication/FeedReplicatorTest.java +++ b/replication/src/test/java/org/apache/falcon/replication/FeedReplicatorTest.java @@ -17,17 +17,17 @@ */ package org.apache.falcon.replication; -import org.apache.falcon.cluster.util.EmbeddedCluster; +import java.util.ArrayList; +import java.util.List; import org.apache.commons.cli.CommandLine; +import org.apache.falcon.cluster.util.EmbeddedCluster; import org.apache.falcon.entity.Storage; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import 
org.apache.hadoop.tools.DistCpOptions; import org.testng.Assert; import org.testng.annotations.Test; -import java.util.ArrayList; -import java.util.List; - /** * Test class for FeedReplicator. */ @@ -61,7 +61,7 @@ public void testArguments() throws Exception { FeedReplicator replicator = new FeedReplicator(); CommandLine cmd = replicator.getCommand(args); replicator.setConf(cluster.getConf()); - DistCpOptions options = replicator.getDistCpOptions(cmd); + DistCpOptions options = replicator.getDistCpOptions(cmd, false); List srcPaths = new ArrayList(); srcPaths.add(new Path(defaultPath)); @@ -116,7 +116,7 @@ public void testOptionalArguments() throws Exception { FeedReplicator replicator = new FeedReplicator(); CommandLine cmd = replicator.getCommand(optionalArgs); - DistCpOptions options = replicator.getDistCpOptions(cmd); + DistCpOptions options = replicator.getDistCpOptions(cmd, false); List srcPaths = new ArrayList(); srcPaths.add(new Path(defaultPath)); @@ -124,6 +124,32 @@ public void testOptionalArguments() throws Exception { validateOptionalArguments(options); } + @Test + public void testIncludePath() throws Exception { + // Set the include Path so that CustomReplicator is used and the source and targetPaths are modified. + String includePath = defaultPath + "/test-colo"; + // creates jailed cluster in which DistCpOptions command can be tested. 
+ EmbeddedCluster cluster = EmbeddedCluster.newCluster("FeedReplicatorTest"); + + final String[] args = { + "true", + "-maxMaps", "3", + "-mapBandwidth", "4", + "-sourcePaths", defaultPath, + "-targetPath", defaultPath, + "-falconFeedStorageType", Storage.TYPE.FILESYSTEM.name(), + }; + + FeedReplicator replicator = new FeedReplicator(); + CommandLine cmd = replicator.getCommand(args); + Configuration conf = cluster.getConf(); + conf.set("falcon.include.path", includePath); + replicator.setConf(conf); + DistCpOptions options = replicator.getDistCpOptions(cmd, true); + Assert.assertEquals(options.getTargetPath().toString(), includePath); + Assert.assertEquals(options.getSourcePaths().get(0).toString(), includePath); + } + private void validateMandatoryArguments(DistCpOptions options, List srcPaths, boolean shouldSyncFolder) { Assert.assertEquals(options.getMaxMaps(), 3); Assert.assertEquals(options.getMapBandwidth(), 4);