Skip to content

Commit

Permalink
Remove ProcessingElement interface; PEs should always extend AbstractPE
Browse files Browse the repository at this point in the history
  • Loading branch information
Bruce Robbins committed Jun 7, 2011
1 parent 7b445ec commit ea9f23c
Show file tree
Hide file tree
Showing 10 changed files with 58 additions and 110 deletions.
58 changes: 29 additions & 29 deletions dev-notes.txt
Expand Up @@ -14,42 +14,42 @@ cd s4
gradlew clean allImage

# Create some environment variables.
export IMAGE_BASE=`pwd`'/build/s4-image'
export PYTHONPATH=${IMAGE_BASE}/s4-driver/lib/python
export PERLLIB=${IMAGE_BASE}/s4-driver/lib/perl
export S4_IMAGE=`pwd`'/build/s4-image'
export PYTHONPATH=${S4_IMAGE}/s4-driver/lib/python
export PERLLIB=${S4_IMAGE}/s4-driver/lib/perl

# Change script permissions.
chmod u+x $IMAGE_BASE/scripts/*
chmod u+x $IMAGE_BASE/s4-driver/scripts/*
chmod u+x $S4_IMAGE/scripts/*
chmod u+x $S4_IMAGE/s4-driver/scripts/*

# Copy speech02 app to apps deployment directory.
cp -fr $IMAGE_BASE/s4-example-apps/s4-example-speech02 $IMAGE_BASE/s4-apps
cp -fr $S4_IMAGE/s4-example-apps/s4-example-speech02 $S4_IMAGE/s4-apps

# Start S4 server in standalone mode.
$IMAGE_BASE/scripts/start-s4.sh -r client-adapter &
$S4_IMAGE/scripts/start-s4.sh -r client-adapter &

# Start client adapter.
$IMAGE_BASE/scripts/run-client-adapter.sh -s client-adapter -g s4 \
-x -d $IMAGE_BASE/s4-core/conf/default/client-stub-conf.xml &
$S4_IMAGE/scripts/run-client-adapter.sh -s client-adapter -g s4 \
-x -d $S4_IMAGE/s4-core/conf/default/client-stub-conf.xml &

# Inject events.
perl $IMAGE_BASE/s4-driver/scripts/inject.pl RawSpeech \
io.s4.example.speech01.Speech < $IMAGE_BASE/testinput/speech.in
perl $S4_IMAGE/s4-driver/scripts/inject.pl RawSpeech \
io.s4.example.speech01.Speech < $S4_IMAGE/testinput/speech.in

perl $IMAGE_BASE/s4-driver/scripts/inject.pl RawSentence \
io.s4.example.speech01.Sentence < $IMAGE_BASE/testinput/sentence.in
perl $S4_IMAGE/s4-driver/scripts/inject.pl RawSentence \
io.s4.example.speech01.Sentence < $S4_IMAGE/testinput/sentence.in


## Injecting Events with a Java Client

# Follow same steps as before to start S4 server and client adapter.

# Inject events.
$IMAGE_BASE/s4-driver/scripts/inject.sh localhost 2334 RawSpeech \
io.s4.example.speech01.Speech < $IMAGE_BASE/testinput/speech.in
$S4_IMAGE/s4-driver/scripts/inject.sh localhost 2334 RawSpeech \
io.s4.example.speech01.Speech < $S4_IMAGE/testinput/speech.in

$IMAGE_BASE/s4-driver/scripts/inject.sh localhost 2334 RawSentence \
io.s4.example.speech01.Sentence < $IMAGE_BASE/testinput/sentence.in
$S4_IMAGE/s4-driver/scripts/inject.sh localhost 2334 RawSentence \
io.s4.example.speech01.Sentence < $S4_IMAGE/testinput/sentence.in


## Receiving Events
Expand All @@ -58,7 +58,7 @@ io.s4.example.speech01.Sentence < $IMAGE_BASE/testinput/sentence.in

# Start a reader client.

perl $IMAGE_BASE/s4-driver/scripts/read.pl \
perl $S4_IMAGE/s4-driver/scripts/read.pl \
'{
readMode => "select",
readInclude => ["SentenceJoined"]
Expand All @@ -67,28 +67,28 @@ perl $IMAGE_BASE/s4-driver/scripts/read.pl \
# In a different window, inject messages like in the previous section.

# Remember to initialize the environment variables in the new shell.
export IMAGE_BASE=`pwd`'/build/s4-image'
export PYTHONPATH=${IMAGE_BASE}/s4-driver/lib/python
export PERLLIB=${IMAGE_BASE}/s4-driver/lib/perl
export S4_IMAGE=`pwd`'/build/s4-image'
export PYTHONPATH=${S4_IMAGE}/s4-driver/lib/python
export PERLLIB=${S4_IMAGE}/s4-driver/lib/perl

# Inject events.
perl $IMAGE_BASE/s4-driver/scripts/inject.pl RawSpeech \
io.s4.example.speech01.Speech < $IMAGE_BASE/testinput/speech.in
perl $S4_IMAGE/s4-driver/scripts/inject.pl RawSpeech \
io.s4.example.speech01.Speech < $S4_IMAGE/testinput/speech.in

perl $IMAGE_BASE/s4-driver/scripts/inject.pl RawSentence \
io.s4.example.speech01.Sentence < $IMAGE_BASE/testinput/sentence.in
perl $S4_IMAGE/s4-driver/scripts/inject.pl RawSentence \
io.s4.example.speech01.Sentence < $S4_IMAGE/testinput/sentence.in


## Request-Response

# Example 1: query the prototype of the joiner (SentenceJoinPE)
# in the speech02 application.
python $IMAGE_BASE/s4-driver/scripts/request.py '#sentenceJoinPE' \
'io.s4.message.PrototypeRequest' < $IMAGE_BASE/testinput/proto-query
python $S4_IMAGE/s4-driver/scripts/request.py '#sentenceJoinPE' \
'io.s4.message.PrototypeRequest' < $S4_IMAGE/testinput/proto-query

# Example 2: request to a single PE from
python $IMAGE_BASE/s4-driver/scripts/request.py '#sentenceJoinPE' \
'io.s4.message.SinglePERequest' < $IMAGE_BASE/testinput/pe-query
python $S4_IMAGE/s4-driver/scripts/request.py '#sentenceJoinPE' \
'io.s4.message.SinglePERequest' < $S4_IMAGE/testinput/pe-query

-------

Expand Down
10 changes: 5 additions & 5 deletions s4-core/src/main/java/io/s4/MainApp.java
Expand Up @@ -15,8 +15,8 @@
*/
package io.s4;

