Skip to content

Commit

Permalink
NIFI-5147, NIFI-5566 Added CryptographicHashAttribute and CryptographicHashContent processors.
Browse files Browse the repository at this point in the history

Deprecated HashContent processor.
Added documentation to HashAttribute processor.
Added shared HashService and HashAlgorithm enum.
Added unit tests.
Added #clearProperties() to TestRunner, StandardProcessorTestRunner, and MockProcessContext.
Updated processor manifest.
Updated Javadoc.
Added documentation about deprecated/renamed components.
Added logic handling for UTF-16 encoding where UTF-16BE is overridden so the BOM is not inserted prior to hashing.

This closes apache#2983.

Co-authored-by: Otto Fowler <ottobackwards@gmail.com>
Signed-off-by: Kevin Doran <kdoran@apache.org>
  • Loading branch information
alopresto and ottobackwards committed Sep 18, 2018
1 parent 07ad132 commit cd68774
Show file tree
Hide file tree
Showing 16 changed files with 2,087 additions and 38 deletions.
Expand Up @@ -209,6 +209,13 @@ public boolean removeProperty(final String property) {
return false;
}

/**
 * Removes every currently-set property by delegating each one to
 * {@link #removeProperty}, so descriptor-specific removal logic (e.g. restoring
 * defaults) is applied uniformly.
 *
 * <p>Iterates the key set only — the original iterated {@code entrySet()} but
 * never used the values. NOTE(review): this assumes {@code getProperties()}
 * returns a defensive copy rather than the live map; confirm, otherwise
 * removing while iterating could throw ConcurrentModificationException.
 */
public void clearProperties() {
    final Map<PropertyDescriptor, String> properties = getProperties();
    for (final PropertyDescriptor descriptor : properties.keySet()) {
        removeProperty(descriptor);
    }
}

