Skip to content
This repository has been archived by the owner on Apr 5, 2022. It is now read-only.

Commit

Permalink
Streaming API
Browse files Browse the repository at this point in the history
  • Loading branch information
habuma committed May 31, 2011
1 parent 40c046e commit ddbf6b2
Show file tree
Hide file tree
Showing 11 changed files with 274 additions and 38 deletions.
Expand Up @@ -19,7 +19,7 @@
* A stream event indicating that a tweet should be removed from the client.
* @author Craig Walls
*/
public class DeleteTweetEvent {
public class StreamDeleteEvent {

private final long tweetId;

Expand All @@ -39,7 +39,7 @@ public long getUserId() {
return userId;
}

public DeleteTweetEvent(long tweetId, long userId) {
public StreamDeleteEvent(long tweetId, long userId) {
this.tweetId = tweetId;
this.userId = userId;
}
Expand Down
Expand Up @@ -30,10 +30,9 @@ public interface StreamListener {
void onTweet(Tweet tweet);

/**
* Called when a delete message is availble on the stream
* @param deleteEvent
* Called when a delete message is available on the stream
*/
void onDelete(DeleteTweetEvent deleteEvent);
void onDelete(StreamDeleteEvent deleteEvent);

/**
* Called when the stream is being track limited.
Expand Down
Expand Up @@ -24,18 +24,18 @@
import org.codehaus.jackson.map.DeserializationContext;
import org.codehaus.jackson.map.JsonDeserializer;
import org.codehaus.jackson.map.annotate.JsonDeserialize;
import org.springframework.social.twitter.api.DeleteTweetEvent;
import org.springframework.social.twitter.api.StreamDeleteEvent;
import org.springframework.social.twitter.api.impl.DeleteTweetEventMixin.DeleteTweetEventDeserializer;

@JsonIgnoreProperties(ignoreUnknown = true)
@JsonDeserialize(using = DeleteTweetEventDeserializer.class)
class DeleteTweetEventMixin {

static final class DeleteTweetEventDeserializer extends JsonDeserializer<DeleteTweetEvent> {
static final class DeleteTweetEventDeserializer extends JsonDeserializer<StreamDeleteEvent> {
@Override
public DeleteTweetEvent deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException, JsonProcessingException {
public StreamDeleteEvent deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException, JsonProcessingException {
JsonNode deleteNode = jp.readValueAsTree().get("delete").get("status");
return new DeleteTweetEvent(deleteNode.get("id").getValueAsLong(), deleteNode.get("user_id").getValueAsLong());
return new StreamDeleteEvent(deleteNode.get("id").getValueAsLong(), deleteNode.get("user_id").getValueAsLong());
}
}

Expand Down
Expand Up @@ -17,44 +17,56 @@

import java.io.IOException;
import java.util.List;
import java.util.Queue;

import org.codehaus.jackson.map.ObjectMapper;
import org.springframework.social.twitter.api.DeleteTweetEvent;
import org.springframework.social.twitter.api.StreamDeleteEvent;
import org.springframework.social.twitter.api.StreamListener;
import org.springframework.social.twitter.api.Tweet;

class StreamDispatcher implements Runnable {

private final List<StreamListener> listeners;

private final String line;

private ObjectMapper objectMapper;

private volatile boolean active;

private final Queue<String> queue;

public StreamDispatcher(List<StreamListener> listeners, String line) {
public StreamDispatcher(Queue<String> queue, List<StreamListener> listeners) {
this.queue = queue;
this.listeners = listeners;
this.line = line;
objectMapper = new ObjectMapper();
objectMapper.getDeserializationConfig().addMixInAnnotations(Tweet.class, TweetMixin.class);
objectMapper.getDeserializationConfig().addMixInAnnotations(DeleteTweetEvent.class, DeleteTweetEventMixin.class);
objectMapper.getDeserializationConfig().addMixInAnnotations(StreamDeleteEvent.class, DeleteTweetEventMixin.class);
active = true;
}

public void run() {
try {
if (line.contains("in_reply_to_status_id_str")) {
handleTweet();
} else if (line.startsWith("{\"limit")) {
handleLimit();
} else if (line.startsWith("{\"delete")) {
handleDelete();
while(active || queue.peek() != null) {
String line = queue.poll();
if(line == null) continue;
try {
if (line.contains("in_reply_to_status_id_str")) {
handleTweet(line);
} else if (line.startsWith("{\"limit")) {
handleLimit(line);
} else if (line.startsWith("{\"delete")) {
handleDelete(line);
}
} catch (IOException e) {
// TODO: Should only happen if Jackson doesn't know how to map the line
}
} catch (IOException e) {
// TODO: Should only happen if Jackson doesn't know how to map the line
}
}

public void stop() {
active = false;
}

private void handleDelete() throws IOException {
final DeleteTweetEvent deleteEvent = objectMapper.readValue(line, DeleteTweetEvent.class);
private void handleDelete(String line) throws IOException {
final StreamDeleteEvent deleteEvent = objectMapper.readValue(line, StreamDeleteEvent.class);
for (final StreamListener listener : listeners) {
new Thread(new Runnable() {
public void run() {
Expand All @@ -64,7 +76,7 @@ public void run() {
}
}

private void handleLimit() throws IOException {
private void handleLimit(String line) throws IOException {
final TrackLimitEvent limitEvent = objectMapper.readValue(line, TrackLimitEvent.class);
for (final StreamListener listener : listeners) {
new Thread(new Runnable() {
Expand All @@ -75,7 +87,7 @@ public void run() {
}
}

private void handleTweet() throws IOException {
private void handleTweet(String line) throws IOException {
final Tweet tweet = objectMapper.readValue(line, Tweet.class);
for (final StreamListener listener : listeners) {
new Thread(new Runnable() {
Expand Down
Expand Up @@ -20,24 +20,31 @@
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.springframework.social.twitter.api.StreamListener;
import org.springframework.social.twitter.api.StreamingException;

class StreamImpl implements Stream {

private volatile boolean open;
private final List<StreamListener> listeners;


private final InputStream inputStream;

private final BufferedReader reader;

private final Queue<String> queue;

private final StreamDispatcher dispatcher;

public StreamImpl(InputStream inputStream, List<StreamListener> listeners) {
this.inputStream = inputStream;
this.reader = new BufferedReader(new InputStreamReader(inputStream));
this.listeners = listeners;
open = true;
queue = new ConcurrentLinkedQueue<String>();
dispatcher = new StreamDispatcher(queue, listeners);
new Thread(dispatcher).start();
open = true;
}

public void next() {
Expand All @@ -46,19 +53,21 @@ public void next() {
if(line == null) {
throw new IOException("Stream closed");
}
new Thread(new StreamDispatcher(listeners, line)).start();
queue.add(line);
} catch (IOException e) {
if(open) {
close();
throw new StreamingException("The Stream is closed", e);
}
}
}

public void close() {
try {
open = false;
dispatcher.stop();
inputStream.close();
} catch(IOException ignore) {}
}

}
@@ -0,0 +1,108 @@
/*
* Copyright 2011 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.social.twitter.api.impl;

import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Arrays;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.junit.Test;
import org.springframework.core.io.ClassPathResource;
import org.springframework.social.twitter.api.StreamDeleteEvent;
import org.springframework.social.twitter.api.StreamListener;
import org.springframework.social.twitter.api.Tweet;

public class StreamDispatcherTest {

@Test
public void activeWithItemsInQueue() throws Exception {
StreamListener mockListener = mock(StreamListener.class);
Queue<String> queue = createQueueWithItems();
StreamDispatcher dispatcher = new StreamDispatcher(queue, Arrays.asList(mockListener));
runAndAssert(mockListener, dispatcher, 4, 2, 1);
dispatcher.stop();
}

@Test
public void activeWithEmptyQueue() throws Exception {
StreamListener mockListener = mock(StreamListener.class);
Queue<String> queue = new ConcurrentLinkedQueue<String>();
StreamDispatcher dispatcher = new StreamDispatcher(queue, Arrays.asList(mockListener));
runAndAssert(mockListener, dispatcher, 0, 0, 0);
dispatcher.stop();
}

@Test
public void stoppedWithItemsInQueue() throws Exception {
StreamListener mockListener = mock(StreamListener.class);
Queue<String> queue = createQueueWithItems();
StreamDispatcher dispatcher = new StreamDispatcher(queue, Arrays.asList(mockListener));
dispatcher.stop();
runAndAssert(mockListener, dispatcher, 4, 2, 1);
}

@Test
public void stoppedWithEmptyQueue() throws Exception {
StreamListener mockListener = mock(StreamListener.class);
Queue<String> queue = new ConcurrentLinkedQueue<String>();
StreamDispatcher dispatcher = new StreamDispatcher(queue, Arrays.asList(mockListener));
dispatcher.stop();
runAndAssert(mockListener, dispatcher, 0, 0, 0);
}

@Test
public void ignoreUnrecognizedEvent() throws Exception {
StreamListener mockListener = mock(StreamListener.class);
Queue<String> queue = new ConcurrentLinkedQueue<String>();
queue.add("BOGUS LINE");
queue.add("{\"unrecognized\":\"event\"}");
StreamDispatcher dispatcher = new StreamDispatcher(queue, Arrays.asList(mockListener));
runAndAssert(mockListener, dispatcher, 0, 0, 0);
dispatcher.stop();
}

private void runAndAssert(StreamListener mockListener, StreamDispatcher dispatcher, int tweetEvents, int deleteEvents, int limitEvents) throws Exception {
new Thread(dispatcher).start();
Thread.sleep(100); // pause to give thread opportunity to do its job
verify(mockListener, times(tweetEvents)).onTweet(any(Tweet.class));
verify(mockListener, times(deleteEvents)).onDelete(any(StreamDeleteEvent.class));
verify(mockListener, times(limitEvents)).onLimit(369);
}

private Queue<String> createQueueWithItems() {
InputStream inputStream = null;
Queue<String> queue = new ConcurrentLinkedQueue<String>();
try {
inputStream = new ClassPathResource("filter-stream-track.json", getClass()).getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
while (reader.ready()) {
queue.add(reader.readLine());
}
} catch (IOException e) {
try {
inputStream.close();
} catch (IOException ignore) {}
}
return queue;
}
}
@@ -0,0 +1,47 @@
/*
* Copyright 2011 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.social.twitter.api.impl;

import static java.util.Arrays.*;
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;

import java.io.InputStream;

import org.junit.Test;
import org.springframework.core.io.ClassPathResource;
import org.springframework.social.twitter.api.StreamListener;
import org.springframework.social.twitter.api.StreamingException;
import org.springframework.social.twitter.api.Tweet;


public class StreamImplTest {

@Test
public void next() throws Exception {
StreamListener mockListener = mock(StreamListener.class);
InputStream inputStream = new ClassPathResource("filter-stream-track.json", getClass()).getInputStream();
StreamImpl stream = new StreamImpl(inputStream, asList(mockListener));
try {
while(true) {
stream.next();
}
} catch(StreamingException e) {}
Thread.sleep(100);
verify(mockListener, times(4)).onTweet(any(Tweet.class));
inputStream.close();
}
}
Expand Up @@ -27,7 +27,7 @@
import org.junit.Ignore;
import org.junit.Test;
import org.springframework.core.io.ClassPathResource;
import org.springframework.social.twitter.api.DeleteTweetEvent;
import org.springframework.social.twitter.api.StreamDeleteEvent;
import org.springframework.social.twitter.api.StreamListener;
import org.springframework.social.twitter.api.Tweet;

Expand Down Expand Up @@ -83,7 +83,7 @@ protected void shutdown() {
private abstract static class MockStreamListener implements StreamListener {

List<Tweet> tweetsReceived = new ArrayList<Tweet>();
List<DeleteTweetEvent> deletesReceived = new ArrayList<DeleteTweetEvent>();
List<StreamDeleteEvent> deletesReceived = new ArrayList<StreamDeleteEvent>();
List<Integer> limitsReceived = new ArrayList<Integer>();
private int stopAfter = Integer.MAX_VALUE;

Expand All @@ -96,7 +96,7 @@ public void onTweet(Tweet tweet) {
messageReceived();
}

public void onDelete(DeleteTweetEvent deleteEvent) {
public void onDelete(StreamDeleteEvent deleteEvent) {
deletesReceived.add(deleteEvent);
messageReceived();
}
Expand Down
@@ -0,0 +1 @@
{"delete":{"status":{"id":1234,"id_str":"1234","user_id":3,"user_id_str":"3"}}}
@@ -0,0 +1 @@
{"limit":{"track":369}}

0 comments on commit ddbf6b2

Please sign in to comment.