Permalink
Browse files

Added data structures to save ap graph and added toString() to print …

…the app graph.
  • Loading branch information...
1 parent b8cea4a commit d6e3684ccc0f11c8cd06264096d0820ab87faada Leo Neumeyer committed Dec 11, 2011
@@ -1,48 +1,61 @@
package org.apache.s4.appbuilder;
+import java.util.Collection;
+import java.util.Map;
+
import org.apache.s4.base.Event;
import org.apache.s4.core.App;
import org.apache.s4.core.ProcessingElement;
import com.google.common.collect.LinkedListMultimap;
+import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
-public class AppMaker {
+abstract public class AppMaker {
+
+ /**
+ * NOTES: reflection+guice:
+ * <code>http://groups.google.com/group/google-guice/browse_thread/thread/23f4bf986a999e00/73f83a98c288a3e1?lnk=gst&q=binding+api#73f83a98c288a3e1</code>
+ */
/**
* The app graph is stored as follows:
* <p>
- *
+ * PE to Stream
* <p>
* PE[1]: S[1,1], S[1,2], ...
* <p>
* PE[2]: S[2,1], S[2,2], ...
+ * <p>
+ * Stream to PE
+ * <p>
+ * S[1]: PE[1]
+ * <p>
+ * S[2] : PE[2]
+ *
*/
- private Multimap<PEMaker, StreamMaker> graph = LinkedListMultimap.create();
+ private Multimap<PEMaker, StreamMaker> psGraph = LinkedListMultimap.create();
+ private Map<StreamMaker, PEMaker> spGraph = Maps.newHashMap();
public AppMaker() {
}
- /**
- * Add a processing element.
- *
- * @param streams
- * events emitted by this PE will be put into these streams.
- *
- * @return a pe maker.
- */
- // public <T extends Event> PEMaker addPE(StreamMaker<T>... streams) {
- //
- // PEMaker pem = new PEMaker();
- // for (int i = 0; i < streams.length; i++)
- // graph.put(pem, streams[i]);
- //
- // return pem;
- // }
+ abstract protected void define();
+
+ void add(PEMaker pem, StreamMaker stream) {
+
+ psGraph.put(pem, stream);
+ }
+
+ void add(StreamMaker stream, PEMaker pem) {
+
+ spGraph.put(stream, pem);
+ }
+
public PEMaker addPE(Class<? extends ProcessingElement> type) {
- return new PEMaker(type);
+ return new PEMaker(this, type);
}
/**
@@ -55,11 +68,33 @@ public PEMaker addPE(Class<? extends ProcessingElement> type) {
*/
public StreamMaker addStream(Class<? extends Event> type) {
- return new StreamMaker(type);
+ return new StreamMaker(this, type);
}
public App make() {
return null;
}
+
+ public String toString() {
+
+ StringBuilder sb = new StringBuilder();
+ Map<PEMaker, Collection<StreamMaker>> psMap = psGraph.asMap();
+
+ for (Map.Entry<PEMaker, Collection<StreamMaker>> entry : psMap.entrySet()) {
+ sb.append(entry.getKey() + ": ");
+ for (StreamMaker sm : entry.getValue()) {
+ sb.append(sm + " ");
+ }
+ sb.append("\n");
+ }
+
+ for (Map.Entry<StreamMaker, PEMaker> entry : spGraph.entrySet()) {
+ sb.append(entry.getKey() + ": " + entry.getValue());
+ sb.append("\n");
+ }
+
+ return sb.toString();
+
+ }
}
@@ -0,0 +1,33 @@
+package org.apache.s4.appbuilder;
+
+public class MyApp extends AppMaker {
+
+ /**
+ * @param args
+ */
+ public void define() {
+
+ PEMaker pem1, pem2;
+ StreamMaker s1;
+ StreamMaker s2, s3;
+
+ pem1 = addPE(PEZ.class);
+
+ s1 = addStream(EventA.class).withName("My first stream.").withKey("{gender}").to(pem1);
+
+ pem2 = addPE(PEY.class).to(s1);
+
+ s2 = addStream(EventB.class).withName("My second stream.").withKey("{age}").to(pem2);
+
+ s3 = addStream(EventB.class).withName("My third stream.").withKey("{height}").to(pem2);
+
+ addPE(PEX.class).to(s2).to(s3);
+ }
+
+ public static void main(String[] args) {
+
+ MyApp myApp = new MyApp();
+ myApp.define();
+ System.out.println(myApp.toString());
+ }
+}
@@ -8,10 +8,12 @@
public class PEMaker {
private Class<? extends ProcessingElement> type;
+ private AppMaker appMaker;
/* Only package classes can instantiate this class. */
- PEMaker(Class<? extends ProcessingElement> type) {
+ PEMaker(AppMaker appMaker, Class<? extends ProcessingElement> type) {
this.type = type;
+ this.appMaker = appMaker;
}
public PEMaker withTrigger(Class<? extends Event> eventType, int numEvents, long interval, TimeUnit timeUnit) {
@@ -25,6 +27,7 @@ public PEMaker withTimerInterval(long interval, TimeUnit timeUnit) {
}
public <T extends Event> PEMaker to(StreamMaker stream) {
+ appMaker.add(this, stream);
return this;
}
@@ -10,10 +10,12 @@
private String keyFinderString;
private PEMaker pem;
private Class<? extends Event> type;
+ private AppMaker appMaker;
/* Only package classes can instantiate this class. */
- StreamMaker(Class<? extends Event> type) {
+ StreamMaker(AppMaker appMaker, Class<? extends Event> type) {
this.type = type;
+ this.appMaker = appMaker;
}
/**
@@ -35,7 +37,7 @@ public StreamMaker withName(String name) {
* a function to lookup the value of the key.
* @return the stream maker object
*/
- public StreamMaker withKeyFinder(KeyFinder<?> keyFinder) {
+ public StreamMaker withKey(KeyFinder<?> keyFinder) {
this.keyFinder = keyFinder;
this.keyFinderString = null;
return this;
@@ -48,7 +50,7 @@ public StreamMaker withKeyFinder(KeyFinder<?> keyFinder) {
* a descriptor to lookup the value of the key.
* @return the stream maker object
*/
- public StreamMaker withKeyFinder(String keyFinderString) {
+ public StreamMaker withKey(String keyFinderString) {
this.keyFinder = null;
this.keyFinderString = keyFinderString;
return this;
@@ -62,6 +64,7 @@ public StreamMaker withKeyFinder(String keyFinderString) {
* @return the stream maker object
*/
public StreamMaker to(PEMaker pem) {
+ appMaker.add(this, pem);
this.pem = pem;
return this;
}

0 comments on commit d6e3684

Please sign in to comment.