Skip to content
This repository has been archived by the owner on Jun 16, 2021. It is now read-only.

ISPN-3007 Use new protocol server configuration builders from ISPN-2984 #90

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -21,8 +21,9 @@
import java.util.List;
import java.util.Locale;

import org.infinispan.server.core.Main;
import org.infinispan.server.hotrod.HotRodServer;
import org.jboss.as.controller.AbstractAddStepHandler;
import org.infinispan.server.hotrod.configuration.HotRodServerConfigurationBuilder;
import org.jboss.as.controller.OperationContext;
import org.jboss.as.controller.OperationFailedException;
import org.jboss.as.controller.ServiceVerificationHandler;
Expand All @@ -36,11 +37,11 @@
import static org.infinispan.server.endpoint.subsystem.EndpointUtils.copyIfSet;

/**
* @author <a href="http://www.dataforte.net/blog/">Tristan Tarrant</a>
* @author Tristan Tarrant
*/
class HotRodSubsystemAdd extends AbstractAddStepHandler implements DescriptionProvider {
class HotRodSubsystemAdd extends ProtocolServiceSubsystemAdd implements DescriptionProvider {

static final HotRodSubsystemAdd INSTANCE = new HotRodSubsystemAdd();
static final ProtocolServiceSubsystemAdd INSTANCE = new HotRodSubsystemAdd();

static ModelNode createOperation(ModelNode address, ModelNode existing) {
ModelNode operation = Util.getEmptyOperation(ModelDescriptionConstants.ADD, address);
Expand Down Expand Up @@ -69,16 +70,46 @@ public ModelNode getModelDescription(Locale locale) {
@Override
protected void performRuntime(OperationContext context, ModelNode operation, ModelNode model, ServiceVerificationHandler verificationHandler, List<ServiceController<?>> newControllers)
throws OperationFailedException {
// Create the builder
HotRodServerConfigurationBuilder configurationBuilder = new HotRodServerConfigurationBuilder();
configureProtocolServer(configurationBuilder, operation);
configureProtocolServerTopology(configurationBuilder, operation);
// Create the service
final ProtocolServerService service = new ProtocolServerService(operation, HotRodServer.class);
final ProtocolServerService service = new ProtocolServerService(getServiceName(operation), HotRodServer.class, configurationBuilder);

// Setup the various dependencies with injectors and install the service
ServiceBuilder<?> builder = context.getServiceTarget().addService(EndpointUtils.getServiceName(operation, "hotrod"), service);
EndpointUtils.addCacheContainerDependency(context, builder, service.getCacheContainerName(), service.getCacheManager());
EndpointUtils.addSocketBindingDependency(builder, service.getRequiredSocketBindingName(), service.getSocketBinding());
EndpointUtils.addCacheContainerDependency(context, builder, getCacheContainerName(operation), service.getCacheManager());
EndpointUtils.addSocketBindingDependency(builder, getSocketBindingName(operation), service.getSocketBinding());
builder.install();
}

private void configureProtocolServerTopology(HotRodServerConfigurationBuilder builder, ModelNode config) {
if (!config.hasDefined(ModelKeys.TOPOLOGY_STATE_TRANSFER)) {
return;
}

config = config.get(ModelKeys.TOPOLOGY_STATE_TRANSFER);
if (config.hasDefined(ModelKeys.LOCK_TIMEOUT)) {
builder.topologyLockTimeout(config.get(ModelKeys.LOCK_TIMEOUT).asLong());
}
if (config.hasDefined(ModelKeys.REPLICATION_TIMEOUT)) {
builder.topologyReplTimeout(config.get(ModelKeys.REPLICATION_TIMEOUT).asLong());
}
if (config.hasDefined(ModelKeys.UPDATE_TIMEOUT)) {
builder.topologyUpdateTimeout(config.get(ModelKeys.UPDATE_TIMEOUT).asLong());
}
if (config.hasDefined(ModelKeys.EXTERNAL_HOST)) {
builder.proxyHost(config.get(ModelKeys.EXTERNAL_HOST).asString());
}
if (config.hasDefined(ModelKeys.EXTERNAL_PORT)) {
builder.proxyPort(config.get(ModelKeys.EXTERNAL_PORT).asInt());
}
if (config.hasDefined(ModelKeys.LAZY_RETRIEVAL)) {
builder.topologyStateTransfer(!config.get(ModelKeys.LAZY_RETRIEVAL).asBoolean(false));
}
}

@Override
protected void populateModel(ModelNode source, ModelNode target) throws OperationFailedException {
populate(source, target);
Expand Down
Expand Up @@ -24,6 +24,7 @@
import java.util.Locale;

import org.infinispan.server.memcached.MemcachedServer;
import org.infinispan.server.memcached.configuration.MemcachedServerConfigurationBuilder;
import org.jboss.as.controller.AbstractAddStepHandler;
import org.jboss.as.controller.OperationContext;
import org.jboss.as.controller.OperationFailedException;
Expand All @@ -38,7 +39,7 @@
/**
* @author Tristan Tarrant
*/
class MemcachedSubsystemAdd extends AbstractAddStepHandler implements DescriptionProvider {
class MemcachedSubsystemAdd extends ProtocolServiceSubsystemAdd implements DescriptionProvider {

static final MemcachedSubsystemAdd INSTANCE = new MemcachedSubsystemAdd();

Expand Down Expand Up @@ -69,14 +70,17 @@ public ModelNode getModelDescription(Locale locale) {
@Override
protected void performRuntime(OperationContext context, ModelNode operation, ModelNode model, ServiceVerificationHandler verificationHandler, List<ServiceController<?>> newControllers)
throws OperationFailedException {

MemcachedServerConfigurationBuilder configurationBuilder = new MemcachedServerConfigurationBuilder();
this.configureProtocolServer(configurationBuilder, operation);
// Create the service
final ProtocolServerService service = new ProtocolServerService(operation, MemcachedServer.class);
final ProtocolServerService service = new ProtocolServerService(getServiceName(operation), MemcachedServer.class, configurationBuilder);

// Setup the various dependencies with injectors and install the service
ServiceBuilder<?> builder = context.getServiceTarget().addService(EndpointUtils.getServiceName(operation, "memcached"), service);
EndpointUtils.addCacheContainerDependency(context, builder, service.getCacheContainerName(), service.getCacheManager());
EndpointUtils.addCacheDependency(context, builder, service.getCacheContainerName(), "memcachedCache");
EndpointUtils.addSocketBindingDependency(builder, service.getRequiredSocketBindingName(), service.getSocketBinding());
EndpointUtils.addCacheContainerDependency(context, builder, getCacheContainerName(operation), service.getCacheManager());
EndpointUtils.addCacheDependency(context, builder, getCacheContainerName(operation), "memcachedCache");
EndpointUtils.addSocketBindingDependency(builder, getSocketBindingName(operation), service.getSocketBinding());
builder.install();
}

Expand Down
Expand Up @@ -21,16 +21,15 @@
import static org.infinispan.server.endpoint.EndpointLogger.ROOT_LOGGER;

import java.net.InetSocketAddress;
import java.util.Properties;

import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.server.core.Main;
import org.infinispan.server.core.ProtocolServer;
import org.infinispan.server.core.configuration.ProtocolServerConfiguration;
import org.infinispan.server.core.configuration.ProtocolServerConfigurationBuilder;
import org.infinispan.server.core.transport.Transport;
import org.infinispan.util.ReflectionUtil;
import org.jboss.as.network.NetworkUtils;
import org.jboss.as.network.SocketBinding;
import org.jboss.dmr.ModelNode;
import org.jboss.msc.service.Service;
import org.jboss.msc.service.StartContext;
import org.jboss.msc.service.StartException;
Expand All @@ -43,19 +42,13 @@
* @author Tristan Tarrant
*/
class ProtocolServerService implements Service<ProtocolServer> {
private static final String DEFAULT_WORKER_THREADS = "160";

// The cacheManager that will be injected by the container (specified by the cacheContainer
// attribute)
private final InjectedValue<EmbeddedCacheManager> cacheManager = new InjectedValue<EmbeddedCacheManager>();
// The socketBinding that will be injected by the container
private final InjectedValue<SocketBinding> socketBinding = new InjectedValue<SocketBinding>();
// The configuration for this service
private final ModelNode config;
// Additional connector properties
private final Properties connectorProperties = new Properties();
// Topology state transfer properties
private final Properties topologyStateTransferProperties = new Properties();
private final ProtocolServerConfigurationBuilder<?, ?> configurationBuilder;
// The class which determines the type of server
private final Class<? extends ProtocolServer> serverClass;
// The server which handles the protocol
Expand All @@ -65,32 +58,26 @@ class ProtocolServerService implements Service<ProtocolServer> {
// The name of the server
private final String serverName;

ProtocolServerService(ModelNode config, Class<? extends ProtocolServer> serverClass) {
this.config = config.clone();
ProtocolServerService(String serverName, Class<? extends ProtocolServer> serverClass, ProtocolServerConfigurationBuilder<?, ?> configurationBuilder) {
this.configurationBuilder = configurationBuilder;
this.serverClass = serverClass;
String serverTypeName = serverClass.getSimpleName();
this.serverName = config.hasDefined(ModelKeys.NAME) ? serverTypeName + " "
+ config.get(ModelKeys.NAME).asString() : serverTypeName;
this.serverName = serverName != null ? serverTypeName + " " + serverName : serverTypeName;
}

@Override
public synchronized void start(final StartContext context) throws StartException {
ROOT_LOGGER.endpointStarting(serverName);

assert connectorProperties.isEmpty();
assert topologyStateTransferProperties.isEmpty();

boolean done = false;
try {
loadConnectorProperties(config);
loadTopologyStateTransferProperties(config);
validateConfiguration();

SocketBinding socketBinding = getSocketBinding().getValue();
InetSocketAddress socketAddress = socketBinding.getSocketAddress();
configurationBuilder.host(socketAddress.getAddress().getHostAddress());
configurationBuilder.port(socketAddress.getPort());
ROOT_LOGGER.endpointStarted(serverName, NetworkUtils.formatAddress(socketAddress), socketAddress.getPort());
// Start the connector
startProtocolServer();

SocketBinding binding = socketBinding.getValue();
ROOT_LOGGER.endpointStarted(serverName, NetworkUtils.formatAddress(binding.getAddress()), binding.getAbsolutePort());
startProtocolServer(configurationBuilder.build());

done = true;
} catch (StartException e) {
Expand All @@ -104,32 +91,16 @@ public synchronized void start(final StartContext context) throws StartException
}
}

private void validateConfiguration() throws StartException {
// There has to be at least one connector defined.
if (connectorProperties.isEmpty()) {
throw ROOT_LOGGER.noConnectorDefined();
}
}

private void startProtocolServer() throws StartException {

Properties props = copy(connectorProperties);
if (props == null) {
return;
}

// Merge topology state transfer settings
props.putAll(topologyStateTransferProperties);

private void startProtocolServer(ProtocolServerConfiguration configuration) throws StartException {
// Start the server and record it
ProtocolServer server;
try {
server = serverClass.newInstance();
} catch (Exception e) {
throw ROOT_LOGGER.failedConnectorInstantiation(e, serverName);
throw ROOT_LOGGER.failedConnectorInstantiation(e, serverName);
}
ROOT_LOGGER.connectorStarting(serverName);
server.start(props, getCacheManager().getValue());
server.start(configuration, getCacheManager().getValue());
protocolServer = server;

try {
Expand All @@ -155,8 +126,6 @@ private void doStop() {
}
}
} finally {
connectorProperties.clear();
topologyStateTransferProperties.clear();
ROOT_LOGGER.connectorStopped(serverName);
}
}
Expand All @@ -173,92 +142,10 @@ InjectedValue<EmbeddedCacheManager> getCacheManager() {
return cacheManager;
}

String getCacheContainerName() {
if (!config.hasDefined(ModelKeys.CACHE_CONTAINER)) {
return null;
}
return config.get(ModelKeys.CACHE_CONTAINER).asString();
}

String getRequiredSocketBindingName() {
return config.hasDefined(ModelKeys.SOCKET_BINDING) ? config.get(ModelKeys.SOCKET_BINDING).asString() : null;
}

InjectedValue<SocketBinding> getSocketBinding() {
return socketBinding;
}

private void loadConnectorProperties(ModelNode config) {
if (config.hasDefined(ModelKeys.SOCKET_BINDING)) {
SocketBinding socketBinding = getSocketBinding().getValue();
InetSocketAddress socketAddress = socketBinding.getSocketAddress();
connectorProperties.setProperty(Main.PROP_KEY_HOST(), socketAddress.getAddress().getHostAddress());
connectorProperties.setProperty(Main.PROP_KEY_PORT(), String.valueOf(socketAddress.getPort()));
}
if (config.hasDefined(ModelKeys.WORKER_THREADS)) {
connectorProperties.setProperty(Main.PROP_KEY_WORKER_THREADS(), config.get(ModelKeys.WORKER_THREADS)
.asString());
} else {
connectorProperties.setProperty(Main.PROP_KEY_WORKER_THREADS(), DEFAULT_WORKER_THREADS);
}
if (config.hasDefined(ModelKeys.IDLE_TIMEOUT)) {
connectorProperties.setProperty(Main.PROP_KEY_IDLE_TIMEOUT(), config.get(ModelKeys.IDLE_TIMEOUT).asString());
}
if (config.hasDefined(ModelKeys.TCP_NODELAY)) {
connectorProperties.setProperty(Main.PROP_KEY_TCP_NO_DELAY(), config.get(ModelKeys.TCP_NODELAY).asString());
}
if (config.hasDefined(ModelKeys.SEND_BUFFER_SIZE)) {
connectorProperties.setProperty(Main.PROP_KEY_SEND_BUF_SIZE(), config.get(ModelKeys.SEND_BUFFER_SIZE)
.asString());
}
if (config.hasDefined(ModelKeys.RECEIVE_BUFFER_SIZE)) {
connectorProperties.setProperty(Main.PROP_KEY_RECV_BUF_SIZE(), config.get(ModelKeys.RECEIVE_BUFFER_SIZE)
.asString());
}

}

private void loadTopologyStateTransferProperties(ModelNode config) {
if (!config.hasDefined(ModelKeys.TOPOLOGY_STATE_TRANSFER)) {
return;
}

config = config.get(ModelKeys.TOPOLOGY_STATE_TRANSFER);
if (config.hasDefined(ModelKeys.LOCK_TIMEOUT)) {
topologyStateTransferProperties.setProperty(Main.PROP_KEY_TOPOLOGY_LOCK_TIMEOUT(),
config.get(ModelKeys.LOCK_TIMEOUT).asString());
}
if (config.hasDefined(ModelKeys.REPLICATION_TIMEOUT)) {
topologyStateTransferProperties.setProperty(Main.PROP_KEY_TOPOLOGY_REPL_TIMEOUT(),
config.get(ModelKeys.REPLICATION_TIMEOUT).asString());
}
if (config.hasDefined(ModelKeys.UPDATE_TIMEOUT)) {
topologyStateTransferProperties.setProperty(Main.PROP_KEY_TOPOLOGY_UPDATE_TIMEOUT(),
config.get(ModelKeys.UPDATE_TIMEOUT).asString());
}
if (config.hasDefined(ModelKeys.EXTERNAL_HOST)) {
topologyStateTransferProperties.setProperty(Main.PROP_KEY_PROXY_HOST(), config.get(ModelKeys.EXTERNAL_HOST)
.asString());
}
if (config.hasDefined(ModelKeys.EXTERNAL_PORT)) {
topologyStateTransferProperties.setProperty(Main.PROP_KEY_PROXY_PORT(), config.get(ModelKeys.EXTERNAL_PORT)
.asString());
}
if (config.hasDefined(ModelKeys.LAZY_RETRIEVAL)) {
topologyStateTransferProperties.setProperty(Main.PROP_KEY_TOPOLOGY_STATE_TRANSFER(),
Boolean.toString(!config.get(ModelKeys.LAZY_RETRIEVAL).asBoolean(false)));
}
}

private static Properties copy(Properties p) {
if (p == null) {
return null;
}
Properties newProps = new Properties();
newProps.putAll(p);
return newProps;
}

public Transport getTransport() {
return transport;
}
Expand Down
@@ -0,0 +1,58 @@
/*
* JBoss, Home of Professional Open Source
* Copyright 2013 Red Hat Inc. and/or its affiliates and other contributors
* as indicated by the @author tags. All rights reserved.
* See the copyright.txt in the distribution for a
* full listing of individual contributors.
*
* This copyrighted material is made available to anyone wishing to use,
* modify, copy, or redistribute it subject to the terms and conditions
* of the GNU Lesser General Public License, v. 2.1.
* This program is distributed in the hope that it will be useful, but WITHOUT A
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
* PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
* You should have received a copy of the GNU Lesser General Public License,
* v.2.1 along with this distribution; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
* MA 02110-1301, USA.
*/
package org.infinispan.server.endpoint.subsystem;

import org.infinispan.server.core.configuration.ProtocolServerConfigurationBuilder;
import org.jboss.as.controller.AbstractAddStepHandler;
import org.jboss.dmr.ModelNode;

public abstract class ProtocolServiceSubsystemAdd extends AbstractAddStepHandler {
private static final int DEFAULT_WORKER_THREADS = 160;

protected String getServiceName(ModelNode config) {
return config.hasDefined(ModelKeys.NAME) ? config.get(ModelKeys.NAME).asString() : null;
}

protected String getSocketBindingName(ModelNode config) {
return config.hasDefined(ModelKeys.SOCKET_BINDING) ? config.get(ModelKeys.SOCKET_BINDING).asString() : null;
}

protected String getCacheContainerName(ModelNode config) {
return config.hasDefined(ModelKeys.CACHE_CONTAINER) ? config.get(ModelKeys.CACHE_CONTAINER).asString() : null;
}

protected void configureProtocolServer(ProtocolServerConfigurationBuilder<?, ?> builder, ModelNode config) {

builder.workerThreads(config.hasDefined(ModelKeys.WORKER_THREADS) ? config.get(ModelKeys.WORKER_THREADS).asInt() : DEFAULT_WORKER_THREADS);

if (config.hasDefined(ModelKeys.IDLE_TIMEOUT)) {
builder.idleTimeout(config.get(ModelKeys.IDLE_TIMEOUT).asInt());
}
if (config.hasDefined(ModelKeys.TCP_NODELAY)) {
builder.tcpNoDelay(config.get(ModelKeys.TCP_NODELAY).asBoolean());
}
if (config.hasDefined(ModelKeys.SEND_BUFFER_SIZE)) {
builder.sendBufSize(config.get(ModelKeys.SEND_BUFFER_SIZE).asInt());
}
if (config.hasDefined(ModelKeys.RECEIVE_BUFFER_SIZE)) {
builder.recvBufSize(config.get(ModelKeys.RECEIVE_BUFFER_SIZE).asInt());
}
}

}