Browse files

ProcessingElement interface requires clock and id mutators, AbstractP…

…E has default implementations
  • Loading branch information...
1 parent 732f95d commit f335b0f1d0d6d4a3a12d839f632a9a715ca9afa9 Bruce Robbins committed Jun 3, 2011
View
25 s4-core/src/main/java/io/s4/MainApp.java
@@ -201,9 +201,9 @@ public static void main(String args[]) throws Exception {
coreContext = new FileSystemXmlApplicationContext(coreConfigFileUrls, coreContext);
ApplicationContext context = coreContext;
- Clock s4Clock = (Clock) context.getBean("clock");
- if (s4Clock instanceof EventClock && seedTime > 0) {
- EventClock s4EventClock = (EventClock)s4Clock;
+ Clock clock = (Clock) context.getBean("clock");
+ if (clock instanceof EventClock && seedTime > 0) {
+ EventClock s4EventClock = (EventClock)clock;
s4EventClock.updateTime(seedTime);
System.out.println("Intializing event clock time with seed time " + s4EventClock.getCurrentTime());
}
@@ -238,19 +238,12 @@ public static void main(String args[]) throws Exception {
// Container
String[] processingElementBeanNames = context.getBeanNamesForType(ProcessingElement.class);
for (String processingElementBeanName : processingElementBeanNames) {
- Object bean = context.getBean(processingElementBeanName);
- try {
- Method getS4ClockMethod = bean.getClass().getMethod("getS4Clock");
-
- if (getS4ClockMethod.getReturnType().equals(Clock.class)) {
- if (getS4ClockMethod.invoke(bean) == null) {
- Method setS4ClockMethod = bean.getClass().getMethod("setS4Clock", Clock.class);
- setS4ClockMethod.invoke(bean, coreContext.getBean("clock"));
- }
- }
- }
- catch (NoSuchMethodException mnfe) {
- // acceptable
+ ProcessingElement bean = (ProcessingElement) context.getBean(processingElementBeanName);
+ bean.setClock(clock);
+
+ // if the application did not specify an id, use the Spring bean name
+ if (bean.getId() == null) {
+ bean.setId(processingElementBeanName);
}
System.out.println("Adding processing element with bean name "
+ processingElementBeanName + ", id "
View
25 s4-core/src/main/java/io/s4/processor/AbstractPE.java
@@ -59,7 +59,7 @@ public String getName() {
}
}
- private Clock s4Clock;
+ private Clock clock;
private int outputFrequency = 1;
private FrequencyType outputFrequencyType = FrequencyType.EVENTCOUNT;
private int outputFrequencyOffset = 0;
@@ -76,6 +76,7 @@ public String getName() {
private long pauseTimeInMillis;
private boolean logPauses = false;
private String initMethod = null;
+ private String id;
public void setSaveKeyRecord(boolean saveKeyRecord) {
this.saveKeyRecord = saveKeyRecord;
@@ -93,9 +94,17 @@ public void setLogPauses(boolean logPauses) {
this.logPauses = logPauses;
}
- public void setS4Clock(Clock s4Clock) {
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public void setClock(Clock clock) {
synchronized (this) {
- this.s4Clock = s4Clock;
+ this.clock = clock;
this.notify();
}
}
@@ -113,8 +122,8 @@ public String getInitMethod() {
return this.initMethod;
}
- public Clock getS4Clock() {
- return s4Clock;
+ public Clock getClock() {
+ return clock;
}
private OverloadDispatcher overloadDispatcher;
@@ -169,7 +178,7 @@ public void execute(String streamName, CompoundKeyInfo compoundKeyInfo,
}
public long getCurrentTime() {
- return s4Clock.getCurrentTime();
+ return clock.getCurrentTime();
}
/**
@@ -412,7 +421,7 @@ public void setLookupTable(Persister lookupTable) {
class OutputInvoker implements Runnable {
public void run() {
synchronized (AbstractPE.this) {
- while (s4Clock == null) {
+ while (clock == null) {
try {
AbstractPE.this.wait();
} catch (InterruptedException ie) {
@@ -427,7 +436,7 @@ public void run() {
long currentBoundary = (currentTime / frequencyInMillis)
* frequencyInMillis;
long nextBoundary = currentBoundary + frequencyInMillis;
- currentTime = s4Clock.waitForTime(nextBoundary
+ currentTime = clock.waitForTime(nextBoundary
+ (outputFrequencyOffset * 1000));
if (lookupTable != null) {
Set peKeys = lookupTable.keySet();
View
9 s4-core/src/main/java/io/s4/processor/JoinPE.java
@@ -35,19 +35,10 @@
private Map<String, Object> eventsToJoin;
private EventDispatcher dispatcher;
private Monitor monitor;
- private String id = "JoinPE";
private String outputStreamName;
private String outputClassName;
private Class<?> outputClass;
- public String getId() {
- return id;
- }
-
- public void setId(String id) {
- this.id = id;
- }
-
public void setDispatcher(EventDispatcher dispatcher) {
this.dispatcher = dispatcher;
}
View
16 s4-core/src/main/java/io/s4/processor/PEContainer.java
@@ -47,7 +47,7 @@
BlockingQueue<EventWrapper> workQueue;
private List<PrototypeWrapper> prototypeWrappers = new ArrayList<PrototypeWrapper>();
private Monitor monitor;
- private Clock s4Clock;
+ private Clock clock;
private int maxQueueSize = 1000;
private boolean trackByKey;
private Map<String, Integer> countByEventType = Collections.synchronizedMap(new HashMap<String, Integer>());
@@ -62,12 +62,12 @@ public void setMonitor(Monitor monitor) {
this.monitor = monitor;
}
- public void setS4Clock(Clock s4Clock) {
- this.s4Clock = s4Clock;
+ public void setClock(Clock s4Clock) {
+ this.clock = s4Clock;
}
- public Clock getS4Clock() {
- return s4Clock;
+ public Clock getClock() {
+ return clock;
}
public void setTrackByKey(boolean trackByKey) {
@@ -76,7 +76,7 @@ public void setTrackByKey(boolean trackByKey) {
public void addProcessor(ProcessingElement processor) {
System.out.println("adding pe: " + processor);
- PrototypeWrapper pw = new PrototypeWrapper(processor, s4Clock);
+ PrototypeWrapper pw = new PrototypeWrapper(processor, clock);
prototypeWrappers.add(pw);
adviceLists.add(pw.advise());
}
@@ -179,8 +179,8 @@ public void run() {
EventWrapper eventWrapper = null;
try {
eventWrapper = workQueue.take();
- if (s4Clock instanceof EventClock) {
- EventClock eventClock = (EventClock) s4Clock;
+ if (clock instanceof EventClock) {
+ EventClock eventClock = (EventClock) clock;
eventClock.update(eventWrapper);
// To what time to update the clock
}
View
10 s4-core/src/main/java/io/s4/processor/PrintEventPE.java
@@ -19,16 +19,6 @@
public class PrintEventPE extends AbstractPE {
- private String id = "PrintEventPE";
-
- public String getId() {
- return id;
- }
-
- public void setId(String id) {
- this.id = id;
- }
-
@Override
public void output() {
// TODO Auto-generated method stub
View
7 s4-core/src/main/java/io/s4/processor/ProcessingElement.java
@@ -16,6 +16,7 @@
package io.s4.processor;
import io.s4.dispatcher.partitioner.CompoundKeyInfo;
+import io.s4.util.clock.Clock;
import java.util.List;
@@ -30,5 +31,11 @@
public String getId();
+ public void setId(String id);
+
+ public Clock getClock();
+
+ public void setClock(Clock clock);
+
public String getInitMethod();
}
View
9 s4-core/src/main/java/io/s4/processor/ReroutePE.java
@@ -30,17 +30,8 @@
private EventDispatcher dispatcher;
private Transformer[] transformers = new Transformer[0];
// private List<EventAdvice> keys;
- private String id = "ReroutePE";
private String outputStreamName;
- public String getId() {
- return id;
- }
-
- public void setId(String id) {
- this.id = id;
- }
-
public void setDispatcher(EventDispatcher dispatcher) {
this.dispatcher = dispatcher;
}
View
9 s4-core/src/main/java/io/s4/processor/SimpleCountingPE.java
@@ -26,15 +26,6 @@
private int persistTime;
private String keyPrefix = "s4:counter";
private boolean dirty = false;
- private String id = "SimpleCountingPE";
-
- public String getId() {
- return id;
- }
-
- public void setId(String id) {
- this.id = id;
- }
public void setClearOnOutput(boolean clearOnOutput) {
this.clearOnOutput = clearOnOutput;
View
2 s4-core/src/main/resources/s4-core/conf/default/s4-core-conf.xml
@@ -101,7 +101,7 @@
<property name="maxQueueSize" value="${pe_container_max_queue_size}"/>
<property name="monitor" ref="monitor"/>
<property name="trackByKey" value="true"/>
- <property name="s4Clock" ref="clock"/>
+ <property name="clock" ref="clock"/>
<property name="controlEventProcessor" ref="ctrlHandler"/>
</bean>
View
2 s4-core/src/main/resources/s4-core/conf/dynamic/s4-core-conf.xml
@@ -101,7 +101,7 @@
<property name="maxQueueSize" value="${pe_container_max_queue_size}"/>
<property name="monitor" ref="monitor"/>
<property name="trackByKey" value="true"/>
- <property name="s4Clock" ref="clock"/>
+ <property name="clock" ref="clock"/>
<property name="controlEventProcessor" ref="ctrlHandler"/>
</bean>
View
5 s4-core/src/test/java/io/s4/processor/MockPE.java
@@ -13,11 +13,6 @@ public void testInitialize() {
public void processEvent(Object obj) {
}
-
- @Override
- public String getId() {
- return null;
- }
@Override
public void output() {
View
2 s4-examples/speech01-scala/src/main/scala/SentenceReceiverPE.scala
@@ -23,6 +23,4 @@ class SentenceReceiverPE extends AbstractPE {
def processEvent(sentence: Sentence): Unit= println("Sentence is '" + sentence.text + "', location '" + sentence.location + "'")
def output(): Unit= {}
-
- def getId(): String= return this.getClass().getName()
}
View
6 s4-examples/speech01/src/main/java/io/s4/example/speech01/SentenceReceiverPE.java
@@ -12,10 +12,4 @@ public void processEvent(Sentence sentence) {
public void output() {
// not called in this example
}
-
- @Override
- public String getId() {
- return this.getClass().getName();
- }
-
}
View
1 s4-examples/twittertopiccount-scala/src/main/scala/processor/TopNTopicPE.scala
@@ -32,7 +32,6 @@ import io.s4.processor.AbstractPE
import io.s4.example.twittertopiccount.event._
class TopNTopicPE extends AbstractPE {
- @BeanProperty var id: String = _
@BeanProperty var persistKey = "myapp:topNTopics"
@BeanProperty var persister: Persister = _
@BeanProperty var entryCount = 10
View
1 s4-examples/twittertopiccount-scala/src/main/scala/processor/TopicCountAndReportPE.scala
@@ -24,7 +24,6 @@ import io.s4.processor.AbstractPE
import io.s4.example.twittertopiccount.event._
class TopicCountAndReportPE extends AbstractPE {
- @BeanProperty var id: String = _
@BeanProperty var dispatcher: EventDispatcher = _
@BeanProperty var outputStreamName: String = _
@BeanProperty var threshold: Int = _
View
1 s4-examples/twittertopiccount-scala/src/main/scala/processor/TopicExtractorPE.scala
@@ -24,7 +24,6 @@ import io.s4.processor.AbstractPE
import io.s4.example.twittertopiccount.event._
class TopicExtractorPE extends AbstractPE {
- @BeanProperty var id: String = _
@BeanProperty var dispatcher: EventDispatcher = _
@BeanProperty var outputStreamName: String = _

0 comments on commit f335b0f

Please sign in to comment.