Skip to content

Commit

Permalink
Updates for job statistics and preparation for galaxy deployment.
Browse files Browse the repository at this point in the history
  • Loading branch information
Nik Hodgkinson committed Oct 4, 2011
1 parent eb39927 commit d76f6e2
Show file tree
Hide file tree
Showing 9 changed files with 72 additions and 56 deletions.
75 changes: 40 additions & 35 deletions azkaban/src/java/azkaban/app/AzkabanApplication.java
Expand Up @@ -17,31 +17,6 @@
package azkaban.app;


import java.io.File;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.List;
import java.util.TimeZone;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;

import javax.management.MBeanServer;
import javax.management.ObjectName;

import azkaban.monitor.consumer.KafkaEmitterConsumer;
import com.metamx.event.HttpPostEmitter;
import com.metamx.event.LoggingEmitter;
import com.metamx.event.ServiceEmitter;
import org.apache.log4j.Logger;
import org.apache.log4j.Priority;
import org.apache.velocity.app.VelocityEngine;
import org.apache.velocity.runtime.log.Log4JLogChute;
import org.apache.velocity.runtime.resource.loader.ClasspathResourceLoader;
import org.joda.time.DateTimeZone;

import azkaban.app.jmx.JobScheduler;
import azkaban.app.jmx.RefreshJobs;
import azkaban.common.jobs.Job;
Expand All @@ -52,19 +27,12 @@
import azkaban.flow.RefreshableFlowManager;
import azkaban.jobcontrol.impl.jobs.locks.NamedPermitManager;
import azkaban.jobcontrol.impl.jobs.locks.ReadWriteLockManager;

import azkaban.jobs.JobExecutorManager;
import azkaban.jobs.builtin.JavaJob;
import azkaban.jobs.builtin.JavaProcessJob;
import azkaban.jobs.builtin.NoopJob;
import azkaban.jobs.builtin.PigProcessJob;
import azkaban.jobs.builtin.ProcessJob;
import azkaban.jobs.builtin.PythonJob;
import azkaban.jobs.builtin.RubyJob;
import azkaban.jobs.builtin.ScriptJob;
import azkaban.jobs.builtin.*;
import azkaban.monitor.MonitorImpl;
import azkaban.monitor.MonitorInterface;
import azkaban.monitor.MonitorInternalInterface;
import azkaban.monitor.consumer.KafkaEmitterConsumer;
import azkaban.scheduler.LocalFileScheduleLoader;
import azkaban.scheduler.ScheduleManager;
import azkaban.serialization.DefaultExecutableFlowSerializer;
Expand All @@ -74,6 +42,29 @@
import azkaban.serialization.de.ExecutableFlowDeserializer;
import azkaban.serialization.de.FlowExecutionDeserializer;
import com.google.common.collect.ImmutableMap;
import com.metamx.event.EmitterConfig;
import com.metamx.event.HttpPostEmitter;
import com.metamx.event.ServiceEmitter;
import com.metamx.http.client.HttpClient;
import org.apache.log4j.Logger;
import org.apache.velocity.app.VelocityEngine;
import org.apache.velocity.runtime.log.Log4JLogChute;
import org.apache.velocity.runtime.resource.loader.ClasspathResourceLoader;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.joda.time.DateTimeZone;

import javax.management.MBeanServer;
import javax.management.ObjectName;
import java.io.File;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.List;
import java.util.TimeZone;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;

/**
* Master application that runs everything
Expand All @@ -90,6 +81,9 @@ public class AzkabanApplication
private static final Logger logger = Logger.getLogger(AzkabanApplication.class);
private static final String INSTANCE_NAME = "instance.name";
private static final String DEFAULT_TIMEZONE_ID = "default.timezone.id";
private static final int FLUSH_MILLIS = 15000;
private static final int FLUSH_COUNT = 100;
private static final String KAFKA_HOST = "http://in.metamx.com/";

private final String _instanceName;
private final List<File> _jobDirs;
Expand Down Expand Up @@ -195,11 +189,20 @@ public AzkabanApplication(final List<File> jobDirs, final File logDir, final Fil
FlowExecutionSerializer flowExecutionSerializer = new FlowExecutionSerializer(flowSerializer);
FlowExecutionDeserializer flowExecutionDeserializer = new FlowExecutionDeserializer(flowDeserializer);


/**** Added for monitoring **********************************************/
_monitor = MonitorImpl.getMonitor();
KafkaEmitterConsumer consumer = new KafkaEmitterConsumer();

