Skip to content

Commit

Permalink
ISPN-5972 Number of entries not working correctly in cache statistics…
Browse files Browse the repository at this point in the history
… in mgmt console
  • Loading branch information
pruivo authored and anistor committed May 12, 2016
1 parent c88cd7b commit 551f0ee
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 25 deletions.
@@ -1,7 +1,6 @@
package org.infinispan.stats.impl; package org.infinispan.stats.impl;


import java.io.Serializable; import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
Expand All @@ -13,6 +12,7 @@


import org.infinispan.AdvancedCache; import org.infinispan.AdvancedCache;
import org.infinispan.Cache; import org.infinispan.Cache;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.Configuration; import org.infinispan.configuration.cache.Configuration;
import org.infinispan.distexec.DefaultExecutorService; import org.infinispan.distexec.DefaultExecutorService;
import org.infinispan.distexec.DistributedCallable; import org.infinispan.distexec.DistributedCallable;
Expand Down Expand Up @@ -76,7 +76,7 @@ public class ClusterCacheStatsImpl implements ClusterCacheStats, JmxStatisticsEx
public static final long DEFAULT_STALE_STATS_THRESHOLD = 3000; public static final long DEFAULT_STALE_STATS_THRESHOLD = 3000;


private static final Log log = LogFactory.getLog(ClusterCacheStatsImpl.class); private static final Log log = LogFactory.getLog(ClusterCacheStatsImpl.class);
private transient Cache<?, ?> cache; private transient AdvancedCache<?, ?> cache;
private transient DefaultExecutorService des; private transient DefaultExecutorService des;
private TimeService ts; private TimeService ts;
private boolean statisticsEnabled = false; private boolean statisticsEnabled = false;
Expand All @@ -99,22 +99,22 @@ public class ClusterCacheStatsImpl implements ClusterCacheStats, JmxStatisticsEx
private double hitRatio; private double hitRatio;


//LockManager //LockManager
int numberOfLocksHeld; private int numberOfLocksHeld;
int numberOfLocksAvailable; private int numberOfLocksAvailable;


//invalidation, passivation activation //invalidation, passivation activation
long invalidations; private long invalidations;
long activations; private long activations;
long passivations; private long passivations;


//cacheloader metrics //cacheloader metrics
long cacheLoaderLoads; private long cacheLoaderLoads;
long cacheLoaderMisses; private long cacheLoaderMisses;
long cacheWriterStores; private long cacheWriterStores;


@Inject @Inject
public void injectDependencies(Cache<?, ?> cache, TimeService ts, Configuration configuration) { public void injectDependencies(Cache<?, ?> cache, TimeService ts, Configuration configuration) {
this.cache = cache; this.cache = cache.getAdvancedCache();
this.ts = ts; this.ts = ts;
this.statisticsEnabled = configuration.jmxStatistics().enabled(); this.statisticsEnabled = configuration.jmxStatistics().enabled();
} }
Expand Down Expand Up @@ -518,16 +518,15 @@ public long getStoreWrites(){
} }
} }


protected boolean launchNewDistTask() { private boolean launchNewDistTask() {
long duration = ts.timeDuration(statsUpdateTimestamp, ts.time(), TimeUnit.MILLISECONDS); long duration = ts.timeDuration(statsUpdateTimestamp, ts.time(), TimeUnit.MILLISECONDS);
return duration > staleStatsTreshold; return duration > staleStatsTreshold;
} }


