Skip to content

Commit

Permalink
ISPN-9422 Remove Worker Threads property
Browse files Browse the repository at this point in the history
- Also ensure that different default worker thread defaults don't pollute the others
  • Loading branch information
tristantarrant committed Jun 3, 2019
1 parent 15bed44 commit 9916b7d
Show file tree
Hide file tree
Showing 24 changed files with 74 additions and 58 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
== Upgrading from 9.4 to 10.0

=== Removed the infinispan.server.hotrod.workerThreads system property

The `infinispan.server.hotrod.workerThreads` property was introduced as a hack to work around the fact that the configuration did not expose it.
The property has been removed and endpoint worker threads must now be exclusively configured using the `worker-threads` attribute.

=== Removed AtomicMap and FineGrainedAtomicMap

AtomicMapLookup, AtomicMap and FineGrainedAtomicMap have been removed. Please see FunctionalMaps or Cache#Merge for
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ protected void startTransport() {
protected ThreadPoolExecutor getExecutor() {
if (this.executor == null || this.executor.isShutdown()) {
DefaultThreadFactory factory = new DefaultThreadFactory(getQualifiedName() + "-ServerHandler");
int workerThreads = getWorkerThreads();
int workerThreads = configuration.workerThreads();
this.executor = new ThreadPoolExecutor(
workerThreads,
workerThreads,
Expand Down Expand Up @@ -244,10 +244,4 @@ public boolean isTransportEnabled() {
public NettyTransport getTransport() {
return transport;
}

/**
* @deprecated Use the {@link #getExecutor()} to obtain information about the worker executor instead
*/
@Deprecated
public abstract int getWorkerThreads();
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@ public abstract class ProtocolServerConfiguration {
public static final AttributeDefinition<Boolean> TCP_NODELAY = AttributeDefinition.builder("tcp-nodelay", true).immutable().build();
public static final AttributeDefinition<Boolean> TCP_KEEPALIVE = AttributeDefinition.builder("tcp-keepalive", false).immutable().build();
public static final AttributeDefinition<Integer> IO_THREADS = AttributeDefinition.builder("io-threads", 2 * ProcessorInfo.availableProcessors()).immutable().build();
public static final AttributeDefinition<Integer> WORKER_THREADS = AttributeDefinition.builder("worker-threads", 160).immutable().build();
public static final AttributeDefinition<AdminOperationsHandler> ADMIN_OPERATION_HANDLER = AttributeDefinition.builder("admin-operation-handler", null, AdminOperationsHandler.class).immutable().build();
public static final AttributeDefinition<Boolean> ZERO_CAPACITY_NODE = AttributeDefinition.builder("zero-capacity-node", false).immutable().build();

// The default value can be overridden so it is the responsibility of each protocol to add it to the set
public static final AttributeDefinition<Integer> WORKER_THREADS = AttributeDefinition.builder("worker-threads", 1).immutable().build();

public static AttributeSet attributeDefinitionSet() {
return new AttributeSet(ProtocolServerConfiguration.class,
DEFAULT_CACHE_NAME, NAME, HOST, PORT, IDLE_TIMEOUT, IGNORED_CACHES, RECV_BUF_SIZE, SEND_BUF_SIZE, START_TRANSPORT, TCP_NODELAY, TCP_KEEPALIVE, IO_THREADS, WORKER_THREADS, ADMIN_OPERATION_HANDLER, ZERO_CAPACITY_NODE);
DEFAULT_CACHE_NAME, NAME, HOST, PORT, IDLE_TIMEOUT, IGNORED_CACHES, RECV_BUF_SIZE, SEND_BUF_SIZE, START_TRANSPORT, TCP_NODELAY, TCP_KEEPALIVE, IO_THREADS, ADMIN_OPERATION_HANDLER, ZERO_CAPACITY_NODE);
}

private final Attribute<String> defaultCacheName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,6 @@ public ProtocolServerConfiguration getConfiguration() {
public ChannelInitializer<Channel> getInitializer() {
return null;
}

@Override
public int getWorkerThreads() {
return configuration.workerThreads();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@

public class MockServerConfiguration extends ProtocolServerConfiguration {

public static AttributeSet attributeDefinitionSet() {
return new AttributeSet(MockServerConfiguration.class, ProtocolServerConfiguration.attributeDefinitionSet(), WORKER_THREADS);
}

protected MockServerConfiguration(AttributeSet attributes, SslConfiguration ssl) {
super(attributes, ssl);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
public class MockServerConfigurationBuilder extends ProtocolServerConfigurationBuilder<MockServerConfiguration, MockServerConfigurationBuilder> {

public MockServerConfigurationBuilder() {
super(12345);
super(12345, MockServerConfiguration.attributeDefinitionSet());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -627,11 +627,6 @@ private static long toMillis(long duration, TimeUnitValue unit) {
}
}

@Override
public int getWorkerThreads() {
return Integer.getInteger("infinispan.server.hotrod.workerThreads", configuration.workerThreads());
}

public String toString() {
return "HotRodServer[" +
"configuration=" + configuration +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ public class HotRodServerConfiguration extends ProtocolServerConfiguration {
public static final AttributeDefinition<Long> TOPOLOGY_REPL_TIMEOUT = AttributeDefinition.builder("topology-repl-timeout", 10000L).immutable().build();
public static final AttributeDefinition<Boolean> TOPOLOGY_STATE_TRANSFER = AttributeDefinition.builder("topology-state-transfer", true).immutable().build();

// The Hot Rod server has a different default
public static final AttributeDefinition<Integer> WORKER_THREADS = AttributeDefinition.builder("worker-threads", 160).immutable().build();

private final Attribute<String> proxyHost;
private final Attribute<Integer> proxyPort;
private final Attribute<Long> topologyLockTimeout;
Expand All @@ -31,7 +34,7 @@ public class HotRodServerConfiguration extends ProtocolServerConfiguration {

public static AttributeSet attributeDefinitionSet() {
return new AttributeSet(HotRodServerConfiguration.class, ProtocolServerConfiguration.attributeDefinitionSet(),
PROXY_HOST, PROXY_PORT, TOPOLOGY_STATE_TRANSFER, TOPOLOGY_AWAIT_INITIAL_TRANSFER, TOPOLOGY_LOCK_TIMEOUT, TOPOLOGY_REPL_TIMEOUT);
WORKER_THREADS, PROXY_HOST, PROXY_PORT, TOPOLOGY_STATE_TRANSFER, TOPOLOGY_AWAIT_INITIAL_TRANSFER, TOPOLOGY_LOCK_TIMEOUT, TOPOLOGY_REPL_TIMEOUT);
}

HotRodServerConfiguration(AttributeSet attributes, SslConfiguration ssl, AuthenticationConfiguration authentication) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,14 @@ private void parseHotRodConnector(XMLExtendedStreamReader reader, PathAddress su
ParseUtils.requireNoNamespaceAttribute(reader, i);
String value = reader.getAttributeValue(i);
Attribute attribute = Attribute.forName(reader.getAttributeLocalName(i));
name = parseConnectorAttributes(reader, connector, name, i, value, attribute);
switch (attribute) {
case WORKER_THREADS:
HotRodConnectorResource.WORKER_THREADS.parseAndSetParameter(value, connector, reader);
break;
default:
name = parseConnectorAttributes(reader, connector, name, i, value, attribute);
break;
}
}

PathAddress connectorAddress = subsystemAddress.append(PathElement.pathElement(ModelKeys.HOTROD_CONNECTOR, name));
Expand Down Expand Up @@ -151,9 +158,9 @@ private void parseMemcachedConnector(XMLExtendedStreamReader reader, PathAddress
case CACHE:
MemcachedConnectorResource.CACHE.parseAndSetParameter(value, connector, reader);
break;
case CLIENT_ENCODING:
MemcachedConnectorResource.CLIENT_ENCODING.parseAndSetParameter(value, connector, reader);
break;
case CLIENT_ENCODING:
MemcachedConnectorResource.CLIENT_ENCODING.parseAndSetParameter(value, connector, reader);
break;
default:
name = parseConnectorAttributes(reader, connector, name, i, value, attribute);
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ private void writeHotRodConnector(final XMLExtendedStreamWriter writer, final Mo
writer.writeStartElement(Element.HOTROD_CONNECTOR.getLocalName());
writeCommonConnector(writer, connector);
writeProtocolServerConnector(writer, connector);
for (SimpleAttributeDefinition attribute : HotRodConnectorResource.HOTROD_ATTRIBUTES) {
attribute.marshallAsAttribute(connector, true, writer);
}
writeTopologyStateTransfer(writer, connector);
writeAuthentication(writer, connector);
writeEncryption(writer, connector);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,17 @@
*/
package org.infinispan.server.endpoint.subsystem;

import org.infinispan.server.hotrod.configuration.HotRodServerConfiguration;
import org.jboss.as.controller.AttributeDefinition;
import org.jboss.as.controller.OperationStepHandler;
import org.jboss.as.controller.PathElement;
import org.jboss.as.controller.ReloadRequiredWriteAttributeHandler;
import org.jboss.as.controller.ResourceDefinition;
import org.jboss.as.controller.SimpleAttributeDefinition;
import org.jboss.as.controller.SimpleAttributeDefinitionBuilder;
import org.jboss.as.controller.registry.ManagementResourceRegistration;
import org.jboss.dmr.ModelNode;
import org.jboss.dmr.ModelType;

/**
* HotRodConnectorResource.
Expand All @@ -32,6 +40,16 @@ public class HotRodConnectorResource extends ProtocolServerConnectorResource imp

public static final PathElement HOTROD_CONNECTOR_PATH = PathElement.pathElement(ModelKeys.HOTROD_CONNECTOR);

static final SimpleAttributeDefinition WORKER_THREADS =
new SimpleAttributeDefinitionBuilder(ModelKeys.WORKER_THREADS, ModelType.INT, true)
.setAllowExpression(true)
.setXmlName(ModelKeys.WORKER_THREADS)
.setDefaultValue(new ModelNode().set(HotRodServerConfiguration.WORKER_THREADS.getDefaultValue()))
.setRestartAllServices()
.build();

static final SimpleAttributeDefinition[] HOTROD_ATTRIBUTES = {WORKER_THREADS};

public HotRodConnectorResource(boolean isRuntimeRegistration) {
super(HOTROD_CONNECTOR_PATH, EndpointExtension.getResourceDescriptionResolver(ModelKeys.HOTROD_CONNECTOR), HotRodSubsystemAdd.INSTANCE, HotRodSubsystemRemove.INSTANCE, isRuntimeRegistration);
}
Expand All @@ -47,6 +65,11 @@ public void registerChildren(ManagementResourceRegistration resourceRegistration
public void registerAttributes(ManagementResourceRegistration resourceRegistration) {
super.registerAttributes(resourceRegistration);

final OperationStepHandler writeHandler = new ReloadRequiredWriteAttributeHandler(HOTROD_ATTRIBUTES);
for (AttributeDefinition attr : HOTROD_ATTRIBUTES) {
resourceRegistration.registerReadWriteAttribute(attr, null, writeHandler);
}

if (isRuntimeRegistration()) {
ProtocolServerMetricsHandler.registerMetrics(resourceRegistration, "hotrod");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ private static void populate(ModelNode source, ModelNode target) throws Operatio
for(AttributeDefinition attr : ProtocolServerConnectorResource.PROTOCOL_SERVICE_ATTRIBUTES) {
attr.validateAndSet(source, target);
}
for(AttributeDefinition attr : HotRodConnectorResource.HOTROD_ATTRIBUTES) {
attr.validateAndSet(source, target);
}
}

@Override
Expand All @@ -65,6 +68,7 @@ protected void performRuntime(OperationContext context, ModelNode operation, Mod
ModelNode config = Resource.Tools.readModel(context.readResource(PathAddress.EMPTY_ADDRESS));
// Create the builder
HotRodServerConfigurationBuilder configurationBuilder = new HotRodServerConfigurationBuilder();
configurationBuilder.workerThreads(HotRodConnectorResource.WORKER_THREADS.resolveModelAttribute(context, config).asInt());
configureProtocolServer(context, configurationBuilder, config);
configureProtocolServerTopology(context, configurationBuilder, config);
// Create the service
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public class MemcachedConnectorResource extends ProtocolServerConnectorResource
.build();


static final SimpleAttributeDefinition[] MEMCACHED_CONNECTOR_ATTRIBUTES = {CACHE, CLIENT_ENCODING};
static final SimpleAttributeDefinition[] MEMCACHED_CONNECTOR_ATTRIBUTES = { ProtocolServerConnectorResource.WORKER_THREADS, CACHE, CLIENT_ENCODING};

public MemcachedConnectorResource(boolean isRuntimeRegistration) {
super(MEMCACHED_CONNECTOR_PATH, EndpointExtension.getResourceDescriptionResolver(ModelKeys.MEMCACHED_CONNECTOR), MemcachedSubsystemAdd.INSTANCE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public class ProtocolServerConnectorResource extends CommonConnectorResource {
.build();

static final SimpleAttributeDefinition[] PROTOCOL_SERVICE_ATTRIBUTES = {
SOCKET_BINDING, IDLE_TIMEOUT, TCP_NODELAY, TCP_KEEPALIVE, RECEIVE_BUFFER_SIZE, SEND_BUFFER_SIZE, IO_THREADS, WORKER_THREADS
SOCKET_BINDING, IDLE_TIMEOUT, TCP_NODELAY, TCP_KEEPALIVE, RECEIVE_BUFFER_SIZE, SEND_BUFFER_SIZE, IO_THREADS
};


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ protected void configureProtocolServer(OperationContext context, ProtocolServerC
builder.name(config.get(ModelKeys.NAME).asString());
}
builder.ioThreads(ProtocolServerConnectorResource.IO_THREADS.resolveModelAttribute(context, config).asInt());
builder.workerThreads(ProtocolServerConnectorResource.WORKER_THREADS.resolveModelAttribute(context, config).asInt());
builder.idleTimeout(ProtocolServerConnectorResource.IDLE_TIMEOUT.resolveModelAttribute(context, config).asInt());
builder.tcpNoDelay(ProtocolServerConnectorResource.TCP_NODELAY.resolveModelAttribute(context, config).asBoolean());
builder.tcpKeepAlive(ProtocolServerConnectorResource.TCP_KEEPALIVE.resolveModelAttribute(context, config).asBoolean());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ rest-connector.socket-binding=The socket binding to use for this connector
rest-connector.extended-headers=Allow retrieval of extended information about entries (NEVER, ON_DEMAND)
rest-connector.max-content-length=Maximum allowed content length
rest-connector.compression-level=Compression level when using compressed requests and responses
rest-connector.worker-threads=The number of worker threads to use for this REST connector
topology-state-transfer=Configuration related to topology state transfer
topology-state-transfer.add=Adds the topology state transfer configuration to the connector
topology-state-transfer.remove=Removes the topology state transfer configuration from the connector
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,14 @@ public class EndpointSubsystemTestCase extends ClusteringSubsystemTest {
private final int expectedOperationCount;
private final String[] templates;

public EndpointSubsystemTestCase(Path xmlPath, Properties properties) {
public EndpointSubsystemTestCase(Path xmlPath, Properties properties, String name) {
super(Constants.SUBSYSTEM_NAME, new EndpointExtension(), xmlPath.getFileName().toString());
this.expectedOperationCount = Integer.parseInt(properties.getProperty("expected.operations.count"));
this.xsdPath = properties.getProperty("xsd.path");
this.templates = null;
}

@Parameters
@Parameters(name = "{2}")
public static Collection<Object[]> data() throws Exception {
URL configDir = Thread.currentThread().getContextClassLoader().getResource("org/infinispan/server/endpoint");
List<Path> paths = Files.list(Paths.get(configDir.toURI()))
Expand All @@ -93,7 +93,7 @@ public static Collection<Object[]> data() throws Exception {
try (Reader r = new FileReader(propsPath)) {
properties.load(r);
}
data.add(new Object[]{xmlPath, properties});
data.add(new Object[]{xmlPath, properties, xmlPath.getFileName().toString()});
}
// Ensure that we contain the current schema version at the very least
assertTrue("Could not find a '" + currentSchema + "' configuration file", hasCurrentSchema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,17 +81,10 @@ public void stop() {
scheduler.shutdown();
}

@Override
public int getWorkerThreads() {
// Unused for now, so just return the smallest possible valid value
return 1;
}

/**
* Returns the cache being used by the Memcached server
*/
public Cache<byte[], byte[]> getCache() {
return memcachedCache;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class MemcachedServerConfiguration extends ProtocolServerConfiguration {
private final Attribute<MediaType> clientEncoding;

public static AttributeSet attributeDefinitionSet() {
return new AttributeSet(MemcachedServerConfiguration.class, ProtocolServerConfiguration.attributeDefinitionSet(), CLIENT_ENCODING);
return new AttributeSet(MemcachedServerConfiguration.class, ProtocolServerConfiguration.attributeDefinitionSet(), WORKER_THREADS, CLIENT_ENCODING);
}

MemcachedServerConfiguration(AttributeSet attributes, SslConfiguration ssl) {
Expand Down
6 changes: 0 additions & 6 deletions server/rest/src/main/java/org/infinispan/rest/RestServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,4 @@ protected void startInternal(RestServerConfiguration configuration, EmbeddedCach
resourceManager.registerResource(new CounterResource(EmbeddedCounterManagerFactory.asCounterManager(cacheManager)));
this.restDispatcher = new RestDispatcherImpl(resourceManager);
}

@Override
public int getWorkerThreads() {
// Unused for now, so just return the smallest possible valid value
return 1;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class RestServerConfiguration extends ProtocolServerConfiguration {

public static AttributeSet attributeDefinitionSet() {
return new AttributeSet(RestServerConfiguration.class, ProtocolServerConfiguration.attributeDefinitionSet(),
EXTENDED_HEADERS, CONTEXT_PATH, MAX_CONTENT_LENGTH, CORS_RULES, COMPRESSION_LEVEL);
WORKER_THREADS, EXTENDED_HEADERS, CONTEXT_PATH, MAX_CONTENT_LENGTH, CORS_RULES, COMPRESSION_LEVEL);
}

RestServerConfiguration(AttributeSet attributes, SslConfiguration ssl) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,14 @@
*/
public class SinglePortRouterConfiguration extends ProtocolServerConfiguration {

public static AttributeSet attributeDefinitionSet() {
return new AttributeSet(SinglePortRouterConfiguration.class, ProtocolServerConfiguration.attributeDefinitionSet(), WORKER_THREADS);
}

/**
* Creates new configuration based on the IP address and port.
* @param ip The IP address used for binding. Can not be <code>null</code>.
* @param port Port used for binding. Can be 0, in that case a random port is assigned.
* @param sendBufferSize Send buffer size
* @param receiveBufferSize Receive buffer size.
*/
public SinglePortRouterConfiguration(AttributeSet attributes, SslConfiguration ssl) {
super(attributes, ssl);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,14 @@ public SinglePortRouterConfiguration build() {
else if (keystorePath != null) {
sslConfigurationBuilder.keyStoreFileName(keystorePath).keyStorePassword(keystorePassword).enable();
}
AttributeSet attributes = ProtocolServerConfiguration.attributeDefinitionSet();
AttributeSet attributes = SinglePortRouterConfiguration.attributeDefinitionSet();
attributes.attribute(ProtocolServerConfiguration.NAME).set(name);
attributes.attribute(ProtocolServerConfiguration.HOST).set(ip.getHostName());
attributes.attribute(ProtocolServerConfiguration.PORT).set(port);
attributes.attribute(ProtocolServerConfiguration.IDLE_TIMEOUT).set(100);
attributes.attribute(ProtocolServerConfiguration.RECV_BUF_SIZE).set(receiveBufferSize);
attributes.attribute(ProtocolServerConfiguration.SEND_BUF_SIZE).set(sendBufferSize);
attributes.attribute(ProtocolServerConfiguration.WORKER_THREADS).set(1);
return new SinglePortRouterConfiguration(attributes.protect(), sslConfigurationBuilder.create());
}
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,4 @@ public ChannelInitializer<Channel> getInitializer() {
public Protocol getProtocol() {
return Protocol.SINGLE_PORT;
}

@Override
public int getWorkerThreads() {
return 1;
}

}

0 comments on commit 9916b7d

Please sign in to comment.