Skip to content

Commit

Permalink
Merge pull request hazelcast#16971 from pveentjer/v4.1/performance/th…
Browse files Browse the repository at this point in the history
…read-affinity

Introduction of CPU thread affinity
  • Loading branch information
pveentjer committed May 20, 2020
2 parents 8f4a099 + a7e7def commit a149622
Show file tree
Hide file tree
Showing 12 changed files with 629 additions and 18 deletions.
23 changes: 23 additions & 0 deletions hazelcast/pom.xml
Expand Up @@ -315,6 +315,29 @@
</build>

<dependencies>
<!-- needed for thread affinity -->
<dependency>
<groupId>net.openhft</groupId>
<artifactId>affinity</artifactId>
<version>3.2.2</version>
<scope>provided</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
<scope>provided</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.5.8</version>
<scope>provided</scope>
<optional>true</optional>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
Expand Down
Expand Up @@ -106,6 +106,7 @@
import static com.hazelcast.client.properties.ClientProperty.SHUFFLE_MEMBER_LIST;
import static com.hazelcast.core.LifecycleEvent.LifecycleState.CLIENT_CHANGED_CLUSTER;
import static com.hazelcast.internal.nio.IOUtil.closeResource;
import static com.hazelcast.internal.util.ThreadAffinity.newSystemThreadAffinity;
import static com.hazelcast.internal.util.ExceptionUtil.rethrow;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.stream.Collectors.toList;
Expand Down Expand Up @@ -270,10 +271,13 @@ protected NioNetworking initNetworking() {
.threadNamePrefix(client.getName())
.errorHandler(new ClientConnectionChannelErrorHandler())
.inputThreadCount(inputThreads)
.inputThreadAffinity(newSystemThreadAffinity("hazelcast.client.io.input.thread.affinity"))
.outputThreadCount(outputThreads)
.outputThreadAffinity(newSystemThreadAffinity("hazelcast.client.io.output.thread.affinity"))
.balancerIntervalSeconds(properties.getInteger(IO_BALANCER_INTERVAL_SECONDS))
.writeThroughEnabled(properties.getBoolean(IO_WRITE_THROUGH_ENABLED))
.concurrencyDetection(client.getConcurrencyDetection()));
.concurrencyDetection(client.getConcurrencyDetection())
);
}

