Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master'
Browse files Browse the repository at this point in the history
Conflicts:
	.gitignore
	build.gradle
	s4-core/src/main/java/io/s4/processor/AbstractPE.java
	s4-core/src/main/java/io/s4/processor/PEContainer.java
	s4-core/src/main/java/io/s4/processor/PrototypeWrapper.java
	s4-core/src/main/resources/s4-core/conf/default/log4j.xml
  • Loading branch information
Matthieu Morel committed Jul 15, 2011
2 parents 0e61639 + 7aaba95 commit 200ee8f
Show file tree
Hide file tree
Showing 58 changed files with 1,204 additions and 226 deletions.
8 changes: 8 additions & 0 deletions .gitignore
Expand Up @@ -6,6 +6,7 @@
# Build directory
target/
build/
bin/

# SVN
.svn
Expand All @@ -21,3 +22,10 @@ build/

# class files
*.class
.DS_Store

# create an empty .gitignore file when you want to track an empty
# directory but want to ignore its content. Example: to keep the
# "logs" dir do: "touch logs/.gitignore"
#
!.gitignore
40 changes: 30 additions & 10 deletions README.md
Expand Up @@ -65,23 +65,43 @@ git clone https://github.com/s4/s4.git
# Create image
gradlew allImage

# Change permissions
chmod u+x ./build/s4-image/scripts/*
# set the S4_IMAGE environmental variable
cd build/s4-image/
export S4_IMAGE=`pwd`

# Copy S4 application to deployment dir (s4-apps)
cp -rp build/s4-image/s4-example-apps/s4-example-twittertopiccount build/s4-image/s4-apps/
# get the sample application
git clone git://github.com/s4/twittertopiccount.git

# Enter your twitter user/pass in config file
$EDITOR build/s4-image/s4-apps/s4-example-twittertopiccount/adapter-conf.xml
# build the sample application
./gradlew install

# Start server with s4-example-twittertopiccount app
./build/s4-image/scripts/s4-start.sh &
# deploy the sample application into the S4 image (relies in the S4_IMAGE environmental variable)
./gradlew deploy

# Start adapter
./build/s4-image/scripts/run-adapter.sh -x -u build/s4-image/s4-apps/s4-example-twittertopiccount/lib/s4-example-twittertopiccount-0.3-SNAPSHOT.jar -d build/s4-image/s4-apps/s4-example-twittertopiccount/adapter-conf.xml &
# set the TWIT_LISTENER environmental variable
cd build/install/twitter_feed_listener
export TWIT_LISTENER=`pwd`

# Start server with twittertopiccount app
$S4_IMAGE/scripts/start-s4.sh -r client-adapter &

# start the client adapter
$S4_IMAGE/scripts/run-client-adapter.sh -s client-adapter -g s4 -d $S4_IMAGE/s4-core/conf/default/client-stub-conf.xml &

# run a client to send events into the S4 cluster. Replace <your-twitter-user> and <your-twitter-password> with your Twitter userid and password.
$TWIT_LISTENER/bin/twitter_feed_listener <your-twitter-user> <your-twitter-password> &

# Check output
cat /tmp/top_n_hashtags
</pre>

Developing with Eclipse
-----------------------

The command `gradle eclipse` will create an eclipse project that you can import from the Eclipse IDE.

There is now a [Gradle plugin for the Eclipse IDE](http://static.springsource.org/sts/docs/2.7.0.M1/reference/html/gradle/index.html).
To install Gradle without installing the full Spring development environment follow the
[instructions](http://static.springsource.org/sts/docs/2.7.0.M1/reference/html/gradle/installation.html) under the heading
"Installing from update site". There is also a discussion in the [Gradle mailing list](http://gradle.1045684.n5.nabble.com/ANN-Gradle-Eclipse-Plugin-td4387658.html).

12 changes: 9 additions & 3 deletions build.gradle
Expand Up @@ -69,7 +69,6 @@ libraries = [
json: 'org.json:json:20090211',
lift_json: 'net.liftweb:lift-json_2.8.1:2.2',
gson: 'com.google.code.gson:gson:1.6',
junit: 'junit:junit:4.4',
zk: 'org.apache.zookeeper:zookeeper:3.3.1',
log4j: 'log4j:log4j:1.2.15',
flexjson: 'net.sf.flexjson:flexjson:2.1',
Expand All @@ -85,6 +84,7 @@ commons_jexl: 'commons-jexl:commons-jexl:1.1',
commons_codec: 'commons-codec:commons-codec:1.4',
commons_httpclient: 'commons-httpclient:commons-httpclient:3.1',
spring: 'org.springframework:spring:2.5.6',
junit: 'junit:junit:4.4',
scala_compiler: 'org.scala-lang:scala-compiler:2.8.1',
scala_library: 'org.scala-lang:scala-library:2.8.1',
jedis: 'redis.clients:jedis:1.5.2',
Expand All @@ -104,6 +104,11 @@ subprojects {
//defaultTasks 'build'

group = 'io.s4'

/* Remove this once this bug is fixed: http://issues.gradle.org/browse/GRADLE-1157 */
eclipseClasspath {
downloadSources = false; // required for eclipseClasspath to work
}

