Skip to content

Commit

Permalink
NIFI-6679 - MoveHDFS removes original file when destination exists
Browse files Browse the repository at this point in the history
- Include testPutWhenAlreadyExisting test
- Resolve dependencies

This closes apache#3746.

Signed-off-by: Bryan Bende <bbende@apache.org>
  • Loading branch information
eduardofontes authored and szaboferee committed Oct 7, 2019
1 parent 19c6012 commit 2ed6e74
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -357,9 +357,10 @@ public Object run() {
if (destinationExists) {
switch (processorConfig.getConflictResolution()) {
case REPLACE_RESOLUTION:
if (hdfs.delete(file, false)) {
// Remove destination file (newFile) to replace
if (hdfs.delete(newFile, false)) {
getLogger().info("deleted {} in order to replace with the contents of {}",
new Object[]{file, flowFile});
new Object[]{newFile, flowFile});
}
break;
case IGNORE_RESOLUTION:
Expand Down Expand Up @@ -547,4 +548,4 @@ public boolean accept(Path path) {
};
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.hadoop.KerberosProperties;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.NiFiProperties;
Expand All @@ -38,6 +39,7 @@
import java.util.HashSet;
import java.util.List;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -233,6 +235,52 @@ public void testEmptyInputDirectory() throws IOException {
Assert.assertEquals(0, flowFiles.size());
}

@Test
public void testPutWhenAlreadyExistingShouldFailWhenFAIL_RESOLUTION() throws IOException {
testPutWhenAlreadyExisting(MoveHDFS.FAIL_RESOLUTION, MoveHDFS.REL_FAILURE, "randombytes-1");
}

@Test
public void testPutWhenAlreadyExistingShouldIgnoreWhenIGNORE_RESOLUTION() throws IOException {
testPutWhenAlreadyExisting(MoveHDFS.IGNORE_RESOLUTION, MoveHDFS.REL_SUCCESS, "randombytes-1");
}

@Test
public void testPutWhenAlreadyExistingShouldReplaceWhenREPLACE_RESOLUTION() throws IOException {
testPutWhenAlreadyExisting(MoveHDFS.REPLACE_RESOLUTION, MoveHDFS.REL_SUCCESS, "randombytes-2");
}

private void testPutWhenAlreadyExisting(String conflictResolution, Relationship expectedDestination, String expectedContent) throws IOException {
// GIVEN
Files.createDirectories(Paths.get(INPUT_DIRECTORY));
Files.createDirectories(Paths.get(OUTPUT_DIRECTORY));
Files.copy(Paths.get(TEST_DATA_DIRECTORY, "randombytes-2"), Paths.get(INPUT_DIRECTORY, "randombytes-1"));
Files.copy(Paths.get(TEST_DATA_DIRECTORY, "randombytes-1"), Paths.get(OUTPUT_DIRECTORY, "randombytes-1"));

MoveHDFS processor = new MoveHDFS();

TestRunner runner = TestRunners.newTestRunner(processor);
runner.setProperty(MoveHDFS.INPUT_DIRECTORY_OR_FILE, INPUT_DIRECTORY);
runner.setProperty(MoveHDFS.OUTPUT_DIRECTORY, OUTPUT_DIRECTORY);
runner.setProperty(MoveHDFS.CONFLICT_RESOLUTION, conflictResolution);

byte[] expected = Files.readAllBytes(Paths.get(TEST_DATA_DIRECTORY, expectedContent));

// WHEN
runner.enqueue(new byte[0]);
runner.run();

// THEN
runner.assertAllFlowFilesTransferred(expectedDestination);

List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(expectedDestination);
Assert.assertEquals(1, flowFiles.size());

byte[] actual = Files.readAllBytes(Paths.get(OUTPUT_DIRECTORY, "randombytes-1"));

assertArrayEquals(expected, actual);
}

private static class TestableMoveHDFS extends MoveHDFS {

private KerberosProperties testKerberosProperties;
Expand All @@ -245,7 +293,5 @@ public TestableMoveHDFS(KerberosProperties testKerberosProperties) {
protected KerberosProperties getKerberosProperties(File kerberosConfigFile) {
return testKerberosProperties;
}

}

}

0 comments on commit 2ed6e74

Please sign in to comment.