Skip to content

Commit

Permalink
fix #407 Put more information into the key for pooled connections
Browse files Browse the repository at this point in the history
Take into account SSL, Proxy, Logging, Compression and Protocol
when generating the pooled connection key
  • Loading branch information
violetagg committed Oct 18, 2018
1 parent fec0c1e commit 56ae431
Show file tree
Hide file tree
Showing 8 changed files with 291 additions and 52 deletions.
85 changes: 67 additions & 18 deletions src/main/java/reactor/netty/channel/BootstrapHandlers.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.logging.LoggingHandler;
import reactor.core.Exceptions;
import reactor.netty.ConnectionObserver;
Expand Down Expand Up @@ -428,24 +429,7 @@ static ChannelHandler updateConfiguration(@Nullable ChannelHandler handler,

static BiConsumer<ConnectionObserver, ? super Channel> logConfiguration(LoggingHandler handler, boolean debugSsl) {
Objects.requireNonNull(handler, "loggingHandler");
return (listener, channel) -> {
if (channel.pipeline().get(NettyPipeline.SslHandler) != null) {
if (debugSsl) {
channel.pipeline()
.addBefore(NettyPipeline.SslHandler,
NettyPipeline.SslLoggingHandler,
new LoggingHandler("reactor.netty.tcp.ssl"));
}
channel.pipeline()
.addAfter(NettyPipeline.SslHandler,
NettyPipeline.LoggingHandler,
handler);
}
else {
channel.pipeline()
.addFirst(NettyPipeline.LoggingHandler, handler);
}
};
return new LoggingHandlerSupportConsumer(handler, debugSsl);
}

@ChannelHandler.Sharable
Expand Down Expand Up @@ -510,6 +494,24 @@ static final class PipelineConfiguration {
this.deferredConsumer = deferredConsumer;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
PipelineConfiguration that = (PipelineConfiguration) o;
return Objects.equals(consumer, that.consumer) &&
Objects.equals(name, that.name) &&
Objects.equals(deferredConsumer, that.deferredConsumer);
}

@Override
public int hashCode() {
return Objects.hash(consumer, name, deferredConsumer);
}
}

static final class BootstrapPipelineHandler extends ArrayList<PipelineConfiguration>
Expand Down Expand Up @@ -554,4 +556,51 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
static final ChannelOption<ChannelOperations.OnSetup> OPS_OPTION = ChannelOption.newInstance("ops_factory");
static final ChannelOption<ConnectionObserver> OBSERVER_OPTION = ChannelOption.newInstance("connectionObserver");


static final class LoggingHandlerSupportConsumer
implements BiConsumer<ConnectionObserver, Channel> {

final ChannelHandler handler;
final boolean debugSsl;

LoggingHandlerSupportConsumer(ChannelHandler handler, boolean debugSsl) {
this.handler = handler;
this.debugSsl = debugSsl;
}

@Override
public void accept(ConnectionObserver connectionObserver, Channel channel) {
ChannelPipeline pipeline = channel.pipeline();
if (pipeline.get(NettyPipeline.SslHandler) != null) {
if (debugSsl) {
pipeline.addBefore(NettyPipeline.SslHandler,
NettyPipeline.SslLoggingHandler,
new LoggingHandler("reactor.netty.tcp.ssl"));
}
pipeline.addAfter(NettyPipeline.SslHandler,
NettyPipeline.LoggingHandler,
handler);
}
else {
pipeline.addFirst(NettyPipeline.LoggingHandler, handler);
}
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
LoggingHandlerSupportConsumer that = (LoggingHandlerSupportConsumer) o;
return Objects.equals(handler, that.handler);
}

@Override
public int hashCode() {
return Objects.hash(handler);
}
}
}
26 changes: 23 additions & 3 deletions src/main/java/reactor/netty/http/client/HttpClientConnect.java
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ public void subscribe(CoreSubscriber<? super Connection> actual) {
}
if ((configuration.protocols & HttpClientConfiguration.h11) == HttpClientConfiguration.h11) {
BootstrapHandlers.updateConfiguration(b, NettyPipeline.HttpInitializer,
new Http1Initializer(handler));
new Http1Initializer(handler, configuration.protocols));
// return;
}
// if ((configuration.protocols & HttpClientConfiguration.h2) == HttpClientConfiguration.h2) {
Expand Down Expand Up @@ -267,7 +267,7 @@ public void subscribe(CoreSubscriber<? super Connection> actual) {
if ((configuration.protocols & HttpClientConfiguration.h11) == HttpClientConfiguration.h11) {
BootstrapHandlers.updateConfiguration(b,
NettyPipeline.HttpInitializer,
new Http1Initializer(handler));
new Http1Initializer(handler, configuration.protocols));
// return;
}
// if ((configuration.protocols & HttpClientConfiguration.h2c) == HttpClientConfiguration.h2c) {
Expand Down Expand Up @@ -680,9 +680,11 @@ static final class Http1Initializer
implements BiConsumer<ConnectionObserver, Channel> {

final HttpClientHandler handler;
final int protocols;

Http1Initializer(HttpClientHandler handler) {
Http1Initializer(HttpClientHandler handler, int protocols) {
this.handler = handler;
this.protocols = protocols;
}

@Override
Expand All @@ -697,6 +699,24 @@ public void accept(ConnectionObserver listener, Channel channel) {
new HttpContentDecompressor());
}
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Http1Initializer that = (Http1Initializer) o;
return handler.compress == that.handler.compress &&
protocols == that.protocols;
}

