Permalink
Browse files

Merge remote-tracking branch 'upstream/master'

Conflicts:
	.gitignore
	build.gradle
	s4-core/src/main/java/io/s4/processor/AbstractPE.java
	s4-core/src/main/java/io/s4/processor/PEContainer.java
	s4-core/src/main/java/io/s4/processor/PrototypeWrapper.java
	s4-core/src/main/resources/s4-core/conf/default/log4j.xml
  • Loading branch information...
2 parents 0e61639 + 7aaba95 commit 200ee8fa8ca0f23c798cb73b15645dbd7c15e974 Matthieu Morel committed Jul 15, 2011
Showing with 1,204 additions and 226 deletions.
  1. +8 −0 .gitignore
  2. +30 −10 README.md
  3. +9 −3 build.gradle
  4. +29 −29 dev-notes.txt
  5. BIN gradle/wrapper/gradle-wrapper.jar
  6. +2 −2 gradle/wrapper/gradle-wrapper.properties
  7. +0 −31 s4-core/README.md
  8. +13 −20 s4-core/src/main/java/io/s4/MainApp.java
  9. +17 −1 s4-core/src/main/java/io/s4/dispatcher/partitioner/RoundRobinPartitioner.java
  10. +2 −2 s4-core/src/main/java/io/s4/message/SinglePERequest.java
  11. +62 −2 s4-core/src/main/java/io/s4/processor/AbstractPE.java
  12. +1 −1 s4-core/src/main/java/io/s4/processor/ControlEventProcessor.java
  13. +0 −9 s4-core/src/main/java/io/s4/processor/JoinPE.java
  14. +11 −11 s4-core/src/main/java/io/s4/processor/PEContainer.java
  15. +0 −10 s4-core/src/main/java/io/s4/processor/PrintEventPE.java
  16. +10 −14 s4-core/src/main/java/io/s4/processor/PrototypeWrapper.java
  17. +0 −9 s4-core/src/main/java/io/s4/processor/ReroutePE.java
  18. +0 −9 s4-core/src/main/java/io/s4/processor/SimpleCountingPE.java
  19. 0 s4-core/src/main/resources/s4-apps/{README → .gitignore}
  20. +1 −1 s4-core/src/main/resources/s4-core/conf/default/client-adapter-conf.xml
  21. +1 −1 s4-core/src/main/resources/s4-core/conf/default/client-stub-conf.xml
  22. +0 −6 s4-core/src/main/resources/s4-core/conf/default/clusters.xml
  23. +12 −12 s4-core/src/main/resources/s4-core/conf/default/log4j.xml
  24. +1 −1 s4-core/src/main/resources/s4-core/conf/default/s4-core-conf.xml
  25. +41 −0 s4-core/src/main/resources/s4-core/conf/dynamic/adapter-conf.xml
  26. +64 −0 s4-core/src/main/resources/s4-core/conf/dynamic/client-adapter-conf.xml
  27. +13 −0 s4-core/src/main/resources/s4-core/conf/dynamic/client-stub-conf.xml
  28. +18 −0 s4-core/src/main/resources/s4-core/conf/dynamic/clusters.xml
  29. +6 −0 s4-core/src/main/resources/s4-core/conf/dynamic/event-clock.xml
  30. +48 −0 s4-core/src/main/resources/s4-core/conf/dynamic/log4j.xml
  31. +152 −0 s4-core/src/main/resources/s4-core/conf/dynamic/s4-core-conf.xml
  32. +9 −0 s4-core/src/main/resources/s4-core/conf/dynamic/s4-core.properties-header
  33. +6 −0 s4-core/src/main/resources/s4-core/conf/dynamic/wall-clock.xml
  34. 0 s4-core/src/main/resources/s4-core/lock/{README → .gitignore}
  35. 0 s4-core/src/main/resources/s4-core/logs/{README → .gitignore}
  36. 0 s4-core/src/main/resources/s4-exts/{README → .gitignore}
  37. +1 −1 s4-core/src/main/resources/scripts/generate-load.sh
  38. +2 −0 s4-core/src/main/resources/scripts/run-adapter.sh
  39. +9 −5 s4-core/src/main/resources/scripts/run-client-adapter.sh
  40. 0 s4-core/src/main/resources/scripts/{s4-start.sh → start-s4.sh}
  41. +42 −0 s4-core/src/main/resources/scripts/task-setup.sh
  42. +1 −6 s4-core/src/test/java/io/s4/processor/MockPE.java
  43. +0 −1 s4-core/src/test/java/io/s4/processor/TestPrototypeWrapper.java
  44. +0 −2 s4-examples/speech01-scala/src/main/scala/SentenceReceiverPE.scala
  45. +0 −6 s4-examples/speech01/src/main/java/io/s4/example/speech01/SentenceReceiverPE.java
  46. 0 s4-examples/testinput/{ → src/main/resources}/pe-query
  47. 0 s4-examples/testinput/{ → src/main/resources}/proto-query
  48. 0 s4-examples/testinput/{ → src/main/resources}/sentence.in
  49. 0 s4-examples/testinput/{ → src/main/resources}/speech.in
  50. 0 s4-examples/testinput/{ → src/main/resources}/speeches.txt
  51. +0 −1 s4-examples/twittertopiccount-scala/src/main/scala/processor/TopNTopicPE.scala
  52. +0 −1 s4-examples/twittertopiccount-scala/src/main/scala/processor/TopicCountAndReportPE.scala
  53. +0 −1 s4-examples/twittertopiccount-scala/src/main/scala/processor/TopicExtractorPE.scala
  54. +6 −18 s4-core/src/main/java/io/s4/processor/ProcessingElement.java → s4-tools/loadgenerator/build.gradle
  55. +349 −0 s4-tools/loadgenerator/src/main/java/io/s4/tools/loadgenerator/LoadGenerator.java
  56. +131 −0 s4-tools/loadgenerator/src/main/java/io/s4/tools/loadgenerator/Pacer.java
  57. +93 −0 s4-tools/loadgenerator/src/main/resources/scripts/generate-load.sh
  58. +4 −0 settings.gradle
