From 10ce00862d1479dee1444d0da9cc48bbbd83729f Mon Sep 17 00:00:00 2001 From: Peter Turcsanyi Date: Mon, 18 Mar 2024 11:04:32 +0100 Subject: [PATCH] NIFI-12837 Added DFS support in SMB processors Signed-off-by: Pierre Villard This closes #8527. --- .../nifi/processors/smb/GetSmbFile.java | 2 + .../nifi/processors/smb/PutSmbFile.java | 2 + .../apache/nifi/processors/smb/SmbDfsIT.java | 229 ++++++++++++++++++ .../smb/SmbjClientProviderService.java | 83 +++---- ...ClientIT.java => SmbjClientServiceIT.java} | 4 +- ...ntTest.java => SmbjClientServiceTest.java} | 8 +- .../org/apache/nifi/smb/common/SmbClient.java | 92 +++++++ .../apache/nifi/smb/common/SmbProperties.java | 9 + .../org/apache/nifi/smb/common/SmbUtils.java | 7 +- 9 files changed, 376 insertions(+), 60 deletions(-) create mode 100644 nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/SmbDfsIT.java rename nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/{NiFiSmbjClientIT.java => SmbjClientServiceIT.java} (99%) rename nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/{NiFiSmbjClientTest.java => SmbjClientServiceTest.java} (92%) create mode 100644 nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-common/src/main/java/org/apache/nifi/smb/common/SmbClient.java diff --git a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/GetSmbFile.java b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/GetSmbFile.java index ed843f9bff84..f1649a4faaf2 100644 --- a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/GetSmbFile.java +++ b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/GetSmbFile.java @@ -80,6 +80,7 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.regex.Pattern; +import static org.apache.nifi.smb.common.SmbProperties.ENABLE_DFS; import static org.apache.nifi.smb.common.SmbProperties.SMB_DIALECT; import static org.apache.nifi.smb.common.SmbProperties.TIMEOUT; import static org.apache.nifi.smb.common.SmbProperties.USE_ENCRYPTION; @@ -257,6 +258,7 @@ protected void init(final ProcessorInitializationContext context) { descriptors.add(IGNORE_HIDDEN_FILES); descriptors.add(SMB_DIALECT); descriptors.add(USE_ENCRYPTION); + descriptors.add(ENABLE_DFS); descriptors.add(TIMEOUT); this.descriptors = Collections.unmodifiableList(descriptors); diff --git a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/PutSmbFile.java b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/PutSmbFile.java index 468828f81988..af2eab2ff1bc 100644 --- a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/PutSmbFile.java +++ b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/PutSmbFile.java @@ -64,6 +64,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; +import static org.apache.nifi.smb.common.SmbProperties.ENABLE_DFS; import static org.apache.nifi.smb.common.SmbProperties.SMB_DIALECT; import static org.apache.nifi.smb.common.SmbProperties.TIMEOUT; import static org.apache.nifi.smb.common.SmbProperties.USE_ENCRYPTION; @@ -194,6 +195,7 @@ protected void init(final ProcessorInitializationContext context) { descriptors.add(RENAME_SUFFIX); descriptors.add(SMB_DIALECT); descriptors.add(USE_ENCRYPTION); + descriptors.add(ENABLE_DFS); descriptors.add(TIMEOUT); this.descriptors = Collections.unmodifiableList(descriptors); diff --git a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/SmbDfsIT.java b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/SmbDfsIT.java new file mode 100644 index 000000000000..6fe51b3eb6bd --- /dev/null +++ b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/SmbDfsIT.java @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.smb; + +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.services.smb.SmbjClientProviderService; +import org.apache.nifi.smb.common.SmbProperties; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.FixedHostPortGenericContainer; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.images.builder.Transferable; +import org.testcontainers.shaded.org.apache.commons.io.IOUtils; + +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.nifi.processor.util.list.AbstractListProcessor.NO_TRACKING; +import static org.apache.nifi.processors.smb.ListSmb.SMB_CLIENT_PROVIDER_SERVICE; +import static org.apache.nifi.services.smb.SmbjClientProviderService.HOSTNAME; +import static org.apache.nifi.services.smb.SmbjClientProviderService.PASSWORD; +import static org.apache.nifi.services.smb.SmbjClientProviderService.PORT; +import static org.apache.nifi.services.smb.SmbjClientProviderService.SHARE; +import static org.apache.nifi.services.smb.SmbjClientProviderService.USERNAME; +import static org.apache.nifi.smb.common.SmbProperties.ENABLE_DFS; +import static org.apache.nifi.util.TestRunners.newTestRunner; +import static org.junit.jupiter.api.Assertions.assertEquals; + +class SmbDfsIT { + + private final static Logger LOGGER = LoggerFactory.getLogger(SmbDfsIT.class); + + private static final int DEFAULT_SMB_PORT = 445; + + // DFS works only on the default SMB port (445). Not sure if it is a generic DFS vs Samba DFS constraint, or an issue in the smbj client library. + private final GenericContainer sambaContainer = new FixedHostPortGenericContainer<>("dperson/samba") + .withFixedExposedPort(DEFAULT_SMB_PORT, DEFAULT_SMB_PORT) + .waitingFor(Wait.forListeningPort()) + .withLogConsumer(new Slf4jLogConsumer(LOGGER)) + .withCommand("-u", "myuser;mypass", + "-s", "share;/share-dir;;no;no;myuser;;;", + "-s", "dfs-share;/dfs-share-dir;;no;no;myuser;;;", + "-p", + "-g", "host msdfs = yes", + "-G", "dfs-share;msdfs root = yes"); + + @BeforeEach + void beforeEach() throws Exception { + sambaContainer.start(); + + sambaContainer.execInContainer("ln", "-s", "msdfs:" + sambaContainer.getHost() + "\\share", "/dfs-share-dir/dfs-link"); + Thread.sleep(100); + } + + @AfterEach + void afterEach() { + sambaContainer.stop(); + } + + @Test + void testFetchSmb() throws Exception { + writeFile("fetch_file", "fetch_content"); + + TestRunner testRunner = newTestRunner(FetchSmb.class); + testRunner.setProperty(FetchSmb.REMOTE_FILE, "dfs-link/fetch_file"); + SmbjClientProviderService smbjClientProviderService = configureSmbClient(testRunner); + + testRunner.enqueue(""); + testRunner.run(); + + testRunner.assertTransferCount(FetchSmb.REL_SUCCESS, 1); + MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(FetchSmb.REL_SUCCESS).get(0); + assertEquals("fetch_content", flowFile.getContent()); + + testRunner.disableControllerService(smbjClientProviderService); + } + + @Test + void testFetchFileFailsWhenDfsIsDisabled() throws Exception { + writeFile("fetch_file", "fetch_content"); + + TestRunner testRunner = newTestRunner(FetchSmb.class); + testRunner.setProperty(FetchSmb.REMOTE_FILE, "dfs-link/fetch_file"); + SmbjClientProviderService smbjClientProviderService = configureSmbClient(testRunner, false); + + testRunner.enqueue(""); + testRunner.run(); + + testRunner.assertTransferCount(FetchSmb.REL_FAILURE, 1); + MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(FetchSmb.REL_FAILURE).get(0); + assertEquals(0, flowFile.getSize()); + + testRunner.disableControllerService(smbjClientProviderService); + } + + @Test + void testListSmbWithDfsLink() throws Exception { + testListSmb("dfs-link"); + } + + @Test + @Disabled("Listing folders recursively from the DFS root or a parent directory of the DFS link does not work on Samba due to https://github.com/hierynomus/smbj/issues/717#") + void testListSmbWithDfsRoot() throws Exception { + testListSmb(null); + } + + private void testListSmb(String directory) throws Exception { + writeFile("list_file", "list_content"); + + TestRunner testRunner = newTestRunner(ListSmb.class); + if (directory != null) { + testRunner.setProperty(ListSmb.DIRECTORY, directory); + } + testRunner.setProperty(ListSmb.LISTING_STRATEGY, NO_TRACKING); + testRunner.setProperty(ListSmb.MINIMUM_AGE, "0 ms"); + SmbjClientProviderService smbjClientProviderService = configureSmbClient(testRunner); + + testRunner.run(); + + testRunner.assertTransferCount(ListSmb.REL_SUCCESS, 1); + MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(ListSmb.REL_SUCCESS).get(0); + assertEquals(0, flowFile.getSize()); + assertEquals("dfs-link", flowFile.getAttribute(CoreAttributes.PATH.key())); + assertEquals("list_file", flowFile.getAttribute(CoreAttributes.FILENAME.key())); + + testRunner.disableControllerService(smbjClientProviderService); + } + + @Test + void testPutSmbFile() { + TestRunner testRunner = newTestRunner(PutSmbFile.class); + testRunner.setProperty(PutSmbFile.HOSTNAME, sambaContainer.getHost()); + testRunner.setProperty(PutSmbFile.SHARE, "dfs-share"); + testRunner.setProperty(PutSmbFile.DIRECTORY, "dfs-link"); + testRunner.setProperty(PutSmbFile.USERNAME, "myuser"); + testRunner.setProperty(PutSmbFile.PASSWORD, "mypass"); + testRunner.setProperty(SmbProperties.ENABLE_DFS, "true"); + + Map attributes = new HashMap<>(); + attributes.put(CoreAttributes.FILENAME.key(), "put_file"); + + testRunner.enqueue("put_content", attributes); + testRunner.run(); + + testRunner.assertTransferCount(PutSmbFile.REL_SUCCESS, 1); + + String fileContent = readFile("put_file"); + assertEquals("put_content", fileContent); + } + + @Test + void testGetSmbFile() { + writeFile("get_file", "get_content"); + + TestRunner testRunner = newTestRunner(GetSmbFile.class); + testRunner.setProperty(GetSmbFile.HOSTNAME, sambaContainer.getHost()); + testRunner.setProperty(GetSmbFile.SHARE, "dfs-share"); + testRunner.setProperty(GetSmbFile.DIRECTORY, "dfs-link"); + testRunner.setProperty(GetSmbFile.USERNAME, "myuser"); + testRunner.setProperty(GetSmbFile.PASSWORD, "mypass"); + testRunner.setProperty(SmbProperties.ENABLE_DFS, "true"); + + testRunner.run(); + + testRunner.assertTransferCount(GetSmbFile.REL_SUCCESS, 1); + MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(GetSmbFile.REL_SUCCESS).get(0); + assertEquals("get_content", flowFile.getContent()); + assertEquals("dfs-link", flowFile.getAttribute(CoreAttributes.PATH.key())); + assertEquals("get_file", flowFile.getAttribute(CoreAttributes.FILENAME.key())); + } + + private SmbjClientProviderService configureSmbClient(TestRunner testRunner) throws InitializationException { + return configureSmbClient(testRunner, true); + } + + private SmbjClientProviderService configureSmbClient(TestRunner testRunner, boolean enableDfs) throws InitializationException { + SmbjClientProviderService smbjClientProviderService = new SmbjClientProviderService(); + + testRunner.addControllerService("client-provider", smbjClientProviderService); + + testRunner.setProperty(SMB_CLIENT_PROVIDER_SERVICE, "client-provider"); + + testRunner.setProperty(smbjClientProviderService, HOSTNAME, sambaContainer.getHost()); + testRunner.setProperty(smbjClientProviderService, PORT, Integer.toString(DEFAULT_SMB_PORT)); + testRunner.setProperty(smbjClientProviderService, USERNAME, "myuser"); + testRunner.setProperty(smbjClientProviderService, PASSWORD, "mypass"); + testRunner.setProperty(smbjClientProviderService, SHARE, "dfs-share"); + testRunner.setProperty(smbjClientProviderService, ENABLE_DFS, Boolean.toString(enableDfs)); + + testRunner.enableControllerService(smbjClientProviderService); + + return smbjClientProviderService; + } + + private void writeFile(String filename, String content) { + String containerPath = "/share-dir/" + filename; + sambaContainer.copyFileToContainer(Transferable.of(content), containerPath); + } + + private String readFile(String filename) { + String containerPath = "/share-dir/" + filename; + return sambaContainer.copyFileFromContainer(containerPath, is -> IOUtils.toString(is, StandardCharsets.UTF_8)); + } + +} diff --git a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/main/java/org/apache/nifi/services/smb/SmbjClientProviderService.java b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/main/java/org/apache/nifi/services/smb/SmbjClientProviderService.java index 080be061993d..ebffb816ec80 100644 --- a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/main/java/org/apache/nifi/services/smb/SmbjClientProviderService.java +++ b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/main/java/org/apache/nifi/services/smb/SmbjClientProviderService.java @@ -39,6 +39,7 @@ import static org.apache.nifi.processor.util.StandardValidators.NON_BLANK_VALIDATOR; import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_VALIDATOR; import static org.apache.nifi.processor.util.StandardValidators.PORT_VALIDATOR; +import static org.apache.nifi.smb.common.SmbProperties.ENABLE_DFS; import static org.apache.nifi.smb.common.SmbProperties.SMB_DIALECT; import static org.apache.nifi.smb.common.SmbProperties.TIMEOUT; import static org.apache.nifi.smb.common.SmbProperties.USE_ENCRYPTION; @@ -112,6 +113,7 @@ public class SmbjClientProviderService extends AbstractControllerService impleme DOMAIN, SMB_DIALECT, USE_ENCRYPTION, + ENABLE_DFS, TIMEOUT )); @@ -122,24 +124,37 @@ public class SmbjClientProviderService extends AbstractControllerService impleme private String shareName; @Override - public SmbClientService getClient() throws IOException { - Connection connection = null; + protected List getSupportedPropertyDescriptors() { + return PROPERTIES; + } - try { - connection = smbClient.connect(hostname, port); - return connectToShare(connection); - } catch (IOException e) { - getLogger().debug("Closing stale connection and trying to create a new one for share " + getServiceLocation()); + @OnEnabled + public void onEnabled(final ConfigurationContext context) { + this.hostname = context.getProperty(HOSTNAME).getValue(); + this.port = context.getProperty(PORT).asInteger(); + this.shareName = context.getProperty(SHARE).getValue(); + this.smbClient = buildSmbClient(context); + createAuthenticationContext(context); + } - closeConnection(connection); - unregisterHost(); + @OnDisabled + public void onDisabled() { + smbClient.close(); + smbClient = null; + hostname = null; + port = 0; + shareName = null; + } - connection = smbClient.connect(hostname, port); - return connectToShare(connection); - } + @Override + public URI getServiceLocation() { + return URI.create(String.format("smb://%s:%d/%s", hostname, port, shareName)); } - private SmbjClientService connectToShare(final Connection connection) throws IOException { + @Override + public SmbClientService getClient() throws IOException { + final Connection connection = smbClient.connect(hostname, port); + final Session session; final Share share; @@ -164,20 +179,6 @@ private SmbjClientService connectToShare(final Connection connection) throws IOE return new SmbjClientService(session, (DiskShare) share, getServiceLocation()); } - private void unregisterHost() { - smbClient.getServerList().unregister(hostname); - } - - private void closeConnection(final Connection connection) { - try { - if (connection != null) { - connection.close(true); - } - } catch (Exception e) { - getLogger().error("Could not close connection to {}", getServiceLocation(), e); - } - } - private void closeSession(final Session session) { try { if (session != null) { @@ -188,34 +189,6 @@ private void closeSession(final Session session) { } } - @Override - public URI getServiceLocation() { - return URI.create(String.format("smb://%s:%d/%s", hostname, port, shareName)); - } - - @OnEnabled - public void onEnabled(final ConfigurationContext context) { - this.hostname = context.getProperty(HOSTNAME).getValue(); - this.port = context.getProperty(PORT).asInteger(); - this.shareName = context.getProperty(SHARE).getValue(); - this.smbClient = buildSmbClient(context); - createAuthenticationContext(context); - } - - @OnDisabled - public void onDisabled() { - smbClient.close(); - smbClient = null; - hostname = null; - port = 0; - shareName = null; - } - - @Override - protected List getSupportedPropertyDescriptors() { - return PROPERTIES; - } - private void createAuthenticationContext(final ConfigurationContext context) { if (context.getProperty(USERNAME).isSet()) { final String userName = context.getProperty(USERNAME).getValue(); diff --git a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/NiFiSmbjClientIT.java b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/SmbjClientServiceIT.java similarity index 99% rename from nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/NiFiSmbjClientIT.java rename to nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/SmbjClientServiceIT.java index a030abc36383..59278e54c5f5 100644 --- a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/NiFiSmbjClientIT.java +++ b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/SmbjClientServiceIT.java @@ -54,7 +54,7 @@ import org.testcontainers.images.builder.Transferable; import org.testcontainers.utility.DockerImageName; -public class NiFiSmbjClientIT { +public class SmbjClientServiceIT { private final static Logger sambaContainerLogger = LoggerFactory.getLogger("sambaContainer"); private final static Logger toxyProxyLogger = LoggerFactory.getLogger("toxiProxy"); @@ -62,7 +62,7 @@ public class NiFiSmbjClientIT { private final Network network = Network.newNetwork(); private final GenericContainer sambaContainer = new GenericContainer<>(DockerImageName.parse("dperson/samba")) - .withExposedPorts(139, 445) + .withExposedPorts(445) .waitingFor(Wait.forListeningPort()) .withLogConsumer(new Slf4jLogConsumer(sambaContainerLogger)) .withNetwork(network) diff --git a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/NiFiSmbjClientTest.java b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/SmbjClientServiceTest.java similarity index 92% rename from nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/NiFiSmbjClientTest.java rename to nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/SmbjClientServiceTest.java index f00b505beacf..dba330fbc8de 100644 --- a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/NiFiSmbjClientTest.java +++ b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/SmbjClientServiceTest.java @@ -28,7 +28,11 @@ import org.mockito.Mock; import org.mockito.MockitoAnnotations; -class NiFiSmbjClientTest { +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +class SmbjClientServiceTest { @Mock Session session; @@ -60,4 +64,4 @@ public void shouldCreateDirectoriesRecursively() throws Exception { } -} \ No newline at end of file +} diff --git a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-common/src/main/java/org/apache/nifi/smb/common/SmbClient.java b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-common/src/main/java/org/apache/nifi/smb/common/SmbClient.java new file mode 100644 index 000000000000..92c173c10e89 --- /dev/null +++ b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-common/src/main/java/org/apache/nifi/smb/common/SmbClient.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.smb.common; + +import com.hierynomus.mssmb2.messages.SMB2Echo; +import com.hierynomus.smbj.SMBClient; +import com.hierynomus.smbj.SmbConfig; +import com.hierynomus.smbj.connection.Connection; +import com.hierynomus.smbj.event.ConnectionClosed; +import com.hierynomus.smbj.event.SMBEventBus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +/** + * Extends {@link com.hierynomus.smbj.SMBClient} with connection health check. + *
+ * Workaround to https://github.com/hierynomus/smbj/issues/796. + *