@Override
public void yield() {
yieldCalled = true;
Expand Down
Expand Up @@ -824,6 +824,11 @@ public boolean removeProperty(String property) {
return context.removeProperty(property);
}

@Override
public void clearProperties() {
    // Pure delegation: the wrapped context owns all set-property state.
    context.clearProperties();
}

@Override
public List<ProvenanceEventRecord> getProvenanceEvents() {
return sharedState.getProvenanceEvents();
Expand Down
5 changes: 5 additions & 0 deletions nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
Expand Up @@ -864,6 +864,11 @@ public interface TestRunner {
*/
boolean removeProperty(String property);

/**
 * Clears all set properties from the {@link ProcessContext}, leaving the
 * runner as if no properties had been explicitly configured.
 */
void clearProperties();

/**
* Returns a {@link List} of all {@link ProvenanceEventRecord}s that were
* emitted by the Processor
Expand Down
@@ -0,0 +1,255 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.processors.standard;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.security.util.crypto.HashAlgorithm;
import org.apache.nifi.security.util.crypto.HashService;

@EventDriven
@SideEffectFree
@SupportsBatching
@Tags({"attributes", "hash", "md5", "sha", "keccak", "blake2", "cryptography"})
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@CapabilityDescription("Calculates a hash value for each of the specified attributes using the given algorithm and writes it to an output attribute. Please refer to https://csrc.nist.gov/Projects/Hash-Functions/NIST-Policy-on-Hash-Functions for help to decide which algorithm to use. ")
@WritesAttribute(attribute = "<Specified Attribute Name per Dynamic Property>", description = "This Processor adds an attribute whose value is the result of "
+ "hashing the specified attribute. The name of this attribute is specified by the value of the dynamic property.")
@DynamicProperty(name = "A flowfile attribute key for attribute inspection", value = "Attribute Name",
description = "The property name defines the attribute to look for and hash in the incoming flowfile. "
+ "The property value defines the name to give the generated attribute. "
+ "Attribute names must be unique.")
public class CryptographicHashAttribute extends AbstractProcessor {

    /**
     * Policy for handling flowfiles that contain only a subset of the
     * attributes configured for hashing: {@code ALLOW} hashes what is present,
     * {@code PROHIBIT} routes the flowfile to failure.
     */
    public enum PartialAttributePolicy {
        ALLOW,
        PROHIBIT
    }

    private static final AllowableValue ALLOW_PARTIAL_ATTRIBUTES_VALUE = new AllowableValue(PartialAttributePolicy.ALLOW.name(),
            "Allow missing attributes",
            "Do not route to failure if there are attributes configured for hashing that are not present in the flowfile");

    private static final AllowableValue FAIL_PARTIAL_ATTRIBUTES_VALUE = new AllowableValue(PartialAttributePolicy.PROHIBIT.name(),
            "Fail if missing attributes",
            "Route to failure if there are attributes configured for hashing that are not present in the flowfile");

    static final PropertyDescriptor CHARACTER_SET = new PropertyDescriptor.Builder()
            .name("character_set")
            .displayName("Character Set")
            .description("The Character Set used to decode the attribute being hashed -- this applies to the incoming data encoding, not the resulting hash encoding. ")
            .required(true)
            .allowableValues(HashService.buildCharacterSetAllowableValues())
            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
            .defaultValue("UTF-8")
            .build();

    static final PropertyDescriptor FAIL_WHEN_EMPTY = new PropertyDescriptor.Builder()
            .name("fail_when_empty")
            .displayName("Fail when no attributes present")
            .description("Route to failure when none of the attributes that are configured for hashing are found. " +
                    "If set to false, then flow files that do not contain any of the attributes that are configured for hashing will just pass through to success.")
            .allowableValues("true", "false")
            .required(true)
            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
            .defaultValue("true")
            .build();

    static final PropertyDescriptor HASH_ALGORITHM = new PropertyDescriptor.Builder()
            .name("hash_algorithm")
            .displayName("Hash Algorithm")
            .description("The cryptographic hash algorithm to use. Note that not all of the algorithms available are recommended for use (some are provided for legacy use). " +
                    "There are many things to consider when picking an algorithm; it is recommended to use the most secure algorithm possible.")
            .required(true)
            .allowableValues(HashService.buildHashAlgorithmAllowableValues())
            .defaultValue(HashAlgorithm.SHA256.getName())
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    static final PropertyDescriptor PARTIAL_ATTR_ROUTE_POLICY = new PropertyDescriptor.Builder()
            .name("missing_attr_policy")
            .displayName("Missing attribute policy")
            .description("Policy for how the processor handles attributes that are configured for hashing but are not found in the flowfile.")
            .required(true)
            .allowableValues(ALLOW_PARTIAL_ATTRIBUTES_VALUE, FAIL_PARTIAL_ATTRIBUTES_VALUE)
            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
            .defaultValue(ALLOW_PARTIAL_ATTRIBUTES_VALUE.getValue())
            .build();

    public static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("Used for flowfiles that have a hash value added")
            .build();

    public static final Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .description("Used for flowfiles that are missing required attributes")
            .build();

    private final static Set<Relationship> relationships;

    private final static List<PropertyDescriptor> properties;

    // Maps "attribute to hash" -> "attribute to write the hash into".
    // Replaced wholesale (immutable snapshot) on each dynamic-property change so
    // onTrigger always reads a consistent view.
    private final AtomicReference<Map<String, String>> attributeToGenerateNameMapRef = new AtomicReference<>(Collections.emptyMap());

    static {
        final Set<Relationship> _relationships = new HashSet<>();
        _relationships.add(REL_FAILURE);
        _relationships.add(REL_SUCCESS);
        relationships = Collections.unmodifiableSet(_relationships);

        final List<PropertyDescriptor> _properties = new ArrayList<>();
        _properties.add(CHARACTER_SET);
        _properties.add(FAIL_WHEN_EMPTY);
        _properties.add(HASH_ALGORITHM);
        _properties.add(PARTIAL_ATTR_ROUTE_POLICY);
        properties = Collections.unmodifiableList(_properties);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return properties;
    }

    /**
     * Builds a descriptor for each user-supplied dynamic property. The property
     * name is the attribute to inspect; the property value names the attribute
     * that will receive the hash.
     */
    @Override
    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
        return new PropertyDescriptor.Builder()
                .name(propertyDescriptorName)
                .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
                .build();
    }

    /**
     * Keeps {@link #attributeToGenerateNameMapRef} in sync with the dynamic
     * properties. Required (built-in) descriptors are ignored; a {@code null}
     * new value removes the mapping, any other value adds/overwrites it.
     */
    @Override
    public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
        if (descriptor.isRequired()) {
            return;
        }

        final Map<String, String> attributeToGeneratedNameMap = new HashMap<>(attributeToGenerateNameMapRef.get());
        if (newValue == null) {
            attributeToGeneratedNameMap.remove(descriptor.getName());
        } else {
            attributeToGeneratedNameMap.put(descriptor.getName(), newValue);
        }

        attributeToGenerateNameMapRef.set(Collections.unmodifiableMap(attributeToGeneratedNameMap));
    }

    /**
     * Hashes each configured attribute that is present on the flowfile, writing
     * the result to the correspondingly-named output attribute. Routes to
     * failure when all attributes are missing (if "Fail when no attributes
     * present" is true) or when some are missing under the PROHIBIT policy;
     * otherwise routes to success.
     */
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        final Charset charset = Charset.forName(context.getProperty(CHARACTER_SET).getValue());
        final Map<String, String> attributeToGeneratedNameMap = attributeToGenerateNameMapRef.get();
        final ComponentLog logger = getLogger();

        final SortedMap<String, String> relevantAttributes = getRelevantAttributes(flowFile, attributeToGeneratedNameMap);
        if (relevantAttributes.isEmpty()) {
            if (context.getProperty(FAIL_WHEN_EMPTY).asBoolean()) {
                logger.info("Routing {} to 'failure' because of missing all attributes: {}", new Object[]{flowFile, getMissingKeysString(null, attributeToGeneratedNameMap.keySet())});
                session.transfer(flowFile, REL_FAILURE);
                return;
            }
        }
        if (relevantAttributes.size() != attributeToGeneratedNameMap.size()) {
            if (PartialAttributePolicy.valueOf(context.getProperty(PARTIAL_ATTR_ROUTE_POLICY).getValue()) == PartialAttributePolicy.PROHIBIT) {
                logger.info("Routing {} to 'failure' because of missing attributes: {}", new Object[]{flowFile,
                        getMissingKeysString(relevantAttributes.keySet(), attributeToGeneratedNameMap.keySet())});
                session.transfer(flowFile, REL_FAILURE);
                return;
            }
        }

        // Determine the algorithm to use
        final String algorithmName = context.getProperty(HASH_ALGORITHM).getValue();
        logger.debug("Using algorithm {}", new Object[]{algorithmName});
        HashAlgorithm algorithm = HashAlgorithm.fromName(algorithmName);

        // Generate a hash with the configured algorithm for each attribute value
        // and create a new attribute with the configured name
        for (final Map.Entry<String, String> entry : relevantAttributes.entrySet()) {
            logger.debug("Generating {} hash of attribute '{}'", new Object[]{algorithmName, entry.getKey()});
            final String value = hashValue(algorithm, entry.getValue(), charset);
            // BUG FIX: FlowFiles are immutable -- ProcessSession.putAttribute()
            // returns the updated FlowFile, which must be captured. The original
            // discarded the return value, so the later modifyAttributes()/transfer()
            // calls operated on a stale FlowFile reference.
            flowFile = session.putAttribute(flowFile, attributeToGeneratedNameMap.get(entry.getKey()), value);
        }
        session.getProvenanceReporter().modifyAttributes(flowFile);
        session.transfer(flowFile, REL_SUCCESS);
    }

    /**
     * Returns a sorted map of (attribute name -> attribute value) for every
     * configured attribute that is actually present (non-null) on the flowfile.
     */
    private static SortedMap<String, String> getRelevantAttributes(final FlowFile flowFile, final Map<String, String> attributeToGeneratedNameMap) {
        final SortedMap<String, String> attributeMap = new TreeMap<>();
        for (final Map.Entry<String, String> entry : attributeToGeneratedNameMap.entrySet()) {
            final String attributeName = entry.getKey();
            final String attributeValue = flowFile.getAttribute(attributeName);
            if (attributeValue != null) {
                attributeMap.put(attributeName, attributeValue);
            }
        }
        return attributeMap;
    }

    /**
     * Hashes {@code value} with the given algorithm and charset via
     * {@link HashService}. Defensive: a null value yields an empty string
     * (callers currently filter nulls out in {@link #getRelevantAttributes}).
     */
    private String hashValue(HashAlgorithm algorithm, String value, Charset charset) {
        if (value == null) {
            getLogger().warn("Tried to calculate {} hash of null value; returning empty string", new Object[]{algorithm.getName()});
            return "";
        }
        return HashService.hashValue(algorithm, value, charset);
    }

    /**
     * Builds a space-separated list of the wanted keys that are absent from
     * {@code foundKeys} (a null {@code foundKeys} means every key is missing),
     * for use in log messages.
     */
    private static String getMissingKeysString(Set<String> foundKeys, Set<String> wantedKeys) {
        final StringBuilder missingKeys = new StringBuilder();
        for (final String wantedKey : wantedKeys) {
            if (foundKeys == null || !foundKeys.contains(wantedKey)) {
                missingKeys.append(wantedKey).append(" ");
            }
        }
        return missingKeys.toString();
    }
}

0 comments on commit cd68774

Please sign in to comment.