diff --git a/src/main/java/reactor/ipc/netty/resources/DefaultPoolResources.java b/src/main/java/reactor/ipc/netty/resources/DefaultPoolResources.java index 04673c368a..30a87e81d3 100644 --- a/src/main/java/reactor/ipc/netty/resources/DefaultPoolResources.java +++ b/src/main/java/reactor/ipc/netty/resources/DefaultPoolResources.java @@ -16,6 +16,7 @@ package reactor.ipc.netty.resources; +import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; @@ -48,7 +49,7 @@ ChannelPool newPool(Bootstrap b, ChannelHealthChecker checker); } - final ConcurrentMap channelPools; + final ConcurrentMap channelPools; final String name; final PoolFactory provider; @@ -63,25 +64,17 @@ public ChannelPool selectOrCreate(SocketAddress remote, Supplier bootstrap, Consumer onChannelCreate, EventLoopGroup group) { - SocketAddress address = remote; + SocketAddressHolder holder = new SocketAddressHolder(remote); for (; ; ) { - Pool pool = channelPools.get(remote); + Pool pool = channelPools.get(holder); if (pool != null) { return pool; } - Bootstrap b = bootstrap.get(); - if (remote != null) { - b = b.remoteAddress(remote); - } - else { - address = b.config() - .remoteAddress(); - } if (log.isDebugEnabled()) { - log.debug("New {} client pool for {}", name, address); + log.debug("New {} client pool for {}", name, remote); } - pool = new Pool(b, provider, onChannelCreate, group); - if (channelPools.putIfAbsent(address, pool) == null) { + pool = new Pool(bootstrap.get().remoteAddress(remote), provider, onChannelCreate, group); + if (channelPools.putIfAbsent(holder, pool) == null) { return pool; } pool.close(); @@ -195,7 +188,7 @@ public void dispose() { public Mono disposeLater() { return Mono.fromRunnable(() -> { Pool pool; - for (SocketAddress key: channelPools.keySet()) { + for (SocketAddressHolder key: channelPools.keySet()) { pool = channelPools.remove(key); if(pool != null){ pool.close(); @@ -213,4 +206,32 @@ public boolean isDisposed() { static final Logger log = Loggers.getLogger(DefaultPoolResources.class); + + final static class SocketAddressHolder { + final SocketAddress holder; + final String fqdn; + + SocketAddressHolder(SocketAddress holder) { + this.holder = holder; + this.fqdn = holder instanceof InetSocketAddress ? holder.toString() : null; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + SocketAddressHolder that = (SocketAddressHolder) o; + + return holder.equals(that.holder) && + (fqdn != null ? fqdn.equals(that.fqdn) : that.fqdn == null); + } + + @Override + public int hashCode() { + int result = holder.hashCode(); + result = 31 * result + (fqdn != null ? fqdn.hashCode() : 0); + return result; + } + } }