Skip to content

Commit

Permalink
ISPN-8376 Add DMR cluster counters management
Browse files Browse the repository at this point in the history
  • Loading branch information
vblagoje authored and ryanemerson committed Dec 6, 2017
1 parent 5cbd2fe commit a3b64d5
Show file tree
Hide file tree
Showing 29 changed files with 1,452 additions and 4 deletions.
Expand Up @@ -11,4 +11,4 @@
<module name="org.infinispan"/>
<module name="org.jboss.logging"/>
</dependencies>
</module>
</module>
Expand Up @@ -6,12 +6,15 @@
import java.security.PrivilegedExceptionAction;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import org.infinispan.AdvancedCache;
import org.infinispan.Cache;
import org.infinispan.cli.interpreter.Interpreter;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.counter.EmbeddedCounterManagerFactory;
import org.infinispan.counter.api.CounterManager;
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.factories.GlobalComponentRegistry;
import org.infinispan.interceptors.AsyncInterceptor;
Expand Down Expand Up @@ -306,5 +309,9 @@ public static Set<String> getSitesView(DefaultCacheContainer cacheManager) {
GetSitesViewAction action = new GetSitesViewAction(cacheManager);
return doPrivileged(action);
}

public static Optional<CounterManager> findCounterManager(EmbeddedCacheManager cacheManager) {
return Optional.ofNullable(doPrivileged((PrivilegedAction<CounterManager>) () -> EmbeddedCounterManagerFactory.asCounterManager(cacheManager)));
}

}
@@ -0,0 +1,16 @@
package org.infinispan.server.infinispan.spi.service;

import org.jboss.msc.service.ServiceName;

