Skip to content

Commit

Permalink
Merge pull request hazelcast#11358 from ahmetmircik/fix/3.8.6/queryCa…
Browse files Browse the repository at this point in the history
…cheDestroyFix

[BACKPORT] Query cache destroy fix
  • Loading branch information
mdogan committed Sep 14, 2017
2 parents dd89770 + b38a169 commit 2a88ee8
Show file tree
Hide file tree
Showing 11 changed files with 359 additions and 91 deletions.
Expand Up @@ -112,7 +112,6 @@
import com.hazelcast.map.impl.LazyMapEntry;
import com.hazelcast.map.impl.ListenerAdapter;
import com.hazelcast.map.impl.SimpleEntryView;
import com.hazelcast.map.impl.querycache.subscriber.InternalQueryCache;
import com.hazelcast.map.impl.querycache.subscriber.QueryCacheEndToEndProvider;
import com.hazelcast.map.impl.querycache.subscriber.QueryCacheRequest;
import com.hazelcast.map.impl.querycache.subscriber.SubscriberContext;
Expand All @@ -138,7 +137,6 @@
import com.hazelcast.spi.impl.UnmodifiableLazyList;
import com.hazelcast.spi.serialization.SerializationService;
import com.hazelcast.util.CollectionUtil;
import com.hazelcast.util.ConstructorFunction;
import com.hazelcast.util.IterationType;
import com.hazelcast.util.MapUtil;
import com.hazelcast.util.Preconditions;
Expand Down Expand Up @@ -1457,14 +1455,12 @@ private QueryCache getQueryCacheInternal(String name, MapListener listener, Pred
return createQueryCache(request);
}


