Permalink
Browse files

Merge remote-tracking branch 'upstream/master'

Conflicts:
	settings.gradle
  • Loading branch information...
2 parents b3e6c14 + 48199f5 commit 531e49d5de44d592be554085aff84c16e80d6fea Matthieu Morel committed Aug 3, 2011
Showing with 48 additions and 1,540 deletions.
  1. +3 −3 README.md
  2. +2 −3 build.gradle
  3. +4 −4 s4-comm/src/main/java/io/s4/comm/core/GenericSender.java
  4. +1 −1 s4-comm/src/main/java/io/s4/comm/core/ListenerProcess.java
  5. +2 −2 s4-comm/src/main/java/io/s4/comm/core/MulticastSender.java
  6. +4 −4 s4-comm/src/main/java/io/s4/comm/core/SenderProcess.java
  7. +1 −1 s4-comm/src/main/java/io/s4/comm/file/StaticTaskManager.java
  8. +3 −3 s4-comm/src/main/java/io/s4/comm/zk/ZkQueue.java
  9. +1 −1 s4-comm/src/main/java/io/s4/comm/zk/ZkTaskManager.java
  10. +2 −2 s4-comm/src/main/java/io/s4/comm/zk/ZkTaskSetup.java
  11. +1 −1 s4-comm/src/main/java/io/s4/comm/zk/ZkUtil.java
  12. +1 −1 s4-core/src/main/java/io/s4/message/PrototypeRequest.java
  13. +1 −1 s4-core/src/main/java/io/s4/message/Request.java
  14. +2 −5 s4-core/src/main/java/io/s4/processor/AbstractPE.java
  15. +18 −2 s4-driver/examples/src/main/resources/scripts/inject.sh
  16. +2 −2 s4-driver/java/src/main/java/io/s4/client/Driver.java
  17. +0 −22 s4-examples/twittertopiccount/build.gradle
  18. +0 −118 ...amples/twittertopiccount/src/main/java/io/s4/example/twittertopiccount/DirectToFilePersister.java
  19. +0 −154 s4-examples/twittertopiccount/src/main/java/io/s4/example/twittertopiccount/Status.java
  20. +0 −178 s4-examples/twittertopiccount/src/main/java/io/s4/example/twittertopiccount/TopNTopicPE.java
  21. +0 −76 ...amples/twittertopiccount/src/main/java/io/s4/example/twittertopiccount/TopicCountAndReportPE.java
  22. +0 −114 s4-examples/twittertopiccount/src/main/java/io/s4/example/twittertopiccount/TopicExtractorPE.java
  23. +0 −67 s4-examples/twittertopiccount/src/main/java/io/s4/example/twittertopiccount/TopicSeen.java
  24. +0 −355 s4-examples/twittertopiccount/src/main/java/io/s4/example/twittertopiccount/TwitterFeedListener.java
  25. +0 −280 s4-examples/twittertopiccount/src/main/java/io/s4/example/twittertopiccount/User.java
  26. +0 −17 s4-examples/twittertopiccount/src/main/resources/adapter-conf.xml
  27. +0 −121 s4-examples/twittertopiccount/src/main/resources/s4-example-twittertopiccount-conf.xml
  28. +0 −2 settings.gradle
