Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Remove ProcessingElement interface; PEs should always extend AbstractPE

  • Loading branch information...
commit ea9f23c3ed7806fce3499911abf996c49e2304d1 1 parent 7b445ec
Bruce Robbins authored
View
58 dev-notes.txt
@@ -14,30 +14,30 @@ cd s4
gradlew clean allImage
# Create some environment variables.
-export IMAGE_BASE=`pwd`'/build/s4-image'
-export PYTHONPATH=${IMAGE_BASE}/s4-driver/lib/python
-export PERLLIB=${IMAGE_BASE}/s4-driver/lib/perl
+export S4_IMAGE=`pwd`'/build/s4-image'
+export PYTHONPATH=${S4_IMAGE}/s4-driver/lib/python
+export PERLLIB=${S4_IMAGE}/s4-driver/lib/perl
# Change script permissions.
-chmod u+x $IMAGE_BASE/scripts/*
-chmod u+x $IMAGE_BASE/s4-driver/scripts/*
+chmod u+x $S4_IMAGE/scripts/*
+chmod u+x $S4_IMAGE/s4-driver/scripts/*
# Copy speech02 app to apps deployment directory.
-cp -fr $IMAGE_BASE/s4-example-apps/s4-example-speech02 $IMAGE_BASE/s4-apps
+cp -fr $S4_IMAGE/s4-example-apps/s4-example-speech02 $S4_IMAGE/s4-apps
# Start S4 server in standalone mode.
-$IMAGE_BASE/scripts/start-s4.sh -r client-adapter &
+$S4_IMAGE/scripts/start-s4.sh -r client-adapter &
# Start client adapter.
-$IMAGE_BASE/scripts/run-client-adapter.sh -s client-adapter -g s4 \
--x -d $IMAGE_BASE/s4-core/conf/default/client-stub-conf.xml &
+$S4_IMAGE/scripts/run-client-adapter.sh -s client-adapter -g s4 \
+-x -d $S4_IMAGE/s4-core/conf/default/client-stub-conf.xml &
# Inject events.
-perl $IMAGE_BASE/s4-driver/scripts/inject.pl RawSpeech \
-io.s4.example.speech01.Speech < $IMAGE_BASE/testinput/speech.in
+perl $S4_IMAGE/s4-driver/scripts/inject.pl RawSpeech \
+io.s4.example.speech01.Speech < $S4_IMAGE/testinput/speech.in
-perl $IMAGE_BASE/s4-driver/scripts/inject.pl RawSentence \
-io.s4.example.speech01.Sentence < $IMAGE_BASE/testinput/sentence.in
+perl $S4_IMAGE/s4-driver/scripts/inject.pl RawSentence \
+io.s4.example.speech01.Sentence < $S4_IMAGE/testinput/sentence.in
## Injecting Events with a Java Client
@@ -45,11 +45,11 @@ io.s4.example.speech01.Sentence < $IMAGE_BASE/testinput/sentence.in
# Follow same steps as before to start S4 server and client adapter.
# Inject events.
-$IMAGE_BASE/s4-driver/scripts/inject.sh localhost 2334 RawSpeech \
-io.s4.example.speech01.Speech < $IMAGE_BASE/testinput/speech.in
+$S4_IMAGE/s4-driver/scripts/inject.sh localhost 2334 RawSpeech \
+io.s4.example.speech01.Speech < $S4_IMAGE/testinput/speech.in
-$IMAGE_BASE/s4-driver/scripts/inject.sh localhost 2334 RawSentence \
-io.s4.example.speech01.Sentence < $IMAGE_BASE/testinput/sentence.in
+$S4_IMAGE/s4-driver/scripts/inject.sh localhost 2334 RawSentence \
+io.s4.example.speech01.Sentence < $S4_IMAGE/testinput/sentence.in
## Receiving Events
@@ -58,7 +58,7 @@ io.s4.example.speech01.Sentence < $IMAGE_BASE/testinput/sentence.in
# Start a reader client.
-perl $IMAGE_BASE/s4-driver/scripts/read.pl \
+perl $S4_IMAGE/s4-driver/scripts/read.pl \
'{
readMode => "select",
readInclude => ["SentenceJoined"]
@@ -67,28 +67,28 @@ perl $IMAGE_BASE/s4-driver/scripts/read.pl \
# In a different window, inject messages like in the previous section.
# Remember to initialize the environment variables in the new shell.
-export IMAGE_BASE=`pwd`'/build/s4-image'
-export PYTHONPATH=${IMAGE_BASE}/s4-driver/lib/python
-export PERLLIB=${IMAGE_BASE}/s4-driver/lib/perl
+export S4_IMAGE=`pwd`'/build/s4-image'
+export PYTHONPATH=${S4_IMAGE}/s4-driver/lib/python
+export PERLLIB=${S4_IMAGE}/s4-driver/lib/perl
# Inject events.
-perl $IMAGE_BASE/s4-driver/scripts/inject.pl RawSpeech \
-io.s4.example.speech01.Speech < $IMAGE_BASE/testinput/speech.in
+perl $S4_IMAGE/s4-driver/scripts/inject.pl RawSpeech \
+io.s4.example.speech01.Speech < $S4_IMAGE/testinput/speech.in
-perl $IMAGE_BASE/s4-driver/scripts/inject.pl RawSentence \
-io.s4.example.speech01.Sentence < $IMAGE_BASE/testinput/sentence.in
+perl $S4_IMAGE/s4-driver/scripts/inject.pl RawSentence \
+io.s4.example.speech01.Sentence < $S4_IMAGE/testinput/sentence.in
## Request-Response
# Example 1: query the prototype of the joiner (SentenceJoinPE)
# in the speech02 application.
-python $IMAGE_BASE/s4-driver/scripts/request.py '#sentenceJoinPE' \
-'io.s4.message.PrototypeRequest' < $IMAGE_BASE/testinput/proto-query
+python $S4_IMAGE/s4-driver/scripts/request.py '#sentenceJoinPE' \
+'io.s4.message.PrototypeRequest' < $S4_IMAGE/testinput/proto-query
# Example 2: request to a single PE from
- python $IMAGE_BASE/s4-driver/scripts/request.py '#sentenceJoinPE' \
-'io.s4.message.SinglePERequest' < $IMAGE_BASE/testinput/pe-query
+ python $S4_IMAGE/s4-driver/scripts/request.py '#sentenceJoinPE' \
+'io.s4.message.SinglePERequest' < $S4_IMAGE/testinput/pe-query
-------
View
10 s4-core/src/main/java/io/s4/MainApp.java
@@ -15,8 +15,8 @@
*/
package io.s4;
+import io.s4.processor.AbstractPE;
import io.s4.processor.PEContainer;
-import io.s4.processor.ProcessingElement;
import io.s4.util.S4Util;
import io.s4.util.Watcher;
import io.s4.util.clock.Clock;
@@ -236,9 +236,9 @@ public static void main(String args[]) throws Exception {
context);
// attach any beans that implement ProcessingElement to the PE
// Container
- String[] processingElementBeanNames = context.getBeanNamesForType(ProcessingElement.class);
+ String[] processingElementBeanNames = context.getBeanNamesForType(AbstractPE.class);
for (String processingElementBeanName : processingElementBeanNames) {
- ProcessingElement bean = (ProcessingElement) context.getBean(processingElementBeanName);
+ AbstractPE bean = (AbstractPE) context.getBean(processingElementBeanName);
bean.setClock(clock);
// if the application did not specify an id, use the Spring bean name
@@ -247,8 +247,8 @@ public static void main(String args[]) throws Exception {
}
System.out.println("Adding processing element with bean name "
+ processingElementBeanName + ", id "
- + ((ProcessingElement) bean).getId());
- peContainer.addProcessor((ProcessingElement) bean);
+ + ((AbstractPE) bean).getId());
+ peContainer.addProcessor((AbstractPE) bean);
}
}
}
View
4 s4-core/src/main/java/io/s4/message/SinglePERequest.java
@@ -18,7 +18,7 @@
import io.s4.dispatcher.partitioner.CompoundKeyInfo;
import io.s4.dispatcher.partitioner.Hasher;
import io.s4.dispatcher.partitioner.KeyInfo;
-import io.s4.processor.ProcessingElement;
+import io.s4.processor.AbstractPE;
import io.s4.util.MethodInvoker;
import java.util.ArrayList;
@@ -83,7 +83,7 @@ public String toString() {
* @param pe
* @return Response object.
*/
- public Response evaluate(ProcessingElement pe) {
+ public Response evaluate(AbstractPE pe) {
HashMap<String, Object> results = new HashMap<String, Object>();
HashMap<String, String> exceptions = new HashMap<String, String>();
View
19 s4-core/src/main/java/io/s4/processor/AbstractPE.java
@@ -44,7 +44,7 @@
* {@link AbstractPE#setOutputFrequencyByEventCount} and
* {@link AbstractPE#setOutputFrequencyByTimeBoundary}.
*/
-public abstract class AbstractPE implements ProcessingElement {
+public abstract class AbstractPE implements Cloneable {
public static enum FrequencyType {
TIMEBOUNDARY("timeboundary"), EVENTCOUNT("eventcount");
@@ -75,7 +75,6 @@ public String getName() {
private int outputsBeforePause = -1;
private long pauseTimeInMillis;
private boolean logPauses = false;
- private String initMethod = null;
private String id;
public void setSaveKeyRecord(boolean saveKeyRecord) {
@@ -110,18 +109,14 @@ public void setClock(Clock clock) {
}
/**
- * The name of a method to be used as an initializer. The method will be
- * called after the object is cloned from the prototype PE.
+ * This method will be called after the object is cloned from the
+ * prototype PE. The concrete PE class should override this if
+ * it has any special set-up requirements.
*/
- public void setInitMethod(String initMethod)
- {
- this.initMethod = initMethod;
+ public void initInstance() {
+ // default implementation does nothing.
}
-
- public String getInitMethod() {
- return this.initMethod;
- }
-
+
public Clock getClock() {
return clock;
}
View
2  s4-core/src/main/java/io/s4/processor/ControlEventProcessor.java
@@ -60,7 +60,7 @@ protected void execute(EventWrapper e, PrototypeWrapper p) {
String keyVal = keyInfo.getCompoundValue();
- ProcessingElement pe = p.lookupPE(keyVal);
+ AbstractPE pe = p.lookupPE(keyVal);
Response response = ((SinglePERequest) event).evaluate(pe);
String stream = response.getRInfo().getStream();
View
6 s4-core/src/main/java/io/s4/processor/PEContainer.java
@@ -74,14 +74,14 @@ public void setTrackByKey(boolean trackByKey) {
this.trackByKey = trackByKey;
}
- public void addProcessor(ProcessingElement processor) {
+ public void addProcessor(AbstractPE processor) {
System.out.println("adding pe: " + processor);
PrototypeWrapper pw = new PrototypeWrapper(processor, clock);
prototypeWrappers.add(pw);
adviceLists.add(pw.advise());
}
- public void setProcessors(ProcessingElement[] processors) {
+ public void setProcessors(AbstractPE[] processors) {
// prototypeWrappers = new ArrayList<PrototypeWrapper>();
for (int i = 0; i < processors.length; i++) {
@@ -280,7 +280,7 @@ public void run() {
}
}
- private void invokePE(ProcessingElement pe, EventWrapper eventWrapper,
+ private void invokePE(AbstractPE pe, EventWrapper eventWrapper,
CompoundKeyInfo compoundKeyInfo) {
try {
long startTime = System.currentTimeMillis();
View
41 s4-core/src/main/java/io/s4/processor/ProcessingElement.java
@@ -1,41 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
-package io.s4.processor;
-
-import io.s4.dispatcher.partitioner.CompoundKeyInfo;
-import io.s4.util.clock.Clock;
-
-import java.util.List;
-
-public interface ProcessingElement extends Cloneable {
- void execute(String streamName, CompoundKeyInfo compoundKeyInfo, Object event);
-
- List<EventAdvice> advise();
-
- public int getTtl();
-
- public Object clone();
-
- public String getId();
-
- public void setId(String id);
-
- public Clock getClock();
-
- public void setClock(Clock clock);
-
- public String getInitMethod();
-}
View
25 s4-core/src/main/java/io/s4/processor/PrototypeWrapper.java
@@ -27,14 +27,14 @@
public class PrototypeWrapper {
private static Logger logger = Logger.getLogger(PrototypeWrapper.class);
- private ProcessingElement prototype;
+ private AbstractPE prototype;
Persister lookupTable;
public String getId() {
return prototype.getId();
}
- public PrototypeWrapper(ProcessingElement prototype, Clock s4Clock) {
+ public PrototypeWrapper(AbstractPE prototype, Clock s4Clock) {
this.prototype = prototype;
lookupTable = new ConMapPersister(s4Clock);
System.out.println("Using ConMapPersister ..");
@@ -66,18 +66,13 @@ public PrototypeWrapper(ProcessingElement prototype, Clock s4Clock) {
* key value
* @return PE corresponding to keyValue.
*/
- public ProcessingElement getPE(String keyValue) {
- ProcessingElement pe = null;
+ public AbstractPE getPE(String keyValue) {
+ AbstractPE pe = null;
try {
- pe = (ProcessingElement) lookupTable.get(keyValue);
+ pe = (AbstractPE) lookupTable.get(keyValue);
if (pe == null) {
- pe = (ProcessingElement) prototype.clone();
- //invoke the initialization method if it has been specified
- if (pe.getInitMethod() != null) {
- Method initMethod = pe.getClass().getMethod(pe.getInitMethod(), new Class[0]);
- initMethod.invoke(pe, (new Object[0]));
- }
-
+ pe = (AbstractPE) prototype.clone();
+ pe.initInstance();
}
// update the last update time on the entry
lookupTable.set(keyValue, pe, prototype.getTtl());
@@ -98,11 +93,11 @@ public ProcessingElement getPE(String keyValue) {
* @return PE corresponding to keyValue, if such a PE exists. Null
* otherwise.
*/
- public ProcessingElement lookupPE(String keyValue) {
- ProcessingElement pe = null;
+ public AbstractPE lookupPE(String keyValue) {
+ AbstractPE pe = null;
try {
- pe = (ProcessingElement) lookupTable.get(keyValue);
+ pe = (AbstractPE) lookupTable.get(keyValue);
} catch (Exception e) {
logger.error("exception when looking up pe for key:" + keyValue, e);
View
2  s4-core/src/test/java/io/s4/processor/MockPE.java
@@ -7,7 +7,7 @@
private int initializeCount = 0;
- public void testInitialize() {
+ public void initInstance() {
initializeCount++;
}
View
1  s4-core/src/test/java/io/s4/processor/TestPrototypeWrapper.java
@@ -16,7 +16,6 @@
@Test
public void testCloneAndInitialize() {
MockPE prototype = new MockPE();
- prototype.setInitMethod("testInitialize");
PrototypeWrapper prototypeWrapper = new PrototypeWrapper(prototype, new WallClock());
Please sign in to comment.
Something went wrong with that request. Please try again.