Skip to content

Commit

Permalink
ISPN-1106 - Allow the users to start multiple caches at once
Browse files Browse the repository at this point in the history
This is no longer strictly necessary for ISPN-1106, as we are waiting
with a shorter timeout on transactions with locks and so the rehash
does not block for a very long period of time.

It is recommended however to start all caches on application startup,
and this method provides an easy way for users to start all their caches.
  • Loading branch information
Dan Berindei authored and maniksurtani committed Sep 7, 2011
1 parent 6ed94d3 commit 548fa62
Show file tree
Hide file tree
Showing 4 changed files with 181 additions and 26 deletions.
108 changes: 82 additions & 26 deletions core/src/main/java/org/infinispan/manager/DefaultCacheManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
*/
package org.infinispan.manager;

import net.jcip.annotations.GuardedBy;
import org.infinispan.Cache;
import org.infinispan.CacheException;
import org.infinispan.Version;
Expand Down Expand Up @@ -64,6 +63,7 @@

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -447,21 +447,10 @@ public <K, V> Cache<K, V> getCache(String cacheName) {
return cw.getCache();
}

boolean acquired = false;
try {
if (cacheCreateLock.tryLock(defaultConfiguration.getLockAcquisitionTimeout(), MILLISECONDS)) {
acquired = true;
return createCache(cacheName);
} else {
throw new CacheException("Unable to acquire lock on cache with name " + cacheName);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new CacheException("Interrupted while trying to get lock on cache with cache name " + cacheName, e);
} finally {
if (acquired)
cacheCreateLock.unlock();
if (caches.size() > 0) {
log.shouldBeUsingStartCache(cacheName);
}
return createCache(cacheName);
}

@Override
Expand All @@ -478,6 +467,42 @@ public <K, V> Cache<K, V> getCache(String cacheName, boolean createIfAbsent) {
return getCache(cacheName);
}

@Override
public EmbeddedCacheManager startCaches(final String... cacheNames) {
List<Thread> threads = new ArrayList<Thread>(cacheNames.length);
boolean haveStoppedCaches = false;
boolean haveRunningCaches = false;
for (final String cacheName : cacheNames) {
if (isRunning(cacheName)) {
haveRunningCaches = true;
continue;
}

haveStoppedCaches = true;
String threadName = "CacheStartThread," + globalConfiguration.getClusterName() + "," + cacheName;
Thread thread = new Thread(threadName) {
@Override
public void run() {
createCache(cacheName);
}
};
thread.start();
threads.add(thread);
}
if (haveStoppedCaches && haveRunningCaches) {
log.asymmetricClusterWarning();
}
try {
for (Thread thread : threads) {
thread.join(defaultConfiguration.getLockAcquisitionTimeout());
}
} catch (InterruptedException e) {
throw new CacheException("Interrupted while waiting for the caches to start");
}

return this;
}

@Override
public void removeCache(String cacheName) {
RemoveCacheCommand cmd = new RemoveCacheCommand(this, globalComponentRegistry);
Expand Down Expand Up @@ -536,14 +561,42 @@ public boolean isCoordinator() {
return t != null && t.isCoordinator();
}

@GuardedBy("Cache name lock container keeps a lock per cache name which guards this method")
private Cache createCache(String cacheName) {
boolean trace = log.isTraceEnabled();
LogFactory.pushNDC(cacheName, trace);
try {
Cache cache = wireCache(cacheName);
// a null return value means the cache was created by someone else before we got the lock
if (cache == null)
return caches.get(cacheName).getCache();

// start the cache-level components
try {
cache.start();
} finally {
// allow other threads to access the cache
caches.get(cacheName).latch.countDown();
}

return cache;
} finally {
LogFactory.popNDC(trace);
}
}

/**
* @return a null return value means the cache was created by someone else before we got the lock
*/
private Cache wireCache(String cacheName) {
boolean acquired = false;
try {
if (!cacheCreateLock.tryLock(defaultConfiguration.getLockAcquisitionTimeout(), MILLISECONDS)) {
throw new CacheException("Unable to acquire lock on cache with name " + cacheName);
}
acquired = true;
CacheWrapper existingCache = caches.get(cacheName);
if (existingCache != null)
return existingCache.getCache();
return null;

Configuration c = getConfiguration(cacheName);
setConfigurationName(cacheName, c);
Expand All @@ -553,18 +606,21 @@ private Cache createCache(String cacheName) {
c.assertValid();
Cache cache = new InternalCacheFactory().createCache(c, globalComponentRegistry, cacheName, reflectionCache);
CacheWrapper cw = new CacheWrapper(cache);
try {
existingCache = caches.putIfAbsent(cacheName, cw);
if (existingCache != null) {
throw new IllegalStateException("attempt to initialize the cache twice");
}
cache.start();
} finally {
cw.latch.countDown();
existingCache = caches.put(cacheName, cw);
if (existingCache != null) {
throw new IllegalStateException("attempt to initialize the cache twice");
}

// start the global components here, while we have the global lock
globalComponentRegistry.start();

return cache;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new CacheException("Interrupted while trying to get lock on cache with cache name " + cacheName, e);
} finally {
LogFactory.popNDC(trace);
if (acquired)
cacheCreateLock.unlock();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,18 @@ public interface EmbeddedCacheManager extends CacheContainer, Listenable {
*/
<K, V> Cache<K, V> getCache(String cacheName, boolean createIfAbsent);

/**
* Starts a set of caches in parallel.
* Infinispan doesn't yet support asymmetric clusters (that is, two nodes
* having different sets of caches running). Calling this method on
* application/application server startup with all your cache names
* will ensure that the cluster is symmetric.
*
* @param cacheNames the names of the caches to start
* @since 5.0
*/
EmbeddedCacheManager startCaches(String... cacheNames);

/**
* Removes a cache with the given name from the system. This is a cluster
* wide operation that results not only in stopping the cache with the given
Expand Down
9 changes: 9 additions & 0 deletions core/src/main/java/org/infinispan/util/logging/Log.java
Original file line number Diff line number Diff line change
Expand Up @@ -739,4 +739,13 @@ void asyncStoreShutdownTimeoutTooHigh(long configuredAsyncStopTimeout,
@LogMessage(level = ERROR)
@Message(value = "Unable to unlock keys %2$s for transaction %1$s after they were rebalanced off node %3$s", id = 154)
void unableToUnlockRebalancedKeys(GlobalTransaction gtx, List<Object> keys, Address self, @Cause Throwable t);

@LogMessage(level = WARN)
@Message(value = "You are not starting all your caches at the same time. This can lead to problems as asymmetric clusters are not supported, see ISPN-658", id = 155)
void asymmetricClusterWarning();

@LogMessage(level = WARN)
@Message(value = "You are not starting all your caches at the same time. This can lead to problems as asymmetric clusters are not supported, see ISPN-658. " +
"We recommend using EmbeddedCacheManager.startCaches() to start all your caches upfront.", id = 156)
void shouldBeUsingStartCache(String cacheName);
}
78 changes: 78 additions & 0 deletions core/src/test/java/org/infinispan/api/ParallelCacheStartTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* JBoss, Home of Professional Open Source
* Copyright 2011 Red Hat Inc. and/or its affiliates and other contributors
* as indicated by the @author tags. All rights reserved.
* See the copyright.txt in the distribution for a
* full listing of individual contributors.
*
* This copyrighted material is made available to anyone wishing to use,
* modify, copy, or redistribute it subject to the terms and conditions
* of the GNU Lesser General Public License, v. 2.1.
* This program is distributed in the hope that it will be useful, but WITHOUT A
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
* PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
* You should have received a copy of the GNU Lesser General Public License,
* v.2.1 along with this distribution; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
* MA 02110-1301, USA.
*/
package org.infinispan.api;

import org.infinispan.Cache;
import org.infinispan.config.Configuration;
import org.infinispan.config.Configuration.CacheMode;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestingUtil;
import org.testng.annotations.Test;

import java.util.List;

@Test(groups = "functional", testName = "api.ParallelCacheStartTest")
public class ParallelCacheStartTest extends MultipleCacheManagersTest {
Cache cache1, cache2;
EmbeddedCacheManager cm1, cm2;
Configuration cfg;

public ParallelCacheStartTest() {
cleanup = CleanupPhase.AFTER_METHOD;
}

protected void createCacheManagers() throws Throwable {
cm1 = addClusterEnabledCacheManager();
cfg = new Configuration();
cfg.setCacheMode(CacheMode.REPL_SYNC);
cfg.setFetchInMemoryState(false);
cm1.defineConfiguration("cache1", cfg);
cm1.defineConfiguration("cache2", cfg);
}

public void testParallelStartup() throws Exception {
// start both caches in parallel
cm1.startCaches("cache1", "cache2");
List memb1 = cm1.getMembers();
assert 1 == memb1.size() : "Expected 1 member; was " + memb1;

Object coord = memb1.get(0);

cm2 = addClusterEnabledCacheManager();
cm2.defineConfiguration("cache1", cfg);
cm2.defineConfiguration("cache2", cfg);

// again start both caches in parallel
cm2.startCaches("cache1", "cache2");

TestingUtil.blockUntilViewsReceived(50000, true, cm1, cm2);
memb1 = cm1.getMembers();
List memb2 = cm2.getMembers();
assert 2 == memb1.size();
assert memb1.equals(memb2);

cm1.stop();
TestingUtil.blockUntilViewsReceived(50000, false, cm2);

memb2 = cm2.getMembers();
assert 1 == memb2.size();
assert !coord.equals(memb2.get(0));
}
}

0 comments on commit 548fa62

Please sign in to comment.