Permalink
Browse files

Fixed bug with scheduled events. Added "0" value events after a job/f…

…low completes.
  • Loading branch information...
1 parent d76f6e2 commit d171776fcd4abee8cdd341234fbc0fb2560259a0 Nik Hodgkinson committed Oct 7, 2011
Showing with 15 additions and 7 deletions.
  1. +15 −7 azkaban/src/java/azkaban/monitor/consumer/KafkaEmitterConsumer.java
@@ -10,6 +10,7 @@
import com.metamx.event.ServiceEmitter;
import com.metamx.event.ServiceMetricEvent;
import org.apache.log4j.Logger;
+import org.joda.time.DateTime;
import org.joda.time.Duration;
import java.io.IOException;
@@ -118,7 +119,7 @@ public synchronized void start()
private void processJobEvents()
{
final int jobCount = jobStatsQueue.size();
-
+ DateTime now = new DateTime();
for (int i = 0; i < jobCount; i++) {
final NativeJobClassStats currStats = jobStatsQueue.poll();
final String jobName = currStats.getJobClassName();
@@ -191,19 +192,21 @@ private void processJobEvents()
}
event.setUser2(states.toArray(new String[states.size()]));
- emitter.emit(event.build(JOB_LIVE_NAME, 1));
+ emitter.emit(event.build(now, JOB_LIVE_NAME, 1));
+ emitter.emit(event.build(now.plusMinutes(1), JOB_LIVE_NAME, 0));
jobEvents.remove(jobName);
}
// Emit all recurring events.
for (ServiceMetricEvent.Builder event : jobEvents.values()) {
- emitter.emit(event.build(JOB_LIVE_NAME, 1));
+ emitter.emit(event.build(now, JOB_LIVE_NAME, 1));
}
}
private void processWorkflowEvents()
{
final int wfCount = wfStatsQueue.size();
+ final DateTime now = new DateTime();
for (int i = 0; i < wfCount; i++) {
final NativeWorkflowClassStats currStats = wfStatsQueue.poll();
@@ -240,12 +243,16 @@ private void processWorkflowEvents()
event.setUser1(wfName);
}
-
List<String> states = Lists.newArrayList((event.getUser2() != null) ? event.getUser2() : new String[0]);
// This is a simple state machine
if (currScheduled != lastScheduled) {
// New scheduled job, emit and continue.
- states.add("scheduled");
+ event = new ServiceMetricEvent.Builder();
+ event.setUser1(wfName);
+ event.setUser2("scheduled");
+ emitter.emit(event.build(now, WORKFLOW_LIVE_NAME, 1));
+ emitter.emit(event.build(now.plusMinutes(1), WORKFLOW_LIVE_NAME, 0));
+ continue;
} else if (currStarted != lastStarted) {
// New run of this flow, add to recurring events and continue.
wfEvents.put(wfName, event.setUser2("started"));
@@ -262,13 +269,14 @@ private void processWorkflowEvents()
}
event.setUser2(states.toArray(new String[states.size()]));
- emitter.emit(event.build(WORKFLOW_LIVE_NAME, 1));
+ emitter.emit(event.build(now, WORKFLOW_LIVE_NAME, 1));
+ emitter.emit(event.build(now.plusMinutes(1), WORKFLOW_LIVE_NAME, 0));
wfEvents.remove(wfName);
}
// Emit all recurring events.
for (ServiceMetricEvent.Builder event : wfEvents.values()) {
- emitter.emit(event.build(WORKFLOW_LIVE_NAME, 1));
+ emitter.emit(event.build(now, WORKFLOW_LIVE_NAME, 1));
}
}
}

0 comments on commit d171776

Please sign in to comment.