Skip to content

Commit

Permalink
NIFI-13121: Handle runtime exceptions in FetchHDFS
Browse files Browse the repository at this point in the history
  • Loading branch information
mattyb149 committed May 1, 2024
1 parent f70967e commit 104782a
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.FlowFileAccessException;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.hadoop.util.GSSExceptionRollbackYieldSessionHandler;
Expand Down Expand Up @@ -175,7 +176,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
outgoingFlowFile = session.putAttribute(outgoingFlowFile, CoreAttributes.FILENAME.key(), outputFilename);

stopWatch.stop();
getLogger().info("Successfully received content from {} for {} in {}", new Object[]{qualifiedPath, outgoingFlowFile, stopWatch.getDuration()});
getLogger().info("Successfully received content from {} for {} in {}", qualifiedPath, outgoingFlowFile, stopWatch.getDuration());
outgoingFlowFile = session.putAttribute(outgoingFlowFile, HADOOP_FILE_URL_ATTRIBUTE, qualifiedPath.toString());
session.getProvenanceReporter().fetch(outgoingFlowFile, qualifiedPath.toString(), stopWatch.getDuration(TimeUnit.MILLISECONDS));
session.transfer(outgoingFlowFile, getSuccessRelationship());
Expand All @@ -190,6 +191,10 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
outgoingFlowFile = session.penalize(outgoingFlowFile);
session.transfer(outgoingFlowFile, getCommsFailureRelationship());
}
} catch (FlowFileAccessException ffae) {
getLogger().error("Failed to retrieve S3 Object for {}; routing to failure", outgoingFlowFile, ffae);
outgoingFlowFile = session.penalize(outgoingFlowFile);
session.transfer(outgoingFlowFile, getCommsFailureRelationship());
} finally {
IOUtils.closeQuietly(stream);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,25 @@ public void testGSSException() throws IOException {
fileSystem.setFailOnOpen(false);
}

@Test
public void testRuntimeException() {
MockFileSystem fileSystem = new MockFileSystem();
fileSystem.setRuntimeFailOnOpen(true);
FetchHDFS proc = new TestableFetchHDFS(kerberosProperties, fileSystem);
TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(FetchHDFS.FILENAME, "src/test/resources/testdata/randombytes-1.gz");
runner.setProperty(FetchHDFS.COMPRESSION_CODEC, "NONE");
runner.enqueue("trigger flow file");
runner.run();

runner.assertTransferCount(FetchHDFS.REL_SUCCESS, 0);
runner.assertTransferCount(FetchHDFS.REL_FAILURE, 0);
runner.assertTransferCount(FetchHDFS.REL_COMMS_FAILURE, 1);
// assert that the file was penalized
runner.assertPenalizeCount(1);
fileSystem.setRuntimeFailOnOpen(false);
}

private static class TestableFetchHDFS extends FetchHDFS {
private final KerberosProperties testKerberosProps;
private final FileSystem fileSystem;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
Expand All @@ -26,6 +27,7 @@
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.util.Progressable;
import org.apache.nifi.processor.exception.FlowFileAccessException;
import org.ietf.jgss.GSSException;

import java.io.ByteArrayOutputStream;
Expand All @@ -49,6 +51,7 @@ public class MockFileSystem extends FileSystem {
private final Map<Path, FSDataOutputStream> pathToOutputStream = new HashMap<>();

private boolean failOnOpen;
private boolean runtimeFailOnOpen;
private boolean failOnClose;
private boolean failOnCreate;
private boolean failOnFileStatus;
Expand All @@ -74,6 +77,10 @@ public void setFailOnOpen(final boolean failOnOpen) {
this.failOnOpen = failOnOpen;
}

public void setRuntimeFailOnOpen(final boolean runtimeFailOnOpen) {
this.runtimeFailOnOpen = runtimeFailOnOpen;
}

public void setAcl(final Path path, final List<AclEntry> aclSpec) {
pathToAcl.put(path, aclSpec);
}
Expand All @@ -93,7 +100,10 @@ public FSDataInputStream open(final Path f, final int bufferSize) throws IOExcep
if (failOnOpen) {
throw new IOException(new GSSException(13));
}
return null;
if (runtimeFailOnOpen) {
throw new FlowFileAccessException("runtime");
}
return createInputStream(f);
}

@Override
Expand Down Expand Up @@ -190,6 +200,19 @@ public boolean exists(Path f) throws IOException {
return pathToStatus.containsKey(f);
}

private FSDataInputStream createInputStream(final Path f) throws IOException {
if(failOnClose) {
return new FSDataInputStream(new StubFSInputStream()) {
@Override
public void close() throws IOException {
super.close();
throw new IOException("Fail on close");
}
};
} else {
return new FSDataInputStream(new StubFSInputStream());
}
}
private FSDataOutputStream createOutputStream() {
if(failOnClose) {
return new FSDataOutputStream(new ByteArrayOutputStream(), new Statistics("")) {
Expand Down Expand Up @@ -294,4 +317,27 @@ public short getDefaultReplication() {
private static FsPermission perms(short p) {
return new FsPermission(p);
}

private class StubFSInputStream extends FSInputStream {

@Override
public void seek(long l) throws IOException {

}

@Override
public long getPos() throws IOException {
return 0;
}

@Override
public boolean seekToNewSource(long l) throws IOException {
return true;
}

@Override
public int read() throws IOException {
return -1;
}
}
}

0 comments on commit 104782a

Please sign in to comment.