Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Merge branch 'model_refactor' of github.com:twitter/ambrose into mode…

…l_refactor

Conflicts:
	common/src/test/java/com/twitter/ambrose/service/impl/InMemoryStatsServiceTest.java
  • Loading branch information...
commit d607c293551ed0a890a94feb77f040fa6dadb78c 2 parents 311fb3e + 0974c60
@sagemintblue sagemintblue authored
View
5 ...ain/java/com/twitter/ambrose/service/DAGNode.java → .../main/java/com/twitter/ambrose/model/DAGNode.java
@@ -13,14 +13,13 @@
See the License for the specific language governing permissions and
limitations under the License.
*/
-package com.twitter.ambrose.service;
+package com.twitter.ambrose.model;
import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
-import com.twitter.ambrose.model.Job;
import org.codehaus.jackson.annotate.JsonCreator;
import org.codehaus.jackson.annotate.JsonIgnore;
import org.codehaus.jackson.annotate.JsonProperty;
@@ -32,7 +31,7 @@
/**
* Class that represents a Job node in the DAG. The job name must not be null. At DAG creation time
* the jobID will probably be null. Ideally this will be set on the node when the job is started,
- * and the node will be sent as a <pre>WorkflowEvent.EVENT_TYPE.JOB_STARTED</pre> event.
+ * and the node will be sent as a <pre>Event.Type.JOB_STARTED</pre> event.
*
* This class can be converted to JSON as-is by doing something like this:
*
View
133 common/src/main/java/com/twitter/ambrose/model/Event.java
@@ -0,0 +1,133 @@
+/*
+Copyright 2012 Twitter, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+package com.twitter.ambrose.model;
+
+import com.twitter.ambrose.util.JSONUtil;
+import org.codehaus.jackson.type.TypeReference;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Class that represents an Event of a given Type. Each one of these created will have
+ * a unique id that increments up for each object. eventIds will always be >= 0.
+ * The data associated with the event currently can be anything.
+ *
+ * @author billg
+ */
+public class Event<T> {
+ private static AtomicInteger NEXT_ID = new AtomicInteger();
+
+ public static enum Type { JOB_STARTED, JOB_FINISHED, JOB_FAILED, JOB_PROGRESS, WORKFLOW_PROGRESS};
+
+ public static enum WorkflowProgressField {
+ workflowProgress;
+ }
+
+ private long timestamp;
+ private int id;
+ private Type type;
+ private T payload;
+
+ private Event(Type type, T payload) {
+ this.id = NEXT_ID.incrementAndGet();
+ this.timestamp = System.currentTimeMillis();
+ this.type = type;
+ this.payload = payload;
+ }
+
+ private Event(int eventId, long timestamp, Type type, T payload) {
+ this.id = eventId;
+ this.timestamp = timestamp;
+ this.type = type;
+ this.payload = payload;
+ }
+
+ public long getTimestamp() { return timestamp; }
+ public int getId() { return id; }
+ public Type getType() { return type; }
+ public Object getPayload() { return payload; }
+
+ @SuppressWarnings("unchecked")
+ public static void main(String[] args) throws IOException {
+ String json = JSONUtil.readFile("pig/src/main/resources/web/data/small-events.json");
+ List<Event> events =
+ (List<Event>)JSONUtil.readJson(json, new TypeReference<List<Event>>() { });
+// for (Event event : events) {
+// // useful if we need to read a file, add a field, output and re-generate
+// }
+
+ JSONUtil.writeJson("pig/src/main/resources/web/data/small-events.json2", events);
+ }
+
+ /**
+ * Helper method to create instances of the proper event. It is the reposibility of the caller to
+ * assure that their types are aligned with
+ * @param type
+ * @param data
+ * @return
+ */
+ @SuppressWarnings("unchecked")
+ public static Event create(Type type, Object data) {
+ switch (type) {
+ case JOB_STARTED:
+ return new JobStartedEvent((DAGNode<Job>) data);
+ case JOB_PROGRESS:
+ return new JobStartedEvent((DAGNode<Job>) data);
+ case JOB_FINISHED:
+ return new JobFinishedEvent((DAGNode<Job>) data);
+ case JOB_FAILED:
+ return new JobStartedEvent((DAGNode<Job>) data);
+ case WORKFLOW_PROGRESS:
+ return new JobStartedEvent((DAGNode<Job>) data);
+ default:
+ throw new IllegalArgumentException(
+ String.format("Unknown event type %s for data payload %s", type, data));
+ }
+ }
+
+ public static class JobStartedEvent extends Event<DAGNode<? extends Job>> {
+ public JobStartedEvent(DAGNode<? extends Job> eventData) {
+ super(Type.JOB_STARTED, eventData);
+ }
+ }
+
+ public static class JobProgressEvent extends Event<DAGNode<? extends Job>> {
+ public JobProgressEvent(DAGNode<? extends Job> eventData) {
+ super(Type.JOB_PROGRESS, eventData);
+ }
+ }
+
+ public static class JobFinishedEvent extends Event<DAGNode<? extends Job>> {
+ public JobFinishedEvent(DAGNode<? extends Job> eventData) {
+ super(Type.JOB_FINISHED, eventData);
+ }
+ }
+
+ public static class JobFailedEvent extends Event<DAGNode<? extends Job>> {
+ public JobFailedEvent(DAGNode<? extends Job> eventData) {
+ super(Type.JOB_FAILED, eventData);
+ }
+ }
+
+ public static class WorkflowProgressEvent extends Event<Map<WorkflowProgressField, String>> {
+ public WorkflowProgressEvent(Map<WorkflowProgressField, String> eventData) {
+ super(Type.WORKFLOW_PROGRESS, eventData);
+ }
+ }
+}
View
26 .../java/com/twitter/ambrose/model/WorkflowInfo.java → ...main/java/com/twitter/ambrose/model/Workflow.java
@@ -35,14 +35,14 @@
@JsonSerialize(
include=JsonSerialize.Inclusion.NON_NULL
)
-public class WorkflowInfo {
+public class Workflow {
private String workflowId;
private String workflowFingerprint;
private List<Job> jobs;
/**
- * Creates a new immutable WorkflowInfo object.
+ * Creates a new immutable Workflow object.
*
* @param workflowId the id of the workflow. This surrogate id should distinguish between one
* workflow invocation and another.
@@ -52,9 +52,9 @@
* @param jobs
*/
@JsonCreator
- public WorkflowInfo(@JsonProperty("workflowId") String workflowId,
- @JsonProperty("workflowFingerprint") String workflowFingerprint,
- @JsonProperty("jobs") List<Job> jobs) {
+ public Workflow(@JsonProperty("workflowId") String workflowId,
+ @JsonProperty("workflowFingerprint") String workflowFingerprint,
+ @JsonProperty("jobs") List<Job> jobs) {
this.workflowId = workflowId;
this.workflowFingerprint = workflowFingerprint;
this.jobs = jobs;
@@ -65,29 +65,29 @@ public WorkflowInfo(@JsonProperty("workflowId") String workflowId,
public List<Job> getJobs() { return jobs; }
/**
- * Serializes a WorkflowInfo object and it's children into a JSON String.
+ * Serializes a Workflow object and its children into a JSON String.
*
- * @param workflowInfo the object to serialize
+ * @param workflow the object to serialize
* @return a JSON string.
* @throws IOException
*/
- public static String toJSON(WorkflowInfo workflowInfo) throws IOException {
+ public static String toJSON(Workflow workflow) throws IOException {
ObjectMapper om = new ObjectMapper();
om.getSerializationConfig().set(SerializationConfig.Feature.INDENT_OUTPUT, true);
- return om.writeValueAsString(workflowInfo);
+ return om.writeValueAsString(workflow);
}
/**
- * Derializes a JSON WorkflowInfo string into a WorkflowInfo object. Unrecognized properties will
+ * Deserializes a JSON Workflow string into a Workflow object. Unrecognized properties will
* be ignored.
*
* @param workflowInfoJson the string to convert into a JSON object.
- * @return a WorkflowInfo object.
+ * @return a Workflow object.
* @throws IOException
*/
- public static WorkflowInfo fromJSON(String workflowInfoJson) throws IOException {
+ public static Workflow fromJSON(String workflowInfoJson) throws IOException {
ObjectMapper om = new ObjectMapper();
om.getDeserializationConfig().set(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
- return om.readValue(workflowInfoJson, WorkflowInfo.class);
+ return om.readValue(workflowInfoJson, Workflow.class);
}
}
View
8 common/src/main/java/com/twitter/ambrose/server/APIHandler.java
@@ -7,13 +7,13 @@
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+import com.twitter.ambrose.model.DAGNode;
+import com.twitter.ambrose.model.Event;
import org.mortbay.jetty.HttpConnection;
import org.mortbay.jetty.Request;
import org.mortbay.jetty.handler.AbstractHandler;
-import com.twitter.ambrose.service.DAGNode;
import com.twitter.ambrose.service.StatsReadService;
-import com.twitter.ambrose.service.WorkflowEvent;
import com.twitter.ambrose.util.JSONUtil;
/**
@@ -54,10 +54,10 @@ public void handle(String target,
Integer sinceId = request.getParameter(QUERY_PARAM_LAST_EVENT_ID) != null ?
Integer.parseInt(request.getParameter(QUERY_PARAM_LAST_EVENT_ID)) : -1;
- Collection<WorkflowEvent> events =
+ Collection<Event> events =
statsReadService.getEventsSinceId(request.getParameter(QUERY_PARAM_WORKFLOW_ID), sinceId);
- sendJson(request, response, events.toArray(new WorkflowEvent[events.size()]));
+ sendJson(request, response, events.toArray(new Event[events.size()]));
}
else if (target.endsWith(".html")) {
response.setContentType(MIME_TYPE_HTML);
View
2  common/src/main/java/com/twitter/ambrose/service/DAGTransformer.java
@@ -15,6 +15,8 @@
*/
package com.twitter.ambrose.service;
+import com.twitter.ambrose.model.DAGNode;
+
import java.util.Collection;
/**
View
4 common/src/main/java/com/twitter/ambrose/service/StatsReadService.java
@@ -15,6 +15,8 @@
*/
package com.twitter.ambrose.service;
+import com.twitter.ambrose.model.DAGNode;
+import com.twitter.ambrose.model.Event;
import com.twitter.ambrose.model.Job;
import java.io.IOException;
@@ -45,5 +47,5 @@
* @param eventId the eventId that all returned events will be greater than
* @return a Collection of WorkflowEvents, ordered by eventId ascending
*/
- public Collection<WorkflowEvent> getEventsSinceId(String workflowId, int eventId) throws IOException;
+ public Collection<Event> getEventsSinceId(String workflowId, int eventId) throws IOException;
}
View
4 common/src/main/java/com/twitter/ambrose/service/StatsWriteService.java
@@ -15,6 +15,8 @@
*/
package com.twitter.ambrose.service;
+import com.twitter.ambrose.model.DAGNode;
+import com.twitter.ambrose.model.Event;
import com.twitter.ambrose.model.Job;
import java.io.IOException;
@@ -45,5 +47,5 @@
* @param workflowId the id of the workflow being updated
* @param event the event bound to the workflow
*/
- public void pushEvent(String workflowId, WorkflowEvent event) throws IOException;
+ public void pushEvent(String workflowId, Event event) throws IOException;
}
View
77 common/src/main/java/com/twitter/ambrose/service/WorkflowEvent.java
@@ -1,77 +0,0 @@
-/*
-Copyright 2012 Twitter, Inc.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-package com.twitter.ambrose.service;
-
-import com.twitter.ambrose.util.JSONUtil;
-import org.codehaus.jackson.type.TypeReference;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Class that represents a WorkflowEvent of a given EVENT_TYPE. Each one of these created will have
- * a unique eventId that increments up for each object. eventIds will always be >= 0.
- * The data associated with the event currently can be anything.
- * TODO: can we type eventData better?
- *
- * @author billg
- */
-public class WorkflowEvent {
- private static AtomicInteger NEXT_ID = new AtomicInteger();
-
- public static enum EVENT_TYPE { JOB_STARTED, JOB_FINISHED, JOB_FAILED, JOB_PROGRESS, WORKFLOW_PROGRESS};
-
- private long timestamp;
- private int eventId;
- private String runtime;
- private EVENT_TYPE eventType;
- private Object eventData;
-
- public WorkflowEvent(EVENT_TYPE eventType, Object eventData, String runtime) {
- this.eventId = NEXT_ID.incrementAndGet();
- this.timestamp = System.currentTimeMillis();
- this.eventType = eventType;
- this.eventData = eventData;
- this.runtime = runtime;
- }
-
- public WorkflowEvent(int eventId, long timestamp, EVENT_TYPE eventType, Object eventData, String runtime) {
- this.eventId = eventId;
- this.timestamp = timestamp;
- this.eventType = eventType;
- this.eventData = eventData;
- this.runtime = runtime;
- }
-
- public long getTimestamp() { return timestamp; }
- public int getEventId() { return eventId; }
- public EVENT_TYPE getEventType() { return eventType; }
- public Object getEventData() { return eventData; }
- public String getRuntime() { return runtime; }
-
- @SuppressWarnings("unchecked")
- public static void main(String[] args) throws IOException {
- String json = JSONUtil.readFile("pig/src/main/resources/web/data/small-events.json");
- List<WorkflowEvent> events =
- (List<WorkflowEvent>)JSONUtil.readJson(json, new TypeReference<List<WorkflowEvent>>() { });
-// for (WorkflowEvent event : events) {
-// // useful if we need to read a file, add a field, output and re-generate
-// }
-
- JSONUtil.writeJson("pig/src/main/resources/web/data/small-events.json2", events);
- }
-}
View
18 common/src/main/java/com/twitter/ambrose/service/impl/InMemoryStatsService.java
@@ -15,11 +15,11 @@
*/
package com.twitter.ambrose.service.impl;
+import com.twitter.ambrose.model.DAGNode;
+import com.twitter.ambrose.model.Event;
import com.twitter.ambrose.model.Job;
-import com.twitter.ambrose.service.DAGNode;
import com.twitter.ambrose.service.StatsReadService;
import com.twitter.ambrose.service.StatsWriteService;
-import com.twitter.ambrose.service.WorkflowEvent;
import com.twitter.ambrose.util.JSONUtil;
import java.io.FileNotFoundException;
@@ -59,8 +59,8 @@
private static final String DUMP_EVENTS_FILE_PARAM = "ambrose.write.events.file";
private Map<String, DAGNode<Job>> dagNodeNameMap = new HashMap<String, DAGNode<Job>>();
- private SortedMap<Integer, WorkflowEvent> eventMap =
- new ConcurrentSkipListMap<Integer, WorkflowEvent>();
+ private SortedMap<Integer, Event> eventMap =
+ new ConcurrentSkipListMap<Integer, Event>();
private Writer dagWriter = null;
private Writer eventsWriter = null;
@@ -100,16 +100,16 @@ public synchronized void sendDagNodeNameMap(String workflowId,
}
@Override
- public synchronized Collection<WorkflowEvent> getEventsSinceId(String workflowId, int sinceId) {
+ public synchronized Collection<Event> getEventsSinceId(String workflowId, int sinceId) {
int minId = sinceId >= 0 ? sinceId + 1 : sinceId;
- SortedMap<Integer, WorkflowEvent> tailMap = eventMap.tailMap(minId);
+ SortedMap<Integer, Event> tailMap = eventMap.tailMap(minId);
return tailMap.values();
}
@Override
- public synchronized void pushEvent(String workflowId, WorkflowEvent event) throws IOException {
- eventMap.put(event.getEventId(), event);
+ public synchronized void pushEvent(String workflowId, Event event) throws IOException {
+ eventMap.put(event.getId(), event);
writeJsonEventToDisk(event);
}
@@ -120,7 +120,7 @@ private void writeJsonDagNodenameMapToDisk(Map<String, DAGNode<Job>> dagNodeName
}
}
- private void writeJsonEventToDisk(WorkflowEvent event) throws IOException {
+ private void writeJsonEventToDisk(Event event) throws IOException {
if (eventsWriter != null && event != null) {
eventsWriter.append(!eventWritten ? "[ " : ", ");
JSONUtil.writeJson(eventsWriter, event);
View
42 common/src/test/java/com/twitter/ambrose/service/impl/InMemoryStatsServiceTest.java
@@ -15,7 +15,9 @@
*/
package com.twitter.ambrose.service.impl;
-import com.twitter.ambrose.service.WorkflowEvent;
+import com.twitter.ambrose.model.DAGNode;
+import com.twitter.ambrose.model.Event;
+import com.twitter.ambrose.model.Job;
import org.junit.Before;
import org.junit.Test;
@@ -35,10 +37,10 @@
private InMemoryStatsService service;
private final String workflowId = "id1";
- private final WorkflowEvent[] testEvents = new WorkflowEvent[] {
- new WorkflowEvent(WorkflowEvent.EVENT_TYPE.JOB_STARTED, "jobIdFoo", "someRuntime"),
- new WorkflowEvent(WorkflowEvent.EVENT_TYPE.JOB_PROGRESS, "50", "someRuntime"),
- new WorkflowEvent(WorkflowEvent.EVENT_TYPE.JOB_FINISHED, "done", "someRuntime")
+ private final Event[] testEvents = new Event[] {
+ new Event.JobStartedEvent(new DAGNode<Job>("some name", null)),
+ new Event.JobProgressEvent(new DAGNode<Job>("50", null)),
+ new Event.JobFinishedEvent(new DAGNode<Job>("done", null)),
};
@Before
@@ -48,15 +50,15 @@ public void setup() {
@Test
public void testGetAllEvents() throws IOException {
- for(WorkflowEvent event : testEvents) {
+ for(Event event : testEvents) {
service.pushEvent(workflowId, event);
}
- Collection<WorkflowEvent> events = service.getEventsSinceId(workflowId, -1);
- Iterator<WorkflowEvent> foundEvents = events.iterator();
+ Collection<Event> events = service.getEventsSinceId(workflowId, -1);
+ Iterator<Event> foundEvents = events.iterator();
assertTrue("No events returned", foundEvents.hasNext());
- for(WorkflowEvent sentEvent : testEvents) {
+ for(Event sentEvent : testEvents) {
assertEqualWorkflows(sentEvent, foundEvents.next());
}
assertFalse("Wrong number of events returned", foundEvents.hasNext());
@@ -64,30 +66,30 @@ public void testGetAllEvents() throws IOException {
@Test
public void testGetEventsSince() throws IOException {
- for(WorkflowEvent event : testEvents) {
+ for(Event event : testEvents) {
service.pushEvent(workflowId, event);
}
// first, peek at the first eventId
- Collection<WorkflowEvent> allEvents = service.getEventsSinceId(workflowId, -1);
- int sinceId = allEvents.iterator().next().getEventId();
+ Collection<Event> allEvents = service.getEventsSinceId(workflowId, -1);
+ int sinceId = allEvents.iterator().next().getId();
// get all events since the first
- Collection<WorkflowEvent> events = service.getEventsSinceId(workflowId, sinceId);
- Iterator<WorkflowEvent> foundEvents = events.iterator();
+ Collection<Event> events = service.getEventsSinceId(workflowId, sinceId);
+ Iterator<Event> foundEvents = events.iterator();
assertEquals("Wrong number of events returned", testEvents.length - 1, events.size());
- for(WorkflowEvent sentEvent : testEvents) {
- if (sentEvent.getEventId() <= sinceId) { continue; }
+ for(Event sentEvent : testEvents) {
+ if (sentEvent.getId() <= sinceId) { continue; }
assertEqualWorkflows(sentEvent, foundEvents.next());
}
assertFalse("Wrong number of events returned", foundEvents.hasNext());
}
- private void assertEqualWorkflows(WorkflowEvent expected, WorkflowEvent found) {
- assertEquals("Wrong eventId found", expected.getEventId(), found.getEventId());
- assertEquals("Wrong eventType found", expected.getEventType(), found.getEventType());
- assertEquals("Wrong eventData found", expected.getEventData(), found.getEventData());
+ private void assertEqualWorkflows(Event expected, Event found) {
+ assertEquals("Wrong eventId found", expected.getId(), found.getId());
+ assertEquals("Wrong eventType found", expected.getType(), found.getType());
+ assertEquals("Wrong eventData found", expected.getPayload(), found.getPayload());
}
}
View
40 pig/src/main/java/com/twitter/ambrose/pig/AmbrosePigProgressNotificationListener.java
@@ -15,12 +15,12 @@
*/
package com.twitter.ambrose.pig;
+import com.twitter.ambrose.model.DAGNode;
+import com.twitter.ambrose.model.Event;
import com.twitter.ambrose.model.Job;
-import com.twitter.ambrose.model.WorkflowInfo;
+import com.twitter.ambrose.model.Workflow;
import com.twitter.ambrose.model.hadoop.MapReduceJobState;
-import com.twitter.ambrose.service.DAGNode;
import com.twitter.ambrose.service.StatsWriteService;
-import com.twitter.ambrose.service.WorkflowEvent;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -59,8 +59,6 @@
public class AmbrosePigProgressNotificationListener implements PigProgressNotificationListener {
protected Log log = LogFactory.getLog(getClass());
- private static final String RUNTIME = "pig";
-
private StatsWriteService statsWriteService;
private String workflowVersion;
@@ -70,10 +68,6 @@
private HashSet<String> completedJobIds = new HashSet<String>();
- protected static enum WorkflowProgressField {
- workflowProgress;
- }
-
protected static enum JobProgressField {
jobId, jobName, trackingUrl, isComplete, isSuccessful,
mapProgress, reduceProgress, totalMappers, totalReducers;
@@ -164,7 +158,7 @@ public void jobStartedNotification(String scriptId, String assignedJobId) {
node.getJob().setId(assignedJobId);
addMapReduceJobState(node.getJob());
dagNodeJobIdMap.put(node.getJob().getId(), node);
- pushEvent(scriptId, WorkflowEvent.EVENT_TYPE.JOB_STARTED, node);
+ pushEvent(scriptId, new Event.JobStartedEvent(node));
}
}
}
@@ -184,7 +178,7 @@ public void jobFailedNotification(String scriptId, JobStats stats) {
}
addCompletedJobStats(node.getJob(), stats);
- pushEvent(scriptId, WorkflowEvent.EVENT_TYPE.JOB_FAILED, node);
+ pushEvent(scriptId, new Event.JobFailedEvent(node));
}
/**
@@ -201,7 +195,7 @@ public void jobFinishedNotification(String scriptId, JobStats stats) {
}
addCompletedJobStats(node.getJob(), stats);
- pushEvent(scriptId, WorkflowEvent.EVENT_TYPE.JOB_FINISHED, node);
+ pushEvent(scriptId, new Event.JobFinishedEvent(node));
}
/**
@@ -217,12 +211,12 @@ public void launchCompletedNotification(String scriptId, int numJobsSucceeded) {
if (workflowVersion == null) {
log.warn("scriptFingerprint not set for this script - not saving stats." );
} else {
- WorkflowInfo workflowInfo = new WorkflowInfo(scriptId, workflowVersion, jobs);
+ Workflow workflow = new Workflow(scriptId, workflowVersion, jobs);
try {
- outputStatsData(workflowInfo);
+ outputStatsData(workflow);
} catch (IOException e) {
- log.error("Exception outputting workflowInfo", e);
+ log.error("Exception outputting workflow", e);
}
}
}
@@ -236,9 +230,9 @@ public void launchCompletedNotification(String scriptId, int numJobsSucceeded) {
public void progressUpdatedNotification(String scriptId, int progress) {
// first we report the scripts progress
- Map<WorkflowProgressField, String> eventData = new HashMap<WorkflowProgressField, String>();
- eventData.put(WorkflowProgressField.workflowProgress, Integer.toString(progress));
- pushEvent(scriptId, WorkflowEvent.EVENT_TYPE.WORKFLOW_PROGRESS, eventData);
+ Map<Event.WorkflowProgressField, String> eventData = new HashMap<Event.WorkflowProgressField, String>();
+ eventData.put(Event.WorkflowProgressField.workflowProgress, Integer.toString(progress));
+ pushEvent(scriptId, new Event.WorkflowProgressEvent(eventData));
// then for each running job, we report the job progress
for (DAGNode<PigJob> node : dagNodeNameMap.values()) {
@@ -249,7 +243,7 @@ public void progressUpdatedNotification(String scriptId, int progress) {
//only push job progress events for a completed job once
if (node.getJob().getMapReduceJobState() != null && !completedJobIds.contains(node.getJob().getId())) {
- pushEvent(scriptId, WorkflowEvent.EVENT_TYPE.JOB_PROGRESS, node);
+ pushEvent(scriptId, new Event.JobProgressEvent(node));
if (node.getJob().getMapReduceJobState().isComplete()) {
completedJobIds.add(node.getJob().getId());
@@ -292,15 +286,15 @@ private void addCompletedJobStats(PigJob job, JobStats stats) {
jobs.add(job);
}
- private void outputStatsData(WorkflowInfo workflowInfo) throws IOException {
+ private void outputStatsData(Workflow workflow) throws IOException {
if(log.isDebugEnabled()) {
- log.debug("Collected stats for script:\n" + WorkflowInfo.toJSON(workflowInfo));
+ log.debug("Collected stats for script:\n" + Workflow.toJSON(workflow));
}
}
- private void pushEvent(String scriptId, WorkflowEvent.EVENT_TYPE eventType, Object eventData) {
+ private void pushEvent(String scriptId, Event event) {
try {
- statsWriteService.pushEvent(scriptId, new WorkflowEvent(eventType, eventData, RUNTIME));
+ statsWriteService.pushEvent(scriptId, event);
} catch (IOException e) {
log.error("Couldn't send event to StatsWriteService", e);
}
Please sign in to comment.
Something went wrong with that request. Please try again.