diff --git a/hazelcast/pom.xml b/hazelcast/pom.xml index 0a037452ccf3..abe421f050f9 100644 --- a/hazelcast/pom.xml +++ b/hazelcast/pom.xml @@ -315,6 +315,29 @@ + + + net.openhft + affinity + 3.2.2 + provided + true + + + org.slf4j + slf4j-api + 1.7.25 + provided + true + + + org.slf4j + slf4j-simple + 1.5.8 + provided + true + + com.fasterxml.jackson.core jackson-core diff --git a/hazelcast/src/main/java/com/hazelcast/client/impl/connection/tcp/TcpClientConnectionManager.java b/hazelcast/src/main/java/com/hazelcast/client/impl/connection/tcp/TcpClientConnectionManager.java index 1f7390157c51..75a9916d80ec 100644 --- a/hazelcast/src/main/java/com/hazelcast/client/impl/connection/tcp/TcpClientConnectionManager.java +++ b/hazelcast/src/main/java/com/hazelcast/client/impl/connection/tcp/TcpClientConnectionManager.java @@ -106,6 +106,7 @@ import static com.hazelcast.client.properties.ClientProperty.SHUFFLE_MEMBER_LIST; import static com.hazelcast.core.LifecycleEvent.LifecycleState.CLIENT_CHANGED_CLUSTER; import static com.hazelcast.internal.nio.IOUtil.closeResource; +import static com.hazelcast.internal.util.ThreadAffinity.newSystemThreadAffinity; import static com.hazelcast.internal.util.ExceptionUtil.rethrow; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.stream.Collectors.toList; @@ -270,10 +271,13 @@ protected NioNetworking initNetworking() { .threadNamePrefix(client.getName()) .errorHandler(new ClientConnectionChannelErrorHandler()) .inputThreadCount(inputThreads) + .inputThreadAffinity(newSystemThreadAffinity("hazelcast.client.io.input.thread.affinity")) .outputThreadCount(outputThreads) + .outputThreadAffinity(newSystemThreadAffinity("hazelcast.client.io.output.thread.affinity")) .balancerIntervalSeconds(properties.getInteger(IO_BALANCER_INTERVAL_SECONDS)) .writeThroughEnabled(properties.getBoolean(IO_WRITE_THROUGH_ENABLED)) - .concurrencyDetection(client.getConcurrencyDetection())); + .concurrencyDetection(client.getConcurrencyDetection()) + ); } private WaitStrategy initializeWaitStrategy(ClientConfig clientConfig) { diff --git a/hazelcast/src/main/java/com/hazelcast/client/impl/spi/impl/ClientResponseHandlerSupplier.java b/hazelcast/src/main/java/com/hazelcast/client/impl/spi/impl/ClientResponseHandlerSupplier.java index a2a2be686241..5e24dbaef3ae 100644 --- a/hazelcast/src/main/java/com/hazelcast/client/impl/spi/impl/ClientResponseHandlerSupplier.java +++ b/hazelcast/src/main/java/com/hazelcast/client/impl/spi/impl/ClientResponseHandlerSupplier.java @@ -20,11 +20,13 @@ import com.hazelcast.client.impl.protocol.ClientMessage; import com.hazelcast.client.impl.spi.impl.listener.ClientListenerServiceImpl; import com.hazelcast.internal.util.ConcurrencyDetection; +import com.hazelcast.internal.util.ThreadAffinity; +import com.hazelcast.internal.util.MutableInteger; import com.hazelcast.internal.util.concurrent.MPSCQueue; +import com.hazelcast.internal.util.executor.HazelcastManagedThread; import com.hazelcast.logging.ILogger; import com.hazelcast.spi.properties.HazelcastProperties; import com.hazelcast.spi.properties.HazelcastProperty; -import com.hazelcast.internal.util.MutableInteger; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.concurrent.BlockingQueue; @@ -36,8 +38,9 @@ import static com.hazelcast.client.properties.ClientProperty.RESPONSE_THREAD_COUNT; import static com.hazelcast.client.properties.ClientProperty.RESPONSE_THREAD_DYNAMIC; import static com.hazelcast.instance.impl.OutOfMemoryErrorDispatcher.onOutOfMemory; -import static com.hazelcast.spi.impl.operationservice.impl.InboundResponseHandlerSupplier.getIdleStrategy; +import static com.hazelcast.internal.util.ThreadAffinity.newSystemThreadAffinity; import static com.hazelcast.internal.util.HashUtil.hashToIndex; +import static com.hazelcast.spi.impl.operationservice.impl.InboundResponseHandlerSupplier.getIdleStrategy; /** * A {@link Supplier} for {@link Supplier} instance that processes responses for client @@ -75,6 +78,7 @@ protected MutableInteger initialValue() { private final Consumer responseHandler; private final boolean responseThreadsDynamic; private final ConcurrencyDetection concurrencyDetection; + private final ThreadAffinity threadAffinity = newSystemThreadAffinity("hazelcast.client.response.thread.affinity"); public ClientResponseHandlerSupplier(ClientInvocationServiceImpl invocationService, ConcurrencyDetection concurrencyDetection) { @@ -85,6 +89,10 @@ public ClientResponseHandlerSupplier(ClientInvocationServiceImpl invocationServi HazelcastProperties properties = client.getProperties(); int responseThreadCount = properties.getInteger(RESPONSE_THREAD_COUNT); + if (threadAffinity.isEnabled()) { + responseThreadCount = threadAffinity.getThreadCount(); + } + if (responseThreadCount < 0) { throw new IllegalArgumentException(RESPONSE_THREAD_COUNT.getName() + " can't be smaller than 0"); } @@ -93,6 +101,7 @@ public ClientResponseHandlerSupplier(ClientInvocationServiceImpl invocationServi this.responseThreads = new ResponseThread[responseThreadCount]; for (int k = 0; k < responseThreads.length; k++) { responseThreads[k] = new ResponseThread(invocationService.client.getName() + ".responsethread-" + k + "-"); + responseThreads[k].setThreadAffinity(threadAffinity); } if (responseThreadCount == 0) { @@ -167,7 +176,7 @@ private ResponseThread nextResponseThread() { } } - private class ResponseThread extends Thread { + private class ResponseThread extends HazelcastManagedThread { private final BlockingQueue responseQueue; private final AtomicBoolean started = new AtomicBoolean(); @@ -179,7 +188,7 @@ private class ResponseThread extends Thread { } @Override - public void run() { + public void executeRun() { try { doRun(); } catch (OutOfMemoryError e) { diff --git a/hazelcast/src/main/java/com/hazelcast/instance/impl/DefaultNodeContext.java b/hazelcast/src/main/java/com/hazelcast/instance/impl/DefaultNodeContext.java index 3e40a81e4292..7cfd4cf126d1 100644 --- a/hazelcast/src/main/java/com/hazelcast/instance/impl/DefaultNodeContext.java +++ b/hazelcast/src/main/java/com/hazelcast/instance/impl/DefaultNodeContext.java @@ -41,6 +41,7 @@ import java.util.Properties; import static com.hazelcast.config.ConfigAccessor.getActiveMemberNetworkConfig; +import static com.hazelcast.internal.util.ThreadAffinity.newSystemThreadAffinity; import static com.hazelcast.spi.properties.ClusterProperty.IO_BALANCER_INTERVAL_SECONDS; import static com.hazelcast.spi.properties.ClusterProperty.IO_INPUT_THREAD_COUNT; import static com.hazelcast.spi.properties.ClusterProperty.IO_OUTPUT_THREAD_COUNT; @@ -169,9 +170,12 @@ private Networking createNetworking(Node node) { .threadNamePrefix(node.hazelcastInstance.getName()) .errorHandler(errorHandler) .inputThreadCount(props.getInteger(IO_INPUT_THREAD_COUNT)) + .inputThreadAffinity(newSystemThreadAffinity("hazelcast.io.input.thread.affinity")) .outputThreadCount(props.getInteger(IO_OUTPUT_THREAD_COUNT)) + .outputThreadAffinity(newSystemThreadAffinity("hazelcast.io.output.thread.affinity")) .balancerIntervalSeconds(props.getInteger(IO_BALANCER_INTERVAL_SECONDS)) .writeThroughEnabled(props.getBoolean(IO_WRITE_THROUGH_ENABLED)) - .concurrencyDetection(node.nodeEngine.getConcurrencyDetection())); + .concurrencyDetection(node.nodeEngine.getConcurrencyDetection()) + ); } } diff --git a/hazelcast/src/main/java/com/hazelcast/internal/networking/nio/NioNetworking.java b/hazelcast/src/main/java/com/hazelcast/internal/networking/nio/NioNetworking.java index 1608856e38c7..f0eef6ba858e 100644 --- a/hazelcast/src/main/java/com/hazelcast/internal/networking/nio/NioNetworking.java +++ b/hazelcast/src/main/java/com/hazelcast/internal/networking/nio/NioNetworking.java @@ -31,6 +31,7 @@ import com.hazelcast.internal.networking.OutboundHandler; import com.hazelcast.internal.networking.nio.iobalancer.IOBalancer; import com.hazelcast.internal.util.ConcurrencyDetection; +import com.hazelcast.internal.util.ThreadAffinity; import com.hazelcast.internal.util.concurrent.BackoffIdleStrategy; import com.hazelcast.logging.ILogger; import com.hazelcast.logging.LoggingService; @@ -115,14 +116,15 @@ public final class NioNetworking implements Networking, DynamicMetricsProvider { private final BackoffIdleStrategy idleStrategy; private final boolean selectorWorkaroundTest; private final boolean selectionKeyWakeupEnabled; + private final ThreadAffinity outputThreadAffinity; private volatile ExecutorService closeListenerExecutor; private final ConcurrencyDetection concurrencyDetection; private final boolean writeThroughEnabled; + private final ThreadAffinity inputThreadAffinity; private volatile IOBalancer ioBalancer; private volatile NioThread[] inputThreads; private volatile NioThread[] outputThreads; private volatile ScheduledFuture publishFuture; - // Currently this is a coarse grained aggregation of the bytes/send received. // In the future you probably want to split this up in member and client and potentially // wan specific. @@ -143,6 +145,8 @@ public NioNetworking(Context ctx) { this.outputThreadCount = ctx.outputThreadCount; this.logger = loggingService.getLogger(NioNetworking.class); this.errorHandler = ctx.errorHandler; + this.inputThreadAffinity = ctx.inputThreadAffinity; + this.outputThreadAffinity = ctx.outputThreadAffinity; this.balancerIntervalSeconds = ctx.balancerIntervalSeconds; this.selectorMode = ctx.selectorMode; this.selectorWorkaroundTest = ctx.selectorWorkaroundTest; @@ -216,6 +220,7 @@ public void restart() { idleStrategy); thread.id = i; thread.setSelectorWorkaroundTest(selectorWorkaroundTest); + thread.setThreadAffinity(inputThreadAffinity); inThreads[i] = thread; thread.start(); } @@ -231,6 +236,7 @@ public void restart() { idleStrategy); thread.id = i; thread.setSelectorWorkaroundTest(selectorWorkaroundTest); + thread.setThreadAffinity(outputThreadAffinity); outThreads[i] = thread; thread.start(); } @@ -476,6 +482,9 @@ public static class Context { private int inputThreadCount = 1; private int outputThreadCount = 1; private int balancerIntervalSeconds; + private ThreadAffinity inputThreadAffinity = ThreadAffinity.DISABLED; + private ThreadAffinity outputThreadAffinity = ThreadAffinity.DISABLED; + // The selector mode determines how IO threads will block (or not) on the Selector: // select: this is the default mode, uses Selector.select(long timeout) // selectnow: use Selector.selectNow() @@ -547,15 +556,39 @@ public Context errorHandler(ChannelErrorHandler errorHandler) { } public Context inputThreadCount(int inputThreadCount) { + if (inputThreadAffinity.isEnabled()) { + return this; + } this.inputThreadCount = inputThreadCount; return this; } public Context outputThreadCount(int outputThreadCount) { + if (outputThreadAffinity.isEnabled()) { + return this; + } this.outputThreadCount = outputThreadCount; return this; } + public Context inputThreadAffinity(ThreadAffinity inputThreadAffinity) { + this.inputThreadAffinity = inputThreadAffinity; + + if (inputThreadAffinity.isEnabled()) { + inputThreadCount = inputThreadAffinity.getThreadCount(); + } + return this; + } + + public Context outputThreadAffinity(ThreadAffinity outputThreadAffinity) { + this.outputThreadAffinity = outputThreadAffinity; + + if (outputThreadAffinity.isEnabled()) { + outputThreadCount = outputThreadAffinity.getThreadCount(); + } + return this; + } + public Context balancerIntervalSeconds(int balancerIntervalSeconds) { this.balancerIntervalSeconds = balancerIntervalSeconds; return this; diff --git a/hazelcast/src/main/java/com/hazelcast/internal/networking/nio/NioThread.java b/hazelcast/src/main/java/com/hazelcast/internal/networking/nio/NioThread.java index 0b39ab556335..76f3e43d9f7f 100644 --- a/hazelcast/src/main/java/com/hazelcast/internal/networking/nio/NioThread.java +++ b/hazelcast/src/main/java/com/hazelcast/internal/networking/nio/NioThread.java @@ -21,6 +21,7 @@ import com.hazelcast.internal.networking.ChannelErrorHandler; import com.hazelcast.internal.util.concurrent.IdleStrategy; import com.hazelcast.internal.util.counters.SwCounter; +import com.hazelcast.internal.util.executor.HazelcastManagedThread; import com.hazelcast.logging.ILogger; import com.hazelcast.spi.impl.operationexecutor.OperationHostileThread; @@ -56,7 +57,7 @@ import static java.lang.System.currentTimeMillis; @ExcludedMetricTargets(MANAGEMENT_CENTER) -public class NioThread extends Thread implements OperationHostileThread { +public class NioThread extends HazelcastManagedThread implements OperationHostileThread { // WARNING: This value has significant effect on idle CPU usage! private static final int SELECT_WAIT_TIME_MILLIS @@ -226,7 +227,7 @@ public void addTaskAndWakeup(Runnable task) { } @Override - public void run() { + public void executeRun() { // This outer loop is a bit complex but it takes care of a lot of stuff: // * it calls runSelectNowLoop or runSelectLoop based on selectNow enabled or not. // * handles backoff and retrying in case if io exception is thrown diff --git a/hazelcast/src/main/java/com/hazelcast/internal/util/ThreadAffinity.java b/hazelcast/src/main/java/com/hazelcast/internal/util/ThreadAffinity.java new file mode 100644 index 000000000000..1ef9c9be1f1c --- /dev/null +++ b/hazelcast/src/main/java/com/hazelcast/internal/util/ThreadAffinity.java @@ -0,0 +1,328 @@ +/* + * Copyright (c) 2008-2020, Hazelcast, Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hazelcast.internal.util; + +import com.hazelcast.logging.Logger; +import net.openhft.affinity.Affinity; + +import java.util.ArrayList; +import java.util.BitSet; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Contains the thread affinity logic for certain threads. + * + * Inside is a list of CPU bitmaps and using the {@link #nextAllowedCpus()} there is a round robin + * over the list of the CPU bitmaps. The reason for a round robin is that the same ThreadAffinity can + * be used when threads get stopped and new threads created. + * + * This class is threadsafe. + */ +public class ThreadAffinity { + public static final ThreadAffinity DISABLED = new ThreadAffinity(null); + + final List allowedCpusList; + final AtomicInteger threadIndex = new AtomicInteger(); + + public ThreadAffinity(String affinity) { + allowedCpusList = parse(affinity); + + if (allowedCpusList.isEmpty()) { + return; + } + + if (!isAffinityAvailable()) { + throw new RuntimeException("Can't use affinity '" + affinity + "'. Thread affinity support is not available."); + } + } + + /** + * Creates a new ThreadAffinity based on a system property. + * + * If no property is set, then affinity is disabled. + * + * @param property the name of the system property. + * @return the created ThreadAffinity. + * @throws InvalidAffinitySyntaxException if there is a problem with the value. + */ + public static ThreadAffinity newSystemThreadAffinity(String property) { + String value = System.getProperty(property); + try { + return new ThreadAffinity(value); + } catch (InvalidAffinitySyntaxException e) { + throw new InvalidAffinitySyntaxException("Invalid affinity syntax for System property '" + property + "'." + + " Value '" + value + "'. " + + " Errormessage '" + e.getMessage() + "'"); + } + } + + public int getThreadCount() { + return allowedCpusList.size(); + } + + public BitSet nextAllowedCpus() { + if (allowedCpusList.isEmpty()) { + return null; + } + + int index = threadIndex.getAndIncrement() % allowedCpusList.size(); + return allowedCpusList.get(index); + } + + public boolean isEnabled() { + return !allowedCpusList.isEmpty(); + } + + private static boolean isAffinityAvailable() { + try { + boolean jnaAvailable = Affinity.isJNAAvailable(); + if (!jnaAvailable) { + Logger.getLogger(ThreadAffinity.class).warning("jna is not available"); + } + return jnaAvailable; + } catch (NoClassDefFoundError e) { + Logger.getLogger(ThreadAffinity.class).warning("Affinity jar isn't available: " + e.getMessage()); + return false; + } + } + + static List parse(String affinity) { + List cpus = new ArrayList<>(); + if (affinity == null) { + return cpus; + } + + affinity = affinity.trim(); + if (affinity.isEmpty()) { + return cpus; + } + + List groups = new AffinityParser(affinity).parse(); + for (CpuGroup group : groups) { + BitSet allowedCpus = new BitSet(); + + for (Integer cpu : group.cpus) { + allowedCpus.set(cpu); + } + for (int k = 0; k < group.threadCount; k++) { + cpus.add(allowedCpus); + } + } + + return cpus; + } + + static class AffinityParser { + private final String string; + private final List groups = new ArrayList<>(); + private int index; + private int digit; + private int integer; + private int fromRange; + private int toRange; + + AffinityParser(String string) { + this.string = string; + } + + List parse() { + if (!expression() || index < string.length()) { + throw new InvalidAffinitySyntaxException("Syntax Error at " + index); + } + + // verification that we have no duplicate cpus. + BitSet usedCpus = new BitSet(); + for (CpuGroup group : groups) { + for (Integer cpu : group.cpus) { + if (usedCpus.get(cpu)) { + throw new InvalidAffinitySyntaxException("Duplicate CPU found, offending CPU=" + cpu); + } + usedCpus.set(cpu); + } + } + + return groups; + } + + boolean expression() { + if (!item()) { + return false; + } + + while (character(',')) { + if (!item()) { + return false; + } + } + + return true; + } + + boolean item() { + if (range()) { + for (int cpu = fromRange; cpu <= toRange; cpu++) { + CpuGroup group = new CpuGroup(); + group.cpus.add(cpu); + group.threadCount = 1; + groups.add(group); + } + return true; + } else { + return group(); + } + } + + boolean range() { + if (!integer()) { + return false; + } + fromRange = integer; + toRange = integer; + if (character('-')) { + if (!integer()) { + return false; + } + toRange = integer; + if (toRange < fromRange) { + error("ToRange can't smaller than fromRange, toRange=" + toRange + " fromRange=" + fromRange + "."); + } + } + + return true; + } + + private void error(String error) { + throw new InvalidAffinitySyntaxException(error + " at index:" + index); + } + + @SuppressWarnings("checkstyle:NPathComplexity") + boolean group() { + if (!character('[')) { + return false; + } + + if (!range()) { + return false; + } + + CpuGroup group = new CpuGroup(); + addCpuRangeToGroup(group); + + while (character(',')) { + if (!range()) { + return false; + } + + addCpuRangeToGroup(group); + } + + if (!character(']')) { + return false; + } + + if (character(':')) { + if (!integer()) { + return false; + } + group.threadCount = integer; + if (group.threadCount == 0) { + error("Thread count can't be 0."); + } else if (group.threadCount > group.cpus.size()) { + error("Thread count can't be larger than number of cpu's in the group. " + + "Thread count = " + group.threadCount + " cpus:" + group.cpus); + } + } else { + group.threadCount = group.cpus.size(); + } + + groups.add(group); + return true; + } + + private void addCpuRangeToGroup(CpuGroup group) { + for (int k = fromRange; k <= toRange; k++) { + group.cpus.add(k); + } + } + + @SuppressWarnings("checkstyle:magicnumber") + boolean integer() { + if (!digit()) { + return false; + } + + integer = digit; + for (; ; ) { + if (!digit()) { + return true; + } + integer = integer * 10; + integer += digit; + } + } + + boolean digit() { + if (index >= string.length()) { + return false; + } + + char c = string.charAt(index); + if (Character.isDigit(c)) { + index++; + digit = Character.getNumericValue(c); + return true; + } else { + return false; + } + } + + boolean character(char expected) { + if (index >= string.length()) { + return false; + } + + char c = string.charAt(index); + if (c == expected) { + index++; + return true; + } else { + return false; + } + } + } + + static class InvalidAffinitySyntaxException extends RuntimeException { + InvalidAffinitySyntaxException(String message) { + super(message); + } + } + + static class CpuGroup { + List cpus = new LinkedList<>(); + int threadCount = -1; + + @Override + public String toString() { + return "CpuGroup{" + + "cpus=" + cpus + + ", count=" + threadCount + + '}'; + } + } +} diff --git a/hazelcast/src/main/java/com/hazelcast/internal/util/executor/HazelcastManagedThread.java b/hazelcast/src/main/java/com/hazelcast/internal/util/executor/HazelcastManagedThread.java index ca2d3b8735d8..81f0255fd2e8 100644 --- a/hazelcast/src/main/java/com/hazelcast/internal/util/executor/HazelcastManagedThread.java +++ b/hazelcast/src/main/java/com/hazelcast/internal/util/executor/HazelcastManagedThread.java @@ -17,6 +17,12 @@ package com.hazelcast.internal.util.executor; import com.hazelcast.instance.impl.OutOfMemoryErrorDispatcher; +import com.hazelcast.internal.util.ThreadAffinity; +import com.hazelcast.logging.ILogger; +import com.hazelcast.logging.Logger; +import net.openhft.affinity.Affinity; + +import java.util.BitSet; /** * Base class for all Hazelcast threads to manage them from a single point. @@ -27,6 +33,8 @@ */ public class HazelcastManagedThread extends Thread { + private BitSet allowedCpus; + public HazelcastManagedThread() { } @@ -42,6 +50,10 @@ public HazelcastManagedThread(Runnable target, String name) { super(target, name); } + public void setThreadAffinity(ThreadAffinity threadAffinity) { + this.allowedCpus = threadAffinity.nextAllowedCpus(); + } + @Override public void setContextClassLoader(ClassLoader cl) { // Set only if specified classloader is not empty, otherwise go one with current @@ -71,10 +83,20 @@ protected void afterRun() { } - /** - * Manages the thread lifecycle and can be overridden to customize if needed. - */ - public void run() { + @Override + public final void run() { + if (allowedCpus != null) { + Affinity.setAffinity(allowedCpus); + BitSet actualCpus = Affinity.getAffinity(); + ILogger logger = Logger.getLogger(HazelcastManagedThread.class); + if (!actualCpus.equals(allowedCpus)) { + logger.warning(getName() + " affinity was not applied successfully. " + + "Expected CPUs:" + allowedCpus + ". Actual CPUs:" + actualCpus); + } else { + logger.info(getName() + " has affinity for CPUs:" + allowedCpus); + } + } + try { beforeRun(); executeRun(); @@ -85,3 +107,4 @@ public void run() { } } } + diff --git a/hazelcast/src/main/java/com/hazelcast/spi/impl/operationexecutor/impl/OperationExecutorImpl.java b/hazelcast/src/main/java/com/hazelcast/spi/impl/operationexecutor/impl/OperationExecutorImpl.java index 42e253ce7117..285f0c86c95c 100644 --- a/hazelcast/src/main/java/com/hazelcast/spi/impl/operationexecutor/impl/OperationExecutorImpl.java +++ b/hazelcast/src/main/java/com/hazelcast/spi/impl/operationexecutor/impl/OperationExecutorImpl.java @@ -22,6 +22,7 @@ import com.hazelcast.internal.metrics.Probe; import com.hazelcast.internal.metrics.StaticMetricsProvider; import com.hazelcast.internal.nio.Packet; +import com.hazelcast.internal.util.ThreadAffinity; import com.hazelcast.internal.util.concurrent.IdleStrategy; import com.hazelcast.internal.util.concurrent.MPSCQueue; import com.hazelcast.logging.ILogger; @@ -56,6 +57,7 @@ import static com.hazelcast.internal.metrics.MetricDescriptorConstants.OPERATION_METRIC_EXECUTOR_RUNNING_PARTITION_COUNT; import static com.hazelcast.internal.metrics.MetricDescriptorConstants.OPERATION_PREFIX; import static com.hazelcast.internal.metrics.ProbeLevel.MANDATORY; +import static com.hazelcast.internal.util.ThreadAffinity.newSystemThreadAffinity; import static com.hazelcast.internal.util.Preconditions.checkNotNull; import static com.hazelcast.internal.util.ThreadUtil.createThreadPoolName; import static com.hazelcast.spi.impl.operationservice.impl.InboundResponseHandlerSupplier.getIdleStrategy; @@ -91,7 +93,7 @@ public final class OperationExecutorImpl implements OperationExecutor, StaticMet private static final HazelcastProperty IDLE_STRATEGY = new HazelcastProperty("hazelcast.operation.partitionthread.idlestrategy", "block"); private static final int TERMINATION_TIMEOUT_SECONDS = 3; - + private final ThreadAffinity threadAffinity = newSystemThreadAffinity("hazelcast.operation.thread.affinity"); private final ILogger logger; // all operations for specific partitions will be executed on these threads, e.g. map.put(key, value) @@ -152,6 +154,9 @@ private PartitionOperationThread[] initPartitionThreads(HazelcastProperties prop NodeExtension nodeExtension, ClassLoader configClassLoader) { int threadCount = properties.getInteger(PARTITION_OPERATION_THREAD_COUNT); + if (threadAffinity.isEnabled()) { + threadCount = threadAffinity.getThreadCount(); + } IdleStrategy idleStrategy = getIdleStrategy(properties, IDLE_STRATEGY); PartitionOperationThread[] threads = new PartitionOperationThread[threadCount]; @@ -164,7 +169,7 @@ private PartitionOperationThread[] initPartitionThreads(HazelcastProperties prop PartitionOperationThread partitionThread = new PartitionOperationThread(threadName, threadId, operationQueue, logger, nodeExtension, partitionOperationRunners, configClassLoader); - + partitionThread.setThreadAffinity(threadAffinity); threads[threadId] = partitionThread; normalQueue.setConsumerThread(partitionThread); } diff --git a/hazelcast/src/main/java/com/hazelcast/spi/impl/operationexecutor/impl/OperationThread.java b/hazelcast/src/main/java/com/hazelcast/spi/impl/operationexecutor/impl/OperationThread.java index 6c8b33ef33d9..57fe9ba36a27 100644 --- a/hazelcast/src/main/java/com/hazelcast/spi/impl/operationexecutor/impl/OperationThread.java +++ b/hazelcast/src/main/java/com/hazelcast/spi/impl/operationexecutor/impl/OperationThread.java @@ -109,7 +109,7 @@ public int getThreadId() { public abstract OperationRunner operationRunner(int partitionId); @Override - public final void run() { + public final void executeRun() { nodeExtension.onThreadStart(this); try { while (!shutdown) { diff --git a/hazelcast/src/main/java/com/hazelcast/spi/impl/operationservice/impl/InboundResponseHandlerSupplier.java b/hazelcast/src/main/java/com/hazelcast/spi/impl/operationservice/impl/InboundResponseHandlerSupplier.java index 4edd45460e4a..6f358957c555 100644 --- a/hazelcast/src/main/java/com/hazelcast/spi/impl/operationservice/impl/InboundResponseHandlerSupplier.java +++ b/hazelcast/src/main/java/com/hazelcast/spi/impl/operationservice/impl/InboundResponseHandlerSupplier.java @@ -20,11 +20,13 @@ import com.hazelcast.internal.metrics.Probe; import com.hazelcast.internal.metrics.StaticMetricsProvider; import com.hazelcast.internal.nio.Packet; +import com.hazelcast.internal.util.ThreadAffinity; import com.hazelcast.internal.util.MutableInteger; import com.hazelcast.internal.util.concurrent.BackoffIdleStrategy; import com.hazelcast.internal.util.concurrent.BusySpinIdleStrategy; import com.hazelcast.internal.util.concurrent.IdleStrategy; import com.hazelcast.internal.util.concurrent.MPSCQueue; +import com.hazelcast.internal.util.executor.HazelcastManagedThread; import com.hazelcast.logging.ILogger; import com.hazelcast.spi.impl.NodeEngine; import com.hazelcast.spi.impl.operationexecutor.OperationHostileThread; @@ -45,6 +47,7 @@ import static com.hazelcast.internal.metrics.MetricDescriptorConstants.OPERATION_METRIC_INBOUND_RESPONSE_HANDLER_RESPONSE_QUEUE_SIZE; import static com.hazelcast.internal.metrics.MetricDescriptorConstants.OPERATION_PREFIX; import static com.hazelcast.internal.metrics.ProbeLevel.MANDATORY; +import static com.hazelcast.internal.util.ThreadAffinity.newSystemThreadAffinity; import static com.hazelcast.internal.util.EmptyStatement.ignore; import static com.hazelcast.internal.util.HashUtil.hashToIndex; import static com.hazelcast.internal.util.ThreadUtil.createThreadName; @@ -98,7 +101,9 @@ public class InboundResponseHandlerSupplier implements StaticMetricsProvider, Su private final NodeEngine nodeEngine; private final InvocationRegistry invocationRegistry; private final HazelcastProperties properties; + private final ThreadAffinity threadAffinity = newSystemThreadAffinity("hazelcast.operation.response.thread.affinity"); + @SuppressWarnings("checkstyle:executablestatementcount") InboundResponseHandlerSupplier(ClassLoader classLoader, InvocationRegistry invocationRegistry, String hzName, @@ -108,6 +113,9 @@ public class InboundResponseHandlerSupplier implements StaticMetricsProvider, Su this.logger = nodeEngine.getLogger(InboundResponseHandlerSupplier.class); this.properties = nodeEngine.getProperties(); int responseThreadCount = properties.getInteger(RESPONSE_THREAD_COUNT); + if (threadAffinity.isEnabled()) { + responseThreadCount = threadAffinity.getThreadCount(); + } if (responseThreadCount < 0) { throw new IllegalArgumentException(RESPONSE_THREAD_COUNT.getName() + " can't be smaller than 0"); } @@ -257,7 +265,7 @@ public void accept(Packet packet) { * The ResponseThread needs to implement the OperationHostileThread interface to make sure that the OperationExecutor * is not going to schedule any operations on this task due to retry. */ - private final class ResponseThread extends Thread implements OperationHostileThread { + private final class ResponseThread extends HazelcastManagedThread implements OperationHostileThread { private final BlockingQueue responseQueue; private final InboundResponseHandler inboundResponseHandler; @@ -267,10 +275,11 @@ private ResponseThread(String hzName, int threadIndex) { super(createThreadName(hzName, "response-" + threadIndex)); this.inboundResponseHandler = new InboundResponseHandler(invocationRegistry, nodeEngine); this.responseQueue = new MPSCQueue<>(this, getIdleStrategy(properties, IDLE_STRATEGY)); + this.setThreadAffinity(threadAffinity); } @Override - public void run() { + public void executeRun() { try { doRun(); } catch (InterruptedException e) { diff --git a/hazelcast/src/test/java/com/hazelcast/internal/util/ThreadAffinityTest.java b/hazelcast/src/test/java/com/hazelcast/internal/util/ThreadAffinityTest.java new file mode 100644 index 000000000000..84841de1c2d1 --- /dev/null +++ b/hazelcast/src/test/java/com/hazelcast/internal/util/ThreadAffinityTest.java @@ -0,0 +1,172 @@ +/* + * Copyright (c) 2008-2020, Hazelcast, Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hazelcast.internal.util; + +import com.hazelcast.test.HazelcastParallelClassRunner; +import com.hazelcast.test.annotation.ParallelJVMTest; +import com.hazelcast.test.annotation.QuickTest; +import org.jetbrains.annotations.NotNull; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.util.BitSet; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +@RunWith(HazelcastParallelClassRunner.class) +@Category({QuickTest.class, ParallelJVMTest.class}) +public class ThreadAffinityTest { + + @Test + public void whenNull() { + ThreadAffinity threadAffinity = new ThreadAffinity(null); + + assertFalse(threadAffinity.isEnabled()); + assertNull(threadAffinity.nextAllowedCpus()); + } + + @Test + public void whenEmptyString() { + ThreadAffinity threadAffinity = new ThreadAffinity(""); + + assertFalse(threadAffinity.isEnabled()); + assertNull(threadAffinity.nextAllowedCpus()); + } + + @Test(expected = ThreadAffinity.InvalidAffinitySyntaxException.class) + public void whenSyntaxError() { + new ThreadAffinity("abc"); + } + + + @Test(expected = ThreadAffinity.InvalidAffinitySyntaxException.class) + public void whenTrailingComma() { + new ThreadAffinity("10,"); + } + + @Test(expected = ThreadAffinity.InvalidAffinitySyntaxException.class) + public void whenTrailingText() { + new ThreadAffinity("1a"); + } + + @Test + public void whenIndividualCores() { + ThreadAffinity threadAffinity = new ThreadAffinity("1,3,4,8"); + + assertTrue(threadAffinity.isEnabled()); + assertEquals(4, threadAffinity.allowedCpusList.size()); + assertEquals(threadAffinity.allowedCpusList.get(0), newBitset(1)); + assertEquals(threadAffinity.allowedCpusList.get(1), newBitset(3)); + assertEquals(threadAffinity.allowedCpusList.get(2), newBitset(4)); + assertEquals(threadAffinity.allowedCpusList.get(3), newBitset(8)); + } + + @Test(expected = ThreadAffinity.InvalidAffinitySyntaxException.class) + public void whenIndividualCoresAndNegative() { + new ThreadAffinity("-10"); + } + + @Test + public void whenRange() { + ThreadAffinity threadAffinity = new ThreadAffinity("1-4"); + + assertTrue(threadAffinity.isEnabled()); + assertEquals(4, threadAffinity.allowedCpusList.size()); + assertEquals(threadAffinity.allowedCpusList.get(0), newBitset(1)); + assertEquals(threadAffinity.allowedCpusList.get(1), newBitset(2)); + assertEquals(threadAffinity.allowedCpusList.get(2), newBitset(3)); + assertEquals(threadAffinity.allowedCpusList.get(3), newBitset(4)); + } + + @Test + public void whenGroup() { + ThreadAffinity threadAffinity = new ThreadAffinity("[1-5]"); + + assertTrue(threadAffinity.isEnabled()); + assertEquals(5, threadAffinity.allowedCpusList.size()); + assertEquals(threadAffinity.allowedCpusList.get(0), newBitset(1, 2, 3, 4, 5)); + assertEquals(threadAffinity.allowedCpusList.get(1), newBitset(1, 2, 3, 4, 5)); + assertEquals(threadAffinity.allowedCpusList.get(2), newBitset(1, 2, 3, 4, 5)); + assertEquals(threadAffinity.allowedCpusList.get(3), newBitset(1, 2, 3, 4, 5)); + assertEquals(threadAffinity.allowedCpusList.get(4), newBitset(1, 2, 3, 4, 5)); + } + + @Test + public void whenGroupAndThreadCount() { + ThreadAffinity threadAffinity = new ThreadAffinity("[1-5]:2"); + + assertTrue(threadAffinity.isEnabled()); + assertEquals(2, threadAffinity.allowedCpusList.size()); + assertEquals(threadAffinity.allowedCpusList.get(0), newBitset(1, 2, 3, 4, 5)); + assertEquals(threadAffinity.allowedCpusList.get(1), newBitset(1, 2, 3, 4, 5)); + } + + @Test(expected = ThreadAffinity.InvalidAffinitySyntaxException.class) + public void whenGroupAndNegativeThreadCount() { + new ThreadAffinity("[1-10]:-2"); + } + + @Test(expected = ThreadAffinity.InvalidAffinitySyntaxException.class) + public void whenGroupAndThreadCountZero() { + new ThreadAffinity("[1-5]:0"); + } + + @Test(expected = ThreadAffinity.InvalidAffinitySyntaxException.class) + public void whenGroupAndThreadTooLarge() { + new ThreadAffinity("[1,2]:3"); + } + + @Test(expected = ThreadAffinity.InvalidAffinitySyntaxException.class) + public void whenInvalidRange() { + new ThreadAffinity("4-3"); + } + + @Test(expected = ThreadAffinity.InvalidAffinitySyntaxException.class) + public void whenDuplicate() { + new ThreadAffinity("1,1"); + } + + @Test + public void whenComplex() { + ThreadAffinity threadAffinity = new ThreadAffinity("1,3-4,[5-8]:2,10,[20,21,32]:2"); + + assertTrue(threadAffinity.isEnabled()); + assertEquals(8, threadAffinity.allowedCpusList.size()); + assertEquals(threadAffinity.allowedCpusList.get(0), newBitset(1)); + assertEquals(threadAffinity.allowedCpusList.get(1), newBitset(3)); + assertEquals(threadAffinity.allowedCpusList.get(2), newBitset(4)); + assertEquals(threadAffinity.allowedCpusList.get(3), newBitset(5, 6, 7, 8)); + assertEquals(threadAffinity.allowedCpusList.get(4), newBitset(5, 6, 7, 8)); + assertEquals(threadAffinity.allowedCpusList.get(5), newBitset(10)); + assertEquals(threadAffinity.allowedCpusList.get(6), newBitset(20, 21, 32)); + assertEquals(threadAffinity.allowedCpusList.get(7), newBitset(20, 21, 32)); + } + + @NotNull + public BitSet newBitset(int... cpus) { + BitSet bitSet = new BitSet(); + for (int c : cpus) { + bitSet.set(c); + } + return bitSet; + } +} +