2626import java .io .BufferedReader ;
2727import java .io .InputStream ;
2828import java .io .InputStreamReader ;
29- import java .lang .reflect .Type ;
3029import java .util .ArrayList ;
3130import java .util .LinkedList ;
3231import java .util .List ;
3332import java .util .Queue ;
33+ import java .util .concurrent .atomic .AtomicBoolean ;
34+ import java .util .concurrent .atomic .AtomicLong ;
3435
3536import com .google .gson .reflect .TypeToken ;
3637
3738import com .twitter .clientlib .model .StreamingTweetResponse ;
3839
3940public class TweetsStreamListenersExecutor {
41+ private final static int TIMEOUT_MILLIS = 60000 ;
42+ private final static int SLEEP_MILLIS = 100 ;
4043 private final ITweetsQueue tweetsQueue ;
4144 private final List <TweetsStreamListener > listeners = new ArrayList <>();
4245 private final InputStream stream ;
43- private volatile boolean isRunning = true ;
46+ private final AtomicBoolean isRunning = new AtomicBoolean (true );
47+ private final AtomicLong tweetStreamedTime = new AtomicLong (0 );
48+ private Exception caughtException ;
4449
4550 public TweetsStreamListenersExecutor (InputStream stream ) {
4651 this .tweetsQueue = new LinkedListTweetsQueue ();
@@ -67,15 +72,34 @@ public void executeListeners() {
6772
6873 TweetsQueuer tweetsQueuer = new TweetsQueuer ();
6974 TweetsListenersExecutor tweetsListenersExecutor = new TweetsListenersExecutor ();
75+ StreamTimeoutChecker timeoutChecker = new StreamTimeoutChecker ();
7076 tweetsListenersExecutor .start ();
7177 tweetsQueuer .start ();
78+ timeoutChecker .start ();
7279 }
7380
74- public synchronized void shutdown () {
75- isRunning = false ;
81+ public void shutdown (Exception e ) {
82+ caughtException = e ;
83+ shutdown ();
84+ }
85+
86+ public void shutdown () {
87+ isRunning .set (false );
7688 System .out .println ("TweetsStreamListenersExecutor is shutting down." );
7789 }
7890
91+ public Exception getError () {
92+ return caughtException ;
93+ }
94+
95+ private void resetTweetStreamedTime () {
96+ tweetStreamedTime .set (System .currentTimeMillis ());
97+ }
98+
99+ private boolean isTweetStreamedError () {
100+ return System .currentTimeMillis () - tweetStreamedTime .get () > TIMEOUT_MILLIS ;
101+ }
102+
79103 private class TweetsListenersExecutor extends Thread {
80104 @ Override
81105 public void run () {
@@ -84,19 +108,26 @@ public void run() {
84108
85109 private void processTweets () {
86110 StreamingTweetResponse streamingTweet ;
111+ String tweetString ;
87112 try {
88- while (isRunning ) {
89- streamingTweet = tweetsQueue .poll ();
90- if (streamingTweet == null ) {
91- Thread .sleep (100 );
113+ while (isRunning . get () ) {
114+ tweetString = tweetsQueue .poll ();
115+ if (tweetString == null ) {
116+ Thread .sleep (SLEEP_MILLIS );
92117 continue ;
93118 }
94- for (TweetsStreamListener listener : listeners ) {
95- listener .actionOnTweetsStream (streamingTweet );
119+ try {
120+ streamingTweet = StreamingTweetResponse .fromJson (tweetString );
121+ for (TweetsStreamListener listener : listeners ) {
122+ listener .actionOnTweetsStream (streamingTweet );
123+ }
124+ } catch (Exception interExcep ) {
125+ interExcep .printStackTrace ();
96126 }
97127 }
98128 } catch (Exception e ) {
99129 e .printStackTrace ();
130+ shutdown (e );
100131 }
101132 }
102133 }
@@ -110,41 +141,59 @@ public void run() {
110141 public void queueTweets () {
111142 String line = null ;
112143 try (BufferedReader reader = new BufferedReader (new InputStreamReader (stream ))) {
113- while (isRunning ) {
144+ while (isRunning . get () ) {
114145 line = reader .readLine ();
146+ resetTweetStreamedTime ();
115147 if (line == null || line .isEmpty ()) {
116- Thread .sleep (100 );
148+ Thread .sleep (SLEEP_MILLIS );
117149 continue ;
118150 }
119- try {
120- tweetsQueue .add (StreamingTweetResponse .fromJson (line ));
121- } catch (Exception interExcep ) {
122- interExcep .printStackTrace ();
123- }
151+ tweetsQueue .add (line );
124152 }
125153 } catch (Exception e ) {
126154 e .printStackTrace ();
127- shutdown ();
155+ shutdown (e );
156+ }
157+ }
158+ }
159+
160+ private class StreamTimeoutChecker extends Thread {
161+ @ Override
162+ public void run () {
163+ checkTimes ();
164+ }
165+
166+ public void checkTimes () {
167+ resetTweetStreamedTime ();
168+ while (isRunning .get ()) {
169+ if (isTweetStreamedError ()) {
170+ shutdown (new ApiException ("Tweets are not streaming" ));
171+ }
172+ try {
173+ Thread .sleep (SLEEP_MILLIS );
174+ } catch (InterruptedException interExcep ) {
175+ interExcep .printStackTrace ();
176+ }
128177 }
129178 }
130179 }
131180}
132181
133182interface ITweetsQueue {
134- StreamingTweetResponse poll ();
135- void add (StreamingTweetResponse streamingTweet );
183+ String poll ();
184+ void add (String streamingTweet );
136185}
137186
138187class LinkedListTweetsQueue implements ITweetsQueue {
139- private final Queue <StreamingTweetResponse > tweetsQueue = new LinkedList <>();
188+ private final Queue <String > tweetsQueue = new LinkedList <>();
140189
141190 @ Override
142- public StreamingTweetResponse poll () {
191+ public String poll () {
143192 return tweetsQueue .poll ();
144193 }
145194
146195 @ Override
147- public void add (StreamingTweetResponse streamingTweet ) {
196+ public void add (String streamingTweet ) {
148197 tweetsQueue .add (streamingTweet );
149198 }
150199}
0 commit comments