private QueryCache createQueryCache(QueryCacheRequest request) {
ConstructorFunction<String, InternalQueryCache> constructorFunction
= new ClientQueryCacheEndToEndConstructor(request);
@SuppressWarnings("unchecked")
private QueryCache<K, V> createQueryCache(QueryCacheRequest request) {
SubscriberContext subscriberContext = queryCacheContext.getSubscriberContext();
QueryCacheEndToEndProvider queryCacheEndToEndProvider = subscriberContext.getEndToEndQueryCacheProvider();
return queryCacheEndToEndProvider.getOrCreateQueryCache(request.getMapName(),
request.getCacheName(), constructorFunction);
return queryCacheEndToEndProvider.getOrCreateQueryCache(request.getMapName(), request.getCacheName(),
new ClientQueryCacheEndToEndConstructor(request));
}

@Override
Expand Down Expand Up @@ -1660,4 +1656,16 @@ public void handle(int partitionId, String uuid) {
public ClientQueryCacheContext getQueryCacheContext() {
return queryCacheContext;
}

@Override
protected void onDestroy() {
try {
SubscriberContext subscriberContext = queryCacheContext.getSubscriberContext();
QueryCacheEndToEndProvider provider = subscriberContext.getEndToEndQueryCacheProvider();
provider.destroyAllQueryCaches(name);
} finally {
super.onDestroy();
}
}

}
Expand Up @@ -21,29 +21,52 @@
import com.hazelcast.client.test.TestHazelcastFactory;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;
import com.hazelcast.map.impl.MapService;
import com.hazelcast.map.impl.MapServiceContext;
import com.hazelcast.map.impl.querycache.QueryCacheContext;
import com.hazelcast.map.impl.querycache.accumulator.DefaultAccumulatorInfoSupplier;
import com.hazelcast.map.impl.querycache.publisher.MapListenerRegistry;
import com.hazelcast.map.impl.querycache.publisher.MapPublisherRegistry;
import com.hazelcast.map.impl.querycache.publisher.PartitionAccumulatorRegistry;
import com.hazelcast.map.impl.querycache.publisher.PublisherContext;
import com.hazelcast.map.impl.querycache.publisher.PublisherRegistry;
import com.hazelcast.map.impl.querycache.publisher.QueryCacheListenerRegistry;
import com.hazelcast.map.impl.querycache.subscriber.QueryCacheEndToEndProvider;
import com.hazelcast.map.impl.querycache.subscriber.QueryCacheFactory;
import com.hazelcast.map.impl.querycache.subscriber.SubscriberContext;
import com.hazelcast.query.TruePredicate;
import com.hazelcast.spi.impl.NodeEngineImpl;
import com.hazelcast.spi.impl.eventservice.impl.EventServiceImpl;
import com.hazelcast.spi.impl.eventservice.impl.EventServiceSegment;
import com.hazelcast.test.HazelcastParallelClassRunner;
import com.hazelcast.test.HazelcastTestSupport;
import com.hazelcast.test.annotation.ParallelTest;
import com.hazelcast.test.annotation.QuickTest;
import org.junit.After;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

@RunWith(HazelcastParallelClassRunner.class)
@Category({QuickTest.class, ParallelTest.class})
@Ignore("Enable this test after fix of this issue: https://github.com/hazelcast/hazelcast/issues/11145")
public class ClientQueryCacheMemoryLeakTest extends HazelcastTestSupport {

private TestHazelcastFactory factory = new TestHazelcastFactory();

@After
public void tearDown() throws Exception {
factory.shutdownAll();
}

@Test
public void removes_internal_query_caches_upon_map_destroy() throws Exception {
factory.newHazelcastInstance();
Expand All @@ -52,6 +75,8 @@ public void removes_internal_query_caches_upon_map_destroy() throws Exception {
String mapName = "test";
IMap<Integer, Integer> map = client.getMap(mapName);

populateMap(map);

for (int j = 0; j < 10; j++) {
map.getQueryCache(j + "-test-QC", TruePredicate.INSTANCE, true);
}
Expand All @@ -67,8 +92,102 @@ public void removes_internal_query_caches_upon_map_destroy() throws Exception {
assertEquals(0, queryCacheFactory.getQueryCacheCount());
}

@After
public void tearDown() throws Exception {
factory.shutdownAll();
@Test
public void no_query_cache_left_after_creating_and_destroying_same_map_concurrently() throws Exception {
final HazelcastInstance node = factory.newHazelcastInstance();
final HazelcastInstance client = factory.newHazelcastClient();
final String mapName = "test";

ExecutorService pool = Executors.newFixedThreadPool(5);

for (int i = 0; i < 1000; i++) {
Runnable runnable = new Runnable() {
public void run() {
IMap<Integer, Integer> map = client.getMap(mapName);
;
try {
populateMap(map);
for (int j = 0; j < 10; j++) {
map.getQueryCache(j + "-test-QC", TruePredicate.INSTANCE, true);
}
} finally {
map.destroy();
}

}
};
pool.submit(runnable);
}

pool.shutdown();
pool.awaitTermination(60, TimeUnit.SECONDS);

SubscriberContext subscriberContext = getSubscriberContext(client, mapName);
QueryCacheEndToEndProvider provider = subscriberContext.getEndToEndQueryCacheProvider();
QueryCacheFactory queryCacheFactory = subscriberContext.getQueryCacheFactory();

assertEquals(0, provider.getQueryCacheCount(mapName));
assertEquals(0, queryCacheFactory.getQueryCacheCount());

assertNoListenerLeftOnEventService(node);
assertNoRegisteredListenerLeft(node, mapName);
assertNoAccumulatorInfoSupplierLeft(node, mapName);
assertNoPartitionAccumulatorRegistryLeft(node, mapName);
}

private static void assertNoAccumulatorInfoSupplierLeft(HazelcastInstance node, String mapName) {
PublisherContext publisherContext = getPublisherContext(node);
DefaultAccumulatorInfoSupplier accumulatorInfoSupplier
= (DefaultAccumulatorInfoSupplier) publisherContext.getAccumulatorInfoSupplier();
int accumulatorInfoCountOfMap = accumulatorInfoSupplier.accumulatorInfoCountOfMap(mapName);
assertEquals(0, accumulatorInfoCountOfMap);
}

private static void assertNoRegisteredListenerLeft(HazelcastInstance node, String mapName) {
PublisherContext publisherContext = getPublisherContext(node);
MapListenerRegistry mapListenerRegistry = publisherContext.getMapListenerRegistry();
QueryCacheListenerRegistry registry = mapListenerRegistry.getOrNull(mapName);
if (registry != null) {
Map<String, String> registeredListeners = registry.getAll();
assertTrue(registeredListeners.isEmpty());
}
}

private static void assertNoPartitionAccumulatorRegistryLeft(HazelcastInstance node, String mapName) {
PublisherContext publisherContext = getPublisherContext(node);
MapPublisherRegistry mapPublisherRegistry = publisherContext.getMapPublisherRegistry();
PublisherRegistry registry = mapPublisherRegistry.getOrCreate(mapName);
if(registry == null) {
return;
}

Map<String, PartitionAccumulatorRegistry> accumulatorRegistryMap = registry.getAll();
assertTrue(accumulatorRegistryMap.isEmpty());
}

private static void assertNoListenerLeftOnEventService(HazelcastInstance node) {
NodeEngineImpl nodeEngineImpl = getNodeEngineImpl(node);
EventServiceImpl eventService = ((EventServiceImpl) nodeEngineImpl.getEventService());
EventServiceSegment segment = eventService.getSegment(MapService.SERVICE_NAME, false);
ConcurrentMap registrationIdMap = segment.getRegistrationIdMap();
assertEquals(registrationIdMap.toString(), 0, registrationIdMap.size());
}

private static void populateMap(IMap<Integer, Integer> map) {
for (int i = 0; i < 10; i++) {
map.put(i, i);
}
}

private static SubscriberContext getSubscriberContext(HazelcastInstance client, String mapName) {
final IMap<Integer, Integer> map = client.getMap(mapName);
return ((ClientMapProxy) map).getQueryCacheContext().getSubscriberContext();
}

private static PublisherContext getPublisherContext(HazelcastInstance node) {
MapService mapService = getNodeEngineImpl(node).getService(MapService.SERVICE_NAME);
MapServiceContext mapServiceContext = mapService.getMapServiceContext();
QueryCacheContext queryCacheContext = mapServiceContext.getQueryCacheContext();
return queryCacheContext.getPublisherContext();
}
}
Expand Up @@ -38,7 +38,6 @@
import com.hazelcast.map.impl.query.QueryResultUtils;
import com.hazelcast.map.impl.query.Target;
import com.hazelcast.map.impl.querycache.QueryCacheContext;
import com.hazelcast.map.impl.querycache.subscriber.InternalQueryCache;
import com.hazelcast.map.impl.querycache.subscriber.NodeQueryCacheEndToEndConstructor;
import com.hazelcast.map.impl.querycache.subscriber.QueryCacheEndToEndProvider;
import com.hazelcast.map.impl.querycache.subscriber.QueryCacheRequest;
Expand Down Expand Up @@ -67,7 +66,6 @@
import com.hazelcast.spi.NodeEngine;
import com.hazelcast.spi.Operation;
import com.hazelcast.util.CollectionUtil;
import com.hazelcast.util.ConstructorFunction;
import com.hazelcast.util.IterationType;
import com.hazelcast.util.MapUtil;
import com.hazelcast.util.UuidUtil;
Expand Down Expand Up @@ -945,11 +943,10 @@ private QueryCache<K, V> getQueryCacheInternal(String name, MapListener listener
}

private QueryCache<K, V> createQueryCache(QueryCacheRequest request) {
ConstructorFunction<String, InternalQueryCache> constructorFunction = new NodeQueryCacheEndToEndConstructor(request);
QueryCacheContext queryCacheContext = request.getContext();
SubscriberContext subscriberContext = queryCacheContext.getSubscriberContext();
QueryCacheEndToEndProvider queryCacheEndToEndProvider = subscriberContext.getEndToEndQueryCacheProvider();
return queryCacheEndToEndProvider.getOrCreateQueryCache(request.getMapName(), request.getCacheName(),
constructorFunction);
new NodeQueryCacheEndToEndConstructor(request));
}
}
Expand Up @@ -53,6 +53,9 @@
import com.hazelcast.map.impl.operation.RemoveInterceptorOperation;
import com.hazelcast.map.impl.query.MapQueryEngine;
import com.hazelcast.map.impl.query.QueryEventFilter;
import com.hazelcast.map.impl.querycache.QueryCacheContext;
import com.hazelcast.map.impl.querycache.subscriber.QueryCacheEndToEndProvider;
import com.hazelcast.map.impl.querycache.subscriber.SubscriberContext;
import com.hazelcast.map.impl.recordstore.RecordStore;
import com.hazelcast.map.listener.MapListener;
import com.hazelcast.map.listener.MapPartitionLostListener;
Expand Down Expand Up @@ -1114,6 +1117,20 @@ private void publishMapEvent(int numberOfAffectedEntries, EntryEventType eventTy
mapEventPublisher.publishMapEvent(thisAddress, name, eventType, numberOfAffectedEntries);
}

@Override
protected boolean preDestroy() {
try {
QueryCacheContext queryCacheContext = mapServiceContext.getQueryCacheContext();
SubscriberContext subscriberContext = queryCacheContext.getSubscriberContext();
QueryCacheEndToEndProvider provider = subscriberContext.getEndToEndQueryCacheProvider();
provider.destroyAllQueryCaches(name);
} finally {
super.preDestroy();
}

return true;
}

protected long getTimeInMillis(long time, TimeUnit timeunit) {
long timeInMillis = timeunit.toMillis(time);
if (time > 0 && timeInMillis == 0) {
Expand Down
Expand Up @@ -172,12 +172,12 @@ public Object toObject(Object obj) {
return mapServiceContext.toObject(obj);
}

private String registerLocalIMapListener(String mapName) {
private String registerLocalIMapListener(String name) {
return mapServiceContext.addLocalListenerAdapter(new ListenerAdapter<IMapEvent>() {
@Override
public void onEvent(IMapEvent event) {
// NOP
}
}, mapName);
}, name);
}
}
Expand Up @@ -49,6 +49,10 @@ public DefaultAccumulatorInfoSupplier() {
@Override
public AccumulatorInfo getAccumulatorInfoOrNull(String mapName, String cacheId) {
ConcurrentMap<String, AccumulatorInfo> cacheToInfoMap = cacheInfoPerMap.get(mapName);
if (cacheToInfoMap == null) {
return null;
}

return cacheToInfoMap.get(cacheId);
}

Expand All @@ -64,6 +68,17 @@ public void remove(String mapName, String cacheId) {
if (cacheToInfoMap == null) {
return;
}

cacheToInfoMap.remove(cacheId);
}

// only for testing
public int accumulatorInfoCountOfMap(String mapName) {
ConcurrentMap<String, AccumulatorInfo> accumulatorInfo = cacheInfoPerMap.get(mapName);
if (accumulatorInfo == null) {
return 0;
} else {
return accumulatorInfo.size();
}
}
}
Expand Up @@ -24,7 +24,6 @@
import com.hazelcast.map.impl.query.QueryEventFilter;
import com.hazelcast.map.impl.querycache.InvokerWrapper;
import com.hazelcast.map.impl.querycache.NodeInvokerWrapper;
import com.hazelcast.map.impl.querycache.QueryCacheConfigurator;
import com.hazelcast.map.impl.querycache.QueryCacheContext;
import com.hazelcast.map.impl.querycache.QueryCacheEventService;
import com.hazelcast.map.impl.querycache.accumulator.Accumulator;
Expand Down Expand Up @@ -189,24 +188,25 @@ private void destroyRemoteResources() {

InvokerWrapper invokerWrapper = context.getInvokerWrapper();
if (invokerWrapper instanceof NodeInvokerWrapper) {
subscriberContext.getEventService().removePublisherListener(mapName, publisherListenerId);

Collection<Member> memberList = context.getMemberList();
for (Member member : memberList) {
Address address = member.getAddress();
Object removePublisher = subscriberContextSupport.createDestroyQueryCacheOperation(mapName, cacheName);
Object removePublisher = subscriberContextSupport.createDestroyQueryCacheOperation(mapName, cacheId);
invokerWrapper.invokeOnTarget(removePublisher, address);
}
} else {
try {
subscriberContext.getEventService().removePublisherListener(mapName, publisherListenerId);
} finally {
Object removePublisher = subscriberContextSupport.createDestroyQueryCacheOperation(mapName, cacheName);
Object removePublisher = subscriberContextSupport.createDestroyQueryCacheOperation(mapName, cacheId);
invokerWrapper.invoke(removePublisher);
}
}
}

private boolean destroyLocalResources() {
removeConfig();
removeAccumulatorInfo();
removeSubscriberRegistry();
return removeInternalQueryCache();
Expand All @@ -230,16 +230,10 @@ private void removeAccumulatorInfo() {
accumulatorInfoSupplier.remove(mapName, cacheId);
}

private void removeConfig() {
SubscriberContext subscriberContext = context.getSubscriberContext();
QueryCacheConfigurator queryCacheConfigurator = subscriberContext.geQueryCacheConfigurator();
queryCacheConfigurator.removeConfiguration(mapName, cacheName);
}

private boolean removeInternalQueryCache() {
SubscriberContext subscriberContext = context.getSubscriberContext();
QueryCacheEndToEndProvider cacheProvider = subscriberContext.getEndToEndQueryCacheProvider();
cacheProvider.remove(mapName, cacheName);
cacheProvider.removeSingleQueryCache(mapName, cacheId);
clear();
return subscriberContext.getQueryCacheFactory().remove(this);
}
Expand Down

0 comments on commit 2a88ee8

Please sign in to comment.