import io.s4.processor.AbstractPE;
import io.s4.processor.PEContainer;
import io.s4.processor.ProcessingElement;
import io.s4.util.S4Util;
import io.s4.util.Watcher;
import io.s4.util.clock.Clock;
Expand Down Expand Up @@ -236,9 +236,9 @@ public static void main(String args[]) throws Exception {
context);
// attach any beans that implement ProcessingElement to the PE
// Container
String[] processingElementBeanNames = context.getBeanNamesForType(ProcessingElement.class);
String[] processingElementBeanNames = context.getBeanNamesForType(AbstractPE.class);
for (String processingElementBeanName : processingElementBeanNames) {
ProcessingElement bean = (ProcessingElement) context.getBean(processingElementBeanName);
AbstractPE bean = (AbstractPE) context.getBean(processingElementBeanName);
bean.setClock(clock);

// if the application did not specify an id, use the Spring bean name
Expand All @@ -247,8 +247,8 @@ public static void main(String args[]) throws Exception {
}
System.out.println("Adding processing element with bean name "
+ processingElementBeanName + ", id "
+ ((ProcessingElement) bean).getId());
peContainer.addProcessor((ProcessingElement) bean);
+ ((AbstractPE) bean).getId());
peContainer.addProcessor((AbstractPE) bean);
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions s4-core/src/main/java/io/s4/message/SinglePERequest.java
Expand Up @@ -18,7 +18,7 @@
import io.s4.dispatcher.partitioner.CompoundKeyInfo;
import io.s4.dispatcher.partitioner.Hasher;
import io.s4.dispatcher.partitioner.KeyInfo;
import io.s4.processor.ProcessingElement;
import io.s4.processor.AbstractPE;
import io.s4.util.MethodInvoker;

