Skip to content

Commit

Permalink
AKZ-108 Implements an event-based monitoring solution.
Browse files Browse the repository at this point in the history
This commit also includes contributions by Ibrahim Ulukaya.
  • Loading branch information
Donald P. Pazel authored and jcrobak committed Aug 6, 2011
1 parent 72c1e25 commit 9e81455
Show file tree
Hide file tree
Showing 43 changed files with 6,054 additions and 34 deletions.
1 change: 1 addition & 0 deletions .classpath
Expand Up @@ -27,6 +27,7 @@
<classpathentry kind="lib" path="lib/objenesis-1.2.jar"/>
<classpathentry kind="lib" path="lib/org-json-2010-02-26.jar"/>
<classpathentry kind="lib" path="lib/commons-logging-1.1.1.jar"/>
<classpathentry kind="lib" path="lib/commons-math-2.1.jar"/>
<classpathentry kind="lib" path="lib/servlet-api-2.5.jar"/>
<classpathentry kind="lib" path="lib/slf4j-api-1.5.6.jar"/>
<classpathentry kind="lib" path="lib/slf4j-log4j12-1.5.6.jar"/>
Expand Down
4 changes: 4 additions & 0 deletions azkaban-common/src/java/azkaban/common/jobs/AbstractJob.java
Expand Up @@ -93,5 +93,9 @@ public Props getJobGeneratedProperties() {
}

public abstract void run() throws Exception;

/**
* Default cancellation status for jobs built on this abstract class.
*
* @return always {@code false} in this base implementation
*/
public boolean isCanceled() {
return false;
}

}
6 changes: 6 additions & 0 deletions azkaban-common/src/java/azkaban/common/jobs/Job.java
Expand Up @@ -66,4 +66,10 @@ public interface Job {
* @return
*/
public Props getJobGeneratedProperties();

/**
* Determines whether the job was canceled.
*
* @return {@code true} if the job was canceled, {@code false} otherwise
*/
public boolean isCanceled();
}
14 changes: 14 additions & 0 deletions azkaban/src/java/azkaban/app/AzkabanApplication.java
Expand Up @@ -49,6 +49,9 @@
import azkaban.jobs.builtin.PythonJob;
import azkaban.jobs.builtin.RubyJob;
import azkaban.jobs.builtin.ScriptJob;
import azkaban.monitor.MonitorImpl;
import azkaban.monitor.MonitorInterface;
import azkaban.monitor.MonitorInternalInterface;
import azkaban.scheduler.LocalFileScheduleLoader;
import azkaban.scheduler.ScheduleManager;
import azkaban.serialization.DefaultExecutableFlowSerializer;
Expand Down Expand Up @@ -85,6 +88,7 @@ public class AzkabanApplication
private final ClassLoader _baseClassLoader;
private final String _hdfsUrl;
private final FlowManager _allFlows;
private final MonitorImpl _monitor;

