Skip to content

Commit

Permalink
NIFI-9689: When all FlowFiles in a FlowFile Queue are penalized, do n…
Browse files Browse the repository at this point in the history
…ot schedule the destination to run. Also expose this fact via the ConnectionStatusSnapshotDTO, as this allows the front-end to render this information to the user in order to avoid confusion when it appears that the Processor has data but does nothing

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes apache#5771
  • Loading branch information
markap14 authored and krisztina-zsihovszki committed Jun 27, 2022
1 parent f23020f commit 87f5ec6
Show file tree
Hide file tree
Showing 20 changed files with 235 additions and 58 deletions.
Expand Up @@ -44,6 +44,7 @@ public class ConnectionStatus implements Cloneable {
private long maxQueuedBytes;
private long totalQueuedDuration;
private long maxQueuedDuration;
private FlowFileAvailability flowFileAvailability;

public String getId() {
return id;
Expand Down Expand Up @@ -214,6 +215,14 @@ public void setMaxQueuedDuration(long maxQueuedDuration) {
this.maxQueuedDuration = maxQueuedDuration;
}

public FlowFileAvailability getFlowFileAvailability() {
return flowFileAvailability;
}

public void setFlowFileAvailability(final FlowFileAvailability availability) {
this.flowFileAvailability = availability;
}

@Override
public ConnectionStatus clone() {
final ConnectionStatus clonedObj = new ConnectionStatus();
Expand All @@ -230,6 +239,7 @@ public ConnectionStatus clone() {
clonedObj.sourceName = sourceName;
clonedObj.destinationId = destinationId;
clonedObj.destinationName = destinationName;
clonedObj.flowFileAvailability = flowFileAvailability;

if (predictions != null) {
clonedObj.setPredictions(predictions.clone());
Expand Down Expand Up @@ -265,6 +275,8 @@ public String toString() {
builder.append(backPressureDataSizeThreshold);
builder.append(", backPressureObjectThreshold=");
builder.append(backPressureObjectThreshold);
builder.append(", flowFileAvailability=");
builder.append(flowFileAvailability);
builder.append(", inputCount=");
builder.append(inputCount);
builder.append(", inputBytes=");
Expand Down
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.controller.status;

public enum FlowFileAvailability {
ACTIVE_QUEUE_EMPTY,

HEAD_OF_QUEUE_PENALIZED,

FLOWFILE_AVAILABLE;
}
Expand Up @@ -16,11 +16,12 @@
*/
package org.apache.nifi.controller.status;

import org.apache.nifi.registry.flow.VersionedFlowState;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.nifi.registry.flow.VersionedFlowState;

/**
*/
Expand Down Expand Up @@ -255,7 +256,6 @@ public void setBytesTransferred(long bytesTransferred) {

@Override
public ProcessGroupStatus clone() {

final ProcessGroupStatus clonedObj = new ProcessGroupStatus();

clonedObj.id = id;
Expand Down Expand Up @@ -447,6 +447,7 @@ public static void merge(final ProcessGroupStatus target, final ProcessGroupStat
merged.setInputBytes(merged.getInputBytes() + statusToMerge.getInputBytes());
merged.setOutputCount(merged.getOutputCount() + statusToMerge.getOutputCount());
merged.setOutputBytes(merged.getOutputBytes() + statusToMerge.getOutputBytes());
merged.setFlowFileAvailability(mergeFlowFileAvailability(merged.getFlowFileAvailability(), statusToMerge.getFlowFileAvailability()));
}
target.setConnectionStatus(mergedConnectionMap.values());

Expand Down Expand Up @@ -588,4 +589,26 @@ public static void merge(final ProcessGroupStatus target, final ProcessGroupStat

target.setRemoteProcessGroupStatus(mergedRemoteGroupMap.values());
}

public static FlowFileAvailability mergeFlowFileAvailability(final FlowFileAvailability availabilityA, final FlowFileAvailability availabilityB) {
if (availabilityA == availabilityB) {
return availabilityA;
}
if (availabilityA == null) {
return availabilityB;
}
if (availabilityB == null) {
return availabilityA;
}

if (availabilityA == FlowFileAvailability.FLOWFILE_AVAILABLE || availabilityB == FlowFileAvailability.FLOWFILE_AVAILABLE) {
return FlowFileAvailability.FLOWFILE_AVAILABLE;
}

if (availabilityA == FlowFileAvailability.HEAD_OF_QUEUE_PENALIZED || availabilityB == FlowFileAvailability.HEAD_OF_QUEUE_PENALIZED) {
return FlowFileAvailability.HEAD_OF_QUEUE_PENALIZED;
}

return FlowFileAvailability.FLOWFILE_AVAILABLE;
}
}
Expand Up @@ -18,6 +18,7 @@

import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.SwapSummary;
import org.apache.nifi.controller.status.FlowFileAvailability;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
import org.apache.nifi.processor.FlowFileFilter;

Expand Down Expand Up @@ -109,6 +110,11 @@ public interface FlowFileQueue {
*/
boolean isEmpty();

/**
* @return the FlowFile Availability for this queue
*/
FlowFileAvailability getFlowFileAvailability();

/**
* @return <code>true</code> if the queue is empty or contains only FlowFiles that already are being processed
* by others, <code>false</code> if the queue contains at least one FlowFile that is available for processing,
Expand Down
Expand Up @@ -48,6 +48,7 @@ public class ConnectionStatusSnapshotDTO implements Cloneable {
private String queuedCount;
private Integer percentUseCount;
private Integer percentUseBytes;
private String flowFileAvailability;

/* getters / setters */
/**
Expand Down Expand Up @@ -283,6 +284,15 @@ public void setPercentUseBytes(Integer percentUseBytes) {
this.percentUseBytes = percentUseBytes;
}

@ApiModelProperty("The availability of FlowFiles in this connection")
public String getFlowFileAvailability() {
return flowFileAvailability;
}

public void setFlowFileAvailability(final String availability) {
this.flowFileAvailability = availability;
}

@Override
public ConnectionStatusSnapshotDTO clone() {
final ConnectionStatusSnapshotDTO other = new ConnectionStatusSnapshotDTO();
Expand Down
Expand Up @@ -52,7 +52,7 @@ protected void mergeResponses(ConnectionStatusEntity clientEntity, Map<NodeIdent

final NodeIdentifier selectedNodeId = entityMap.entrySet().stream()
.filter(e -> e.getValue() == clientEntity)
.map(e -> e.getKey())
.map(Map.Entry::getKey)
.findFirst()
.orElse(null);

Expand Down
Expand Up @@ -17,6 +17,8 @@

package org.apache.nifi.cluster.manager;

import org.apache.nifi.controller.status.FlowFileAvailability;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.RunStatus;
import org.apache.nifi.controller.status.TransmissionStatus;
import org.apache.nifi.registry.flow.VersionedFlowState;
Expand Down Expand Up @@ -274,7 +276,7 @@ public static void merge(final ProcessGroupStatusSnapshotDTO target, final boole
}

private static <T> Collection<T> replaceNull(final Collection<T> collection) {
return (collection == null) ? Collections.<T>emptyList() : collection;
return (collection == null) ? Collections.emptyList() : collection;
}


Expand Down Expand Up @@ -490,6 +492,11 @@ public static void merge(final ConnectionStatusSnapshotDTO target, final boolean
target.setFlowFilesQueued(target.getFlowFilesQueued() + toMerge.getFlowFilesQueued());
target.setBytesQueued(target.getBytesQueued() + toMerge.getBytesQueued());

final FlowFileAvailability targetFlowFileAvailability = target.getFlowFileAvailability() == null ? null : FlowFileAvailability.valueOf(target.getFlowFileAvailability());
final FlowFileAvailability toMergeFlowFileAvailability = toMerge.getFlowFileAvailability() == null ? null : FlowFileAvailability.valueOf(toMerge.getFlowFileAvailability());
final FlowFileAvailability mergedFlowFileAvailability = ProcessGroupStatus.mergeFlowFileAvailability(targetFlowFileAvailability, toMergeFlowFileAvailability);
target.setFlowFileAvailability(mergedFlowFileAvailability == null ? null : mergedFlowFileAvailability.name());

if (target.getPercentUseBytes() == null) {
target.setPercentUseBytes(toMerge.getPercentUseBytes());
} else if (toMerge.getPercentUseBytes() != null) {
Expand Down Expand Up @@ -543,14 +550,15 @@ public static void merge(final ConnectionStatusSnapshotDTO target, final boolean
}

private static long minNonNegative(long a, long b){
if(a < 0){
if (a < 0) {
return b;
}else if(b < 0){
} else if (b < 0) {
return a;
}else{
} else {
return Math.min(a, b);
}
}

public static void updatePrettyPrintedFields(final ConnectionStatusSnapshotDTO target) {
target.setQueued(prettyPrint(target.getFlowFilesQueued(), target.getBytesQueued()));
target.setQueuedCount(formatCount(target.getFlowFilesQueued()));
Expand Down
Expand Up @@ -247,6 +247,7 @@ ProcessGroupStatus getGroupStatus(final ProcessGroup group, final RepositoryStat
connStatus.setTotalQueuedDuration(conn.getFlowFileQueue().getTotalQueuedDuration(now));
long minLastQueueDate = conn.getFlowFileQueue().getMinLastQueueDate();
connStatus.setMaxQueuedDuration(minLastQueueDate == 0 ? 0 : now - minLastQueueDate);
connStatus.setFlowFileAvailability(conn.getFlowFileQueue().getFlowFileAvailability());

final FlowFileEvent connectionStatusReport = statusReport.getReportEntry(conn.getIdentifier());
if (connectionStatusReport != null) {
Expand Down
Expand Up @@ -16,18 +16,19 @@
*/
package org.apache.nifi.util;

import java.util.Collection;
import java.util.List;

import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.status.FlowFileAvailability;
import org.apache.nifi.processor.Relationship;

import java.util.Collection;
import java.util.List;

public class Connectables {

public static boolean flowFilesQueued(final Connectable connectable) {
for (final Connection conn : connectable.getIncomingConnections()) {
if (!conn.getFlowFileQueue().isActiveQueueEmpty()) {
if (conn.getFlowFileQueue().getFlowFileAvailability() == FlowFileAvailability.FLOWFILE_AVAILABLE) {
return true;
}
}
Expand Down
Expand Up @@ -22,6 +22,7 @@
import org.apache.nifi.controller.repository.FlowFileSwapManager;
import org.apache.nifi.controller.repository.SwapSummary;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.controller.status.FlowFileAvailability;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
import org.apache.nifi.processor.FlowFileFilter;
Expand Down Expand Up @@ -177,6 +178,11 @@ public boolean isEmpty() {
return queue.getFlowFileQueueSize().isEmpty();
}

@Override
public FlowFileAvailability getFlowFileAvailability() {
return queue.getFlowFileAvailability();
}

@Override
public boolean isActiveQueueEmpty() {
final FlowFileQueueSize queueSize = queue.getFlowFileQueueSize();
Expand Down
Expand Up @@ -23,6 +23,7 @@
import org.apache.nifi.controller.repository.SwapContents;
import org.apache.nifi.controller.repository.SwapSummary;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.status.FlowFileAvailability;
import org.apache.nifi.controller.swap.StandardSwapSummary;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.flowfile.FlowFile;
Expand Down Expand Up @@ -440,6 +441,31 @@ public boolean isActiveQueueEmpty() {
return queueSize.getActiveCount() == 0 && queueSize.getSwappedCount() == 0;
}

public FlowFileAvailability getFlowFileAvailability() {
// If queue is empty, avoid obtaining a lock.
if (isActiveQueueEmpty()) {
return FlowFileAvailability.ACTIVE_QUEUE_EMPTY;
}

final FlowFileRecord top;
readLock.lock();
try {
top = activeQueue.peek();
} finally {
readLock.unlock("isFlowFileAvailable");
}

if (top == null) {
return FlowFileAvailability.ACTIVE_QUEUE_EMPTY;
}

if (top.isPenalized()) {
return FlowFileAvailability.HEAD_OF_QUEUE_PENALIZED;
}

return FlowFileAvailability.FLOWFILE_AVAILABLE;
}

public void acknowledge(final FlowFileRecord flowFile) {
logger.trace("{} Acknowledging {}", this, flowFile);
incrementUnacknowledgedQueueSize(-1, -flowFile.getSize());
Expand Down
Expand Up @@ -27,6 +27,7 @@
import org.apache.nifi.controller.queue.ConnectionEventListener;
import org.apache.nifi.controller.queue.DropFlowFileRequest;
import org.apache.nifi.controller.queue.DropFlowFileState;
import org.apache.nifi.controller.status.FlowFileAvailability;
import org.apache.nifi.controller.queue.FlowFileQueueContents;
import org.apache.nifi.controller.queue.IllegalClusterStateException;
import org.apache.nifi.controller.queue.LoadBalanceStrategy;
Expand Down Expand Up @@ -559,6 +560,11 @@ public boolean isEmpty() {
return size().getObjectCount() == 0;
}

@Override
public FlowFileAvailability getFlowFileAvailability() {
return localPartition.getFlowFileAvailability();
}

@Override
public boolean isActiveQueueEmpty() {
return localPartition.isActiveQueueEmpty();
Expand Down
Expand Up @@ -17,6 +17,7 @@

package org.apache.nifi.controller.queue.clustered.partition;

import org.apache.nifi.controller.status.FlowFileAvailability;
import org.apache.nifi.controller.queue.FlowFileQueueContents;
import org.apache.nifi.controller.queue.LocalQueuePartitionDiagnostics;
import org.apache.nifi.controller.queue.PollStrategy;
Expand All @@ -38,6 +39,11 @@ public interface LocalQueuePartition extends QueuePartition {
*/
boolean isActiveQueueEmpty();

/**
* @return the availability of FlowFiles in the queue
*/
FlowFileAvailability getFlowFileAvailability();

/**
* @return <code>true</code> if there is at least one FlowFile that has not yet been acknowledged, <code>false</code> if all FlowFiles have been acknowledged.
*/
Expand Down
Expand Up @@ -20,6 +20,7 @@
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.controller.queue.DropFlowFileAction;
import org.apache.nifi.controller.queue.DropFlowFileRequest;
import org.apache.nifi.controller.status.FlowFileAvailability;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.FlowFileQueueContents;
import org.apache.nifi.controller.queue.LocalQueuePartitionDiagnostics;
Expand Down Expand Up @@ -102,6 +103,11 @@ public boolean isActiveQueueEmpty() {
return priorityQueue.isActiveQueueEmpty();
}

@Override
public FlowFileAvailability getFlowFileAvailability() {
return priorityQueue.getFlowFileAvailability();
}

@Override
public FlowFileRecord poll(final Set<FlowFileRecord> expiredRecords, final PollStrategy pollStrategy) {
return priorityQueue.poll(expiredRecords, getExpiration(), pollStrategy);
Expand Down

0 comments on commit 87f5ec6

Please sign in to comment.