import java.util.ArrayList;
Expand Down Expand Up @@ -83,7 +83,7 @@ public List<String> getQuery() {
* @param pe
* @return Response object.
*/
public Response evaluate(ProcessingElement pe) {
public Response evaluate(AbstractPE pe) {

HashMap<String, Object> results = new HashMap<String, Object>();
HashMap<String, String> exceptions = new HashMap<String, String>();
Expand Down
19 changes: 7 additions & 12 deletions s4-core/src/main/java/io/s4/processor/AbstractPE.java
Expand Up @@ -44,7 +44,7 @@
* {@link AbstractPE#setOutputFrequencyByEventCount} and
* {@link AbstractPE#setOutputFrequencyByTimeBoundary}.
*/
public abstract class AbstractPE implements ProcessingElement {
public abstract class AbstractPE implements Cloneable {
public static enum FrequencyType {
TIMEBOUNDARY("timeboundary"), EVENTCOUNT("eventcount");

Expand Down Expand Up @@ -75,7 +75,6 @@ public String getName() {
private int outputsBeforePause = -1;
private long pauseTimeInMillis;
private boolean logPauses = false;
private String initMethod = null;
private String id;

public void setSaveKeyRecord(boolean saveKeyRecord) {
Expand Down Expand Up @@ -110,18 +109,14 @@ public void setClock(Clock clock) {
}

/**
* The name of a method to be used as an initializer. The method will be
* called after the object is cloned from the prototype PE.
* This method will be called after the object is cloned from the
* prototype PE. The concrete PE class should override this if
* it has any special set-up requirements.
*/
public void setInitMethod(String initMethod)
{
this.initMethod = initMethod;
public void initInstance() {
// default implementation does nothing.
}

public String getInitMethod() {
return this.initMethod;
}


public Clock getClock() {
return clock;
}
Expand Down
Expand Up @@ -60,7 +60,7 @@ protected void execute(EventWrapper e, PrototypeWrapper p) {

String keyVal = keyInfo.getCompoundValue();

ProcessingElement pe = p.lookupPE(keyVal);
AbstractPE pe = p.lookupPE(keyVal);

Response response = ((SinglePERequest) event).evaluate(pe);
String stream = response.getRInfo().getStream();
Expand Down
6 changes: 3 additions & 3 deletions s4-core/src/main/java/io/s4/processor/PEContainer.java
Expand Up @@ -74,14 +74,14 @@ public void setTrackByKey(boolean trackByKey) {
this.trackByKey = trackByKey;
}

public void addProcessor(ProcessingElement processor) {
public void addProcessor(AbstractPE processor) {
System.out.println("adding pe: " + processor);
PrototypeWrapper pw = new PrototypeWrapper(processor, clock);
prototypeWrappers.add(pw);
adviceLists.add(pw.advise());
}

public void setProcessors(ProcessingElement[] processors) {
public void setProcessors(AbstractPE[] processors) {
// prototypeWrappers = new ArrayList<PrototypeWrapper>();

for (int i = 0; i < processors.length; i++) {
Expand Down Expand Up @@ -280,7 +280,7 @@ public void run() {
}
}

private void invokePE(ProcessingElement pe, EventWrapper eventWrapper,
private void invokePE(AbstractPE pe, EventWrapper eventWrapper,
CompoundKeyInfo compoundKeyInfo) {
try {
long startTime = System.currentTimeMillis();
Expand Down
41 changes: 0 additions & 41 deletions s4-core/src/main/java/io/s4/processor/ProcessingElement.java

This file was deleted.

25 changes: 10 additions & 15 deletions s4-core/src/main/java/io/s4/processor/PrototypeWrapper.java
Expand Up @@ -27,14 +27,14 @@
public class PrototypeWrapper {

private static Logger logger = Logger.getLogger(PrototypeWrapper.class);
private ProcessingElement prototype;
private AbstractPE prototype;
Persister lookupTable;

public String getId() {
return prototype.getId();
}

public PrototypeWrapper(ProcessingElement prototype, Clock s4Clock) {
public PrototypeWrapper(AbstractPE prototype, Clock s4Clock) {
this.prototype = prototype;
lookupTable = new ConMapPersister(s4Clock);
System.out.println("Using ConMapPersister ..");
Expand Down Expand Up @@ -66,18 +66,13 @@ public PrototypeWrapper(ProcessingElement prototype, Clock s4Clock) {
* key value
* @return PE corresponding to keyValue.
*/
public ProcessingElement getPE(String keyValue) {
ProcessingElement pe = null;
public AbstractPE getPE(String keyValue) {
AbstractPE pe = null;
try {
pe = (ProcessingElement) lookupTable.get(keyValue);
pe = (AbstractPE) lookupTable.get(keyValue);
if (pe == null) {
pe = (ProcessingElement) prototype.clone();
//invoke the initialization method if it has been specified
if (pe.getInitMethod() != null) {
Method initMethod = pe.getClass().getMethod(pe.getInitMethod(), new Class[0]);
initMethod.invoke(pe, (new Object[0]));
}

pe = (AbstractPE) prototype.clone();
pe.initInstance();
}
// update the last update time on the entry
lookupTable.set(keyValue, pe, prototype.getTtl());
Expand All @@ -98,11 +93,11 @@ public ProcessingElement getPE(String keyValue) {
* @return PE corresponding to keyValue, if such a PE exists. Null
* otherwise.
*/
public ProcessingElement lookupPE(String keyValue) {
ProcessingElement pe = null;
public AbstractPE lookupPE(String keyValue) {
AbstractPE pe = null;

try {
pe = (ProcessingElement) lookupTable.get(keyValue);
pe = (AbstractPE) lookupTable.get(keyValue);

} catch (Exception e) {
logger.error("exception when looking up pe for key:" + keyValue, e);
Expand Down
2 changes: 1 addition & 1 deletion s4-core/src/test/java/io/s4/processor/MockPE.java
Expand Up @@ -7,7 +7,7 @@ public class MockPE extends AbstractPE {

private int initializeCount = 0;

public void testInitialize() {
public void initInstance() {
initializeCount++;
}

Expand Down
Expand Up @@ -16,7 +16,6 @@ public class TestPrototypeWrapper
@Test
public void testCloneAndInitialize() {
MockPE prototype = new MockPE();
prototype.setInitMethod("testInitialize");

PrototypeWrapper prototypeWrapper = new PrototypeWrapper(prototype, new WallClock());

Expand Down

0 comments on commit ea9f23c

Please sign in to comment.