Skip to content

Commit

Permalink
preload over hotrod
Browse files Browse the repository at this point in the history
  • Loading branch information
mmarkus committed Jul 19, 2010
1 parent b181ced commit c51b706
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@

import net.jcip.annotations.ThreadSafe;
import org.infinispan.Cache;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.container.entries.InternalEntryFactory;
import org.infinispan.loaders.AbstractCacheStore;
import org.infinispan.loaders.CacheLoaderConfig;
import org.infinispan.loaders.CacheLoaderException;
Expand All @@ -13,8 +15,11 @@
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

Expand All @@ -30,10 +35,9 @@
* very costly operation as well). Purging takes place at the remote end (infinispan cluster).
* <p/>
*
* @author Mircea.Markus@jboss.com
* @see org.infinispan.loaders.remote.RemoteCacheStoreConfig
* @see <a href="http://community.jboss.org/wiki/JavaHotRodclient">Hotrod Java Client</a>
*
* @author Mircea.Markus@jboss.com
* @since 4.1
*/
@ThreadSafe
Expand All @@ -44,13 +48,13 @@ public class RemoteCacheStore extends AbstractCacheStore {

private volatile RemoteCacheStoreConfig config;
private volatile RemoteCacheManager remoteCacheManager;
private volatile Cache<Object, InternalCacheEntry> remoteCache;
private volatile RemoteCache remoteCache;
private static final String LIFESPAN = "lifespan";
private static final String MAXIDLE = "maxidle";

@Override
public InternalCacheEntry load(Object key) throws CacheLoaderException {
return remoteCache.get(key);
return (InternalCacheEntry) remoteCache.get(key);
}

@Override
Expand All @@ -60,6 +64,11 @@ protected void purgeInternal() throws CacheLoaderException {
}
}

@Override
public boolean containsKey(Object key) throws CacheLoaderException {
return remoteCache.containsKey(key);
}

