Skip to content

Commit

Permalink
ISPN-6879 Calculate (and expose) minimum number of nodes for data in
Browse files Browse the repository at this point in the history
Infinispan

* Added min num nodes to Stats classes
  • Loading branch information
wburns authored and ryanemerson committed Dec 22, 2017
1 parent 87a4e9d commit c127fc5
Show file tree
Hide file tree
Showing 20 changed files with 309 additions and 2 deletions.
10 changes: 10 additions & 0 deletions core/src/main/java/org/infinispan/container/DataContainer.java
Expand Up @@ -234,4 +234,14 @@ default void resize(long newSize) {
default long capacity() {
throw new UnsupportedOperationException();
}

/**
* Returns how large the eviction size is currently. This is only supported if the container is bounded. An
* {@link UnsupportedOperationException} is thrown otherwise. This value will always be lower than the value returned
* from {@link DataContainer#capacity()}
* @return how large the counted eviction is
*/
default long evictionSize() {
throw new UnsupportedOperationException();
}
}
Expand Up @@ -362,6 +362,12 @@ public Iterator<InternalCacheEntry<K, V>> iteratorIncludingExpired() {
return new EntryIterator(entries.values().iterator(), true);
}

@Override
public long evictionSize() {
Policy.Eviction<K, InternalCacheEntry<K, V>> evict = eviction();
return evict.weightedSize().orElse(entries.size());
}

