Permalink
Browse files

Removing EhCache. Will remove later.

  • Loading branch information...
1 parent 4849960 commit 46129ca3117d817d058bfb52ffc513701797a61b @rbpark committed Mar 22, 2012
@@ -11,12 +11,32 @@ public CacheFlowManager(CachingFlowManager manager) {
@Override
public int getCacheSize() {
- return this.manager.getCache().getSize();
+ return this.manager.getCacheSize();
}
@Override
+ public long getCleanIntervalMillisec() {
+ return this.manager.getCleanInterval();
+ }
+
+ @Override
+ public void setCleanIntervalMillisec(long interval) {
+ this.manager.setCleanInterval(interval);
+ }
+
+ @Override
+ public long getTimeToIdleMillisec() {
+ return this.manager.getTimeToIdle();
+ }
+
+ @Override
+ public void setTimeToIdleMillisec(long millisec) {
+ this.manager.setTimeToIdleMillisec(millisec);
+ }
+
+ @Override
public void purgeCache() {
- this.manager.getCache().evictExpiredElements();
+ this.manager.evictFinishedIdleFlows();
}
}
@@ -6,4 +6,16 @@
@DisplayName("OPERATION: purgeCache")
public void purgeCache();
+
+ @DisplayName("OPERATION: getCleanIntervalMillisec")
+ public long getCleanIntervalMillisec();
+
+ @DisplayName("OPERATION: setCleanIntervalMillisec")
+ public void setCleanIntervalMillisec(long interval);
+
+ @DisplayName("OPERATION: getTimeToIdleMillisec")
+ public long getTimeToIdleMillisec();
+
+ @DisplayName("OPERATION: setTimeToIdleMillisec")
+ public void setTimeToIdleMillisec(long millisec);
}
@@ -19,19 +19,17 @@
import azkaban.common.utils.Props;
import azkaban.flow.ExecutableFlow;
import azkaban.flow.Flow;
-
-import net.sf.ehcache.Cache;
-import net.sf.ehcache.CacheManager;
-import net.sf.ehcache.Element;
-import net.sf.ehcache.config.CacheConfiguration;
-import net.sf.ehcache.store.MemoryStoreEvictionPolicy;
+import azkaban.jobs.Status;
import org.apache.log4j.Logger;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
/**
* A FlowManager that caches ExecutableFlows.
@@ -44,39 +42,100 @@
private static final Logger log = Logger.getLogger(CachingFlowManager.class);
private final FlowManager baseManager;
- private CacheManager manager = CacheManager.create();
- private Cache cache;
+// private CacheManager manager = CacheManager.create();
+// private Cache cache;
private long cleanInterval = 60000;
+ private long timeToIdle = 300;
private long nextCleanTime = 0;
+ private ConcurrentHashMap<String, Element> cache = new ConcurrentHashMap<String,Element>();
+
+ private class Element {
+ private FlowExecutionHolder holder;
+ private final long createTime;
+ private long lastAccessTime;
+
+ public Element(FlowExecutionHolder holder) {
+ this.holder = holder;
+ createTime = System.currentTimeMillis();
+ lastAccessTime = createTime;
+ }
+
+ public long getCreateTime() {
+ return createTime;
+ }
+
+ public long getLastAccessTime() {
+ return lastAccessTime;
+ }
+
+ public void touch() {
+ lastAccessTime = System.currentTimeMillis();
+ }
+
+ public FlowExecutionHolder getValue() {
+ return holder;
+ }
+
+ public boolean isComplete() {
+ switch(holder.getFlow().getStatus()) {
+ case SUCCEEDED:
+ case COMPLETED:
+ case FAILED:
+ return true;
+ default:
+ return false;
+ }
+ }
+ }
public CachingFlowManager(FlowManager baseManager, final int cacheSize, final long timeToIdle)
{
this.baseManager = baseManager;
- CacheConfiguration config = new CacheConfiguration();
- config.setName("flowhistory");
- config.setMaxEntriesLocalHeap(cacheSize);
- config.setTimeToIdleSeconds(timeToIdle);
-
- config.eternal(false);
- config.diskPersistent(false);
- config.memoryStoreEvictionPolicy(MemoryStoreEvictionPolicy.LRU);
log.info("Creating Flow cache of size " + cacheSize + " and tti of " + timeToIdle);
- cache = new Cache(config);
- manager.addCache(cache);
-
- cleanInterval = timeToIdle * 1000l;
+ this.timeToIdle = timeToIdle;
+ cleanInterval = timeToIdle;
}
+ public long getCleanInterval() {
+ return cleanInterval;
+ }
+
+ public void setCleanInterval(long millisec) {
+ cleanInterval = millisec;
+ }
+
+ public long getTimeToIdle() {
+ return timeToIdle;
+ }
+
+ public void setTimeToIdleMillisec(long millisec) {
+ timeToIdle = millisec;
+ }
+
+
private void cleanCache() {
if(nextCleanTime < System.currentTimeMillis()) {
- int cacheSize = cache.getSize();
- cache.evictExpiredElements();
- log.info("Evicted from CachingFlowManager: " + (cacheSize - cache.getSize()));
-
+ evictFinishedIdleFlows();
nextCleanTime = System.currentTimeMillis() + cleanInterval;
}
}
+ public void evictFinishedIdleFlows() {
+ ArrayList<String> toRemove = new ArrayList<String>();
+ for (Map.Entry<String, Element> entry: cache.entrySet()) {
+ Element element = entry.getValue();
+
+ if(element.isComplete() && (System.currentTimeMillis() - element.getLastAccessTime()) > timeToIdle) {
+ toRemove.add(entry.getKey());
+ }
+ }
+
+
+ log.info("Evicted from CachingFlowManager: " + toRemove.size());
+ for (String key: toRemove) {
+ cache.remove(key);
+ }
+ }
public boolean hasFlow(String name)
{
@@ -104,8 +163,8 @@ public Flow getFlow(String name)
return baseManager.iterator();
}
- public Cache getCache() {
- return cache;
+ public int getCacheSize() {
+ return cache.size();
}
public ExecutableFlow createNewExecutableFlow(String name)
@@ -120,15 +179,10 @@ public ExecutableFlow createNewExecutableFlow(String name)
@Override
public void execute(Props parentProperties, FlowCallback callback) {
String id = getId();
- if (!cache.isKeyInCache(id)) {
- addToCache(
- new FlowExecutionHolder(
- retVal,
- parentProperties
- )
- );
+ if (!cache.containsKey(id)) {
+ addToCache(new FlowExecutionHolder(retVal, parentProperties));
}
-
+
super.execute(parentProperties, callback);
}
};
@@ -146,15 +200,15 @@ public long getCurrMaxId()
public FlowExecutionHolder saveExecutableFlow(FlowExecutionHolder holder)
{
- cleanCache();
return baseManager.saveExecutableFlow(holder);
}
public FlowExecutionHolder loadExecutableFlow(long id)
{
Element elem = cache.get(id);
if (elem != null) {
- return (FlowExecutionHolder)elem.getValue();
+ elem.touch();
+ return elem.getValue();
}
final FlowExecutionHolder retVal = baseManager.loadExecutableFlow(id);
@@ -175,8 +229,8 @@ private void addToCache(FlowExecutionHolder retVal)
return;
}
- Element element = new Element(retVal.getFlow().getId(), retVal);
- cache.put(element);
+ Element element = new Element(retVal);
+ cache.put(retVal.getFlow().getId(), element);
}
@Override
@@ -454,6 +454,7 @@ public void progressMade() {
@Override
public void completed(Status status) {
+ logger.info("Flow " + flow.getId() + ": " + jobDescriptor.getId() + " completed status " + status);
runningJob.setEndTime(new DateTime());
MonitorImpl.getInternalMonitorInterface().workflowEvent(flow.getId(),
@@ -494,6 +495,7 @@ public void completed(Status status) {
executing.remove(runningJob.getId());
throw e;
} finally {
+ logger.info("Flow " + flow.getId() + ": " + jobDescriptor.getId() + " completed.");
// mark the job as completed
executing.remove(runningJob.getId());
completed.put(runningJob.getId(), runningJob);

0 comments on commit 46129ca

Please sign in to comment.