@Override
public int hashCode() {
return Objects.hash(handler.compress, protocols);
}
}

// static final class H2Initializer
Expand Down
28 changes: 15 additions & 13 deletions src/main/java/reactor/netty/resources/PooledConnectionProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.net.SocketAddress;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -31,6 +32,7 @@

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.pool.ChannelHealthChecker;
import io.netty.channel.pool.ChannelPool;
Expand Down Expand Up @@ -125,8 +127,9 @@ public Mono<Connection> acquire(Bootstrap b) {
ConnectionObserver obs = BootstrapHandlers.connectionObserver(bootstrap);

NewConnectionProvider.convertLazyRemoteAddress(bootstrap);
PoolKey holder = new PoolKey(bootstrap.config()
.remoteAddress());
ChannelHandler handler = bootstrap.config().handler();
PoolKey holder = new PoolKey(bootstrap.config().remoteAddress(),
handler != null ? handler.hashCode() : -1);

Pool pool;
for (; ; ) {
Expand Down Expand Up @@ -622,12 +625,14 @@ public final void operationComplete(Future<Channel> f) throws Exception {

final static class PoolKey {

final SocketAddress holder;
final String fqdn;
final SocketAddress holder;
final int pipelineKey;
final String fqdn;

PoolKey(SocketAddress holder) {
PoolKey(SocketAddress holder, int pipelineKey) {
this.holder = holder;
this.fqdn = holder instanceof InetSocketAddress ? holder.toString() : null;
this.pipelineKey = pipelineKey;
}

@Override
Expand All @@ -638,18 +643,15 @@ public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
}

PoolKey that = (PoolKey) o;

return holder.equals(that.holder) && (fqdn != null ? fqdn.equals(that.fqdn) :
that.fqdn == null);
PoolKey poolKey = (PoolKey) o;
return pipelineKey == poolKey.pipelineKey &&
Objects.equals(holder, poolKey.holder) &&
Objects.equals(fqdn, poolKey.fqdn);
}

@Override
public int hashCode() {
int result = holder.hashCode();
result = 31 * result + (fqdn != null ? fqdn.hashCode() : 0);
return result;
return Objects.hash(holder, pipelineKey, fqdn);
}
}
}
21 changes: 21 additions & 0 deletions src/main/java/reactor/netty/tcp/ProxyProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,27 @@ public String toString() {
return "ProxyProvider{" + asDetailedString() + "}";
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ProxyProvider that = (ProxyProvider) o;
return Objects.equals(username, that.username) &&
Objects.equals(password, that.password) &&
Objects.equals(getAddress(), that.getAddress()) &&
Objects.equals(getNonProxyHosts(), that.getNonProxyHosts()) &&
getType() == that.getType();
}

@Override
public int hashCode() {
return Objects.hash(username, password, getAddress(), getNonProxyHosts(), getType());
}

static final class Build implements TypeSpec, AddressSpec, Builder {
String username;
Function<? super String, ? extends String> password;
Expand Down
61 changes: 61 additions & 0 deletions src/main/java/reactor/netty/tcp/SslProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ public interface DefaultConfigurationSpec {
final long closeNotifyFlushTimeoutMillis;
final long closeNotifyReadTimeoutMillis;
final Consumer<? super SslHandler> handlerConfigurator;
final int builderHashCode;

SslProvider(SslProvider.Build builder) {
this.sslContextBuilder = builder.sslCtxBuilder;
Expand All @@ -319,6 +320,7 @@ public interface DefaultConfigurationSpec {
this.handshakeTimeoutMillis = builder.handshakeTimeoutMillis;
this.closeNotifyFlushTimeoutMillis = builder.closeNotifyFlushTimeoutMillis;
this.closeNotifyReadTimeoutMillis = builder.closeNotifyReadTimeoutMillis;
this.builderHashCode = builder.hashCode();
}

SslProvider(SslProvider from, Consumer<? super SslHandler> handlerConfigurator) {
Expand All @@ -337,6 +339,7 @@ public interface DefaultConfigurationSpec {
this.handshakeTimeoutMillis = from.handshakeTimeoutMillis;
this.closeNotifyFlushTimeoutMillis = from.closeNotifyFlushTimeoutMillis;
this.closeNotifyReadTimeoutMillis = from.closeNotifyReadTimeoutMillis;
this.builderHashCode = from.builderHashCode;
}

SslProvider(SslProvider from, DefaultConfigurationType type) {
Expand All @@ -357,6 +360,7 @@ public interface DefaultConfigurationSpec {
this.handshakeTimeoutMillis = from.handshakeTimeoutMillis;
this.closeNotifyFlushTimeoutMillis = from.closeNotifyFlushTimeoutMillis;
this.closeNotifyReadTimeoutMillis = from.closeNotifyReadTimeoutMillis;
this.builderHashCode = from.builderHashCode;
}

void updateDefaultConfiguration() {
Expand Down Expand Up @@ -424,6 +428,22 @@ public String toString() {
return "SslProvider{" + asDetailedString() + "}";
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SslProvider that = (SslProvider) o;
return builderHashCode == that.builderHashCode;
}

@Override
public int hashCode() {
return Objects.hash(builderHashCode);
}

static final class Build implements SslContextSpec, DefaultConfigurationSpec, Builder {

Expand Down Expand Up @@ -526,6 +546,30 @@ public final Builder closeNotifyReadTimeoutMillis(long closeNotifyReadTimeoutMil
public SslProvider build() {
return new SslProvider(this);
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Build build = (Build) o;
return handshakeTimeoutMillis == build.handshakeTimeoutMillis &&
closeNotifyFlushTimeoutMillis == build.closeNotifyFlushTimeoutMillis &&
closeNotifyReadTimeoutMillis == build.closeNotifyReadTimeoutMillis &&
Objects.equals(sslCtxBuilder, build.sslCtxBuilder) &&
type == build.type &&
Objects.equals(sslContext, build.sslContext) &&
Objects.equals(handlerConfigurator, build.handlerConfigurator);
}

@Override
public int hashCode() {
return Objects.hash(sslCtxBuilder, type, sslContext, handlerConfigurator,
handshakeTimeoutMillis, closeNotifyFlushTimeoutMillis, closeNotifyReadTimeoutMillis);
}
}

static ServerBootstrap removeSslSupport(ServerBootstrap b) {
Expand Down Expand Up @@ -554,6 +598,23 @@ static final class DeferredSslSupport implements Function<Bootstrap, BiConsumer<
public BiConsumer<ConnectionObserver, Channel> apply(Bootstrap bootstrap) {
return new SslSupportConsumer(sslProvider, bootstrap.config().remoteAddress());
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DeferredSslSupport that = (DeferredSslSupport) o;
return Objects.equals(sslProvider, that.sslProvider);
}

@Override
public int hashCode() {
return Objects.hash(sslProvider);
}
}

static final class SslSupportConsumer
Expand Down
Loading

0 comments on commit 56ae431

Please sign in to comment.