Skip to content

Commit

Permalink
Revert write through (hazelcast#14849)
Browse files Browse the repository at this point in the history
  • Loading branch information
mmedenjak committed Apr 5, 2019
1 parent 8aa219b commit aaeb678
Show file tree
Hide file tree
Showing 28 changed files with 145 additions and 467 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@
import static com.hazelcast.client.spi.properties.ClientProperty.IO_BALANCER_INTERVAL_SECONDS;
import static com.hazelcast.client.spi.properties.ClientProperty.IO_INPUT_THREAD_COUNT;
import static com.hazelcast.client.spi.properties.ClientProperty.IO_OUTPUT_THREAD_COUNT;
import static com.hazelcast.client.spi.properties.ClientProperty.IO_WRITE_THROUGH_ENABLED;
import static com.hazelcast.nio.IOUtil.closeResource;
import static com.hazelcast.util.ExceptionUtil.rethrow;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
Expand Down Expand Up @@ -183,9 +182,7 @@ protected NioNetworking initNetworking(final HazelcastClientInstanceImpl client)
.errorHandler(new ClientConnectionChannelErrorHandler())
.inputThreadCount(inputThreads)
.outputThreadCount(outputThreads)
.balancerIntervalSeconds(properties.getInteger(IO_BALANCER_INTERVAL_SECONDS))
.writeThroughEnabled(properties.getBoolean(IO_WRITE_THROUGH_ENABLED))
.concurrencyDetection(client.getConcurrencyDetection()));
.balancerIntervalSeconds(properties.getInteger(IO_BALANCER_INTERVAL_SECONDS)));
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@
import com.hazelcast.internal.metrics.metricsets.ThreadMetricSet;
import com.hazelcast.internal.nearcache.NearCacheManager;
import com.hazelcast.internal.serialization.InternalSerializationService;
import com.hazelcast.internal.util.ConcurrencyDetection;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.LoggingService;
import com.hazelcast.map.impl.MapService;
Expand Down Expand Up @@ -161,10 +160,6 @@
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

import static com.hazelcast.client.spi.properties.ClientProperty.CONCURRENT_WINDOW_MS;
import static com.hazelcast.client.spi.properties.ClientProperty.IO_WRITE_THROUGH_ENABLED;
import static com.hazelcast.client.spi.properties.ClientProperty.MAX_CONCURRENT_INVOCATIONS;
import static com.hazelcast.client.spi.properties.ClientProperty.RESPONSE_THREAD_DYNAMIC;
import static com.hazelcast.util.EmptyStatement.ignore;
import static com.hazelcast.util.ExceptionUtil.rethrow;
import static com.hazelcast.util.Preconditions.checkNotNull;
Expand All @@ -176,7 +171,6 @@ public class HazelcastClientInstanceImpl implements HazelcastInstance, Serializa
private static final AtomicInteger CLIENT_ID = new AtomicInteger();
private static final short PROTOCOL_VERSION = ClientMessage.VERSION;

private final ConcurrencyDetection concurrencyDetection;
private final HazelcastProperties properties;
private final int id = CLIENT_ID.getAndIncrement();
private final String instanceName;
Expand Down Expand Up @@ -235,11 +229,10 @@ public HazelcastClientInstanceImpl(ClientConfig clientConfig,
loggingType, BuildInfoProvider.getBuildInfo(), instanceName);
logGroupPasswordInfo();
ClassLoader classLoader = config.getClassLoader();
properties = new HazelcastProperties(config.getProperties());
concurrencyDetection = initConcurrencyDetection();
clientExtension = createClientInitializer(classLoader);
clientExtension.beforeStart(this);
lifecycleService = new LifecycleServiceImpl(this);
properties = new HazelcastProperties(config.getProperties());
metricsRegistry = initMetricsRegistry();
serializationService = clientExtension.createSerializationService((byte) -1);
proxyManager = new ProxyManager(this);
Expand Down Expand Up @@ -268,18 +261,6 @@ public HazelcastClientInstanceImpl(ClientConfig clientConfig,
cpSubsystem = new CPSubsystemImpl(this);
}

