Skip to content

Commit

Permalink
Merge pull request #16 from iulukaya/AZK-125-JMX-MBean-Refresh-Agent
Browse files Browse the repository at this point in the history
AZK-125: JMX MBean and Agent to refresh workflows from disk
  • Loading branch information
rbpark committed Sep 12, 2011
2 parents 637e4b1 + c511f0b commit 35dea11
Show file tree
Hide file tree
Showing 8 changed files with 277 additions and 1 deletion.
3 changes: 3 additions & 0 deletions azkaban/src/java/azkaban/app/AzkabanApp.java
Expand Up @@ -30,6 +30,8 @@
import azkaban.web.pages.IndexServlet;
import azkaban.web.pages.JobDetailServlet;
import azkaban.web.pages.JobUploadServlet;
import azkaban.web.pages.RefreshJobsServlet;

import java.io.File;
import java.util.Arrays;
import joptsimple.OptionParser;
Expand Down Expand Up @@ -122,6 +124,7 @@ public static void main(String[] arguments) throws Exception {
servlets.addServlet("Api Servlet", "/call", ApiServlet.class.getName());
servlets.addServlet("Flow Execution", "/flow", FlowExecutionServlet.class.getName());
servlets.addServlet("favicon", "/favicon.ico", Default.class.getName());
servlets.addServlet("Refresh Jobs", "/refresh-jobs", RefreshJobsServlet.class.getName());

try {
server.start();
Expand Down
25 changes: 25 additions & 0 deletions azkaban/src/java/azkaban/app/AzkabanApplication.java
Expand Up @@ -19,18 +19,23 @@

import java.io.File;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.List;
import java.util.TimeZone;

import javax.management.MBeanServer;
import javax.management.ObjectName;

import org.apache.log4j.Logger;
import org.apache.velocity.app.VelocityEngine;
import org.apache.velocity.runtime.log.Log4JLogChute;
import org.apache.velocity.runtime.resource.loader.ClasspathResourceLoader;
import org.joda.time.DateTimeZone;

import azkaban.app.jmx.RefreshJobs;
import azkaban.common.jobs.Job;
import azkaban.common.utils.Props;
import azkaban.common.utils.Utils;
Expand Down Expand Up @@ -92,6 +97,7 @@ public class AzkabanApplication

private final JobExecutorManager _jobExecutorManager;
private final ScheduleManager _schedulerManager;
private MBeanServer mbeanServer;

public AzkabanApplication(List<File> jobDirs, File logDir, File tempDir, boolean enableDevMode) throws IOException {
this._jobDirs = Utils.nonNull(jobDirs);
Expand Down Expand Up @@ -209,6 +215,8 @@ public AzkabanApplication(List<File> jobDirs, File logDir, File tempDir, boolean
}

this._velocityEngine = configureVelocityEngine(enableDevMode);

configureMBeanServer();
}

private VelocityEngine configureVelocityEngine(boolean devMode) {
Expand Down Expand Up @@ -239,6 +247,19 @@ private VelocityEngine configureVelocityEngine(boolean devMode) {
engine.setProperty("parser.pool.size", 3);
return engine;
}

private void configureMBeanServer() {
logger.info("Registering MBeans...");
mbeanServer = ManagementFactory.getPlatformMBeanServer();
try {
ObjectName azkabanAppName = new ObjectName("azkaban.app.jmx.RefreshJobs:name=jobRefresher");
mbeanServer.registerMBean(new RefreshJobs(this), azkabanAppName);
logger.info("Bean " + azkabanAppName.getCanonicalName() + " registered.");
}
catch(Exception e) {
logger.error("Failed to configure MBeanServer", e);
}
}

public String getLogDirectory() {
return _logsDir.getAbsolutePath();
Expand Down Expand Up @@ -384,4 +405,8 @@ public MonitorInterface getMonitor() {
public MonitorInternalInterface getInternalMonitor() {
return _monitor;
}

public void reloadJobsFromDisk() {
getJobManager().updateFlowManager();
}
}
2 changes: 1 addition & 1 deletion azkaban/src/java/azkaban/app/JobManager.java
Expand Up @@ -490,7 +490,7 @@ public void setFlowManager(FlowManager theManager)
updateFlowManager();
}

private void updateFlowManager()
public void updateFlowManager()
{
jobDescriptorCache.set(loadJobDescriptors());
manager.reload();
Expand Down
44 changes: 44 additions & 0 deletions azkaban/src/java/azkaban/app/jmx/RefreshJobs.java
@@ -0,0 +1,44 @@
/*
* Copyright 2011 Adconion, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package azkaban.app.jmx;

import org.apache.log4j.Logger;

import azkaban.app.AzkabanApplication;

/**
* Implementation of {@link RefreshJobsMBean}.
*/
public class RefreshJobs implements RefreshJobsMBean {
private static final Logger logger = Logger.getLogger(RefreshJobs.class);

private AzkabanApplication app;

public RefreshJobs(AzkabanApplication app) {
this.app = app;
}

public String reloadJobsFromDisk() {
try {
this.app.reloadJobsFromDisk();
return "Reload Successful!";
}
catch (Exception e) {
logger.error(e);
return "Reload Failed (see logs): " + e.getMessage();
}
}
}
32 changes: 32 additions & 0 deletions azkaban/src/java/azkaban/app/jmx/RefreshJobsMBean.java
@@ -0,0 +1,32 @@
/*
* Copyright 2011 Adconion, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package azkaban.app.jmx;

/**
* MBean interface used to trigger a refresh of all jobs in the
* $AZKABAN_HOME/jobs dir.
*
*/
public interface RefreshJobsMBean {

/**
* Trigger a reload of jobs from the jobs directory on disk. Detect and
* load new jobs if present.
* @return "Reload Successful!" on success, otherwise "Reload Failed" and
* the test of the exception message.
*/
String reloadJobsFromDisk();
}
67 changes: 67 additions & 0 deletions azkaban/src/java/azkaban/web/pages/RefreshJobsServlet.java
@@ -0,0 +1,67 @@
/*
* Copyright 2011 Adconion, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package azkaban.web.pages;

import java.io.IOException;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.log4j.Logger;

import azkaban.app.JobManager;
import azkaban.common.web.Page;
import azkaban.web.AbstractAzkabanServlet;

public class RefreshJobsServlet extends AbstractAzkabanServlet {

private static final long serialVersionUID = 1;
private static Logger logger = Logger.getLogger(RefreshJobsServlet.class);

@Override
protected void doGet(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
renderPage(request, response);
}

@Override
protected void doPost(HttpServletRequest request,
HttpServletResponse response) throws ServletException, IOException {
String confirmed = request.getParameter("confirm");
if (confirmed != null && confirmed.equals("true")) {
try {
getApplication().reloadJobsFromDisk();
addMessage(request, "Jobs have been refreshed");
}
catch(RuntimeException e) {
addError(request, "An exception occurred when refreshing jobs: " + e.toString());
addError(request, "See the log for errors");
logger.error("Failed to refresh jobs", e);
}
}
else {
addError(request, "Jobs not refreshed - no confirmation given");
}
renderPage(request, response);
}

private void renderPage(HttpServletRequest request,
HttpServletResponse response) throws ServletException, IOException {
Page page = newPage(request, response, "azkaban/web/pages/refresh_jobs.vm");
page.render();
}
}
24 changes: 24 additions & 0 deletions azkaban/src/java/azkaban/web/pages/refresh_jobs.vm
@@ -0,0 +1,24 @@
<html>
<head>
<title>#appname()</title>
<link rel="stylesheet" type="text/css" href="${context}/static/css/style.css"></link>
<link rel="stylesheet" type="text/css" href="${context}/static/css/azkaban-common.css"></link>
</head>

<body>

#appnamebox()
<div id="container">
#messages()
<h2>Refresh Jobs From Disk</h2>
<div>
<div style="margin: auto; width: 500px; ">
<form method="post" action="$!context/refresh-jobs">
<input type="hidden" name="confirm" value="true"/>
<input type="submit" value="Refresh Jobs"/>
</form>
</div>
</div>
</div>
</body>
</html>
81 changes: 81 additions & 0 deletions azkaban/src/unit/azkaban/app/AzkabanApplicationTest.java
@@ -0,0 +1,81 @@
/*
* Copyright 2011 Adconion, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package azkaban.app;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;

import junit.framework.Assert;

import org.apache.commons.io.FileUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import azkaban.common.jobs.Job;
import azkaban.jobs.builtin.ProcessJob;

public class AzkabanApplicationTest {

private File jobDir;
private File logDir;
private File tmpDir;

@Before
public void setUp() throws IOException {
jobDir = mktempdir("azkaban_jobs");
logDir = mktempdir("azkaban_logs");
tmpDir = mktempdir("azkaban_tmp");
}

@After
public void tearDown() throws IOException {
FileUtils.deleteDirectory(jobDir);
FileUtils.deleteDirectory(logDir);
FileUtils.deleteDirectory(tmpDir);
}

private File mktempdir(String name) throws IOException {
File dir = File.createTempFile(name, ".d");
FileUtils.forceDelete(dir);
FileUtils.forceMkdir(dir);
return dir;
}

@Test
public void testAddJobAndReload() throws Exception {
String testJob = "testjob";
AzkabanApplication app = new AzkabanApplication(Arrays.asList(jobDir),
logDir, tmpDir, false);
Assert.assertEquals(0, app.getJobManager().loadJobDescriptors().size());
Assert.assertNull(app.getJobManager().getJobDescriptor(testJob));

File newJobDir = new File(jobDir, "test");
FileUtils.forceMkdir(newJobDir);
File newJob = new File(newJobDir, testJob + ".job");
FileUtils.writeLines(newJob, Arrays.asList("type=command", "command=ls"));

app.reloadJobsFromDisk();

Assert.assertEquals(1, app.getJobManager().loadJobDescriptors().size());
Job loadedJob = app.getJobManager().loadJob(testJob, true);

Assert.assertEquals(testJob, loadedJob.getId());
Assert.assertTrue(loadedJob instanceof LoggingJob);
Assert.assertTrue(((LoggingJob)loadedJob).getInnerJob() instanceof ProcessJob);
}
}

0 comments on commit 35dea11

Please sign in to comment.