View
@@ -6,6 +6,7 @@
# Build directory
target/
build/
+bin/
# SVN
.svn
@@ -21,3 +22,10 @@ build/
# class files
*.class
+.DS_Store
+
+# create an empty .gitignore file when you want to track an empty
+# directory but want to ignore its content. Example: to keep the
+# "logs" dir do: "touch logs/.gitignore"
+#
+!.gitignore
View
@@ -65,23 +65,43 @@ git clone https://github.com/s4/s4.git
# Create image
gradlew allImage
-# Change permissions
-chmod u+x ./build/s4-image/scripts/*
+# set the S4_IMAGE environmental variable
+cd build/s4-image/
+export S4_IMAGE=`pwd`
-# Copy S4 application to deployment dir (s4-apps)
-cp -rp build/s4-image/s4-example-apps/s4-example-twittertopiccount build/s4-image/s4-apps/
+# get the sample application
+git clone git://github.com/s4/twittertopiccount.git
-# Enter your twitter user/pass in config file
-$EDITOR build/s4-image/s4-apps/s4-example-twittertopiccount/adapter-conf.xml
+# build the sample application
+./gradlew install
-# Start server with s4-example-twittertopiccount app
-./build/s4-image/scripts/s4-start.sh &
+# deploy the sample application into the S4 image (relies in the S4_IMAGE environmental variable)
+./gradlew deploy
-# Start adapter
- ./build/s4-image/scripts/run-adapter.sh -x -u build/s4-image/s4-apps/s4-example-twittertopiccount/lib/s4-example-twittertopiccount-0.3-SNAPSHOT.jar -d build/s4-image/s4-apps/s4-example-twittertopiccount/adapter-conf.xml &
+# set the TWIT_LISTENER environmental variable
+cd build/install/twitter_feed_listener
+export TWIT_LISTENER=`pwd`
+
+# Start server with twittertopiccount app
+$S4_IMAGE/scripts/start-s4.sh -r client-adapter &
+
+# start the client adapter
+$S4_IMAGE/scripts/run-client-adapter.sh -s client-adapter -g s4 -d $S4_IMAGE/s4-core/conf/default/client-stub-conf.xml &
+
+# run a client to send events into the S4 cluster. Replace <your-twitter-user> and <your-twitter-password> with your Twitter userid and password.
+$TWIT_LISTENER/bin/twitter_feed_listener <your-twitter-user> <your-twitter-password> &
# Check output
cat /tmp/top_n_hashtags
</pre>
+Developing with Eclipse
+-----------------------
+
+The command `gradle eclipse` will create an eclipse project that you can import from the Eclipse IDE.
+
+There is now a [Gradle plugin for the Eclipse IDE](http://static.springsource.org/sts/docs/2.7.0.M1/reference/html/gradle/index.html).
+To install Gradle without installing the full Spring development environment follow the
+[instructions](http://static.springsource.org/sts/docs/2.7.0.M1/reference/html/gradle/installation.html) under the heading
+"Installing from update site". There is also a discussion in the [Gradle mailing list](http://gradle.1045684.n5.nabble.com/ANN-Gradle-Eclipse-Plugin-td4387658.html).
View
@@ -69,7 +69,6 @@ libraries = [
json: 'org.json:json:20090211',
lift_json: 'net.liftweb:lift-json_2.8.1:2.2',
gson: 'com.google.code.gson:gson:1.6',
-junit: 'junit:junit:4.4',
zk: 'org.apache.zookeeper:zookeeper:3.3.1',
log4j: 'log4j:log4j:1.2.15',
flexjson: 'net.sf.flexjson:flexjson:2.1',
@@ -85,6 +84,7 @@ commons_jexl: 'commons-jexl:commons-jexl:1.1',
commons_codec: 'commons-codec:commons-codec:1.4',
commons_httpclient: 'commons-httpclient:commons-httpclient:3.1',
spring: 'org.springframework:spring:2.5.6',
+junit: 'junit:junit:4.4',
scala_compiler: 'org.scala-lang:scala-compiler:2.8.1',
scala_library: 'org.scala-lang:scala-library:2.8.1',
jedis: 'redis.clients:jedis:1.5.2',
@@ -104,6 +104,11 @@ subprojects {
//defaultTasks 'build'
group = 'io.s4'
+
+ /* Remove this once this bug is fixed: http://issues.gradle.org/browse/GRADLE-1157 */
+ eclipseClasspath {
+ downloadSources = false; // required for eclipseClasspath to work
+ }
/* Common dependencies applied to all subprojects. */
dependencies {
@@ -220,6 +225,7 @@ task allImage(type: Copy, dependsOn: s4Javadoc) {
with allDistImage
}
+
task binTgz( type: Tar) {
description = "Build binary bundle in GZIP format"
classifier = 'bin'
@@ -239,9 +245,9 @@ task allTgz( type: Tar, dependsOn: s4Javadoc) {
}
/* Generates the gradlew scripts.
-http://www.gradle.org/1.0-milestone-1/docs/userguide/gradle_wrapper.html */
+http://www.gradle.org/1.0-milestone-3/docs/userguide/gradle_wrapper.html */
task wrapper(type: Wrapper) {
- gradleVersion = '1.0-milestone-1'
+ gradleVersion = '1.0-milestone-3'
}
class Version {
View
@@ -14,42 +14,42 @@ cd s4
gradlew clean allImage
# Create some environment variables.
-export IMAGE_BASE=`pwd`'/build/s4-image'
-export PYTHONPATH=${IMAGE_BASE}/s4-driver/lib/python
-export PERLLIB=${IMAGE_BASE}/s4-driver/lib/perl
+export S4_IMAGE=`pwd`'/build/s4-image'
+export PYTHONPATH=${S4_IMAGE}/s4-driver/lib/python
+export PERLLIB=${S4_IMAGE}/s4-driver/lib/perl
# Change script permissions.
-chmod u+x $IMAGE_BASE/scripts/*
-chmod u+x $IMAGE_BASE/s4-driver/scripts/*
+chmod u+x $S4_IMAGE/scripts/*
+chmod u+x $S4_IMAGE/s4-driver/scripts/*
# Copy speech02 app to apps deployment directory.
-cp -fr $IMAGE_BASE/s4-example-apps/s4-example-speech02 $IMAGE_BASE/s4-apps
+cp -fr $S4_IMAGE/s4-example-apps/s4-example-speech02 $S4_IMAGE/s4-apps
# Start S4 server in standalone mode.
-$IMAGE_BASE/scripts/s4-start.sh -r client-adapter &
+$S4_IMAGE/scripts/start-s4.sh -r client-adapter &
# Start client adapter.
-$IMAGE_BASE/scripts/run-client-adapter.sh -s client-adapter -g s4 \
--x -d $IMAGE_BASE/s4-core/conf/default/client-stub-conf.xml &
+$S4_IMAGE/scripts/run-client-adapter.sh -s client-adapter -g s4 \
+-x -d $S4_IMAGE/s4-core/conf/default/client-stub-conf.xml &
# Inject events.
-perl $IMAGE_BASE/s4-driver/scripts/inject.pl RawSpeech \
-io.s4.example.speech01.Speech < $IMAGE_BASE/testinput/speech.in
+perl $S4_IMAGE/s4-driver/scripts/inject.pl RawSpeech \
+io.s4.example.speech01.Speech < $S4_IMAGE/testinput/speech.in
-perl $IMAGE_BASE/s4-driver/scripts/inject.pl RawSentence \
-io.s4.example.speech01.Sentence < $IMAGE_BASE/testinput/sentence.in
+perl $S4_IMAGE/s4-driver/scripts/inject.pl RawSentence \
+io.s4.example.speech01.Sentence < $S4_IMAGE/testinput/sentence.in
## Injecting Events with a Java Client
# Follow same steps as before to start S4 server and client adapter.
# Inject events.
-$IMAGE_BASE/s4-driver/scripts/inject.sh localhost 2334 RawSpeech \
-io.s4.example.speech01.Speech < $IMAGE_BASE/testinput/speech.in
+$S4_IMAGE/s4-driver/scripts/inject.sh localhost 2334 RawSpeech \
+io.s4.example.speech01.Speech < $S4_IMAGE/testinput/speech.in
-$IMAGE_BASE/s4-driver/scripts/inject.sh localhost 2334 RawSentence \
-io.s4.example.speech01.Sentence < $IMAGE_BASE/testinput/sentence.in
+$S4_IMAGE/s4-driver/scripts/inject.sh localhost 2334 RawSentence \
+io.s4.example.speech01.Sentence < $S4_IMAGE/testinput/sentence.in
## Receiving Events
@@ -58,7 +58,7 @@ io.s4.example.speech01.Sentence < $IMAGE_BASE/testinput/sentence.in
# Start a reader client.
-perl $IMAGE_BASE/s4-driver/scripts/read.pl \
+perl $S4_IMAGE/s4-driver/scripts/read.pl \
'{
readMode => "select",
readInclude => ["SentenceJoined"]
@@ -67,28 +67,28 @@ perl $IMAGE_BASE/s4-driver/scripts/read.pl \
# In a different window, inject messages like in the previous section.
# Remember to initialize the environment variables in the new shell.
-export IMAGE_BASE=`pwd`'/build/s4-image'
-export PYTHONPATH=${IMAGE_BASE}/s4-driver/lib/python
-export PERLLIB=${IMAGE_BASE}/s4-driver/lib/perl
+export S4_IMAGE=`pwd`'/build/s4-image'
+export PYTHONPATH=${S4_IMAGE}/s4-driver/lib/python
+export PERLLIB=${S4_IMAGE}/s4-driver/lib/perl
# Inject events.
-perl $IMAGE_BASE/s4-driver/scripts/inject.pl RawSpeech \
-io.s4.example.speech01.Speech < $IMAGE_BASE/testinput/speech.in
+perl $S4_IMAGE/s4-driver/scripts/inject.pl RawSpeech \
+io.s4.example.speech01.Speech < $S4_IMAGE/testinput/speech.in
-perl $IMAGE_BASE/s4-driver/scripts/inject.pl RawSentence \
-io.s4.example.speech01.Sentence < $IMAGE_BASE/testinput/sentence.in
+perl $S4_IMAGE/s4-driver/scripts/inject.pl RawSentence \
+io.s4.example.speech01.Sentence < $S4_IMAGE/testinput/sentence.in
## Request-Response
# Example 1: query the prototype of the joiner (SentenceJoinPE)
# in the speech02 application.
-python $IMAGE_BASE/s4-driver/scripts/request.py '#sentenceJoinPE' \
-'io.s4.message.PrototypeRequest' < $IMAGE_BASE/testinput/proto-query
+python $S4_IMAGE/s4-driver/scripts/request.py '#sentenceJoinPE' \
+'io.s4.message.PrototypeRequest' < $S4_IMAGE/testinput/proto-query
# Example 2: request to a single PE from
- python $IMAGE_BASE/s4-driver/scripts/request.py '#sentenceJoinPE' \
-'io.s4.message.SinglePERequest' < $IMAGE_BASE/testinput/pe-query
+ python $S4_IMAGE/s4-driver/scripts/request.py '#sentenceJoinPE' \
+'io.s4.message.SinglePERequest' < $S4_IMAGE/testinput/pe-query
-------
Binary file not shown.
@@ -1,6 +1,6 @@
-#Mon Mar 07 11:53:07 PST 2011
+#Fri May 27 15:18:16 PDT 2011
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
-distributionUrl=http\://gradle.artifactoryonline.com/gradle/distributions/gradle-1.0-milestone-1-bin.zip
+distributionUrl=http\://repo.gradle.org/gradle/distributions/gradle-1.0-milestone-3-bin.zip
View
@@ -1,31 +0,0 @@
-S4 Core Classes
-===============
-
-Introduction
-------------
-This is a component of the S4 streaming system. For more information, see [s4.io](http://s4.io)
-
-Requirements
-------------
-
-* Linux
-* Java 1.6
-* Maven
-* S4 Communication Layer
-
-Build Instructions
-------------------
-
-1. First build and install the comm package in your Maven repository.
-
-2. Kryo, Reflectasm, and minlog must be installed to your local Maven repository manually.
- The jars are present in lib/ within this project. To install, run the following commands:
-
- mvn install:install-file -DgroupId=com.esotericsoftware -DartifactId=kryo -Dversion=1.01 -Dpackaging=jar -Dfile=lib/kryo-1.01.jar
- mvn install:install-file -DgroupId=com.esotericsoftware -DartifactId=reflectasm -Dversion=0.8 -Dpackaging=jar -Dfile=lib/reflectasm-0.8.jar
- mvn install:install-file -DgroupId=com.esotericsoftware -DartifactId=minlog -Dversion=1.2 -Dpackaging=jar -Dfile=lib/minlog-1.2.jar
-
-3. Build and install using Maven
-
- mvn assembly:assembly install
-
@@ -15,8 +15,8 @@
*/
package io.s4;
+import io.s4.processor.AbstractPE;
import io.s4.processor.PEContainer;
-import io.s4.processor.ProcessingElement;
import io.s4.util.S4Util;
import io.s4.util.Watcher;
import io.s4.util.clock.Clock;
@@ -201,9 +201,9 @@ public static void main(String args[]) throws Exception {
coreContext = new FileSystemXmlApplicationContext(coreConfigFileUrls, coreContext);
ApplicationContext context = coreContext;
- Clock s4Clock = (Clock) context.getBean("clock");
- if (s4Clock instanceof EventClock && seedTime > 0) {
- EventClock s4EventClock = (EventClock)s4Clock;
+ Clock clock = (Clock) context.getBean("clock");
+ if (clock instanceof EventClock && seedTime > 0) {
+ EventClock s4EventClock = (EventClock)clock;
s4EventClock.updateTime(seedTime);
System.out.println("Intializing event clock time with seed time " + s4EventClock.getCurrentTime());
}
@@ -236,26 +236,19 @@ public static void main(String args[]) throws Exception {
context);
// attach any beans that implement ProcessingElement to the PE
// Container
- String[] processingElementBeanNames = context.getBeanNamesForType(ProcessingElement.class);
+ String[] processingElementBeanNames = context.getBeanNamesForType(AbstractPE.class);
for (String processingElementBeanName : processingElementBeanNames) {
- Object bean = context.getBean(processingElementBeanName);
- try {
- Method getS4ClockMethod = bean.getClass().getMethod("getS4Clock");
-
- if (getS4ClockMethod.getReturnType().equals(Clock.class)) {
- if (getS4ClockMethod.invoke(bean) == null) {
- Method setS4ClockMethod = bean.getClass().getMethod("setS4Clock", Clock.class);
- setS4ClockMethod.invoke(bean, coreContext.getBean("clock"));
- }
- }
- }
- catch (NoSuchMethodException mnfe) {
- // acceptable
+ AbstractPE bean = (AbstractPE) context.getBean(processingElementBeanName);
+ bean.setClock(clock);
+
+ // if the application did not specify an id, use the Spring bean name
+ if (bean.getId() == null) {
+ bean.setId(processingElementBeanName);
}
System.out.println("Adding processing element with bean name "
+ processingElementBeanName + ", id "
- + ((ProcessingElement) bean).getId());
- peContainer.addProcessor((ProcessingElement) bean);
+ + ((AbstractPE) bean).getId());
+ peContainer.addProcessor((AbstractPE) bean);
}
}
}
@@ -16,13 +16,29 @@
package io.s4.dispatcher.partitioner;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
public class RoundRobinPartitioner implements Partitioner {
private int counter = 0;
+ private Set<String> streamNameSet;
+
+ public void setStreamNames(String[] streamNames) {
+ streamNameSet = new HashSet<String>(streamNames.length);
+ for (String eventType : streamNames) {
+ streamNameSet.add(eventType);
+ }
+ }
@Override
- public List<CompoundKeyInfo> partition(String streamName, Object event, int partitionCount) {
+ public List<CompoundKeyInfo> partition(String streamName, Object event,
+ int partitionCount) {
+
+ if (streamName != null && streamNameSet != null
+ && !streamNameSet.contains(streamName)) {
+ return null;
+ }
CompoundKeyInfo partitionInfo = new CompoundKeyInfo();
int partitionId = 0;
@@ -18,7 +18,7 @@
import io.s4.dispatcher.partitioner.CompoundKeyInfo;
import io.s4.dispatcher.partitioner.Hasher;
import io.s4.dispatcher.partitioner.KeyInfo;
-import io.s4.processor.ProcessingElement;
+import io.s4.processor.AbstractPE;
import io.s4.util.MethodInvoker;
import java.util.ArrayList;
@@ -83,7 +83,7 @@ public String toString() {
* @param pe
* @return Response object.
*/
- public Response evaluate(ProcessingElement pe) {
+ public Response evaluate(AbstractPE pe) {
HashMap<String, Object> results = new HashMap<String, Object>();
HashMap<String, String> exceptions = new HashMap<String, String>();
Oops, something went wrong.

0 comments on commit 200ee8f

Please sign in to comment.