diff --git a/spring-social-twitter/src/main/java/org/springframework/social/twitter/api/DeleteTweetEvent.java b/spring-social-twitter/src/main/java/org/springframework/social/twitter/api/StreamDeleteEvent.java similarity index 92% rename from spring-social-twitter/src/main/java/org/springframework/social/twitter/api/DeleteTweetEvent.java rename to spring-social-twitter/src/main/java/org/springframework/social/twitter/api/StreamDeleteEvent.java index 8a625d08..0f8345df 100644 --- a/spring-social-twitter/src/main/java/org/springframework/social/twitter/api/DeleteTweetEvent.java +++ b/spring-social-twitter/src/main/java/org/springframework/social/twitter/api/StreamDeleteEvent.java @@ -19,7 +19,7 @@ * A stream event indicating that a tweet should be removed from the client. * @author Craig Walls */ -public class DeleteTweetEvent { +public class StreamDeleteEvent { private final long tweetId; @@ -39,7 +39,7 @@ public long getUserId() { return userId; } - public DeleteTweetEvent(long tweetId, long userId) { + public StreamDeleteEvent(long tweetId, long userId) { this.tweetId = tweetId; this.userId = userId; } diff --git a/spring-social-twitter/src/main/java/org/springframework/social/twitter/api/StreamListener.java b/spring-social-twitter/src/main/java/org/springframework/social/twitter/api/StreamListener.java index 67152554..b3a54061 100644 --- a/spring-social-twitter/src/main/java/org/springframework/social/twitter/api/StreamListener.java +++ b/spring-social-twitter/src/main/java/org/springframework/social/twitter/api/StreamListener.java @@ -30,10 +30,9 @@ public interface StreamListener { void onTweet(Tweet tweet); /** - * Called when a delete message is availble on the stream - * @param deleteEvent + * Called when a delete message is available on the stream */ - void onDelete(DeleteTweetEvent deleteEvent); + void onDelete(StreamDeleteEvent deleteEvent); /** * Called when the stream is being track limited. diff --git a/spring-social-twitter/src/main/java/org/springframework/social/twitter/api/impl/DeleteTweetEventMixin.java b/spring-social-twitter/src/main/java/org/springframework/social/twitter/api/impl/DeleteTweetEventMixin.java index fcfbfcc4..e7903a6b 100644 --- a/spring-social-twitter/src/main/java/org/springframework/social/twitter/api/impl/DeleteTweetEventMixin.java +++ b/spring-social-twitter/src/main/java/org/springframework/social/twitter/api/impl/DeleteTweetEventMixin.java @@ -24,18 +24,18 @@ import org.codehaus.jackson.map.DeserializationContext; import org.codehaus.jackson.map.JsonDeserializer; import org.codehaus.jackson.map.annotate.JsonDeserialize; -import org.springframework.social.twitter.api.DeleteTweetEvent; +import org.springframework.social.twitter.api.StreamDeleteEvent; import org.springframework.social.twitter.api.impl.DeleteTweetEventMixin.DeleteTweetEventDeserializer; @JsonIgnoreProperties(ignoreUnknown = true) @JsonDeserialize(using = DeleteTweetEventDeserializer.class) class DeleteTweetEventMixin { - static final class DeleteTweetEventDeserializer extends JsonDeserializer { + static final class DeleteTweetEventDeserializer extends JsonDeserializer { @Override - public DeleteTweetEvent deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException, JsonProcessingException { + public StreamDeleteEvent deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException, JsonProcessingException { JsonNode deleteNode = jp.readValueAsTree().get("delete").get("status"); - return new DeleteTweetEvent(deleteNode.get("id").getValueAsLong(), deleteNode.get("user_id").getValueAsLong()); + return new StreamDeleteEvent(deleteNode.get("id").getValueAsLong(), deleteNode.get("user_id").getValueAsLong()); } } diff --git a/spring-social-twitter/src/main/java/org/springframework/social/twitter/api/impl/StreamDispatcher.java b/spring-social-twitter/src/main/java/org/springframework/social/twitter/api/impl/StreamDispatcher.java index 9f3e9cbd..52240410 100644 --- a/spring-social-twitter/src/main/java/org/springframework/social/twitter/api/impl/StreamDispatcher.java +++ b/spring-social-twitter/src/main/java/org/springframework/social/twitter/api/impl/StreamDispatcher.java @@ -17,9 +17,10 @@ import java.io.IOException; import java.util.List; +import java.util.Queue; import org.codehaus.jackson.map.ObjectMapper; -import org.springframework.social.twitter.api.DeleteTweetEvent; +import org.springframework.social.twitter.api.StreamDeleteEvent; import org.springframework.social.twitter.api.StreamListener; import org.springframework.social.twitter.api.Tweet; @@ -27,34 +28,45 @@ class StreamDispatcher implements Runnable { private final List listeners; - private final String line; - private ObjectMapper objectMapper; + + private volatile boolean active; + + private final Queue queue; - public StreamDispatcher(List listeners, String line) { + public StreamDispatcher(Queue queue, List listeners) { + this.queue = queue; this.listeners = listeners; - this.line = line; objectMapper = new ObjectMapper(); objectMapper.getDeserializationConfig().addMixInAnnotations(Tweet.class, TweetMixin.class); - objectMapper.getDeserializationConfig().addMixInAnnotations(DeleteTweetEvent.class, DeleteTweetEventMixin.class); + objectMapper.getDeserializationConfig().addMixInAnnotations(StreamDeleteEvent.class, DeleteTweetEventMixin.class); + active = true; } public void run() { - try { - if (line.contains("in_reply_to_status_id_str")) { - handleTweet(); - } else if (line.startsWith("{\"limit")) { - handleLimit(); - } else if (line.startsWith("{\"delete")) { - handleDelete(); + while(active || queue.peek() != null) { + String line = queue.poll(); + if(line == null) continue; + try { + if (line.contains("in_reply_to_status_id_str")) { + handleTweet(line); + } else if (line.startsWith("{\"limit")) { + handleLimit(line); + } else if (line.startsWith("{\"delete")) { + handleDelete(line); + } + } catch (IOException e) { + // TODO: Should only happen if Jackson doesn't know how to map the line } - } catch (IOException e) { - // TODO: Should only happen if Jackson doesn't know how to map the line } } + + public void stop() { + active = false; + } - private void handleDelete() throws IOException { - final DeleteTweetEvent deleteEvent = objectMapper.readValue(line, DeleteTweetEvent.class); + private void handleDelete(String line) throws IOException { + final StreamDeleteEvent deleteEvent = objectMapper.readValue(line, StreamDeleteEvent.class); for (final StreamListener listener : listeners) { new Thread(new Runnable() { public void run() { @@ -64,7 +76,7 @@ public void run() { } } - private void handleLimit() throws IOException { + private void handleLimit(String line) throws IOException { final TrackLimitEvent limitEvent = objectMapper.readValue(line, TrackLimitEvent.class); for (final StreamListener listener : listeners) { new Thread(new Runnable() { @@ -75,7 +87,7 @@ public void run() { } } - private void handleTweet() throws IOException { + private void handleTweet(String line) throws IOException { final Tweet tweet = objectMapper.readValue(line, Tweet.class); for (final StreamListener listener : listeners) { new Thread(new Runnable() { diff --git a/spring-social-twitter/src/main/java/org/springframework/social/twitter/api/impl/StreamImpl.java b/spring-social-twitter/src/main/java/org/springframework/social/twitter/api/impl/StreamImpl.java index b6ebaf4a..741c700c 100644 --- a/spring-social-twitter/src/main/java/org/springframework/social/twitter/api/impl/StreamImpl.java +++ b/spring-social-twitter/src/main/java/org/springframework/social/twitter/api/impl/StreamImpl.java @@ -20,6 +20,8 @@ import java.io.InputStream; import java.io.InputStreamReader; import java.util.List; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; import org.springframework.social.twitter.api.StreamListener; import org.springframework.social.twitter.api.StreamingException; @@ -27,17 +29,22 @@ class StreamImpl implements Stream { private volatile boolean open; - private final List listeners; - + private final InputStream inputStream; private final BufferedReader reader; + private final Queue queue; + + private final StreamDispatcher dispatcher; + public StreamImpl(InputStream inputStream, List listeners) { this.inputStream = inputStream; this.reader = new BufferedReader(new InputStreamReader(inputStream)); - this.listeners = listeners; - open = true; + queue = new ConcurrentLinkedQueue(); + dispatcher = new StreamDispatcher(queue, listeners); + new Thread(dispatcher).start(); + open = true; } public void next() { @@ -46,7 +53,7 @@ public void next() { if(line == null) { throw new IOException("Stream closed"); } - new Thread(new StreamDispatcher(listeners, line)).start(); + queue.add(line); } catch (IOException e) { if(open) { close(); @@ -54,11 +61,13 @@ public void next() { } } } - + public void close() { try { + open = false; + dispatcher.stop(); inputStream.close(); } catch(IOException ignore) {} } - + } diff --git a/spring-social-twitter/src/test/java/org/springframework/social/twitter/api/impl/StreamDispatcherTest.java b/spring-social-twitter/src/test/java/org/springframework/social/twitter/api/impl/StreamDispatcherTest.java new file mode 100644 index 00000000..7bdccc2d --- /dev/null +++ b/spring-social-twitter/src/test/java/org/springframework/social/twitter/api/impl/StreamDispatcherTest.java @@ -0,0 +1,108 @@ +/* + * Copyright 2011 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.social.twitter.api.impl; + +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.Arrays; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; + +import org.junit.Test; +import org.springframework.core.io.ClassPathResource; +import org.springframework.social.twitter.api.StreamDeleteEvent; +import org.springframework.social.twitter.api.StreamListener; +import org.springframework.social.twitter.api.Tweet; + +public class StreamDispatcherTest { + + @Test + public void activeWithItemsInQueue() throws Exception { + StreamListener mockListener = mock(StreamListener.class); + Queue queue = createQueueWithItems(); + StreamDispatcher dispatcher = new StreamDispatcher(queue, Arrays.asList(mockListener)); + runAndAssert(mockListener, dispatcher, 4, 2, 1); + dispatcher.stop(); + } + + @Test + public void activeWithEmptyQueue() throws Exception { + StreamListener mockListener = mock(StreamListener.class); + Queue queue = new ConcurrentLinkedQueue(); + StreamDispatcher dispatcher = new StreamDispatcher(queue, Arrays.asList(mockListener)); + runAndAssert(mockListener, dispatcher, 0, 0, 0); + dispatcher.stop(); + } + + @Test + public void stoppedWithItemsInQueue() throws Exception { + StreamListener mockListener = mock(StreamListener.class); + Queue queue = createQueueWithItems(); + StreamDispatcher dispatcher = new StreamDispatcher(queue, Arrays.asList(mockListener)); + dispatcher.stop(); + runAndAssert(mockListener, dispatcher, 4, 2, 1); + } + + @Test + public void stoppedWithEmptyQueue() throws Exception { + StreamListener mockListener = mock(StreamListener.class); + Queue queue = new ConcurrentLinkedQueue(); + StreamDispatcher dispatcher = new StreamDispatcher(queue, Arrays.asList(mockListener)); + dispatcher.stop(); + runAndAssert(mockListener, dispatcher, 0, 0, 0); + } + + @Test + public void ignoreUnrecognizedEvent() throws Exception { + StreamListener mockListener = mock(StreamListener.class); + Queue queue = new ConcurrentLinkedQueue(); + queue.add("BOGUS LINE"); + queue.add("{\"unrecognized\":\"event\"}"); + StreamDispatcher dispatcher = new StreamDispatcher(queue, Arrays.asList(mockListener)); + runAndAssert(mockListener, dispatcher, 0, 0, 0); + dispatcher.stop(); + } + + private void runAndAssert(StreamListener mockListener, StreamDispatcher dispatcher, int tweetEvents, int deleteEvents, int limitEvents) throws Exception { + new Thread(dispatcher).start(); + Thread.sleep(100); // pause to give thread opportunity to do its job + verify(mockListener, times(tweetEvents)).onTweet(any(Tweet.class)); + verify(mockListener, times(deleteEvents)).onDelete(any(StreamDeleteEvent.class)); + verify(mockListener, times(limitEvents)).onLimit(369); + } + + private Queue createQueueWithItems() { + InputStream inputStream = null; + Queue queue = new ConcurrentLinkedQueue(); + try { + inputStream = new ClassPathResource("filter-stream-track.json", getClass()).getInputStream(); + BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream)); + while (reader.ready()) { + queue.add(reader.readLine()); + } + } catch (IOException e) { + try { + inputStream.close(); + } catch (IOException ignore) {} + } + return queue; + } +} diff --git a/spring-social-twitter/src/test/java/org/springframework/social/twitter/api/impl/StreamImplTest.java b/spring-social-twitter/src/test/java/org/springframework/social/twitter/api/impl/StreamImplTest.java new file mode 100644 index 00000000..b279821e --- /dev/null +++ b/spring-social-twitter/src/test/java/org/springframework/social/twitter/api/impl/StreamImplTest.java @@ -0,0 +1,47 @@ +/* + * Copyright 2011 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.social.twitter.api.impl; + +import static java.util.Arrays.*; +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; + +import java.io.InputStream; + +import org.junit.Test; +import org.springframework.core.io.ClassPathResource; +import org.springframework.social.twitter.api.StreamListener; +import org.springframework.social.twitter.api.StreamingException; +import org.springframework.social.twitter.api.Tweet; + + +public class StreamImplTest { + + @Test + public void next() throws Exception { + StreamListener mockListener = mock(StreamListener.class); + InputStream inputStream = new ClassPathResource("filter-stream-track.json", getClass()).getInputStream(); + StreamImpl stream = new StreamImpl(inputStream, asList(mockListener)); + try { + while(true) { + stream.next(); + } + } catch(StreamingException e) {} + Thread.sleep(100); + verify(mockListener, times(4)).onTweet(any(Tweet.class)); + inputStream.close(); + } +} diff --git a/spring-social-twitter/src/test/java/org/springframework/social/twitter/api/impl/StreamingTemplateTest.java b/spring-social-twitter/src/test/java/org/springframework/social/twitter/api/impl/StreamingTemplateTest.java index 17d9410d..3c5b9b65 100644 --- a/spring-social-twitter/src/test/java/org/springframework/social/twitter/api/impl/StreamingTemplateTest.java +++ b/spring-social-twitter/src/test/java/org/springframework/social/twitter/api/impl/StreamingTemplateTest.java @@ -27,7 +27,7 @@ import org.junit.Ignore; import org.junit.Test; import org.springframework.core.io.ClassPathResource; -import org.springframework.social.twitter.api.DeleteTweetEvent; +import org.springframework.social.twitter.api.StreamDeleteEvent; import org.springframework.social.twitter.api.StreamListener; import org.springframework.social.twitter.api.Tweet; @@ -83,7 +83,7 @@ protected void shutdown() { private abstract static class MockStreamListener implements StreamListener { List tweetsReceived = new ArrayList(); - List deletesReceived = new ArrayList(); + List deletesReceived = new ArrayList(); List limitsReceived = new ArrayList(); private int stopAfter = Integer.MAX_VALUE; @@ -96,7 +96,7 @@ public void onTweet(Tweet tweet) { messageReceived(); } - public void onDelete(DeleteTweetEvent deleteEvent) { + public void onDelete(StreamDeleteEvent deleteEvent) { deletesReceived.add(deleteEvent); messageReceived(); } diff --git a/spring-social-twitter/src/test/java/org/springframework/social/twitter/api/impl/delete-event.json b/spring-social-twitter/src/test/java/org/springframework/social/twitter/api/impl/delete-event.json new file mode 100644 index 00000000..910c6793 --- /dev/null +++ b/spring-social-twitter/src/test/java/org/springframework/social/twitter/api/impl/delete-event.json @@ -0,0 +1 @@ +{"delete":{"status":{"id":1234,"id_str":"1234","user_id":3,"user_id_str":"3"}}} diff --git a/spring-social-twitter/src/test/java/org/springframework/social/twitter/api/impl/limit-event.json b/spring-social-twitter/src/test/java/org/springframework/social/twitter/api/impl/limit-event.json new file mode 100644 index 00000000..e74e4985 --- /dev/null +++ b/spring-social-twitter/src/test/java/org/springframework/social/twitter/api/impl/limit-event.json @@ -0,0 +1 @@ +{"limit":{"track":369}} diff --git a/spring-social-twitter/src/test/java/org/springframework/social/twitter/api/impl/single-tweet.json b/spring-social-twitter/src/test/java/org/springframework/social/twitter/api/impl/single-tweet.json new file mode 100644 index 00000000..6002e407 --- /dev/null +++ b/spring-social-twitter/src/test/java/org/springframework/social/twitter/api/impl/single-tweet.json @@ -0,0 +1,59 @@ +{ + "in_reply_to_status_id_str":null, + "retweet_count":0, + "text":"", + "coordinates":null, + "in_reply_to_screen_name":null, + "in_reply_to_user_id_str":null, + "created_at":"Tue May 24 19:56:39 +0000 2011", + "contributors":null, + "retweeted":false, + "truncated":false, + "source":"\u003Ca href=\"http:\/\/twitter.com\/download\/android\" rel=\"nofollow\"\u003ETwitter for Android\u003C\/a\u003E", + "id_str":"73115210814193664", + "in_reply_to_user_id":null, + "in_reply_to_status_id":null, + "place":null, + "user": + { + "default_profile":false, + "statuses_count":384, + "time_zone":"Central Time (US & Canada)", + "profile_use_background_image":true, + "notifications":null, + "created_at":"Sat Aug 21 22:34:05 +0000 2010", + "profile_background_color":"022330", + "listed_count":0, + "profile_background_image_url":"http:\/\/a0.twimg.com\/images\/themes\/theme15\/bg.png", + "followers_count":12, + "description":"Just your average test user", + "is_translator":false, + "id_str":"181248285", + "friends_count":18, + "profile_text_color":"333333", + "profile_sidebar_fill_color":"C0DFEC", + "default_profile_image":false, + "lang":"en", + "profile_background_tile":false, + "protected":false, + "url":null, + "profile_image_url":null, + "show_all_inline_media":true, + "geo_enabled":true, + "name":"Just A. Test", + "contributors_enabled":false, + "verified":false, + "favourites_count":3, + "profile_link_color":"0084B4", + "screen_name":"justatest", + "id":181248285, + "follow_request_sent":null, + "following":null, + "utc_offset":-21600, + "profile_sidebar_border_color":"a8c7f7", + "location":"Lincoln Nebraska " + }, + "id":73115210814193664, + "geo":null, + "favorited":false +}