NIFI-12889: Retry Kerberos login on auth failure in HDFS processors
mattyb149 committed Mar 12, 2024
1 parent 7db1664 commit 154eeec
Showing 4 changed files with 98 additions and 53 deletions.
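
In short, the change teaches the HDFS processors to notice a Kerberos credential failure (a GSSException with major code NO_CRED buried in an IOException's cause chain) and to rebuild their HDFS resources so the next attempt performs a fresh login. A minimal sketch of the pattern, using helper names visible in the diff below (findCause, resetHDFSResources, getConfigLocations); the surrounding try block and doHdfsOperation() are illustrative, not the literal NiFi source:

    try {
        // any HDFS call made by the processor, e.g. hdfs.create(...) or hdfs.append(...)
        doHdfsOperation();
    } catch (final IOException e) {
        // look for an expired/missing Kerberos credential in the cause chain
        final Optional<GSSException> cause =
                findCause(e, GSSException.class, gsse -> GSSException.NO_CRED == gsse.getMajor());
        if (cause.isPresent()) {
            // rebuild Configuration/FileSystem/UGI so the next run logs in again
            hdfsResources.set(resetHDFSResources(getConfigLocations(context), context));
            session.rollback(true); // penalize the flow file and retry later
        } else {
            throw e;
        }
    }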
@@ -171,7 +171,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor implemen

// variables shared by all threads of this processor
// Hadoop Configuration, Filesystem, and UserGroupInformation (optional)
-    private final AtomicReference<HdfsResources> hdfsResources = new AtomicReference<>();
+    final AtomicReference<HdfsResources> hdfsResources = new AtomicReference<>();

// Holder of cached Configuration information so validation does not reload the same config over and over
private final AtomicReference<ValidationResources> validationResourceHolder = new AtomicReference<>();
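Note on the hunk above: dropping private makes hdfsResources package-private, which is what lets subclasses declared in the same package (PutHDFS among them) swap in fresh resources after an auth failure, as in this one-line sketch (method names from the diff, context illustrative):

    // inside a processor subclass, after detecting a GSSException cause
    hdfsResources.set(resetHDFSResources(getConfigLocations(context), context));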
@@ -54,7 +54,6 @@
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.StopWatch;
@@ -63,7 +62,6 @@
import java.io.BufferedInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.security.PrivilegedAction;
@@ -352,18 +350,18 @@ public Object run() {
case REPLACE_RESOLUTION:
if (hdfs.delete(copyFile, false)) {
getLogger().info("deleted {} in order to replace with the contents of {}",
-                                    new Object[]{copyFile, putFlowFile});
+                                    copyFile, putFlowFile);
}
break;
case IGNORE_RESOLUTION:
session.transfer(putFlowFile, getSuccessRelationship());
getLogger().info("transferring {} to success because file with same name already exists",
-                                    new Object[]{putFlowFile});
+                                    putFlowFile);
return null;
case FAIL_RESOLUTION:
session.transfer(session.penalize(putFlowFile), getFailureRelationship());
getLogger().warn("penalizing {} and routing to failure because file with same name already exists",
-                                    new Object[]{putFlowFile});
+                                    putFlowFile);
return null;
default:
break;
@@ -372,54 +370,56 @@

// Write FlowFile to temp file on HDFS
final StopWatch stopWatch = new StopWatch(true);
-            session.read(putFlowFile, new InputStreamCallback() {
-
-                @Override
-                public void process(InputStream in) throws IOException {
-                    OutputStream fos = null;
-                    Path createdFile = null;
-                    try {
-                        if (conflictResponse.equals(APPEND_RESOLUTION) && destinationExists) {
-                            fos = hdfs.append(copyFile, bufferSize);
-                        } else {
-                            final EnumSet<CreateFlag> cflags = EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
-
-                            if (shouldIgnoreLocality(context, session)) {
-                                cflags.add(CreateFlag.IGNORE_CLIENT_LOCALITY);
-                            }
-
-                            fos = hdfs.create(actualCopyFile, FsCreateModes.applyUMask(FsPermission.getFileDefault(),
-                                    FsPermission.getUMask(hdfs.getConf())), cflags, bufferSize, replication, blockSize,
-                                    null, null);
-                        }
-
-                        if (codec != null) {
-                            fos = codec.createOutputStream(fos);
-                        }
-                        createdFile = actualCopyFile;
-                        BufferedInputStream bis = new BufferedInputStream(in);
-                        StreamUtils.copy(bis, fos);
-                        bis = null;
-                        fos.flush();
-                    } finally {
-                        try {
-                            if (fos != null) {
-                                fos.close();
-                            }
-                        } catch (Throwable t) {
-                            // when talking to remote HDFS clusters, we don't notice problems until fos.close()
-                            if (createdFile != null) {
-                                try {
-                                    hdfs.delete(createdFile, false);
-                                } catch (Throwable ignore) {
-                                }
-                            }
-                            throw t;
-                        }
-                        fos = null;
-                    }
-                }
-
-            });
+            session.read(putFlowFile, in -> {
+                OutputStream fos = null;
+                Path createdFile = null;
+                try {
+                    if (conflictResponse.equals(APPEND_RESOLUTION) && destinationExists) {
+                        fos = hdfs.append(copyFile, bufferSize);
+                    } else {
+                        final EnumSet<CreateFlag> cflags = EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
+
+                        if (shouldIgnoreLocality(context, session)) {
+                            cflags.add(CreateFlag.IGNORE_CLIENT_LOCALITY);
+                        }
+
+                        fos = hdfs.create(actualCopyFile, FsCreateModes.applyUMask(FsPermission.getFileDefault(),
+                                FsPermission.getUMask(hdfs.getConf())), cflags, bufferSize, replication, blockSize,
+                                null, null);
+                    }
+
+                    if (codec != null) {
+                        fos = codec.createOutputStream(fos);
+                    }
+                    createdFile = actualCopyFile;
+                    BufferedInputStream bis = new BufferedInputStream(in);
+                    StreamUtils.copy(bis, fos);
+                    bis = null;
+                    fos.flush();
+                } catch (IOException e) {
+                    // Catch GSSExceptions and reset the resources
+                    Optional<GSSException> causeOptional = findCause(e, GSSException.class, gsse -> GSSException.NO_CRED == gsse.getMajor());
+                    if (causeOptional.isPresent()) {
+                        getLogger().warn("Error authenticating when performing file operation, resetting HDFS resources: {} ", e.getCause().getMessage());
+                        hdfsResources.set(resetHDFSResources(getConfigLocations(context), context));
+                    }
+                } finally {
+                    try {
+                        if (fos != null) {
+                            fos.close();
+                        }
+                    } catch (Throwable t) {
+                        // when talking to remote HDFS clusters, we don't notice problems until fos.close()
+                        if (createdFile != null) {
+                            try {
+                                hdfs.delete(createdFile, false);
+                            } catch (Throwable ignore) {
+                            }
+                        }
+                        throw t;
+                    }
+                    fos = null;
+                }
+            });
stopWatch.stop();
final String dataRate = stopWatch.calculateDataRate(putFlowFile.getSize());
@@ -466,8 +466,13 @@ public void process(InputStream in) throws IOException {
Optional<GSSException> causeOptional = findCause(e, GSSException.class, gsse -> GSSException.NO_CRED == gsse.getMajor());
if (causeOptional.isPresent()) {
getLogger().warn("An error occurred while connecting to HDFS. "
+ "Rolling back session, and penalizing flow file {}",
new Object[] {putFlowFile.getAttribute(CoreAttributes.UUID.key()), causeOptional.get()});
+ "Rolling back session, resetting HDFS resources, and penalizing flow file {}",
putFlowFile.getAttribute(CoreAttributes.UUID.key()), causeOptional.get());
try {
hdfsResources.set(resetHDFSResources(getConfigLocations(context), context));
} catch (IOException ioe) {
getLogger().warn("An error occurred resetting HDFS resources, you may need to restart the processor.");
}
session.rollback(true);
} else {
getLogger().error("Failed to access HDFS due to {}", new Object[]{e});
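The findCause(...) helper called twice above is not part of this diff; presumably it walks the exception cause chain and returns the first cause of the requested type that matches a predicate. A hypothetical sketch of such a helper (the real one lives in AbstractHadoopProcessor and may differ):

    // assumes java.util.Optional and java.util.function.Predicate
    private static <T extends Throwable> Optional<T> findCause(final Throwable t,
            final Class<T> causeType, final Predicate<T> predicate) {
        Throwable current = t;
        while (current != null) {
            if (causeType.isInstance(current) && predicate.test(causeType.cast(current))) {
                return Optional.of(causeType.cast(current));
            }
            current = current.getCause();
        }
        return Optional.empty();
    }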
@@ -43,7 +43,7 @@
import static org.mockito.Mockito.when;

public class GetHDFSSequenceFileTest {
-    private HdfsResources hdfsResources;
+    private HdfsResources hdfsResourcesLocal;
private GetHDFSSequenceFile getHDFSSequenceFile;
private Configuration configuration;
private FileSystem fileSystem;
@@ -55,7 +55,7 @@ public void setup() throws IOException {
configuration = mock(Configuration.class);
fileSystem = mock(FileSystem.class);
userGroupInformation = mock(UserGroupInformation.class);
-        hdfsResources = new HdfsResources(configuration, fileSystem, userGroupInformation, null);
+        hdfsResourcesLocal = new HdfsResources(configuration, fileSystem, userGroupInformation, null);
getHDFSSequenceFile = new TestableGetHDFSSequenceFile();
getHDFSSequenceFile.kerberosProperties = mock(KerberosProperties.class);
reloginTried = false;
@@ -86,7 +86,7 @@ public void getFlowFilesWithUgiAndNewTicketShouldCallDoAsAndNotRelogin() throws

@Test
public void testGetFlowFilesNoUgiShouldntCallDoAs() throws Exception {
-        hdfsResources = new HdfsResources(configuration, fileSystem, null, null);
+        hdfsResourcesLocal = new HdfsResources(configuration, fileSystem, null, null);
init();
SequenceFileReader reader = mock(SequenceFileReader.class);
Path file = mock(Path.class);
@@ -97,7 +97,7 @@ public void testGetFlowFilesNoUgiShouldntCallDoAs() throws Exception {
public class TestableGetHDFSSequenceFile extends GetHDFSSequenceFile {
@Override
HdfsResources resetHDFSResources(final List<String> resourceLocations, ProcessContext context) throws IOException {
-            return hdfsResources;
+            return hdfsResourcesLocal;
}

@Override
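The rename in this test file is a knock-on effect of the first hunk in this commit: presumably, with hdfsResources now package-visible on AbstractHadoopProcessor, a test field of the same name would collide with the inherited field inside TestableGetHDFSSequenceFile. Sketch of the hazard (illustrative, not commit code):

    private HdfsResources hdfsResources;      // before: same name as the now-visible superclass field
    private HdfsResources hdfsResourcesLocal; // after: unambiguous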
@@ -26,6 +26,7 @@
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.util.Progressable;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
@@ -635,6 +636,33 @@ public void testPutFileWithCloseException() throws IOException {
mockFileSystem.delete(p, true);
}

+    @Test
+    public void testPutFileWithCreateException() throws IOException {
+        mockFileSystem = new MockFileSystem(false, true);
+        String dirName = "target/testPutFileCreateException";
+        File file = new File(dirName);
+        file.mkdirs();
+        Path p = new Path(dirName).makeQualified(mockFileSystem.getUri(), mockFileSystem.getWorkingDirectory());
+
+        TestRunner runner = TestRunners.newTestRunner(new TestablePutHDFS(kerberosProperties, mockFileSystem));
+        runner.setProperty(PutHDFS.DIRECTORY, dirName);
+        runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace");
+
+        try (FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1")) {
+            Map<String, String> attributes = new HashMap<>();
+            attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1");
+            runner.enqueue(fis, attributes);
+            runner.run();
+        }
+
+        List<MockFlowFile> failedFlowFiles = runner
+                .getFlowFilesForRelationship(PutHDFS.REL_FAILURE);
+        assertFalse(failedFlowFiles.isEmpty());
+        assertTrue(failedFlowFiles.get(0).isPenalized());
+
+        mockFileSystem.delete(p, true);
+    }
+
private class TestablePutHDFS extends PutHDFS {

private KerberosProperties testKerberosProperties;
@@ -666,13 +694,21 @@ private static class MockFileSystem extends FileSystem {
private final Map<Path, FileStatus> pathToStatus = new HashMap<>();
private final Map<Path, List<AclEntry>> pathToAcl = new HashMap<>();
private final boolean failOnClose;
+        private final boolean failOnCreate;

public MockFileSystem() {
failOnClose = false;
+            failOnCreate = false;
}

public MockFileSystem(boolean failOnClose) {
this.failOnClose = failOnClose;
+            this.failOnCreate = false;
}

+        public MockFileSystem(boolean failOnClose, boolean failOnCreate) {
+            this.failOnClose = failOnClose;
+            this.failOnCreate = failOnCreate;
+        }

public void setAcl(final Path path, final List<AclEntry> aclSpec) {
Expand All @@ -696,7 +732,11 @@ public FSDataInputStream open(final Path f, final int bufferSize) {

@Override
public FSDataOutputStream create(final Path f, final FsPermission permission, final boolean overwrite, final int bufferSize, final short replication,
-                                         final long blockSize, final Progressable progress) {
+                                         final long blockSize, final Progressable progress) throws IOException {
+            if (failOnCreate) {
+                // Simulate an AuthenticationException wrapped in an IOException
+                throw new IOException(new AuthenticationException("test auth error"));
+            }
pathToStatus.put(f, newFile(f, permission));
if(failOnClose) {
return new FSDataOutputStream(new ByteArrayOutputStream(), new Statistics("")) {
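For quick reference, the mock's failure modes now map to its constructors as follows (a reading aid restating the diff, not code from the commit):

    new MockFileSystem();             // no injected failures
    new MockFileSystem(true);         // the stream returned by create() fails on close()
    new MockFileSystem(false, true);  // create() itself throws an IOException wrapping AuthenticationException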