@Override
public void store(InternalCacheEntry entry) throws CacheLoaderException {
if (log.isTraceEnabled()) {
Expand All @@ -68,25 +77,25 @@ public void store(InternalCacheEntry entry) throws CacheLoaderException {
remoteCache.put(entry.getKey(), entry, toSeconds(entry.getLifespan(), entry, LIFESPAN), TimeUnit.SECONDS, toSeconds(entry.getMaxIdle(), entry, MAXIDLE), TimeUnit.SECONDS);
}

private long toSeconds(long millis, InternalCacheEntry entry, String desc) {
if (millis > 0 && millis < 1000) {
if (log.isTraceEnabled()) {
log.trace("Adjusting " + desc + " time for (k,v): (" + entry.getKey() + ", " + entry.getValue() + ") from "
+ millis + " millis to 1 sec, as milliseconds are not supported by HotRod");
}
return 1;
}
return TimeUnit.MILLISECONDS.toSeconds(millis);
}

@Override
public void fromStream(ObjectInput inputStream) throws CacheLoaderException {
fail();
Map result;
try {
result = (Map<Object, InternalCacheEntry>) marshaller.objectFromObjectStream(inputStream);
remoteCache.putAll(result);
} catch (Exception e) {
throw new CacheLoaderException("Exception while reading data", e);
}
}

@Override
public void toStream(ObjectOutput outputStream) throws CacheLoaderException {
fail();
Map map = remoteCache.getBulk();
try {
marshaller.objectToObjectStream(map, outputStream);
} catch (IOException e) {
throw new CacheLoaderException("Exception while serializing remote data to stream", e);
}
}

@Override
Expand All @@ -101,24 +110,18 @@ public boolean remove(Object key) throws CacheLoaderException {

@Override
public Set<InternalCacheEntry> loadAll() throws CacheLoaderException {
fail();
return null;
Map map = remoteCache.getBulk();
return convertToInternalCacheEntries(map);
}

@Override
public Set<InternalCacheEntry> load(int numEntries) throws CacheLoaderException {
fail();
return null;
return convertToInternalCacheEntries(remoteCache.getBulk(numEntries));
}

@Override
public Set<Object> loadAllKeys(Set<Object> keysToExclude) throws CacheLoaderException {
fail();
return null;
}

private void fail() throws CacheLoaderException {
String message = "RemoteCacheStore can only run in shared mode and it doesn't support preload!";
String message = "RemoteCacheStore can only run in shared mode! This method shouldn't be called in shared mode";
log.error(message);
throw new CacheLoaderException(message);
}
Expand Down Expand Up @@ -151,4 +154,24 @@ public void stop() throws CacheLoaderException {
public Class<? extends CacheLoaderConfig> getConfigurationClass() {
return RemoteCacheStoreConfig.class;
}

private long toSeconds(long millis, InternalCacheEntry entry, String desc) {
if (millis > 0 && millis < 1000) {
if (log.isTraceEnabled()) {
log.trace("Adjusting " + desc + " time for (k,v): (" + entry.getKey() + ", " + entry.getValue() + ") from "
+ millis + " millis to 1 sec, as milliseconds are not supported by HotRod");
}
return 1;
}
return TimeUnit.MILLISECONDS.toSeconds(millis);
}

private Set<InternalCacheEntry> convertToInternalCacheEntries(Map map) {
Set<InternalCacheEntry> result = new HashSet<InternalCacheEntry>(map.size());
Set<Map.Entry> set = map.entrySet();
for (Map.Entry e : set) {
result.add((InternalCacheEntry) e.getValue());
}
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,27 +51,6 @@ public boolean isUseDefaultRemoteCache() {
return CacheContainer.DEFAULT_CACHE_NAME.equals(getRemoteCacheName());
}

@Override
public void setPurgeOnStartup(Boolean purgeOnStartup) {
super.setPurgeOnStartup(purgeOnStartup);
if (purgeOnStartup) {
log.info("Purge on start-up will be ignored; remote cache store cannot be purged.");
}
}

@Override
public void setFetchPersistentState(Boolean fetchPersistentState) {
if (fetchPersistentState) {
String message = "fetchPersistentState cannot be set for remote cache store. This is because " +
"persistent state cannot be generated by RemoteCacheStore, and state generation is proprietary. I.e. this " +
"store would not be able to integrate state generated by other store implementations.";
log.error(message);
throw new IllegalStateException(message);
} else {
super.setFetchPersistentState(fetchPersistentState);
}
}

public Properties getHotRodClientProperties() {
return hotRodClientProperties;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ protected CacheStore createCacheStore() throws Exception {
RemoteCacheStoreConfig remoteCacheStoreConfig = new RemoteCacheStoreConfig();
remoteCacheStoreConfig.setUseDefaultRemoteCache(true);
assert remoteCacheStoreConfig.isUseDefaultRemoteCache();

localCacheManager = TestCacheManagerFactory.createLocalCacheManager();
Configuration configuration = localCacheManager.getDefaultConfiguration();
configuration.setEvictionWakeUpInterval(10);
Expand All @@ -43,7 +43,6 @@ protected CacheStore createCacheStore() throws Exception {
RemoteCacheStore remoteCacheStore = new RemoteCacheStore();
remoteCacheStore.init(remoteCacheStoreConfig, getCache(), getMarshaller());
remoteCacheStore.start();
super.supportsLoadAll = false;
return remoteCacheStore;
}

Expand All @@ -54,42 +53,28 @@ public void tearDown() {
}

@Override
public void testLoadKeys() throws CacheLoaderException {
//not applicable as relies on loadAll
}

@Override
protected void purgeExpired() throws CacheLoaderException {
localCacheManager.getCache().clear();
}

@Override
public void testPreload() throws CacheLoaderException {
//not applicable as relies on loadAll
protected void assertEventuallyExpires(String key) throws Exception {
for (int i = 0; i < 10; i++) {
if (cs.load("k") == null) break;
Thread.sleep(1000);
}
assert cs.load("k") == null;
}

@Override
public void testPreloadWithMaxSize() throws CacheLoaderException {
//not applicable as relies on loadAll
}

protected void sleepForStopStartTest() throws InterruptedException {
Thread.sleep(3000);
}


@Override
public void testStoreAndRemoveAll() throws CacheLoaderException {
//not applicable as relies on loadAll
}

@Override
public void testStreamingAPI() throws IOException, ClassNotFoundException, CacheLoaderException {
//not applicable as relies on loadAll
protected void purgeExpired() throws CacheLoaderException {
localCacheManager.getCache().getAdvancedCache().getEvictionManager().processEviction();
}

/**
* This is not supported, see assertion in {@link RemoteCacheStore#loadAllKeys(java.util.Set)}
*/
@Override
public void testStreamingAPIReusingStreams() throws IOException, ClassNotFoundException, CacheLoaderException {
//not applicable as relies on loadAll
public void testLoadKeys() throws CacheLoaderException {
}
}

0 comments on commit c51b706

Please sign in to comment.