View
@@ -63,7 +63,7 @@ Running the Twitter Topic Count Example
git clone https://github.com/s4/s4.git
# Create image
-gradlew allImage
+./gradlew allImage
# set the S4_IMAGE environmental variable
cd build/s4-image/
@@ -88,8 +88,8 @@ $S4_IMAGE/scripts/start-s4.sh -r client-adapter &
# start the client adapter
$S4_IMAGE/scripts/run-client-adapter.sh -s client-adapter -g s4 -d $S4_IMAGE/s4-core/conf/default/client-stub-conf.xml &
-# run a client to send events into the S4 cluster. Replace <your-twitter-user> and <your-twitter-password> with your Twitter userid and password.
-$TWIT_LISTENER/bin/twitter_feed_listener <your-twitter-user> <your-twitter-password> &
+# run a client to send events into the S4 cluster. Replace *your-twitter-user* and *your-twitter-password* with your Twitter userid and password.
+$TWIT_LISTENER/bin/twitter_feed_listener *your-twitter-user* *your-twitter-password* &
# Check output
cat /tmp/top_n_hashtags
View
@@ -46,8 +46,7 @@ platformProjects = [project(':s4-core'), project(':s4-comm'), project(':s4-drive
allprojects {
- version = new Version(major: 0, minor: 3, releaseType: 'SNAPSHOT')
- //version = new Version(major: 0, minor: 2, releaseType: 'beta', bugfix: 2)
+ version = new Version(major: 0, minor: 3, bugfix: 0)
archivesBaseName = 's4'
@@ -257,6 +256,6 @@ class Version {
String releaseType
String toString() {
- "$major.$minor-$releaseType${bugfix ?: ''}"
+ "$major.$minor.$bugfix${releaseType ? '-'+releaseType : ''}"
}
}
@@ -81,7 +81,7 @@ public GenericSender(String zkAddress, String senderAppName,
* This method will send the data to receivers in a round robin fashion
*
* @param data
- * @return
+ * @return true if data was successfully sent, false otherwise
*/
@SuppressWarnings("unchecked")
public boolean send(Object data) {
@@ -127,7 +127,7 @@ public boolean send(Object data) {
*
* @param partition
* @param data
- * @return
+ * @return true if data was successfully sent, false otherwise
*/
@SuppressWarnings("unchecked")
public boolean sendToPartition(int partition, Object data) {
@@ -164,9 +164,9 @@ public boolean sendToPartition(int partition, Object data) {
/**
* compute partition using hashcode and send to appropriate partition
*
- * @param partition
+ * @param hashcode
* @param data
- * @return
+ * @return true if data was successfully sent, false otherwise
*/
public boolean sendUsingHashCode(int hashcode, Object data) {
int partition = (hashcode & Integer.MAX_VALUE) % listenerTaskCount;
@@ -39,7 +39,7 @@ public ListenerProcess(String zkaddress, String clusterName) {
/**
* This will be a blocking call and will wait until it gets a task
*
- * @return
+ * @return listener configuration
*/
public Object acquireTaskAndCreateListener(Map<String, String> map) {
TaskManager manager = CommServiceFactory.getTaskManager(zkaddress,
@@ -47,7 +47,7 @@ public MulticastSender(Object senderConfigData) {
* This method will send the data to receivers in a round robin fashion
*
* @param data
- * @return
+ * @return true if data was successfully sent, false otherwise
*/
public boolean send(Object data) {
try {
@@ -70,7 +70,7 @@ public boolean send(Object data) {
*
* @param partition
* @param data
- * @return
+ * @return true if data was successfully sent, false otherwise
*/
public boolean send(int partition, Object data) {
return true;
@@ -93,7 +93,7 @@ public void createSenderFromConfig(Object senderConfig) {
* This method will send the data to receivers in a round robin fashion
*
* @param data
- * @return
+ * @return true if data was successfully sent, false otherwise
*/
public boolean send(Object data) {
return genericSender.send(data);
@@ -104,7 +104,7 @@ public boolean send(Object data) {
*
* @param partition
* @param data
- * @return
+ * @return true if data was successfully sent, false otherwise
*/
public boolean sendToPartition(int partition, Object data) {
return genericSender.sendToPartition(partition, data);
@@ -113,9 +113,9 @@ public boolean sendToPartition(int partition, Object data) {
/**
* compute partition using hashcode and send to appropriate partition
*
- * @param partition
+ * @param hashcode
* @param data
- * @return true on success, false on failure
+ * @return true if data was successfully sent, false otherwise
*/
public boolean sendUsingHashCode(int hashcode, Object data) {
@@ -44,7 +44,7 @@
* Constructor of TaskManager
*
* @param address
- * @param name
+ * @param clusterName
*/
public StaticTaskManager(String address, String clusterName,
ClusterType clusterType, CommEventCallback callbackHandler) {
@@ -61,8 +61,8 @@ public ZkQueue(String address, String name) {
/**
* Add element to the queue.
*
- * @param i
- * @return
+ * @param obj element to add
+ * @return true if add successful, false otherwise
*/
public boolean produce(Object obj) throws KeeperException,
@@ -78,7 +78,7 @@ public boolean produce(Object obj) throws KeeperException,
/**
* Remove first element from the queue.
*
- * @return
+ * @return first element from the queue
* @throws KeeperException
* @throws InterruptedException
*/
@@ -44,7 +44,7 @@ public ZkTaskManager(String address, String ClusterName, ClusterType clusterType
* Constructor of TaskManager
*
* @param address
- * @param name
+ * @param ClusterName
*/
public ZkTaskManager(String address, String ClusterName, ClusterType clusterType,
CommEventCallback callbackHandler) {
@@ -44,7 +44,7 @@ public ZkTaskSetup(String address, String clusterName, ClusterType clusterType)
* Constructor of ZkTaskSetup
*
* @param address
- * @param name
+ * @param clusterName
*/
public ZkTaskSetup(String address, String clusterName, ClusterType clusterType,
CommEventCallback callbackHandler) {
@@ -62,7 +62,7 @@ public void setUpTasks(Object[] data) {
/**
* Creates task nodes.
*
- * @param numTasks
+ * @param version
* @param data
*/
public void setUpTasks(String version, Object[] data) {
@@ -118,7 +118,7 @@ public static void main(String[] args) throws Exception {
if (method != null) {
ZkUtil zkUtil = new ZkUtil(address);
- Object ret = method.invoke(zkUtil, methodArgs);
+ Object ret = method.invoke(zkUtil, (Object[]) methodArgs);
if (ret != null) {
System.out.println("**********");
System.out.println(ret);
@@ -63,7 +63,7 @@ public String toString() {
/**
* Evaluate Request on a particular PE Prototype.
*
- * @param pe
+ * @param pw
* prototype
* @return Response object.
*/
@@ -70,7 +70,7 @@ public void setStream(String stream) {
* Partition Id from which this request originated. This may be used to
* return a response to the same partition.
*
- * @return
+ * @return partition id
*/
public int getPartition() {
return partition;
@@ -48,9 +48,7 @@
/**
- * This is the base class for processor classes. While it is possible to create
- * a processor class by implementing the {@link ProcessingElement} interface, we
- * suggest you instead extend this class.
+ * This is the base class for processor classes.
* <p>
* <code>AbstractProcessor</code> provides output frequency strategies that
* allow you to configure the rate at which your processor produces output (see
@@ -189,8 +187,7 @@ public AbstractPE() {
}
/**
- * This implements the <code>execute</code> method declared in the
- * {@link ProcessingElement} interface. You should not override this method.
+ * You should not override this method.
* Instead, you need to implement the <code>processEvent</code> method.
**/
public void execute(String streamName, CompoundKeyInfo compoundKeyInfo,
@@ -1,12 +1,28 @@
+#!/bin/bash
+
+osx=false
+case "`uname`" in
+Darwin*) osx=true;;
+esac
+
+if $osx; then
+ READLINK="stat"
+else
+ READLINK="readlink"
+fi
+
+BASE_DIR=`dirname $($READLINK -f $0)`
+INJECT_HOME=`$READLINK -f ${BASE_DIR}/..`
+
CP_SEP=":"
-CLASSPATH=`find ../ -name "*.jar" | awk '{p=$0"'$CP_SEP'"p;} END {print p}'`
+CLASSPATH=`find ${INJECT_HOME} -name "*.jar" | awk '{p=$0"'$CP_SEP'"p;} END {print p}'`
JAVA_LOC=""
if [ "x$JAVA_HOME" != "x" ] ; then
JAVA_LOC=${JAVA_HOME}"/bin/"
fi
CMD="${JAVA_LOC}java -classpath $CLASSPATH io.s4.client.example.Inject $1 $2 $3 $4"
-echo $CMD
+#echo $CMD
$CMD
@@ -63,7 +63,7 @@
* Note: this does not create a connection to the adapter.
*
* @see #init()
- * @see #connect(ReadMode, WriteMode)
+ * @see #connect()
*
* @param hostname
* Name of S4 client adapter host.
@@ -220,7 +220,7 @@ public State getState() {
* is compatible with the protocol used by the adapter. This does not
* actually establish a connection for sending and receiving events.
*
- * @see #connect(ReadMode, WriteMode)
+ * @see #connect()
*
* @return true if and only if the adapter issued a valid ID to this client,
* and the protocol is found to be compatible.
@@ -1,22 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
-dependencies {
- compile( libraries.json )
- compile( libraries.commons_codec )
- compile( libraries.commons_httpclient )
- compile project(':s4-core')
-}
-
Oops, something went wrong.

0 comments on commit 531e49d

Please sign in to comment.