Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Adding EHCache instead of hashmap cache.

  • Loading branch information...
commit 93fd187e1dcbe085b0dceb35c82b3a1fbb5405fe 1 parent 20a4958
@rbpark authored
View
1  .classpath
@@ -40,5 +40,6 @@
<classpathentry kind="lib" path="lib/pig-0.9.1-withouthadoop.jar"/>
<classpathentry kind="lib" path="lib/commons-configuration-1.8.jar"/>
<classpathentry kind="lib" path="lib/hadoop-core-1.0.0-p8.jar"/>
+ <classpathentry kind="lib" path="lib/ehcache-core-2.5.1.jar"/>
<classpathentry kind="output" path="dist/classes"/>
</classpath>
View
3  azkaban/src/java/azkaban/app/AzkabanApplication.java
@@ -206,7 +206,8 @@ public AzkabanApplication(final List<File> jobDirs, final File logDir, final Fil
executionsStorageDir,
lastExecutionId
),
- defaultProps.getInt("azkaban.flow.cache.size", 1000)
+ defaultProps.getInt("azkaban.flow.cache.size", 1000),
+ defaultProps.getInt("azkaban.flow.cache.tti", 300)
);
_jobManager.setFlowManager(_allFlows);
View
73 azkaban/src/java/azkaban/flow/CachingFlowManager.java
@@ -21,6 +21,12 @@
import azkaban.flow.Flow;
import azkaban.jobs.Status;
+import net.sf.ehcache.Cache;
+import net.sf.ehcache.CacheManager;
+import net.sf.ehcache.Element;
+import net.sf.ehcache.config.CacheConfiguration;
+import net.sf.ehcache.store.MemoryStoreEvictionPolicy;
+
import org.apache.log4j.Logger;
import java.util.Collection;
@@ -40,40 +46,23 @@
public class CachingFlowManager implements FlowManager
{
private static final Logger log = Logger.getLogger(CachingFlowManager.class);
-
private final FlowManager baseManager;
- private final Map<String, FlowExecutionHolder> flowCache;
+ private CacheManager manager = CacheManager.create();
+ private Cache cache;
- public CachingFlowManager(FlowManager baseManager, final int cacheSize)
+ public CachingFlowManager(FlowManager baseManager, final int cacheSize, final long timeToIdle)
{
this.baseManager = baseManager;
-
- this.flowCache = Collections.synchronizedMap(
- new LinkedHashMap<String, FlowExecutionHolder>((int) (cacheSize * 1.5), 0.75f, true){
- @Override
- protected boolean removeEldestEntry(Map.Entry<String, FlowExecutionHolder> eldest)
- {
- final boolean tooManyElements = super.size() > cacheSize;
-
- if (tooManyElements) {
- final Status status = eldest.getValue().getFlow().getStatus();
-
- if (status != Status.RUNNING) {
- return true;
- }
- else {
- log.warn(String.format(
- "Cache is at size[%s] and should have evicted an entry, but the oldest entry wasn't completed[%s]. Perhaps the cache size is too small",
- super.size(),
- status
- ));
- }
- }
-
- return false;
- }
- }
- );
+ CacheConfiguration config = new CacheConfiguration();
+ config.setName("flowhistory");
+ config.setMaxEntriesLocalHeap(cacheSize);
+ config.setTimeToIdleSeconds(timeToIdle);
+ config.eternal(false);
+ config.diskPersistent(false);
+ config.memoryStoreEvictionPolicy(MemoryStoreEvictionPolicy.LRU);
+
+ cache = new Cache(config);
+ manager.addCache(cache);
}
public boolean hasFlow(String name)
@@ -113,8 +102,8 @@ public ExecutableFlow createNewExecutableFlow(String name)
return new WrappingExecutableFlow(retVal){
@Override
public void execute(Props parentProperties, FlowCallback callback) {
-
- if (! flowCache.containsKey(getId())) {
+ String id = getId();
+ if (!cache.isKeyInCache(id)) {
addToCache(
new FlowExecutionHolder(
retVal,
@@ -145,13 +134,12 @@ public FlowExecutionHolder saveExecutableFlow(FlowExecutionHolder holder)
public FlowExecutionHolder loadExecutableFlow(long id)
{
- final FlowExecutionHolder executableFlow = flowCache.get(String.valueOf(id));
- if (executableFlow != null) {
- return executableFlow;
+ Element elem = cache.get(id);
+ if (elem != null) {
+ return (FlowExecutionHolder)elem.getValue();
}
-
+
final FlowExecutionHolder retVal = baseManager.loadExecutableFlow(id);
-
addToCache(retVal);
return retVal;
@@ -168,15 +156,8 @@ private void addToCache(FlowExecutionHolder retVal)
return;
}
- if (flowCache.put(retVal.getFlow().getId(), retVal) != null) {
- throw new IllegalStateException(
- String.format(
- "Attempted to add object to the cache but the id[%s] already existed. Flow was [%s]",
- retVal.getFlow().getId(),
- retVal
- )
- );
- }
+ Element element = new Element(retVal.getFlow().getId(), retVal);
+ cache.put(element);
}
@Override
View
BIN  lib/ehcache-core-2.5.1.jar
Binary file not shown
Please sign in to comment.
Something went wrong with that request. Please try again.