+ * Health check method: + *
    + *
  • get connection from the parent class
  • + *
  • if it is a newly created connection, then return it
  • + *
  • if it is an old connection, send ECHO message to the server + *
      + *
    • if ECHO succeeds, return the connection
    • + *
    • if ECHO fails, unregister the connection, get connection again (which creates a new one) and return it
    • + *
    + *
  • + *
+ */ +class SmbClient extends SMBClient { + + private static final Logger LOGGER = LoggerFactory.getLogger(SmbClient.class); + + private SMBEventBus bus; + + private SmbClient(final SmbConfig config, final SMBEventBus bus) { + super(config, bus); + } + + static SmbClient create(final SmbConfig config) { + final SMBEventBus bus = new SMBEventBus(); + + final SmbClient client = new SmbClient(config, bus); + + client.bus = bus; + + return client; + } + + public Connection connect(final String hostname) throws IOException { + return connect(hostname, DEFAULT_PORT); + } + + public synchronized Connection connect(final String hostname, final int port) throws IOException { + final Connection connection = super.connect(hostname, port); + + try { + // SMB2 ECHO message can only be sent if this is not a new connection (and health check is only needed in this case) + if (!connection.release()) { + connection.send(new SMB2Echo(connection.getNegotiatedProtocol().getDialect())).get(10, TimeUnit.SECONDS); + } + + // set lease counter back + connection.lease(); + + return connection; + } catch (Exception e) { + LOGGER.info("Stale connection found, unregistering it and creating a new one"); + bus.publish(new ConnectionClosed(hostname, port)); + } + + return super.connect(hostname, port); + } +} diff --git a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-common/src/main/java/org/apache/nifi/smb/common/SmbProperties.java b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-common/src/main/java/org/apache/nifi/smb/common/SmbProperties.java index 5b474fcc940a..b79d68790f9d 100644 --- a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-common/src/main/java/org/apache/nifi/smb/common/SmbProperties.java +++ b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-common/src/main/java/org/apache/nifi/smb/common/SmbProperties.java @@ -44,6 +44,15 @@ public class SmbProperties { .defaultValue("false") .build(); + public static final PropertyDescriptor ENABLE_DFS = new PropertyDescriptor.Builder() + .name("enable-dfs") + .displayName("Enable DFS") + .description("Enables accessing Distributed File System (DFS) and following DFS links during SMB operations.") + .required(true) + .allowableValues("true", "false") + .defaultValue("false") + .build(); + public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder() .displayName("Timeout") .name("timeout") diff --git a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-common/src/main/java/org/apache/nifi/smb/common/SmbUtils.java b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-common/src/main/java/org/apache/nifi/smb/common/SmbUtils.java index 0895abfae06b..b705c5c38a5a 100644 --- a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-common/src/main/java/org/apache/nifi/smb/common/SmbUtils.java +++ b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-common/src/main/java/org/apache/nifi/smb/common/SmbUtils.java @@ -21,6 +21,7 @@ import org.apache.nifi.context.PropertyContext; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.apache.nifi.smb.common.SmbProperties.ENABLE_DFS; import static org.apache.nifi.smb.common.SmbProperties.SMB_DIALECT; import static org.apache.nifi.smb.common.SmbProperties.TIMEOUT; import static org.apache.nifi.smb.common.SmbProperties.USE_ENCRYPTION; @@ -32,7 +33,7 @@ private SmbUtils() { } public static SMBClient buildSmbClient(final PropertyContext context) { - return new SMBClient(buildSmbConfig(context)); + return SmbClient.create(buildSmbConfig(context)); } static SmbConfig buildSmbConfig(final PropertyContext context) { @@ -50,6 +51,10 @@ static SmbConfig buildSmbConfig(final PropertyContext context) { configBuilder.withEncryptData(context.getProperty(USE_ENCRYPTION).asBoolean()); } + if (context.getProperty(ENABLE_DFS).isSet()) { + configBuilder.withDfsEnabled(context.getProperty(ENABLE_DFS).asBoolean()); + } + if (context.getProperty(TIMEOUT).isSet()) { configBuilder.withTimeout(context.getProperty(TIMEOUT).asTimePeriod(MILLISECONDS), MILLISECONDS); }