Browse files

ISPN-1876 clustered query is not executed on the right cache

Put a EmbeddedCacheManager in CommandInitializer to get the correct cache
when a ClusteredQueryCommand arrives in the target node.
  • Loading branch information...
1 parent ee504c9 commit 22fd4e883a74888490e292c8136e6016a05d27e8 Israel Lacerra committed with galderz Mar 30, 2013
View
25 core/src/test/java/org/infinispan/test/MultipleCacheManagersTest.java
@@ -411,6 +411,31 @@ protected Transaction tx(int i) {
waitForClusterToForm();
return caches;
}
+
+ /**
+ * Create cacheNames.lenght in each CacheManager (numMembersInCluster cacheManagers).
+ *
+ * @param numMembersInCluster
+ * @param defaultConfigBuilder
+ * @param cacheNames
+ * @return A list with size numMembersInCluster containing a list of cacheNames.length caches
+ */
+ protected <K, V> List<List<Cache<K, V>>> createClusteredCaches(int numMembersInCluster,
+ ConfigurationBuilder defaultConfigBuilder, String[] cacheNames) {
+ List<List<Cache<K, V>>> allCaches = new ArrayList<List<Cache<K, V>>>(numMembersInCluster);
+ for (int i = 0; i < numMembersInCluster; i++) {
+ EmbeddedCacheManager cm = addClusterEnabledCacheManager(defaultConfigBuilder);
+ List<Cache<K, V>> currentCacheManagerCaches = new ArrayList<Cache<K, V>>(cacheNames.length);
+
+ for (String cacheName : cacheNames) {
+ Cache<K, V> cache = cm.getCache(cacheName);
+ currentCacheManagerCaches.add(cache);
+ }
+ allCaches.add(currentCacheManagerCaches);
+ }
+ waitForClusterToForm(cacheNames);
+ return allCaches;
+ }
protected ReplListener replListener(Cache cache) {
ReplListener listener = listeners.get(cache);
View
13 query/src/main/java/org/infinispan/query/CommandInitializer.java
@@ -26,6 +26,7 @@
import org.infinispan.Cache;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.commands.module.ModuleCommandInitializer;
+import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.query.backend.QueryInterceptor;
import org.infinispan.query.impl.ComponentRegistryUtils;
@@ -41,8 +42,10 @@
private Cache<?, ?> cache;
private SearchFactoryImplementor searchFactoryImplementor;
private QueryInterceptor queryInterceptor;
+ private EmbeddedCacheManager cacheManager;
- public void setCache(Cache<?, ?> cache){
+ public void setCache(Cache<?, ?> cache, EmbeddedCacheManager cacheManager){
+ this.cacheManager = cacheManager;
this.cache = cache;
SearchManager searchManager = Search.getSearchManager(cache);
SearchFactory searchFactory = searchManager.getSearchFactory();
@@ -58,16 +61,16 @@ public void initializeReplicableCommand(final ReplicableCommand c, final boolean
queryCommand.fetchExecutionContext(this);
}
- public final Cache<?, ?> getCache() {
- return cache;
- }
-
public final SearchFactoryImplementor getSearchFactory() {
return searchFactoryImplementor;
}
public final QueryInterceptor getQueryInterceptor() {
return queryInterceptor;
}
+
+ public EmbeddedCacheManager getCacheManager(){
+ return cacheManager;
+ }
}
View
2 query/src/main/java/org/infinispan/query/clustered/ClusteredQueryCommand.java
@@ -72,7 +72,7 @@ public ClusteredQueryCommand(String cacheName) {
@Override
public void fetchExecutionContext(CommandInitializer ci) {
- this.cache = ci.getCache();
+ this.cache = ci.getCacheManager().getCache(cacheName);
}
public static ClusteredQueryCommand createLazyIterator(HSQuery query, Cache<?, ?> cache, UUID id) {
View
6 query/src/main/java/org/infinispan/query/impl/LifecycleManager.java
@@ -158,10 +158,12 @@ public void cacheStarted(ComponentRegistry cr, String cacheName) {
throw new IllegalStateException( "It was expected to find the Query interceptor registered in the InterceptorChain but it wasn't found" );
}
- // initializing the query module command initializer. we can t inject Cache with @inject in there
+ // initializing the query module command initializer.
+ // we can t inject Cache and CacheManager with @inject in there
Cache<?, ?> cache = cr.getComponent(Cache.class);
CommandInitializer initializer = cr.getComponent(CommandInitializer.class);
- initializer.setCache(cache);
+ EmbeddedCacheManager cacheManager = cr.getGlobalComponentRegistry().getComponent(EmbeddedCacheManager.class);
+ initializer.setCache(cache, cacheManager);
QueryBox queryBox = new QueryBox();
queryBox.setCache(cache.getAdvancedCache());
View
67 query/src/test/java/org/infinispan/query/blackbox/ClusteredQueryMultipleCachesTest.java
@@ -0,0 +1,67 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2013 Red Hat Inc. and/or its affiliates and other contributors
+ * as indicated by the @author tags. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ */
+package org.infinispan.query.blackbox;
+
+import java.util.List;
+
+import org.infinispan.Cache;
+import org.infinispan.configuration.cache.ConfigurationBuilder;
+import org.infinispan.query.test.Person;
+import org.testng.annotations.Test;
+
+/**
+ * Tests for testing clustered queries functionality on multiple cache instances
+ * (In these tests we have two caches in each CacheManager)
+ *
+ * @author Israel Lacerra <israeldl@gmail.com>
+ * @since 5.2
+ */
+@Test(groups = "functional", testName = "query.blackbox.ClusteredQueryMultipleCachesTest")
+public class ClusteredQueryMultipleCachesTest extends ClusteredQueryTest {
+
+ Cache<String, Person> cacheBMachine1, cacheBMachine2;
+ Person person5;
+
+ @Override
+ protected void createCacheManagers() throws Throwable {
+ ConfigurationBuilder cacheCfg = getDefaultClusteredCacheConfig(getCacheMode(), false);
+ cacheCfg.indexing().enable().indexLocalOnly(true).addProperty("default.directory_provider", "ram")
+ .addProperty("lucene_version", "LUCENE_CURRENT");
+ enhanceConfig(cacheCfg);
+ String[] cacheNames = { "cacheA", "cacheB" };
+ List<List<Cache<String, Person>>> caches = createClusteredCaches(2, cacheCfg, cacheNames);
+ cacheAMachine1 = caches.get(0).get(0);
+ cacheAMachine2 = caches.get(1).get(0);
+ cacheBMachine1 = caches.get(0).get(1);
+ cacheBMachine2 = caches.get(1).get(1);
+ }
+
+ @Override
+ protected void prepareTestData() {
+ super.prepareTestData();
+
+ person5 = new Person();
+ person5.setName("People In Another Cache");
+ person5.setBlurb("Also eats grass");
+ person5.setAge(5);
+
+ cacheBMachine2.put("anotherNewOne", person5);
+ }
+
+}
View
18 query/src/test/java/org/infinispan/query/blackbox/ClusteredQueryTest.java
@@ -50,7 +50,7 @@
@Test(groups = "functional", testName = "query.blackbox.ClusteredQueryTest")
public class ClusteredQueryTest extends MultipleCacheManagersTest {
- Cache<String, Person> cache1, cache2;
+ Cache<String, Person> cacheAMachine1, cacheAMachine2;
Person person1;
Person person2;
Person person3;
@@ -82,15 +82,15 @@ protected void createCacheManagers() throws Throwable {
.addProperty("lucene_version", "LUCENE_CURRENT");
enhanceConfig(cacheCfg);
List<Cache<String, Person>> caches = createClusteredCaches(2, cacheCfg);
- cache1 = caches.get(0);
- cache2 = caches.get(1);
+ cacheAMachine1 = caches.get(0);
+ cacheAMachine2 = caches.get(1);
}
protected CacheMode getCacheMode() {
return CacheMode.REPL_SYNC;
}
- private void prepareTestData() {
+ protected void prepareTestData() {
person1 = new Person();
person1.setName("NavinSurtani");
person1.setBlurb("Likes playing WoW");
@@ -108,16 +108,16 @@ private void prepareTestData() {
// Put the 3 created objects in the cache1.
- cache2.put(key1, person1);
- cache1.put(key2, person2);
- cache1.put(key3, person3);
+ cacheAMachine2.put(key1, person1);
+ cacheAMachine1.put(key2, person2);
+ cacheAMachine1.put(key3, person3);
person4 = new Person();
person4.setName("MightyGoat");
person4.setBlurb("Also eats grass");
person4.setAge(66);
- cache1.put("newOne", person4);
+ cacheAMachine1.put("newOne", person4);
}
public void testLazyOrdered() throws ParseException {
@@ -237,7 +237,7 @@ private void populateCache() throws ParseException {
queries[1] = luceneQuery;
luceneQuery = luceneQuery.combine(queries);
- cacheQuery = Search.getSearchManager(cache1).getClusteredQuery(luceneQuery);
+ cacheQuery = Search.getSearchManager(cacheAMachine1).getClusteredQuery(luceneQuery);
}
}
View
6 query/src/test/java/org/infinispan/query/blackbox/TopologyAwareClusteredQueryTest.java
@@ -47,9 +47,9 @@ protected void createCacheManagers() throws Throwable {
for(Object cache : caches) {
cacheManagers.add(((Cache) cache).getCacheManager());
}
-
- cache1 = (Cache<String, Person>) caches.get(0);
- cache2 = (Cache<String, Person>) caches.get(1);
+
+ cacheAMachine1 = (Cache<String, Person>) caches.get(0);
+ cacheAMachine2 = (Cache<String, Person>) caches.get(1);
waitForClusterToForm();
}

0 comments on commit 22fd4e8

Please sign in to comment.