ClientBootstrap bootstrap = HttpClient.createDefaultBootstrap();
HttpClient httpClient = HttpClient.createDefaultInstance(1, bootstrap);
ConcurrentHashMap<String, String> urlMap = new ConcurrentHashMap<String, String>();
urlMap.put("metrics", KAFKA_HOST);
EmitterConfig emitterConfig = new EmitterConfig(FLUSH_MILLIS, FLUSH_COUNT, urlMap);

KafkaEmitterConsumer.KafkaMonitor monitor = consumer.getMonitor(
new ScheduledThreadPoolExecutor(1),
new ServiceEmitter("Azkaban", this.getAppInstanceName(), new LoggingEmitter(logger, Priority.INFO))
new ServiceEmitter("azkaban/" + this.getAppInstanceName(), "N/A", new HttpPostEmitter(emitterConfig, httpClient))
);
monitor.start();
_monitor.registerGlobalNotification(
Expand All @@ -211,6 +214,8 @@ public AzkabanApplication(final List<File> jobDirs, final File logDir, final Fil
consumer,
MonitorInterface.GlobalNotificationType.ANY_JOB_CLASS_STATS_CHANGE
);
/**** End addition for monitoring ***************************************/


_allFlows = new CachingFlowManager(
new RefreshableFlowManager(
Expand Down
53 changes: 32 additions & 21 deletions azkaban/src/java/azkaban/monitor/consumer/KafkaEmitterConsumer.java
Expand Up @@ -26,6 +26,8 @@
public class KafkaEmitterConsumer implements MonitorListener
{
private static final Logger log = Logger.getLogger(KafkaEmitterConsumer.class);
private static final String JOB_LIVE_NAME = "job/live";
private static final String WORKFLOW_LIVE_NAME = "workflow/live";

// private final ConcurrentLinkedQueue<NativeGlobalStats> globalStatsQueue = new ConcurrentLinkedQueue<NativeGlobalStats>();
private final ConcurrentLinkedQueue<NativeWorkflowClassStats> wfStatsQueue = new ConcurrentLinkedQueue<NativeWorkflowClassStats>();
Expand Down Expand Up @@ -132,8 +134,8 @@ private void processJobEvents()
jobStates.put(jobName, currStats);

ServiceMetricEvent.Builder event;
if (wfEvents.containsKey(jobName)) {
event = wfEvents.get(jobName);
if (jobEvents.containsKey(jobName)) {
event = jobEvents.get(jobName);
} else {
event = new ServiceMetricEvent.Builder();
event.setUser1(jobName);
Expand All @@ -145,48 +147,57 @@ private void processJobEvents()
final long currFailed = currStats.getNumTimesJobFailed();
final long currStarted = currStats.getNumTimesJobStarted();
final long currSuccessful = currStats.getNumTimesJobSuccessful();
final long currLogStarted = currStats.getNumLoggingJobStarts();
final long currThrottleStarted = currStats.getNumResourceThrottledStart();
final long currRetryStarted = currStats.getNumRetryJobStarts();

final long lastTries = lastStats.getNumJobTries();
final long lastCanceled = lastStats.getNumTimesJobCanceled();
final long lastFailed = lastStats.getNumTimesJobFailed();
final long lastStarted = lastStats.getNumTimesJobStarted();
final long lastSuccessful = lastStats.getNumTimesJobSuccessful();
final long lastLogStarted = lastStats.getNumLoggingJobStarts();
final long lastThrottleStarted = lastStats.getNumResourceThrottledStart();
final long lastRetryStarted = lastStats.getNumRetryJobStarts();

List<String> states = Lists.newArrayList((event.getUser2() != null) ? event.getUser2() : new String[0]);
// This is a simple state machine
if (currTries != lastTries) {
// The job has been retried, add to recurring events and continue.
if (!states.contains("Retried")) {
states.add("Retried");
if (!states.contains("retried")) {
states.add("retried");
}
jobEvents.put(jobName, event.setUser2(states.toArray(new String[states.size()])));
continue;
} else if (currStarted != lastStarted) {
} else if (currStarted != lastStarted/*
|| currRetryStarted != lastRetryStarted
|| currLogStarted != lastLogStarted
|| currThrottleStarted != lastThrottleStarted*/) {
// New run of this flow, add to recurring events and continue.
jobEvents.put(jobName, event.setUser2("Started"));
jobEvents.put(jobName, event.setUser2("started"));
continue;
} else if (currCanceled != lastCanceled) {
// Job canceled.
states.add("Canceled");
states.add("canceled");
} else if (currFailed != lastFailed) {
// Job failed.
states.add("Failed");
states.add("failed");
} else if (currSuccessful != lastSuccessful) {
// Job succeeded.
states.add("Succeeded");
states.add("succeeded");
} else {
log.warn("Received unhandled event.");
continue;
}

event.setUser2(states.toArray(new String[states.size()]));
emitter.emit(event.build("Job Live Events", 1));
emitter.emit(event.build(JOB_LIVE_NAME, 1));
jobEvents.remove(jobName);
}

// Emit all recurring events.
for (ServiceMetricEvent.Builder event : jobEvents.values()) {
emitter.emit(event.build("Job Live Events", 1));
emitter.emit(event.build(JOB_LIVE_NAME, 1));
}
}

Expand Down Expand Up @@ -234,30 +245,30 @@ private void processWorkflowEvents()
// This is a simple state machine
if (currScheduled != lastScheduled) {
// New scheduled job, emit and continue.
states.add("Scheduled");
states.add("scheduled");
} else if (currStarted != lastStarted) {
// New run of this flow, add to recurring events and continue.
wfEvents.put(wfName, event.setUser2("Started"));
wfEvents.put(wfName, event.setUser2("started"));
continue;
} else if (currCanceled != lastCanceled) {
// Job canceled.
states.add("Canceled");
states.add("canceled");
} else if (currFailed != lastFailed) {
// Job failed.
states.add("Failed");
states.add("failed");
} else if (currSuccessful != lastSuccessful) {
// Job succeeded.
states.add("Succeeded");
states.add("succeeded");
}

event.setUser2(states.toArray(new String[states.size()]));
emitter.emit(event.build("Workflow Live Events", 1));
emitter.emit(event.build(WORKFLOW_LIVE_NAME, 1));
wfEvents.remove(wfName);
}

// Emit all recurring events.
for (ServiceMetricEvent.Builder event : wfEvents.values()) {
emitter.emit(event.build("Workflow Live Events", 1));
emitter.emit(event.build(WORKFLOW_LIVE_NAME, 1));
}
}
}
Expand All @@ -268,15 +279,15 @@ private void setJobProperties(NativeJobClassStats currStats, ServiceMetricEvent.
{// Prime the job properties.
List<String> jobTypes = new ArrayList<String>();
if (currStats.isLoggingJob()) {
jobTypes.add("Logging");
jobTypes.add("logging");
}

if (currStats.isResourceThrottledJob()) {
jobTypes.add("Throttled");
jobTypes.add("throttled");
}

if (currStats.isRetryJob()) {
jobTypes.add("Retry");
jobTypes.add("retry");
}

event.setUser3(jobTypes.toArray(new String[jobTypes.size()]));
Expand Down
Binary file added lib/guava-r08.jar
Binary file not shown.
Binary file removed lib/http-client-0.0.1.jar
Binary file not shown.
Binary file added lib/http-client-0.1.3.jar
Binary file not shown.
Binary file added lib/netty-3.2.4.Final.jar
Binary file not shown.
Binary file added lib/src/guava-r08-sources.jar
Binary file not shown.
Binary file removed lib/src/http-client-0.0.1-sources.jar
Binary file not shown.
Binary file added lib/src/http-client-0.1.3-sources.jar
Binary file not shown.

0 comments on commit d76f6e2

Please sign in to comment.