Skip to content

Commit

Permalink
Fix RMI manual distribution bug
Browse files Browse the repository at this point in the history
* RMI URLs where incorrectly being cast to lowercase, which means manual distribution only worked if the cache name was also in lowercase

Fixed issues with distributed cache (see LDEV-3279)

* Added EHCacheClassLoader to resolve issues with loading RMI classes
* The <cacheManagerEventListenerFactory /> settings are now always honored (previously the properties were only populated when set to "manual" distribution mode, but it's need for "automatic" mode in order to control the ports being used)
* Changed default ehCache distribution configuration for optimal performance by disable "via Copy" and "put" operations
* Refactored code so serialization only happens when replicating objects using the "via copy" options (other distributed cache options just mark the objects for removal, so serialization is not required)
* Fixed bug in TypeUtil.toJVM() which caused components to not be fully serialized (it was being treated as a struct and missing properties/methods extended from other components)
  • Loading branch information
dswitzer committed Jun 2, 2022
1 parent 2539143 commit 0e1b1ac
Show file tree
Hide file tree
Showing 13 changed files with 233 additions and 43 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Expand Up @@ -9,4 +9,5 @@ va specific
target
target/*
*.lock
*.DS_Store
*.DS_Store
.vscode/launch.json
4 changes: 2 additions & 2 deletions build.number
@@ -1,3 +1,3 @@
#Build Number for ANT. Do not edit!
#Thu Sep 06 23:20:19 CEST 2018
build.number=32
#Fri May 27 09:05:50 EDT 2022
build.number=36
2 changes: 1 addition & 1 deletion build.properties
@@ -1,6 +1,6 @@
bundlename: ehcache.extension
filename: ehcache-extension
bundleversion: 2.10.0.
bundleversion: 2.10.1.
bundleversionappendix: -SNAPSHOT
id: 87FE44E5-179C-43A3-A87B3D38BEF4652E
label: EHCache
Expand Down
6 changes: 3 additions & 3 deletions source/CFML/src/context/admin/cdriver/EHCache.cfc
Expand Up @@ -90,10 +90,10 @@
,field("Replicate Asynchronously","replicateAsynchronously","true",true,"whether replications are
asynchronous (checked) or synchronous (unchecked), .","checkbox",'true')

,field("Replicate Puts","replicatePuts","true",true,"whether new elements placed in a cache are replicated to others.","checkbox",'true')
,field("Replicate Puts Via Copy","replicatePutsViaCopy","true",true,"whether the new elements are copied to other caches (checked), or whether a remove message is sent.","checkbox",'true')
,field("Replicate Puts","replicatePuts","false",true,"whether new elements placed in a cache are replicated to others.","checkbox",'true')
,field("Replicate Puts Via Copy","replicatePutsViaCopy","false",true,"whether the new elements are copied to other caches (checked), or whether a remove message is sent.<br><br><small><strong>IMPORTANT</strong> — Enabling this option requires that objects are serialized, which can involve significant overhead, especially when caching components. For best performance, leave this disabled, and then the cache items will just be marked for removal in other nodes instead of pushing a serialized object to the nodes.</small>","checkbox",'true')
,field("Replicate Updates","replicateUpdates","true",true,"whether new elements which override an element already existing with the same key are replicated","checkbox",'true')
,field("Replicate Updates Via Copy","replicateUpdatesViaCopy","true",true,"whether the new elements are copied to other caches (checked), or whether a remove message is sent.","checkbox",'true')
,field("Replicate Updates Via Copy","replicateUpdatesViaCopy","false",true,"whether the new elements are copied to other caches (checked), or whether a remove message is sent.<br><br><small><strong>IMPORTANT</strong> — Enabling this option requires that objects are serialized, which can involve significant overhead, especially when caching components. For best performance, leave this disabled, and then the cache items will just be marked for removal in other nodes instead of pushing a serialized object to the nodes.</small>","checkbox",'true')
,field("Replicate Removals","replicateRemovals","true",true,"whether element removals are replicated.","checkbox",'true')
,field("Asynchronous Replication Intervall","asynchronousReplicationIntervalMillis","1000",true,"The asynchronous replicator runs at a set interval of milliseconds (has no impact when ""Replicate Asynchronously"" is not checked)","text")

Expand Down
1 change: 1 addition & 0 deletions source/java/.classpath
Expand Up @@ -7,6 +7,7 @@
<classpathentry kind="lib" path="libs/jsp-api-2.2.jar"/>
<classpathentry kind="lib" path="libs/org.apache.felix.framework-4.2.1.jar"/>
<classpathentry combineaccessrules="false" kind="src" path="/loader"/>
<classpathentry combineaccessrules="false" kind="src" path="/runtime"/>
<classpathentry kind="lib" path="libs/lucee.jar"/>
<classpathentry kind="lib" path="libs/org.lucee.ehcache-2.10.3.jar"/>
<classpathentry kind="output" path="bin"/>
Expand Down
11 changes: 11 additions & 0 deletions source/java/.project
Expand Up @@ -14,4 +14,15 @@
<natures>
<nature>org.eclipse.jdt.core.javanature</nature>
</natures>
<filteredResources>
<filter>
<id>1652803992565</id>
<name></name>
<type>30</type>
<matcher>
<id>org.eclipse.core.resources.regexFilterMatcher</id>
<arguments>node_modules|.git|__CREATED_BY_JAVA_LANGUAGE_SERVER__</arguments>
</matcher>
</filter>
</filteredResources>
</projectDescription>
52 changes: 37 additions & 15 deletions source/java/src/org/lucee/extension/cache/eh/EHCache.java
Expand Up @@ -56,10 +56,11 @@
import org.lucee.extension.cache.eh.util.CacheUtil;
import org.lucee.extension.cache.eh.util.TypeUtil;

import lucee.commons.io.log.Log;

public class EHCache extends EHCacheSupport {



static {
System.setProperty("net.sf.ehcache.enableShutdownHook", "true");
}
Expand Down Expand Up @@ -89,8 +90,7 @@ public class EHCache extends EHCacheSupport {
private String cacheName;
private ClassLoader classLoader;
private CacheManagerAndHash mah;



public static void flushAllCaches() {
String[] names;
Iterator<Map<String, CacheManagerAndHash>> _it = managersColl.values().iterator();
Expand Down Expand Up @@ -272,7 +272,8 @@ private static String[] createHash(Struct[] arguments) {
}

private static String createXML(String path, String cacheName,Struct arguments, String hash, RefBoolean isDistributed) {
//boolean isDistributed=false;
getLogger().debug("ehcache", "Building ehCache XML...");

isDistributed.setValue(false);
StringBuilder xml=new StringBuilder();
xml.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n");
Expand All @@ -282,7 +283,6 @@ private static String createXML(String path, String cacheName,Struct arguments,
xml.append("<diskStore path=\"");
xml.append(path);
xml.append("\"/>\n");
xml.append("<cacheManagerEventListenerFactory class=\"\" properties=\"\"/>\n");


// RMI
Expand All @@ -304,10 +304,6 @@ private static String createXML(String path, String cacheName,Struct arguments,
add+" \" />\n");

//hostName=fully_qualified_hostname_or_ip,

// listener
xml.append("<cacheManagerPeerListenerFactory class=\""+RMICacheManagerPeerListenerFactory.class.getName()+"\"/>\n");

}
// Manual
else if(arguments!=null && arguments.get("distributed","").equals("manual")){
Expand All @@ -318,10 +314,17 @@ else if(arguments!=null && arguments.get("distributed","").equals("manual")){
String add = arguments.get("manual_addional","").toString().trim();
if(!Util.isEmpty(add) && !add.startsWith(","))add=","+add;
add=add.replace('\n', ' ');
xml.append(" properties=\"peerDiscovery=manual, rmiUrls="+arguments.get("manual_rmiUrls","").toString().trim().toLowerCase().replace('\n', ' ')+
xml.append(" properties=\"peerDiscovery=manual, rmiUrls="+arguments.get("manual_rmiUrls","").toString().trim().replace('\n', ' ')+
add+"\"/>\n"); //propertySeparator=\",\"
}


// listener
// Whenever RMI is being used, we must add the listener properties so we can bind
// to the specified ports in the administration.
//
// This is important for "automatic" as well, so that we can configure specific
// ports to use instead of having ehCache bind to an ephemeral port.
if( isDistributed.toBooleanValue() ){
StringBuilder sb=new StringBuilder();

String hostName=arguments.get("listener_hostName","").toString().trim().toLowerCase();
Expand All @@ -333,13 +336,16 @@ else if(arguments!=null && arguments.get("distributed","").equals("manual")){
String socketTimeoutMillis = arguments.get("listener_socketTimeoutMillis","").toString().trim().toLowerCase();
if(!Util.isEmpty(socketTimeoutMillis) && !"120000".equals(socketTimeoutMillis))
add(sb,"socketTimeoutMillis="+socketTimeoutMillis);


getLogger().debug("ehcache", "Remote port = " + remoteObjectPort);

xml.append("<cacheManagerPeerListenerFactory");
xml.append(" class=\""+RMICacheManagerPeerListenerFactory.class.getName()+"\"");
if(sb.length()>0)xml.append(" properties=\""+sb+"\"");
xml.append("/>\n");


// for non-distributed caches, we write an empty event listener
} else {
xml.append("<cacheManagerEventListenerFactory class=\"\" properties=\"\"/>\n");
}

xml.append("<defaultCache \n");
Expand All @@ -359,7 +365,13 @@ else if(arguments!=null && arguments.get("distributed","").equals("manual")){
createCacheXml(xml,cacheName,arguments,isDistributed.toBooleanValue());

xml.append("</ehcache>\n");
return xml.toString();

String xmlContents = xml.toString();

getLogger().debug("ehcache", "Finished building ehCache XML...");
getLogger().debug("ehcache", "ehcache.xml = \n" + xmlContents);

return xmlContents;
}


Expand Down Expand Up @@ -473,10 +485,12 @@ public void init(Config config,String cacheName,Struct arguments) throws IOExcep
// env stuff
System.setProperty("net.sf.ehcache.enableShutdownHook", "true");
try {
getLogger().debug("ehcache", "Setting class loader context...");
this.classLoader=CacheUtil.getClassLoaderEnv(config);
setClassLoader();

} catch (PageException pe) {
getLogger().error("ehcache", "Failed to set class loader context...");
throw CFMLEngineFactory.getInstance().getExceptionUtil().toIOException(pe);
}

Expand All @@ -494,6 +508,9 @@ public void init(Config config,String cacheName,Struct arguments) throws IOExcep

// get manager for that specific configuration (arguments)
mah=managers.get(hashArgs);

getLogger().debug("ehcache", "mah = " + ((mah == null) ? "null" : mah.toString()));

if(mah==null) {
Resource hashDir=dir.getRealResource(hashArgs);
if(!hashDir.isDirectory())hashDir.createDirectory(true);
Expand All @@ -502,6 +519,11 @@ public void init(Config config,String cacheName,Struct arguments) throws IOExcep
mah=new CacheManagerAndHash(xml);// "ehcache_"+config.getIdentification().getId()
managers.put(hashArgs, mah);
this.isDistributed=isDistributed.toBooleanValue();
if( this.isDistributed ){
// we should serialize the results if we are putting copies of the elements in the cache
this.isSerialized=(toBooleanValue(arguments.get("replicatePutsViaCopy",Boolean.FALSE),REPLICATE_PUTS_VIA_COPY) || toBooleanValue(arguments.get("replicateUpdatesViaCopy",Boolean.FALSE),REPLICATE_UPDATES_VIA_COPY)) ? true : false;
}
getLogger().debug("ehcache", "Writing EHCache XML!");
// write the xml
writeEHCacheXML(hashDir,xml);
}
Expand Down
Expand Up @@ -81,7 +81,7 @@ public String getKey() {

@Override
public Object getValue() {
return cache.isDistributed?TypeUtil.toCFML(element.getObjectValue()):element.getObjectValue();
return cache.isSerialized?TypeUtil.toCFML(element.getObjectValue()):element.getObjectValue();
}

public void setElement(Element element) {
Expand Down
20 changes: 18 additions & 2 deletions source/java/src/org/lucee/extension/cache/eh/EHCacheSupport.java
Expand Up @@ -29,10 +29,24 @@

import org.lucee.extension.cache.CacheSupport;
import org.lucee.extension.cache.eh.util.TypeUtil;
import lucee.loader.engine.CFMLEngineFactory;
import lucee.commons.io.log.Log;

public abstract class EHCacheSupport extends CacheSupport implements Cache {

protected boolean isDistributed;
protected boolean isSerialized;

public static Log getLogger() {
Log logger = CFMLEngineFactory.getInstance().getThreadConfig().getLog("application");

// for some reason, setting the application log to "debug" does not always show
// the ehCache output, so when debugging code, we can just manually set the log
// to DEBUG mode to make sure we see the log output
// logger.setLogLevel(logger.LEVEL_DEBUG);

return logger;
}

@Override
public boolean contains(String key) {
Expand Down Expand Up @@ -66,9 +80,11 @@ public void put(String key, Object value, Long idleTime, Long liveTime) {
boolean hasTime = idleTime!=null || liveTime!=null;
Integer idle = idleTime==null?null : Integer.valueOf( (int)(idleTime.longValue()/1000) );
Integer live = liveTime==null?null : Integer.valueOf( (int)(liveTime.longValue()/1000) );

getLogger().debug("ehcache", "Putting " + key + " item into cache (serializing=" + isSerialized + ")...");

if(hasTime)getCache().put(new Element(key, isDistributed?TypeUtil.toJVM(value):value ,false, idle, live));
else getCache().put(new Element(key, isDistributed?TypeUtil.toJVM(value):value));
if(hasTime)getCache().put(new Element(key, isSerialized?TypeUtil.toJVM(value):value ,false, idle, live));
else getCache().put(new Element(key, isSerialized?TypeUtil.toJVM(value):value));
}


Expand Down
Expand Up @@ -102,6 +102,7 @@ public final CacheEventListener createCacheEventListener(Properties properties)
int maximumBatchSize = extractMaximumBatchSize(properties);

if (replicateAsynchronously) {
this.log.debug("ehcache", "Replicating asynchronously...");
return new LuceeRMIAsynchronousCacheReplicator(config,log,
replicatePuts,
replicatePutsViaCopy,
Expand All @@ -111,6 +112,7 @@ public final CacheEventListener createCacheEventListener(Properties properties)
replicationIntervalMillis,
maximumBatchSize);
} else {
this.log.debug("ehcache", "Replicating synchronously...");
return new RMISynchronousCacheReplicator(
replicatePuts,
replicatePutsViaCopy,
Expand Down
Expand Up @@ -19,7 +19,6 @@
package org.lucee.extension.cache.eh.util;

import java.io.IOException;
import java.lang.reflect.Method;

import lucee.commons.io.cache.Cache;
import lucee.commons.io.cache.CacheEntry;
Expand Down Expand Up @@ -109,8 +108,7 @@ public static boolean allowAll(CacheFilter filter) {

public static ClassLoader getClassLoaderEnv(Config config) throws PageException {
try {
Method m = config.getClass().getMethod("getClassLoaderEnv", new Class[0]);
return (ClassLoader) m.invoke(config, new Object[0]);
return new EHCacheClassLoader(config);
}
catch (Exception e) {
throw CFMLEngineFactory.getInstance().getCastUtil().toPageException(e);
Expand Down

0 comments on commit 0e1b1ac

Please sign in to comment.