diff --git a/.gitignore b/.gitignore index db817aa..2c010a5 100644 --- a/.gitignore +++ b/.gitignore @@ -6,6 +6,7 @@ # Build directory target/ build/ +bin/ # SVN .svn @@ -21,3 +22,10 @@ build/ # class files *.class +.DS_Store + +# create an empty .gitignore file when you want to track an empty +# directory but want to ignore its content. Example: to keep the +# "logs" dir do: "touch logs/.gitignore" +# +!.gitignore diff --git a/README.md b/README.md index d77924b..d589e91 100644 --- a/README.md +++ b/README.md @@ -65,23 +65,43 @@ git clone https://github.com/s4/s4.git # Create image gradlew allImage -# Change permissions -chmod u+x ./build/s4-image/scripts/* +# set the S4_IMAGE environmental variable +cd build/s4-image/ +export S4_IMAGE=`pwd` -# Copy S4 application to deployment dir (s4-apps) -cp -rp build/s4-image/s4-example-apps/s4-example-twittertopiccount build/s4-image/s4-apps/ +# get the sample application +git clone git://github.com/s4/twittertopiccount.git -# Enter your twitter user/pass in config file -$EDITOR build/s4-image/s4-apps/s4-example-twittertopiccount/adapter-conf.xml +# build the sample application +./gradlew install -# Start server with s4-example-twittertopiccount app -./build/s4-image/scripts/s4-start.sh & +# deploy the sample application into the S4 image (relies in the S4_IMAGE environmental variable) +./gradlew deploy -# Start adapter - ./build/s4-image/scripts/run-adapter.sh -x -u build/s4-image/s4-apps/s4-example-twittertopiccount/lib/s4-example-twittertopiccount-0.3-SNAPSHOT.jar -d build/s4-image/s4-apps/s4-example-twittertopiccount/adapter-conf.xml & +# set the TWIT_LISTENER environmental variable +cd build/install/twitter_feed_listener +export TWIT_LISTENER=`pwd` + +# Start server with twittertopiccount app +$S4_IMAGE/scripts/start-s4.sh -r client-adapter & + +# start the client adapter +$S4_IMAGE/scripts/run-client-adapter.sh -s client-adapter -g s4 -d $S4_IMAGE/s4-core/conf/default/client-stub-conf.xml & + +# run a client to send events into the S4 cluster. Replace and with your Twitter userid and password. +$TWIT_LISTENER/bin/twitter_feed_listener & # Check output cat /tmp/top_n_hashtags +Developing with Eclipse +----------------------- + +The command `gradle eclipse` will create an eclipse project that you can import from the Eclipse IDE. + +There is now a [Gradle plugin for the Eclipse IDE](http://static.springsource.org/sts/docs/2.7.0.M1/reference/html/gradle/index.html). +To install Gradle without installing the full Spring development environment follow the +[instructions](http://static.springsource.org/sts/docs/2.7.0.M1/reference/html/gradle/installation.html) under the heading +"Installing from update site". There is also a discussion in the [Gradle mailing list](http://gradle.1045684.n5.nabble.com/ANN-Gradle-Eclipse-Plugin-td4387658.html). diff --git a/build.gradle b/build.gradle index 60a2831..d536d59 100644 --- a/build.gradle +++ b/build.gradle @@ -69,7 +69,6 @@ libraries = [ json: 'org.json:json:20090211', lift_json: 'net.liftweb:lift-json_2.8.1:2.2', gson: 'com.google.code.gson:gson:1.6', -junit: 'junit:junit:4.4', zk: 'org.apache.zookeeper:zookeeper:3.3.1', log4j: 'log4j:log4j:1.2.15', flexjson: 'net.sf.flexjson:flexjson:2.1', @@ -85,6 +84,7 @@ commons_jexl: 'commons-jexl:commons-jexl:1.1', commons_codec: 'commons-codec:commons-codec:1.4', commons_httpclient: 'commons-httpclient:commons-httpclient:3.1', spring: 'org.springframework:spring:2.5.6', +junit: 'junit:junit:4.4', scala_compiler: 'org.scala-lang:scala-compiler:2.8.1', scala_library: 'org.scala-lang:scala-library:2.8.1', jedis: 'redis.clients:jedis:1.5.2', @@ -104,6 +104,11 @@ subprojects { //defaultTasks 'build' group = 'io.s4' + + /* Remove this once this bug is fixed: http://issues.gradle.org/browse/GRADLE-1157 */ + eclipseClasspath { + downloadSources = false; // required for eclipseClasspath to work + } /* Common dependencies applied to all subprojects. */ dependencies { @@ -220,6 +225,7 @@ task allImage(type: Copy, dependsOn: s4Javadoc) { with allDistImage } + task binTgz( type: Tar) { description = "Build binary bundle in GZIP format" classifier = 'bin' @@ -239,9 +245,9 @@ task allTgz( type: Tar, dependsOn: s4Javadoc) { } /* Generates the gradlew scripts. -http://www.gradle.org/1.0-milestone-1/docs/userguide/gradle_wrapper.html */ +http://www.gradle.org/1.0-milestone-3/docs/userguide/gradle_wrapper.html */ task wrapper(type: Wrapper) { - gradleVersion = '1.0-milestone-1' + gradleVersion = '1.0-milestone-3' } class Version { diff --git a/dev-notes.txt b/dev-notes.txt index faeadf1..579416a 100644 --- a/dev-notes.txt +++ b/dev-notes.txt @@ -14,30 +14,30 @@ cd s4 gradlew clean allImage # Create some environment variables. -export IMAGE_BASE=`pwd`'/build/s4-image' -export PYTHONPATH=${IMAGE_BASE}/s4-driver/lib/python -export PERLLIB=${IMAGE_BASE}/s4-driver/lib/perl +export S4_IMAGE=`pwd`'/build/s4-image' +export PYTHONPATH=${S4_IMAGE}/s4-driver/lib/python +export PERLLIB=${S4_IMAGE}/s4-driver/lib/perl # Change script permissions. -chmod u+x $IMAGE_BASE/scripts/* -chmod u+x $IMAGE_BASE/s4-driver/scripts/* +chmod u+x $S4_IMAGE/scripts/* +chmod u+x $S4_IMAGE/s4-driver/scripts/* # Copy speech02 app to apps deployment directory. -cp -fr $IMAGE_BASE/s4-example-apps/s4-example-speech02 $IMAGE_BASE/s4-apps +cp -fr $S4_IMAGE/s4-example-apps/s4-example-speech02 $S4_IMAGE/s4-apps # Start S4 server in standalone mode. -$IMAGE_BASE/scripts/s4-start.sh -r client-adapter & +$S4_IMAGE/scripts/start-s4.sh -r client-adapter & # Start client adapter. -$IMAGE_BASE/scripts/run-client-adapter.sh -s client-adapter -g s4 \ --x -d $IMAGE_BASE/s4-core/conf/default/client-stub-conf.xml & +$S4_IMAGE/scripts/run-client-adapter.sh -s client-adapter -g s4 \ +-x -d $S4_IMAGE/s4-core/conf/default/client-stub-conf.xml & # Inject events. -perl $IMAGE_BASE/s4-driver/scripts/inject.pl RawSpeech \ -io.s4.example.speech01.Speech < $IMAGE_BASE/testinput/speech.in +perl $S4_IMAGE/s4-driver/scripts/inject.pl RawSpeech \ +io.s4.example.speech01.Speech < $S4_IMAGE/testinput/speech.in -perl $IMAGE_BASE/s4-driver/scripts/inject.pl RawSentence \ -io.s4.example.speech01.Sentence < $IMAGE_BASE/testinput/sentence.in +perl $S4_IMAGE/s4-driver/scripts/inject.pl RawSentence \ +io.s4.example.speech01.Sentence < $S4_IMAGE/testinput/sentence.in ## Injecting Events with a Java Client @@ -45,11 +45,11 @@ io.s4.example.speech01.Sentence < $IMAGE_BASE/testinput/sentence.in # Follow same steps as before to start S4 server and client adapter. # Inject events. -$IMAGE_BASE/s4-driver/scripts/inject.sh localhost 2334 RawSpeech \ -io.s4.example.speech01.Speech < $IMAGE_BASE/testinput/speech.in +$S4_IMAGE/s4-driver/scripts/inject.sh localhost 2334 RawSpeech \ +io.s4.example.speech01.Speech < $S4_IMAGE/testinput/speech.in -$IMAGE_BASE/s4-driver/scripts/inject.sh localhost 2334 RawSentence \ -io.s4.example.speech01.Sentence < $IMAGE_BASE/testinput/sentence.in +$S4_IMAGE/s4-driver/scripts/inject.sh localhost 2334 RawSentence \ +io.s4.example.speech01.Sentence < $S4_IMAGE/testinput/sentence.in ## Receiving Events @@ -58,7 +58,7 @@ io.s4.example.speech01.Sentence < $IMAGE_BASE/testinput/sentence.in # Start a reader client. -perl $IMAGE_BASE/s4-driver/scripts/read.pl \ +perl $S4_IMAGE/s4-driver/scripts/read.pl \ '{ readMode => "select", readInclude => ["SentenceJoined"] @@ -67,28 +67,28 @@ perl $IMAGE_BASE/s4-driver/scripts/read.pl \ # In a different window, inject messages like in the previous section. # Remember to initialize the environment variables in the new shell. -export IMAGE_BASE=`pwd`'/build/s4-image' -export PYTHONPATH=${IMAGE_BASE}/s4-driver/lib/python -export PERLLIB=${IMAGE_BASE}/s4-driver/lib/perl +export S4_IMAGE=`pwd`'/build/s4-image' +export PYTHONPATH=${S4_IMAGE}/s4-driver/lib/python +export PERLLIB=${S4_IMAGE}/s4-driver/lib/perl # Inject events. -perl $IMAGE_BASE/s4-driver/scripts/inject.pl RawSpeech \ -io.s4.example.speech01.Speech < $IMAGE_BASE/testinput/speech.in +perl $S4_IMAGE/s4-driver/scripts/inject.pl RawSpeech \ +io.s4.example.speech01.Speech < $S4_IMAGE/testinput/speech.in -perl $IMAGE_BASE/s4-driver/scripts/inject.pl RawSentence \ -io.s4.example.speech01.Sentence < $IMAGE_BASE/testinput/sentence.in +perl $S4_IMAGE/s4-driver/scripts/inject.pl RawSentence \ +io.s4.example.speech01.Sentence < $S4_IMAGE/testinput/sentence.in ## Request-Response # Example 1: query the prototype of the joiner (SentenceJoinPE) # in the speech02 application. -python $IMAGE_BASE/s4-driver/scripts/request.py '#sentenceJoinPE' \ -'io.s4.message.PrototypeRequest' < $IMAGE_BASE/testinput/proto-query +python $S4_IMAGE/s4-driver/scripts/request.py '#sentenceJoinPE' \ +'io.s4.message.PrototypeRequest' < $S4_IMAGE/testinput/proto-query # Example 2: request to a single PE from - python $IMAGE_BASE/s4-driver/scripts/request.py '#sentenceJoinPE' \ -'io.s4.message.SinglePERequest' < $IMAGE_BASE/testinput/pe-query + python $S4_IMAGE/s4-driver/scripts/request.py '#sentenceJoinPE' \ +'io.s4.message.SinglePERequest' < $S4_IMAGE/testinput/pe-query ------- diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar index 73dbae7..45bfb5c 100644 Binary files a/gradle/wrapper/gradle-wrapper.jar and b/gradle/wrapper/gradle-wrapper.jar differ diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 597c49b..976c1a1 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,6 +1,6 @@ -#Mon Mar 07 11:53:07 PST 2011 +#Fri May 27 15:18:16 PDT 2011 distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists -distributionUrl=http\://gradle.artifactoryonline.com/gradle/distributions/gradle-1.0-milestone-1-bin.zip +distributionUrl=http\://repo.gradle.org/gradle/distributions/gradle-1.0-milestone-3-bin.zip diff --git a/s4-core/README.md b/s4-core/README.md deleted file mode 100644 index 19e423a..0000000 --- a/s4-core/README.md +++ /dev/null @@ -1,31 +0,0 @@ -S4 Core Classes -=============== - -Introduction ------------- -This is a component of the S4 streaming system. For more information, see [s4.io](http://s4.io) - -Requirements ------------- - -* Linux -* Java 1.6 -* Maven -* S4 Communication Layer - -Build Instructions ------------------- - -1. First build and install the comm package in your Maven repository. - -2. Kryo, Reflectasm, and minlog must be installed to your local Maven repository manually. - The jars are present in lib/ within this project. To install, run the following commands: - - mvn install:install-file -DgroupId=com.esotericsoftware -DartifactId=kryo -Dversion=1.01 -Dpackaging=jar -Dfile=lib/kryo-1.01.jar - mvn install:install-file -DgroupId=com.esotericsoftware -DartifactId=reflectasm -Dversion=0.8 -Dpackaging=jar -Dfile=lib/reflectasm-0.8.jar - mvn install:install-file -DgroupId=com.esotericsoftware -DartifactId=minlog -Dversion=1.2 -Dpackaging=jar -Dfile=lib/minlog-1.2.jar - -3. Build and install using Maven - - mvn assembly:assembly install - diff --git a/s4-core/src/main/java/io/s4/MainApp.java b/s4-core/src/main/java/io/s4/MainApp.java index d8e06b7..c7648ff 100644 --- a/s4-core/src/main/java/io/s4/MainApp.java +++ b/s4-core/src/main/java/io/s4/MainApp.java @@ -15,8 +15,8 @@ */ package io.s4; +import io.s4.processor.AbstractPE; import io.s4.processor.PEContainer; -import io.s4.processor.ProcessingElement; import io.s4.util.S4Util; import io.s4.util.Watcher; import io.s4.util.clock.Clock; @@ -201,9 +201,9 @@ public static void main(String args[]) throws Exception { coreContext = new FileSystemXmlApplicationContext(coreConfigFileUrls, coreContext); ApplicationContext context = coreContext; - Clock s4Clock = (Clock) context.getBean("clock"); - if (s4Clock instanceof EventClock && seedTime > 0) { - EventClock s4EventClock = (EventClock)s4Clock; + Clock clock = (Clock) context.getBean("clock"); + if (clock instanceof EventClock && seedTime > 0) { + EventClock s4EventClock = (EventClock)clock; s4EventClock.updateTime(seedTime); System.out.println("Intializing event clock time with seed time " + s4EventClock.getCurrentTime()); } @@ -236,26 +236,19 @@ public static void main(String args[]) throws Exception { context); // attach any beans that implement ProcessingElement to the PE // Container - String[] processingElementBeanNames = context.getBeanNamesForType(ProcessingElement.class); + String[] processingElementBeanNames = context.getBeanNamesForType(AbstractPE.class); for (String processingElementBeanName : processingElementBeanNames) { - Object bean = context.getBean(processingElementBeanName); - try { - Method getS4ClockMethod = bean.getClass().getMethod("getS4Clock"); - - if (getS4ClockMethod.getReturnType().equals(Clock.class)) { - if (getS4ClockMethod.invoke(bean) == null) { - Method setS4ClockMethod = bean.getClass().getMethod("setS4Clock", Clock.class); - setS4ClockMethod.invoke(bean, coreContext.getBean("clock")); - } - } - } - catch (NoSuchMethodException mnfe) { - // acceptable + AbstractPE bean = (AbstractPE) context.getBean(processingElementBeanName); + bean.setClock(clock); + + // if the application did not specify an id, use the Spring bean name + if (bean.getId() == null) { + bean.setId(processingElementBeanName); } System.out.println("Adding processing element with bean name " + processingElementBeanName + ", id " - + ((ProcessingElement) bean).getId()); - peContainer.addProcessor((ProcessingElement) bean); + + ((AbstractPE) bean).getId()); + peContainer.addProcessor((AbstractPE) bean); } } } diff --git a/s4-core/src/main/java/io/s4/dispatcher/partitioner/RoundRobinPartitioner.java b/s4-core/src/main/java/io/s4/dispatcher/partitioner/RoundRobinPartitioner.java index d60f829..162cf46 100644 --- a/s4-core/src/main/java/io/s4/dispatcher/partitioner/RoundRobinPartitioner.java +++ b/s4-core/src/main/java/io/s4/dispatcher/partitioner/RoundRobinPartitioner.java @@ -16,13 +16,29 @@ package io.s4.dispatcher.partitioner; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Set; public class RoundRobinPartitioner implements Partitioner { private int counter = 0; + private Set streamNameSet; + + public void setStreamNames(String[] streamNames) { + streamNameSet = new HashSet(streamNames.length); + for (String eventType : streamNames) { + streamNameSet.add(eventType); + } + } @Override - public List partition(String streamName, Object event, int partitionCount) { + public List partition(String streamName, Object event, + int partitionCount) { + + if (streamName != null && streamNameSet != null + && !streamNameSet.contains(streamName)) { + return null; + } CompoundKeyInfo partitionInfo = new CompoundKeyInfo(); int partitionId = 0; diff --git a/s4-core/src/main/java/io/s4/message/SinglePERequest.java b/s4-core/src/main/java/io/s4/message/SinglePERequest.java index c3d1dd8..d3f7fed 100644 --- a/s4-core/src/main/java/io/s4/message/SinglePERequest.java +++ b/s4-core/src/main/java/io/s4/message/SinglePERequest.java @@ -18,7 +18,7 @@ import io.s4.dispatcher.partitioner.CompoundKeyInfo; import io.s4.dispatcher.partitioner.Hasher; import io.s4.dispatcher.partitioner.KeyInfo; -import io.s4.processor.ProcessingElement; +import io.s4.processor.AbstractPE; import io.s4.util.MethodInvoker; import java.util.ArrayList; @@ -83,7 +83,7 @@ public List getQuery() { * @param pe * @return Response object. */ - public Response evaluate(ProcessingElement pe) { + public Response evaluate(AbstractPE pe) { HashMap results = new HashMap(); HashMap exceptions = new HashMap(); diff --git a/s4-core/src/main/java/io/s4/processor/AbstractPE.java b/s4-core/src/main/java/io/s4/processor/AbstractPE.java index 4e2b883..97aa2ab 100644 --- a/s4-core/src/main/java/io/s4/processor/AbstractPE.java +++ b/s4-core/src/main/java/io/s4/processor/AbstractPE.java @@ -57,7 +57,7 @@ * {@link AbstractPE#setOutputFrequencyByEventCount} and * {@link AbstractPE#setOutputFrequencyByTimeBoundary}. */ -public abstract class AbstractPE implements ProcessingElement { +public abstract class AbstractPE implements Cloneable { public static enum FrequencyType { TIMEBOUNDARY("timeboundary"), EVENTCOUNT("eventcount"); @@ -72,6 +72,7 @@ public String getName() { } } +<<<<<<< HEAD public static enum PeriodicInvokerType { OUTPUT, CHECKPOINTING; @@ -105,6 +106,25 @@ public String getName() { transient private boolean logPauses = false; transient private String initMethod = null; transient protected SchemaContainer schemaContainer = new SchemaContainer(); +======= + private Clock clock; + private int outputFrequency = 1; + private FrequencyType outputFrequencyType = FrequencyType.EVENTCOUNT; + private int outputFrequencyOffset = 0; + private int eventCount = 0; + private int ttl = -1; + private Persister lookupTable; + private List eventAdviceList = new ArrayList(); + private List keyValue; + private List keyRecord; + private String keyValueString; + private String streamName; + private boolean saveKeyRecord = false; + private int outputsBeforePause = -1; + private long pauseTimeInMillis; + private boolean logPauses = false; + private String id; +>>>>>>> upstream/master transient private boolean recoveryAttempted = false; // true if state may have changed @@ -149,14 +169,30 @@ public void setLogPauses(boolean logPauses) { this.logPauses = logPauses; } +<<<<<<< HEAD public void setS4Clock(Clock s4Clock) { this.s4Clock = s4Clock; if (this.s4Clock != null) { this.s4ClockSetSignal.countDown(); +======= + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public void setClock(Clock clock) { + synchronized (this) { + this.clock = clock; + this.notify(); +>>>>>>> upstream/master } } /** +<<<<<<< HEAD * The name of a method to be used as an initializer. The method will be * called after the object is cloned from the prototype PE. */ @@ -170,6 +206,18 @@ public String getInitMethod() { public Clock getS4Clock() { return s4Clock; +======= + * This method will be called after the object is cloned from the + * prototype PE. The concrete PE class should override this if + * it has any special set-up requirements. + */ + public void initInstance() { + // default implementation does nothing. + } + + public Clock getClock() { + return clock; +>>>>>>> upstream/master } public AbstractPE() { @@ -245,7 +293,7 @@ public void execute(String streamName, CompoundKeyInfo compoundKeyInfo, } public long getCurrentTime() { - return s4Clock.getCurrentTime(); + return clock.getCurrentTime(); } /** @@ -536,6 +584,7 @@ public void setLookupTable(Persister lookupTable) { **/ abstract public void output(); +<<<<<<< HEAD protected void checkpoint() { byte[] serializedState = serializeState(); @@ -618,6 +667,12 @@ private void restoreFieldsForClass(Class currentInOldStateClassHierarchy, Abstra if (!Modifier.isPublic(field.getModifiers())) { field.setAccessible(true); } +======= + class OutputInvoker implements Runnable { + public void run() { + synchronized (AbstractPE.this) { + while (clock == null) { +>>>>>>> upstream/master try { // TODO use reflectasm field.set(this, field.get(oldState)); @@ -701,9 +756,14 @@ public void run() { long currentBoundary = (currentTime / frequencyInMillis) * frequencyInMillis; long nextBoundary = currentBoundary + frequencyInMillis; +<<<<<<< HEAD currentTime = s4Clock.waitForTime(nextBoundary + (getFrequencyOffset() * 1000)); +======= + currentTime = clock.waitForTime(nextBoundary + + (outputFrequencyOffset * 1000)); +>>>>>>> upstream/master if (lookupTable != null) { Set peKeys = lookupTable.keySet(); for (Iterator it = peKeys.iterator(); it.hasNext();) { diff --git a/s4-core/src/main/java/io/s4/processor/ControlEventProcessor.java b/s4-core/src/main/java/io/s4/processor/ControlEventProcessor.java index 9208293..0e2774c 100644 --- a/s4-core/src/main/java/io/s4/processor/ControlEventProcessor.java +++ b/s4-core/src/main/java/io/s4/processor/ControlEventProcessor.java @@ -60,7 +60,7 @@ protected void execute(EventWrapper e, PrototypeWrapper p) { String keyVal = keyInfo.getCompoundValue(); - ProcessingElement pe = p.lookupPE(keyVal); + AbstractPE pe = p.lookupPE(keyVal); Response response = ((SinglePERequest) event).evaluate(pe); String stream = response.getRInfo().getStream(); diff --git a/s4-core/src/main/java/io/s4/processor/JoinPE.java b/s4-core/src/main/java/io/s4/processor/JoinPE.java index acf23ca..e0e18b7 100644 --- a/s4-core/src/main/java/io/s4/processor/JoinPE.java +++ b/s4-core/src/main/java/io/s4/processor/JoinPE.java @@ -35,19 +35,10 @@ public class JoinPE extends AbstractPE { private Map eventsToJoin; private EventDispatcher dispatcher; private Monitor monitor; - private String id = "JoinPE"; private String outputStreamName; private String outputClassName; private Class outputClass; - public String getId() { - return id; - } - - public void setId(String id) { - this.id = id; - } - public void setDispatcher(EventDispatcher dispatcher) { this.dispatcher = dispatcher; } diff --git a/s4-core/src/main/java/io/s4/processor/PEContainer.java b/s4-core/src/main/java/io/s4/processor/PEContainer.java index 8b47f50..09ed009 100644 --- a/s4-core/src/main/java/io/s4/processor/PEContainer.java +++ b/s4-core/src/main/java/io/s4/processor/PEContainer.java @@ -49,7 +49,7 @@ public class PEContainer implements Runnable, AsynchronousEventProcessor { BlockingQueue workQueue; private List prototypeWrappers = new ArrayList(); private Monitor monitor; - private Clock s4Clock; + private Clock clock; private int maxQueueSize = 1000; private boolean trackByKey; private Map countByEventType = Collections.synchronizedMap(new HashMap()); @@ -65,12 +65,12 @@ public void setMonitor(Monitor monitor) { this.monitor = monitor; } - public void setS4Clock(Clock s4Clock) { - this.s4Clock = s4Clock; + public void setClock(Clock s4Clock) { + this.clock = s4Clock; } - public Clock getS4Clock() { - return s4Clock; + public Clock getClock() { + return clock; } public void setTrackByKey(boolean trackByKey) { @@ -81,15 +81,15 @@ public void setSafeKeeper(SafeKeeper sk) { this.safeKeeper = sk; } - public void addProcessor(ProcessingElement processor) { + public void addProcessor(AbstractPE processor) { System.out.println("adding pe: " + processor); - PrototypeWrapper pw = new PrototypeWrapper(processor, s4Clock); + PrototypeWrapper pw = new PrototypeWrapper(processor, clock); pw.setSafeKeeper(safeKeeper); prototypeWrappers.add(pw); adviceLists.add(pw.advise()); } - public void setProcessors(ProcessingElement[] processors) { + public void setProcessors(AbstractPE[] processors) { // prototypeWrappers = new ArrayList(); for (int i = 0; i < processors.length; i++) { @@ -188,8 +188,8 @@ public void run() { EventWrapper eventWrapper = null; try { eventWrapper = workQueue.take(); - if (s4Clock instanceof EventClock) { - EventClock eventClock = (EventClock) s4Clock; + if (clock instanceof EventClock) { + EventClock eventClock = (EventClock) clock; eventClock.update(eventWrapper); // To what time to update the clock } @@ -335,7 +335,7 @@ private void handleCheckpointingOrRecovery(EventWrapper eventWrapper) { } - private void invokePE(ProcessingElement pe, EventWrapper eventWrapper, + private void invokePE(AbstractPE pe, EventWrapper eventWrapper, CompoundKeyInfo compoundKeyInfo) { try { long startTime = System.currentTimeMillis(); diff --git a/s4-core/src/main/java/io/s4/processor/PrintEventPE.java b/s4-core/src/main/java/io/s4/processor/PrintEventPE.java index e6d79ee..546f53e 100644 --- a/s4-core/src/main/java/io/s4/processor/PrintEventPE.java +++ b/s4-core/src/main/java/io/s4/processor/PrintEventPE.java @@ -19,16 +19,6 @@ public class PrintEventPE extends AbstractPE { - private String id = "PrintEventPE"; - - public String getId() { - return id; - } - - public void setId(String id) { - this.id = id; - } - @Override public void output() { // TODO Auto-generated method stub diff --git a/s4-core/src/main/java/io/s4/processor/PrototypeWrapper.java b/s4-core/src/main/java/io/s4/processor/PrototypeWrapper.java index 37ae049..8ea8732 100644 --- a/s4-core/src/main/java/io/s4/processor/PrototypeWrapper.java +++ b/s4-core/src/main/java/io/s4/processor/PrototypeWrapper.java @@ -28,7 +28,7 @@ public class PrototypeWrapper { private static Logger logger = Logger.getLogger(PrototypeWrapper.class); - private ProcessingElement prototype; + private AbstractPE prototype; Persister lookupTable; SafeKeeper safeKeeper; @@ -36,7 +36,7 @@ public String getId() { return prototype.getId(); } - public PrototypeWrapper(ProcessingElement prototype, Clock s4Clock) { + public PrototypeWrapper(AbstractPE prototype, Clock s4Clock) { this.prototype = prototype; lookupTable = new ConMapPersister(s4Clock); // TODO lookup table with PEIds @@ -70,21 +70,17 @@ public PrototypeWrapper(ProcessingElement prototype, Clock s4Clock) { * key value * @return PE corresponding to keyValue. */ - public ProcessingElement getPE(String keyValue) { - ProcessingElement pe = null; + public AbstractPE getPE(String keyValue) { + AbstractPE pe = null; try { - pe = (ProcessingElement) lookupTable.get(keyValue); + pe = (AbstractPE) lookupTable.get(keyValue); if (pe == null) { - pe = (ProcessingElement) prototype.clone(); + pe = (AbstractPE) prototype.clone(); if (pe instanceof AbstractPE) { // Logger.getLogger("s4").info("injecting safekeeper"); ((AbstractPE) pe).setSafeKeeper(safeKeeper); } - //invoke the initialization method if it has been specified - if (pe.getInitMethod() != null) { - Method initMethod = pe.getClass().getMethod(pe.getInitMethod(), new Class[0]); - initMethod.invoke(pe, (new Object[0])); - } + pe.initInstance(); } // update the last update time on the entry @@ -106,11 +102,11 @@ public ProcessingElement getPE(String keyValue) { * @return PE corresponding to keyValue, if such a PE exists. Null * otherwise. */ - public ProcessingElement lookupPE(String keyValue) { - ProcessingElement pe = null; + public AbstractPE lookupPE(String keyValue) { + AbstractPE pe = null; try { - pe = (ProcessingElement) lookupTable.get(keyValue); + pe = (AbstractPE) lookupTable.get(keyValue); } catch (Exception e) { logger.error("exception when looking up pe for key:" + keyValue, e); diff --git a/s4-core/src/main/java/io/s4/processor/ReroutePE.java b/s4-core/src/main/java/io/s4/processor/ReroutePE.java index a9e2444..64b51ba 100644 --- a/s4-core/src/main/java/io/s4/processor/ReroutePE.java +++ b/s4-core/src/main/java/io/s4/processor/ReroutePE.java @@ -30,17 +30,8 @@ public class ReroutePE extends AbstractPE { private EventDispatcher dispatcher; private Transformer[] transformers = new Transformer[0]; // private List keys; - private String id = "ReroutePE"; private String outputStreamName; - public String getId() { - return id; - } - - public void setId(String id) { - this.id = id; - } - public void setDispatcher(EventDispatcher dispatcher) { this.dispatcher = dispatcher; } diff --git a/s4-core/src/main/java/io/s4/processor/SimpleCountingPE.java b/s4-core/src/main/java/io/s4/processor/SimpleCountingPE.java index e1b2392..5aea05c 100644 --- a/s4-core/src/main/java/io/s4/processor/SimpleCountingPE.java +++ b/s4-core/src/main/java/io/s4/processor/SimpleCountingPE.java @@ -26,15 +26,6 @@ public class SimpleCountingPE extends AbstractPE { private int persistTime; private String keyPrefix = "s4:counter"; private boolean dirty = false; - private String id = "SimpleCountingPE"; - - public String getId() { - return id; - } - - public void setId(String id) { - this.id = id; - } public void setClearOnOutput(boolean clearOnOutput) { this.clearOnOutput = clearOnOutput; diff --git a/s4-core/src/main/resources/s4-apps/README b/s4-core/src/main/resources/s4-apps/.gitignore similarity index 100% rename from s4-core/src/main/resources/s4-apps/README rename to s4-core/src/main/resources/s4-apps/.gitignore diff --git a/s4-core/src/main/resources/s4-core/conf/default/client-adapter-conf.xml b/s4-core/src/main/resources/s4-core/conf/default/client-adapter-conf.xml index 6e26d87..51e8607 100644 --- a/s4-core/src/main/resources/s4-core/conf/default/client-adapter-conf.xml +++ b/s4-core/src/main/resources/s4-core/conf/default/client-adapter-conf.xml @@ -2,7 +2,7 @@ - classpath:adapter.properties + classpath:client-adapter.properties diff --git a/s4-core/src/main/resources/s4-core/conf/default/client-stub-conf.xml b/s4-core/src/main/resources/s4-core/conf/default/client-stub-conf.xml index 0137e60..851c0a5 100644 --- a/s4-core/src/main/resources/s4-core/conf/default/client-stub-conf.xml +++ b/s4-core/src/main/resources/s4-core/conf/default/client-stub-conf.xml @@ -2,7 +2,7 @@ - classpath:adapter.properties + classpath:client-adapter.properties diff --git a/s4-core/src/main/resources/s4-core/conf/default/clusters.xml b/s4-core/src/main/resources/s4-core/conf/default/clusters.xml index d0aa99e..8e75609 100644 --- a/s4-core/src/main/resources/s4-core/conf/default/clusters.xml +++ b/s4-core/src/main/resources/s4-core/conf/default/clusters.xml @@ -7,12 +7,6 @@ s4node-0 - - - localhost - adapter-0 - - 0 diff --git a/s4-core/src/main/resources/s4-core/conf/default/log4j.xml b/s4-core/src/main/resources/s4-core/conf/default/log4j.xml index 77818a1..4f8a0cc 100644 --- a/s4-core/src/main/resources/s4-core/conf/default/log4j.xml +++ b/s4-core/src/main/resources/s4-core/conf/default/log4j.xml @@ -1,6 +1,18 @@ + + + + + + + + + + + + @@ -29,18 +41,6 @@ - - - - - - - - - - - - diff --git a/s4-core/src/main/resources/s4-core/conf/default/s4-core-conf.xml b/s4-core/src/main/resources/s4-core/conf/default/s4-core-conf.xml index 734d768..1eccd39 100644 --- a/s4-core/src/main/resources/s4-core/conf/default/s4-core-conf.xml +++ b/s4-core/src/main/resources/s4-core/conf/default/s4-core-conf.xml @@ -101,7 +101,7 @@ - + diff --git a/s4-core/src/main/resources/s4-core/conf/dynamic/adapter-conf.xml b/s4-core/src/main/resources/s4-core/conf/dynamic/adapter-conf.xml new file mode 100644 index 0000000..4d1f3e5 --- /dev/null +++ b/s4-core/src/main/resources/s4-core/conf/dynamic/adapter-conf.xml @@ -0,0 +1,41 @@ + + + + + classpath:adapter.properties + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/s4-core/src/main/resources/s4-core/conf/dynamic/client-adapter-conf.xml b/s4-core/src/main/resources/s4-core/conf/dynamic/client-adapter-conf.xml new file mode 100644 index 0000000..51e8607 --- /dev/null +++ b/s4-core/src/main/resources/s4-core/conf/dynamic/client-adapter-conf.xml @@ -0,0 +1,64 @@ + + + + + classpath:client-adapter.properties + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/s4-core/src/main/resources/s4-core/conf/dynamic/client-stub-conf.xml b/s4-core/src/main/resources/s4-core/conf/dynamic/client-stub-conf.xml new file mode 100644 index 0000000..851c0a5 --- /dev/null +++ b/s4-core/src/main/resources/s4-core/conf/dynamic/client-stub-conf.xml @@ -0,0 +1,13 @@ + + + + + classpath:client-adapter.properties + + + + + + + + diff --git a/s4-core/src/main/resources/s4-core/conf/dynamic/clusters.xml b/s4-core/src/main/resources/s4-core/conf/dynamic/clusters.xml new file mode 100644 index 0000000..8e75609 --- /dev/null +++ b/s4-core/src/main/resources/s4-core/conf/dynamic/clusters.xml @@ -0,0 +1,18 @@ + + + + 0 + localhost + 5077 + s4node-0 + + + + + 0 + localhost + client-adapter-0 + 6077 + + + diff --git a/s4-core/src/main/resources/s4-core/conf/dynamic/event-clock.xml b/s4-core/src/main/resources/s4-core/conf/dynamic/event-clock.xml new file mode 100644 index 0000000..9305786 --- /dev/null +++ b/s4-core/src/main/resources/s4-core/conf/dynamic/event-clock.xml @@ -0,0 +1,6 @@ + + + + + + diff --git a/s4-core/src/main/resources/s4-core/conf/dynamic/log4j.xml b/s4-core/src/main/resources/s4-core/conf/dynamic/log4j.xml new file mode 100644 index 0000000..3f2f262 --- /dev/null +++ b/s4-core/src/main/resources/s4-core/conf/dynamic/log4j.xml @@ -0,0 +1,48 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/s4-core/src/main/resources/s4-core/conf/dynamic/s4-core-conf.xml b/s4-core/src/main/resources/s4-core/conf/dynamic/s4-core-conf.xml new file mode 100644 index 0000000..1eccd39 --- /dev/null +++ b/s4-core/src/main/resources/s4-core/conf/dynamic/s4-core-conf.xml @@ -0,0 +1,152 @@ + + + + + classpath:s4-core.properties + + + + 2048 + 262144 + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + @${adapter_app_name} + + + + + + + + + @${adapter_app_name} + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/s4-core/src/main/resources/s4-core/conf/dynamic/s4-core.properties-header b/s4-core/src/main/resources/s4-core/conf/dynamic/s4-core.properties-header new file mode 100644 index 0000000..231b678 --- /dev/null +++ b/s4-core/src/main/resources/s4-core/conf/dynamic/s4-core.properties-header @@ -0,0 +1,9 @@ +gc_opts=-server -XX:+HeapDumpOnOutOfMemoryError -XX:+UseConcMarkSweepGC -XX:-UseGCOverheadLimit +mem_opts=-Xms800m -Xmx2000m +remote_debug_enabled=no +pe_container_max_queue_size=8000 +listener_max_queue_size=8000 +commlayer_mode=dynamic +zk_session_timeout=5000 +deamon_mode_enabled=false +zk_address=localhost:2181 diff --git a/s4-core/src/main/resources/s4-core/conf/dynamic/wall-clock.xml b/s4-core/src/main/resources/s4-core/conf/dynamic/wall-clock.xml new file mode 100644 index 0000000..e149ecc --- /dev/null +++ b/s4-core/src/main/resources/s4-core/conf/dynamic/wall-clock.xml @@ -0,0 +1,6 @@ + + + + + + diff --git a/s4-core/src/main/resources/s4-core/lock/README b/s4-core/src/main/resources/s4-core/lock/.gitignore similarity index 100% rename from s4-core/src/main/resources/s4-core/lock/README rename to s4-core/src/main/resources/s4-core/lock/.gitignore diff --git a/s4-core/src/main/resources/s4-core/logs/README b/s4-core/src/main/resources/s4-core/logs/.gitignore similarity index 100% rename from s4-core/src/main/resources/s4-core/logs/README rename to s4-core/src/main/resources/s4-core/logs/.gitignore diff --git a/s4-core/src/main/resources/s4-exts/README b/s4-core/src/main/resources/s4-exts/.gitignore similarity index 100% rename from s4-core/src/main/resources/s4-exts/README rename to s4-core/src/main/resources/s4-exts/.gitignore diff --git a/s4-core/src/main/resources/scripts/generate-load.sh b/s4-core/src/main/resources/scripts/generate-load.sh index e6c4bb5..e09f68b 100755 --- a/s4-core/src/main/resources/scripts/generate-load.sh +++ b/s4-core/src/main/resources/scripts/generate-load.sh @@ -115,7 +115,7 @@ echo `${JAVA_LOC}java -version` CLASSPATH=`find $CORE_HOME -name "*.jar" | awk '{p=$0"'$CP_SEP'"p;} END {print p}'` if [ $REDBUTTON_MODE == "true" ] ; then - CLASSPATH=${CLASSPATH}${CP_SEP}${CORE_HOME}/conf/redbutton + CLASSPATH=${CLASSPATH}${CP_SEP}${CORE_HOME}/conf/default JAVA_OPTS="$JAVA_OPTS -Dcommlayer.mode=static" fi diff --git a/s4-core/src/main/resources/scripts/run-adapter.sh b/s4-core/src/main/resources/scripts/run-adapter.sh index b570916..d62f44b 100755 --- a/s4-core/src/main/resources/scripts/run-adapter.sh +++ b/s4-core/src/main/resources/scripts/run-adapter.sh @@ -142,6 +142,8 @@ TMP1=`mktemp -d $MKTEMP_ARGS` echo "Temp is $TMP1" echo "appName=${SENDER_CLUSTER_NAME}" > $TMP1/adapter.properties echo "listenerAppName=${LISTENER_CLUSTER_NAME}" >> $TMP1/adapter.properties +echo "zk_address=${CLUSTER_MANAGER}" >> $TMP1/adapter.properties + cat $TMP1/adapter.properties CLASSPATH=${CLASSPATH}${CP_SEP}${TMP1} diff --git a/s4-core/src/main/resources/scripts/run-client-adapter.sh b/s4-core/src/main/resources/scripts/run-client-adapter.sh index de0e193..b5f1a0f 100755 --- a/s4-core/src/main/resources/scripts/run-client-adapter.sh +++ b/s4-core/src/main/resources/scripts/run-client-adapter.sh @@ -69,11 +69,14 @@ if [ "x$CONF_TYPE" == "x" ] ; then CONF_TYPE="default" fi -CONF_FILE=${CORE_HOME}"/conf/"${CONF_TYPE}"/adapter-conf.xml" +CONF_FILE=${CORE_HOME}"/conf/"${CONF_TYPE}"/client-adapter-conf.xml" CONF_LOC=`dirname $CONF_FILE` LOG_LOC="${CORE_HOME}/logs" COMMLAYER_MODE=$(get_property "commlayer_mode") +echo "configuration location is ${CONF_LOC}" +echo "commlayer mode is ${COMMLAYER_MODE}" + if [ "x$CLUSTER_MANAGER" == "x" ] ; then CLUSTER_MANAGER="localhost:2181" fi @@ -141,10 +144,11 @@ fi TMP1=`mktemp -d $MKTEMP_ARGS` echo "Temp is $TMP1" -echo "appName=${SENDER_CLUSTER_NAME}" > $TMP1/adapter.properties -echo "listenerAppName=${LISTENER_CLUSTER_NAME}" >> $TMP1/adapter.properties -echo "listener_max_queue_size=8000" >> $TMP1/adapter.properties -cat $TMP1/adapter.properties +echo "appName=${SENDER_CLUSTER_NAME}" > $TMP1/client-adapter.properties +echo "listenerAppName=${LISTENER_CLUSTER_NAME}" >> $TMP1/client-adapter.properties +echo "zk_address=${CLUSTER_MANAGER}" >> $TMP1/client-adapter.properties +echo "listener_max_queue_size=8000" >> $TMP1/client-adapter.properties +cat $TMP1/client-adapter.properties CLASSPATH=${CLASSPATH}${CP_SEP}${TMP1} diff --git a/s4-core/src/main/resources/scripts/s4-start.sh b/s4-core/src/main/resources/scripts/start-s4.sh similarity index 100% rename from s4-core/src/main/resources/scripts/s4-start.sh rename to s4-core/src/main/resources/scripts/start-s4.sh diff --git a/s4-core/src/main/resources/scripts/task-setup.sh b/s4-core/src/main/resources/scripts/task-setup.sh new file mode 100644 index 0000000..7ecd17a --- /dev/null +++ b/s4-core/src/main/resources/scripts/task-setup.sh @@ -0,0 +1,42 @@ +#!/bin/bash + +osx=false +case "`uname`" in +Darwin*) osx=true;; +esac + +if $osx; then + READLINK="stat" +else + READLINK="readlink" +fi + +BASE_DIR=`dirname $($READLINK -f $0)` +CORE_HOME=`$READLINK -f ${BASE_DIR}/../s4-core` +CP_SEP=":" + +while getopts ":c:" opt; +do case "$opt" in + c) CORE_HOME=$OPTARG;; + \?) + echo "Invalid option: -$OPTARG" >&2 + exit 1 + ;; + :) + echo "Option -$OPTARG requires an argument." >&2 + exit 1 + ;; + esac +done +shift $(($OPTIND-1)) + +JAVA_LOC="" +if [ "x$JAVA_HOME" != "x" ] ; then + JAVA_LOC=${JAVA_HOME}"/bin/" +fi + +CP=`find ${CORE_HOME}/lib -name "*.jar" | awk '{p=$0"'$CP_SEP'"p;} END {print p}'` + +cmd="${JAVA_LOC}java -classpath $CP io.s4.comm.tools.TaskSetupApp $*" +echo "RUNNING $cmd" +$cmd diff --git a/s4-core/src/test/java/io/s4/processor/MockPE.java b/s4-core/src/test/java/io/s4/processor/MockPE.java index 140bc86..9dcd863 100644 --- a/s4-core/src/test/java/io/s4/processor/MockPE.java +++ b/s4-core/src/test/java/io/s4/processor/MockPE.java @@ -7,17 +7,12 @@ public class MockPE extends AbstractPE { private int initializeCount = 0; - public void testInitialize() { + public void initInstance() { initializeCount++; } public void processEvent(Object obj) { } - - @Override - public String getId() { - return null; - } @Override public void output() { diff --git a/s4-core/src/test/java/io/s4/processor/TestPrototypeWrapper.java b/s4-core/src/test/java/io/s4/processor/TestPrototypeWrapper.java index 20615bb..894a239 100644 --- a/s4-core/src/test/java/io/s4/processor/TestPrototypeWrapper.java +++ b/s4-core/src/test/java/io/s4/processor/TestPrototypeWrapper.java @@ -16,7 +16,6 @@ public class TestPrototypeWrapper @Test public void testCloneAndInitialize() { MockPE prototype = new MockPE(); - prototype.setInitMethod("testInitialize"); PrototypeWrapper prototypeWrapper = new PrototypeWrapper(prototype, new WallClock()); diff --git a/s4-examples/speech01-scala/src/main/scala/SentenceReceiverPE.scala b/s4-examples/speech01-scala/src/main/scala/SentenceReceiverPE.scala index 3c215d6..d966fc0 100644 --- a/s4-examples/speech01-scala/src/main/scala/SentenceReceiverPE.scala +++ b/s4-examples/speech01-scala/src/main/scala/SentenceReceiverPE.scala @@ -23,6 +23,4 @@ class SentenceReceiverPE extends AbstractPE { def processEvent(sentence: Sentence): Unit= println("Sentence is '" + sentence.text + "', location '" + sentence.location + "'") def output(): Unit= {} - - def getId(): String= return this.getClass().getName() } diff --git a/s4-examples/speech01/src/main/java/io/s4/example/speech01/SentenceReceiverPE.java b/s4-examples/speech01/src/main/java/io/s4/example/speech01/SentenceReceiverPE.java index 724073e..b673b49 100644 --- a/s4-examples/speech01/src/main/java/io/s4/example/speech01/SentenceReceiverPE.java +++ b/s4-examples/speech01/src/main/java/io/s4/example/speech01/SentenceReceiverPE.java @@ -12,10 +12,4 @@ public void processEvent(Sentence sentence) { public void output() { // not called in this example } - - @Override - public String getId() { - return this.getClass().getName(); - } - } diff --git a/s4-examples/testinput/pe-query b/s4-examples/testinput/src/main/resources/pe-query similarity index 100% rename from s4-examples/testinput/pe-query rename to s4-examples/testinput/src/main/resources/pe-query diff --git a/s4-examples/testinput/proto-query b/s4-examples/testinput/src/main/resources/proto-query similarity index 100% rename from s4-examples/testinput/proto-query rename to s4-examples/testinput/src/main/resources/proto-query diff --git a/s4-examples/testinput/sentence.in b/s4-examples/testinput/src/main/resources/sentence.in similarity index 100% rename from s4-examples/testinput/sentence.in rename to s4-examples/testinput/src/main/resources/sentence.in diff --git a/s4-examples/testinput/speech.in b/s4-examples/testinput/src/main/resources/speech.in similarity index 100% rename from s4-examples/testinput/speech.in rename to s4-examples/testinput/src/main/resources/speech.in diff --git a/s4-examples/testinput/speeches.txt b/s4-examples/testinput/src/main/resources/speeches.txt similarity index 100% rename from s4-examples/testinput/speeches.txt rename to s4-examples/testinput/src/main/resources/speeches.txt diff --git a/s4-examples/twittertopiccount-scala/src/main/scala/processor/TopNTopicPE.scala b/s4-examples/twittertopiccount-scala/src/main/scala/processor/TopNTopicPE.scala index 8016e05..8c75d07 100644 --- a/s4-examples/twittertopiccount-scala/src/main/scala/processor/TopNTopicPE.scala +++ b/s4-examples/twittertopiccount-scala/src/main/scala/processor/TopNTopicPE.scala @@ -32,7 +32,6 @@ import io.s4.processor.AbstractPE import io.s4.example.twittertopiccount.event._ class TopNTopicPE extends AbstractPE { - @BeanProperty var id: String = _ @BeanProperty var persistKey = "myapp:topNTopics" @BeanProperty var persister: Persister = _ @BeanProperty var entryCount = 10 diff --git a/s4-examples/twittertopiccount-scala/src/main/scala/processor/TopicCountAndReportPE.scala b/s4-examples/twittertopiccount-scala/src/main/scala/processor/TopicCountAndReportPE.scala index 5d4c736..8f70511 100644 --- a/s4-examples/twittertopiccount-scala/src/main/scala/processor/TopicCountAndReportPE.scala +++ b/s4-examples/twittertopiccount-scala/src/main/scala/processor/TopicCountAndReportPE.scala @@ -24,7 +24,6 @@ import io.s4.processor.AbstractPE import io.s4.example.twittertopiccount.event._ class TopicCountAndReportPE extends AbstractPE { - @BeanProperty var id: String = _ @BeanProperty var dispatcher: EventDispatcher = _ @BeanProperty var outputStreamName: String = _ @BeanProperty var threshold: Int = _ diff --git a/s4-examples/twittertopiccount-scala/src/main/scala/processor/TopicExtractorPE.scala b/s4-examples/twittertopiccount-scala/src/main/scala/processor/TopicExtractorPE.scala index 8cb803d..8da09a8 100644 --- a/s4-examples/twittertopiccount-scala/src/main/scala/processor/TopicExtractorPE.scala +++ b/s4-examples/twittertopiccount-scala/src/main/scala/processor/TopicExtractorPE.scala @@ -24,7 +24,6 @@ import io.s4.processor.AbstractPE import io.s4.example.twittertopiccount.event._ class TopicExtractorPE extends AbstractPE { - @BeanProperty var id: String = _ @BeanProperty var dispatcher: EventDispatcher = _ @BeanProperty var outputStreamName: String = _ diff --git a/s4-core/src/main/java/io/s4/processor/ProcessingElement.java b/s4-tools/loadgenerator/build.gradle similarity index 62% rename from s4-core/src/main/java/io/s4/processor/ProcessingElement.java rename to s4-tools/loadgenerator/build.gradle index a48cabf..baca42d 100644 --- a/s4-core/src/main/java/io/s4/processor/ProcessingElement.java +++ b/s4-tools/loadgenerator/build.gradle @@ -13,22 +13,10 @@ * language governing permissions and limitations under the * License. See accompanying LICENSE file. */ -package io.s4.processor; - -import io.s4.dispatcher.partitioner.CompoundKeyInfo; - -import java.util.List; - -public interface ProcessingElement extends Cloneable { - void execute(String streamName, CompoundKeyInfo compoundKeyInfo, Object event); - - List advise(); - - public int getTtl(); - - public Object clone(); - - public String getId(); - - public String getInitMethod(); +dependencies { + compile( libraries.json ) + compile( libraries.commons_cli ) + compile project(':s4-driver') + compile project(':s4-core') } + diff --git a/s4-tools/loadgenerator/src/main/java/io/s4/tools/loadgenerator/LoadGenerator.java b/s4-tools/loadgenerator/src/main/java/io/s4/tools/loadgenerator/LoadGenerator.java new file mode 100644 index 0000000..2fc88b5 --- /dev/null +++ b/s4-tools/loadgenerator/src/main/java/io/s4/tools/loadgenerator/LoadGenerator.java @@ -0,0 +1,349 @@ +/* + * Copyright (c) 2010 Yahoo! Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific + * language governing permissions and limitations under the + * License. See accompanying LICENSE file. + */ +package io.s4.tools.loadgenerator; + +import io.s4.client.Driver; +import io.s4.client.Message; + +import java.io.BufferedReader; +import java.io.FileReader; +import java.io.InputStreamReader; +import java.io.Reader; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.GnuParser; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.OptionBuilder; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.json.JSONException; +import org.json.JSONObject; + +public class LoadGenerator { + + public static void main(String args[]) { + Options options = new Options(); + boolean warmUp = false; + + options.addOption(OptionBuilder.withArgName("rate") + .hasArg() + .withDescription("Rate (events per second)") + .create("r")); + + options.addOption(OptionBuilder.withArgName("display_rate") + .hasArg() + .withDescription("Display Rate at specified second boundary") + .create("d")); + + options.addOption(OptionBuilder.withArgName("adapter_address") + .hasArg() + .withDescription("Address of client adapter") + .create("a")); + + options.addOption(OptionBuilder.withArgName("listener_application_name") + .hasArg() + .withDescription("Listener application name") + .create("g")); + + options.addOption(OptionBuilder.withArgName("sleep_overhead") + .hasArg() + .withDescription("Sleep overhead") + .create("o")); + + options.addOption(new Option("w", "Warm-up")); + + CommandLineParser parser = new GnuParser(); + + CommandLine line = null; + try { + // parse the command line arguments + line = parser.parse(options, args); + } catch (ParseException exp) { + // oops, something went wrong + System.err.println("Parsing failed. Reason: " + exp.getMessage()); + System.exit(1); + } + + int expectedRate = 250; + if (line.hasOption("r")) { + try { + expectedRate = Integer.parseInt(line.getOptionValue("r")); + } catch (Exception e) { + System.err.println("Bad expected rate specified " + + line.getOptionValue("r")); + System.exit(1); + } + } + + int displayRateIntervalSeconds = 20; + if (line.hasOption("d")) { + try { + displayRateIntervalSeconds = Integer.parseInt(line.getOptionValue("d")); + } catch (Exception e) { + System.err.println("Bad display rate value specified " + + line.getOptionValue("d")); + System.exit(1); + } + } + + int updateFrequency = 0; + if (line.hasOption("f")) { + try { + updateFrequency = Integer.parseInt(line.getOptionValue("f")); + } catch (Exception e) { + System.err.println("Bad query udpdate frequency specified " + + line.getOptionValue("f")); + System.exit(1); + } + System.out.printf("Update frequency is %d\n", updateFrequency); + } + + String clientAdapterAddress = null; + String clientAdapterHost = null; + int clientAdapterPort = -1; + if (line.hasOption("a")) { + clientAdapterAddress = line.getOptionValue("a"); + String[] parts = clientAdapterAddress.split(":"); + if (parts.length != 2) { + System.err.println("Bad adapter address specified " + + clientAdapterAddress); + System.exit(1); + } + clientAdapterHost = parts[0]; + + try { + clientAdapterPort = Integer.parseInt(parts[1]); + } + catch (NumberFormatException nfe) { + System.err.println("Bad adapter address specified " + + clientAdapterAddress); + System.exit(1); + } + } + + long sleepOverheadMicros = -1; + if (line.hasOption("o")) { + try { + sleepOverheadMicros = Long.parseLong(line.getOptionValue("o")); + } catch (NumberFormatException e) { + System.err.println("Bad sleep overhead specified " + + line.getOptionValue("o")); + System.exit(1); + } + System.out.printf("Specified sleep overhead is %d\n", + sleepOverheadMicros); + } + + if (line.hasOption("w")) { + warmUp = true; + } + + List loArgs = line.getArgList(); + if (loArgs.size() < 1) { + System.err.println("No input file specified"); + System.exit(1); + } + + String inputFilename = (String) loArgs.get(0); + + LoadGenerator loadGenerator = new LoadGenerator(); + loadGenerator.setInputFilename(inputFilename); + loadGenerator.setDisplayRateInterval(displayRateIntervalSeconds); + loadGenerator.setExpectedRate(expectedRate); + loadGenerator.setClientAdapterHost(clientAdapterHost); + loadGenerator.setClientAdapterPort(clientAdapterPort); + loadGenerator.run(); + + System.exit(0); + } + + private String inputFilename; + private int emitCount; + private int displayRateInterval = 0; + private int expectedRate = 200; + private String clientAdapterHost = null; + private int clientAdapterPort = -1; + + private int adjustedExpectedRate = 1; + private Map eventTypeInfoMap = new HashMap(); + + public int getEmitCount() { + return emitCount; + } + + public void setInputFilename(String inputFilename) { + this.inputFilename = inputFilename; + } + + public void setDisplayRateInterval(int displayRateInterval) { + this.displayRateInterval = displayRateInterval; + } + + public void setExpectedRate(int expectedRate) { + this.expectedRate = expectedRate; + } + + public void setClientAdapterHost(String clientAdapterHost) { + this.clientAdapterHost = clientAdapterHost; + } + + public void setClientAdapterPort(int clientAdapterPort) { + this.clientAdapterPort = clientAdapterPort; + } + + public LoadGenerator() { + + } + + public void run() { + // for now, no warm-up mechanism + adjustedExpectedRate = expectedRate; + + long intervalStart = 0; + int emitCountStart = 0; + + BufferedReader br = null; + Reader inputReader = null; + Driver driver = null; + try { + driver = new Driver(clientAdapterHost, clientAdapterPort); + boolean init = driver.init(); + init &= driver.connect(); + if (!init) { + System.err.println("Failed to initialize client adapter driver"); + return; + } + + if (inputFilename.equals("-")) { + inputReader = new InputStreamReader(System.in); + } else { + inputReader = new FileReader(inputFilename); + } + br = new BufferedReader(inputReader); + String inputLine = null; + boolean firstLine = true; + + Pacer pacer = new Pacer(adjustedExpectedRate); + while ((inputLine = br.readLine()) != null) { + if (firstLine) { + JSONObject jsonRecord = new JSONObject(inputLine); + createEventTypeInfo(jsonRecord); + System.out.println(eventTypeInfoMap); + if (eventTypeInfoMap.size() == 0) { + return; + } + firstLine = false; + continue; + } + + pacer.startCycle(); + + try { + JSONObject jsonRecord = new JSONObject(inputLine); + int classIndex = jsonRecord.getInt("_index"); + EventTypeInfo eventTypeInfo = eventTypeInfoMap.get(classIndex); + + if (eventTypeInfo == null) { + System.err.printf("Invalid _index value %d\n", + classIndex); + return; + } + + Message message = new Message(eventTypeInfo.getStreamName(), eventTypeInfo.getClassName(), inputLine); + driver.send(message); + emitCount++; + } catch (JSONException je) { + je.printStackTrace(); + System.err.printf("Bad input data %s\n", inputLine); + continue; + } + + // if it's time, display the actual emit rate + if (intervalStart == 0) { + intervalStart = System.currentTimeMillis(); + } else { + long interval = System.currentTimeMillis() - intervalStart; + if (interval >= (displayRateInterval * 1000)) { + double rate = (emitCount - emitCountStart) + / (interval / 1000.0); + System.out.println("Rate is " + rate); + intervalStart = System.currentTimeMillis(); + emitCountStart = emitCount; + } + } + + pacer.endCycle(); + pacer.maintainPace(); + } + System.out.printf("Emitted %d events\n", emitCount); + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + try { + br.close(); + } catch (Exception e) { + } + try { + inputReader.close(); + } catch (Exception e) { + } + try { + driver.disconnect(); + } catch (Exception e) { + } + } + } + + @SuppressWarnings("unchecked") + public void createEventTypeInfo(JSONObject classInfo) { + String className = ""; + try { + for (Iterator it = classInfo.keys(); it.hasNext();) { + className = (String) it.next(); + JSONObject jsonEventTypeInfo = classInfo.getJSONObject(className); + int classIndex = (Integer) jsonEventTypeInfo.getInt("classIndex"); + String streamName = jsonEventTypeInfo.getString("streamName"); + eventTypeInfoMap.put(classIndex, new EventTypeInfo(className, + streamName)); + } + } catch (JSONException je) { + je.printStackTrace(); + } + } + + static class EventTypeInfo { + private String className; + private String streamName; + + public EventTypeInfo(String clazz, String streamName) { + this.className = clazz; + this.streamName = streamName; + } + + public String getClassName() { + return className; + } + + public String getStreamName() { + return streamName; + } + } +} diff --git a/s4-tools/loadgenerator/src/main/java/io/s4/tools/loadgenerator/Pacer.java b/s4-tools/loadgenerator/src/main/java/io/s4/tools/loadgenerator/Pacer.java new file mode 100644 index 0000000..7aba882 --- /dev/null +++ b/s4-tools/loadgenerator/src/main/java/io/s4/tools/loadgenerator/Pacer.java @@ -0,0 +1,131 @@ +/* + * Copyright (c) 2010 Yahoo! Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific + * language governing permissions and limitations under the + * License. See accompanying LICENSE file. + */ +package io.s4.tools.loadgenerator;; + +public class Pacer { + private long sleepOverheadMicros = -1; + private int expectedRate = -1; + private int adjustedExpectedRate = 1; + private long startTime; + private int cycleCount = 0; + + private static int PROCESS_TIME_LIST_MAX_SIZE = 15; + private long[] processTimes = new long[PROCESS_TIME_LIST_MAX_SIZE]; + private int processTimePointer = 0; + private long[] rateInfo = new long[] {0,100}; + + public Pacer(int expectedRate) { + this.expectedRate = expectedRate; + this.adjustedExpectedRate = expectedRate; // the same for now + + if (sleepOverheadMicros == -1) { + // calculate sleep overhead + long totalSleepOverhead = 0; + for (int i = 0; i < 50; i++) { + long startTime = System.nanoTime(); + try { + Thread.sleep(1); + } catch (InterruptedException ie) { + } + totalSleepOverhead += (System.nanoTime() - startTime) + - (1 * 1000 * 1000); + } + sleepOverheadMicros = (totalSleepOverhead / 50) / 1000; + } + } + + public void startCycle() { + startTime = System.nanoTime(); + } + + public void endCycle() { + processTimes[processTimePointer] = System.nanoTime() - startTime; + processTimePointer = (processTimePointer == PROCESS_TIME_LIST_MAX_SIZE - 1) ? 0 + : processTimePointer + 1; + + cycleCount++; + + } + + public void maintainPace() { + if (cycleCount == 1 || cycleCount % 20 == 0) { + rateInfo = getRateInfo(rateInfo); + } + if (rateInfo[1] == 0 || cycleCount % rateInfo[1] == 0) { + try { + Thread.sleep(rateInfo[0]); + } catch (InterruptedException ie) { + } + } + } + + private long[] getRateInfo(long[] rateInfo) { + long totalTimeNanos = 0; + int entryCount = 0; + for (int i = 0; i < processTimes.length; i++) { + if (processTimes[i] == Long.MIN_VALUE) { + break; + } + entryCount++; + totalTimeNanos += processTimes[i]; + } + long averageTimeMicros = (long) ((totalTimeNanos / (double) entryCount) / 1000.0); + // fudge the time for additional overhead + averageTimeMicros += (long) (averageTimeMicros * 0.30); + + if (cycleCount % 5000 == 0) { + // System.out.println("Average time in micros is " + + // averageTimeMicros); + } + + long sleepTimeMicros = 0; + long millis = 0; + + long timeToMeetRateMicros = adjustedExpectedRate * averageTimeMicros; + long leftOver = 1000000 - timeToMeetRateMicros; + if (leftOver <= 0) { + sleepTimeMicros = 0; + } else { + sleepTimeMicros = (leftOver / adjustedExpectedRate) + - sleepOverheadMicros; + } + + // how many events can be processed in the nanos time? + int eventsBeforeSleep = 1; + if (sleepTimeMicros < 1000) { + // less than 1 millisecond sleep time, so need to stagger sleeps to + // emulate such a sleep + sleepTimeMicros = 1000 + sleepOverheadMicros; + millis = 1; + double numNapsDouble = ((double) leftOver / sleepTimeMicros); + int numNaps = (int) Math.ceil(numNapsDouble); + if (numNaps > 0) { + eventsBeforeSleep = adjustedExpectedRate / numNaps; + } + + if (leftOver <= 0) { + millis = 0; + eventsBeforeSleep = 1000; + } + } else { + millis = sleepTimeMicros / 1000; + } + + rateInfo[0] = millis; + rateInfo[1] = eventsBeforeSleep; + return rateInfo; + } +} diff --git a/s4-tools/loadgenerator/src/main/resources/scripts/generate-load.sh b/s4-tools/loadgenerator/src/main/resources/scripts/generate-load.sh new file mode 100755 index 0000000..c9013cc --- /dev/null +++ b/s4-tools/loadgenerator/src/main/resources/scripts/generate-load.sh @@ -0,0 +1,93 @@ +#!/bin/bash + +osx=false +case "`uname`" in +Darwin*) osx=true;; +esac + +if $osx; then + READLINK="stat" +else + READLINK="readlink" +fi + +#--------------------------------------------- +# USAGE and read arguments +#--------------------------------------------- + +if [ "$1" == "-h" ]; then + echo "Usage: $0" >&2 + echo " -c s4 core home" >&2 + echo " -a adapter address" >&2 + echo " -r emit rate" >&2 + echo " -d rate display interval" >&2 + echo " -s comma delimited list of schema files" >&2 + echo " -h help" >&2 + exit 1 +fi + +BASE_DIR=`dirname $($READLINK -f $0)` +CORE_HOME=`$READLINK -f ${BASE_DIR}/../../s4-core` +LOAD_GENERATOR_HOME=`$READLINK -f ${BASE_DIR}/..` +CP_SEP=":" + +while getopts ":c:a:r:d:l:" opt; +do case "$opt" in + c) CORE_HOME=$OPTARG;; + a) ADAPTER_ADDRESS=$OPTARG;; + r) RATE=$OPTARG;; + d) DISPLAY_INTERVAL=$OPTARG;; + l) LOCK_DIR=$OPTARG;; + \?) + echo "Invalid option: -$OPTARG" >&2 + exit 1 + ;; + :) + echo "Option -$OPTARG requires an argument." >&2 + exit 1 + ;; + esac +done +shift $(($OPTIND-1)) + +INPUT_FILE=$1 + +if [ "x$ADAPTER_ADDRESS" == "x" ] ; then + ADAPTER_ADDRESS="localhost:2334" +fi + +if [ "x$RATE" == "x" ] ; then + RATE=80 +fi + +if [ "x$DISPLAY_INTERVAL" == "x" ] ; then + DISPLAY_INTERVAL=15 +fi + +if [ "x$LOCK_DIR" == "x" ] ; then + LOCK_DIR="${CORE_HOME}/lock" +fi + +JAVA_LOC="" +if [ "x$JAVA_HOME" != "x" ] ; then + JAVA_LOC=${JAVA_HOME}"/bin/" +fi + +JAVA_OPTS="" +if [ "x$LOCK_DIR" != "x" ] ; then + JAVA_OPTS="$JAVA_OPTS -Dlock_dir=$LOCK_DIR " +fi + +#echo "java location is ${JAVA_LOC}" +#echo -n "JAVA VERSION=" +#echo `${JAVA_LOC}java -version` +#--------------------------------------------- +#ADDING CORE JARS TO CLASSPATH +#--------------------------------------------- + +CLASSPATH=`find $CORE_HOME -name "*.jar" | awk '{p=$0"'$CP_SEP'"p;} END {print p}'` +CLASSPATH=$CLASSPATH$CP_SEP`find $LOAD_GENERATOR_HOME -name "*.jar" | awk '{p=$0"'$CP_SEP'"p;} END {print p}'` + +CMD="${JAVA_LOC}java $JAVA_OPTS -classpath $CLASSPATH io.s4.tools.loadgenerator.LoadGenerator -a ${ADAPTER_ADDRESS} -r${RATE} -d ${DISPLAY_INTERVAL} $INPUT_FILE" +#echo "Running ${CMD}" +$CMD diff --git a/settings.gradle b/settings.gradle index 463177e..6630f87 100644 --- a/settings.gradle +++ b/settings.gradle @@ -25,6 +25,8 @@ include 's4-example-twittertopiccount-ft' include 's4-example-speech01' include 's4-example-speech01-scala' include 's4-example-speech02' +include 's4-example-testinput' +include 's4-tools-loadgenerator' /* Set dirs for projects whose name doesn't follow the dir structure. */ /* TODO: Write code to map proj name to dir. */ @@ -36,3 +38,5 @@ project(':s4-example-twittertopiccount-scala').projectDir = new File(settingsDir project(':s4-example-speech01').projectDir = new File(settingsDir, 's4-examples/speech01') project(':s4-example-speech01-scala').projectDir = new File(settingsDir, 's4-examples/speech01-scala') project(':s4-example-speech02').projectDir = new File(settingsDir, 's4-examples/speech02') +project(':s4-example-testinput').projectDir = new File(settingsDir, 's4-examples/testinput') +project(':s4-tools-loadgenerator').projectDir = new File(settingsDir, 's4-tools/loadgenerator')