Skip to content

Commit

Permalink
AZK-123: Add jmx hooks for Schedule-Unschedule commands
Browse files Browse the repository at this point in the history
  • Loading branch information
Ibrahim Ulukaya committed Sep 21, 2011
1 parent 35dea11 commit 7d0f432
Show file tree
Hide file tree
Showing 8 changed files with 853 additions and 224 deletions.
72 changes: 48 additions & 24 deletions azkaban/src/java/azkaban/app/AzkabanApplication.java
Expand Up @@ -35,6 +35,7 @@
import org.apache.velocity.runtime.resource.loader.ClasspathResourceLoader;
import org.joda.time.DateTimeZone;

import azkaban.app.jmx.JobScheduler;
import azkaban.app.jmx.RefreshJobs;
import azkaban.common.jobs.Job;
import azkaban.common.utils.Props;
Expand Down Expand Up @@ -97,18 +98,23 @@ public class AzkabanApplication

private final JobExecutorManager _jobExecutorManager;
private final ScheduleManager _schedulerManager;

private MBeanServer mbeanServer;
private ObjectName jobRefresherName;
private ObjectName jobSchedulerName;

public AzkabanApplication(List<File> jobDirs, File logDir, File tempDir, boolean enableDevMode) throws IOException {
public AzkabanApplication(final List<File> jobDirs, final File logDir, final File tempDir, final boolean enableDevMode) throws IOException {
this._jobDirs = Utils.nonNull(jobDirs);
this._logsDir = Utils.nonNull(logDir);
this._tempDir = Utils.nonNull(tempDir);

if(!this._logsDir.exists())
if(!this._logsDir.exists()) {
this._logsDir.mkdirs();
}

if(!this._tempDir.exists())
if(!this._tempDir.exists()) {
this._tempDir.mkdirs();
}

for(File jobDir: _jobDirs) {
if(!jobDir.exists()) {
Expand All @@ -117,8 +123,9 @@ public AzkabanApplication(List<File> jobDirs, File logDir, File tempDir, boolean
}
}

if(jobDirs.size() < 1)
if(jobDirs.size() < 1) {
throw new IllegalArgumentException("No job directory given.");
}

Props defaultProps = PropsUtils.loadPropsInDirs(_jobDirs, ".properties", ".schema");

Expand Down Expand Up @@ -168,7 +175,9 @@ public AzkabanApplication(List<File> jobDirs, File logDir, File tempDir, boolean
File executionsStorageDir = new File(
defaultProps.getString("azkaban.executions.storage.dir", initialJobDir.getAbsolutePath() + "/executions")
);
if (! executionsStorageDir.exists()) executionsStorageDir.mkdirs();
if (! executionsStorageDir.exists()) {
executionsStorageDir.mkdirs();
}
long lastExecutionId = getLastExecutionId(executionsStorageDir);
logger.info(String.format("Using path[%s] for storing executions.", executionsStorageDir));
logger.info(String.format("Last known execution id was [%s]", lastExecutionId));
Expand All @@ -179,7 +188,7 @@ public AzkabanApplication(List<File> jobDirs, File logDir, File tempDir, boolean
FlowExecutionSerializer flowExecutionSerializer = new FlowExecutionSerializer(flowSerializer);
FlowExecutionDeserializer flowExecutionDeserializer = new FlowExecutionDeserializer(flowDeserializer);

_monitor = (MonitorImpl)MonitorImpl.getMonitor();
_monitor = MonitorImpl.getMonitor();

_allFlows = new CachingFlowManager(
new RefreshableFlowManager(
Expand Down Expand Up @@ -208,18 +217,19 @@ public AzkabanApplication(List<File> jobDirs, File logDir, File tempDir, boolean
*/
String server_url = defaultProps.getString("server.url", null) ;
if (server_url != null) {
if (server_url.endsWith("/"))
_jobExecutorManager.setRuntimeProperty(AppCommon.DEFAULT_LOG_URL_PREFIX, server_url + "logs?file=" );
else
_jobExecutorManager.setRuntimeProperty(AppCommon.DEFAULT_LOG_URL_PREFIX, server_url + "/logs?file=" );
if (server_url.endsWith("/")) {
_jobExecutorManager.setRuntimeProperty(AppCommon.DEFAULT_LOG_URL_PREFIX, server_url + "logs?file=" );
} else {
_jobExecutorManager.setRuntimeProperty(AppCommon.DEFAULT_LOG_URL_PREFIX, server_url + "/logs?file=" );
}
}

this._velocityEngine = configureVelocityEngine(enableDevMode);

configureMBeanServer();
}

private VelocityEngine configureVelocityEngine(boolean devMode) {
private VelocityEngine configureVelocityEngine(final boolean devMode) {
VelocityEngine engine = new VelocityEngine();
engine.setProperty("resource.loader", "classpath");
engine.setProperty("classpath.resource.loader.class",
Expand Down Expand Up @@ -252,15 +262,27 @@ private void configureMBeanServer() {
logger.info("Registering MBeans...");
mbeanServer = ManagementFactory.getPlatformMBeanServer();
try {
ObjectName azkabanAppName = new ObjectName("azkaban.app.jmx.RefreshJobs:name=jobRefresher");
mbeanServer.registerMBean(new RefreshJobs(this), azkabanAppName);
logger.info("Bean " + azkabanAppName.getCanonicalName() + " registered.");
jobRefresherName = new ObjectName("azkaban.app.jmx.RefreshJobs:name=jobRefresher");
jobSchedulerName = new ObjectName("azkaban.app.jmx.jobScheduler:name=jobScheduler");
mbeanServer.registerMBean(new RefreshJobs(this), jobRefresherName);
logger.info("Bean " + jobRefresherName.getCanonicalName() + " registered.");
mbeanServer.registerMBean(new JobScheduler(_schedulerManager, _jobManager), jobSchedulerName);
logger.info("Bean " + jobSchedulerName.getCanonicalName() + " registered.");
}
catch(Exception e) {
logger.error("Failed to configure MBeanServer", e);
}
}

public void close() {
try {
mbeanServer.unregisterMBean(jobRefresherName);
mbeanServer.unregisterMBean(jobSchedulerName);
} catch (Exception e) {
logger.error("Failed to cleanup MBeanServer", e);
}
}

public String getLogDirectory() {
return _logsDir.getAbsolutePath();
}
Expand Down Expand Up @@ -333,7 +355,7 @@ private ClassLoader getBaseClassloader() throws MalformedURLException
return retVal;
}

private NamedPermitManager getNamedPermitManager(Props props) throws MalformedURLException
private NamedPermitManager getNamedPermitManager(final Props props) throws MalformedURLException
{
int workPermits = props.getInt("total.job.permits", Integer.MAX_VALUE);
NamedPermitManager permitManager = new NamedPermitManager();
Expand All @@ -342,33 +364,35 @@ private NamedPermitManager getNamedPermitManager(Props props) throws MalformedUR
return permitManager;
}

private File getBackupFile(Props defaultProps, File initialJobDir)
private File getBackupFile(final Props defaultProps, final File initialJobDir)
{
File retVal = new File(initialJobDir.getAbsoluteFile(), "jobs.schedule.backup");

String backupFile = defaultProps.getString("schedule.backup.file", null);
if(backupFile != null)
if(backupFile != null) {
retVal = new File(backupFile);
else
} else {
logger.info("Schedule backup file param not set. Defaulting to " + retVal.getAbsolutePath());
}

return retVal;
}

private File getScheduleFile(Props defaultProps, File initialJobDir)
private File getScheduleFile(final Props defaultProps, final File initialJobDir)
{
File retVal = new File(initialJobDir.getAbsoluteFile(), "jobs.schedule");

String scheduleFile = defaultProps.getString("schedule.file", null);
if(scheduleFile != null)
if(scheduleFile != null) {
retVal = new File(scheduleFile);
else
} else {
logger.info("Schedule file param not set. Defaulting to " + retVal.getAbsolutePath());
}

return retVal;
}

private long getLastExecutionId(File executionsStorageDir)
private long getLastExecutionId(final File executionsStorageDir)
{
long lastId = 0;

Expand All @@ -390,11 +414,11 @@ private long getLastExecutionId(File executionsStorageDir)
}


public String getRuntimeProperty(String name) {
public String getRuntimeProperty(final String name) {
return _jobExecutorManager.getRuntimeProperty(name);
}

public void setRuntimeProperty(String key, String value) {
public void setRuntimeProperty(final String key, final String value) {
_jobExecutorManager.setRuntimeProperty(key, value);
}

Expand Down
36 changes: 36 additions & 0 deletions azkaban/src/java/azkaban/app/jmx/DisplayName.java
@@ -0,0 +1,36 @@
/*
* Copyright 2011 Adconion, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package azkaban.app.jmx;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import javax.management.DescriptorKey;

/**
* DisplayName - This annotation allows to supply
* a display name for a method in the MBean interface.
*/
@Documented
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface DisplayName {
@DescriptorKey("displayName")
String value();
}
141 changes: 141 additions & 0 deletions azkaban/src/java/azkaban/app/jmx/JobScheduler.java
@@ -0,0 +1,141 @@
/*
* Copyright 2011 Adconion, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package azkaban.app.jmx;

import org.apache.log4j.Logger;
import org.joda.time.DateTime;
import org.joda.time.Days;
import org.joda.time.Hours;
import org.joda.time.IllegalFieldValueException;
import org.joda.time.LocalDateTime;
import org.joda.time.Minutes;
import org.joda.time.ReadablePeriod;
import org.joda.time.Seconds;
import org.joda.time.format.DateTimeFormat;

import azkaban.app.JobDescriptor;
import azkaban.app.JobManager;
import azkaban.scheduler.ScheduleManager;

/**
* @author ibrahimulukaya
* Implements the JobSchedulerMBean
*/
public class JobScheduler implements JobSchedulerMBean {
private static Logger logger = Logger.getLogger(JobScheduler.class);
private ScheduleManager scheduler;
private JobManager jobManager;

public JobScheduler(ScheduleManager scheduler, JobManager jobManager) {
this.scheduler = scheduler;
this.jobManager = jobManager;
}

public String scheduleWorkflow(String jobName, boolean ignoreDeps,
int hour, int minutes, int seconds, String scheduledDate,
boolean isRecurring, int period, String periodUnits) {
String errorMsg = null;
if (jobName == null || jobName.trim().length() == 0) {
errorMsg = "You must select at least one job to run.";
logger.error(errorMsg);
return errorMsg;
}
JobDescriptor descriptor = jobManager.getJobDescriptor(jobName);
if (descriptor == null) {
errorMsg = "Job: '" + jobName + "' doesn't exist.";
logger.error(errorMsg);
return errorMsg;
}

DateTime day = null;
DateTime time = null;
try {
if (scheduledDate == null || scheduledDate.trim().length() == 0) {
day = new LocalDateTime().toDateTime();
time = day.withHourOfDay(hour).withMinuteOfHour(minutes)
.withSecondOfMinute(seconds);
if (day.isAfter(time)) {
time = time.plusDays(1);
}
} else {
try {
day = DateTimeFormat.forPattern("MM-dd-yyyy")
.parseDateTime(scheduledDate);
} catch (IllegalArgumentException e) {
logger.error(e);
return "Invalid date: '" + scheduledDate
+ "', \"MM-dd-yyyy\" format is expected.";
}
time = day.withHourOfDay(hour).withMinuteOfHour(minutes)
.withSecondOfMinute(seconds);
}
} catch (IllegalFieldValueException e) {
logger.error(e);
return "Invalid schedule time (see logs): " + e.getMessage();

}
ReadablePeriod thePeriod = null;
if (isRecurring) {
if ("d".equals(periodUnits)) {
thePeriod = Days.days(period);
} else if ("h".equals(periodUnits)) {
thePeriod = Hours.hours(period);
} else if ("m".equals(periodUnits)) {
thePeriod = Minutes.minutes(period);
} else if ("s".equals(periodUnits)) {
thePeriod = Seconds.seconds(period);
} else {
errorMsg = "Unknown period unit: " + periodUnits;
logger.error(errorMsg);
return errorMsg;
}
}
try {
if (thePeriod == null) {
scheduler.schedule(jobName, time, ignoreDeps);
} else {
scheduler.schedule(jobName, time, thePeriod, ignoreDeps);
}
return "Schedule Successful!";
} catch (Exception e) {
logger.error(e);
return "Schedule Failed (see logs): " + e.getMessage();
}
}

public String removeScheduledWorkflow(String jobName) {
String errorMsg = null;
if (jobName == null || jobName.trim().length() == 0) {
errorMsg = "You must select at least one job to remove.";
logger.error(errorMsg);
return errorMsg;
}
if (scheduler.getSchedule(jobName) == null) {
errorMsg = "Job: '"+ jobName + "' doesn't exist in schedule.";
logger.error(errorMsg);
return errorMsg;
}
try {
scheduler.removeScheduledJob(jobName);
return "Job: '" + jobName + "' is successfully removed from " +
"schedule.";
} catch (Exception e) {
logger.error(e);
return "Removing From Schedule Failed (see logs): " +
e.getMessage();
}
}
}

0 comments on commit 7d0f432

Please sign in to comment.