Commit
NIFI-12889 - Retry Kerberos login on auth failure in HDFS processors
mattyb149 committed Apr 9, 2024
1 parent 10ce008 commit 9fb018f
Showing 17 changed files with 854 additions and 605 deletions.
@@ -16,6 +16,7 @@
*/
package org.apache.nifi.processors.hadoop;

import com.google.common.base.Throwables;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -42,11 +43,13 @@
import org.apache.nifi.kerberos.KerberosUserService;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.security.krb.KerberosKeytabUser;
import org.apache.nifi.security.krb.KerberosPasswordUser;
import org.apache.nifi.security.krb.KerberosUser;
import org.ietf.jgss.GSSException;

import javax.net.SocketFactory;
import java.io.File;
@@ -62,7 +65,10 @@
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.Stream;

/**
* This is a base class that is helpful when building processors interacting with HDFS.
@@ -171,7 +177,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor implemen

// variables shared by all threads of this processor
// Hadoop Configuration, Filesystem, and UserGroupInformation (optional)
private final AtomicReference<HdfsResources> hdfsResources = new AtomicReference<>();
final AtomicReference<HdfsResources> hdfsResources = new AtomicReference<>();

// Holder of cached Configuration information so validation does not reload the same config over and over
private final AtomicReference<ValidationResources> validationResourceHolder = new AtomicReference<>();
@@ -532,12 +538,7 @@ protected FileSystem getFileSystem(final Configuration config) throws IOExceptio

protected FileSystem getFileSystemAsUser(final Configuration config, UserGroupInformation ugi) throws IOException {
try {
return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
@Override
public FileSystem run() throws Exception {
return FileSystem.get(config);
}
});
return ugi.doAs((PrivilegedExceptionAction<FileSystem>) () -> FileSystem.get(config));
} catch (InterruptedException e) {
throw new IOException("Unable to create file system: " + e.getMessage());
}
@@ -703,4 +704,35 @@ private Path getNormalizedPath(final String rawPath, final Optional<String> prop

return new Path(path.replaceAll("/+", "/"));
}

/**
* Returns an {@link Optional} containing the first throwable in the causal chain that is assignable to the provided
* cause type and satisfies the provided cause predicate, or {@link Optional#empty()} otherwise.
* @param t the throwable whose causal chain is inspected
* @param expectedCauseType the type of cause to search for
* @param causePredicate an additional condition the matching cause must satisfy
* @return the first matching cause, or an empty Optional if none is found
*/
protected <T extends Throwable> Optional<T> findCause(Throwable t, Class<T> expectedCauseType, Predicate<T> causePredicate) {
Stream<Throwable> causalChain = Throwables.getCausalChain(t).stream();
return causalChain
.filter(expectedCauseType::isInstance)
.map(expectedCauseType::cast)
.filter(causePredicate)
.findFirst();
}

protected boolean handleAuthErrors(Throwable t, ProcessSession session, ProcessContext context, BiConsumer<ProcessSession, ProcessContext> sessionHandler) {
Optional<GSSException> causeOptional = findCause(t, GSSException.class, gsse -> GSSException.NO_CRED == gsse.getMajor());
if (causeOptional.isPresent()) {

getLogger().error("An error occurred while connecting to HDFS. Rolling back session and, and resetting HDFS resources", causeOptional.get());
try {
hdfsResources.set(resetHDFSResources(getConfigLocations(context), context));
} catch (IOException ioe) {
getLogger().error("An error occurred resetting HDFS resources, you may need to restart the processor.");
}
sessionHandler.accept(session, context);
return true;
}
return false;
}
}
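
Note: the GSSExceptionRollbackYieldSessionHandler passed to handleAuthErrors in the processors below is one of the 17 changed files, but its diff is not included in this excerpt. Judging only from its usage (a BiConsumer<ProcessSession, ProcessContext> invoked after the HDFS resources have been reset), a minimal sketch of such a handler could look like the following; the actual class in the commit may differ:

package org.apache.nifi.processors.hadoop.util;

import java.util.function.BiConsumer;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;

// Sketch only: roll the session back so the flow files are retried later, and yield the
// context so the processor backs off while the Kerberos login is re-established.
public class GSSExceptionRollbackYieldSessionHandler implements BiConsumer<ProcessSession, ProcessContext> {

    @Override
    public void accept(final ProcessSession session, final ProcessContext context) {
        session.rollback();
        context.yield();
    }
}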
@@ -39,6 +39,7 @@
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.hadoop.util.GSSExceptionRollbackYieldSessionHandler;

import java.io.IOException;
import java.security.PrivilegedAction;
@@ -177,16 +178,20 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
flowFile = session.putAttribute(flowFile, HADOOP_FILE_URL_ATTRIBUTE, qualifiedPath.toString());
session.getProvenanceReporter().invokeRemoteProcess(flowFile, qualifiedPath.toString());
} catch (IOException ioe) {
// One possible scenario is that the IOException is permissions based, however it would be impractical to check every possible
// external HDFS authorization tool (Ranger, Sentry, etc). Local ACLs could be checked but the operation would be expensive.
getLogger().warn("Failed to delete file or directory", ioe);

Map<String, String> attributes = Maps.newHashMapWithExpectedSize(1);
// The error message is helpful in understanding at a flowfile level what caused the IOException (which ACL is denying the operation, e.g.)
attributes.put(getAttributePrefix() + ".error.message", ioe.getMessage());

session.transfer(session.putAllAttributes(session.clone(flowFile), attributes), getFailureRelationship());
failedPath++;
if (handleAuthErrors(ioe, session, context, new GSSExceptionRollbackYieldSessionHandler())) {
return null;
} else {
// One possible scenario is that the IOException is permissions based, however it would be impractical to check every possible
// external HDFS authorization tool (Ranger, Sentry, etc). Local ACLs could be checked but the operation would be expensive.
getLogger().warn("Failed to delete file or directory", ioe);

Map<String, String> attributes = Maps.newHashMapWithExpectedSize(1);
// The error message is helpful in understanding at a flowfile level what caused the IOException (which ACL is denying the operation, e.g.)
attributes.put(getAttributePrefix() + ".error.message", ioe.getMessage());

session.transfer(session.putAllAttributes(session.clone(flowFile), attributes), getFailureRelationship());
failedPath++;
}
}
}
}
@@ -198,8 +203,12 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
session.remove(flowFile);
}
} catch (IOException e) {
getLogger().error("Error processing delete for flowfile {} due to {}", flowFile, e.getMessage(), e);
session.transfer(flowFile, getFailureRelationship());
if (handleAuthErrors(e, session, context, new GSSExceptionRollbackYieldSessionHandler())) {
return null;
} else {
getLogger().error("Error processing delete for flowfile {} due to {}", flowFile, e.getMessage(), e);
session.transfer(flowFile, getFailureRelationship());
}
}

return null;
@@ -45,6 +45,7 @@
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.hadoop.util.GSSExceptionRollbackYieldSessionHandler;
import org.apache.nifi.util.StopWatch;

import java.io.FileNotFoundException;
Expand Down Expand Up @@ -141,60 +142,59 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
final StopWatch stopWatch = new StopWatch(true);
final FlowFile finalFlowFile = flowFile;

ugi.doAs(new PrivilegedAction<Object>() {
@Override
public Object run() {
InputStream stream = null;
CompressionCodec codec = null;
Configuration conf = getConfiguration();
final CompressionCodecFactory compressionCodecFactory = new CompressionCodecFactory(conf);
final CompressionType compressionType = getCompressionType(context);
final boolean inferCompressionCodec = compressionType == CompressionType.AUTOMATIC;

if(inferCompressionCodec) {
codec = compressionCodecFactory.getCodec(path);
} else if (compressionType != CompressionType.NONE) {
codec = getCompressionCodec(context, getConfiguration());
}
ugi.doAs((PrivilegedAction<Object>) () -> {
InputStream stream = null;
CompressionCodec codec = null;
Configuration conf = getConfiguration();
final CompressionCodecFactory compressionCodecFactory = new CompressionCodecFactory(conf);
final CompressionType compressionType = getCompressionType(context);
final boolean inferCompressionCodec = compressionType == CompressionType.AUTOMATIC;

if (inferCompressionCodec) {
codec = compressionCodecFactory.getCodec(path);
} else if (compressionType != CompressionType.NONE) {
codec = getCompressionCodec(context, getConfiguration());
}

FlowFile flowFile = finalFlowFile;
final Path qualifiedPath = path.makeQualified(hdfs.getUri(), hdfs.getWorkingDirectory());
try {
final String outputFilename;
final String originalFilename = path.getName();
stream = hdfs.open(path, 16384);

// Check if compression codec is defined (inferred or otherwise)
if (codec != null) {
stream = codec.createInputStream(stream);
outputFilename = StringUtils.removeEnd(originalFilename, codec.getDefaultExtension());
} else {
outputFilename = originalFilename;
}

flowFile = session.importFrom(stream, finalFlowFile);
flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), outputFilename);

stopWatch.stop();
getLogger().info("Successfully received content from {} for {} in {}", new Object[] {qualifiedPath, flowFile, stopWatch.getDuration()});
flowFile = session.putAttribute(flowFile, HADOOP_FILE_URL_ATTRIBUTE, qualifiedPath.toString());
session.getProvenanceReporter().fetch(flowFile, qualifiedPath.toString(), stopWatch.getDuration(TimeUnit.MILLISECONDS));
session.transfer(flowFile, getSuccessRelationship());
} catch (final FileNotFoundException | AccessControlException e) {
getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[] {qualifiedPath, flowFile, e});
flowFile = session.putAttribute(flowFile, getAttributePrefix() + ".failure.reason", e.getMessage());
flowFile = session.penalize(flowFile);
session.transfer(flowFile, getFailureRelationship());
} catch (final IOException e) {
getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to comms.failure", new Object[] {qualifiedPath, flowFile, e});
flowFile = session.penalize(flowFile);
session.transfer(flowFile, getCommsFailureRelationship());
} finally {
IOUtils.closeQuietly(stream);
FlowFile outgoingFlowFile = finalFlowFile;
final Path qualifiedPath = path.makeQualified(hdfs.getUri(), hdfs.getWorkingDirectory());
try {
final String outputFilename;
final String originalFilename = path.getName();
stream = hdfs.open(path, 16384);

// Check if compression codec is defined (inferred or otherwise)
if (codec != null) {
stream = codec.createInputStream(stream);
outputFilename = StringUtils.removeEnd(originalFilename, codec.getDefaultExtension());
} else {
outputFilename = originalFilename;
}

return null;
outgoingFlowFile = session.importFrom(stream, finalFlowFile);
outgoingFlowFile = session.putAttribute(outgoingFlowFile, CoreAttributes.FILENAME.key(), outputFilename);

stopWatch.stop();
getLogger().info("Successfully received content from {} for {} in {}", new Object[]{qualifiedPath, outgoingFlowFile, stopWatch.getDuration()});
outgoingFlowFile = session.putAttribute(outgoingFlowFile, HADOOP_FILE_URL_ATTRIBUTE, qualifiedPath.toString());
session.getProvenanceReporter().fetch(outgoingFlowFile, qualifiedPath.toString(), stopWatch.getDuration(TimeUnit.MILLISECONDS));
session.transfer(outgoingFlowFile, getSuccessRelationship());
} catch (final FileNotFoundException | AccessControlException e) {
getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[]{qualifiedPath, outgoingFlowFile, e});
outgoingFlowFile = session.putAttribute(outgoingFlowFile, getAttributePrefix() + ".failure.reason", e.getMessage());
outgoingFlowFile = session.penalize(outgoingFlowFile);
session.transfer(outgoingFlowFile, getFailureRelationship());
} catch (final IOException e) {
if (!handleAuthErrors(e, session, context, new GSSExceptionRollbackYieldSessionHandler())) {
getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to comms.failure", qualifiedPath, outgoingFlowFile, e);
outgoingFlowFile = session.penalize(outgoingFlowFile);
session.transfer(outgoingFlowFile, getCommsFailureRelationship());
}
} finally {
IOUtils.closeQuietly(stream);
}

return null;
});
}

@@ -49,6 +49,7 @@
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.hadoop.util.GSSExceptionRollbackYieldSessionHandler;
import org.apache.nifi.util.StopWatch;

import java.io.IOException;
@@ -294,12 +295,12 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
}
if (logEmptyListing.getAndDecrement() > 0) {
getLogger().info("Obtained file listing in {} milliseconds; listing had {} items, {} of which were new",
new Object[]{millis, listedFiles.size(), newItems});
millis, listedFiles.size(), newItems);
}
}
} catch (IOException e) {
context.yield();
getLogger().warn("Error while retrieving list of files due to {}", new Object[]{e});
handleAuthErrors(e, session, context, new GSSExceptionRollbackYieldSessionHandler());
getLogger().warn("Error while retrieving list of files due to {}", e.getMessage(), e);
return;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -388,20 +389,20 @@ protected void processBatchOfFiles(final List<Path> files, final ProcessContext
flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), outputFilename);

if (!keepSourceFiles && !getUserGroupInformation().doAs((PrivilegedExceptionAction<Boolean>) () -> hdfs.delete(file, false))) {
getLogger().warn("Could not remove {} from HDFS. Not ingesting this file ...",
new Object[]{file});
getLogger().warn("Could not remove {} from HDFS. Not ingesting this file ...", file);
session.remove(flowFile);
continue;
}

session.getProvenanceReporter().receive(flowFile, file.toString());
session.transfer(flowFile, REL_SUCCESS);
getLogger().info("retrieved {} from HDFS {} in {} milliseconds at a rate of {}",
new Object[]{flowFile, file, millis, dataRate});
getLogger().info("retrieved {} from HDFS {} in {} milliseconds at a rate of {}", flowFile, file, millis, dataRate);
} catch (final Throwable t) {
getLogger().error("Error retrieving file {} from HDFS due to {}", new Object[]{file, t});
session.rollback();
context.yield();
if (!handleAuthErrors(t, session, context, new GSSExceptionRollbackYieldSessionHandler())) {
getLogger().error("Error retrieving file {} from HDFS due to {}", file, t);
session.rollback();
context.yield();
}
} finally {
IOUtils.closeQuietly(stream);
stream = null;
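
The retry behaviour added across these processors hinges on findCause recognising a GSSException with major code NO_CRED anywhere in the causal chain of the thrown exception. The hypothetical snippet below (not part of the commit; FindCauseIllustration and the processor parameter are stand-ins, placed in the same package because findCause is protected) illustrates which failures trigger the reset-and-retry path:

package org.apache.nifi.processors.hadoop;

import java.io.IOException;
import java.util.Optional;
import javax.security.sasl.SaslException;
import org.ietf.jgss.GSSException;

class FindCauseIllustration {

    static void illustrate(final AbstractHadoopProcessor processor) {
        // An IOException wrapping a GSSException with major code NO_CRED is treated as an
        // authentication failure: handleAuthErrors would reset the HDFS resources and hand
        // the session/context to the supplied handler (e.g. rollback and yield).
        final GSSException gssException = new GSSException(GSSException.NO_CRED);
        final IOException authFailure = new IOException(new SaslException("GSS initiate failed", gssException));
        final Optional<GSSException> detected = processor.findCause(authFailure, GSSException.class,
                gsse -> GSSException.NO_CRED == gsse.getMajor());
        // detected.isPresent() == true

        // An ordinary IOException carries no such cause, so callers fall back to their
        // normal failure handling (failure relationship, comms.failure, etc.).
        final IOException ordinaryFailure = new IOException("Connection refused");
        final Optional<GSSException> notDetected = processor.findCause(ordinaryFailure, GSSException.class,
                gsse -> GSSException.NO_CRED == gsse.getMajor());
        // notDetected.isPresent() == false
    }
}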
