Permalink
Browse files

Flow caching cleaner.

  • Loading branch information...
1 parent 93fd187 commit 5a4db047c682c792524aa9df5150c57f05d8959a @rbpark committed Mar 21, 2012
@@ -35,6 +35,7 @@
import org.apache.velocity.runtime.resource.loader.ClasspathResourceLoader;
import org.joda.time.DateTimeZone;
+import azkaban.app.jmx.CacheFlowManager;
import azkaban.app.jmx.JmxExecutorManager;
import azkaban.app.jmx.JobScheduler;
import azkaban.app.jmx.RefreshJobs;
@@ -104,6 +105,7 @@
private ObjectName jobRefresherName;
private ObjectName jobSchedulerName;
private ObjectName jobExecutorName;
+ private ObjectName cacheManagerName;
private NamedPermitManager _permitManager;
private ReadWriteLockManager _readWriteLockManager;
@@ -274,13 +276,16 @@ private void configureMBeanServer() {
jobRefresherName = new ObjectName("azkaban.app.jmx.RefreshJobs:name=jobRefresher");
jobSchedulerName = new ObjectName("azkaban.app.jmx.JobScheduler:name=jobScheduler");
jobExecutorName = new ObjectName("azkaban.app.jmx.JmxExecutorManager:name=jobExecutor");
+ cacheManagerName = new ObjectName("azkaban.app.jmx.CacheFlowManager:name=cacheFlowManager");
mbeanServer.registerMBean(new RefreshJobs(this), jobRefresherName);
logger.info("Bean " + jobRefresherName.getCanonicalName() + " registered.");
mbeanServer.registerMBean(new JobScheduler(_schedulerManager, _jobManager), jobSchedulerName);
logger.info("Bean " + jobSchedulerName.getCanonicalName() + " registered.");
mbeanServer.registerMBean(new JmxExecutorManager(_jobExecutorManager, _permitManager, _readWriteLockManager), jobExecutorName);
logger.info("Bean " + jobExecutorName.getCanonicalName() + " registered.");
+ mbeanServer.registerMBean(new CacheFlowManager((CachingFlowManager)_allFlows), cacheManagerName);
+ logger.info("Bean " + cacheManagerName.getCanonicalName() + " registered.");
}
catch(Exception e) {
logger.error("Failed to configure MBeanServer", e);
@@ -291,6 +296,8 @@ public void close() {
try {
mbeanServer.unregisterMBean(jobRefresherName);
mbeanServer.unregisterMBean(jobSchedulerName);
+ mbeanServer.unregisterMBean(jobExecutorName);
+ mbeanServer.unregisterMBean(cacheManagerName);
} catch (Exception e) {
logger.error("Failed to cleanup MBeanServer", e);
}
@@ -0,0 +1,22 @@
+package azkaban.app.jmx;
+
+import azkaban.flow.CachingFlowManager;
+
+public class CacheFlowManager implements CacheFlowManagerMBean {
+ private CachingFlowManager manager;
+
+ public CacheFlowManager(CachingFlowManager manager) {
+ this.manager = manager;
+ }
+
+ @Override
+ public int getCacheSize() {
+ return this.manager.getCache().getSize();
+ }
+
+ @Override
+ public void purgeCache() {
+ this.manager.getCache().evictExpiredElements();
+ }
+
+}
@@ -0,0 +1,9 @@
+package azkaban.app.jmx;
+
+public interface CacheFlowManagerMBean {
+ @DisplayName("OPERATION: getCacheSize")
+ public int getCacheSize();
+
+ @DisplayName("OPERATION: purgeCache")
+ public void purgeCache();
+}
@@ -19,7 +19,6 @@
import azkaban.common.utils.Props;
import azkaban.flow.ExecutableFlow;
import azkaban.flow.Flow;
-import azkaban.jobs.Status;
import net.sf.ehcache.Cache;
import net.sf.ehcache.CacheManager;
@@ -30,11 +29,8 @@
import org.apache.log4j.Logger;
import java.util.Collection;
-import java.util.Collections;
import java.util.Iterator;
-import java.util.LinkedHashMap;
import java.util.List;
-import java.util.Map;
import java.util.Set;
/**
@@ -46,25 +42,42 @@
public class CachingFlowManager implements FlowManager
{
private static final Logger log = Logger.getLogger(CachingFlowManager.class);
+
private final FlowManager baseManager;
private CacheManager manager = CacheManager.create();
private Cache cache;
-
+ private long cleanInterval = 60000;
+ private long nextCleanTime = 0;
+
public CachingFlowManager(FlowManager baseManager, final int cacheSize, final long timeToIdle)
{
this.baseManager = baseManager;
CacheConfiguration config = new CacheConfiguration();
config.setName("flowhistory");
config.setMaxEntriesLocalHeap(cacheSize);
config.setTimeToIdleSeconds(timeToIdle);
+
config.eternal(false);
config.diskPersistent(false);
config.memoryStoreEvictionPolicy(MemoryStoreEvictionPolicy.LRU);
-
+ log.info("Creating Flow cache of size " + cacheSize + " and tti of " + timeToIdle);
cache = new Cache(config);
manager.addCache(cache);
+
+ cleanInterval = timeToIdle;
}
+ private void cleanCache() {
+ if(nextCleanTime < System.currentTimeMillis()) {
+ int cacheSize = cache.getSize();
+ cache.evictExpiredElements();
+ log.info("Evicted from CachingFlowManager: " + (cacheSize - cache.getSize()));
+
+ nextCleanTime = System.currentTimeMillis() + cleanInterval;
+ }
+ }
+
+
public boolean hasFlow(String name)
{
return baseManager.hasFlow(name);
@@ -91,6 +104,10 @@ public Flow getFlow(String name)
return baseManager.iterator();
}
+ public Cache getCache() {
+ return cache;
+ }
+
public ExecutableFlow createNewExecutableFlow(String name)
{
final ExecutableFlow retVal = baseManager.createNewExecutableFlow(name);
@@ -152,6 +169,7 @@ public void reload()
private void addToCache(FlowExecutionHolder retVal)
{
+ cleanCache();
if (retVal == null || retVal.getFlow() == null) {
return;
}
@@ -169,4 +187,5 @@ private void addToCache(FlowExecutionHolder retVal)
public List<String> getRootNamesByFolder(String folder) {
return baseManager.getRootNamesByFolder(folder);
}
+
}
@@ -1,4 +1,5 @@
-log4j.rootLogger=INFO, Azkaban
+log4j.rootLogger=INFO, Console
+
log4j.appender.Console=org.apache.log4j.ConsoleAppender
log4j.appender.Console.layout=org.apache.log4j.PatternLayout

0 comments on commit 5a4db04

Please sign in to comment.