Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ISPN-9422 Remove Worker Threads property #6773

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions documentation/src/main/asciidoc/topics/upgrading.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@
Total Order transaction protocol is going to be removed in a future release.
Use the default protocol (2PC).

=== Removed the infinispan.server.hotrod.workerThreads system property

The `infinispan.server.hotrod.workerThreads` property was introduced as a hack to work around the fact that the configuration did not expose it.
The property has been removed and endpoint worker threads must now be exclusively configured using the `worker-threads` attribute.

=== Removed AtomicMap and FineGrainedAtomicMap

AtomicMapLookup, AtomicMap and FineGrainedAtomicMap have been removed. Please see FunctionalMaps or Cache#Merge for
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ protected void startTransport() {
public ThreadPoolExecutor getExecutor() {
if (this.executor == null || this.executor.isShutdown()) {
DefaultThreadFactory factory = new DefaultThreadFactory(getQualifiedName() + "-ServerHandler");
int workerThreads = getWorkerThreads();
int workerThreads = configuration.workerThreads();
this.executor = new ThreadPoolExecutor(
workerThreads,
workerThreads,
Expand Down Expand Up @@ -236,10 +236,4 @@ public boolean isTransportEnabled() {
public NettyTransport getTransport() {
return transport;
}

/**
* @deprecated Use the {@link #getExecutor()} to obtain information about the worker executor instead
*/
@Deprecated
public abstract int getWorkerThreads();
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@ public abstract class ProtocolServerConfiguration {
public static final AttributeDefinition<Boolean> TCP_NODELAY = AttributeDefinition.builder("tcp-nodelay", true).immutable().build();
public static final AttributeDefinition<Boolean> TCP_KEEPALIVE = AttributeDefinition.builder("tcp-keepalive", false).immutable().build();
public static final AttributeDefinition<Integer> IO_THREADS = AttributeDefinition.builder("io-threads", 2 * ProcessorInfo.availableProcessors()).immutable().build();
public static final AttributeDefinition<Integer> WORKER_THREADS = AttributeDefinition.builder("worker-threads", 160).immutable().build();
public static final AttributeDefinition<AdminOperationsHandler> ADMIN_OPERATION_HANDLER = AttributeDefinition.builder("admin-operation-handler", null, AdminOperationsHandler.class).immutable().build();
public static final AttributeDefinition<Boolean> ZERO_CAPACITY_NODE = AttributeDefinition.builder("zero-capacity-node", false).immutable().build();

// The default value can be overridden so it is the responsibility of each protocol to add it to the set
public static final AttributeDefinition<Integer> WORKER_THREADS = AttributeDefinition.builder("worker-threads", 1).immutable().build();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we keep the attribute definition in the base class at all? It's not very clear why the Memcached and REST servers should have only 1 worker thread, if each had its own definition it could also have a comment explaining the choice.


public static AttributeSet attributeDefinitionSet() {
return new AttributeSet(ProtocolServerConfiguration.class,
DEFAULT_CACHE_NAME, NAME, HOST, PORT, IDLE_TIMEOUT, IGNORED_CACHES, RECV_BUF_SIZE, SEND_BUF_SIZE, START_TRANSPORT, TCP_NODELAY, TCP_KEEPALIVE, IO_THREADS, WORKER_THREADS, ADMIN_OPERATION_HANDLER, ZERO_CAPACITY_NODE);
DEFAULT_CACHE_NAME, NAME, HOST, PORT, IDLE_TIMEOUT, IGNORED_CACHES, RECV_BUF_SIZE, SEND_BUF_SIZE, START_TRANSPORT, TCP_NODELAY, TCP_KEEPALIVE, IO_THREADS, ADMIN_OPERATION_HANDLER, ZERO_CAPACITY_NODE);
}

private final Attribute<String> defaultCacheName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,6 @@ public ProtocolServerConfiguration getConfiguration() {
public ChannelInitializer<Channel> getInitializer() {
return null;
}

@Override
public int getWorkerThreads() {
return configuration.workerThreads();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@

public class MockServerConfiguration extends ProtocolServerConfiguration {

public static AttributeSet attributeDefinitionSet() {
return new AttributeSet(MockServerConfiguration.class, ProtocolServerConfiguration.attributeDefinitionSet(), WORKER_THREADS);
}

protected MockServerConfiguration(AttributeSet attributes, SslConfiguration ssl) {
super(attributes, ssl);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
public class MockServerConfigurationBuilder extends ProtocolServerConfigurationBuilder<MockServerConfiguration, MockServerConfigurationBuilder> {

public MockServerConfigurationBuilder() {
super(12345);
super(12345, MockServerConfiguration.attributeDefinitionSet());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -582,11 +582,6 @@ private static long toMillis(long duration, TimeUnitValue unit) {
}
}

@Override
public int getWorkerThreads() {
return Integer.getInteger("infinispan.server.hotrod.workerThreads", configuration.workerThreads());
}

public String toString() {
return "HotRodServer[" +
"configuration=" + configuration +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ public class HotRodServerConfiguration extends ProtocolServerConfiguration {
public static final AttributeDefinition<Long> TOPOLOGY_REPL_TIMEOUT = AttributeDefinition.builder("topology-repl-timeout", 10000L).immutable().build();
public static final AttributeDefinition<Boolean> TOPOLOGY_STATE_TRANSFER = AttributeDefinition.builder("topology-state-transfer", true).immutable().build();

// The Hot Rod server has a different default
public static final AttributeDefinition<Integer> WORKER_THREADS = AttributeDefinition.builder("worker-threads", 160).immutable().build();

private final Attribute<String> proxyHost;
private final Attribute<Integer> proxyPort;
private final Attribute<Long> topologyLockTimeout;
Expand All @@ -31,7 +34,7 @@ public class HotRodServerConfiguration extends ProtocolServerConfiguration {

public static AttributeSet attributeDefinitionSet() {
return new AttributeSet(HotRodServerConfiguration.class, ProtocolServerConfiguration.attributeDefinitionSet(),
PROXY_HOST, PROXY_PORT, TOPOLOGY_STATE_TRANSFER, TOPOLOGY_AWAIT_INITIAL_TRANSFER, TOPOLOGY_LOCK_TIMEOUT, TOPOLOGY_REPL_TIMEOUT);
WORKER_THREADS, PROXY_HOST, PROXY_PORT, TOPOLOGY_STATE_TRANSFER, TOPOLOGY_AWAIT_INITIAL_TRANSFER, TOPOLOGY_LOCK_TIMEOUT, TOPOLOGY_REPL_TIMEOUT);
}

HotRodServerConfiguration(AttributeSet attributes, SslConfiguration ssl, AuthenticationConfiguration authentication) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,14 @@ private void parseHotRodConnector(XMLExtendedStreamReader reader, PathAddress su
ParseUtils.requireNoNamespaceAttribute(reader, i);
String value = reader.getAttributeValue(i);
Attribute attribute = Attribute.forName(reader.getAttributeLocalName(i));
name = parseConnectorAttributes(reader, connector, name, i, value, attribute);
switch (attribute) {
case WORKER_THREADS:
HotRodConnectorResource.WORKER_THREADS.parseAndSetParameter(value, connector, reader);
break;
default:
name = parseConnectorAttributes(reader, connector, name, i, value, attribute);
break;
}
}

PathAddress connectorAddress = subsystemAddress.append(PathElement.pathElement(ModelKeys.HOTROD_CONNECTOR, name));
Expand Down Expand Up @@ -151,9 +158,9 @@ private void parseMemcachedConnector(XMLExtendedStreamReader reader, PathAddress
case CACHE:
MemcachedConnectorResource.CACHE.parseAndSetParameter(value, connector, reader);
break;
case CLIENT_ENCODING:
MemcachedConnectorResource.CLIENT_ENCODING.parseAndSetParameter(value, connector, reader);
break;
case CLIENT_ENCODING:
MemcachedConnectorResource.CLIENT_ENCODING.parseAndSetParameter(value, connector, reader);
break;
default:
name = parseConnectorAttributes(reader, connector, name, i, value, attribute);
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ private void writeHotRodConnector(final XMLExtendedStreamWriter writer, final Mo
writer.writeStartElement(Element.HOTROD_CONNECTOR.getLocalName());
writeCommonConnector(writer, connector);
writeProtocolServerConnector(writer, connector);
for (SimpleAttributeDefinition attribute : HotRodConnectorResource.HOTROD_ATTRIBUTES) {
attribute.marshallAsAttribute(connector, true, writer);
}
writeTopologyStateTransfer(writer, connector);
writeAuthentication(writer, connector);
writeEncryption(writer, connector);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,17 @@
*/
package org.infinispan.server.endpoint.subsystem;

import org.infinispan.server.hotrod.configuration.HotRodServerConfiguration;
import org.jboss.as.controller.AttributeDefinition;
import org.jboss.as.controller.OperationStepHandler;
import org.jboss.as.controller.PathElement;
import org.jboss.as.controller.ReloadRequiredWriteAttributeHandler;
import org.jboss.as.controller.ResourceDefinition;
import org.jboss.as.controller.SimpleAttributeDefinition;
import org.jboss.as.controller.SimpleAttributeDefinitionBuilder;
import org.jboss.as.controller.registry.ManagementResourceRegistration;
import org.jboss.dmr.ModelNode;
import org.jboss.dmr.ModelType;

/**
* HotRodConnectorResource.
Expand All @@ -32,6 +40,16 @@ public class HotRodConnectorResource extends ProtocolServerConnectorResource imp

public static final PathElement HOTROD_CONNECTOR_PATH = PathElement.pathElement(ModelKeys.HOTROD_CONNECTOR);

static final SimpleAttributeDefinition WORKER_THREADS =
new SimpleAttributeDefinitionBuilder(ModelKeys.WORKER_THREADS, ModelType.INT, true)
.setAllowExpression(true)
.setXmlName(ModelKeys.WORKER_THREADS)
.setDefaultValue(new ModelNode().set(HotRodServerConfiguration.WORKER_THREADS.getDefaultValue()))
.setRestartAllServices()
.build();

static final SimpleAttributeDefinition[] HOTROD_ATTRIBUTES = {WORKER_THREADS};

public HotRodConnectorResource(boolean isRuntimeRegistration) {
super(HOTROD_CONNECTOR_PATH, EndpointExtension.getResourceDescriptionResolver(ModelKeys.HOTROD_CONNECTOR), HotRodSubsystemAdd.INSTANCE, HotRodSubsystemRemove.INSTANCE, isRuntimeRegistration);
}
Expand All @@ -47,6 +65,11 @@ public void registerChildren(ManagementResourceRegistration resourceRegistration
public void registerAttributes(ManagementResourceRegistration resourceRegistration) {
super.registerAttributes(resourceRegistration);

final OperationStepHandler writeHandler = new ReloadRequiredWriteAttributeHandler(HOTROD_ATTRIBUTES);
for (AttributeDefinition attr : HOTROD_ATTRIBUTES) {
resourceRegistration.registerReadWriteAttribute(attr, null, writeHandler);
}

if (isRuntimeRegistration()) {
ProtocolServerMetricsHandler.registerMetrics(resourceRegistration, "hotrod");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ private static void populate(ModelNode source, ModelNode target) throws Operatio
for(AttributeDefinition attr : ProtocolServerConnectorResource.PROTOCOL_SERVICE_ATTRIBUTES) {
attr.validateAndSet(source, target);
}
for(AttributeDefinition attr : HotRodConnectorResource.HOTROD_ATTRIBUTES) {
attr.validateAndSet(source, target);
}
}

@Override
Expand All @@ -65,6 +68,7 @@ protected void performRuntime(OperationContext context, ModelNode operation, Mod
ModelNode config = Resource.Tools.readModel(context.readResource(PathAddress.EMPTY_ADDRESS));
// Create the builder
HotRodServerConfigurationBuilder configurationBuilder = new HotRodServerConfigurationBuilder();
configurationBuilder.workerThreads(HotRodConnectorResource.WORKER_THREADS.resolveModelAttribute(context, config).asInt());
configureProtocolServer(context, configurationBuilder, config);
configureProtocolServerTopology(context, configurationBuilder, config);
// Create the service
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public class MemcachedConnectorResource extends ProtocolServerConnectorResource
.build();


static final SimpleAttributeDefinition[] MEMCACHED_CONNECTOR_ATTRIBUTES = {CACHE, CLIENT_ENCODING};
static final SimpleAttributeDefinition[] MEMCACHED_CONNECTOR_ATTRIBUTES = { ProtocolServerConnectorResource.WORKER_THREADS, CACHE, CLIENT_ENCODING};

public MemcachedConnectorResource(boolean isRuntimeRegistration) {
super(MEMCACHED_CONNECTOR_PATH, EndpointExtension.getResourceDescriptionResolver(ModelKeys.MEMCACHED_CONNECTOR), MemcachedSubsystemAdd.INSTANCE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public class ProtocolServerConnectorResource extends CommonConnectorResource {
.build();

static final SimpleAttributeDefinition[] PROTOCOL_SERVICE_ATTRIBUTES = {
SOCKET_BINDING, IDLE_TIMEOUT, TCP_NODELAY, TCP_KEEPALIVE, RECEIVE_BUFFER_SIZE, SEND_BUFFER_SIZE, IO_THREADS, WORKER_THREADS
SOCKET_BINDING, IDLE_TIMEOUT, TCP_NODELAY, TCP_KEEPALIVE, RECEIVE_BUFFER_SIZE, SEND_BUFFER_SIZE, IO_THREADS
};


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ protected void configureProtocolServer(OperationContext context, ProtocolServerC
builder.name(config.get(ModelKeys.NAME).asString());
}
builder.ioThreads(ProtocolServerConnectorResource.IO_THREADS.resolveModelAttribute(context, config).asInt());
builder.workerThreads(ProtocolServerConnectorResource.WORKER_THREADS.resolveModelAttribute(context, config).asInt());
builder.idleTimeout(ProtocolServerConnectorResource.IDLE_TIMEOUT.resolveModelAttribute(context, config).asInt());
builder.tcpNoDelay(ProtocolServerConnectorResource.TCP_NODELAY.resolveModelAttribute(context, config).asBoolean());
builder.tcpKeepAlive(ProtocolServerConnectorResource.TCP_KEEPALIVE.resolveModelAttribute(context, config).asBoolean());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ rest-connector.socket-binding=The socket binding to use for this connector
rest-connector.extended-headers=Allow retrieval of extended information about entries (NEVER, ON_DEMAND)
rest-connector.max-content-length=Maximum allowed content length
rest-connector.compression-level=Compression level when using compressed requests and responses
rest-connector.worker-threads=The number of worker threads to use for this REST connector
topology-state-transfer=Configuration related to topology state transfer
topology-state-transfer.add=Adds the topology state transfer configuration to the connector
topology-state-transfer.remove=Removes the topology state transfer configuration from the connector
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,14 @@ public class EndpointSubsystemTestCase extends ClusteringSubsystemTest {
private final int expectedOperationCount;
private final String[] templates;

public EndpointSubsystemTestCase(Path xmlPath, Properties properties) {
public EndpointSubsystemTestCase(Path xmlPath, Properties properties, String name) {
super(Constants.SUBSYSTEM_NAME, new EndpointExtension(), xmlPath.getFileName().toString());
this.expectedOperationCount = Integer.parseInt(properties.getProperty("expected.operations.count"));
this.xsdPath = properties.getProperty("xsd.path");
this.templates = null;
}

@Parameters
@Parameters(name = "{2}")
public static Collection<Object[]> data() throws Exception {
URL configDir = Thread.currentThread().getContextClassLoader().getResource("org/infinispan/server/endpoint");
List<Path> paths = Files.list(Paths.get(configDir.toURI()))
Expand All @@ -93,7 +93,7 @@ public static Collection<Object[]> data() throws Exception {
try (Reader r = new FileReader(propsPath)) {
properties.load(r);
}
data.add(new Object[]{xmlPath, properties});
data.add(new Object[]{xmlPath, properties, xmlPath.getFileName().toString()});
}
// Ensure that we contain the current schema version at the very least
assertTrue("Could not find a '" + currentSchema + "' configuration file", hasCurrentSchema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,17 +81,10 @@ public void stop() {
scheduler.shutdown();
}

@Override
public int getWorkerThreads() {
// Unused for now, so just return the smallest possible valid value
return 1;
}

/**
* Returns the cache being used by the Memcached server
*/
public Cache<byte[], byte[]> getCache() {
return memcachedCache;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class MemcachedServerConfiguration extends ProtocolServerConfiguration {
private final Attribute<MediaType> clientEncoding;

public static AttributeSet attributeDefinitionSet() {
return new AttributeSet(MemcachedServerConfiguration.class, ProtocolServerConfiguration.attributeDefinitionSet(), CLIENT_ENCODING);
return new AttributeSet(MemcachedServerConfiguration.class, ProtocolServerConfiguration.attributeDefinitionSet(), WORKER_THREADS, CLIENT_ENCODING);
}

MemcachedServerConfiguration(AttributeSet attributes, SslConfiguration ssl) {
Expand Down
5 changes: 0 additions & 5 deletions server/rest/src/main/java/org/infinispan/rest/RestServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,4 @@ protected void startInternal(RestServerConfiguration configuration, EmbeddedCach

this.restDispatcher = new RestDispatcherImpl(resourceManager);
}

@Override
public int getWorkerThreads() {
return Integer.getInteger("infinispan.server.rest.workerThreads", configuration.workerThreads());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class RestServerConfiguration extends ProtocolServerConfiguration {

public static AttributeSet attributeDefinitionSet() {
return new AttributeSet(RestServerConfiguration.class, ProtocolServerConfiguration.attributeDefinitionSet(),
EXTENDED_HEADERS, CONTEXT_PATH, MAX_CONTENT_LENGTH, CORS_RULES, COMPRESSION_LEVEL);
WORKER_THREADS, EXTENDED_HEADERS, CONTEXT_PATH, MAX_CONTENT_LENGTH, CORS_RULES, COMPRESSION_LEVEL);
}

RestServerConfiguration(AttributeSet attributes, SslConfiguration ssl, AuthenticationConfiguration authentication) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@
@ConfigurationFor(SinglePortEndpointRouter.class)
public class SinglePortRouterConfiguration extends ProtocolServerConfiguration {

public static AttributeSet attributeDefinitionSet() {
return new AttributeSet(SinglePortRouterConfiguration.class, ProtocolServerConfiguration.attributeDefinitionSet(), WORKER_THREADS);
}

/**
* Creates new configuration based on the IP address and port.
*/
public SinglePortRouterConfiguration(AttributeSet attributes, SslConfiguration ssl) {
super(attributes, ssl);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,14 @@ public SinglePortRouterConfiguration build() {
else if (keystorePath != null) {
sslConfigurationBuilder.keyStoreFileName(keystorePath).keyStorePassword(keystorePassword).enable();
}
AttributeSet attributes = ProtocolServerConfiguration.attributeDefinitionSet();
AttributeSet attributes = SinglePortRouterConfiguration.attributeDefinitionSet();
attributes.attribute(ProtocolServerConfiguration.NAME).set(name);
attributes.attribute(ProtocolServerConfiguration.HOST).set(ip.getHostName());
attributes.attribute(ProtocolServerConfiguration.PORT).set(port);
attributes.attribute(ProtocolServerConfiguration.IDLE_TIMEOUT).set(100);
attributes.attribute(ProtocolServerConfiguration.RECV_BUF_SIZE).set(receiveBufferSize);
attributes.attribute(ProtocolServerConfiguration.SEND_BUF_SIZE).set(sendBufferSize);
attributes.attribute(ProtocolServerConfiguration.WORKER_THREADS).set(1);
return new SinglePortRouterConfiguration(attributes.protect(), sslConfigurationBuilder.create());
}
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,4 @@ public ChannelInitializer<Channel> getInitializer() {
public Protocol getProtocol() {
return Protocol.SINGLE_PORT;
}

@Override
public int getWorkerThreads() {
return 1;
}
}