Skip to content

Commit

Permalink
Extended key for the connection pool cache
Browse files Browse the repository at this point in the history
  • Loading branch information
violetagg committed Feb 8, 2018
1 parent 6e73dfe commit d9a1793
Showing 1 changed file with 36 additions and 15 deletions.
51 changes: 36 additions & 15 deletions src/main/java/reactor/ipc/netty/resources/DefaultPoolResources.java
Expand Up @@ -16,6 +16,7 @@

package reactor.ipc.netty.resources;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -48,7 +49,7 @@ ChannelPool newPool(Bootstrap b,
ChannelHealthChecker checker);
}

final ConcurrentMap<SocketAddress, Pool> channelPools;
final ConcurrentMap<SocketAddressHolder, Pool> channelPools;
final String name;
final PoolFactory provider;

Expand All @@ -63,25 +64,17 @@ public ChannelPool selectOrCreate(SocketAddress remote,
Supplier<? extends Bootstrap> bootstrap,
Consumer<? super Channel> onChannelCreate,
EventLoopGroup group) {
SocketAddress address = remote;
SocketAddressHolder holder = new SocketAddressHolder(remote);
for (; ; ) {
Pool pool = channelPools.get(remote);
Pool pool = channelPools.get(holder);
if (pool != null) {
return pool;
}
Bootstrap b = bootstrap.get();
if (remote != null) {
b = b.remoteAddress(remote);
}
else {
address = b.config()
.remoteAddress();
}
if (log.isDebugEnabled()) {
log.debug("New {} client pool for {}", name, address);
log.debug("New {} client pool for {}", name, remote);
}
pool = new Pool(b, provider, onChannelCreate, group);
if (channelPools.putIfAbsent(address, pool) == null) {
pool = new Pool(bootstrap.get().remoteAddress(remote), provider, onChannelCreate, group);
if (channelPools.putIfAbsent(holder, pool) == null) {
return pool;
}
pool.close();
Expand Down Expand Up @@ -195,7 +188,7 @@ public void dispose() {
public Mono<Void> disposeLater() {
return Mono.fromRunnable(() -> {
Pool pool;
for (SocketAddress key: channelPools.keySet()) {
for (SocketAddressHolder key: channelPools.keySet()) {
pool = channelPools.remove(key);
if(pool != null){
pool.close();
Expand All @@ -213,4 +206,32 @@ public boolean isDisposed() {

static final Logger log = Loggers.getLogger(DefaultPoolResources.class);


final static class SocketAddressHolder {
final SocketAddress holder;
final String fqdn;

SocketAddressHolder(SocketAddress holder) {
this.holder = holder;
this.fqdn = holder instanceof InetSocketAddress ? holder.toString() : null;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

SocketAddressHolder that = (SocketAddressHolder) o;

return holder.equals(that.holder) &&
(fqdn != null ? fqdn.equals(that.fqdn) : that.fqdn == null);
}

@Override
public int hashCode() {
int result = holder.hashCode();
result = 31 * result + (fqdn != null ? fqdn.hashCode() : 0);
return result;
}
}
}

0 comments on commit d9a1793

Please sign in to comment.