diff --git a/spring-integration-file/src/main/java/org/springframework/integration/file/remote/session/CachingSessionFactory.java b/spring-integration-file/src/main/java/org/springframework/integration/file/remote/session/CachingSessionFactory.java index 9314a0177f1..3c1c7971c2e 100644 --- a/spring-integration-file/src/main/java/org/springframework/integration/file/remote/session/CachingSessionFactory.java +++ b/spring-integration-file/src/main/java/org/springframework/integration/file/remote/session/CachingSessionFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2014 the original author or authors. + * Copyright 2002-2015 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,6 +25,7 @@ import org.springframework.beans.factory.DisposableBean; import org.springframework.integration.util.SimplePool; +import org.springframework.util.Assert; /** * A {@link SessionFactory} implementation that caches Sessions for reuse without @@ -62,6 +63,8 @@ public CachingSessionFactory(SessionFactory sessionFactory) { * Create a CachingSessionFactory with the specified session limit. By default, if * no sessions are available in the cache, and the size limit has been reached, * calling threads will block until a session is available. + *

+ * Do not cache a {@link DelegatingSessionFactory}, cache each delegate therein instead. * @see #setSessionWaitTimeout(long) * @see #setPoolSize(int) * @@ -69,6 +72,8 @@ public CachingSessionFactory(SessionFactory sessionFactory) { * @param sessionCacheSize The maximum cache size. */ public CachingSessionFactory(SessionFactory sessionFactory, int sessionCacheSize) { + Assert.isTrue(!(sessionFactory instanceof DelegatingSessionFactory), + "'sessionFactory' cannot be a 'DelegatingSessionFactory'; cache each delegate instead"); this.sessionFactory = sessionFactory; this.pool = new SimplePool>(sessionCacheSize, new SimplePool.PoolItemCallback>() { @Override diff --git a/spring-integration-file/src/main/java/org/springframework/integration/file/remote/session/DefaultSessionFactoryLocator.java b/spring-integration-file/src/main/java/org/springframework/integration/file/remote/session/DefaultSessionFactoryLocator.java new file mode 100644 index 00000000000..556800088f7 --- /dev/null +++ b/spring-integration-file/src/main/java/org/springframework/integration/file/remote/session/DefaultSessionFactoryLocator.java @@ -0,0 +1,78 @@ +/* + * Copyright 2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.integration.file.remote.session; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * The default implementation of {@link SessionFactoryLocator} using a simple map lookup + * and an optional default to fall back on. + * + * @author Gary Russell + * @since 4.2 + * + */ +public class DefaultSessionFactoryLocator implements SessionFactoryLocator { + + private final Map> factories = new ConcurrentHashMap>(); + + private final SessionFactory defaultFactory; + + /** + * @param factories A map of factories, keyed by lookup key. + */ + public DefaultSessionFactoryLocator(Map> factories) { + this(factories, null); + } + + /** + * @param factories A map of factories, keyed by lookup key. + * @param defaultFactory A default to be used if the lookup fails. + */ + public DefaultSessionFactoryLocator(Map> factories, SessionFactory defaultFactory) { + this.factories.putAll(factories); + this.defaultFactory = defaultFactory; + } + + /** + * Add a session factory. + * @param key the lookup key. + * @param factory the factory. + */ + public void addSessionFactory(String key, SessionFactory factory) { + this.factories.put(key, factory); + } + + /** + * Remove a session factory. + * @param key the lookup key. + * @return the factory, if it was present. + */ + public SessionFactory removeSessionFactory(Object key) { + return this.factories.remove(key); + } + + @Override + public SessionFactory getSessionFactory(Object key) { + if (key == null) { + return this.defaultFactory; + } + SessionFactory factory = this.factories.get(key); + return factory != null ? factory : this.defaultFactory; + } + +} diff --git a/spring-integration-file/src/main/java/org/springframework/integration/file/remote/session/DelegatingSessionFactory.java b/spring-integration-file/src/main/java/org/springframework/integration/file/remote/session/DelegatingSessionFactory.java new file mode 100644 index 00000000000..2d02c58cf79 --- /dev/null +++ b/spring-integration-file/src/main/java/org/springframework/integration/file/remote/session/DelegatingSessionFactory.java @@ -0,0 +1,113 @@ +/* + * Copyright 2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.integration.file.remote.session; + +import java.util.Map; + +import org.springframework.messaging.Message; +import org.springframework.util.Assert; + +/** + * {@link SessionFactory} that delegates to a {@link SessionFactory} retrieved from a + * {@link SessionFactoryLocator}. + * + * @author Gary Russell + * @since 4.2 + * + */ +public class DelegatingSessionFactory implements SessionFactory { + + private final SessionFactoryLocator factoryLocator; + + private final ThreadLocal threadKey = new ThreadLocal(); + + /** + * Construct an instance with a {@link DefaultSessionFactoryLocator} using the + * supplied factories and default key. + * @param factories the factories. + * @param defaultFactory the default to use if the lookup fails. + */ + public DelegatingSessionFactory(Map> factories, SessionFactory defaultFactory) { + this(new DefaultSessionFactoryLocator(factories, defaultFactory)); + } + + /** + * Construct an instance using the supplied factory. + * @param factoryLocator the factory. + */ + public DelegatingSessionFactory(SessionFactoryLocator factoryLocator) { + Assert.notNull(factoryLocator, "'factoryFactory' cannot be null"); + this.factoryLocator = factoryLocator; + } + + /** + * Return this factory's locator. + * @return the locator. + */ + public SessionFactoryLocator getFactoryLocator() { + return factoryLocator; + } + + /** + * Set a key to be used for {@link #getSession()} on this thread. + * @param key the key. + */ + public void setThreadKey(Object key) { + this.threadKey.set(key); + } + + /** + * Clear the key for this thread. + */ + public void clearThreadKey() { + this.threadKey.remove(); + } + + /** + * Messaging-friendly version of {@link #setThreadKey(Object)} that can be invoked from + * a service activator. + * @param message the message. + * @param key the key. + * @return the message (unchanged). + */ + public Message setThreadKey(Message message, Object key) { + this.threadKey.set(key); + return message; + } + + /** + * Messaging-friendly version of {@link #clearThreadKey()} that can be invoked from + * a service activator. + * @param message the message. + * @return the message (unchanged). + */ + public Message clearThreadKey(Message message) { + this.threadKey.remove(); + return message; + } + + @Override + public Session getSession() { + return getSession(this.threadKey.get()); + } + + public Session getSession(Object key) { + SessionFactory sessionFactory = this.factoryLocator.getSessionFactory(key); + Assert.notNull(sessionFactory, "No default SessionFactory configured"); + return sessionFactory.getSession(); + } + +} diff --git a/spring-integration-file/src/main/java/org/springframework/integration/file/remote/session/SessionFactoryLocator.java b/spring-integration-file/src/main/java/org/springframework/integration/file/remote/session/SessionFactoryLocator.java new file mode 100644 index 00000000000..cb5785dc788 --- /dev/null +++ b/spring-integration-file/src/main/java/org/springframework/integration/file/remote/session/SessionFactoryLocator.java @@ -0,0 +1,35 @@ +/* + * Copyright 2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.integration.file.remote.session; + +/** + * + * A factory returning a {@link SessionFactory} based on some key. + * + * @author Gary Russell + * @since 4.2 + * + */ +public interface SessionFactoryLocator { + + /** + * Return a {@link SessionFactory} for the key. + * @param key the key. + * @return the session factory. + */ + SessionFactory getSessionFactory(Object key); + +} diff --git a/spring-integration-file/src/test/java/org/springframework/integration/file/remote/session/DelegatingSessionFactoryTests.java b/spring-integration-file/src/test/java/org/springframework/integration/file/remote/session/DelegatingSessionFactoryTests.java new file mode 100644 index 00000000000..61d3202b089 --- /dev/null +++ b/spring-integration-file/src/test/java/org/springframework/integration/file/remote/session/DelegatingSessionFactoryTests.java @@ -0,0 +1,193 @@ +/* + * Copyright 2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.integration.file.remote.session; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.ImportResource; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.config.EnableIntegration; +import org.springframework.integration.file.remote.AbstractFileInfo; +import org.springframework.integration.file.remote.gateway.AbstractRemoteFileOutboundGateway; +import org.springframework.integration.test.util.TestUtils; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.PollableChannel; +import org.springframework.messaging.support.GenericMessage; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; + +/** + * @author Gary Russell + * @since 4.2 + * + */ +@ContextConfiguration +@RunWith(SpringJUnit4ClassRunner.class) +public class DelegatingSessionFactoryTests { + + @Autowired + TestSessionFactory foo; + + @Autowired + TestSessionFactory bar; + + @Autowired + DelegatingSessionFactory dsf; + + @Autowired + MessageChannel in; + + @Autowired + PollableChannel out; + + @Autowired + DefaultSessionFactoryLocator sessionFactoryLocator; + + @Test + public void testDelegates() { + assertEquals(foo.mockSession, this.dsf.getSession("foo")); + assertEquals(bar.mockSession, this.dsf.getSession("bar")); + assertEquals(bar.mockSession, this.dsf.getSession("junk")); + assertEquals(bar.mockSession, this.dsf.getSession()); + this.dsf.setThreadKey("foo"); + assertEquals(foo.mockSession, this.dsf.getSession("foo")); + this.dsf.clearThreadKey(); + TestSessionFactory factory = new TestSessionFactory(); + this.sessionFactoryLocator.addSessionFactory("baz", factory); + this.dsf.setThreadKey("baz"); + assertEquals(factory.mockSession, this.dsf.getSession("baz")); + this.dsf.clearThreadKey(); + assertSame(factory, sessionFactoryLocator.removeSessionFactory("baz")); + } + + @Test + public void testFlow() throws Exception { + in.send(new GenericMessage("foo")); + Message received = out.receive(0); + assertNotNull(received); + verify(foo.mockSession).list("foo/"); + assertNull(TestUtils.getPropertyValue(dsf, "threadKey", ThreadLocal.class).get()); + } + + @Configuration + @ImportResource("classpath:/org/springframework/integration/file/remote/session/delegating-session-factory-context.xml") + @EnableIntegration + public static class Config { + + @Bean + TestSessionFactory foo() { + return new TestSessionFactory(); + } + + @Bean + TestSessionFactory bar() { + return new TestSessionFactory(); + } + + @Bean + DelegatingSessionFactory dsf() { + SessionFactoryLocator sff = sessionFactoryLocator(); + return new DelegatingSessionFactory(sff); + } + + @Bean + public SessionFactoryLocator sessionFactoryLocator() { + Map> factories = new HashMap>(); + factories.put("foo", foo()); + TestSessionFactory bar = bar(); + factories.put("bar", bar); + SessionFactoryLocator sff = new DefaultSessionFactoryLocator(factories, bar); + return sff; + } + + @ServiceActivator(inputChannel="c1") + @Bean + MessageHandler handler() { + AbstractRemoteFileOutboundGateway gateway = new AbstractRemoteFileOutboundGateway(dsf(), "ls", "payload") { + + @Override + protected boolean isDirectory(String file) { + return false; + } + + @Override + protected boolean isLink(String file) { + return false; + } + + @Override + protected String getFilename(String file) { + return file; + } + + @Override + protected String getFilename(AbstractFileInfo file) { + return file.getFilename(); + } + + @Override + protected long getModified(String file) { + return 0; + } + + @Override + protected List> asFileInfoList(Collection files) { + return null; + } + + @Override + protected String enhanceNameWithSubDirectory(String file, String directory) { + return null; + } + }; + gateway.setOutputChannelName("c2"); + gateway.setOptions("-1"); + return gateway; + } + + } + + private static class TestSessionFactory implements SessionFactory { + + @SuppressWarnings("unchecked") + private final Session mockSession = mock(Session.class); + + @Override + public Session getSession() { + return this.mockSession ; + } + + } + +} diff --git a/spring-integration-file/src/test/java/org/springframework/integration/file/remote/session/delegating-session-factory-context.xml b/spring-integration-file/src/test/java/org/springframework/integration/file/remote/session/delegating-session-factory-context.xml new file mode 100644 index 00000000000..63333b37b35 --- /dev/null +++ b/spring-integration-file/src/test/java/org/springframework/integration/file/remote/session/delegating-session-factory-context.xml @@ -0,0 +1,20 @@ + + + + + + + + + + + + + + diff --git a/src/reference/asciidoc/ftp.adoc b/src/reference/asciidoc/ftp.adoc index 4ed581e22f5..9ec6197303e 100644 --- a/src/reference/asciidoc/ftp.adoc +++ b/src/reference/asciidoc/ftp.adoc @@ -121,6 +121,39 @@ public class AdvancedFtpSessionFactory extends DefaultFtpSessionFactory { } ---- +[[ftp-dsf]] +=== Delegating Session Factory + +_Version 4.2_ introduced the `DelegatingSessionFactory` which allows the selection of the actual session factory at +runtime. +Prior to invoking the ftp endpoint, call `setThreadKey()` on the factory to associate a key with the current thread. +That key is then used to lookup the actual session factory to be used. +The key can be cleared by calling `clearThreadKey()` after use. + +Convenience methods have been added so this can easily be done from a message flow: + +[source, xml] +---- + + + + + + + + + + + + + +---- + +IMPORTANT: When using session caching (see <>), each of the delegates should be cached; you +cannot cache the `DelegatingSessionFactory` itself. + [[ftp-inbound]] === FTP Inbound Channel Adapter diff --git a/src/reference/asciidoc/sftp.adoc b/src/reference/asciidoc/sftp.adoc index 26ee8c08dc6..3a7af62a95f 100644 --- a/src/reference/asciidoc/sftp.adoc +++ b/src/reference/asciidoc/sftp.adoc @@ -159,6 +159,39 @@ Defaults to `0`, which means, that no timeout will occur. The remote user to use. _Mandatory_. +[[sftp-dsf]] +=== Delegating Session Factory + +_Version 4.2_ introduced the `DelegatingSessionFactory` which allows the selection of the actual session factory at +runtime. +Prior to invoking the ftp endpoint, call `setThreadKey()` on the factory to associate a key with the current thread. +That key is then used to lookup the actual session factory to be used. +The key can be cleared by calling `clearThreadKey()` after use. + +Convenience methods have been added so this can easily be done from a message flow: + +[source, xml] +---- + + + + + + + + + + + + + +---- + +IMPORTANT: When using session caching (see <>), each of the delegates should be cached; you +cannot cache the `DelegatingSessionFactory` itself. + [[sftp-session-caching]] === SFTP Session Caching diff --git a/src/reference/asciidoc/whats-new.adoc b/src/reference/asciidoc/whats-new.adoc index f99c1988ea0..288ed33561e 100644 --- a/src/reference/asciidoc/whats-new.adoc +++ b/src/reference/asciidoc/whats-new.adoc @@ -270,6 +270,13 @@ occur after part of the request is completed. If such a condition occurs, a `PartialSuccessException` is thrown containing the partial results. See <> and <> for more information. +===== Delegating Session Factory + +A delegating session factory is now available, enabling the selection of a particular session factory based on some +thread context value. + +See <> and <> for more information. + ==== Websocket Changes `WebSocketHandlerDecoratorFactory` support has been added to the `ServerWebSocketContainer`