/**
* Enumeration of service name factories for services associated with a counter.
*
* @author Vladimir Blagojevic
*/
public class CounterServiceName {

public static ServiceName getServiceName(String container, String counterName) {
return CacheContainerServiceName.CACHE_CONTAINER.getServiceName(container).append(counterName);
}

}
Expand Up @@ -90,6 +90,7 @@ public enum Attribute {
INDEX(ModelKeys.INDEX),
INITIAL_CLUSTER_SIZE(ModelKeys.INITIAL_CLUSTER_SIZE),
INITIAL_CLUSTER_TIMEOUT(ModelKeys.INITIAL_CLUSTER_TIMEOUT),
INITIAL_VALUE(ModelKeys.INITIAL_VALUE),
INTERVAL(ModelKeys.INTERVAL),
ISOLATION(ModelKeys.ISOLATION),
JNDI_NAME(ModelKeys.JNDI_NAME),
Expand All @@ -100,6 +101,7 @@ public enum Attribute {
LISTENER_EXECUTOR(ModelKeys.LISTENER_EXECUTOR),
LOCK_TIMEOUT(ModelKeys.LOCK_TIMEOUT),
LOCKING(ModelKeys.LOCKING),
LOWER_BOUND(ModelKeys.LOWER_BOUND),
MACHINE(ModelKeys.MACHINE),
MAPPER(ModelKeys.MAPPER),
MARSHALLER(ModelKeys.MARSHALLER),
Expand All @@ -120,6 +122,7 @@ public enum Attribute {
MODULE(ModelKeys.MODULE),
NAME(ModelKeys.NAME),
NAMESPACE(XMLConstants.XMLNS_ATTRIBUTE),
NUM_OWNERS(ModelKeys.NUM_OWNERS),
NOTIFICATIONS(ModelKeys.NOTIFICATIONS),
OUTBOUND_SOCKET_BINDING(ModelKeys.OUTBOUND_SOCKET_BINDING),
OWNERS(ModelKeys.OWNERS),
Expand All @@ -140,6 +143,7 @@ public enum Attribute {
READ_ONLY(ModelKeys.READ_ONLY),
REALM(ModelKeys.REALM),
RELATIVE_TO(ModelKeys.RELATIVE_TO),
RELIABILITY(ModelKeys.RELIABILITY),
REMOTE_CACHE(ModelKeys.REMOTE_CACHE),
@Deprecated
REMOTE_COMMAND_EXECUTOR(ModelKeys.REMOTE_COMMAND_EXECUTOR),
Expand Down Expand Up @@ -168,6 +172,7 @@ public enum Attribute {
STATISTICS_AVAILABLE(ModelKeys.STATISTICS_AVAILABLE),
STRICT_PEER_TO_PEER(ModelKeys.STRICT_PEER_TO_PEER),
STOP_TIMEOUT(ModelKeys.STOP_TIMEOUT),
STORAGE(ModelKeys.STORAGE),
STRATEGY(ModelKeys.STRATEGY),
STRIPING(ModelKeys.STRIPING),
TAKE_BACKUP_OFFLINE_AFTER_FAILURES(ModelKeys.TAKE_BACKUP_OFFLINE_AFTER_FAILURES),
Expand All @@ -179,6 +184,8 @@ public enum Attribute {
TOTAL_ORDER_EXECUTOR(ModelKeys.TOTAL_ORDER_EXECUTOR),
TYPE(ModelKeys.TYPE),
USERNAME(ModelKeys.USERNAME),
UPPER_BOUND(ModelKeys.UPPER_BOUND),
VALUE(ModelKeys.VALUE),
WAIT_TIME(ModelKeys.WAIT_TIME),
WHEN_SPLIT(ModelKeys.WHEN_SPLIT),
;
Expand Down
@@ -0,0 +1,64 @@
package org.jboss.as.clustering.infinispan.subsystem;

import org.infinispan.counter.configuration.CounterManagerConfigurationBuilder;
import org.infinispan.counter.configuration.Reliability;
import org.jboss.as.controller.PathElement;
import org.jboss.as.controller.ReloadRequiredRemoveStepHandler;
import org.jboss.as.controller.SimpleAttributeDefinition;
import org.jboss.as.controller.SimpleAttributeDefinitionBuilder;
import org.jboss.as.controller.SimpleResourceDefinition;
import org.jboss.as.controller.registry.AttributeAccess;
import org.jboss.as.controller.registry.ManagementResourceRegistration;
import org.jboss.as.controller.services.path.ResolvePathHandler;
import org.jboss.dmr.ModelNode;
import org.jboss.dmr.ModelType;

/**
* CacheContainerCountersResource
*
* @author Vladimir Blagojevic
* @since 9.2
*/
public class CacheContainerCountersResource extends SimpleResourceDefinition {
static final PathElement PATH = PathElement.pathElement(ModelKeys.COUNTERS, ModelKeys.COUNTERS_NAME);

//attributes
static final SimpleAttributeDefinition RELIABILITY = new SimpleAttributeDefinitionBuilder(ModelKeys.RELIABILITY,
ModelType.STRING, false)
.setXmlName(Attribute.RELIABILITY.getLocalName())
.setAllowExpression(false)
.setAllowedValues(Reliability.AVAILABLE.toString(), Reliability.CONSISTENT.toString())
.setDefaultValue(new ModelNode().set(CounterManagerConfigurationBuilder.defaultConfiguration().reliability().toString()))
.setFlags(AttributeAccess.Flag.RESTART_RESOURCE_SERVICES)
.build();

static final SimpleAttributeDefinition NUM_OWNERS = new SimpleAttributeDefinitionBuilder(ModelKeys.NUM_OWNERS,
ModelType.LONG, false)
.setXmlName(Attribute.NUM_OWNERS.getLocalName())
.setAllowExpression(false)
.setFlags(AttributeAccess.Flag.RESTART_RESOURCE_SERVICES)
.setDefaultValue(new ModelNode().set(CounterManagerConfigurationBuilder.defaultConfiguration().numOwners()))
.build();

private final boolean runtimeRegistration;

CacheContainerCountersResource(ResolvePathHandler resolvePathHandler, boolean runtimeRegistration) {
super(PATH, new InfinispanResourceDescriptionResolver(ModelKeys.CACHE_CONTAINER, ModelKeys.COUNTERS),
CacheConfigOperationHandlers.CONTAINER_CONFIGURATIONS_ADD, ReloadRequiredRemoveStepHandler.INSTANCE);
this.runtimeRegistration = runtimeRegistration;
}

@Override
public void registerChildren(ManagementResourceRegistration rr) {
rr.registerSubModel(new StrongCounterResource(runtimeRegistration));
rr.registerSubModel(new WeakCounterResource(runtimeRegistration));
}

@Override
public void registerAttributes(ManagementResourceRegistration resourceRegistration) {
super.registerAttributes(resourceRegistration);
if (runtimeRegistration) {
CountersMetricsHandler.INSTANCE.registerMetrics(resourceRegistration);
}
}
}
Expand Up @@ -403,5 +403,6 @@ public void registerChildren(ManagementResourceRegistration resourceRegistration
resourceRegistration.registerSubModel(new InvalidationCacheResource(resolvePathHandler, runtimeRegistration));
resourceRegistration.registerSubModel(new ReplicatedCacheResource(resolvePathHandler, runtimeRegistration));
resourceRegistration.registerSubModel(new DistributedCacheResource(resolvePathHandler, runtimeRegistration));
resourceRegistration.registerSubModel(new CacheContainerCountersResource(resolvePathHandler, runtimeRegistration));
}
}
@@ -0,0 +1,164 @@
package org.jboss.as.clustering.infinispan.subsystem;

import static org.jboss.as.controller.descriptions.ModelDescriptionConstants.OP_ADDR;

import java.util.ArrayList;
import java.util.Collection;

import org.infinispan.counter.api.CounterConfiguration;
import org.infinispan.counter.api.CounterConfiguration.Builder;
import org.infinispan.counter.api.CounterType;
import org.infinispan.counter.api.Storage;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.server.infinispan.spi.service.CacheContainerServiceName;
import org.infinispan.server.infinispan.spi.service.CounterServiceName;
import org.jboss.as.controller.AbstractAddStepHandler;
import org.jboss.as.controller.AttributeDefinition;
import org.jboss.as.controller.OperationContext;
import org.jboss.as.controller.OperationFailedException;
import org.jboss.as.controller.PathAddress;
import org.jboss.dmr.ModelNode;
import org.jboss.msc.service.Service;
import org.jboss.msc.service.ServiceBuilder;
import org.jboss.msc.service.ServiceController;
import org.jboss.msc.service.ServiceTarget;
import org.jboss.msc.value.InjectedValue;

/**
* Add operation handler for /subsystem=infinispan/cache-container=clustered/counter=*
*
* @author Vladimir Blagojevic
*
*/
public class CounterAddHandler extends AbstractAddStepHandler implements RestartableServiceHandler{

CounterAddHandler() {
}

@Override
protected void performRuntime(OperationContext context, ModelNode operation, ModelNode model)
throws OperationFailedException {

super.performRuntime(context, operation, model);

this.installRuntimeServices(context, operation, model, null);
}

@Override
public Collection<ServiceController<?>> installRuntimeServices(OperationContext context, ModelNode operation,
ModelNode containerModel, ModelNode cacheModel) throws OperationFailedException {

String counterName = getCounterName(operation);
String containerName = getContainerName(operation);
String counterType = getCounterType(operation);

Builder b = getBuilder(containerModel, counterType);
processModelNode(context, containerModel, b);
String name = CounterResource.COUNTER_NAME.resolveModelAttribute(context, containerModel).asString();
if (!counterName.equals(name)) {
throw new OperationFailedException("Counter node name and node's name attribute should be the same");
}

// wire counter service for this counter
Collection<ServiceController<?>> controllers = new ArrayList<>(1);
ServiceController<?> service = this.installCounterService(context.getServiceTarget(),
containerName, counterName, b.build());
controllers.add(service);

return controllers;
}

@Override
public void removeRuntimeServices(OperationContext context, ModelNode operation, ModelNode containerModel,
ModelNode cacheModel) throws OperationFailedException {

String counterName = getCounterName(operation);
String containerName = getContainerName(operation);

context.removeService(CounterServiceName.getServiceName(containerName, counterName));
}

private ServiceController<?> installCounterService(ServiceTarget target, String containerName,
String configurationName, CounterConfiguration configuration) {
final InjectedValue<EmbeddedCacheManager> container = new InjectedValue<>();
final CounterConfigurationDependencies dependencies = new CounterConfigurationDependencies(container);
final Service<?> service = new CounterService(configuration, configurationName, dependencies);
final ServiceBuilder<?> builder = target.addService(CounterServiceName.getServiceName(containerName, configurationName), service)
.addDependency(CacheContainerServiceName.CACHE_CONTAINER.getServiceName(containerName), EmbeddedCacheManager.class, container);
return builder.install();
}

@Override
protected void populateModel(ModelNode operation, ModelNode model) throws OperationFailedException {
this.populate(operation, model);
}

void populate(ModelNode fromModel, ModelNode toModel) throws OperationFailedException {
for (AttributeDefinition attr : CounterResource.ATTRIBUTES) {
attr.validateAndSet(fromModel, toModel);
}
}

private String getCounterName(ModelNode operation) {
PathAddress counterAddress = getCounterAddressFromOperation(operation);
return counterAddress.getLastElement().getValue();
}

private String getContainerName(ModelNode operation) {
PathAddress containerAddress = getCacheContainerAddressFromOperation(operation);
return containerAddress.getLastElement().getValue();
}

private PathAddress getCacheContainerAddressFromOperation(ModelNode operation) {
PathAddress counterAddress = getCounterAddressFromOperation(operation);
return counterAddress.subAddress(0, counterAddress.size() - 2);
}

private String getCounterType(ModelNode operation) {
PathAddress counterAddress = getCounterAddressFromOperation(operation);
int size = counterAddress.size();
PathAddress subAddress = counterAddress.subAddress(size - 1, size);
return subAddress.getLastElement().getKey();
}

private PathAddress getCounterAddressFromOperation(ModelNode operation) {
return PathAddress.pathAddress(operation.get(OP_ADDR));
}

private Builder getBuilder(ModelNode counter, String counterType) {
boolean isWeakCounter = ModelKeys.WEAK_COUNTER.equals(counterType);
if (isWeakCounter) {
return CounterConfiguration.builder(CounterType.WEAK);
} else {
ModelNode lowerBoundModel = counter.get(ModelKeys.LOWER_BOUND);
ModelNode upperBoundModel = counter.get(ModelKeys.UPPER_BOUND);
boolean isBounded = lowerBoundModel.isDefined() || upperBoundModel.isDefined();
return isBounded ? CounterConfiguration.builder(CounterType.BOUNDED_STRONG)
: CounterConfiguration.builder(CounterType.UNBOUNDED_STRONG);
}
}

void processModelNode(OperationContext context, ModelNode counter,
CounterConfiguration.Builder builder) throws OperationFailedException {

long initialValue = CounterResource.INITIAL_VALUE.resolveModelAttribute(context, counter).asLong();
String storageType = CounterResource.STORAGE.resolveModelAttribute(context, counter).asString();

builder.initialValue(initialValue);
builder.storage(Storage.valueOf(storageType));
}

private static class CounterConfigurationDependencies implements CounterService.Dependencies {

private InjectedValue<EmbeddedCacheManager> container;
public CounterConfigurationDependencies(InjectedValue<EmbeddedCacheManager> container) {
this.container = container;
}

@Override
public EmbeddedCacheManager getCacheContainer() {
return this.container.getValue();
}

}
}

0 comments on commit a3b64d5

Please sign in to comment.