Permalink
Browse files

Removed experimental appmaker package, replaced by fluent.

  • Loading branch information...
1 parent 21ff6dc commit a6e5cbde2a66cd2aa7a4d002c72f72fc19f36e92 Leo Neumeyer committed Dec 16, 2011
@@ -1,187 +0,0 @@
-package org.apache.s4.appmaker;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.s4.base.Event;
-import org.apache.s4.core.App;
-import org.apache.s4.core.ProcessingElement;
-import org.apache.s4.fluent.FluentApp;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.LinkedListMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Multimap;
-import com.google.inject.AbstractModule;
-
-/**
- * A fluent API to build S4 applications.
- *
- * *
- * <p>
- * Usage example:
- *
- * <pre>
- * public class MyApp extends AppMaker {
- *
- * &#064;Override
- * void configure() {
- *
- * PEMaker pe1, pe2;
- * StreamMaker s1;
- * StreamMaker s2, s3;
- *
- * pe1 = addPE(PEZ.class);
- *
- * s1 = addStream(EventA.class).withName(&quot;My first stream.&quot;).withKey(&quot;{gender}&quot;).to(pe1);
- *
- * pe2 = addPE(PEY.class).to(s1);
- *
- * s2 = addStream(EventB.class).withName(&quot;My second stream.&quot;).withKey(&quot;{age}&quot;).to(pe2);
- *
- * s3 = addStream(EventB.class).withName(&quot;My third stream.&quot;).withKey(&quot;{height}&quot;).to(pe2);
- *
- * addPE(PEX.class).to(s2).to(s3);
- * }
- * }
- * </pre>
- */
-abstract public class AppMaker {
-
- private static final Logger logger = LoggerFactory.getLogger(AppMaker.class);
-
- /* Use multi-maps to save the graph. */
- private Multimap<PEMaker, StreamMaker> pe2stream = LinkedListMultimap.create();
- private Multimap<StreamMaker, PEMaker> stream2pe = LinkedListMultimap.create();
- private List<Element> order = Lists.newLinkedList();
-
- /**
- * Configure the application.
- */
- abstract protected void configure();
-
- /* Used internally to build the graph. */
- void add(PEMaker pem, StreamMaker stream) {
-
- pe2stream.put(pem, stream);
- logger.trace("Adding pe [{}] to stream [{}].", pem, stream);
- }
-
- /* Used internally to build the graph. */
- void add(StreamMaker stream, PEMaker pem) {
-
- stream2pe.put(stream, pem);
- logger.trace("Adding stream [{}] to pe [{}].", stream, pem);
- }
-
- protected PEMaker addPE(Class<? extends ProcessingElement> type) {
- PEMaker pe = new PEMaker(this, type);
- order.add(new Element(pe, null));
- return pe;
- }
-
- /**
- * Add a stream.
- *
- * @param eventType
- * the type of events emitted by this PE.
- *
- * @return a stream maker.
- */
- protected StreamMaker addStream(String propName, Class<? extends Event> type) {
- StreamMaker stream = new StreamMaker(this, propName, type);
- order.add(new Element(null, stream));
- return stream;
- }
-
- App make() {
-
- App app = null;
-
- /* Build the graph using the same order as configured in AppMaker. */
- for (Element element : order) {
-
- if (element.pe != null) {
- /* Create a PE. */
- ProcessingElement pe = app.createPE(element.pe.getType());
-
- } else {
- /* Create a stream. */
-
- }
- }
-
- Map<PEMaker, Collection<StreamMaker>> pe2streamMap = pe2stream.asMap();
- for (Map.Entry<PEMaker, Collection<StreamMaker>> entry : pe2streamMap.entrySet()) {
- // sb.append(entry.getKey() + ": ");
- for (StreamMaker sm : entry.getValue()) {
- // sb.append(sm + " ");
- }
- }
-
- Map<StreamMaker, Collection<PEMaker>> stream2peMap = stream2pe.asMap();
- for (Map.Entry<StreamMaker, Collection<PEMaker>> entry : stream2peMap.entrySet()) {
- // sb.append(entry.getKey() + ": ");
- for (PEMaker pm : entry.getValue()) {
- // sb.append(pm + " ");
- }
- }
-
- return null;
- }
-
- /**
- * A printable representation of the application graph.
- *
- * @return the application graph.
- */
- public String toString() {
-
- StringBuilder sb = new StringBuilder();
-
- Map<PEMaker, Collection<StreamMaker>> pe2streamMap = pe2stream.asMap();
- for (Map.Entry<PEMaker, Collection<StreamMaker>> entry : pe2streamMap.entrySet()) {
- sb.append(entry.getKey() + ": ");
- for (StreamMaker sm : entry.getValue()) {
- sb.append(sm + " ");
- }
- sb.append("\n");
- }
-
- Map<StreamMaker, Collection<PEMaker>> stream2peMap = stream2pe.asMap();
- for (Map.Entry<StreamMaker, Collection<PEMaker>> entry : stream2peMap.entrySet()) {
- sb.append(entry.getKey() + ": ");
- for (PEMaker pm : entry.getValue()) {
- sb.append(pm + " ");
- }
- sb.append("\n");
- }
-
- return sb.toString();
-
- }
-
- class Element {
-
- PEMaker pe;
- StreamMaker stream;
-
- Element(PEMaker pe, StreamMaker stream) {
- this.pe = pe;
- this.stream = stream;
- }
-
- }
-
- class Module extends AbstractModule {
-
- @Override
- protected void configure() {
-
- bind(FluentApp.class);
- bind(PEX.class);
- }
- }
-}
@@ -1,195 +0,0 @@
-package org.apache.s4.appmaker;
-
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.s4.base.Event;
-import org.apache.s4.core.ProcessingElement;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Helper class to add a processing element to an S4 application.
- *
- * @see example {@link S4Maker}
- *
- */
-public class PEMaker {
-
- final private Class<? extends ProcessingElement> type;
- final private AppMaker app;
-
- private long timerInterval = 0;
-
- private long triggerInterval = 0;
- private Class<? extends Event> triggerEventType = null;
- private int triggerNumEvents = 0;
-
- private int cacheMaximumSize = 0;
- private long cacheDuration = 0;
-
- private PropertiesConfiguration properties = new PropertiesConfiguration();
-
- PEMaker(AppMaker app, Class<? extends ProcessingElement> type) {
- Preconditions.checkNotNull(type);
- this.type = type;
- this.app = app;
- app.add(this, null);
- }
-
- /**
- * Configure the PE expiration and cache size.
- * <p>
- * PE instances will be automatically removed from the cache once a fixed duration has elapsed after the PEs
- * creation, or last access.
- * <p>
- * Least accessed PEs will automatically be removed from the cache when the number of PEs approaches maximumSize.
- * <p>
- * When this method is called all existing PE instances are destroyed.
- *
- *
- * @param maximumSize
- * the approximate maximum number of PEs in the cache.
- * @param duration
- * the PE duration
- * @param timeUnit
- * the time unit
- * @return the PEMaker
- */
- public PEMaker withPECache(int maximumSize, long duration, TimeUnit timeUnit) {
-
- cacheMaximumSize = maximumSize;
- cacheDuration = timeUnit.convert(duration, TimeUnit.MILLISECONDS);
-
- return this;
- }
-
- /**
- * Configure a trigger that is fired when the following conditions occur:
- *
- * <ul>
- * <li>An event of eventType arrived to the PE instance
- * <li>numEvents have arrived since the last time this trigger was fired -OR- time since last event is greater than
- * interval.
- * </ul>
- *
- * <p>
- * When the trigger fires, the method <tt>trigger(EventType event)</tt> is called. Where <tt>EventType</tt> matches
- * the argument eventType.
- *
- * @param eventType
- * the type of event on which this trigger will fire.
- * @param numEvents
- * number of events since last trigger activation. Must be greater than zero. (Set to one to trigger on
- * every input event.)
- * @param interval
- * minimum time between triggers. Set to zero if no time interval needed.
- * @param timeUnit
- * the TimeUnit for the argument interval. Can set to null if no time interval needed.
- * @return the PEMaker
- */
- public PEMaker withTrigger(Class<? extends Event> eventType, int numEvents, long interval, TimeUnit timeUnit) {
-
- triggerEventType = eventType;
- triggerNumEvents = numEvents;
-
- if (timeUnit != null)
- triggerInterval = timeUnit.convert(interval, TimeUnit.MILLISECONDS);
-
- return this;
- }
-
- /**
- * Set a timer that calls {@link ProcessingElement#onTime()}.
- *
- * If {@code interval==0} the timer is disabled.
- *
- * @param interval
- * in timeUnit
- * @param timeUnit
- * the timeUnit of interval
- * @return the PEMaker
- */
- public PEMaker withTimerInterval(long interval, TimeUnit timeUnit) {
- timerInterval = TimeUnit.MILLISECONDS.convert(interval, timeUnit);
-
- timerInterval = interval;
-
- return this;
- }
-
- public PEMaker property(String key, Object value) {
-
- properties.addProperty(key, value);
- return this;
- }
-
- /**
- * Send events from this PE to a stream.
- *
- * @param stream
- *
- *
- * @return the PE maker.
- */
- public PEMaker to(StreamMaker stream) {
- app.add(this, stream);
- return this;
- }
-
- /**
- * @return the timerInterval
- */
- long getTimerInterval() {
- return timerInterval;
- }
-
- /**
- * @return the triggerInterval
- */
- long getTriggerInterval() {
- return triggerInterval;
- }
-
- /**
- * @return the triggerEventType
- */
- Class<? extends Event> getTriggerEventType() {
- return triggerEventType;
- }
-
- /**
- * @return the triggerNumEvents
- */
- int getTriggerNumEvents() {
- return triggerNumEvents;
- }
-
- /**
- * @return the cacheMaximumSize
- */
- int getCacheMaximumSize() {
- return cacheMaximumSize;
- }
-
- /**
- * @return the cacheDuration
- */
- long getCacheDuration() {
- return cacheDuration;
- }
-
- /**
- * @return the type
- */
- Class<? extends ProcessingElement> getType() {
- return type;
- }
-
- /**
- * @return the properties
- */
- PropertiesConfiguration getProperties() {
- return properties;
- }
-}
Oops, something went wrong. Retry.

0 comments on commit a6e5cbd

Please sign in to comment.