private final JobExecutorManager _jobExecutorManager;
private final ScheduleManager _schedulerManager;
Expand Down Expand Up @@ -169,6 +173,8 @@ public AzkabanApplication(List<File> jobDirs, File logDir, File tempDir, boolean
FlowExecutionSerializer flowExecutionSerializer = new FlowExecutionSerializer(flowSerializer);
FlowExecutionDeserializer flowExecutionDeserializer = new FlowExecutionDeserializer(flowDeserializer);

_monitor = (MonitorImpl)MonitorImpl.getMonitor();

_allFlows = new CachingFlowManager(
new RefreshableFlowManager(
_jobManager,
Expand Down Expand Up @@ -370,4 +376,12 @@ public String getRuntimeProperty(String name) {
/**
* Sets a runtime property by delegating to the job executor manager.
*
* @param key name of the runtime property
* @param value value to associate with the key
*/
public void setRuntimeProperty(String key, String value) {
_jobExecutorManager.setRuntimeProperty(key, value);
}

/**
* Returns the monitor held by this application, typed as a
* {@link MonitorInterface}.
*
* @return the {@code _monitor} field
*/
public MonitorInterface getMonitor() {
return _monitor;
}

/**
* Returns the monitor held by this application, typed as a
* {@link MonitorInternalInterface}. This is the same {@code _monitor}
* instance that {@code getMonitor()} returns.
*
* @return the {@code _monitor} field
*/
public MonitorInternalInterface getInternalMonitor() {
return _monitor;
}
}
24 changes: 24 additions & 0 deletions azkaban/src/java/azkaban/app/LoggingJob.java
Expand Up @@ -31,6 +31,9 @@
import azkaban.common.jobs.Job;
import azkaban.common.utils.Props;
import azkaban.common.utils.Utils;
import azkaban.monitor.MonitorImpl;
import azkaban.monitor.MonitorInterface.JobState;
import azkaban.monitor.MonitorInternalInterface.JobAction;

/**
* A wrapper for a job that attaches a Log4J appender to write to the logs
Expand All @@ -52,6 +55,10 @@ public LoggingJob(String logDir, Job innerJob, String loggerName) {
this._logDir = Utils.nonNull(logDir);
this._logger = Logger.getLogger(loggerName);
}

/**
 * Reports the cancellation status of the wrapped job.
 *
 * @return whatever the inner job's {@code isCanceled()} returns
 */
public synchronized boolean isCanceled() {
    final Job wrapped = getInnerJob();
    return wrapped.isCanceled();
}

public LoggingJob(String logDir, Job innerJob, String loggerName, String loggerPattern) {
super(innerJob);
Expand Down Expand Up @@ -90,10 +97,27 @@ public void run() {
boolean jobNotStaleException = false;
long start = System.currentTimeMillis();
try {
MonitorImpl.getInternalMonitorInterface().jobEvent(
getInnerJob(),
System.currentTimeMillis(),
JobAction.START_WORKFLOW_JOB,
JobState.NOP);

getInnerJob().run();
succeeded = true;

MonitorImpl.getInternalMonitorInterface().jobEvent(
getInnerJob(),
System.currentTimeMillis(),
JobAction.END_WORKFLOW_JOB,
JobState.SUCCESSFUL);
} catch(Exception e) {
_logger.error("Fatal error occurred while running job '" + jobName + "':", e);
MonitorImpl.getInternalMonitorInterface().jobEvent(
getInnerJob(),
System.currentTimeMillis(),
JobAction.END_WORKFLOW_JOB,
getInnerJob().isCanceled() ? JobState.CANCELED : JobState.FAILED);
if(e instanceof RuntimeException)
throw (RuntimeException) e;
else
Expand Down
21 changes: 21 additions & 0 deletions azkaban/src/java/azkaban/flow/IndividualJobExecutableFlow.java
Expand Up @@ -30,6 +30,9 @@
import azkaban.common.jobs.Job;
import azkaban.common.utils.Props;
import azkaban.jobs.Status;
import azkaban.monitor.MonitorImpl;
import azkaban.monitor.MonitorInterface.JobState;
import azkaban.monitor.MonitorInternalInterface.JobAction;

/**
* An implementation of the ExecutableFlow interface that just
Expand Down Expand Up @@ -166,6 +169,12 @@ else if (jobState != Status.COMPLETED && ! this.parentProps.equalsProps(parentPr
public void run()
{
final List<FlowCallback> callbackList;

MonitorImpl.getInternalMonitorInterface().jobEvent(
job,
System.currentTimeMillis(),
JobAction.START_WORKFLOW_JOB,
JobState.NOP);

try {
job.run();
Expand All @@ -177,6 +186,13 @@ public void run()
exceptions.put(getName(), e);
callbackList = callbacksToCall; // Get the reference before leaving the synchronized
}

MonitorImpl.getInternalMonitorInterface().jobEvent(
job,
System.currentTimeMillis(),
JobAction.END_WORKFLOW_JOB,
JobState.FAILED);

callCallbacks(callbackList, jobState);

throw new RuntimeException(e);
Expand All @@ -188,6 +204,11 @@ public void run()
callbackList = callbacksToCall; // Get the reference before leaving the synchronized
}

MonitorImpl.getInternalMonitorInterface().jobEvent(
job,
System.currentTimeMillis(),
JobAction.END_WORKFLOW_JOB,
JobState.SUCCESSFUL);
returnProps.logProperties(String.format("Return props for job[%s]", getName()));

callCallbacks(callbackList, jobState);
Expand Down
Expand Up @@ -19,6 +19,9 @@
import org.apache.log4j.Logger;

import azkaban.jobcontrol.impl.jobs.locks.JobLock;
import azkaban.monitor.MonitorImpl;
import azkaban.monitor.MonitorInterface.JobState;
import azkaban.monitor.MonitorInternalInterface.JobAction;

import azkaban.common.jobs.DelegatingJob;
import azkaban.common.jobs.Job;
Expand Down Expand Up @@ -60,17 +63,45 @@ public void run() throws Exception
}
long totalWait = System.currentTimeMillis() - start;
_logger.info(_jobLock + " Time: " + totalWait + " ms.");

MonitorImpl.getInternalMonitorInterface().workflowResourceThrottledJobEvent(this, totalWait);

try {
boolean shouldRunJob;
synchronized(lock) {
shouldRunJob = ! canceled;
}

MonitorImpl.getInternalMonitorInterface().jobEvent(
getInnerJob(),
System.currentTimeMillis(),
JobAction.START_WORKFLOW_JOB,
JobState.NOP);

if(shouldRunJob) {
getInnerJob().run();

MonitorImpl.getInternalMonitorInterface().jobEvent(
getInnerJob(),
System.currentTimeMillis(),
JobAction.END_WORKFLOW_JOB,
JobState.SUCCESSFUL);
}
else {
_logger.info("Job was canceled while waiting for lock. Not running.");
MonitorImpl.getInternalMonitorInterface().jobEvent(
getInnerJob(),
System.currentTimeMillis(),
JobAction.END_WORKFLOW_JOB,
JobState.CANCELED);
}
} catch (Exception e) {
MonitorImpl.getInternalMonitorInterface().jobEvent(
getInnerJob(),
System.currentTimeMillis(),
JobAction.END_WORKFLOW_JOB,
JobState.FAILED);
throw e;
} finally {
_jobLock.releaseLock();
}
Expand All @@ -85,4 +116,8 @@ public void cancel() throws Exception
super.cancel();
}
}

/**
 * Reports whether this job has been canceled.
 *
 * <p>The flag is read while holding {@code lock}, the same monitor that
 * {@code run()} holds when it inspects {@code canceled}. A method-level
 * {@code synchronized} would lock {@code this} instead, which does not pair
 * with those accesses and therefore gives no visibility guarantee for the
 * flag.
 *
 * @return the current value of the {@code canceled} flag
 */
public boolean isCanceled() {
    synchronized(lock) {
        return canceled;
    }
}
}
32 changes: 28 additions & 4 deletions azkaban/src/java/azkaban/jobcontrol/impl/jobs/RetryingJob.java
Expand Up @@ -21,6 +21,9 @@
import azkaban.common.jobs.DelegatingJob;
import azkaban.common.jobs.Job;
import azkaban.common.jobs.JobFailedException;
import azkaban.monitor.MonitorImpl;
import azkaban.monitor.MonitorInterface.JobState;
import azkaban.monitor.MonitorInternalInterface.JobAction;

public class RetryingJob extends DelegatingJob {

Expand Down Expand Up @@ -49,26 +52,47 @@ public void run() {
return;
}
}
_logger.info("Retrying failed job '" + getInnerJob().getId() + " for attempt "
+ (tries + 1));
_logger.info("Retrying failed job '" + getInnerJob().getId() + " for attempt " + (tries + 1));
}

MonitorImpl.getInternalMonitorInterface().jobEvent(
getInnerJob(),
System.currentTimeMillis(),
JobAction.START_WORKFLOW_JOB,
JobState.NOP);

try {
getInnerJob().run();

MonitorImpl.getInternalMonitorInterface().jobEvent(
getInnerJob(),
System.currentTimeMillis(),
JobAction.END_WORKFLOW_JOB,
JobState.SUCCESSFUL);
return;
} catch(Exception e) {
_logger.error("Job '" + getInnerJob().getId() + " failed attempt " + (tries + 1), e);
String sadness = "";
for(int i = 0; i < tries + 1; i++)
sadness += ":-( ";
_logger.info(sadness);

MonitorImpl.getInternalMonitorInterface().jobEvent(
getInnerJob(),
System.currentTimeMillis(),
JobAction.END_WORKFLOW_JOB,
getInnerJob().isCanceled() ? JobState.CANCELED : JobState.FAILED);
}
}

// if we get here it means we haven't succeeded (otherwise we would have
// returned)
throw new JobFailedException(_retries + " run attempt" + (_retries > 1 ? "s" : "")
+ " failed.");
throw new JobFailedException(_retries + " run attempt" + (_retries > 1 ? "s" : "") +
" failed.");
}

/**
 * Reports the cancellation status of the job being retried.
 *
 * @return whatever the inner job's {@code isCanceled()} returns
 */
public synchronized boolean isCanceled() {
    final Job delegate = getInnerJob();
    return delegate.isCanceled();
}

}
15 changes: 15 additions & 0 deletions azkaban/src/java/azkaban/jobs/JobExecutorManager.java
Expand Up @@ -37,6 +37,9 @@
import azkaban.flow.FlowCallback;
import azkaban.flow.FlowExecutionHolder;
import azkaban.flow.FlowManager;
import azkaban.monitor.MonitorImpl;
import azkaban.monitor.MonitorInterface.WorkflowState;
import azkaban.monitor.MonitorInternalInterface.WorkflowAction;
import azkaban.util.process.ProcessFailureException;

public class JobExecutorManager {
Expand Down Expand Up @@ -126,6 +129,11 @@ public void execute(ExecutableFlow flow) {
final JobExecution executingJob = new JobExecution(flow.getName(),
new DateTime(),
true);
MonitorImpl.getInternalMonitorInterface().workflowEvent(flow.getId(),
System.currentTimeMillis(),
WorkflowAction.START_WORKFLOW,
WorkflowState.NOP,
flow.getName());

executor.execute(new ExecutingFlowRunnable(holder, executingJob));
}
Expand Down Expand Up @@ -405,6 +413,13 @@ public void progressMade() {
public void completed(Status status) {
runningJob.setEndTime(new DateTime());

MonitorImpl.getInternalMonitorInterface().workflowEvent(flow.getId(),
System.currentTimeMillis(),
WorkflowAction.END_WORKFLOW,
(status == Status.SUCCEEDED ? WorkflowState.SUCCESSFUL :
(status == Status.FAILED ? WorkflowState.FAILED : WorkflowState.UNKNOWN)),
flow.getName());

try {
allKnownFlows.saveExecutableFlow(holder);
switch(status) {
Expand Down
5 changes: 5 additions & 0 deletions azkaban/src/java/azkaban/jobs/builtin/NoopJob.java
Expand Up @@ -56,4 +56,9 @@ public Props getJobGeneratedProperties()
{
return new Props();
}

/**
* Always reports that this job has not been canceled.
*
* @return {@code false}
*/
@Override
public boolean isCanceled() {
return false;
}
}
14 changes: 12 additions & 2 deletions azkaban/src/java/azkaban/jobs/builtin/ProcessJob.java
Expand Up @@ -43,14 +43,17 @@ public class ProcessJob extends AbstractProcessJob implements Job {

private volatile Process _process;
private volatile boolean _isComplete;

private volatile boolean _isCancelled;

/**
* Creates a process job by passing the descriptor to the superclass
* constructor.
*
* @param descriptor the job descriptor handed to {@code AbstractProcessJob}
*/
public ProcessJob(JobDescriptor descriptor) {
super(descriptor);
}


public void run() {

synchronized(this) {
_isCancelled = false;
}
resolveProps();

// Sets a list of all the commands that need to be run.
Expand Down Expand Up @@ -157,6 +160,9 @@ public void cancel() throws Exception {
warn("Force kill the process");
_process.destroy();
}
synchronized (this) {
_isCancelled = true;
}
}
}

Expand Down Expand Up @@ -304,5 +310,9 @@ public static String[] partitionCommandLine(String command) {

return commands.toArray(new String[commands.size()]);
}

/**
 * Reports whether {@code cancel()} has flagged this job as canceled.
 * The flag is reset to {@code false} at the start of {@code run()}.
 *
 * @return the current value of {@code _isCancelled}
 */
public boolean isCanceled() {
    // Same monitor as the synchronized(this) blocks that write the flag.
    synchronized (this) {
        return _isCancelled;
    }
}

}

0 comments on commit 9e81455

Please sign in to comment.