/* Common dependencies applied to all subprojects. */
dependencies {
Expand Down Expand Up @@ -220,6 +225,7 @@ task allImage(type: Copy, dependsOn: s4Javadoc) {
with allDistImage
}


task binTgz( type: Tar) {
description = "Build binary bundle in GZIP format"
classifier = 'bin'
Expand All @@ -239,9 +245,9 @@ task allTgz( type: Tar, dependsOn: s4Javadoc) {
}

/* Generates the gradlew scripts.
http://www.gradle.org/1.0-milestone-1/docs/userguide/gradle_wrapper.html */
http://www.gradle.org/1.0-milestone-3/docs/userguide/gradle_wrapper.html */
task wrapper(type: Wrapper) {
gradleVersion = '1.0-milestone-1'
gradleVersion = '1.0-milestone-3'
}

class Version {
Expand Down
58 changes: 29 additions & 29 deletions dev-notes.txt
Expand Up @@ -14,42 +14,42 @@ cd s4
gradlew clean allImage

# Create some environment variables.
export IMAGE_BASE=`pwd`'/build/s4-image'
export PYTHONPATH=${IMAGE_BASE}/s4-driver/lib/python
export PERLLIB=${IMAGE_BASE}/s4-driver/lib/perl
export S4_IMAGE=`pwd`'/build/s4-image'
export PYTHONPATH=${S4_IMAGE}/s4-driver/lib/python
export PERLLIB=${S4_IMAGE}/s4-driver/lib/perl

# Change script permissions.
chmod u+x $IMAGE_BASE/scripts/*
chmod u+x $IMAGE_BASE/s4-driver/scripts/*
chmod u+x $S4_IMAGE/scripts/*
chmod u+x $S4_IMAGE/s4-driver/scripts/*

# Copy speech02 app to apps deployment directory.
cp -fr $IMAGE_BASE/s4-example-apps/s4-example-speech02 $IMAGE_BASE/s4-apps
cp -fr $S4_IMAGE/s4-example-apps/s4-example-speech02 $S4_IMAGE/s4-apps

# Start S4 server in standalone mode.
$IMAGE_BASE/scripts/s4-start.sh -r client-adapter &
$S4_IMAGE/scripts/start-s4.sh -r client-adapter &

# Start client adapter.
$IMAGE_BASE/scripts/run-client-adapter.sh -s client-adapter -g s4 \
-x -d $IMAGE_BASE/s4-core/conf/default/client-stub-conf.xml &
$S4_IMAGE/scripts/run-client-adapter.sh -s client-adapter -g s4 \
-x -d $S4_IMAGE/s4-core/conf/default/client-stub-conf.xml &

# Inject events.
perl $IMAGE_BASE/s4-driver/scripts/inject.pl RawSpeech \
io.s4.example.speech01.Speech < $IMAGE_BASE/testinput/speech.in
perl $S4_IMAGE/s4-driver/scripts/inject.pl RawSpeech \
io.s4.example.speech01.Speech < $S4_IMAGE/testinput/speech.in

perl $IMAGE_BASE/s4-driver/scripts/inject.pl RawSentence \
io.s4.example.speech01.Sentence < $IMAGE_BASE/testinput/sentence.in
perl $S4_IMAGE/s4-driver/scripts/inject.pl RawSentence \
io.s4.example.speech01.Sentence < $S4_IMAGE/testinput/sentence.in


## Injecting Events with a Java Client

# Follow same steps as before to start S4 server and client adapter.

# Inject events.
$IMAGE_BASE/s4-driver/scripts/inject.sh localhost 2334 RawSpeech \
io.s4.example.speech01.Speech < $IMAGE_BASE/testinput/speech.in
$S4_IMAGE/s4-driver/scripts/inject.sh localhost 2334 RawSpeech \
io.s4.example.speech01.Speech < $S4_IMAGE/testinput/speech.in

$IMAGE_BASE/s4-driver/scripts/inject.sh localhost 2334 RawSentence \
io.s4.example.speech01.Sentence < $IMAGE_BASE/testinput/sentence.in
$S4_IMAGE/s4-driver/scripts/inject.sh localhost 2334 RawSentence \
io.s4.example.speech01.Sentence < $S4_IMAGE/testinput/sentence.in


## Receiving Events
Expand All @@ -58,7 +58,7 @@ io.s4.example.speech01.Sentence < $IMAGE_BASE/testinput/sentence.in

# Start a reader client.

perl $IMAGE_BASE/s4-driver/scripts/read.pl \
perl $S4_IMAGE/s4-driver/scripts/read.pl \
'{
readMode => "select",
readInclude => ["SentenceJoined"]
Expand All @@ -67,28 +67,28 @@ perl $IMAGE_BASE/s4-driver/scripts/read.pl \
# In a different window, inject messages like in the previous section.

# Remember to initialize the environment variables in the new shell.
export IMAGE_BASE=`pwd`'/build/s4-image'
export PYTHONPATH=${IMAGE_BASE}/s4-driver/lib/python
export PERLLIB=${IMAGE_BASE}/s4-driver/lib/perl
export S4_IMAGE=`pwd`'/build/s4-image'
export PYTHONPATH=${S4_IMAGE}/s4-driver/lib/python
export PERLLIB=${S4_IMAGE}/s4-driver/lib/perl

# Inject events.
perl $IMAGE_BASE/s4-driver/scripts/inject.pl RawSpeech \
io.s4.example.speech01.Speech < $IMAGE_BASE/testinput/speech.in
perl $S4_IMAGE/s4-driver/scripts/inject.pl RawSpeech \
io.s4.example.speech01.Speech < $S4_IMAGE/testinput/speech.in

perl $IMAGE_BASE/s4-driver/scripts/inject.pl RawSentence \
io.s4.example.speech01.Sentence < $IMAGE_BASE/testinput/sentence.in
perl $S4_IMAGE/s4-driver/scripts/inject.pl RawSentence \
io.s4.example.speech01.Sentence < $S4_IMAGE/testinput/sentence.in


## Request-Response

# Example 1: query the prototype of the joiner (SentenceJoinPE)
# in the speech02 application.
python $IMAGE_BASE/s4-driver/scripts/request.py '#sentenceJoinPE' \
'io.s4.message.PrototypeRequest' < $IMAGE_BASE/testinput/proto-query
python $S4_IMAGE/s4-driver/scripts/request.py '#sentenceJoinPE' \
'io.s4.message.PrototypeRequest' < $S4_IMAGE/testinput/proto-query

# Example 2: request to a single PE from
python $IMAGE_BASE/s4-driver/scripts/request.py '#sentenceJoinPE' \
'io.s4.message.SinglePERequest' < $IMAGE_BASE/testinput/pe-query
python $S4_IMAGE/s4-driver/scripts/request.py '#sentenceJoinPE' \
'io.s4.message.SinglePERequest' < $S4_IMAGE/testinput/pe-query

-------

Expand Down
Binary file modified gradle/wrapper/gradle-wrapper.jar
Binary file not shown.
4 changes: 2 additions & 2 deletions gradle/wrapper/gradle-wrapper.properties
@@ -1,6 +1,6 @@
#Mon Mar 07 11:53:07 PST 2011
#Fri May 27 15:18:16 PDT 2011
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
distributionUrl=http\://gradle.artifactoryonline.com/gradle/distributions/gradle-1.0-milestone-1-bin.zip
distributionUrl=http\://repo.gradle.org/gradle/distributions/gradle-1.0-milestone-3-bin.zip
31 changes: 0 additions & 31 deletions s4-core/README.md

This file was deleted.

33 changes: 13 additions & 20 deletions s4-core/src/main/java/io/s4/MainApp.java
Expand Up @@ -15,8 +15,8 @@
*/
package io.s4;

import io.s4.processor.AbstractPE;
import io.s4.processor.PEContainer;
import io.s4.processor.ProcessingElement;
import io.s4.util.S4Util;
import io.s4.util.Watcher;
import io.s4.util.clock.Clock;
Expand Down Expand Up @@ -201,9 +201,9 @@ public static void main(String args[]) throws Exception {
coreContext = new FileSystemXmlApplicationContext(coreConfigFileUrls, coreContext);
ApplicationContext context = coreContext;

Clock s4Clock = (Clock) context.getBean("clock");
if (s4Clock instanceof EventClock && seedTime > 0) {
EventClock s4EventClock = (EventClock)s4Clock;
Clock clock = (Clock) context.getBean("clock");
if (clock instanceof EventClock && seedTime > 0) {
EventClock s4EventClock = (EventClock)clock;
s4EventClock.updateTime(seedTime);
System.out.println("Intializing event clock time with seed time " + s4EventClock.getCurrentTime());
}
Expand Down Expand Up @@ -236,26 +236,19 @@ public static void main(String args[]) throws Exception {
context);
// attach any beans that implement ProcessingElement to the PE
// Container
String[] processingElementBeanNames = context.getBeanNamesForType(ProcessingElement.class);
String[] processingElementBeanNames = context.getBeanNamesForType(AbstractPE.class);
for (String processingElementBeanName : processingElementBeanNames) {
Object bean = context.getBean(processingElementBeanName);
try {
Method getS4ClockMethod = bean.getClass().getMethod("getS4Clock");

if (getS4ClockMethod.getReturnType().equals(Clock.class)) {
if (getS4ClockMethod.invoke(bean) == null) {
Method setS4ClockMethod = bean.getClass().getMethod("setS4Clock", Clock.class);
setS4ClockMethod.invoke(bean, coreContext.getBean("clock"));
}
}
}
catch (NoSuchMethodException mnfe) {
// acceptable
AbstractPE bean = (AbstractPE) context.getBean(processingElementBeanName);
bean.setClock(clock);

// if the application did not specify an id, use the Spring bean name
if (bean.getId() == null) {
bean.setId(processingElementBeanName);
}
System.out.println("Adding processing element with bean name "
+ processingElementBeanName + ", id "
+ ((ProcessingElement) bean).getId());
peContainer.addProcessor((ProcessingElement) bean);
+ ((AbstractPE) bean).getId());
peContainer.addProcessor((AbstractPE) bean);
}
}
}
Expand Down
Expand Up @@ -16,13 +16,29 @@
package io.s4.dispatcher.partitioner;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class RoundRobinPartitioner implements Partitioner {
private int counter = 0;
private Set<String> streamNameSet;

public void setStreamNames(String[] streamNames) {
streamNameSet = new HashSet<String>(streamNames.length);
for (String eventType : streamNames) {
streamNameSet.add(eventType);
}
}

@Override
public List<CompoundKeyInfo> partition(String streamName, Object event, int partitionCount) {
public List<CompoundKeyInfo> partition(String streamName, Object event,
int partitionCount) {

if (streamName != null && streamNameSet != null
&& !streamNameSet.contains(streamName)) {
return null;
}

CompoundKeyInfo partitionInfo = new CompoundKeyInfo();
int partitionId = 0;
Expand Down
4 changes: 2 additions & 2 deletions s4-core/src/main/java/io/s4/message/SinglePERequest.java
Expand Up @@ -18,7 +18,7 @@
import io.s4.dispatcher.partitioner.CompoundKeyInfo;
import io.s4.dispatcher.partitioner.Hasher;
import io.s4.dispatcher.partitioner.KeyInfo;
import io.s4.processor.ProcessingElement;
import io.s4.processor.AbstractPE;
import io.s4.util.MethodInvoker;

import java.util.ArrayList;
Expand Down Expand Up @@ -83,7 +83,7 @@ public List<String> getQuery() {
* @param pe
* @return Response object.
*/
public Response evaluate(ProcessingElement pe) {
public Response evaluate(AbstractPE pe) {

HashMap<String, Object> results = new HashMap<String, Object>();
HashMap<String, String> exceptions = new HashMap<String, String>();
Expand Down

0 comments on commit 200ee8f

Please sign in to comment.