Skip to content

Commit

Permalink
NIFI-12231 FetchSmb supports Move and Delete Completion Strategies
Browse files Browse the repository at this point in the history
Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes apache#8617.
  • Loading branch information
turcsanyip authored and shubhluck committed Jun 1, 2024
1 parent adfa0d0 commit 5feff5e
Show file tree
Hide file tree
Showing 10 changed files with 482 additions and 123 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,13 @@
*/
public interface SmbClientService extends AutoCloseable {

Stream<SmbListableEntity> listRemoteFiles(String path);
Stream<SmbListableEntity> listFiles(String directoryPath);

void createDirectory(String path);
void ensureDirectory(String directoryPath);

void readFile(String fileName, OutputStream outputStream) throws IOException;
void readFile(String filePath, OutputStream outputStream) throws IOException;

void moveFile(String filePath, String directoryPath);

void deleteFile(String filePath);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,34 +16,36 @@
*/
package org.apache.nifi.processors.smb;

import static java.util.Arrays.asList;
import static java.util.Collections.unmodifiableSet;
import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
import static org.apache.nifi.processor.util.StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyDescriptor.Builder;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.smb.util.CompletionStrategy;
import org.apache.nifi.services.smb.SmbClientProviderService;
import org.apache.nifi.services.smb.SmbClientService;
import org.apache.nifi.services.smb.SmbException;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import static java.util.Arrays.asList;
import static java.util.Collections.unmodifiableSet;
import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
import static org.apache.nifi.processor.util.StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR;

@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"samba", "smb", "cifs", "files", "fetch"})
@CapabilityDescription("Fetches files from a SMB Share. Designed to be used in tandem with ListSmb.")
Expand All @@ -57,8 +59,8 @@ public class FetchSmb extends AbstractProcessor {
public static final String ERROR_CODE_ATTRIBUTE = "error.code";
public static final String ERROR_MESSAGE_ATTRIBUTE = "error.message";

public static final PropertyDescriptor REMOTE_FILE = new PropertyDescriptor
.Builder().name("remote-file")
public static final PropertyDescriptor REMOTE_FILE = new PropertyDescriptor.Builder()
.name("remote-file")
.displayName("Remote File")
.description("The full path of the file to be retrieved from the remote server. Expression language is supported.")
.required(true)
Expand All @@ -67,91 +69,139 @@ public class FetchSmb extends AbstractProcessor {
.addValidator(ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
.build();

public static final PropertyDescriptor SMB_CLIENT_PROVIDER_SERVICE = new Builder()
/**
 * Selects what happens to the original remote file after it has been fetched.
 * Allowable values are the constants of {@link CompletionStrategy}; the default is
 * {@link CompletionStrategy#NONE}.
 */
public static final PropertyDescriptor COMPLETION_STRATEGY = new PropertyDescriptor.Builder()
.name("Completion Strategy")
.description("Specifies what to do with the original file on the server once it has been processed. If the Completion Strategy fails, a warning will be "
+ "logged but the data will still be transferred.")
.allowableValues(CompletionStrategy.class)
.defaultValue(CompletionStrategy.NONE)
.required(true)
.build();

/**
 * Remote directory the original file is moved to. Supports Expression Language evaluated
 * against FlowFile attributes and is declared as dependent on {@link #COMPLETION_STRATEGY}
 * being {@link CompletionStrategy#MOVE}.
 */
public static final PropertyDescriptor DESTINATION_DIRECTORY = new PropertyDescriptor.Builder()
.name("Destination Directory")
.description("The directory on the remote server to move the original file to once it has been processed.")
.expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(true)
.dependsOn(COMPLETION_STRATEGY, CompletionStrategy.MOVE)
.build();

/**
 * Boolean flag ("true"/"false", default "false") controlling whether the destination
 * directory is created before the move when it does not exist. Declared as dependent on
 * {@link #COMPLETION_STRATEGY} being {@link CompletionStrategy#MOVE}.
 */
public static final PropertyDescriptor CREATE_DESTINATION_DIRECTORY = new PropertyDescriptor.Builder()
.name("Create Destination Directory")
.description("Specifies whether or not the remote directory should be created if it does not exist.")
.required(true)
.allowableValues("true", "false")
.defaultValue("false")
.dependsOn(COMPLETION_STRATEGY, CompletionStrategy.MOVE)
.build();

public static final PropertyDescriptor SMB_CLIENT_PROVIDER_SERVICE = new PropertyDescriptor.Builder()
.name("smb-client-provider-service")
.displayName("SMB Client Provider Service")
.description("Specifies the SMB client provider to use for creating SMB connections.")
.required(true)
.identifiesControllerService(SmbClientProviderService.class)
.build();

public static final Relationship REL_SUCCESS =
new Relationship.Builder()
.name("success")
.description("A flowfile will be routed here for each successfully fetched file.")
.description("A FlowFile will be routed here for each successfully fetched file.")
.build();

public static final Relationship REL_FAILURE =
new Relationship.Builder().name("failure")
.description(
"A flowfile will be routed here when failed to fetch its content.")
new Relationship.Builder()
.name("failure")
.description("A FlowFile will be routed here when failed to fetch its content.")
.build();

public static final Set<Relationship> RELATIONSHIPS = unmodifiableSet(new HashSet<>(asList(
REL_SUCCESS,
REL_FAILURE
)));
public static final String UNCATEGORIZED_ERROR = "-2";

private static final List<PropertyDescriptor> PROPERTIES = asList(
SMB_CLIENT_PROVIDER_SERVICE,
REMOTE_FILE
REMOTE_FILE,
COMPLETION_STRATEGY,
DESTINATION_DIRECTORY,
CREATE_DESTINATION_DIRECTORY
);

public static final String UNCATEGORIZED_ERROR = "-2";

/**
 * Returns the fixed, unmodifiable set of relationships exposed by this processor
 * ({@link #REL_SUCCESS} and {@link #REL_FAILURE}).
 *
 * @return the shared {@link #RELATIONSHIPS} constant
 */
@Override
public Set<Relationship> getRelationships() {
return RELATIONSHIPS;
}

@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTIES;
}

@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}

final SmbClientProviderService clientProviderService =
context.getProperty(SMB_CLIENT_PROVIDER_SERVICE).asControllerService(SmbClientProviderService.class);

try (SmbClientService client = clientProviderService.getClient()) {
fetchAndTransfer(session, context, client, flowFile);
} catch (Exception e) {
getLogger().error("Couldn't connect to SMB.", e);
flowFile = session.putAttribute(flowFile, ERROR_CODE_ATTRIBUTE, getErrorCode(e));
flowFile = session.putAttribute(flowFile, ERROR_MESSAGE_ATTRIBUTE, e.getMessage());
session.transfer(flowFile, REL_FAILURE);
}
final Map<String, String> attributes = flowFile.getAttributes();
final String filePath = context.getProperty(REMOTE_FILE).evaluateAttributeExpressions(attributes).getValue();

}
final SmbClientProviderService clientProviderService = context.getProperty(SMB_CLIENT_PROVIDER_SERVICE).asControllerService(SmbClientProviderService.class);

