Skip to content

Commit 13b4fac

Browse files
garyrussellartembilan
authored andcommitted
INT-3491: (S)FTP Delegation Session Factory
JIRA: https://jira.spring.io/browse/INT-3491 Select the session factory at runtime. Polishing; PR Comments Also add/remove factories.
1 parent b073688 commit 13b4fac

File tree

9 files changed

+518
-1
lines changed

9 files changed

+518
-1
lines changed

spring-integration-file/src/main/java/org/springframework/integration/file/remote/session/CachingSessionFactory.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2014 the original author or authors.
2+
* Copyright 2002-2015 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -25,6 +25,7 @@
2525

2626
import org.springframework.beans.factory.DisposableBean;
2727
import org.springframework.integration.util.SimplePool;
28+
import org.springframework.util.Assert;
2829

2930
/**
3031
* A {@link SessionFactory} implementation that caches Sessions for reuse without
@@ -62,13 +63,17 @@ public CachingSessionFactory(SessionFactory<F> sessionFactory) {
6263
* Create a CachingSessionFactory with the specified session limit. By default, if
6364
* no sessions are available in the cache, and the size limit has been reached,
6465
* calling threads will block until a session is available.
66+
* <p>
67+
* Do not cache a {@link DelegatingSessionFactory}, cache each delegate therein instead.
6568
* @see #setSessionWaitTimeout(long)
6669
* @see #setPoolSize(int)
6770
*
6871
* @param sessionFactory The underlying session factory.
6972
* @param sessionCacheSize The maximum cache size.
7073
*/
7174
public CachingSessionFactory(SessionFactory<F> sessionFactory, int sessionCacheSize) {
75+
Assert.isTrue(!(sessionFactory instanceof DelegatingSessionFactory),
76+
"'sessionFactory' cannot be a 'DelegatingSessionFactory'; cache each delegate instead");
7277
this.sessionFactory = sessionFactory;
7378
this.pool = new SimplePool<Session<F>>(sessionCacheSize, new SimplePool.PoolItemCallback<Session<F>>() {
7479
@Override
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Copyright 2015 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.integration.file.remote.session;
17+
18+
import java.util.Map;
19+
import java.util.concurrent.ConcurrentHashMap;
20+
21+
/**
22+
* The default implementation of {@link SessionFactoryLocator} using a simple map lookup
23+
* and an optional default to fall back on.
24+
*
25+
* @author Gary Russell
26+
* @since 4.2
27+
*
28+
*/
29+
public class DefaultSessionFactoryLocator<F> implements SessionFactoryLocator<F> {
30+
31+
private final Map<Object, SessionFactory<F>> factories = new ConcurrentHashMap<Object, SessionFactory<F>>();
32+
33+
private final SessionFactory<F> defaultFactory;
34+
35+
/**
36+
* @param factories A map of factories, keyed by lookup key.
37+
*/
38+
public DefaultSessionFactoryLocator(Map<Object, SessionFactory<F>> factories) {
39+
this(factories, null);
40+
}
41+
42+
/**
43+
* @param factories A map of factories, keyed by lookup key.
44+
* @param defaultFactory A default to be used if the lookup fails.
45+
*/
46+
public DefaultSessionFactoryLocator(Map<Object, SessionFactory<F>> factories, SessionFactory<F> defaultFactory) {
47+
this.factories.putAll(factories);
48+
this.defaultFactory = defaultFactory;
49+
}
50+
51+
/**
52+
* Add a session factory.
53+
* @param key the lookup key.
54+
* @param factory the factory.
55+
*/
56+
public void addSessionFactory(String key, SessionFactory<F> factory) {
57+
this.factories.put(key, factory);
58+
}
59+
60+
/**
61+
* Remove a session factory.
62+
* @param key the lookup key.
63+
* @return the factory, if it was present.
64+
*/
65+
public SessionFactory<F> removeSessionFactory(Object key) {
66+
return this.factories.remove(key);
67+
}
68+
69+
@Override
70+
public SessionFactory<F> getSessionFactory(Object key) {
71+
if (key == null) {
72+
return this.defaultFactory;
73+
}
74+
SessionFactory<F> factory = this.factories.get(key);
75+
return factory != null ? factory : this.defaultFactory;
76+
}
77+
78+
}
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
/*
2+
* Copyright 2015 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.integration.file.remote.session;
17+
18+
import java.util.Map;
19+
20+
import org.springframework.messaging.Message;
21+
import org.springframework.util.Assert;
22+
23+
/**
24+
* {@link SessionFactory} that delegates to a {@link SessionFactory} retrieved from a
25+
* {@link SessionFactoryLocator}.
26+
*
27+
* @author Gary Russell
28+
* @since 4.2
29+
*
30+
*/
31+
public class DelegatingSessionFactory<F> implements SessionFactory<F> {
32+
33+
private final SessionFactoryLocator<F> factoryLocator;
34+
35+
private final ThreadLocal<Object> threadKey = new ThreadLocal<Object>();
36+
37+
/**
38+
* Construct an instance with a {@link DefaultSessionFactoryLocator} using the
39+
* supplied factories and default key.
40+
* @param factories the factories.
41+
* @param defaultFactory the default to use if the lookup fails.
42+
*/
43+
public DelegatingSessionFactory(Map<Object, SessionFactory<F>> factories, SessionFactory<F> defaultFactory) {
44+
this(new DefaultSessionFactoryLocator<F>(factories, defaultFactory));
45+
}
46+
47+
/**
48+
* Construct an instance using the supplied factory.
49+
* @param factoryLocator the factory.
50+
*/
51+
public DelegatingSessionFactory(SessionFactoryLocator<F> factoryLocator) {
52+
Assert.notNull(factoryLocator, "'factoryFactory' cannot be null");
53+
this.factoryLocator = factoryLocator;
54+
}
55+
56+
/**
57+
* Return this factory's locator.
58+
* @return the locator.
59+
*/
60+
public SessionFactoryLocator<F> getFactoryLocator() {
61+
return factoryLocator;
62+
}
63+
64+
/**
65+
* Set a key to be used for {@link #getSession()} on this thread.
66+
* @param key the key.
67+
*/
68+
public void setThreadKey(Object key) {
69+
this.threadKey.set(key);
70+
}
71+
72+
/**
73+
* Clear the key for this thread.
74+
*/
75+
public void clearThreadKey() {
76+
this.threadKey.remove();
77+
}
78+
79+
/**
80+
* Messaging-friendly version of {@link #setThreadKey(Object)} that can be invoked from
81+
* a service activator.
82+
* @param message the message.
83+
* @param key the key.
84+
* @return the message (unchanged).
85+
*/
86+
public Message<?> setThreadKey(Message<?> message, Object key) {
87+
this.threadKey.set(key);
88+
return message;
89+
}
90+
91+
/**
92+
* Messaging-friendly version of {@link #clearThreadKey()} that can be invoked from
93+
* a service activator.
94+
* @param message the message.
95+
* @return the message (unchanged).
96+
*/
97+
public Message<?> clearThreadKey(Message<?> message) {
98+
this.threadKey.remove();
99+
return message;
100+
}
101+
102+
@Override
103+
public Session<F> getSession() {
104+
return getSession(this.threadKey.get());
105+
}
106+
107+
public Session<F> getSession(Object key) {
108+
SessionFactory<F> sessionFactory = this.factoryLocator.getSessionFactory(key);
109+
Assert.notNull(sessionFactory, "No default SessionFactory configured");
110+
return sessionFactory.getSession();
111+
}
112+
113+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Copyright 2015 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.integration.file.remote.session;
17+
18+
/**
19+
*
20+
* A factory returning a {@link SessionFactory} based on some key.
21+
*
22+
* @author Gary Russell
23+
* @since 4.2
24+
*
25+
*/
26+
public interface SessionFactoryLocator<F> {
27+
28+
/**
29+
* Return a {@link SessionFactory} for the key.
30+
* @param key the key.
31+
* @return the session factory.
32+
*/
33+
SessionFactory<F> getSessionFactory(Object key);
34+
35+
}

0 commit comments

Comments
 (0)