-
Notifications
You must be signed in to change notification settings - Fork 945
/
ConnectionPoolSupport.java
239 lines (196 loc) · 8.97 KB
/
ConnectionPoolSupport.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
package io.lettuce.core.support;
import static io.lettuce.core.support.ConnectionWrapping.HasTargetConnection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.ObjectPool;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.commons.pool2.impl.SoftReferenceObjectPool;
import io.lettuce.core.ClientOptions;
import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.internal.LettuceAssert;
import io.lettuce.core.support.ConnectionWrapping.Origin;
/**
* Connection pool support for {@link GenericObjectPool} and {@link SoftReferenceObjectPool}. Connection pool creation requires
* a {@link Supplier} that creates Redis connections. The pool can allocate either wrapped or direct connections.
* <ul>
* <li>Wrapped instances return the connection to the pool when {@link StatefulConnection#close()} is called.</li>
* <li>Regular connections need to be returned to the pool with {@link GenericObjectPool#returnObject(Object)}</li>
* </ul>
* <p>
* Lettuce connections are designed to be thread-safe so one connection can be shared amongst multiple threads and Lettuce
* connections {@link ClientOptions#isAutoReconnect() auto-reconnect} by default. Connection pooling with Lettuce can be
* required when you're invoking Redis operations in multiple threads and you use
* <ul>
* <li>blocking commands such as {@code BLPOP}.</li>
* <li>transactions ({@code MULTI}/{@code EXEC}).</li>
* <li>{@link StatefulConnection#setAutoFlushCommands(boolean) command batching}.</li>
* </ul>
*
* Transactions and command batching affect connection state. Blocking commands won't propagate queued commands to Redis until
* the blocking command is completed.
*
* <h3>Example</h3>
*
* <pre class="code">{@code
* // application initialization
* RedisClusterClient clusterClient = RedisClusterClient.create(RedisURI.create(host, port));
* GenericObjectPool<StatefulRedisClusterConnection<String, String>> pool = ConnectionPoolSupport
*         .createGenericObjectPool(() -> clusterClient.connect(), new GenericObjectPoolConfig<>());
*
* // executing work
* try (StatefulRedisClusterConnection<String, String> connection = pool.borrowObject()) {
*     // perform some work
* }
*
* // terminating
* pool.close();
* clusterClient.shutdown();
* }</pre>
*
* @author Mark Paluch
* @since 4.3
*/
public abstract class ConnectionPoolSupport {
/**
 * Private no-op constructor. This class only exposes static factory methods, so it is not meant to be instantiated.
 */
private ConnectionPoolSupport() {
}
/**
 * Creates a new {@link GenericObjectPool} backed by the given {@link Supplier}. This is a shortcut for the three-argument
 * overload with connection wrapping enabled: allocated instances are wrapped and must not be returned with
 * {@link ObjectPool#returnObject(Object)}.
 *
 * @param connectionSupplier must not be {@code null}.
 * @param config must not be {@code null}.
 * @param <T> connection type.
 * @return the connection pool.
 */
public static <T extends StatefulConnection<?, ?>> GenericObjectPool<T> createGenericObjectPool(
        Supplier<T> connectionSupplier, GenericObjectPoolConfig<T> config) {

    // Wrapping is the default mode for this overload.
    final boolean wrapConnections = true;

    return createGenericObjectPool(connectionSupplier, config, wrapConnections);
}
/**
 * Creates a new {@link GenericObjectPool} whose connections are obtained from the given {@link Supplier}.
 *
 * @param connectionSupplier must not be {@code null}.
 * @param config must not be {@code null}.
 * @param wrapConnections {@code false} to return direct connections that need to be returned to the pool using
 *        {@link ObjectPool#returnObject(Object)}. {@code true} to return wrapped connection that are returned to the pool
 *        when invoking {@link StatefulConnection#close()}.
 * @param <T> connection type.
 * @return the connection pool.
 */
@SuppressWarnings("unchecked")
public static <T extends StatefulConnection<?, ?>> GenericObjectPool<T> createGenericObjectPool(
        Supplier<T> connectionSupplier, GenericObjectPoolConfig<T> config, boolean wrapConnections) {

    LettuceAssert.notNull(connectionSupplier, "Connection supplier must not be null");
    LettuceAssert.notNull(config, "GenericObjectPoolConfig must not be null");

    // The anonymous pool below needs a handle to a wrapper around itself; that wrapper can only be created once the
    // pool exists, so it is published through this reference after construction.
    final AtomicReference<Origin<T>> originRef = new AtomicReference<>();
    final RedisPooledObjectFactory<T> factory = new RedisPooledObjectFactory<T>(connectionSupplier);

    GenericObjectPool<T> pool = new GenericObjectPool<T>(factory, config) {

        @Override
        public T borrowObject() throws Exception {

            T connection = super.borrowObject();
            if (!wrapConnections) {
                return connection;
            }
            return ConnectionWrapping.wrapConnection(connection, originRef.get());
        }

        @Override
        public void returnObject(T obj) {

            // Wrapped connections are unwrapped so the pool receives the target connection.
            T target = obj;
            if (wrapConnections && obj instanceof HasTargetConnection) {
                target = (T) ((HasTargetConnection) obj).getTargetConnection();
            }
            super.returnObject(target);
        }

    };

    originRef.set(new ObjectPoolWrapper<>(pool));
    return pool;
}
/**
 * Creates a new {@link SoftReferenceObjectPool} backed by the given {@link Supplier}. This is a shortcut for the
 * two-argument overload with connection wrapping enabled: allocated instances are wrapped and must not be returned with
 * {@link ObjectPool#returnObject(Object)}.
 *
 * @param connectionSupplier must not be {@code null}.
 * @param <T> connection type.
 * @return the connection pool.
 */
public static <T extends StatefulConnection<?, ?>> SoftReferenceObjectPool<T> createSoftReferenceObjectPool(
        Supplier<T> connectionSupplier) {

    // Wrapping is the default mode for this overload.
    final boolean wrapConnections = true;

    return createSoftReferenceObjectPool(connectionSupplier, wrapConnections);
}
/**
 * Creates a new {@link SoftReferenceObjectPool} whose connections are obtained from the given {@link Supplier}.
 *
 * @param connectionSupplier must not be {@code null}.
 * @param wrapConnections {@code false} to return direct connections that need to be returned to the pool using
 *        {@link ObjectPool#returnObject(Object)}. {@code true} to return wrapped connection that are returned to the pool
 *        when invoking {@link StatefulConnection#close()}.
 * @param <T> connection type.
 * @return the connection pool.
 */
@SuppressWarnings("unchecked")
public static <T extends StatefulConnection<?, ?>> SoftReferenceObjectPool<T> createSoftReferenceObjectPool(
        Supplier<T> connectionSupplier, boolean wrapConnections) {

    LettuceAssert.notNull(connectionSupplier, "Connection supplier must not be null");

    // The anonymous pool below needs a handle to a wrapper around itself; that wrapper can only be created once the
    // pool exists, so it is published through this reference after construction.
    final AtomicReference<Origin<T>> originRef = new AtomicReference<>();
    final RedisPooledObjectFactory<T> factory = new RedisPooledObjectFactory<>(connectionSupplier);

    SoftReferenceObjectPool<T> pool = new SoftReferenceObjectPool<T>(factory) {

        @Override
        public synchronized T borrowObject() throws Exception {

            T connection = super.borrowObject();
            if (!wrapConnections) {
                return connection;
            }
            return ConnectionWrapping.wrapConnection(connection, originRef.get());
        }

        @Override
        public synchronized void returnObject(T obj) throws Exception {

            // Wrapped connections are unwrapped so the pool receives the target connection.
            T target = obj;
            if (wrapConnections && obj instanceof HasTargetConnection) {
                target = (T) ((HasTargetConnection) obj).getTargetConnection();
            }
            super.returnObject(target);
        }

    };

    originRef.set(new ObjectPoolWrapper<>(pool));
    return pool;
}
/**
 * {@link BasePooledObjectFactory} that obtains new connections from a {@link Supplier}, validates them through
 * {@link StatefulConnection#isOpen()} and closes them on destruction.
 *
 * @author Mark Paluch
 * @since 4.3
 */
private static class RedisPooledObjectFactory<T extends StatefulConnection<?, ?>> extends BasePooledObjectFactory<T> {

