Skip to content

Commit

Permalink
ISPN-8620 Execute operations concurrently over the same connection
Browse files Browse the repository at this point in the history
  • Loading branch information
rvansa authored and wburns committed Mar 13, 2018
1 parent b88af94 commit ef6afd6
Show file tree
Hide file tree
Showing 96 changed files with 901 additions and 1,152 deletions.
Expand Up @@ -20,9 +20,10 @@ public class ConnectionPoolConfiguration {
private final boolean testOnBorrow;
private final boolean testOnReturn;
private final boolean testWhileIdle;
private final int maxPendingRequests;

ConnectionPoolConfiguration(ExhaustedAction exhaustedAction, boolean lifo, int maxActive, int maxTotal, long maxWait, int maxIdle, int minIdle, int numTestsPerEvictionRun,
long timeBetweenEvictionRuns, long minEvictableIdleTime, boolean testOnBorrow, boolean testOnReturn, boolean testWhileIdle) {
long timeBetweenEvictionRuns, long minEvictableIdleTime, boolean testOnBorrow, boolean testOnReturn, boolean testWhileIdle, int maxPendingRequests) {
this.exhaustedAction = exhaustedAction;
this.lifo = lifo;
this.maxActive = maxActive;
Expand All @@ -36,6 +37,7 @@ public class ConnectionPoolConfiguration {
this.testOnBorrow = testOnBorrow;
this.testOnReturn = testOnReturn;
this.testWhileIdle = testWhileIdle;
this.maxPendingRequests = maxPendingRequests;
}

public ExhaustedAction exhaustedAction() {
Expand Down Expand Up @@ -90,11 +92,15 @@ public boolean testWhileIdle() {
return testWhileIdle;
}

public int maxPendingRequests() {
return maxPendingRequests;
}

@Override
public String toString() {
return "ConnectionPoolConfiguration [exhaustedAction=" + exhaustedAction + ", lifo=" + lifo + ", maxActive=" + maxActive + ", maxTotal=" + maxTotal + ", maxWait=" + maxWait
+ ", maxIdle=" + maxIdle + ", minIdle=" + minIdle + ", numTestsPerEvictionRun=" + numTestsPerEvictionRun + ", timeBetweenEvictionRuns=" + timeBetweenEvictionRuns
+ ", minEvictableIdleTime=" + minEvictableIdleTime + ", testOnBorrow=" + testOnBorrow + ", testOnReturn=" + testOnReturn + ", testWhileIdle=" + testWhileIdle + "]";
+ ", minEvictableIdleTime=" + minEvictableIdleTime + ", testOnBorrow=" + testOnBorrow + ", testOnReturn=" + testOnReturn + ", testWhileIdle=" + testWhileIdle
+ ", maxPendingRequests=" + maxPendingRequests + "]";
}

}
Expand Up @@ -25,6 +25,7 @@ public class ConnectionPoolConfigurationBuilder extends AbstractConfigurationChi
private boolean testOnBorrow = false;
private boolean testOnReturn = false;
private boolean testWhileIdle = true;
private int maxPendingRequests = 5;

ConnectionPoolConfigurationBuilder(ConfigurationBuilder builder) {
super(builder);
Expand Down Expand Up @@ -189,6 +190,22 @@ public ConnectionPoolConfigurationBuilder testWhileIdle(boolean testWhileIdle) {
return this;
}

/**
* Specifies maximum number of requests sent over single connection at one instant.
* Connections with more concurrent requests will be ignored in the pool when choosing available connection
* and the pool will try to create a new connection if all connections are utilized. Only if the new connection
* cannot be created and the {@link #exhaustedAction(ExhaustedAction) exhausted action}
* is set to {@link ExhaustedAction#WAIT} the pool will allow sending the request over one of the over-utilized
* connections.
* The rule of thumb is that this should be set to higher values if the values are small (< 1kB) and to lower values
* if the entries are big (> 10kB).
* Default setting for this parameter is 5.
*/
public ConnectionPoolConfigurationBuilder maxPendingRequests(int maxPendingRequests) {
this.maxPendingRequests = maxPendingRequests;
return this;
}

/**
* Configures the connection pool parameter according to properties
*/
Expand All @@ -207,6 +224,7 @@ public ConnectionPoolConfigurationBuilder withPoolProperties(Properties properti
testOnBorrow(typed.getBooleanProperty("testOnBorrow", testOnBorrow, true));
testOnReturn(typed.getBooleanProperty("testOnReturn", testOnReturn, true));
testWhileIdle(typed.getBooleanProperty("testWhileIdle", testWhileIdle, true));
maxPendingRequests(typed.getIntProperty("maxPendingRequests", maxPendingRequests, true));
return this;
}

Expand All @@ -217,7 +235,7 @@ public void validate() {
@Override
public ConnectionPoolConfiguration create() {
return new ConnectionPoolConfiguration(exhaustedAction, lifo, maxActive, maxTotal, maxWait, maxIdle, minIdle, numTestsPerEvictionRun, timeBetweenEvictionRuns,
minEvictableIdleTime, testOnBorrow, testOnReturn, testWhileIdle);
minEvictableIdleTime, testOnBorrow, testOnReturn, testWhileIdle, maxPendingRequests);
}

@Override
Expand All @@ -235,6 +253,7 @@ public ConnectionPoolConfigurationBuilder read(ConnectionPoolConfiguration templ
testOnBorrow = template.testOnBorrow();
testOnReturn = template.testOnReturn();
testWhileIdle = template.testWhileIdle();
maxPendingRequests = template.maxPendingRequests();
return this;
}

Expand Down
Expand Up @@ -8,6 +8,7 @@
import org.infinispan.client.hotrod.impl.protocol.Codec;
import org.infinispan.client.hotrod.impl.transport.netty.ByteBufUtil;
import org.infinispan.client.hotrod.impl.transport.netty.ChannelFactory;
import org.infinispan.client.hotrod.impl.transport.netty.HeaderDecoder;
import org.infinispan.counter.api.CounterListener;
import org.infinispan.counter.api.StrongCounter;
import org.infinispan.counter.api.WeakCounter;
Expand All @@ -30,7 +31,7 @@ public class AddListenerOperation extends BaseCounterOperation<Boolean> {

public AddListenerOperation(Codec codec, ChannelFactory channelFactory, AtomicInteger topologyId,
Configuration cfg, String counterName, byte[] listenerId, SocketAddress server) {
super(codec, channelFactory, topologyId, cfg, counterName);
super(COUNTER_ADD_LISTENER_REQUEST, COUNTER_ADD_LISTENER_RESPONSE, codec, channelFactory, topologyId, cfg, counterName);
this.listenerId = listenerId;
this.server = server;
}
Expand All @@ -42,20 +43,20 @@ public Channel getChannel() {
@Override
protected void executeOperation(Channel channel) {
this.channel = channel;
ByteBuf buf = getHeaderAndCounterNameBufferAndRead(channel, COUNTER_ADD_LISTENER_REQUEST,
ByteBufUtil.estimateArraySize(listenerId));
ByteBuf buf = getHeaderAndCounterNameBufferAndRead(channel, ByteBufUtil.estimateArraySize(listenerId));
ByteBufUtil.writeArray(buf, listenerId);
channel.writeAndFlush(buf);
}

@Override
public Boolean decodePayload(ByteBuf buf, short status) {
public void acceptResponse(ByteBuf buf, short status, HeaderDecoder decoder) {
checkStatus(status);
if (status != NO_ERROR_STATUS) {
this.channel = null;
return false;
complete(false);
} else {
complete(true);
}
return true;
}

@Override
Expand Down
Expand Up @@ -5,6 +5,7 @@
import org.infinispan.client.hotrod.configuration.Configuration;
import org.infinispan.client.hotrod.impl.protocol.Codec;
import org.infinispan.client.hotrod.impl.transport.netty.ChannelFactory;
import org.infinispan.client.hotrod.impl.transport.netty.HeaderDecoder;
import org.infinispan.commons.logging.Log;
import org.infinispan.commons.logging.LogFactory;
import org.infinispan.counter.exception.CounterOutOfBoundsException;
Expand All @@ -30,23 +31,23 @@ public class AddOperation extends BaseCounterOperation<Long> {

public AddOperation(Codec codec, ChannelFactory channelFactory, AtomicInteger topologyId, Configuration cfg,
String counterName, long delta) {
super(codec, channelFactory, topologyId, cfg, counterName);
super(COUNTER_ADD_AND_GET_REQUEST, COUNTER_ADD_AND_GET_RESPONSE, codec, channelFactory, topologyId, cfg, counterName);
this.delta = delta;
}

@Override
protected void executeOperation(Channel channel) {
ByteBuf buf = getHeaderAndCounterNameBufferAndRead(channel, COUNTER_ADD_AND_GET_REQUEST, 8);
ByteBuf buf = getHeaderAndCounterNameBufferAndRead(channel, 8);
buf.writeLong(delta);
channel.writeAndFlush(buf);
}

@Override
public Long decodePayload(ByteBuf buf, short status) {
public void acceptResponse(ByteBuf buf, short status, HeaderDecoder decoder) {
checkStatus(status);
assertBoundaries(status);
assert status == NO_ERROR_STATUS;
return buf.readLong();
complete(buf.readLong());
}

private void assertBoundaries(short status) {
Expand Down
Expand Up @@ -32,9 +32,9 @@ abstract class BaseCounterOperation<T> extends RetryOnFailureOperation<T> {
private static final byte[] COUNTER_CACHE_NAME = RemoteCacheManager.cacheNameBytes("org.infinispan.counter");
private final String counterName;

BaseCounterOperation(Codec codec, ChannelFactory channelFactory, AtomicInteger topologyId, Configuration cfg,
BaseCounterOperation(short requestCode, short responseCode, Codec codec, ChannelFactory channelFactory, AtomicInteger topologyId, Configuration cfg,
String counterName) {
super(codec, channelFactory, EMPTY_CACHE_NAME, topologyId, 0, cfg);
super(requestCode, responseCode, codec, channelFactory, EMPTY_CACHE_NAME, topologyId, 0, cfg);
this.counterName = counterName;
}

Expand All @@ -44,21 +44,20 @@ abstract class BaseCounterOperation<T> extends RetryOnFailureOperation<T> {
* @return the {@link HeaderParams}.
*/
void sendHeaderAndCounterNameAndRead(Channel channel, short opCode) {
ByteBuf buf = getHeaderAndCounterNameBufferAndRead(channel, opCode, 0);
ByteBuf buf = getHeaderAndCounterNameBufferAndRead(channel, 0);
channel.writeAndFlush(buf);
}

ByteBuf getHeaderAndCounterNameBufferAndRead(Channel channel, short opCode, int extraBytes) {
HeaderParams header = headerParams(opCode);
scheduleRead(channel, header);
ByteBuf getHeaderAndCounterNameBufferAndRead(Channel channel, int extraBytes) {
scheduleRead(channel);

// counterName should never be null/empty
byte[] counterBytes = counterName.getBytes(HotRodConstants.HOTROD_STRING_CHARSET);
ByteBuf buf = channel.alloc().buffer(codec.estimateHeaderSize(header) + ByteBufUtil.estimateArraySize(counterBytes) + extraBytes);
codec.writeHeader(buf, header);
ByteBufUtil.writeString(buf, counterName);

setCacheName(header);
setCacheName();
return buf;
}

Expand All @@ -74,8 +73,8 @@ void checkStatus(short status) {
}
}

void setCacheName(HeaderParams params) {
params.cacheName(COUNTER_CACHE_NAME);
void setCacheName() {
header.cacheName(COUNTER_CACHE_NAME);
}

@Override
Expand Down
Expand Up @@ -5,6 +5,7 @@
import org.infinispan.client.hotrod.configuration.Configuration;
import org.infinispan.client.hotrod.impl.protocol.Codec;
import org.infinispan.client.hotrod.impl.transport.netty.ChannelFactory;
import org.infinispan.client.hotrod.impl.transport.netty.HeaderDecoder;
import org.infinispan.commons.logging.Log;
import org.infinispan.commons.logging.LogFactory;
import org.infinispan.counter.api.CounterConfiguration;
Expand All @@ -31,26 +32,26 @@ public class CompareAndSwapOperation extends BaseCounterOperation<Long> {

public CompareAndSwapOperation(Codec codec, ChannelFactory channelFactory, AtomicInteger topologyId,
Configuration cfg, String counterName, long expect, long update, CounterConfiguration counterConfiguration) {
super(codec, channelFactory, topologyId, cfg, counterName);
super(COUNTER_CAS_REQUEST, COUNTER_CAS_RESPONSE, codec, channelFactory, topologyId, cfg, counterName);
this.expect = expect;
this.update = update;
this.counterConfiguration = counterConfiguration;
}

@Override
protected void executeOperation(Channel channel) {
ByteBuf buf = getHeaderAndCounterNameBufferAndRead(channel, COUNTER_CAS_REQUEST, 16);
ByteBuf buf = getHeaderAndCounterNameBufferAndRead(channel, 16);
buf.writeLong(expect);
buf.writeLong(update);
channel.writeAndFlush(buf);
}

@Override
public Long decodePayload(ByteBuf buf, short status) {
public void acceptResponse(ByteBuf buf, short status, HeaderDecoder decoder) {
checkStatus(status);
assertBoundaries(status);
assert status == NO_ERROR_STATUS;
return buf.readLong();
complete(buf.readLong());
}

private void assertBoundaries(short status) {
Expand Down
Expand Up @@ -8,6 +8,7 @@
import org.infinispan.client.hotrod.impl.protocol.Codec;
import org.infinispan.client.hotrod.impl.transport.netty.ByteBufUtil;
import org.infinispan.client.hotrod.impl.transport.netty.ChannelFactory;
import org.infinispan.client.hotrod.impl.transport.netty.HeaderDecoder;
import org.infinispan.counter.api.CounterConfiguration;
import org.infinispan.counter.api.CounterManager;

Expand All @@ -26,20 +27,20 @@ public class DefineCounterOperation extends BaseCounterOperation<Boolean> {

public DefineCounterOperation(Codec codec, ChannelFactory channelFactory, AtomicInteger topologyId,
Configuration cfg, String counterName, CounterConfiguration configuration) {
super(codec, channelFactory, topologyId, cfg, counterName);
super(COUNTER_CREATE_REQUEST, COUNTER_CREATE_RESPONSE, codec, channelFactory, topologyId, cfg, counterName);
this.configuration = configuration;
}

@Override
protected void executeOperation(Channel channel) {
ByteBuf buf = getHeaderAndCounterNameBufferAndRead(channel, COUNTER_CREATE_REQUEST, 28);
ByteBuf buf = getHeaderAndCounterNameBufferAndRead(channel, 28);
encodeConfiguration(configuration, buf::writeByte, buf::writeLong, i -> ByteBufUtil.writeVInt(buf, i));
channel.writeAndFlush(buf);
}

@Override
public Boolean decodePayload(ByteBuf buf, short status) {
public void acceptResponse(ByteBuf buf, short status, HeaderDecoder decoder) {
checkStatus(status);
return status == NO_ERROR_STATUS;
complete(status == NO_ERROR_STATUS);
}
}
Expand Up @@ -8,6 +8,7 @@
import org.infinispan.client.hotrod.impl.protocol.Codec;
import org.infinispan.client.hotrod.impl.transport.netty.ByteBufUtil;
import org.infinispan.client.hotrod.impl.transport.netty.ChannelFactory;
import org.infinispan.client.hotrod.impl.transport.netty.HeaderDecoder;
import org.infinispan.counter.api.CounterConfiguration;
import org.infinispan.counter.api.CounterManager;

Expand All @@ -24,7 +25,7 @@ public class GetConfigurationOperation extends BaseCounterOperation<CounterConfi

public GetConfigurationOperation(Codec codec, ChannelFactory channelFactory, AtomicInteger topologyId,
Configuration cfg, String counterName) {
super(codec, channelFactory, topologyId, cfg, counterName);
super(COUNTER_GET_CONFIGURATION_REQUEST, COUNTER_GET_CONFIGURATION_RESPONSE, codec, channelFactory, topologyId, cfg, counterName);
}

@Override
Expand All @@ -33,11 +34,12 @@ protected void executeOperation(Channel channel) {
}

@Override
public CounterConfiguration decodePayload(ByteBuf buf, short status) {
public void acceptResponse(ByteBuf buf, short status, HeaderDecoder decoder) {
if (status != NO_ERROR_STATUS) {
return null;
complete(null);
return;
}

return decodeConfiguration(buf::readByte, buf::readLong, () -> ByteBufUtil.readVInt(buf));
complete(decodeConfiguration(buf::readByte, buf::readLong, () -> ByteBufUtil.readVInt(buf)));
}
}
Expand Up @@ -6,7 +6,6 @@

import org.infinispan.client.hotrod.configuration.Configuration;
import org.infinispan.client.hotrod.impl.protocol.Codec;
import org.infinispan.client.hotrod.impl.protocol.HeaderParams;
import org.infinispan.client.hotrod.impl.transport.netty.ByteBufUtil;
import org.infinispan.client.hotrod.impl.transport.netty.ChannelFactory;
import org.infinispan.client.hotrod.impl.transport.netty.HeaderDecoder;
Expand All @@ -24,23 +23,26 @@
public class GetCounterNamesOperation extends BaseCounterOperation<Collection<String>> {
private int size;
private Collection<String> names;
private HeaderDecoder<?> decoder;

public GetCounterNamesOperation(Codec codec, ChannelFactory transportFactory, AtomicInteger topologyId,
Configuration cfg) {
super(codec, transportFactory, topologyId, cfg, "");
super(COUNTER_GET_NAMES_REQUEST, COUNTER_GET_NAMES_RESPONSE, codec, transportFactory, topologyId, cfg, "");
}

@Override
protected void executeOperation(Channel channel) {
HeaderParams header = headerParams(COUNTER_GET_NAMES_REQUEST);
decoder = scheduleRead(channel, header);
sendHeader(channel, header);
setCacheName(header);
scheduleRead(channel);
sendHeader(channel);
setCacheName();
}

@Override
public Collection<String> decodePayload(ByteBuf buf, short status) {
protected void reset() {
names = null;
}

@Override
public void acceptResponse(ByteBuf buf, short status, HeaderDecoder decoder) {
assert status == NO_ERROR_STATUS;
if (names == null) {
size = ByteBufUtil.readVInt(buf);
Expand All @@ -50,6 +52,6 @@ public Collection<String> decodePayload(ByteBuf buf, short status) {
names.add(ByteBufUtil.readString(buf));
decoder.checkpoint();
}
return names;
complete(names);
}
}

0 comments on commit ef6afd6

Please sign in to comment.