Skip to content

Commit

Permalink
Reuse stale connections collection when closing stale connections #443
Browse files Browse the repository at this point in the history
Stale connections expiry now uses the same collection of connections to prevent stale expiry preparation and the actual closing to use different connections.
  • Loading branch information
mp911de committed Jan 21, 2017
1 parent 9bdc3c2 commit 7a38494
Showing 1 changed file with 29 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -306,17 +306,22 @@ private void reconfigurePartitions() {
}

resetFastConnectionCache();
closeStaleConnections();
closeStaleConnections(staleConnections);
}

/**
* Close stale connections.
*/
@Override
public void closeStaleConnections() {
logger.debug("closeStaleConnections() count before expiring: {}", getConnectionCount());

Set<ConnectionKey> stale = getStaleConnectionKeys();
closeStaleConnections(stale);
}

protected void closeStaleConnections(Set<ConnectionKey> stale) {

logger.debug("closeStaleConnections() count before expiring: {}", getConnectionCount());

for (ConnectionKey connectionKey : stale) {
StatefulRedisConnection<K, V> connection = connections.get(connectionKey);
Expand Down Expand Up @@ -360,6 +365,7 @@ private Set<ConnectionKey> getStaleConnectionKeys() {
*/
@Override
public void setAutoFlushCommands(boolean autoFlush) {

synchronized (stateLock) {
this.autoFlushCommands = autoFlush;
}
Expand All @@ -378,6 +384,7 @@ public void flushCommands() {

@Override
public void setReadFrom(ReadFrom readFrom) {

synchronized (stateLock) {
this.readFrom = readFrom;
Arrays.fill(readers, null);
Expand All @@ -404,10 +411,10 @@ long getConnectionCount() {
* Synchronize on {@code stateLock} to initiate a happens-before relation and clear the thread caches of other threads.
*/
private void resetFastConnectionCache() {

synchronized (stateLock) {
Arrays.fill(writers, null);
Arrays.fill(readers, null);

}
}

Expand All @@ -417,12 +424,15 @@ private static RuntimeException invalidConnectionPoint(String message) {
}

Supplier<SocketAddress> getSocketAddressSupplier(final ConnectionKey connectionKey) {

return () -> {

if (connectionKey.nodeId != null) {
SocketAddress socketAddress = getSocketAddress(connectionKey.nodeId);
logger.debug("Resolved SocketAddress {} using for Cluster node {}", socketAddress, connectionKey.nodeId);
return socketAddress;
}

SocketAddress socketAddress = new InetSocketAddress(connectionKey.host, connectionKey.port);
logger.debug("Resolved SocketAddress {} using for Cluster node at {}:{}", socketAddress, connectionKey.host,
connectionKey.port);
Expand All @@ -431,6 +441,7 @@ Supplier<SocketAddress> getSocketAddressSupplier(final ConnectionKey connectionK
}

private SocketAddress getSocketAddress(String nodeId) {

for (RedisClusterNode partition : partitions) {
if (partition.getNodeId().equals(nodeId)) {
return SocketAddressResolver.resolve(partition.getUri(), redisClusterClient.getResources().dnsResolver());
Expand Down Expand Up @@ -465,6 +476,7 @@ public ConnectionKey(Intent intent, String host, int port) {

@Override
public boolean equals(Object o) {

if (this == o)
return true;
if (!(o instanceof ConnectionKey))
Expand All @@ -483,12 +495,26 @@ public boolean equals(Object o) {

@Override
public int hashCode() {

int result = intent != null ? intent.name().hashCode() : 0;
result = 31 * result + (nodeId != null ? nodeId.hashCode() : 0);
result = 31 * result + (host != null ? host.hashCode() : 0);
result = 31 * result + port;
return result;
}

@Override
public String toString() {

StringBuffer sb = new StringBuffer();
sb.append(getClass().getSimpleName());
sb.append(" [intent=").append(intent);
sb.append(", nodeId='").append(nodeId).append('\'');
sb.append(", host='").append(host).append('\'');
sb.append(", port=").append(port);
sb.append(']');
return sb.toString();
}
}

boolean validateClusterNodeMembership() {
Expand Down Expand Up @@ -588,6 +614,5 @@ public StatefulRedisConnection<K, V> apply(ConnectionKey key) {
}

interface ClusterNodeConnectionFactory<K, V> extends Function<ConnectionKey, StatefulRedisConnection<K, V>> {

}
}

0 comments on commit 7a38494

Please sign in to comment.