    private final Supplier<T> supplier;

    RedisPooledObjectFactory(Supplier<T> connectionSupplier) {
        supplier = connectionSupplier;
    }

    /**
     * Obtains a connection from the supplier.
     */
    @Override
    public T create() throws Exception {
        T connection = supplier.get();
        return connection;
    }

    /**
     * Wraps the connection in a {@link DefaultPooledObject}.
     */
    @Override
    public PooledObject<T> wrap(T obj) {
        PooledObject<T> pooled = new DefaultPooledObject<>(obj);
        return pooled;
    }

    /**
     * A pooled connection is considered valid while it reports itself as open.
     */
    @Override
    public boolean validateObject(PooledObject<T> p) {
        T connection = p.getObject();
        return connection.isOpen();
    }

    /**
     * Closes the pooled connection.
     */
    @Override
    public void destroyObject(PooledObject<T> p) throws Exception {
        T connection = p.getObject();
        connection.close();
    }

}
/**
 * {@link Origin} adapter that hands returned objects to an {@link ObjectPool}.
 */
private static class ObjectPoolWrapper<T> implements Origin<T> {

    private static final CompletableFuture<Void> COMPLETED = CompletableFuture.completedFuture(null);

    private final ObjectPool<T> delegate;

    ObjectPoolWrapper(ObjectPool<T> pool) {
        delegate = pool;
    }

    /**
     * Returns the object to the underlying pool.
     */
    @Override
    public void returnObject(T o) throws Exception {
        delegate.returnObject(o);
    }

    /**
     * Returns the object to the underlying pool on the calling thread and then hands back a shared, already completed
     * future.
     */
    @Override
    public CompletableFuture<Void> returnObjectAsync(T o) throws Exception {

        // The return happens synchronously, so there is nothing left to wait for afterwards.
        delegate.returnObject(o);
        return COMPLETED;
    }

}
}