-
Notifications
You must be signed in to change notification settings - Fork 946
/
AbstractClusterNodeConnectionFactory.java
97 lines (78 loc) · 3.32 KB
/
AbstractClusterNodeConnectionFactory.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
package io.lettuce.core.cluster;
import java.net.SocketAddress;
import java.util.function.Supplier;
import reactor.core.publisher.Mono;
import io.lettuce.core.RedisURI;
import io.lettuce.core.cluster.models.partitions.Partitions;
import io.lettuce.core.cluster.models.partitions.RedisClusterNode;
import io.lettuce.core.resource.ClientResources;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
/**
 * Supporting class for {@link ClusterNodeConnectionFactory} implementations.
 * <p>
 * Provides utility methods to resolve a {@link SocketAddress} for a cluster node, either by looking the node up in the
 * currently configured {@link Partitions} or by resolving a host/port pair directly.
 *
 * @author Mark Paluch
 * @since 4.4
 */
abstract class AbstractClusterNodeConnectionFactory<K, V> implements ClusterNodeConnectionFactory<K, V> {

    // Note: the logger is registered under the DefaultClusterNodeConnectionFactory name, not under this class.
    private static final InternalLogger logger = InternalLoggerFactory
            .getInstance(PooledClusterConnectionProvider.DefaultClusterNodeConnectionFactory.class);

    private final ClientResources clientResources;

    // Replaced as a whole through setPartitions(...); volatile so readers observe the latest assigned reference.
    private volatile Partitions partitions;

    /**
     * Create a new {@link AbstractClusterNodeConnectionFactory} given {@link ClientResources}.
     *
     * @param clientResources must not be {@code null}. The argument is not validated here; it is first dereferenced when
     *        an address gets resolved.
     */
    public AbstractClusterNodeConnectionFactory(ClientResources clientResources) {
        this.clientResources = clientResources;
    }

    /**
     * Set the {@link Partitions} used to look up cluster nodes by their node id.
     *
     * @param partitions the partitions to use for subsequent lookups.
     */
    public void setPartitions(Partitions partitions) {
        this.partitions = partitions;
    }

    /**
     * Get the {@link Partitions} currently used for node id lookups.
     *
     * @return the partitions, or {@code null} if {@link #setPartitions(Partitions)} was not called yet.
     */
    public Partitions getPartitions() {
        return partitions;
    }

    /**
     * Get a {@link Mono} of {@link SocketAddress} for a
     * {@link io.lettuce.core.cluster.ClusterNodeConnectionFactory.ConnectionKey}.
     * <p>
     * Despite the method name, the result is a {@link Mono} rather than a {@link Supplier}. Similar to a
     * {@link Supplier}, nothing is resolved when this method returns: the endpoint is resolved each time the returned
     * {@link Mono} is subscribed to, so every subscription sees the {@link Partitions} configured at that moment.
     * <p>
     * If the key carries a node id, the address is looked up through the {@link Partitions}; otherwise the key's host and
     * port are resolved directly.
     *
     * @param connectionKey must not be {@code null}.
     * @return a {@link Mono} emitting the resolved {@link SocketAddress}. Exceptions thrown during resolution are
     *         delivered as error signal of the {@link Mono}.
     */
    Mono<SocketAddress> getSocketAddressSupplier(ConnectionKey connectionKey) {

        return Mono.fromCallable(() -> {

            if (connectionKey.nodeId != null) {

                SocketAddress socketAddress = getSocketAddress(connectionKey.nodeId);
                logger.debug("Resolved SocketAddress {} using for Cluster node {}", socketAddress, connectionKey.nodeId);
                return socketAddress;
            }

            // No node id on the key: fall back to the host/port pair.
            SocketAddress socketAddress = resolve(RedisURI.create(connectionKey.host, connectionKey.port));
            logger.debug("Resolved SocketAddress {} using for Cluster node at {}:{}", socketAddress, connectionKey.host,
                    connectionKey.port);
            return socketAddress;
        });
    }

    /**
     * Get the {@link SocketAddress} for a {@code nodeId} from {@link Partitions}.
     *
     * @param nodeId the node id to look up, must not be {@code null}.
     * @return the {@link SocketAddress}.
     * @throws IllegalStateException if no {@link Partitions} have been set yet.
     * @throws IllegalArgumentException if {@code nodeId} cannot be looked up.
     */
    private SocketAddress getSocketAddress(String nodeId) {

        // Read the volatile field once so the null check and the iteration operate on the same instance.
        Partitions current = this.partitions;

        if (current == null) {
            throw new IllegalStateException(
                    String.format("Cannot resolve a RedisClusterNode for nodeId %s: Partitions are not set", nodeId));
        }

        for (RedisClusterNode partition : current) {

            // Compare from the known non-null side so a partition without a node id cannot cause a NullPointerException.
            if (nodeId.equals(partition.getNodeId())) {
                return resolve(partition.getUri());
            }
        }

        throw new IllegalArgumentException(String.format("Cannot resolve a RedisClusterNode for nodeId %s", nodeId));
    }

    /**
     * Resolve a {@link RedisURI} to a {@link SocketAddress} using the resolver obtained from {@link ClientResources}.
     *
     * @param redisURI the URI to resolve.
     * @return the resolved {@link SocketAddress}.
     */
    private SocketAddress resolve(RedisURI redisURI) {
        return clientResources.socketAddressResolver().resolve(redisURI);
    }

}