diff --git a/azkaban/src/java/azkaban/app/AzkabanApplication.java b/azkaban/src/java/azkaban/app/AzkabanApplication.java index 3fa7f62..b403156 100644 --- a/azkaban/src/java/azkaban/app/AzkabanApplication.java +++ b/azkaban/src/java/azkaban/app/AzkabanApplication.java @@ -17,31 +17,6 @@ package azkaban.app; -import java.io.File; -import java.io.IOException; -import java.lang.management.ManagementFactory; -import java.net.MalformedURLException; -import java.net.URL; -import java.net.URLClassLoader; -import java.util.List; -import java.util.TimeZone; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; - -import javax.management.MBeanServer; -import javax.management.ObjectName; - -import azkaban.monitor.consumer.KafkaEmitterConsumer; -import com.metamx.event.HttpPostEmitter; -import com.metamx.event.LoggingEmitter; -import com.metamx.event.ServiceEmitter; -import org.apache.log4j.Logger; -import org.apache.log4j.Priority; -import org.apache.velocity.app.VelocityEngine; -import org.apache.velocity.runtime.log.Log4JLogChute; -import org.apache.velocity.runtime.resource.loader.ClasspathResourceLoader; -import org.joda.time.DateTimeZone; - import azkaban.app.jmx.JobScheduler; import azkaban.app.jmx.RefreshJobs; import azkaban.common.jobs.Job; @@ -52,19 +27,12 @@ import azkaban.flow.RefreshableFlowManager; import azkaban.jobcontrol.impl.jobs.locks.NamedPermitManager; import azkaban.jobcontrol.impl.jobs.locks.ReadWriteLockManager; - import azkaban.jobs.JobExecutorManager; -import azkaban.jobs.builtin.JavaJob; -import azkaban.jobs.builtin.JavaProcessJob; -import azkaban.jobs.builtin.NoopJob; -import azkaban.jobs.builtin.PigProcessJob; -import azkaban.jobs.builtin.ProcessJob; -import azkaban.jobs.builtin.PythonJob; -import azkaban.jobs.builtin.RubyJob; -import azkaban.jobs.builtin.ScriptJob; +import azkaban.jobs.builtin.*; import azkaban.monitor.MonitorImpl; import azkaban.monitor.MonitorInterface; import azkaban.monitor.MonitorInternalInterface; +import azkaban.monitor.consumer.KafkaEmitterConsumer; import azkaban.scheduler.LocalFileScheduleLoader; import azkaban.scheduler.ScheduleManager; import azkaban.serialization.DefaultExecutableFlowSerializer; @@ -74,6 +42,29 @@ import azkaban.serialization.de.ExecutableFlowDeserializer; import azkaban.serialization.de.FlowExecutionDeserializer; import com.google.common.collect.ImmutableMap; +import com.metamx.event.EmitterConfig; +import com.metamx.event.HttpPostEmitter; +import com.metamx.event.ServiceEmitter; +import com.metamx.http.client.HttpClient; +import org.apache.log4j.Logger; +import org.apache.velocity.app.VelocityEngine; +import org.apache.velocity.runtime.log.Log4JLogChute; +import org.apache.velocity.runtime.resource.loader.ClasspathResourceLoader; +import org.jboss.netty.bootstrap.ClientBootstrap; +import org.joda.time.DateTimeZone; + +import javax.management.MBeanServer; +import javax.management.ObjectName; +import java.io.File; +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.List; +import java.util.TimeZone; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledThreadPoolExecutor; /** * Master application that runs everything @@ -90,6 +81,9 @@ public class AzkabanApplication private static final Logger logger = Logger.getLogger(AzkabanApplication.class); private static final String INSTANCE_NAME = "instance.name"; private static final String DEFAULT_TIMEZONE_ID = "default.timezone.id"; + private static final int FLUSH_MILLIS = 15000; + private static final int FLUSH_COUNT = 100; + private static final String KAFKA_HOST = "http://in.metamx.com/"; private final String _instanceName; private final List _jobDirs; @@ -195,11 +189,20 @@ public AzkabanApplication(final List jobDirs, final File logDir, final Fil FlowExecutionSerializer flowExecutionSerializer = new FlowExecutionSerializer(flowSerializer); FlowExecutionDeserializer flowExecutionDeserializer = new FlowExecutionDeserializer(flowDeserializer); + + /**** Added for monitoring **********************************************/ _monitor = MonitorImpl.getMonitor(); KafkaEmitterConsumer consumer = new KafkaEmitterConsumer(); + + ClientBootstrap bootstrap = HttpClient.createDefaultBootstrap(); + HttpClient httpClient = HttpClient.createDefaultInstance(1, bootstrap); + ConcurrentHashMap urlMap = new ConcurrentHashMap(); + urlMap.put("metrics", KAFKA_HOST); + EmitterConfig emitterConfig = new EmitterConfig(FLUSH_MILLIS, FLUSH_COUNT, urlMap); + KafkaEmitterConsumer.KafkaMonitor monitor = consumer.getMonitor( new ScheduledThreadPoolExecutor(1), - new ServiceEmitter("Azkaban", this.getAppInstanceName(), new LoggingEmitter(logger, Priority.INFO)) + new ServiceEmitter("azkaban/" + this.getAppInstanceName(), "N/A", new HttpPostEmitter(emitterConfig, httpClient)) ); monitor.start(); _monitor.registerGlobalNotification( @@ -211,6 +214,8 @@ public AzkabanApplication(final List jobDirs, final File logDir, final Fil consumer, MonitorInterface.GlobalNotificationType.ANY_JOB_CLASS_STATS_CHANGE ); + /**** End addition for monitoring ***************************************/ + _allFlows = new CachingFlowManager( new RefreshableFlowManager( diff --git a/azkaban/src/java/azkaban/monitor/consumer/KafkaEmitterConsumer.java b/azkaban/src/java/azkaban/monitor/consumer/KafkaEmitterConsumer.java index 67e74a1..7f9a39b 100644 --- a/azkaban/src/java/azkaban/monitor/consumer/KafkaEmitterConsumer.java +++ b/azkaban/src/java/azkaban/monitor/consumer/KafkaEmitterConsumer.java @@ -26,6 +26,8 @@ public class KafkaEmitterConsumer implements MonitorListener { private static final Logger log = Logger.getLogger(KafkaEmitterConsumer.class); + private static final String JOB_LIVE_NAME = "job/live"; + private static final String WORKFLOW_LIVE_NAME = "workflow/live"; // private final ConcurrentLinkedQueue globalStatsQueue = new ConcurrentLinkedQueue(); private final ConcurrentLinkedQueue wfStatsQueue = new ConcurrentLinkedQueue(); @@ -132,8 +134,8 @@ private void processJobEvents() jobStates.put(jobName, currStats); ServiceMetricEvent.Builder event; - if (wfEvents.containsKey(jobName)) { - event = wfEvents.get(jobName); + if (jobEvents.containsKey(jobName)) { + event = jobEvents.get(jobName); } else { event = new ServiceMetricEvent.Builder(); event.setUser1(jobName); @@ -145,48 +147,57 @@ private void processJobEvents() final long currFailed = currStats.getNumTimesJobFailed(); final long currStarted = currStats.getNumTimesJobStarted(); final long currSuccessful = currStats.getNumTimesJobSuccessful(); + final long currLogStarted = currStats.getNumLoggingJobStarts(); + final long currThrottleStarted = currStats.getNumResourceThrottledStart(); + final long currRetryStarted = currStats.getNumRetryJobStarts(); final long lastTries = lastStats.getNumJobTries(); final long lastCanceled = lastStats.getNumTimesJobCanceled(); final long lastFailed = lastStats.getNumTimesJobFailed(); final long lastStarted = lastStats.getNumTimesJobStarted(); final long lastSuccessful = lastStats.getNumTimesJobSuccessful(); + final long lastLogStarted = lastStats.getNumLoggingJobStarts(); + final long lastThrottleStarted = lastStats.getNumResourceThrottledStart(); + final long lastRetryStarted = lastStats.getNumRetryJobStarts(); List states = Lists.newArrayList((event.getUser2() != null) ? event.getUser2() : new String[0]); // This is a simple state machine if (currTries != lastTries) { // The job has been retried, add to recurring events and continue. - if (!states.contains("Retried")) { - states.add("Retried"); + if (!states.contains("retried")) { + states.add("retried"); } jobEvents.put(jobName, event.setUser2(states.toArray(new String[states.size()]))); continue; - } else if (currStarted != lastStarted) { + } else if (currStarted != lastStarted/* + || currRetryStarted != lastRetryStarted + || currLogStarted != lastLogStarted + || currThrottleStarted != lastThrottleStarted*/) { // New run of this flow, add to recurring events and continue. - jobEvents.put(jobName, event.setUser2("Started")); + jobEvents.put(jobName, event.setUser2("started")); continue; } else if (currCanceled != lastCanceled) { // Job canceled. - states.add("Canceled"); + states.add("canceled"); } else if (currFailed != lastFailed) { // Job failed. - states.add("Failed"); + states.add("failed"); } else if (currSuccessful != lastSuccessful) { // Job succeeded. - states.add("Succeeded"); + states.add("succeeded"); } else { log.warn("Received unhandled event."); continue; } event.setUser2(states.toArray(new String[states.size()])); - emitter.emit(event.build("Job Live Events", 1)); + emitter.emit(event.build(JOB_LIVE_NAME, 1)); jobEvents.remove(jobName); } // Emit all recurring events. for (ServiceMetricEvent.Builder event : jobEvents.values()) { - emitter.emit(event.build("Job Live Events", 1)); + emitter.emit(event.build(JOB_LIVE_NAME, 1)); } } @@ -234,30 +245,30 @@ private void processWorkflowEvents() // This is a simple state machine if (currScheduled != lastScheduled) { // New scheduled job, emit and continue. - states.add("Scheduled"); + states.add("scheduled"); } else if (currStarted != lastStarted) { // New run of this flow, add to recurring events and continue. - wfEvents.put(wfName, event.setUser2("Started")); + wfEvents.put(wfName, event.setUser2("started")); continue; } else if (currCanceled != lastCanceled) { // Job canceled. - states.add("Canceled"); + states.add("canceled"); } else if (currFailed != lastFailed) { // Job failed. - states.add("Failed"); + states.add("failed"); } else if (currSuccessful != lastSuccessful) { // Job succeeded. - states.add("Succeeded"); + states.add("succeeded"); } event.setUser2(states.toArray(new String[states.size()])); - emitter.emit(event.build("Workflow Live Events", 1)); + emitter.emit(event.build(WORKFLOW_LIVE_NAME, 1)); wfEvents.remove(wfName); } // Emit all recurring events. for (ServiceMetricEvent.Builder event : wfEvents.values()) { - emitter.emit(event.build("Workflow Live Events", 1)); + emitter.emit(event.build(WORKFLOW_LIVE_NAME, 1)); } } } @@ -268,15 +279,15 @@ private void setJobProperties(NativeJobClassStats currStats, ServiceMetricEvent. {// Prime the job properties. List jobTypes = new ArrayList(); if (currStats.isLoggingJob()) { - jobTypes.add("Logging"); + jobTypes.add("logging"); } if (currStats.isResourceThrottledJob()) { - jobTypes.add("Throttled"); + jobTypes.add("throttled"); } if (currStats.isRetryJob()) { - jobTypes.add("Retry"); + jobTypes.add("retry"); } event.setUser3(jobTypes.toArray(new String[jobTypes.size()])); diff --git a/lib/guava-r08.jar b/lib/guava-r08.jar new file mode 100644 index 0000000..618a3cb Binary files /dev/null and b/lib/guava-r08.jar differ diff --git a/lib/http-client-0.0.1.jar b/lib/http-client-0.0.1.jar deleted file mode 100644 index 0ba768b..0000000 Binary files a/lib/http-client-0.0.1.jar and /dev/null differ diff --git a/lib/http-client-0.1.3.jar b/lib/http-client-0.1.3.jar new file mode 100644 index 0000000..eb4c772 Binary files /dev/null and b/lib/http-client-0.1.3.jar differ diff --git a/lib/netty-3.2.4.Final.jar b/lib/netty-3.2.4.Final.jar new file mode 100644 index 0000000..391ba87 Binary files /dev/null and b/lib/netty-3.2.4.Final.jar differ diff --git a/lib/src/guava-r08-sources.jar b/lib/src/guava-r08-sources.jar new file mode 100644 index 0000000..1641bf0 Binary files /dev/null and b/lib/src/guava-r08-sources.jar differ diff --git a/lib/src/http-client-0.0.1-sources.jar b/lib/src/http-client-0.0.1-sources.jar deleted file mode 100644 index 143838d..0000000 Binary files a/lib/src/http-client-0.0.1-sources.jar and /dev/null differ diff --git a/lib/src/http-client-0.1.3-sources.jar b/lib/src/http-client-0.1.3-sources.jar new file mode 100644 index 0000000..e3920e5 Binary files /dev/null and b/lib/src/http-client-0.1.3-sources.jar differ