private final class DefaultEvictionListener implements EvictionListener<K, InternalCacheEntry<K, V>> {

@Override
Expand Down
Expand Up @@ -195,6 +195,16 @@ protected void performClear() {
super.performClear();
}

@Override
public long capacity() {
return maxSize;
}

@Override
public long evictionSize() {
return currentSize;
}

/**
* This method repeatedly removes the head of the LRU list until there the current size is less than or equal to
* `maxSize`.
Expand Down
Expand Up @@ -675,4 +675,9 @@ public Iterator<InternalCacheEntry<WrappedBytes, WrappedBytes>> iterator() {
public Iterator<InternalCacheEntry<WrappedBytes, WrappedBytes>> iteratorIncludingExpired() {
return entryStreamIncludingExpired().iterator();
}

@Override
public long evictionSize() {
return size.get();
}
}
Expand Up @@ -20,6 +20,9 @@
import org.infinispan.commands.write.RemoveCommand;
import org.infinispan.commands.write.ReplaceCommand;
import org.infinispan.commands.write.WriteCommand;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ClusteringConfiguration;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.container.DataContainer;
import org.infinispan.container.offheap.OffHeapMemoryAllocator;
import org.infinispan.context.Flag;
Expand All @@ -33,6 +36,7 @@
import org.infinispan.jmx.annotations.ManagedOperation;
import org.infinispan.jmx.annotations.MeasurementType;
import org.infinispan.jmx.annotations.Units;
import org.infinispan.topology.CacheTopology;
import org.infinispan.util.TimeService;
import org.infinispan.util.concurrent.StripedCounters;

Expand Down Expand Up @@ -400,6 +404,48 @@ public long getOffHeapMemoryUsed() {
return allocator.getAllocatedAmount();
}

@ManagedAttribute(
description = "Amount of nodes required to guarantee data consistency",
displayName = "Required Minimum Nodes",
displayType = DisplayType.SUMMARY
)
public int getRequiredMinimumNumberOfNodes() {
return calculateRequiredMinimumNumberOfNodes(cache);
}

public static int calculateRequiredMinimumNumberOfNodes(AdvancedCache<?, ?> cache) {
Configuration config = cache.getCacheConfiguration();

ClusteringConfiguration clusteringConfiguration = config.clustering();
CacheMode mode = clusteringConfiguration.cacheMode();
if (mode.isReplicated() || !mode.isClustered()) {
// Local and replicated only require the 1 node to keep the data
return 1;
}
CacheTopology cacheTopology = cache.getDistributionManager().getCacheTopology();
if (mode.isInvalidation()) {
// Invalidation requires all as we don't know what data is installed on which
return cacheTopology.getMembers().size();
}
int numMembers = cacheTopology.getMembers().size();
// If scattered just assume 2 owners - numOwners in config says 1 though
int numOwners = mode.isScattered() ? 2 : clusteringConfiguration.hash().numOwners();
int minNodes = numMembers - numOwners + 1;
long maxSize = config.memory().size();

int evictionRestrictedNodes;
if (maxSize > 0) {
DataContainer dataContainer = cache.getDataContainer();
long totalData = dataContainer.evictionSize() * numOwners;
long capacity = dataContainer.capacity();

evictionRestrictedNodes = (int) (totalData / capacity) + (totalData % capacity != 0 ? 1 : 0);
} else {
evictionRestrictedNodes = 1;
}
return Math.max(evictionRestrictedNodes, minNodes);
}

@ManagedAttribute(
description = "Number of seconds since cache started",
displayName = "Seconds since cache started",
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/java/org/infinispan/stats/Stats.java
Expand Up @@ -95,6 +95,11 @@ public interface Stats {
*/
long getAverageRemoveTime();

/**
* @return Required minimum number of nodes to guarantee data consistency
*/
int getRequiredMinimumNumberOfNodes();

/**
* Reset statistics
*/
Expand Down
Expand Up @@ -154,14 +154,24 @@ int addIntAttributes(List<Map<String, Number>> responseList, String attribute) {
int total = 0;
for (Map<String, Number> m : responseList) {
Number value = m.get(attribute);
long intValue = value.intValue();
int intValue = value.intValue();
if (intValue > -1) {
total += intValue;
}
}
return total;
}

int maxIntAttributes(List<Map<String, Number>> responseList, String attribute) {
int max = 0;
for (Map<String, Number> m : responseList) {
Number value = m.get(attribute);
int intValue = value.intValue();
max = Math.max(max, intValue);
}
return max;
}

void putLongAttributesAverage(List<Map<String, Number>> responseList, String attribute) {
long nonZeroValues = 0;
long total = 0;
Expand All @@ -185,6 +195,10 @@ void putIntAttributes(List<Map<String, Number>> responseList, String attribute)
statsMap.put(attribute, addIntAttributes(responseList, attribute));
}

void putIntAttributesMax(List<Map<String, Number>> responseList, String attribute) {
statsMap.put(attribute, maxIntAttributes(responseList, attribute));
}

long getStatAsLong(String attribute) {
return getStat(attribute).longValue();
}
Expand Down
Expand Up @@ -121,6 +121,15 @@ public long getAverageRemoveTime() {
return result;
}

@Override
public int getRequiredMinimumNumberOfNodes() {
int result = -1;
for (Stats stats : getStats()) {
result = Math.max(result, stats.getRequiredMinimumNumberOfNodes());
}
return result;
}

protected long calculateAverageRemoveTime() {
long totalAverageRemoveTime = 0;
int includedCacheCounter = 0;
Expand Down Expand Up @@ -497,6 +506,17 @@ public void reset() {
resetStatistics();
}

private Set<Stats> getStats() {
Set<Stats> stats = new HashSet<Stats>();
for (String cn : cm.getCacheNames()) {
if (cm.cacheExists(cn)) {
AdvancedCache cache = cm.getCache(cn).getAdvancedCache();
stats.add(cache.getStats());
}
}
return stats;
}

private Set<Stats> getEnabledStats() {
Set<Stats> stats = new HashSet<Stats>();
for (String cn : cm.getCacheNames()) {
Expand Down
Expand Up @@ -19,6 +19,7 @@
import static org.infinispan.stats.impl.StatKeys.PASSIVATIONS;
import static org.infinispan.stats.impl.StatKeys.REMOVE_HITS;
import static org.infinispan.stats.impl.StatKeys.REMOVE_MISSES;
import static org.infinispan.stats.impl.StatKeys.REQUIRED_MIN_NODES;
import static org.infinispan.stats.impl.StatKeys.STORES;
import static org.infinispan.stats.impl.StatKeys.TIME_SINCE_START;

Expand Down Expand Up @@ -105,9 +106,11 @@ void updateStats() throws Exception {
putLongAttributesAverage(responseList, AVERAGE_WRITE_TIME);
putLongAttributesAverage(responseList, AVERAGE_READ_TIME);
putLongAttributesAverage(responseList, AVERAGE_REMOVE_TIME);
putLongAttributesAverage(responseList, OFF_HEAP_MEMORY_USED);

putIntAttributes(responseList, NUMBER_OF_LOCKS_HELD);
putIntAttributes(responseList, NUMBER_OF_LOCKS_AVAILABLE);
putIntAttributesMax(responseList, REQUIRED_MIN_NODES);

long numberOfEntriesInMemory = getCacheMode(cache).isReplicated() ?
cache.getStats().getCurrentNumberOfEntriesInMemory() :
Expand Down Expand Up @@ -139,6 +142,11 @@ public long getAverageRemoveTime() {
return getStatAsLong(AVERAGE_REMOVE_TIME);
}

@Override
public int getRequiredMinimumNumberOfNodes() {
return getStatAsInt(REQUIRED_MIN_NODES);
}

@Override
@ManagedAttribute(description = "Cluster wide average number of milliseconds for a write operation in the cache",
displayName = "Cluster wide average write time",
Expand Down Expand Up @@ -420,6 +428,7 @@ public Map<String, Number> call() throws Exception {
}

map.put(OFF_HEAP_MEMORY_USED, stats.getOffHeapMemoryUsed());
map.put(REQUIRED_MIN_NODES, stats.getRequiredMinimumNumberOfNodes());
map.put(STORES, stats.getStores());
map.put(REMOVE_HITS, stats.getRemoveHits());
map.put(REMOVE_MISSES, stats.getRemoveMisses());
Expand Down
1 change: 1 addition & 0 deletions core/src/main/java/org/infinispan/stats/impl/StatKeys.java
Expand Up @@ -20,6 +20,7 @@ class StatKeys {
static final String OFF_HEAP_MEMORY_USED = "offHeapMemoryUsed";
static final String RETRIEVALS = "retrievals";
static final String STORES = "stores";
static final String REQUIRED_MIN_NODES = "minRequiredNodes";

//LockManager
static final String NUMBER_OF_LOCKS_HELD = "numberOfLocksHeld";
Expand Down
Expand Up @@ -15,6 +15,7 @@
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.factories.annotations.SurvivesRestarts;
import org.infinispan.interceptors.impl.CacheMgmtInterceptor;
import org.infinispan.jmx.JmxStatisticsExposer;
import org.infinispan.jmx.annotations.DisplayType;
import org.infinispan.jmx.annotations.MBean;
Expand Down Expand Up @@ -203,6 +204,11 @@ public long getAverageRemoveTime() {
return (removeTimes.sum()) / removes;
}

@Override
public int getRequiredMinimumNumberOfNodes() {
return CacheMgmtInterceptor.calculateRequiredMinimumNumberOfNodes(cache);
}

@Override
public void reset() {
resetStatistics();
Expand Down
10 changes: 9 additions & 1 deletion core/src/main/java/org/infinispan/stats/impl/StatsImpl.java
Expand Up @@ -11,6 +11,7 @@
import static org.infinispan.stats.impl.StatKeys.OFF_HEAP_MEMORY_USED;
import static org.infinispan.stats.impl.StatKeys.REMOVE_HITS;
import static org.infinispan.stats.impl.StatKeys.REMOVE_MISSES;
import static org.infinispan.stats.impl.StatKeys.REQUIRED_MIN_NODES;
import static org.infinispan.stats.impl.StatKeys.RETRIEVALS;
import static org.infinispan.stats.impl.StatKeys.STORES;
import static org.infinispan.stats.impl.StatKeys.TIME_SINCE_RESET;
Expand All @@ -36,7 +37,7 @@ public class StatsImpl implements Stats {

private static String[] Attributes = new String[]{TIME_SINCE_RESET, TIME_SINCE_START, NUMBER_OF_ENTRIES, NUMBER_OF_ENTRIES_IN_MEMORY,
OFF_HEAP_MEMORY_USED, RETRIEVALS, STORES, HITS, MISSES, REMOVE_HITS, REMOVE_MISSES, EVICTIONS, AVERAGE_READ_TIME,
AVERAGE_REMOVE_TIME, AVERAGE_WRITE_TIME};
AVERAGE_REMOVE_TIME, AVERAGE_WRITE_TIME, REQUIRED_MIN_NODES};

private final Map<String, Long> statsMap = new HashMap<>();
final CacheMgmtInterceptor mgmtInterceptor;
Expand All @@ -62,6 +63,7 @@ public StatsImpl(AsyncInterceptorChain chain) {
statsMap.put(AVERAGE_READ_TIME, mgmtInterceptor.getAverageReadTime());
statsMap.put(AVERAGE_REMOVE_TIME, mgmtInterceptor.getAverageRemoveTime());
statsMap.put(AVERAGE_WRITE_TIME, mgmtInterceptor.getAverageWriteTime());
statsMap.put(REQUIRED_MIN_NODES, (long) mgmtInterceptor.getRequiredMinimumNumberOfNodes());
} else {
for (String key : Attributes)
statsMap.put(key, -1L);
Expand All @@ -87,6 +89,7 @@ public StatsImpl(Stats other) {
statsMap.put(AVERAGE_READ_TIME, other.getAverageReadTime());
statsMap.put(AVERAGE_REMOVE_TIME, other.getAverageRemoveTime());
statsMap.put(AVERAGE_WRITE_TIME, other.getAverageWriteTime());
statsMap.put(REQUIRED_MIN_NODES, (long) other.getRequiredMinimumNumberOfNodes());
} else {
for (String key : Attributes)
statsMap.put(key, -1L);
Expand Down Expand Up @@ -173,6 +176,11 @@ public long getAverageRemoveTime() {
return statsMap.get(AVERAGE_REMOVE_TIME);
}

@Override
public int getRequiredMinimumNumberOfNodes() {
return Math.toIntExact(statsMap.get(REQUIRED_MIN_NODES));
}

@Override
public void reset() {
if (mgmtInterceptor != null) {
Expand Down

0 comments on commit c127fc5

Please sign in to comment.