@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTIES;
}
try (SmbClientService client = clientProviderService.getClient()) {
flowFile = session.write(flowFile, outputStream -> client.readFile(filePath, outputStream));

private void fetchAndTransfer(ProcessSession session, ProcessContext context, SmbClientService client,
FlowFile flowFile) {
final Map<String, String> attributes = flowFile.getAttributes();
final String filename = context.getProperty(REMOTE_FILE)
.evaluateAttributeExpressions(attributes).getValue();
try {
flowFile = session.write(flowFile, outputStream -> client.readFile(filename, outputStream));
session.transfer(flowFile, REL_SUCCESS);
} catch (Exception e) {
getLogger().error("Couldn't fetch file {}.", filename, e);
getLogger().error("Could not fetch file {}.", filePath, e);
flowFile = session.putAttribute(flowFile, ERROR_CODE_ATTRIBUTE, getErrorCode(e));
flowFile = session.putAttribute(flowFile, ERROR_MESSAGE_ATTRIBUTE, getErrorMessage(e));
flowFile = session.putAttribute(flowFile, ERROR_MESSAGE_ATTRIBUTE, e.getMessage());
session.transfer(flowFile, REL_FAILURE);
return;
}

session.commitAsync(() -> performCompletionStrategy(context, attributes));
}

private String getErrorCode(Exception exception) {
/**
 * Derives the value written to the error-code attribute for a failed fetch.
 *
 * @param exception the failure raised while fetching; may be any exception type
 * @return the string form of the SMB error code when {@code exception} is an
 *         {@link SmbException} carrying one, otherwise {@link #UNCATEGORIZED_ERROR}
 */
private String getErrorCode(final Exception exception) {
    // Anything that is not an SmbException (including null) has no SMB status to report.
    if (!(exception instanceof SmbException)) {
        return UNCATEGORIZED_ERROR;
    }

    final SmbException smbException = (SmbException) exception;
    return Optional.of(smbException)
            .map(SmbException::getErrorCode)
            .map(String::valueOf)
            .orElse(UNCATEGORIZED_ERROR);
}

private String getErrorMessage(Exception exception) {
return Optional.ofNullable(exception.getMessage())
.orElse(exception.getClass().getSimpleName());
/**
 * Applies the configured Completion Strategy to the remote file that was fetched.
 *
 * <p>Nothing is done for {@link CompletionStrategy#NONE}. Otherwise a new SMB client is
 * obtained from the configured provider service and the file is either deleted or moved
 * to the Destination Directory (which is ensured first when Create Destination Directory
 * is enabled). Any exception raised while doing so is logged at warn level and not
 * rethrown.
 *
 * @param context    the process context used to read the processor properties
 * @param attributes the FlowFile attributes used to evaluate Expression Language in the
 *                   Remote File and Destination Directory properties
 */
private void performCompletionStrategy(final ProcessContext context, final Map<String, String> attributes) {
    final CompletionStrategy strategy = context.getProperty(COMPLETION_STRATEGY).asAllowableValue(CompletionStrategy.class);
    if (CompletionStrategy.NONE == strategy) {
        return;
    }

    final String remoteFilePath = context.getProperty(REMOTE_FILE).evaluateAttributeExpressions(attributes).getValue();
    final SmbClientProviderService providerService = context.getProperty(SMB_CLIENT_PROVIDER_SERVICE).asControllerService(SmbClientProviderService.class);

    try (SmbClientService smbClient = providerService.getClient()) {
        if (CompletionStrategy.DELETE == strategy) {
            smbClient.deleteFile(remoteFilePath);
        } else if (CompletionStrategy.MOVE == strategy) {
            final String targetDirectory = context.getProperty(DESTINATION_DIRECTORY).evaluateAttributeExpressions(attributes).getValue();

            // Optionally make sure the target directory exists before moving into it.
            if (context.getProperty(CREATE_DESTINATION_DIRECTORY).asBoolean()) {
                smbClient.ensureDirectory(targetDirectory);
            }

            smbClient.moveFile(remoteFilePath, targetDirectory);
        }
    } catch (Exception e) {
        // Best effort only: a failed completion step is reported but never propagated.
        getLogger().warn("Could not perform completion strategy {} for file {}", strategy, remoteFilePath, e);
    }
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ private Stream<SmbListableEntity> performListing(ProcessContext context) throws
context.getProperty(SMB_CLIENT_PROVIDER_SERVICE).asControllerService(SmbClientProviderService.class);
final String directory = getDirectory(context);
final SmbClientService clientService = clientProviderService.getClient();
return clientService.listRemoteFiles(directory).onClose(() -> {
return clientService.listFiles(directory).onClose(() -> {
try {
clientService.close();
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.smb.util;

import org.apache.nifi.components.DescribedValue;

/**
 * Options for handling the original remote file after it has been fetched.
 * Each constant carries a display name and a description, exposed through
 * {@link DescribedValue}.
 */
public enum CompletionStrategy implements DescribedValue {

    NONE("None", "Leaves the file as-is."),
    MOVE("Move File", "Moves the file to the specified directory on the remote system. This option cannot be used when DFS is enabled on 'SMB Client Provider Service'."),
    DELETE("Delete File", "Deletes the file from the remote system.");

    private final String label;
    private final String details;

    /**
     * @param label   text returned by {@link #getDisplayName()}
     * @param details text returned by {@link #getDescription()}
     */
    CompletionStrategy(final String label, final String details) {
        this.label = label;
        this.details = details;
    }

    /**
     * @return the enum constant name, used as the property value
     */
    @Override
    public String getValue() {
        return name();
    }

    /**
     * @return the display name given to this constant
     */
    @Override
    public String getDisplayName() {
        return label;
    }

    /**
     * @return the description given to this constant
     */
    @Override
    public String getDescription() {
        return details;
    }
}
Loading

0 comments on commit 5feff5e

Please sign in to comment.