private WaitStrategy initializeWaitStrategy(ClientConfig clientConfig) {
Expand Down
Expand Up @@ -20,11 +20,13 @@
import com.hazelcast.client.impl.protocol.ClientMessage;
import com.hazelcast.client.impl.spi.impl.listener.ClientListenerServiceImpl;
import com.hazelcast.internal.util.ConcurrencyDetection;
import com.hazelcast.internal.util.ThreadAffinity;
import com.hazelcast.internal.util.MutableInteger;
import com.hazelcast.internal.util.concurrent.MPSCQueue;
import com.hazelcast.internal.util.executor.HazelcastManagedThread;
import com.hazelcast.logging.ILogger;
import com.hazelcast.spi.properties.HazelcastProperties;
import com.hazelcast.spi.properties.HazelcastProperty;
import com.hazelcast.internal.util.MutableInteger;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;

import java.util.concurrent.BlockingQueue;
Expand All @@ -36,8 +38,9 @@
import static com.hazelcast.client.properties.ClientProperty.RESPONSE_THREAD_COUNT;
import static com.hazelcast.client.properties.ClientProperty.RESPONSE_THREAD_DYNAMIC;
import static com.hazelcast.instance.impl.OutOfMemoryErrorDispatcher.onOutOfMemory;
import static com.hazelcast.spi.impl.operationservice.impl.InboundResponseHandlerSupplier.getIdleStrategy;
import static com.hazelcast.internal.util.ThreadAffinity.newSystemThreadAffinity;
import static com.hazelcast.internal.util.HashUtil.hashToIndex;
import static com.hazelcast.spi.impl.operationservice.impl.InboundResponseHandlerSupplier.getIdleStrategy;

/**
* A {@link Supplier} for {@link Supplier} instance that processes responses for client
Expand Down Expand Up @@ -75,6 +78,7 @@ protected MutableInteger initialValue() {
private final Consumer<ClientMessage> responseHandler;
private final boolean responseThreadsDynamic;
private final ConcurrencyDetection concurrencyDetection;
private final ThreadAffinity threadAffinity = newSystemThreadAffinity("hazelcast.client.response.thread.affinity");

public ClientResponseHandlerSupplier(ClientInvocationServiceImpl invocationService,
ConcurrencyDetection concurrencyDetection) {
Expand All @@ -85,6 +89,10 @@ public ClientResponseHandlerSupplier(ClientInvocationServiceImpl invocationServi

HazelcastProperties properties = client.getProperties();
int responseThreadCount = properties.getInteger(RESPONSE_THREAD_COUNT);
if (threadAffinity.isEnabled()) {
responseThreadCount = threadAffinity.getThreadCount();
}

if (responseThreadCount < 0) {
throw new IllegalArgumentException(RESPONSE_THREAD_COUNT.getName() + " can't be smaller than 0");
}
Expand All @@ -93,6 +101,7 @@ public ClientResponseHandlerSupplier(ClientInvocationServiceImpl invocationServi
this.responseThreads = new ResponseThread[responseThreadCount];
for (int k = 0; k < responseThreads.length; k++) {
responseThreads[k] = new ResponseThread(invocationService.client.getName() + ".responsethread-" + k + "-");
responseThreads[k].setThreadAffinity(threadAffinity);
}

if (responseThreadCount == 0) {
Expand Down Expand Up @@ -167,7 +176,7 @@ private ResponseThread nextResponseThread() {
}
}

private class ResponseThread extends Thread {
private class ResponseThread extends HazelcastManagedThread {
private final BlockingQueue<ClientMessage> responseQueue;
private final AtomicBoolean started = new AtomicBoolean();

Expand All @@ -179,7 +188,7 @@ private class ResponseThread extends Thread {
}

@Override
public void run() {
public void executeRun() {
try {
doRun();
} catch (OutOfMemoryError e) {
Expand Down
Expand Up @@ -41,6 +41,7 @@
import java.util.Properties;

import static com.hazelcast.config.ConfigAccessor.getActiveMemberNetworkConfig;
import static com.hazelcast.internal.util.ThreadAffinity.newSystemThreadAffinity;
import static com.hazelcast.spi.properties.ClusterProperty.IO_BALANCER_INTERVAL_SECONDS;
import static com.hazelcast.spi.properties.ClusterProperty.IO_INPUT_THREAD_COUNT;
import static com.hazelcast.spi.properties.ClusterProperty.IO_OUTPUT_THREAD_COUNT;
Expand Down Expand Up @@ -169,9 +170,12 @@ private Networking createNetworking(Node node) {
.threadNamePrefix(node.hazelcastInstance.getName())
.errorHandler(errorHandler)
.inputThreadCount(props.getInteger(IO_INPUT_THREAD_COUNT))
.inputThreadAffinity(newSystemThreadAffinity("hazelcast.io.input.thread.affinity"))
.outputThreadCount(props.getInteger(IO_OUTPUT_THREAD_COUNT))
.outputThreadAffinity(newSystemThreadAffinity("hazelcast.io.output.thread.affinity"))
.balancerIntervalSeconds(props.getInteger(IO_BALANCER_INTERVAL_SECONDS))
.writeThroughEnabled(props.getBoolean(IO_WRITE_THROUGH_ENABLED))
.concurrencyDetection(node.nodeEngine.getConcurrencyDetection()));
.concurrencyDetection(node.nodeEngine.getConcurrencyDetection())
);
}
}
Expand Up @@ -31,6 +31,7 @@
import com.hazelcast.internal.networking.OutboundHandler;
import com.hazelcast.internal.networking.nio.iobalancer.IOBalancer;
import com.hazelcast.internal.util.ConcurrencyDetection;
import com.hazelcast.internal.util.ThreadAffinity;
import com.hazelcast.internal.util.concurrent.BackoffIdleStrategy;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.LoggingService;
Expand Down Expand Up @@ -115,14 +116,15 @@ public final class NioNetworking implements Networking, DynamicMetricsProvider {
private final BackoffIdleStrategy idleStrategy;
private final boolean selectorWorkaroundTest;
private final boolean selectionKeyWakeupEnabled;
private final ThreadAffinity outputThreadAffinity;
private volatile ExecutorService closeListenerExecutor;
private final ConcurrencyDetection concurrencyDetection;
private final boolean writeThroughEnabled;
private final ThreadAffinity inputThreadAffinity;
private volatile IOBalancer ioBalancer;
private volatile NioThread[] inputThreads;
private volatile NioThread[] outputThreads;
private volatile ScheduledFuture publishFuture;

// Currently this is a coarse grained aggregation of the bytes/send received.
// In the future you probably want to split this up in member and client and potentially
// wan specific.
Expand All @@ -143,6 +145,8 @@ public NioNetworking(Context ctx) {
this.outputThreadCount = ctx.outputThreadCount;
this.logger = loggingService.getLogger(NioNetworking.class);
this.errorHandler = ctx.errorHandler;
this.inputThreadAffinity = ctx.inputThreadAffinity;
this.outputThreadAffinity = ctx.outputThreadAffinity;
this.balancerIntervalSeconds = ctx.balancerIntervalSeconds;
this.selectorMode = ctx.selectorMode;
this.selectorWorkaroundTest = ctx.selectorWorkaroundTest;
Expand Down Expand Up @@ -216,6 +220,7 @@ public void restart() {
idleStrategy);
thread.id = i;
thread.setSelectorWorkaroundTest(selectorWorkaroundTest);
thread.setThreadAffinity(inputThreadAffinity);
inThreads[i] = thread;
thread.start();
}
Expand All @@ -231,6 +236,7 @@ public void restart() {
idleStrategy);
thread.id = i;
thread.setSelectorWorkaroundTest(selectorWorkaroundTest);
thread.setThreadAffinity(outputThreadAffinity);
outThreads[i] = thread;
thread.start();
}
Expand Down Expand Up @@ -476,6 +482,9 @@ public static class Context {
private int inputThreadCount = 1;
private int outputThreadCount = 1;
private int balancerIntervalSeconds;
private ThreadAffinity inputThreadAffinity = ThreadAffinity.DISABLED;
private ThreadAffinity outputThreadAffinity = ThreadAffinity.DISABLED;

// The selector mode determines how IO threads will block (or not) on the Selector:
// select: this is the default mode, uses Selector.select(long timeout)
// selectnow: use Selector.selectNow()
Expand Down Expand Up @@ -547,15 +556,39 @@ public Context errorHandler(ChannelErrorHandler errorHandler) {
}

public Context inputThreadCount(int inputThreadCount) {
if (inputThreadAffinity.isEnabled()) {
return this;
}
this.inputThreadCount = inputThreadCount;
return this;
}

public Context outputThreadCount(int outputThreadCount) {
if (outputThreadAffinity.isEnabled()) {
return this;
}
this.outputThreadCount = outputThreadCount;
return this;
}

public Context inputThreadAffinity(ThreadAffinity inputThreadAffinity) {
this.inputThreadAffinity = inputThreadAffinity;

if (inputThreadAffinity.isEnabled()) {
inputThreadCount = inputThreadAffinity.getThreadCount();
}
return this;
}

public Context outputThreadAffinity(ThreadAffinity outputThreadAffinity) {
this.outputThreadAffinity = outputThreadAffinity;

if (outputThreadAffinity.isEnabled()) {
outputThreadCount = outputThreadAffinity.getThreadCount();
}
return this;
}

public Context balancerIntervalSeconds(int balancerIntervalSeconds) {
this.balancerIntervalSeconds = balancerIntervalSeconds;
return this;
Expand Down
Expand Up @@ -21,6 +21,7 @@
import com.hazelcast.internal.networking.ChannelErrorHandler;
import com.hazelcast.internal.util.concurrent.IdleStrategy;
import com.hazelcast.internal.util.counters.SwCounter;
import com.hazelcast.internal.util.executor.HazelcastManagedThread;
import com.hazelcast.logging.ILogger;
import com.hazelcast.spi.impl.operationexecutor.OperationHostileThread;

Expand Down Expand Up @@ -56,7 +57,7 @@
import static java.lang.System.currentTimeMillis;

@ExcludedMetricTargets(MANAGEMENT_CENTER)
public class NioThread extends Thread implements OperationHostileThread {
public class NioThread extends HazelcastManagedThread implements OperationHostileThread {

// WARNING: This value has significant effect on idle CPU usage!
private static final int SELECT_WAIT_TIME_MILLIS
Expand Down Expand Up @@ -226,7 +227,7 @@ public void addTaskAndWakeup(Runnable task) {
}

@Override
public void run() {
public void executeRun() {
// This outer loop is a bit complex but it takes care of a lot of stuff:
// * it calls runSelectNowLoop or runSelectLoop based on selectNow enabled or not.
// * handles backoff and retrying in case if io exception is thrown
Expand Down

0 comments on commit a149622

Please sign in to comment.