protected synchronized void fetchClusterWideStatsIfNeeded() { private synchronized void fetchClusterWideStatsIfNeeded() {
if (launchNewDistTask()) { if (launchNewDistTask()) {
List<CompletableFuture<Map<String, Number>>> responseList = Collections.emptyList();
try { try {
responseList = des.submitEverywhere(new DistributedCacheStatsCallable()); List<CompletableFuture<Map<String, Number>>> responseList = des.submitEverywhere(new DistributedCacheStatsCallable());
updateFieldsFromResponseMap(responseList); updateFieldsFromResponseMap(responseList);
} catch (Exception e) { } catch (Exception e) {
log.warn("Could not execute cluster wide cache stats operation ", e); log.warn("Could not execute cluster wide cache stats operation ", e);
Expand All @@ -545,7 +544,9 @@ private void updateFieldsFromResponseMap(List<CompletableFuture<Map<String, Numb
evictions = addLongAttributes(responseList, EVICTIONS); evictions = addLongAttributes(responseList, EVICTIONS);
hits = addLongAttributes(responseList, HITS); hits = addLongAttributes(responseList, HITS);
misses = addLongAttributes(responseList, MISSES); misses = addLongAttributes(responseList, MISSES);
numberOfEntries = addLongAttributes(responseList, NUMBER_OF_ENTRIES); numberOfEntries = getCacheMode(cache).isReplicated() ?
cache.getStats().getCurrentNumberOfEntries() :
addLongAttributes(responseList, NUMBER_OF_ENTRIES);
removeHits = addLongAttributes(responseList, REMOVE_HITS); removeHits = addLongAttributes(responseList, REMOVE_HITS);
removeMisses = addLongAttributes(responseList, REMOVE_MISSES); removeMisses = addLongAttributes(responseList, REMOVE_MISSES);
stores = addLongAttributes(responseList, STORES); stores = addLongAttributes(responseList, STORES);
Expand Down Expand Up @@ -638,10 +639,14 @@ private double updateHitRatio(List<CompletableFuture<Map<String, Number>>> respo
return hitRatio; return hitRatio;
} }


public static <T extends SequentialInterceptor> T getFirstInterceptorWhichExtends(AdvancedCache<?,?> cache, Class<T> interceptorClass) { private static <T extends SequentialInterceptor> T getFirstInterceptorWhichExtends(AdvancedCache<?,?> cache, Class<T> interceptorClass) {
return interceptorClass.cast(cache.getSequentialInterceptorChain().findInterceptorExtending(interceptorClass)); return interceptorClass.cast(cache.getSequentialInterceptorChain().findInterceptorExtending(interceptorClass));
} }


private static CacheMode getCacheMode(Cache cache) {
return cache.getCacheConfiguration().clustering().cacheMode();
}

private static class DistributedCacheStatsCallable implements private static class DistributedCacheStatsCallable implements
DistributedCallable<Object, Object, Map<String, Number>>, Serializable { DistributedCallable<Object, Object, Map<String, Number>>, Serializable {


Expand All @@ -655,17 +660,19 @@ private static class DistributedCacheStatsCallable implements
@Override @Override
public Map<String, Number> call() throws Exception { public Map<String, Number> call() throws Exception {


Map<String, Number> map = new HashMap<String, Number>(); Map<String, Number> map = new HashMap<>();
Stats stats = remoteCache.getStats(); Stats stats = remoteCache.getStats();
map.put(AVERAGE_READ_TIME, stats.getAverageReadTime()); map.put(AVERAGE_READ_TIME, stats.getAverageReadTime());
map.put(AVERAGE_WRITE_TIME, stats.getAverageWriteTime()); map.put(AVERAGE_WRITE_TIME, stats.getAverageWriteTime());
map.put(AVERAGE_REMOVE_TIME, stats.getAverageRemoveTime()); map.put(AVERAGE_REMOVE_TIME, stats.getAverageRemoveTime());
map.put(EVICTIONS, stats.getEvictions()); map.put(EVICTIONS, stats.getEvictions());
map.put(HITS, stats.getHits()); map.put(HITS, stats.getHits());
map.put(MISSES, stats.getMisses()); map.put(MISSES, stats.getMisses());
if (isDistributed()) { final CacheMode cacheMode = getCacheMode(remoteCache);
//for replicated caches, we don't need to send the number of entries since it is the same in all the nodes.
if (cacheMode.isDistributed()) {
map.put(NUMBER_OF_ENTRIES, stats.getCurrentNumberOfEntries() / numOwners()); map.put(NUMBER_OF_ENTRIES, stats.getCurrentNumberOfEntries() / numOwners());
} else { } else if (!cacheMode.isReplicated()){
map.put(NUMBER_OF_ENTRIES, stats.getCurrentNumberOfEntries()); map.put(NUMBER_OF_ENTRIES, stats.getCurrentNumberOfEntries());
} }
map.put(STORES, stats.getStores()); map.put(STORES, stats.getStores());
Expand Down Expand Up @@ -731,10 +738,6 @@ public void setEnvironment(Cache<Object, Object> cache, Set<Object> inputKeys) {
remoteCache = cache.getAdvancedCache(); remoteCache = cache.getAdvancedCache();
} }


private boolean isDistributed(){
return remoteCache.getCacheConfiguration().clustering().cacheMode().isDistributed();
}

private int numOwners(){ private int numOwners(){
return remoteCache.getCacheConfiguration().clustering().hash().numOwners(); return remoteCache.getCacheConfiguration().clustering().hash().numOwners();
} }
Expand Down
Expand Up @@ -63,7 +63,7 @@ public void testClusterStats() throws Exception {
cache1.put("a3", "b3"); cache1.put("a3", "b3");
cache1.put("a4", "b4"); cache1.put("a4", "b4");


assertAttributeValue(mBeanServer, clusterStats, "NumberOfEntries", 8); assertAttributeValue(mBeanServer, clusterStats, "NumberOfEntries", 4);
assertAttributeValue(mBeanServer, clusterStats, "Stores", 4); assertAttributeValue(mBeanServer, clusterStats, "Stores", 4);
assertAttributeValue(mBeanServer, clusterStats, "Evictions", 0); assertAttributeValue(mBeanServer, clusterStats, "Evictions", 0);
assertAttributeValueGreaterThanOrEqualTo(mBeanServer, clusterStats, "AverageWriteTime", 0); assertAttributeValueGreaterThanOrEqualTo(mBeanServer, clusterStats, "AverageWriteTime", 0);
Expand Down

0 comments on commit 551f0ee

Please sign in to comment.