  • 4 commits
  • 19 files changed
  • 0 comments
  • 2 contributors

Showing 19 changed files with 267 additions and 228 deletions.

  1. +8 4 common/pom.xml
  2. +7 11 common/src/main/java/com/twitter/ambrose/{service → model}/DAGNode.java
  3. +134 0 common/src/main/java/com/twitter/ambrose/model/Event.java
  4. +0 3  common/src/main/java/com/twitter/ambrose/model/Job.java
  5. +16 20 common/src/main/java/com/twitter/ambrose/model/{WorkflowInfo.java → Workflow.java}
  6. +3 10 common/src/main/java/com/twitter/ambrose/model/hadoop/CounterGroup.java
  7. +4 4 common/src/main/java/com/twitter/ambrose/server/APIHandler.java
  8. +2 0  common/src/main/java/com/twitter/ambrose/service/DAGTransformer.java
  9. +3 1 common/src/main/java/com/twitter/ambrose/service/StatsReadService.java
  10. +3 1 common/src/main/java/com/twitter/ambrose/service/StatsWriteService.java
  11. +0 77 common/src/main/java/com/twitter/ambrose/service/WorkflowEvent.java
  12. +9 9 common/src/main/java/com/twitter/ambrose/service/impl/InMemoryStatsService.java
  13. +18 19 common/src/main/java/com/twitter/ambrose/util/JSONUtil.java
  14. +22 20 common/src/test/java/com/twitter/ambrose/service/impl/InMemoryStatsServiceTest.java
  15. +17 23 pig/src/main/java/com/twitter/ambrose/pig/AmbrosePigProgressNotificationListener.java
  16. +0 4 pig/src/main/java/com/twitter/ambrose/pig/InputInfo.java
  17. +0 4 pig/src/main/java/com/twitter/ambrose/pig/OutputInfo.java
  18. +9 12 pig/src/main/java/com/twitter/ambrose/pig/PigJob.java
  19. +12 6 pom.xml
12 common/pom.xml
@@ -38,12 +38,16 @@
38 38
39 39 <!-- serialization -->
40 40 <dependency>
41   - <groupId>org.codehaus.jackson</groupId>
42   - <artifactId>jackson-core-asl</artifactId>
  41 + <groupId>com.fasterxml.jackson.core</groupId>
  42 + <artifactId>jackson-core</artifactId>
43 43 </dependency>
44 44 <dependency>
45   - <groupId>org.codehaus.jackson</groupId>
46   - <artifactId>jackson-mapper-asl</artifactId>
  45 + <groupId>com.fasterxml.jackson.core</groupId>
  46 + <artifactId>jackson-annotations</artifactId>
  47 + </dependency>
  48 + <dependency>
  49 + <groupId>com.fasterxml.jackson.core</groupId>
  50 + <artifactId>jackson-databind</artifactId>
47 51 </dependency>
48 52 <dependency>
49 53 <groupId>com.thoughtworks.xstream</groupId>
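
Note: the Jackson 1.x jars bundled everything into two artifacts; Jackson 2.x splits core, annotations, and databind. A rough, illustrative sketch of which package each new artifact provides:

// jackson-core        -> com.fasterxml.jackson.core       (streaming, JsonFactory, TypeReference)
// jackson-annotations -> com.fasterxml.jackson.annotation (JsonCreator, JsonProperty, JsonInclude)
// jackson-databind    -> com.fasterxml.jackson.databind   (ObjectMapper, SerializationFeature)
import com.fasterxml.jackson.databind.ObjectMapper;

public class JacksonArtifactsSketch {
  public static void main(String[] args) throws Exception {
    // Same entry point as org.codehaus.jackson.map.ObjectMapper, new package.
    System.out.println(new ObjectMapper().writeValueAsString(new String[] { "a", "b" }));
  }
}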
18 ...ain/java/com/twitter/ambrose/service/DAGNode.java → .../main/java/com/twitter/ambrose/model/DAGNode.java
@@ -13,26 +13,25 @@
13 13 See the License for the specific language governing permissions and
14 14 limitations under the License.
15 15 */
16   -package com.twitter.ambrose.service;
  16 +package com.twitter.ambrose.model;
17 17
18 18 import java.io.IOException;
19 19 import java.util.Collection;
20 20 import java.util.HashSet;
21 21 import java.util.List;
22 22
23   -import com.twitter.ambrose.model.Job;
24   -import org.codehaus.jackson.annotate.JsonCreator;
25   -import org.codehaus.jackson.annotate.JsonIgnore;
26   -import org.codehaus.jackson.annotate.JsonProperty;
27   -import org.codehaus.jackson.map.annotate.JsonSerialize;
28   -import org.codehaus.jackson.type.TypeReference;
  23 +import com.fasterxml.jackson.annotation.JsonCreator;
  24 +import com.fasterxml.jackson.annotation.JsonIgnore;
  25 +import com.fasterxml.jackson.annotation.JsonProperty;
  26 +import com.fasterxml.jackson.core.type.TypeReference;
  27 +import com.fasterxml.jackson.databind.annotation.JsonSerialize;
29 28
30 29 import com.twitter.ambrose.util.JSONUtil;
31 30
32 31 /**
33 32 * Class that represents a Job node in the DAG. The job name must not be null. At DAG creation time
34 33 * the jobID will probably be null. Ideally this will be set on the node when the job is started,
35   - * and the node will be sent as a <pre>WorkflowEvent.EVENT_TYPE.JOB_STARTED</pre> event.
  34 + * and the node will be sent as a <pre>Event.Type.JOB_STARTED</pre> event.
36 35 *
37 36 * This class can be converted to JSON as-is by doing something like this:
38 37 *
@@ -40,9 +39,6 @@
40 39 * om.getSerializationConfig().set(SerializationConfig.Feature.INDENT_OUTPUT, true);
41 40 * String json = om.writeValueAsString(dagNode);
42 41 */
43   -@JsonSerialize(
44   - include=JsonSerialize.Inclusion.NON_NULL
45   -)
46 42 public class DAGNode<T extends Job> {
47 43 private String name;
48 44 private T job;
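
The class javadoc above still shows the Jackson 1 configuration call; under Jackson 2 the equivalent would look roughly like this (a sketch only; the DAGNode(name, job) constructor signature is taken from the updated test below):

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.twitter.ambrose.model.DAGNode;
import com.twitter.ambrose.model.Job;

public class DagNodeJsonSketch {
  public static void main(String[] args) throws Exception {
    ObjectMapper om = new ObjectMapper();
    om.enable(SerializationFeature.INDENT_OUTPUT);              // was SerializationConfig.Feature.INDENT_OUTPUT
    om.setSerializationInclusion(JsonInclude.Include.NON_NULL); // replaces the removed class-level annotation
    String json = om.writeValueAsString(new DAGNode<Job>("node-1", null));
    System.out.println(json);
  }
}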
134 common/src/main/java/com/twitter/ambrose/model/Event.java
... ... @@ -0,0 +1,134 @@
  1 +/*
  2 +Copyright 2012 Twitter, Inc.
  3 +
  4 +Licensed under the Apache License, Version 2.0 (the "License");
  5 +you may not use this file except in compliance with the License.
  6 +You may obtain a copy of the License at
  7 +
  8 +http://www.apache.org/licenses/LICENSE-2.0
  9 +
  10 +Unless required by applicable law or agreed to in writing, software
  11 +distributed under the License is distributed on an "AS IS" BASIS,
  12 +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 +See the License for the specific language governing permissions and
  14 +limitations under the License.
  15 +*/
  16 +package com.twitter.ambrose.model;
  17 +
  18 +import com.twitter.ambrose.util.JSONUtil;
  19 +
  20 +import java.io.IOException;
  21 +import java.util.List;
  22 +import java.util.Map;
  23 +import java.util.concurrent.atomic.AtomicInteger;
  24 +
  25 +import com.fasterxml.jackson.core.type.TypeReference;
  26 +
  27 +/**
  28 + * Class that represents an Event of a given Type. Each one of these created will have
  29 + * a unique id that increments up for each object. eventIds will always be >= 0.
  30 + * The data associated with the event currently can be anything.
  31 + *
  32 + * @author billg
  33 + */
  34 +public class Event<T> {
  35 + private static AtomicInteger NEXT_ID = new AtomicInteger();
  36 +
  37 + public static enum Type { JOB_STARTED, JOB_FINISHED, JOB_FAILED, JOB_PROGRESS, WORKFLOW_PROGRESS};
  38 +
  39 + public static enum WorkflowProgressField {
  40 + workflowProgress;
  41 + }
  42 +
  43 + private long timestamp;
  44 + private int id;
  45 + private Type type;
  46 + private T payload;
  47 +
  48 + private Event(Type type, T payload) {
  49 + this.id = NEXT_ID.incrementAndGet();
  50 + this.timestamp = System.currentTimeMillis();
  51 + this.type = type;
  52 + this.payload = payload;
  53 + }
  54 +
  55 + private Event(int eventId, long timestamp, Type type, T payload) {
  56 + this.id = eventId;
  57 + this.timestamp = timestamp;
  58 + this.type = type;
  59 + this.payload = payload;
  60 + }
  61 +
  62 + public long getTimestamp() { return timestamp; }
  63 + public int getId() { return id; }
  64 + public Type getType() { return type; }
  65 + public T getPayload() { return payload; }
  66 +
  67 + @SuppressWarnings("unchecked")
  68 + public static void main(String[] args) throws IOException {
  69 + String json = JSONUtil.readFile("pig/src/main/resources/web/data/small-events.json");
  70 + List<Event> events =
  71 + (List<Event>)JSONUtil.readJson(json, new TypeReference<List<Event>>() { });
  72 +// for (Event event : events) {
  73 +// // useful if we need to read a file, add a field, output and re-generate
  74 +// }
  75 +
  76 + JSONUtil.writeJson("pig/src/main/resources/web/data/small-events.json2", events);
  77 + }
  78 +
  79 + /**
  80 + * Helper method to create instances of the proper event. It is the responsibility of the
  81 + * caller to ensure that the payload type is aligned with the event type.
  82 + * @param type the type of event to create
  83 + * @param data the payload to associate with the event
  84 + * @return a typed Event wrapping the given payload
  85 + */
  86 + @SuppressWarnings("unchecked")
  87 + public static Event create(Type type, Object data) {
  88 + switch (type) {
  89 + case JOB_STARTED:
  90 + return new JobStartedEvent((DAGNode<Job>) data);
  91 + case JOB_PROGRESS:
  92 + return new JobProgressEvent((DAGNode<Job>) data);
  93 + case JOB_FINISHED:
  94 + return new JobFinishedEvent((DAGNode<Job>) data);
  95 + case JOB_FAILED:
  96 + return new JobFailedEvent((DAGNode<Job>) data);
  97 + case WORKFLOW_PROGRESS:
  98 + return new WorkflowProgressEvent((Map<WorkflowProgressField, String>) data);
  99 + default:
  100 + throw new IllegalArgumentException(
  101 + String.format("Unknown event type %s for data payload %s", type, data));
  102 + }
  103 + }
  104 +
  105 + public static class JobStartedEvent extends Event<DAGNode<? extends Job>> {
  106 + public JobStartedEvent(DAGNode<? extends Job> eventData) {
  107 + super(Type.JOB_STARTED, eventData);
  108 + }
  109 + }
  110 +
  111 + public static class JobProgressEvent extends Event<DAGNode<? extends Job>> {
  112 + public JobProgressEvent(DAGNode<? extends Job> eventData) {
  113 + super(Type.JOB_PROGRESS, eventData);
  114 + }
  115 + }
  116 +
  117 + public static class JobFinishedEvent extends Event<DAGNode<? extends Job>> {
  118 + public JobFinishedEvent(DAGNode<? extends Job> eventData) {
  119 + super(Type.JOB_FINISHED, eventData);
  120 + }
  121 + }
  122 +
  123 + public static class JobFailedEvent extends Event<DAGNode<? extends Job>> {
  124 + public JobFailedEvent(DAGNode<? extends Job> eventData) {
  125 + super(Type.JOB_FAILED, eventData);
  126 + }
  127 + }
  128 +
  129 + public static class WorkflowProgressEvent extends Event<Map<WorkflowProgressField, String>> {
  130 + public WorkflowProgressEvent(Map<WorkflowProgressField, String> eventData) {
  131 + super(Type.WORKFLOW_PROGRESS, eventData);
  132 + }
  133 + }
  134 +}
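
A minimal sketch of how the typed subclasses and the create() factory line up (the node contents are placeholders; the DAGNode constructor signature is taken from the updated test below):

import com.twitter.ambrose.model.DAGNode;
import com.twitter.ambrose.model.Event;
import com.twitter.ambrose.model.Job;

public class EventSketch {
  public static void main(String[] args) {
    DAGNode<Job> node = new DAGNode<Job>("scope-1", null);

    // Either construct the typed subclass directly...
    Event<?> started = new Event.JobStartedEvent(node);

    // ...or let the factory pick the subclass from the Type.
    Event<?> finished = Event.create(Event.Type.JOB_FINISHED, node);

    // Ids come from a shared AtomicInteger, so they are unique and increasing.
    System.out.println(started.getId() < finished.getId()); // true
  }
}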
3  common/src/main/java/com/twitter/ambrose/model/Job.java
@@ -29,9 +29,6 @@
29 29 *
30 30 * @author billg
31 31 */
32   -@JsonSerialize(
33   - include=JsonSerialize.Inclusion.NON_NULL
34   -)
35 32 public class Job {
36 33
37 34 private String id;
36 .../java/com/twitter/ambrose/model/WorkflowInfo.java → ...main/java/com/twitter/ambrose/model/Workflow.java
@@ -15,15 +15,14 @@
15 15 */
16 16 package com.twitter.ambrose.model;
17 17
  18 +import java.io.IOException;
  19 +import java.util.List;
  20 +
18 21 import org.codehaus.jackson.annotate.JsonCreator;
19 22 import org.codehaus.jackson.annotate.JsonProperty;
20 23 import org.codehaus.jackson.map.DeserializationConfig;
21 24 import org.codehaus.jackson.map.ObjectMapper;
22 25 import org.codehaus.jackson.map.SerializationConfig;
23   -import org.codehaus.jackson.map.annotate.JsonSerialize;
24   -
25   -import java.io.IOException;
26   -import java.util.List;
27 26
28 27 /**
29 28 * Class that represents the runtime stats for a given workflow. A workflow consists of 1 or more
@@ -32,17 +31,14 @@
32 31 *
33 32 * @author billg
34 33 */
35   -@JsonSerialize(
36   - include=JsonSerialize.Inclusion.NON_NULL
37   -)
38   -public class WorkflowInfo {
  34 +public class Workflow {
39 35
40 36 private String workflowId;
41 37 private String workflowFingerprint;
42 38 private List<Job> jobs;
43 39
44 40 /**
45   - * Creates a new immutable WorkflowInfo object.
  41 + * Creates a new immutable Workflow object.
46 42 *
47 43 * @param workflowId the id of the workflow. This surrogate id should distinguish between one
48 44 * workflow invocation and another.
@@ -52,9 +48,9 @@
52 48 * @param jobs
53 49 */
54 50 @JsonCreator
55   - public WorkflowInfo(@JsonProperty("workflowId") String workflowId,
56   - @JsonProperty("workflowFingerprint") String workflowFingerprint,
57   - @JsonProperty("jobs") List<Job> jobs) {
  51 + public Workflow(@JsonProperty("workflowId") String workflowId,
  52 + @JsonProperty("workflowFingerprint") String workflowFingerprint,
  53 + @JsonProperty("jobs") List<Job> jobs) {
58 54 this.workflowId = workflowId;
59 55 this.workflowFingerprint = workflowFingerprint;
60 56 this.jobs = jobs;
@@ -65,29 +61,29 @@ public WorkflowInfo(@JsonProperty("workflowId") String workflowId,
65 61 public List<Job> getJobs() { return jobs; }
66 62
67 63 /**
68   - * Serializes a WorkflowInfo object and it's children into a JSON String.
  64 + * Serializes a Workflow object and its children into a JSON String.
69 65 *
70   - * @param workflowInfo the object to serialize
  66 + * @param workflow the object to serialize
71 67 * @return a JSON string.
72 68 * @throws IOException
73 69 */
74   - public static String toJSON(WorkflowInfo workflowInfo) throws IOException {
  70 + public static String toJSON(Workflow workflow) throws IOException {
75 71 ObjectMapper om = new ObjectMapper();
76 72 om.getSerializationConfig().set(SerializationConfig.Feature.INDENT_OUTPUT, true);
77   - return om.writeValueAsString(workflowInfo);
  73 + return om.writeValueAsString(workflow);
78 74 }
79 75
80 76 /**
81   - * Derializes a JSON WorkflowInfo string into a WorkflowInfo object. Unrecognized properties will
  77 + * Deserializes a JSON Workflow string into a Workflow object. Unrecognized properties will
82 78 * be ignored.
83 79 *
84 80 * @param workflowInfoJson the string to convert into a JSON object.
85   - * @return a WorkflowInfo object.
  81 + * @return a Workflow object.
86 82 * @throws IOException
87 83 */
88   - public static WorkflowInfo fromJSON(String workflowInfoJson) throws IOException {
  84 + public static Workflow fromJSON(String workflowInfoJson) throws IOException {
89 85 ObjectMapper om = new ObjectMapper();
90 86 om.getDeserializationConfig().set(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
91   - return om.readValue(workflowInfoJson, WorkflowInfo.class);
  87 + return om.readValue(workflowInfoJson, Workflow.class);
92 88 }
93 89 }
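
The rename from WorkflowInfo is mechanical; a round-trip sketch using the static helpers above:

import java.util.Collections;
import com.twitter.ambrose.model.Job;
import com.twitter.ambrose.model.Workflow;

public class WorkflowRoundTrip {
  public static void main(String[] args) throws Exception {
    Workflow workflow = new Workflow("wf-1", "fingerprint-1", Collections.<Job>emptyList());
    String json = Workflow.toJSON(workflow);   // pretty-printed via INDENT_OUTPUT
    Workflow copy = Workflow.fromJSON(json);   // unknown properties are ignored
    System.out.println(copy.getJobs().size()); // 0
  }
}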
13 common/src/main/java/com/twitter/ambrose/model/hadoop/CounterGroup.java
@@ -15,14 +15,13 @@
15 15 */
16 16 package com.twitter.ambrose.model.hadoop;
17 17
  18 +import java.util.HashMap;
  19 +import java.util.Map;
  20 +
18 21 import org.apache.hadoop.mapred.Counters;
19 22 import org.apache.hadoop.mapred.Counters.Counter;
20 23 import org.codehaus.jackson.annotate.JsonCreator;
21 24 import org.codehaus.jackson.annotate.JsonProperty;
22   -import org.codehaus.jackson.map.annotate.JsonSerialize;
23   -
24   -import java.util.HashMap;
25   -import java.util.Map;
26 25
27 26 /**
28 27 * Immutable class that represents a group of Hadoop counters along with the individual counter
@@ -31,9 +30,6 @@
31 30 * @author billg
32 31 */
33 32 @SuppressWarnings("deprecation")
34   -@JsonSerialize(
35   - include=JsonSerialize.Inclusion.NON_NULL
36   -)
37 33 public class CounterGroup {
38 34
39 35 private String groupName;
@@ -83,9 +79,6 @@ public CounterInfo getCounterInfo(String name) {
83 79 * CounterInfo holds the name, displayName and value of a given counter. A counter group contains
84 80 * multiple of these.
85 81 */
86   - @JsonSerialize(
87   - include=JsonSerialize.Inclusion.NON_NULL
88   - )
89 82 public static class CounterInfo {
90 83 private String name, displayName;
91 84 private long value;
8 common/src/main/java/com/twitter/ambrose/server/APIHandler.java
@@ -7,13 +7,13 @@
7 7 import javax.servlet.http.HttpServletRequest;
8 8 import javax.servlet.http.HttpServletResponse;
9 9
  10 +import com.twitter.ambrose.model.DAGNode;
  11 +import com.twitter.ambrose.model.Event;
10 12 import org.mortbay.jetty.HttpConnection;
11 13 import org.mortbay.jetty.Request;
12 14 import org.mortbay.jetty.handler.AbstractHandler;
13 15
14   -import com.twitter.ambrose.service.DAGNode;
15 16 import com.twitter.ambrose.service.StatsReadService;
16   -import com.twitter.ambrose.service.WorkflowEvent;
17 17 import com.twitter.ambrose.util.JSONUtil;
18 18
19 19 /**
@@ -54,10 +54,10 @@ public void handle(String target,
54 54 Integer sinceId = request.getParameter(QUERY_PARAM_LAST_EVENT_ID) != null ?
55 55 Integer.parseInt(request.getParameter(QUERY_PARAM_LAST_EVENT_ID)) : -1;
56 56
57   - Collection<WorkflowEvent> events =
  57 + Collection<Event> events =
58 58 statsReadService.getEventsSinceId(request.getParameter(QUERY_PARAM_WORKFLOW_ID), sinceId);
59 59
60   - sendJson(request, response, events.toArray(new WorkflowEvent[events.size()]));
  60 + sendJson(request, response, events.toArray(new Event[events.size()]));
61 61 }
62 62 else if (target.endsWith(".html")) {
63 63 response.setContentType(MIME_TYPE_HTML);
2  common/src/main/java/com/twitter/ambrose/service/DAGTransformer.java
@@ -15,6 +15,8 @@
15 15 */
16 16 package com.twitter.ambrose.service;
17 17
  18 +import com.twitter.ambrose.model.DAGNode;
  19 +
18 20 import java.util.Collection;
19 21
20 22 /**
4 common/src/main/java/com/twitter/ambrose/service/StatsReadService.java
@@ -15,6 +15,8 @@
15 15 */
16 16 package com.twitter.ambrose.service;
17 17
  18 +import com.twitter.ambrose.model.DAGNode;
  19 +import com.twitter.ambrose.model.Event;
18 20 import com.twitter.ambrose.model.Job;
19 21
20 22 import java.io.IOException;
@@ -45,5 +47,5 @@
45 47 * @param eventId the eventId that all returned events will be greater than
46 48 * @return a Collection of WorkflowEvents, ordered by eventId ascending
47 49 */
48   - public Collection<WorkflowEvent> getEventsSinceId(String workflowId, int eventId) throws IOException;
  50 + public Collection<Event> getEventsSinceId(String workflowId, int eventId) throws IOException;
49 51 }
4 common/src/main/java/com/twitter/ambrose/service/StatsWriteService.java
@@ -15,6 +15,8 @@
15 15 */
16 16 package com.twitter.ambrose.service;
17 17
  18 +import com.twitter.ambrose.model.DAGNode;
  19 +import com.twitter.ambrose.model.Event;
18 20 import com.twitter.ambrose.model.Job;
19 21
20 22 import java.io.IOException;
@@ -45,5 +47,5 @@
45 47 * @param workflowId the id of the workflow being updated
46 48 * @param event the event bound to the workflow
47 49 */
48   - public void pushEvent(String workflowId, WorkflowEvent event) throws IOException;
  50 + public void pushEvent(String workflowId, Event event) throws IOException;
49 51 }
77 common/src/main/java/com/twitter/ambrose/service/WorkflowEvent.java
... ... @@ -1,77 +0,0 @@
1   -/*
2   -Copyright 2012 Twitter, Inc.
3   -
4   -Licensed under the Apache License, Version 2.0 (the "License");
5   -you may not use this file except in compliance with the License.
6   -You may obtain a copy of the License at
7   -
8   -http://www.apache.org/licenses/LICENSE-2.0
9   -
10   -Unless required by applicable law or agreed to in writing, software
11   -distributed under the License is distributed on an "AS IS" BASIS,
12   -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   -See the License for the specific language governing permissions and
14   -limitations under the License.
15   -*/
16   -package com.twitter.ambrose.service;
17   -
18   -import com.twitter.ambrose.util.JSONUtil;
19   -import org.codehaus.jackson.type.TypeReference;
20   -
21   -import java.io.IOException;
22   -import java.util.List;
23   -import java.util.concurrent.atomic.AtomicInteger;
24   -
25   -/**
26   - * Class that represents a WorkflowEvent of a given EVENT_TYPE. Each one of these created will have
27   - * a unique eventId that increments up for each object. eventIds will always be >= 0.
28   - * The data associated with the event currently can be anything.
29   - * TODO: can we type eventData better?
30   - *
31   - * @author billg
32   - */
33   -public class WorkflowEvent {
34   - private static AtomicInteger NEXT_ID = new AtomicInteger();
35   -
36   - public static enum EVENT_TYPE { JOB_STARTED, JOB_FINISHED, JOB_FAILED, JOB_PROGRESS, WORKFLOW_PROGRESS};
37   -
38   - private long timestamp;
39   - private int eventId;
40   - private String runtime;
41   - private EVENT_TYPE eventType;
42   - private Object eventData;
43   -
44   - public WorkflowEvent(EVENT_TYPE eventType, Object eventData, String runtime) {
45   - this.eventId = NEXT_ID.incrementAndGet();
46   - this.timestamp = System.currentTimeMillis();
47   - this.eventType = eventType;
48   - this.eventData = eventData;
49   - this.runtime = runtime;
50   - }
51   -
52   - public WorkflowEvent(int eventId, long timestamp, EVENT_TYPE eventType, Object eventData, String runtime) {
53   - this.eventId = eventId;
54   - this.timestamp = timestamp;
55   - this.eventType = eventType;
56   - this.eventData = eventData;
57   - this.runtime = runtime;
58   - }
59   -
60   - public long getTimestamp() { return timestamp; }
61   - public int getEventId() { return eventId; }
62   - public EVENT_TYPE getEventType() { return eventType; }
63   - public Object getEventData() { return eventData; }
64   - public String getRuntime() { return runtime; }
65   -
66   - @SuppressWarnings("unchecked")
67   - public static void main(String[] args) throws IOException {
68   - String json = JSONUtil.readFile("pig/src/main/resources/web/data/small-events.json");
69   - List<WorkflowEvent> events =
70   - (List<WorkflowEvent>)JSONUtil.readJson(json, new TypeReference<List<WorkflowEvent>>() { });
71   -// for (WorkflowEvent event : events) {
72   -// // useful if we need to read a file, add a field, output and re-generate
73   -// }
74   -
75   - JSONUtil.writeJson("pig/src/main/resources/web/data/small-events.json2", events);
76   - }
77   -}
18 common/src/main/java/com/twitter/ambrose/service/impl/InMemoryStatsService.java
@@ -15,11 +15,11 @@
15 15 */
16 16 package com.twitter.ambrose.service.impl;
17 17
  18 +import com.twitter.ambrose.model.DAGNode;
  19 +import com.twitter.ambrose.model.Event;
18 20 import com.twitter.ambrose.model.Job;
19   -import com.twitter.ambrose.service.DAGNode;
20 21 import com.twitter.ambrose.service.StatsReadService;
21 22 import com.twitter.ambrose.service.StatsWriteService;
22   -import com.twitter.ambrose.service.WorkflowEvent;
23 23 import com.twitter.ambrose.util.JSONUtil;
24 24
25 25 import java.io.FileNotFoundException;
@@ -59,8 +59,8 @@
59 59 private static final String DUMP_EVENTS_FILE_PARAM = "ambrose.write.events.file";
60 60
61 61 private Map<String, DAGNode<Job>> dagNodeNameMap = new HashMap<String, DAGNode<Job>>();
62   - private SortedMap<Integer, WorkflowEvent> eventMap =
63   - new ConcurrentSkipListMap<Integer, WorkflowEvent>();
  62 + private SortedMap<Integer, Event> eventMap =
  63 + new ConcurrentSkipListMap<Integer, Event>();
64 64
65 65 private Writer dagWriter = null;
66 66 private Writer eventsWriter = null;
@@ -100,16 +100,16 @@ public synchronized void sendDagNodeNameMap(String workflowId,
100 100 }
101 101
102 102 @Override
103   - public synchronized Collection<WorkflowEvent> getEventsSinceId(String workflowId, int sinceId) {
  103 + public synchronized Collection<Event> getEventsSinceId(String workflowId, int sinceId) {
104 104 int minId = sinceId >= 0 ? sinceId + 1 : sinceId;
105 105
106   - SortedMap<Integer, WorkflowEvent> tailMap = eventMap.tailMap(minId);
  106 + SortedMap<Integer, Event> tailMap = eventMap.tailMap(minId);
107 107 return tailMap.values();
108 108 }
109 109
110 110 @Override
111   - public synchronized void pushEvent(String workflowId, WorkflowEvent event) throws IOException {
112   - eventMap.put(event.getEventId(), event);
  111 + public synchronized void pushEvent(String workflowId, Event event) throws IOException {
  112 + eventMap.put(event.getId(), event);
113 113 writeJsonEventToDisk(event);
114 114 }
115 115
@@ -120,7 +120,7 @@ private void writeJsonDagNodenameMapToDisk(Map<String, DAGNode<Job>> dagNodeName
120 120 }
121 121 }
122 122
123   - private void writeJsonEventToDisk(WorkflowEvent event) throws IOException {
  123 + private void writeJsonEventToDisk(Event event) throws IOException {
124 124 if (eventsWriter != null && event != null) {
125 125 eventsWriter.append(!eventWritten ? "[ " : ", ");
126 126 JSONUtil.writeJson(eventsWriter, event);
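
The polling contract is worth spelling out: getEventsSinceId(workflowId, -1) returns everything, while any non-negative sinceId returns only events with a strictly greater id (tailMap is inclusive, hence the sinceId + 1 above). A usage sketch, assuming the no-arg constructor the test below appears to rely on:

import java.util.Collection;
import com.twitter.ambrose.model.DAGNode;
import com.twitter.ambrose.model.Event;
import com.twitter.ambrose.model.Job;
import com.twitter.ambrose.service.impl.InMemoryStatsService;

public class PollingSketch {
  public static void main(String[] args) throws Exception {
    InMemoryStatsService service = new InMemoryStatsService();
    service.pushEvent("wf-1", new Event.JobStartedEvent(new DAGNode<Job>("node-1", null)));
    service.pushEvent("wf-1", new Event.JobFinishedEvent(new DAGNode<Job>("node-1", null)));

    int lastId = -1;
    Collection<Event> events = service.getEventsSinceId("wf-1", lastId);
    for (Event event : events) { lastId = event.getId(); }

    // A subsequent poll from the last seen id yields nothing new.
    System.out.println(service.getEventsSinceId("wf-1", lastId).size()); // 0
  }
}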
37 common/src/main/java/com/twitter/ambrose/util/JSONUtil.java
@@ -15,11 +15,6 @@
15 15 */
16 16 package com.twitter.ambrose.util;
17 17
18   -import org.codehaus.jackson.map.DeserializationConfig;
19   -import org.codehaus.jackson.map.ObjectMapper;
20   -import org.codehaus.jackson.map.SerializationConfig;
21   -import org.codehaus.jackson.type.TypeReference;
22   -
23 18 import java.io.File;
24 19 import java.io.FileInputStream;
25 20 import java.io.IOException;
@@ -29,25 +24,36 @@
29 24 import java.nio.channels.FileChannel;
30 25 import java.nio.charset.Charset;
31 26
  27 +import com.fasterxml.jackson.annotation.JsonInclude;
  28 +import com.fasterxml.jackson.core.type.TypeReference;
  29 +import com.fasterxml.jackson.databind.DeserializationFeature;
  30 +import com.fasterxml.jackson.databind.ObjectMapper;
  31 +import com.fasterxml.jackson.databind.SerializationFeature;
  32 +
  33 +
32 34 /**
33 35 * Helper method for dealing with JSON in a common way.
34 36 *
35 37 * @author billg
36 38 */
37 39 public class JSONUtil {
  40 + private static final ObjectMapper mapper = new ObjectMapper();
  41 + static {
  42 + mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
  43 + mapper.enable(SerializationFeature.INDENT_OUTPUT);
  44 + mapper.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
  45 + mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
  46 + }
38 47
39 48 /**
40 49 * Writes object to the writer as JSON using Jackson and adds a new-line before flushing.
  50 + *
41 51 * @param writer the writer to write the JSON to
42 52 * @param object the object to write as JSON
43 53 * @throws IOException if the object can't be serialized as JSON or written to the writer
44 54 */
45 55 public static void writeJson(Writer writer, Object object) throws IOException {
46   - ObjectMapper om = new ObjectMapper();
47   - om.configure(SerializationConfig.Feature.INDENT_OUTPUT, true);
48   - om.configure(SerializationConfig.Feature.FAIL_ON_EMPTY_BEANS, false);
49   -
50   - writer.write(om.writeValueAsString(object));
  56 + mapper.writeValue(writer, object);
51 57 writer.write("\n");
52 58 writer.flush();
53 59 }
@@ -57,11 +63,7 @@ public static void writeJson(String fileName, Object object) throws IOException
57 63 }
58 64
59 65 public static Object readJson(String json, TypeReference<?> type) throws IOException {
60   - ObjectMapper om = new ObjectMapper();
61   - om.getDeserializationConfig().set(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
62   -
63   - // not currently setting successors, only successorNames
64   - return om.readValue(json, type);
  66 + return mapper.readValue(json, type);
65 67 }
66 68
67 69 public static String readFile(String path) throws IOException {
@@ -70,11 +72,8 @@ public static String readFile(String path) throws IOException {
70 72 FileChannel fc = stream.getChannel();
71 73 MappedByteBuffer bb = fc.map(FileChannel.MapMode.READ_ONLY, 0, fc.size());
72 74 return Charset.defaultCharset().decode(bb).toString();
73   - }
74   - finally {
  75 + } finally {
75 76 stream.close();
76 77 }
77 78 }
78   -
79   -
80 79 }
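
Centralizing configuration on one static mapper is the real change here: ObjectMapper is thread-safe once configured, so the per-call instantiation (and its setup cost) disappears, and NON_NULL inclusion now applies globally rather than through the per-class @JsonSerialize annotations, which is why those annotations are dropped throughout this change. Usage is unchanged; a sketch:

import java.io.StringWriter;
import java.util.Arrays;
import java.util.List;
import com.fasterxml.jackson.core.type.TypeReference;
import com.twitter.ambrose.util.JSONUtil;

public class JsonUtilSketch {
  public static void main(String[] args) throws Exception {
    StringWriter writer = new StringWriter();
    JSONUtil.writeJson(writer, Arrays.asList("a", "b")); // indented JSON plus a trailing newline

    @SuppressWarnings("unchecked")
    List<String> back = (List<String>) JSONUtil.readJson(
        writer.toString(), new TypeReference<List<String>>() { });
    System.out.println(back); // [a, b]
  }
}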
42 common/src/test/java/com/twitter/ambrose/service/impl/InMemoryStatsServiceTest.java
@@ -15,7 +15,9 @@
15 15 */
16 16 package com.twitter.ambrose.service.impl;
17 17
18   -import com.twitter.ambrose.service.WorkflowEvent;
  18 +import com.twitter.ambrose.model.DAGNode;
  19 +import com.twitter.ambrose.model.Event;
  20 +import com.twitter.ambrose.model.Job;
19 21 import org.junit.Before;
20 22 import org.junit.Test;
21 23
@@ -35,10 +37,10 @@
35 37 private InMemoryStatsService service;
36 38
37 39 private final String workflowId = "id1";
38   - private final WorkflowEvent[] testEvents = new WorkflowEvent[] {
39   - new WorkflowEvent(WorkflowEvent.EVENT_TYPE.JOB_STARTED, "jobIdFoo", "someRuntime"),
40   - new WorkflowEvent(WorkflowEvent.EVENT_TYPE.JOB_PROGRESS, "50", "someRuntime"),
41   - new WorkflowEvent(WorkflowEvent.EVENT_TYPE.JOB_FINISHED, "done", "someRuntime")
  40 + private final Event[] testEvents = new Event[] {
  41 + new Event.JobStartedEvent(new DAGNode<Job>("some name", null)),
  42 + new Event.JobProgressEvent(new DAGNode<Job>("50", null)),
  43 + new Event.JobFinishedEvent(new DAGNode<Job>("done", null)),
42 44 };
43 45
44 46 @Before
@@ -48,15 +50,15 @@ public void setup() {
48 50
49 51 @Test
50 52 public void testGetAllEvents() throws IOException {
51   - for(WorkflowEvent event : testEvents) {
  53 + for(Event event : testEvents) {
52 54 service.pushEvent(workflowId, event);
53 55 }
54 56
55   - Collection<WorkflowEvent> events = service.getEventsSinceId(workflowId, -1);
56   - Iterator<WorkflowEvent> foundEvents = events.iterator();
  57 + Collection<Event> events = service.getEventsSinceId(workflowId, -1);
  58 + Iterator<Event> foundEvents = events.iterator();
57 59
58 60 assertTrue("No events returned", foundEvents.hasNext());
59   - for(WorkflowEvent sentEvent : testEvents) {
  61 + for(Event sentEvent : testEvents) {
60 62 assertEqualWorkflows(sentEvent, foundEvents.next());
61 63 }
62 64 assertFalse("Wrong number of events returned", foundEvents.hasNext());
@@ -64,30 +66,30 @@ public void testGetAllEvents() throws IOException {
64 66
65 67 @Test
66 68 public void testGetEventsSince() throws IOException {
67   - for(WorkflowEvent event : testEvents) {
  69 + for(Event event : testEvents) {
68 70 service.pushEvent(workflowId, event);
69 71 }
70 72
71 73 // first, peek at the first eventId
72   - Collection<WorkflowEvent> allEvents = service.getEventsSinceId(workflowId, -1);
73   - int sinceId = allEvents.iterator().next().getEventId();
  74 + Collection<Event> allEvents = service.getEventsSinceId(workflowId, -1);
  75 + int sinceId = allEvents.iterator().next().getId();
74 76
75 77 // get all events since the first
76   - Collection<WorkflowEvent> events = service.getEventsSinceId(workflowId, sinceId);
77   - Iterator<WorkflowEvent> foundEvents = events.iterator();
  78 + Collection<Event> events = service.getEventsSinceId(workflowId, sinceId);
  79 + Iterator<Event> foundEvents = events.iterator();
78 80
79 81 assertEquals("Wrong number of events returned", testEvents.length - 1, events.size());
80   - for(WorkflowEvent sentEvent : testEvents) {
81   - if (sentEvent.getEventId() <= sinceId) { continue; }
  82 + for(Event sentEvent : testEvents) {
  83 + if (sentEvent.getId() <= sinceId) { continue; }
82 84
83 85 assertEqualWorkflows(sentEvent, foundEvents.next());
84 86 }
85 87 assertFalse("Wrong number of events returned", foundEvents.hasNext());
86 88 }
87 89
88   - private void assertEqualWorkflows(WorkflowEvent expected, WorkflowEvent found) {
89   - assertEquals("Wrong eventId found", expected.getEventId(), found.getEventId());
90   - assertEquals("Wrong eventType found", expected.getEventType(), found.getEventType());
91   - assertEquals("Wrong eventData found", expected.getEventData(), found.getEventData());
  90 + private void assertEqualWorkflows(Event expected, Event found) {
  91 + assertEquals("Wrong eventId found", expected.getId(), found.getId());
  92 + assertEquals("Wrong eventType found", expected.getType(), found.getType());
  93 + assertEquals("Wrong eventData found", expected.getPayload(), found.getPayload());
92 94 }
93 95 }
40 pig/src/main/java/com/twitter/ambrose/pig/AmbrosePigProgressNotificationListener.java
@@ -15,12 +15,12 @@
15 15 */
16 16 package com.twitter.ambrose.pig;
17 17
  18 +import com.twitter.ambrose.model.DAGNode;
  19 +import com.twitter.ambrose.model.Event;
18 20 import com.twitter.ambrose.model.Job;
19   -import com.twitter.ambrose.model.WorkflowInfo;
  21 +import com.twitter.ambrose.model.Workflow;
20 22 import com.twitter.ambrose.model.hadoop.MapReduceJobState;
21   -import com.twitter.ambrose.service.DAGNode;
22 23 import com.twitter.ambrose.service.StatsWriteService;
23   -import com.twitter.ambrose.service.WorkflowEvent;
24 24 import org.apache.commons.logging.Log;
25 25 import org.apache.commons.logging.LogFactory;
26 26 import org.apache.hadoop.conf.Configuration;
@@ -59,8 +59,6 @@
59 59 public class AmbrosePigProgressNotificationListener implements PigProgressNotificationListener {
60 60 protected Log log = LogFactory.getLog(getClass());
61 61
62   - private static final String RUNTIME = "pig";
63   -
64 62 private StatsWriteService statsWriteService;
65 63
66 64 private String workflowVersion;
@@ -70,10 +68,6 @@
70 68
71 69 private HashSet<String> completedJobIds = new HashSet<String>();
72 70
73   - protected static enum WorkflowProgressField {
74   - workflowProgress;
75   - }
76   -
77 71 protected static enum JobProgressField {
78 72 jobId, jobName, trackingUrl, isComplete, isSuccessful,
79 73 mapProgress, reduceProgress, totalMappers, totalReducers;
@@ -164,7 +158,7 @@ public void jobStartedNotification(String scriptId, String assignedJobId) {
164 158 node.getJob().setId(assignedJobId);
165 159 addMapReduceJobState(node.getJob());
166 160 dagNodeJobIdMap.put(node.getJob().getId(), node);
167   - pushEvent(scriptId, WorkflowEvent.EVENT_TYPE.JOB_STARTED, node);
  161 + pushEvent(scriptId, new Event.JobStartedEvent(node));
168 162 }
169 163 }
170 164 }
@@ -184,7 +178,7 @@ public void jobFailedNotification(String scriptId, JobStats stats) {
184 178 }
185 179
186 180 addCompletedJobStats(node.getJob(), stats);
187   - pushEvent(scriptId, WorkflowEvent.EVENT_TYPE.JOB_FAILED, node);
  181 + pushEvent(scriptId, new Event.JobFailedEvent(node));
188 182 }
189 183
190 184 /**
@@ -201,7 +195,7 @@ public void jobFinishedNotification(String scriptId, JobStats stats) {
201 195 }
202 196
203 197 addCompletedJobStats(node.getJob(), stats);
204   - pushEvent(scriptId, WorkflowEvent.EVENT_TYPE.JOB_FINISHED, node);
  198 + pushEvent(scriptId, new Event.JobFinishedEvent(node));
205 199 }
206 200
207 201 /**
@@ -217,12 +211,12 @@ public void launchCompletedNotification(String scriptId, int numJobsSucceeded) {
217 211 if (workflowVersion == null) {
218 212 log.warn("scriptFingerprint not set for this script - not saving stats." );
219 213 } else {
220   - WorkflowInfo workflowInfo = new WorkflowInfo(scriptId, workflowVersion, jobs);
  214 + Workflow workflow = new Workflow(scriptId, workflowVersion, jobs);
221 215
222 216 try {
223   - outputStatsData(workflowInfo);
  217 + outputStatsData(workflow);
224 218 } catch (IOException e) {
225   - log.error("Exception outputting workflowInfo", e);
  219 + log.error("Exception outputting workflow", e);
226 220 }
227 221 }
228 222 }
@@ -236,9 +230,9 @@ public void launchCompletedNotification(String scriptId, int numJobsSucceeded) {
236 230 public void progressUpdatedNotification(String scriptId, int progress) {
237 231
238 232 // first we report the scripts progress
239   - Map<WorkflowProgressField, String> eventData = new HashMap<WorkflowProgressField, String>();
240   - eventData.put(WorkflowProgressField.workflowProgress, Integer.toString(progress));
241   - pushEvent(scriptId, WorkflowEvent.EVENT_TYPE.WORKFLOW_PROGRESS, eventData);
  233 + Map<Event.WorkflowProgressField, String> eventData = new HashMap<Event.WorkflowProgressField, String>();
  234 + eventData.put(Event.WorkflowProgressField.workflowProgress, Integer.toString(progress));
  235 + pushEvent(scriptId, new Event.WorkflowProgressEvent(eventData));
242 236
243 237 // then for each running job, we report the job progress
244 238 for (DAGNode<PigJob> node : dagNodeNameMap.values()) {
@@ -249,7 +243,7 @@ public void progressUpdatedNotification(String scriptId, int progress) {
249 243
250 244 //only push job progress events for a completed job once
251 245 if (node.getJob().getMapReduceJobState() != null && !completedJobIds.contains(node.getJob().getId())) {
252   - pushEvent(scriptId, WorkflowEvent.EVENT_TYPE.JOB_PROGRESS, node);
  246 + pushEvent(scriptId, new Event.JobProgressEvent(node));
253 247
254 248 if (node.getJob().getMapReduceJobState().isComplete()) {
255 249 completedJobIds.add(node.getJob().getId());
@@ -292,15 +286,15 @@ private void addCompletedJobStats(PigJob job, JobStats stats) {
292 286 jobs.add(job);
293 287 }
294 288
295   - private void outputStatsData(WorkflowInfo workflowInfo) throws IOException {
  289 + private void outputStatsData(Workflow workflow) throws IOException {
296 290 if(log.isDebugEnabled()) {
297   - log.debug("Collected stats for script:\n" + WorkflowInfo.toJSON(workflowInfo));
  291 + log.debug("Collected stats for script:\n" + Workflow.toJSON(workflow));
298 292 }
299 293 }
300 294
301   - private void pushEvent(String scriptId, WorkflowEvent.EVENT_TYPE eventType, Object eventData) {
  295 + private void pushEvent(String scriptId, Event event) {
302 296 try {
303   - statsWriteService.pushEvent(scriptId, new WorkflowEvent(eventType, eventData, RUNTIME));
  297 + statsWriteService.pushEvent(scriptId, event);
304 298 } catch (IOException e) {
305 299 log.error("Couldn't send event to StatsWriteService", e);
306 300 }
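
For context, a progress listener like this is attached when launching Pig. A hypothetical wiring sketch: the PigRunner entry point is standard Pig, but the listener's constructor signature is assumed here, not shown in this diff:

import org.apache.pig.PigRunner;
import org.apache.pig.tools.pigstats.PigStats;
import com.twitter.ambrose.pig.AmbrosePigProgressNotificationListener;
import com.twitter.ambrose.service.impl.InMemoryStatsService;

public class LaunchSketch {
  public static void main(String[] args) {
    // Assumed constructor: the listener holds a StatsWriteService field per the diff above.
    AmbrosePigProgressNotificationListener listener =
        new AmbrosePigProgressNotificationListener(new InMemoryStatsService());
    PigStats stats = PigRunner.run(new String[] { "-f", "script.pig" }, listener);
    System.out.println(stats.isSuccessful());
  }
}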
4 pig/src/main/java/com/twitter/ambrose/pig/InputInfo.java
@@ -2,14 +2,10 @@
2 2
3 3 import org.codehaus.jackson.annotate.JsonCreator;
4 4 import org.codehaus.jackson.annotate.JsonProperty;
5   -import org.codehaus.jackson.map.annotate.JsonSerialize;
6 5
7 6 /**
8 7 * Class that represents information about a data input to a job.
9 8 */
10   -@JsonSerialize(
11   - include=JsonSerialize.Inclusion.NON_NULL
12   -)
13 9 public class InputInfo {
14 10 private String name;
15 11 private String location;
4 pig/src/main/java/com/twitter/ambrose/pig/OutputInfo.java
@@ -2,14 +2,10 @@
2 2
3 3 import org.codehaus.jackson.annotate.JsonCreator;
4 4 import org.codehaus.jackson.annotate.JsonProperty;
5   -import org.codehaus.jackson.map.annotate.JsonSerialize;
6 5
7 6 /**
8 7 * Class that represents information about a data output of a job.
9 8 */
10   -@JsonSerialize(
11   - include=JsonSerialize.Inclusion.NON_NULL
12   -)
13 9 public class OutputInfo {
14 10 private String name;
15 11 private String location;
21 pig/src/main/java/com/twitter/ambrose/pig/PigJob.java
@@ -15,21 +15,21 @@
15 15 */
16 16 package com.twitter.ambrose.pig;
17 17
18   -import com.twitter.ambrose.model.Job;
19   -import com.twitter.ambrose.model.hadoop.CounterGroup;
20   -import com.twitter.ambrose.model.hadoop.MapReduceJobState;
  18 +import java.util.ArrayList;
  19 +import java.util.HashMap;
  20 +import java.util.List;
  21 +import java.util.Map;
  22 +import java.util.Properties;
  23 +
21 24 import org.apache.pig.tools.pigstats.InputStats;
22 25 import org.apache.pig.tools.pigstats.JobStats;
23 26 import org.apache.pig.tools.pigstats.OutputStats;
24 27 import org.codehaus.jackson.annotate.JsonCreator;
25 28 import org.codehaus.jackson.annotate.JsonProperty;
26   -import org.codehaus.jackson.map.annotate.JsonSerialize;
27 29
28   -import java.util.ArrayList;
29   -import java.util.HashMap;
30   -import java.util.List;
31   -import java.util.Map;
32   -import java.util.Properties;
  30 +import com.twitter.ambrose.model.Job;
  31 +import com.twitter.ambrose.model.hadoop.CounterGroup;
  32 +import com.twitter.ambrose.model.hadoop.MapReduceJobState;
33 33
34 34 /**
35 35 * Subclass of Job used to hold initialization logic and Pig-specific bindings for a Job.
@@ -39,9 +39,6 @@
39 39 *
40 40 * @author billg
41 41 */
42   -@JsonSerialize(
43   - include=JsonSerialize.Inclusion.NON_NULL
44   -)
45 42 public class PigJob extends Job {
46 43 private static final String RUNTIME = "pig";
47 44
18 pom.xml
@@ -65,6 +65,7 @@
65 65 <properties>
66 66 <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
67 67 <slf4j.version>1.6.4</slf4j.version>
  68 + <fasterxml.jackson.version>2.1.1</fasterxml.jackson.version>
68 69 <apache.pig.version>0.11.0</apache.pig.version>
69 70 <apache.hadoop.version>0.22.0</apache.hadoop.version>
70 71 <mortbay.jetty.version>6.1.25</mortbay.jetty.version>
@@ -117,14 +118,19 @@
117 118
118 119 <!-- serialization -->
119 120 <dependency>
120   - <groupId>org.codehaus.jackson</groupId>
121   - <artifactId>jackson-core-asl</artifactId>
122   - <version>1.8.2</version>
  121 + <groupId>com.fasterxml.jackson.core</groupId>
  122 + <artifactId>jackson-core</artifactId>
  123 + <version>${fasterxml.jackson.version}</version>
123 124 </dependency>
124 125 <dependency>
125   - <groupId>org.codehaus.jackson</groupId>
126   - <artifactId>jackson-mapper-asl</artifactId>
127   - <version>1.8.2</version>
  126 + <groupId>com.fasterxml.jackson.core</groupId>
  127 + <artifactId>jackson-annotations</artifactId>
  128 + <version>${fasterxml.jackson.version}</version>
  129 + </dependency>
  130 + <dependency>
  131 + <groupId>com.fasterxml.jackson.core</groupId>
  132 + <artifactId>jackson-databind</artifactId>
  133 + <version>${fasterxml.jackson.version}</version>
128 134 </dependency>
129 135 <dependency>
130 136 <groupId>com.thoughtworks.xstream</groupId>
