Permalink
Browse files

TFJ-652 add test case and example app for RawStreamListener

  • Loading branch information...
1 parent 3f8a314 commit fa30b5fb2eb892e7df00715297c73a1c6ede6cc4 @yusuke committed Dec 1, 2012
@@ -0,0 +1,9 @@
+echo off
+SETLOCAL enabledelayedexpansion
+cd ..
+call setEnv.cmd
+
+echo on
+"%JAVA_HOME%\bin\java" %MEM_ARGS% -classpath "%CLASSPATH%" twitter4j.examples.stream.PrintRawSampleStream %*
+
+ENDLOCAL
@@ -0,0 +1,6 @@
+#!/bin/sh
+cd ..
+. ./setEnv.sh
+RUN_CMD="$JAVA_HOME/bin/java $MEM_ARGS -cp $CLASSPATH twitter4j.examples.stream.PrintRawSampleStream"
+echo $RUN_CMD ${1+"$@"}
+exec $RUN_CMD ${1+"$@"}
@@ -22,7 +22,7 @@
* @author Yusuke Yamamoto - yusuke at mac.com
* @since Twitter4J 2.1.7
*/
-public class PrintLinksStream extends StatusAdapter {
+public class PrintLinksStream {
/**
* Main entry of this application.
*
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2007 Yusuke Yamamoto
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package twitter4j.examples.stream;
+
+import twitter4j.RawStreamListener;
+import twitter4j.TwitterException;
+import twitter4j.TwitterStream;
+import twitter4j.TwitterStreamFactory;
+
+/**
+ * <p>This is a code example of Twitter4J Streaming API - sample method support.<br>
+ * Usage: java twitter4j.examples.PrintRawSampleStream<br>
+ * </p>
+ *
+ * @author Yusuke Yamamoto - yusuke at mac.com
+ */
+public class PrintRawSampleStream {
+ /**
+ * Main entry of this application.
+ *
+ * @param args
+ */
+ public static void main(String[] args) throws TwitterException {
+ TwitterStream twitterStream = new TwitterStreamFactory().getInstance();
+ RawStreamListener listener = new RawStreamListener() {
+ @Override
+ public void onMessage(String rawJSON) {
+ System.out.println(rawJSON);
+ }
+ @Override
+ public void onException(Exception ex) {
+ ex.printStackTrace();
+ }
+ };
+ twitterStream.addListener(listener);
+ twitterStream.sample();
+ }
+}
@@ -22,7 +22,7 @@
* @author Yusuke Yamamoto - yusuke at mac.com
* @since Twitter4J 2.1.7
*/
-public class PrintRetweetStream extends StatusAdapter {
+public class PrintRetweetStream {
/**
* Main entry of this application.
*
@@ -25,7 +25,7 @@
*
* @author Yusuke Yamamoto - yusuke at mac.com
*/
-public final class PrintSampleStream extends StatusAdapter {
+public final class PrintSampleStream {
/**
* Main entry of this application.
*
@@ -30,7 +30,7 @@
* @author Yusuke Yamamoto - yusuke at mac.com
* @since Twitter4J 2.1.8
*/
-class SiteStreamsImpl extends AbstractStreamImplementation {
+class SiteStreamsImpl extends StatusStreamBase {
private final StreamController cs;
@@ -37,7 +37,7 @@
* @author Yusuke Yamamoto - yusuke at mac.com
* @since Twitter4J 2.1.8
*/
-abstract class AbstractStreamImplementation implements StatusStream {
+abstract class StatusStreamBase implements StatusStream {
protected static final Logger logger = Logger.getLogger(StatusStreamImpl.class);
private boolean streamAlive = true;
@@ -50,7 +50,7 @@
/*package*/
- AbstractStreamImplementation(Dispatcher dispatcher, InputStream stream, Configuration conf) throws IOException {
+ StatusStreamBase(Dispatcher dispatcher, InputStream stream, Configuration conf) throws IOException {
this.is = stream;
this.br = new BufferedReader(new InputStreamReader(stream, "UTF-8"));
this.dispatcher = dispatcher;
@@ -59,7 +59,7 @@
}
/*package*/
- AbstractStreamImplementation(Dispatcher dispatcher, HttpResponse response, Configuration conf) throws IOException {
+ StatusStreamBase(Dispatcher dispatcher, HttpResponse response, Configuration conf) throws IOException {
this(dispatcher, response.asStream(), conf);
this.response = response;
}
@@ -202,7 +202,7 @@ public void run() {
}
protected void onMessage(String rawString, RawStreamListener[] listeners) throws TwitterException {
- logger.warn("Unhandled event: onSender");
+ logger.warn("Unhandled event: onMessage");
}
protected void onSender(JSONObject json, StreamListener[] listeners) throws TwitterException {
@@ -25,15 +25,14 @@
import java.io.IOException;
import java.io.InputStream;
-import java.util.List;
/**
* StatusStream implementation. This class is NOT intended to be extended but left non-final for the ease of mock testing.
*
* @author Yusuke Yamamoto - yusuke at mac.com
* @since Twitter4J 2.1.2
*/
-class StatusStreamImpl extends AbstractStreamImplementation {
+class StatusStreamImpl extends StatusStreamBase {
/*package*/
StatusStreamImpl(Dispatcher dispatcher, InputStream stream, Configuration conf) throws IOException {
@@ -64,6 +63,13 @@ protected String parseLine(String line) {
}
@Override
+ protected void onMessage(String rawString, RawStreamListener[] listeners) throws TwitterException {
+ for (RawStreamListener listener : listeners) {
+ listener.onMessage(rawString);
+ }
+ }
+
+ @Override
protected void onStatus(JSONObject json, StreamListener[] listeners) throws TwitterException {
for (StreamListener listener : listeners) {
((StatusListener) listener).onStatus(asStatus(json));
@@ -436,7 +436,7 @@ When a HTTP error (> 200) is returned, back off exponentially.
static int count = 0;
abstract class TwitterStreamConsumer extends Thread {
- private AbstractStreamImplementation stream = null;
+ private StatusStreamBase stream = null;
private final String NAME = "Twitter Stream consumer-" + (++count);
private volatile boolean closed = false;
private final StreamListener[] streamListeners;
@@ -459,7 +459,7 @@ public void run() {
// try establishing connection
logger.info("Establishing connection.");
setStatus("[Establishing connection]");
- stream = (AbstractStreamImplementation) getStream();
+ stream = (StatusStreamBase) getStream();
connected = true;
logger.info("Connection established.");
for (ConnectionLifeCycleListener listener : lifeCycleListeners) {
@@ -23,9 +23,7 @@
import twitter4j.json.DataObjectFactory;
import java.io.InputStream;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
+import java.util.*;
public class StreamAPITest extends TwitterTestBase implements StatusListener, ConnectionLifeCycleListener {
protected TwitterStream twitterStream = null;
@@ -53,6 +51,7 @@ protected void setUp() throws Exception {
protected void tearDown() throws Exception {
super.tearDown();
+ twitterStream.shutdown();
}
public void testToString() throws Exception {
@@ -68,6 +67,28 @@ public void testEquality() throws Exception {
assertEquals(2, map.size());
}
+ List<String> received = new ArrayList<String>();
+ Object lock = new Object();
+ public void testRawStreamListener() throws Exception{
+ twitterStream.addListener(new RawStreamListener() {
+ @Override
+ public void onMessage(String rawString) {
+ received.add(rawString);
+ synchronized (lock) {
+ lock.notify();
+ }
+ }
+
+ @Override
+ public void onException(Exception ex) {
+ }
+ });
+ twitterStream.sample();
+ synchronized (lock) {
+ lock.wait();
+ }
+ assertTrue(received.size() > 0);
+ }
public void testNoListener() throws Exception {
TwitterStream twitterStream;
twitterStream = new TwitterStreamFactory().getInstance();

0 comments on commit fa30b5f

Please sign in to comment.