private ConcurrencyDetection initConcurrencyDetection() {
boolean writeThrough = properties.getBoolean(IO_WRITE_THROUGH_ENABLED);
boolean dynamicResponse = properties.getBoolean(RESPONSE_THREAD_DYNAMIC);
boolean backPressureEnabled = properties.getInteger(MAX_CONCURRENT_INVOCATIONS) < Integer.MAX_VALUE;

if (writeThrough || dynamicResponse || backPressureEnabled) {
return ConcurrencyDetection.createEnabled(properties.getInteger(CONCURRENT_WINDOW_MS));
} else {
return ConcurrencyDetection.createDisabled();
}
}

private ClusterConnectorService initClusterConnectorService() {
ClusterConnectorServiceImpl service = new ClusterConnectorServiceImpl(this, connectionManager,
clientConnectionStrategy, clientDiscoveryService);
Expand Down Expand Up @@ -451,7 +432,7 @@ public void start() {
try {
lifecycleService.terminate();
} catch (Throwable t) {
ignore(t);
ignore(t);
}
rethrow(e);
}
Expand Down Expand Up @@ -858,8 +839,4 @@ public ClientFailoverConfig getFailoverConfig() {
public ClientQueryCacheContext getQueryCacheContext() {
return queryCacheContext;
}

public ConcurrencyDetection getConcurrencyDetection() {
return concurrencyDetection;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,6 @@ public interface ClientInvocationService {
boolean isRedoOperation();

Consumer<ClientMessage> getResponseHandler();

long concurrentInvocations();
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,19 +75,27 @@ public AbstractClientInvocationService(HazelcastClientInstanceImpl client) {
this.invocationLogger = client.getLoggingService().getLogger(ClientInvocationService.class);
this.invocationTimeoutMillis = initInvocationTimeoutMillis();
this.invocationRetryPauseMillis = initInvocationRetryPauseMillis();
this.responseHandlerSupplier = new ClientResponseHandlerSupplier(this, client.getConcurrencyDetection());
this.responseHandlerSupplier = new ClientResponseHandlerSupplier(this);

HazelcastProperties properties = client.getProperties();
this.callIdSequence = CallIdFactory.newCallIdSequence(
properties.getInteger(MAX_CONCURRENT_INVOCATIONS),
properties.getLong(BACKPRESSURE_BACKOFF_TIMEOUT_MILLIS),
client.getConcurrencyDetection());
int maxAllowedConcurrentInvocations = properties.getInteger(MAX_CONCURRENT_INVOCATIONS);
long backofftimeoutMs = properties.getLong(BACKPRESSURE_BACKOFF_TIMEOUT_MILLIS);
// clients needs to have a call id generator capable of determining how many
// pending calls there are. So backpressure needs to be on
this.callIdSequence = CallIdFactory
.newCallIdSequence(true, maxAllowedConcurrentInvocations, backofftimeoutMs);

client.getMetricsRegistry().scanAndRegister(this, "invocations");
}

@Override
public long concurrentInvocations() {
return callIdSequence.concurrentInvocations();
}

private long initInvocationRetryPauseMillis() {
return client.getProperties().getPositiveMillisOrDefault(INVOCATION_RETRY_PAUSE_MILLIS);

}

private long initInvocationTimeoutMillis() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.hazelcast.client.impl.clientside.HazelcastClientInstanceImpl;
import com.hazelcast.client.impl.protocol.ClientMessage;
import com.hazelcast.client.impl.protocol.codec.ErrorCodec;
import com.hazelcast.internal.util.ConcurrencyDetection;
import com.hazelcast.internal.util.concurrent.MPSCQueue;
import com.hazelcast.logging.ILogger;
import com.hazelcast.spi.properties.HazelcastProperties;
Expand Down Expand Up @@ -59,6 +58,13 @@ public class ClientResponseHandlerSupplier implements Supplier<Consumer<ClientMe
private static final HazelcastProperty IDLE_STRATEGY
= new HazelcastProperty("hazelcast.client.responsequeue.idlestrategy", "block");

// expert setting; we don't want to expose this to the public
// there can be some concurrent request from HZ itself that we need to exclude
// with a single user thread doing e.g. map.get, 4 is the minimum number of concurrent invocations
// before we consider the system to be actually concurrent and to process on response threads.
private static final HazelcastProperty MIN_CONCURRENT_INVOCATIONS
= new HazelcastProperty("hazelcast.client.responsequeue.dynamic.min.concurrent.invocations", "4");

private static final ThreadLocal<MutableInteger> INT_HOLDER = new ThreadLocal<MutableInteger>() {
@Override
protected MutableInteger initialValue() {
Expand All @@ -73,16 +79,15 @@ protected MutableInteger initialValue() {
private final ILogger logger;
private final Consumer<ClientMessage> responseHandler;
private final boolean responseThreadsDynamic;
private final ConcurrencyDetection concurrencyDetection;
private final int minConcurrentInvocations;

public ClientResponseHandlerSupplier(AbstractClientInvocationService invocationService,
ConcurrencyDetection concurrencyDetection) {
public ClientResponseHandlerSupplier(AbstractClientInvocationService invocationService) {
this.invocationService = invocationService;
this.concurrencyDetection = concurrencyDetection;
this.client = invocationService.client;
this.logger = invocationService.invocationLogger;

HazelcastProperties properties = client.getProperties();
this.minConcurrentInvocations = properties.getInteger(MIN_CONCURRENT_INVOCATIONS);
int responseThreadCount = properties.getInteger(RESPONSE_THREAD_COUNT);
if (responseThreadCount < 0) {
throw new IllegalArgumentException(RESPONSE_THREAD_COUNT.getName() + " can't be smaller than 0");
Expand Down Expand Up @@ -223,16 +228,16 @@ public void accept(ClientMessage message) {
}

// dynamically switches between direct processing on io thread and processing on
// response thread based on if concurrency is detected
// response thread based on the number of invocations.
class DynamicResponseHandler implements Consumer<ClientMessage> {
@Override
public void accept(ClientMessage message) {
if (concurrencyDetection.isDetected()) {
if (invocationService.concurrentInvocations() <= minConcurrentInvocations) {
process(message);
} else {
ResponseThread responseThread = nextResponseThread();
responseThread.queue(message);
responseThread.ensureStarted();
} else {
process(message);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,30 +167,6 @@ public final class ClientProperty {
public static final HazelcastProperty IO_BALANCER_INTERVAL_SECONDS
= new HazelcastProperty("hazelcast.client.io.balancer.interval.seconds", 20, SECONDS);

/**
* Optimization that allows sending of packets over the network to be done on the calling thread if the
* conditions are right. This can reduce latency and increase performance for low threaded environments.
*
* It is disabled.
*/
public static final HazelcastProperty IO_WRITE_THROUGH_ENABLED
= new HazelcastProperty("hazelcast.client.io.write.through", false);

/**
* Property needed for concurrency detection so that write through and dynamic response handling
* can be done correctly. This property sets the window the concurrency detection will signalling
* that concurrency has been detected, even if there are no further updates in that window.
*
* Normally in a concurrency system the windows keeps sliding forward so it will always remain
* concurrent.
*
* Setting it too high effectively disabled the optimization because once concurrency has been detected
* it will keep that way. Setting it too low could lead to suboptimal performance because the system
* will try write through and other optimization even though the system is concurrent.
*/
public static final HazelcastProperty CONCURRENT_WINDOW_MS
= new HazelcastProperty("hazelcast.client.concurrent.window.ms", 100, MILLISECONDS);

/**
* The number of response threads.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,7 @@ private Consumer<ClientMessage> getResponseHandler(int threadCount, boolean dyna
HazelcastClientInstanceImpl clientInstanceImpl = getHazelcastClientInstanceImpl(client);
AbstractClientInvocationService invocationService = (AbstractClientInvocationService) clientInstanceImpl.getInvocationService();

ClientResponseHandlerSupplier responseHandlerSupplier =
new ClientResponseHandlerSupplier(invocationService, clientInstanceImpl.getConcurrencyDetection());
ClientResponseHandlerSupplier responseHandlerSupplier = new ClientResponseHandlerSupplier(invocationService);
return responseHandlerSupplier.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import static com.hazelcast.spi.properties.GroupProperty.IO_BALANCER_INTERVAL_SECONDS;
import static com.hazelcast.spi.properties.GroupProperty.IO_INPUT_THREAD_COUNT;
import static com.hazelcast.spi.properties.GroupProperty.IO_OUTPUT_THREAD_COUNT;
import static com.hazelcast.spi.properties.GroupProperty.IO_WRITE_THROUGH_ENABLED;
import static java.util.Arrays.asList;
import static java.util.Collections.unmodifiableList;

Expand Down Expand Up @@ -174,8 +173,6 @@ private Networking createNetworking(Node node) {
.errorHandler(errorHandler)
.inputThreadCount(props.getInteger(IO_INPUT_THREAD_COUNT))
.outputThreadCount(props.getInteger(IO_OUTPUT_THREAD_COUNT))
.balancerIntervalSeconds(props.getInteger(IO_BALANCER_INTERVAL_SECONDS))
.writeThroughEnabled(props.getBoolean(IO_WRITE_THROUGH_ENABLED))
.concurrencyDetection(node.nodeEngine.getConcurrencyDetection()));
.balancerIntervalSeconds(props.getInteger(IO_BALANCER_INTERVAL_SECONDS)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -157,11 +157,6 @@ void process() throws Exception {
}
} while (!cleanPipeline);

if (migrationRequested()) {
startMigration();
return;
}

if (unregisterRead) {
unregisterOp(OP_READ);
}
Expand Down Expand Up @@ -271,7 +266,7 @@ private String pipelineToString() {

@Override
public NioInboundPipeline wakeup() {
ownerAddTaskAndWakeup(new NioPipelineTask(this) {
addTaskAndWakeup(new NioPipelineTask(this) {
@Override
protected void run0() throws IOException {
registerOp(OP_READ);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import com.hazelcast.internal.networking.Networking;
import com.hazelcast.internal.networking.OutboundHandler;
import com.hazelcast.internal.networking.nio.iobalancer.IOBalancer;
import com.hazelcast.internal.util.ConcurrencyDetection;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.LoggingService;
import com.hazelcast.util.concurrent.BackoffIdleStrategy;
Expand Down Expand Up @@ -99,8 +98,6 @@ public final class NioNetworking implements Networking {
private final BackoffIdleStrategy idleStrategy;
private final boolean selectorWorkaroundTest;
private volatile ExecutorService closeListenerExecutor;
private final ConcurrencyDetection concurrencyDetection;
private final boolean writeThroughEnabled;
private volatile IOBalancer ioBalancer;
private volatile NioThread[] inputThreads;
private volatile NioThread[] outputThreads;
Expand Down Expand Up @@ -129,8 +126,6 @@ public NioNetworking(Context ctx) {
this.selectorMode = ctx.selectorMode;
this.selectorWorkaroundTest = ctx.selectorWorkaroundTest;
this.idleStrategy = ctx.idleStrategy;
this.concurrencyDetection = ctx.concurrencyDetection;
this.writeThroughEnabled = ctx.writeThroughEnabled;
}

@SuppressFBWarnings(value = "EI_EXPOSE_REP", justification = "used only for testing")
Expand Down Expand Up @@ -158,7 +153,6 @@ public void start() {
logger.fine("TcpIpConnectionManager configured with Non Blocking IO-threading model: "
+ inputThreadCount + " input threads and "
+ outputThreadCount + " output threads");
logger.fine("write through enabled:" + writeThroughEnabled);
}

logger.log(selectorMode != SELECT ? Level.INFO : FINE, "IO threads selector mode is " + selectorMode);
Expand Down Expand Up @@ -277,9 +271,7 @@ private NioOutboundPipeline newOutboundPipeline(NioChannel channel) {
threads[index],
errorHandler,
loggingService.getLogger(NioOutboundPipeline.class),
ioBalancer,
concurrencyDetection,
writeThroughEnabled);
ioBalancer);
}

private NioInboundPipeline newInboundPipeline(NioChannel channel) {
Expand Down Expand Up @@ -389,11 +381,6 @@ public static class Context {
// In Hazelcast 3.8, selector mode must be set via HazelcastProperties
private SelectorMode selectorMode = SelectorMode.getConfiguredValue();
private boolean selectorWorkaroundTest = Boolean.getBoolean("hazelcast.io.selector.workaround.test");
private ConcurrencyDetection concurrencyDetection;

// if the calling thread is allowed to write through to the socket if that is possible.
// this is an optimization that can speed up low threaded setups
private boolean writeThroughEnabled;

public Context() {
String selectorModeString = SelectorMode.getConfiguredString();
Expand All @@ -402,16 +389,6 @@ public Context() {
}
}

public Context writeThroughEnabled(boolean writeThroughEnabled) {
this.writeThroughEnabled = writeThroughEnabled;
return this;
}

public Context concurrencyDetection(ConcurrencyDetection concurrencyDetection) {
this.concurrencyDetection = concurrencyDetection;
return this;
}

public Context selectorWorkaroundTest(boolean selectorWorkaroundTest) {
this.selectorWorkaroundTest = selectorWorkaroundTest;
return this;
Expand Down
Loading

0 comments on commit aaeb678

Please sign in to comment.