From 032089d1423ba11c48715935cb37946a7b50642b Mon Sep 17 00:00:00 2001 From: tmoskovitch <tmoskovitch@twitter.com> Date: Thu, 4 Aug 2022 11:46:42 +1200 Subject: [PATCH 1/7] Check streaming timeout. Added `StreamTimeoutChecker` to check if there is a timeout with the streaming. To avoid bottleneck due to slow data flow, `TweetsQueuer` sets the tweets Json strings into the `tweetsQueue` and `TweetsListenersExecutor` creates the tweets objects. --- .../clientlib/HelloWorldStreaming.java | 12 +++ .../TweetsStreamListenersExecutor.java | 95 ++++++++++++++----- 2 files changed, 84 insertions(+), 23 deletions(-) diff --git a/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java b/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java index c9ed12a..9d8abde 100644 --- a/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java +++ b/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java @@ -61,6 +61,18 @@ public static void main(String[] args) { tsle.addListener(responder); tsle.executeListeners(); + while(tsle.getError() == null) { + try { + System.err.println("==> sleeping 5 "); + Thread.sleep(5000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + tsle.shutdown(); + if(tsle.getError() != null) { + System.err.println("==> Ended with error: " + tsle.getError()); + } // // Shutdown TweetsStreamListenersExecutor // try { // Thread.sleep(20000); diff --git a/examples/src/main/java/com/twitter/clientlib/TweetsStreamListenersExecutor.java b/examples/src/main/java/com/twitter/clientlib/TweetsStreamListenersExecutor.java index c3e9439..cfb443c 100644 --- a/examples/src/main/java/com/twitter/clientlib/TweetsStreamListenersExecutor.java +++ b/examples/src/main/java/com/twitter/clientlib/TweetsStreamListenersExecutor.java @@ -26,21 +26,26 @@ import java.io.BufferedReader; import java.io.InputStream; import java.io.InputStreamReader; -import java.lang.reflect.Type; import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.Queue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import com.google.gson.reflect.TypeToken; import com.twitter.clientlib.model.StreamingTweetResponse; public class TweetsStreamListenersExecutor { + private final static int TIMEOUT_MILLIS = 60000; + private final static int SLEEP_MILLIS = 100; private final ITweetsQueue tweetsQueue; private final List<TweetsStreamListener> listeners = new ArrayList<>(); private final InputStream stream; - private volatile boolean isRunning = true; + private final AtomicBoolean isRunning = new AtomicBoolean(true); + private final AtomicLong tweetStreamedTime = new AtomicLong(0); + private Exception caughtException; public TweetsStreamListenersExecutor(InputStream stream) { this.tweetsQueue = new LinkedListTweetsQueue(); @@ -67,15 +72,34 @@ public void executeListeners() { TweetsQueuer tweetsQueuer = new TweetsQueuer(); TweetsListenersExecutor tweetsListenersExecutor = new TweetsListenersExecutor(); + StreamTimeoutChecker timeoutChecker = new StreamTimeoutChecker(); tweetsListenersExecutor.start(); tweetsQueuer.start(); + timeoutChecker.start(); } - public synchronized void shutdown() { - isRunning = false; + public void shutdown(Exception e) { + caughtException = e; + shutdown(); + } + + public void shutdown() { + isRunning.set(false); System.out.println("TweetsStreamListenersExecutor is shutting down."); } + public Exception getError() { + return caughtException; + } + + private void resetTweetStreamedTime() { + tweetStreamedTime.set(System.currentTimeMillis()); + } + + private boolean isTweetStreamedError() { + return System.currentTimeMillis() - tweetStreamedTime.get() > TIMEOUT_MILLIS; + } + private class TweetsListenersExecutor extends Thread { @Override public void run() { @@ -84,19 +108,26 @@ public void run() { private void processTweets() { StreamingTweetResponse streamingTweet; + String tweetString; try { - while (isRunning) { - streamingTweet = tweetsQueue.poll(); - if (streamingTweet == null) { - Thread.sleep(100); + while (isRunning.get()) { + tweetString = tweetsQueue.poll(); + if (tweetString == null) { + Thread.sleep(SLEEP_MILLIS); continue; } - for (TweetsStreamListener listener : listeners) { - listener.actionOnTweetsStream(streamingTweet); + try { + streamingTweet = StreamingTweetResponse.fromJson(tweetString); + for (TweetsStreamListener listener : listeners) { + listener.actionOnTweetsStream(streamingTweet); + } + } catch (Exception interExcep) { + interExcep.printStackTrace(); } } } catch (Exception e) { e.printStackTrace(); + shutdown(e); } } } @@ -110,41 +141,59 @@ public void run() { public void queueTweets() { String line = null; try (BufferedReader reader = new BufferedReader(new InputStreamReader(stream))) { - while (isRunning) { + while (isRunning.get()) { line = reader.readLine(); + resetTweetStreamedTime(); if(line == null || line.isEmpty()) { - Thread.sleep(100); + Thread.sleep(SLEEP_MILLIS); continue; } - try { - tweetsQueue.add(StreamingTweetResponse.fromJson(line)); - } catch (Exception interExcep) { - interExcep.printStackTrace(); - } + tweetsQueue.add(line); } } catch (Exception e) { e.printStackTrace(); - shutdown(); + shutdown(e); + } + } + } + + private class StreamTimeoutChecker extends Thread { + @Override + public void run() { + checkTimes(); + } + + public void checkTimes() { + resetTweetStreamedTime(); + while (isRunning.get()) { + if(isTweetStreamedError()) { + shutdown(new ApiException("Tweets are not streaming")); + } + try { + Thread.sleep(SLEEP_MILLIS); + } catch (InterruptedException interExcep) { + interExcep.printStackTrace(); + } } } } } interface ITweetsQueue { - StreamingTweetResponse poll(); - void add(StreamingTweetResponse streamingTweet); + String poll(); + void add(String streamingTweet); } class LinkedListTweetsQueue implements ITweetsQueue { - private final Queue<StreamingTweetResponse> tweetsQueue = new LinkedList<>(); + private final Queue<String> tweetsQueue = new LinkedList<>(); @Override - public StreamingTweetResponse poll() { + public String poll() { return tweetsQueue.poll(); } @Override - public void add(StreamingTweetResponse streamingTweet) { + public void add(String streamingTweet) { tweetsQueue.add(streamingTweet); } } From b4f3c50e667ac8a343c0b5c6f603243eb2ce2f77 Mon Sep 17 00:00:00 2001 From: tmoskovitch <tmoskovitch@twitter.com> Date: Thu, 11 Aug 2022 14:34:01 +1200 Subject: [PATCH 2/7] Check for streaming tweets for reconnect errors. In case of reconnect restart the `tweetsQueuer` thread. --- .../clientlib/HelloWorldStreaming.java | 47 ++----- ...amListener.java => IStreamingHandler.java} | 12 +- .../twitter/clientlib/StreamingHandler.java | 40 ++++++ .../clientlib/StreamingTweetHandler.java | 92 ++++++++++++++ .../TweetsStreamListenersExecutor.java | 118 ++++++++++-------- 5 files changed, 215 insertions(+), 94 deletions(-) rename examples/src/main/java/com/twitter/clientlib/{TweetsStreamListener.java => IStreamingHandler.java} (67%) create mode 100644 examples/src/main/java/com/twitter/clientlib/StreamingHandler.java create mode 100644 examples/src/main/java/com/twitter/clientlib/StreamingTweetHandler.java diff --git a/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java b/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java index 9d8abde..57de8a4 100644 --- a/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java +++ b/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java @@ -23,13 +23,6 @@ package com.twitter.clientlib; -import java.util.HashSet; -import java.util.Set; -import java.io.InputStream; - -import com.twitter.clientlib.ApiException; -import com.twitter.clientlib.TwitterCredentialsBearer; -import com.twitter.clientlib.TweetsStreamListenersExecutor; import com.twitter.clientlib.api.TwitterApi; import com.twitter.clientlib.model.*; @@ -44,23 +37,11 @@ public static void main(String[] args) { * to use the right credential object. */ TwitterApi apiInstance = new TwitterApi(new TwitterCredentialsBearer(System.getenv("TWITTER_BEARER_TOKEN"))); - - Set<String> tweetFields = new HashSet<>(); - tweetFields.add("author_id"); - tweetFields.add("id"); - tweetFields.add("created_at"); - try { - InputStream streamResult = apiInstance.tweets().sampleStream() - .backfillMinutes(0) - .tweetFields(tweetFields) - .execute(); - // sampleStream with TweetsStreamListenersExecutor - Responder responder = new Responder(); - TweetsStreamListenersExecutor tsle = new TweetsStreamListenersExecutor(streamResult); - tsle.addListener(responder); - tsle.executeListeners(); - + TweetsStreamListenersExecutor tsle = new TweetsStreamListenersExecutor(); + tsle.stream() + .streamingHandler(new StreamingTweetHandler(apiInstance)) + .executeListeners(); while(tsle.getError() == null) { try { System.err.println("==> sleeping 5 "); @@ -69,10 +50,11 @@ public static void main(String[] args) { e.printStackTrace(); } } - tsle.shutdown(); + if(tsle.getError() != null) { - System.err.println("==> Ended with error: " + tsle.getError()); + System.err.println("==> Finished " + tsle.getError()); } + // // Shutdown TweetsStreamListenersExecutor // try { // Thread.sleep(20000); @@ -110,18 +92,3 @@ public static void main(String[] args) { } } -class Responder implements com.twitter.clientlib.TweetsStreamListener { - @Override - public void actionOnTweetsStream(StreamingTweetResponse streamingTweet) { - if(streamingTweet == null) { - System.err.println("Error: actionOnTweetsStream - streamingTweet is null "); - return; - } - - if(streamingTweet.getErrors() != null) { - streamingTweet.getErrors().forEach(System.out::println); - } else if (streamingTweet.getData() != null) { - System.out.println("New streaming tweet: " + streamingTweet.getData().getText()); - } - } -} diff --git a/examples/src/main/java/com/twitter/clientlib/TweetsStreamListener.java b/examples/src/main/java/com/twitter/clientlib/IStreamingHandler.java similarity index 67% rename from examples/src/main/java/com/twitter/clientlib/TweetsStreamListener.java rename to examples/src/main/java/com/twitter/clientlib/IStreamingHandler.java index 5df0650..e9c937c 100644 --- a/examples/src/main/java/com/twitter/clientlib/TweetsStreamListener.java +++ b/examples/src/main/java/com/twitter/clientlib/IStreamingHandler.java @@ -22,8 +22,12 @@ package com.twitter.clientlib; -import com.twitter.clientlib.model.StreamingTweetResponse; - -public interface TweetsStreamListener { - void actionOnTweetsStream(StreamingTweetResponse streamingTweet); +import java.io.InputStream; + +public interface IStreamingHandler<R> { + InputStream connectStream() throws ApiException; + void actionOnStreamingObject(R streamingTweet) throws ApiException; + boolean processAndVerifyStreamingObject(String tweetString) throws Exception; + R getStreamingObject(String tweetString) throws Exception; + boolean hasReconnectErrors(R streamingTweet); } diff --git a/examples/src/main/java/com/twitter/clientlib/StreamingHandler.java b/examples/src/main/java/com/twitter/clientlib/StreamingHandler.java new file mode 100644 index 0000000..89359dd --- /dev/null +++ b/examples/src/main/java/com/twitter/clientlib/StreamingHandler.java @@ -0,0 +1,40 @@ +/* +Copyright 2020 Twitter, Inc. +SPDX-License-Identifier: Apache-2.0 + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech). +https://openapi-generator.tech +Do not edit the class manually. +*/ + + +package com.twitter.clientlib; + +import com.twitter.clientlib.api.TwitterApi; + +public abstract class StreamingHandler<R> implements IStreamingHandler<R> { + protected final TwitterApi apiInstance; + + public StreamingHandler(TwitterApi apiInstance) { + this.apiInstance = apiInstance; + } + + @Override + public boolean processAndVerifyStreamingObject(String tweetString) throws Exception { + R tweet = getStreamingObject(tweetString); + actionOnStreamingObject(tweet); + return !hasReconnectErrors(tweet); + } +} diff --git a/examples/src/main/java/com/twitter/clientlib/StreamingTweetHandler.java b/examples/src/main/java/com/twitter/clientlib/StreamingTweetHandler.java new file mode 100644 index 0000000..79def97 --- /dev/null +++ b/examples/src/main/java/com/twitter/clientlib/StreamingTweetHandler.java @@ -0,0 +1,92 @@ +/* +Copyright 2020 Twitter, Inc. +SPDX-License-Identifier: Apache-2.0 + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech). +https://openapi-generator.tech +Do not edit the class manually. +*/ + + +package com.twitter.clientlib; + +import java.io.InputStream; +import java.util.HashSet; +import java.util.Set; + +import com.twitter.clientlib.api.TwitterApi; +import com.twitter.clientlib.model.*; + +public class StreamingTweetHandler extends StreamingHandler<StreamingTweetResponse> { + public StreamingTweetHandler(TwitterApi apiInstance) { + super(apiInstance); + } + + @Override + public InputStream connectStream() throws ApiException { + Set<String> tweetFields = new HashSet<>(); + tweetFields.add("author_id"); + tweetFields.add("id"); + tweetFields.add("created_at"); + tweetFields.add("geo"); + Set<String> expansions = new HashSet<>(); + expansions.add("geo.place_id"); + Set<String> placeFields = new HashSet<>(); + placeFields.add("geo"); + placeFields.add("id"); + placeFields.add("name"); + placeFields.add("place_type"); + + return this.apiInstance.tweets().sampleStream() + .backfillMinutes(0) + .tweetFields(tweetFields).expansions(expansions).placeFields(placeFields) + .execute(); + } + + @Override + public void actionOnStreamingObject(StreamingTweetResponse streamingTweet) throws ApiException { + if(streamingTweet == null) { + System.err.println("Error: actionOnTweetsStream - streamingTweet is null "); + return; + } + + if(streamingTweet.getErrors() != null) { + streamingTweet.getErrors().forEach(System.out::println); + } else if (streamingTweet.getData() != null) { + System.out.println("New streaming tweet: " + streamingTweet.getData().getText()); + } + } + + @Override + public StreamingTweetResponse getStreamingObject(String tweetString) throws Exception { + return StreamingTweetResponse.fromJson(tweetString); + } + + @Override + public boolean hasReconnectErrors(StreamingTweetResponse streamingTweet) { + boolean needToReconnect = false; + if (streamingTweet.getErrors() != null) { + for (Problem problem : streamingTweet.getErrors()) { + if (problem instanceof OperationalDisconnectProblem || problem instanceof ConnectionExceptionProblem) { + System.err.println("Re-connecting to the stream due to: " + problem); + needToReconnect = true; + break; + } + } + } + return needToReconnect; + } +} + diff --git a/examples/src/main/java/com/twitter/clientlib/TweetsStreamListenersExecutor.java b/examples/src/main/java/com/twitter/clientlib/TweetsStreamListenersExecutor.java index cfb443c..b78f8a0 100644 --- a/examples/src/main/java/com/twitter/clientlib/TweetsStreamListenersExecutor.java +++ b/examples/src/main/java/com/twitter/clientlib/TweetsStreamListenersExecutor.java @@ -26,60 +26,29 @@ import java.io.BufferedReader; import java.io.InputStream; import java.io.InputStreamReader; -import java.util.ArrayList; +import java.io.InterruptedIOException; import java.util.LinkedList; -import java.util.List; import java.util.Queue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import com.google.gson.reflect.TypeToken; - -import com.twitter.clientlib.model.StreamingTweetResponse; - public class TweetsStreamListenersExecutor { private final static int TIMEOUT_MILLIS = 60000; private final static int SLEEP_MILLIS = 100; - private final ITweetsQueue tweetsQueue; - private final List<TweetsStreamListener> listeners = new ArrayList<>(); - private final InputStream stream; + private TweetsQueuer tweetsQueuer; + private ITweetsQueue tweetsQueue = new LinkedListTweetsQueue(); private final AtomicBoolean isRunning = new AtomicBoolean(true); private final AtomicLong tweetStreamedTime = new AtomicLong(0); private Exception caughtException; + private IStreamingHandler<?> streamingHandler; - public TweetsStreamListenersExecutor(InputStream stream) { - this.tweetsQueue = new LinkedListTweetsQueue(); - this.stream = stream; - } - - public TweetsStreamListenersExecutor(ITweetsQueue tweetsQueue, InputStream stream) { - this.tweetsQueue = tweetsQueue; - this.stream = stream; - } - - public void addListener(TweetsStreamListener toAdd) { - listeners.add(toAdd); - } - - public void executeListeners() { - if (stream == null) { - System.out.println("Error: stream is null."); - return; - } else if (this.tweetsQueue == null) { - System.out.println("Error: tweetsQueue is null."); - return; - } - - TweetsQueuer tweetsQueuer = new TweetsQueuer(); - TweetsListenersExecutor tweetsListenersExecutor = new TweetsListenersExecutor(); - StreamTimeoutChecker timeoutChecker = new StreamTimeoutChecker(); - tweetsListenersExecutor.start(); - tweetsQueuer.start(); - timeoutChecker.start(); + public StreamListenersExecutorBuilder stream() { + return new StreamListenersExecutorBuilder(); } - public void shutdown(Exception e) { + private void shutdown(Exception e) { caughtException = e; + e.printStackTrace(); shutdown(); } @@ -92,6 +61,10 @@ public Exception getError() { return caughtException; } + private InputStream connectStream() throws ApiException { + return streamingHandler.connectStream(); + } + private void resetTweetStreamedTime() { tweetStreamedTime.set(System.currentTimeMillis()); } @@ -100,6 +73,18 @@ private boolean isTweetStreamedError() { return System.currentTimeMillis() - tweetStreamedTime.get() > TIMEOUT_MILLIS; } + private void restartTweetsQueuer() { + tweetsQueuer.shutdownQueuer(); + try { + Thread.sleep(SLEEP_MILLIS); // Wait a bit before starting the TweetsQueuer and calling the API again. + } catch (InterruptedException e) { + e.printStackTrace(); + } + tweetsQueuer.interrupt(); + tweetsQueuer = new TweetsQueuer(); + tweetsQueuer.start(); + } + private class TweetsListenersExecutor extends Thread { @Override public void run() { @@ -107,7 +92,6 @@ public void run() { } private void processTweets() { - StreamingTweetResponse streamingTweet; String tweetString; try { while (isRunning.get()) { @@ -117,41 +101,45 @@ private void processTweets() { continue; } try { - streamingTweet = StreamingTweetResponse.fromJson(tweetString); - for (TweetsStreamListener listener : listeners) { - listener.actionOnTweetsStream(streamingTweet); + if(!streamingHandler.processAndVerifyStreamingObject(tweetString)) { + restartTweetsQueuer(); } } catch (Exception interExcep) { interExcep.printStackTrace(); } } } catch (Exception e) { - e.printStackTrace(); shutdown(e); } } } private class TweetsQueuer extends Thread { + private boolean isReconnecting; @Override public void run() { + isReconnecting = false; queueTweets(); } - public void queueTweets() { - String line = null; - try (BufferedReader reader = new BufferedReader(new InputStreamReader(stream))) { - while (isRunning.get()) { + public void shutdownQueuer() { + isReconnecting = true; + } + private void queueTweets() { + String line; + try (BufferedReader reader = new BufferedReader(new InputStreamReader(connectStream()))) { + while (isRunning.get() && !isReconnecting) { line = reader.readLine(); resetTweetStreamedTime(); - if(line == null || line.isEmpty()) { + if (line == null || line.isEmpty()) { Thread.sleep(SLEEP_MILLIS); continue; } tweetsQueue.add(line); } - } catch (Exception e) { + } catch (InterruptedIOException e) { e.printStackTrace(); + } catch (Exception e) { shutdown(e); } } @@ -177,6 +165,36 @@ public void checkTimes() { } } } + + public class StreamListenersExecutorBuilder { + public StreamListenersExecutorBuilder streamingHandler(IStreamingHandler<?> streamHandler) { + streamingHandler = streamHandler; + return this; + } + + public StreamListenersExecutorBuilder tweetsQueue(ITweetsQueue queue) { + tweetsQueue = queue; + return this; + } + + public void executeListeners() throws ApiException { + if (streamingHandler == null) { + throw new ApiException("Please set a streamingHandler"); + } else if (tweetsQueue == null) { + System.out.println("Error: tweetsQueue is null."); + return; + } + + TweetsListenersExecutor tweetsListenersExecutor; + StreamTimeoutChecker timeoutChecker; + tweetsQueuer = new TweetsQueuer(); + tweetsListenersExecutor = new TweetsListenersExecutor(); + timeoutChecker = new StreamTimeoutChecker(); + tweetsListenersExecutor.start(); + tweetsQueuer.start(); + timeoutChecker.start(); + } + } } interface ITweetsQueue { From eba1e6ae32b7207ca2b060213547149772e7e722 Mon Sep 17 00:00:00 2001 From: tmoskovitch <tmoskovitch@twitter.com> Date: Fri, 12 Aug 2022 10:57:32 +1200 Subject: [PATCH 3/7] Check for streaming tweets for reconnect errors. In case of reconnect restart the `tweetsQueuer` thread. --- .../java/com/twitter/clientlib/HelloWorldStreaming.java | 2 +- .../twitter/clientlib/TweetsStreamListenersExecutor.java | 8 +++++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java b/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java index 57de8a4..d62dfb6 100644 --- a/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java +++ b/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java @@ -52,7 +52,7 @@ public static void main(String[] args) { } if(tsle.getError() != null) { - System.err.println("==> Finished " + tsle.getError()); + System.err.println("==> Ended with error: " + tsle.getError()); } // // Shutdown TweetsStreamListenersExecutor diff --git a/examples/src/main/java/com/twitter/clientlib/TweetsStreamListenersExecutor.java b/examples/src/main/java/com/twitter/clientlib/TweetsStreamListenersExecutor.java index b78f8a0..c5cafb0 100644 --- a/examples/src/main/java/com/twitter/clientlib/TweetsStreamListenersExecutor.java +++ b/examples/src/main/java/com/twitter/clientlib/TweetsStreamListenersExecutor.java @@ -35,12 +35,14 @@ public class TweetsStreamListenersExecutor { private final static int TIMEOUT_MILLIS = 60000; private final static int SLEEP_MILLIS = 100; + private final static int BACKOFF_SLEEP_INTERVAL_MILLIS = 5000; private TweetsQueuer tweetsQueuer; private ITweetsQueue tweetsQueue = new LinkedListTweetsQueue(); private final AtomicBoolean isRunning = new AtomicBoolean(true); private final AtomicLong tweetStreamedTime = new AtomicLong(0); private Exception caughtException; private IStreamingHandler<?> streamingHandler; + private long reconnecting = 0; public StreamListenersExecutorBuilder stream() { return new StreamListenersExecutorBuilder(); @@ -75,8 +77,12 @@ private boolean isTweetStreamedError() { private void restartTweetsQueuer() { tweetsQueuer.shutdownQueuer(); + if(reconnecting < 7) { + reconnecting++; + } try { - Thread.sleep(SLEEP_MILLIS); // Wait a bit before starting the TweetsQueuer and calling the API again. + System.out.println("sleeping " + BACKOFF_SLEEP_INTERVAL_MILLIS * reconnecting); + Thread.sleep(BACKOFF_SLEEP_INTERVAL_MILLIS * reconnecting); // Wait a bit before starting the TweetsQueuer and calling the API again. } catch (InterruptedException e) { e.printStackTrace(); } From 0dee721d6e79ad7d496241a7c2099829662ae256 Mon Sep 17 00:00:00 2001 From: tmoskovitch <tmoskovitch@twitter.com> Date: Mon, 15 Aug 2022 10:34:35 +1200 Subject: [PATCH 4/7] Another layer --- .../clientlib/HelloWorldStreaming.java | 2 +- .../twitter/clientlib/StreamingHandler.java | 2 +- .../clientlib/StreamingTweetHandler.java | 42 +--------------- .../clientlib/StreamingTweetHandlerImpl.java | 50 +++++++++++++++++++ 4 files changed, 54 insertions(+), 42 deletions(-) create mode 100644 examples/src/main/java/com/twitter/clientlib/StreamingTweetHandlerImpl.java diff --git a/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java b/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java index d62dfb6..d6d3d27 100644 --- a/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java +++ b/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java @@ -40,7 +40,7 @@ public static void main(String[] args) { try { TweetsStreamListenersExecutor tsle = new TweetsStreamListenersExecutor(); tsle.stream() - .streamingHandler(new StreamingTweetHandler(apiInstance)) + .streamingHandler( new StreamingTweetHandlerImpl(apiInstance)) .executeListeners(); while(tsle.getError() == null) { try { diff --git a/examples/src/main/java/com/twitter/clientlib/StreamingHandler.java b/examples/src/main/java/com/twitter/clientlib/StreamingHandler.java index 89359dd..ce04509 100644 --- a/examples/src/main/java/com/twitter/clientlib/StreamingHandler.java +++ b/examples/src/main/java/com/twitter/clientlib/StreamingHandler.java @@ -34,7 +34,7 @@ public StreamingHandler(TwitterApi apiInstance) { @Override public boolean processAndVerifyStreamingObject(String tweetString) throws Exception { R tweet = getStreamingObject(tweetString); - actionOnStreamingObject(tweet); + actionOnStreamingObject(tweet); return !hasReconnectErrors(tweet); } } diff --git a/examples/src/main/java/com/twitter/clientlib/StreamingTweetHandler.java b/examples/src/main/java/com/twitter/clientlib/StreamingTweetHandler.java index 79def97..e437054 100644 --- a/examples/src/main/java/com/twitter/clientlib/StreamingTweetHandler.java +++ b/examples/src/main/java/com/twitter/clientlib/StreamingTweetHandler.java @@ -22,56 +22,18 @@ package com.twitter.clientlib; -import java.io.InputStream; -import java.util.HashSet; -import java.util.Set; import com.twitter.clientlib.api.TwitterApi; import com.twitter.clientlib.model.*; -public class StreamingTweetHandler extends StreamingHandler<StreamingTweetResponse> { +public abstract class StreamingTweetHandler extends StreamingHandler<StreamingTweetResponse> { public StreamingTweetHandler(TwitterApi apiInstance) { super(apiInstance); } - @Override - public InputStream connectStream() throws ApiException { - Set<String> tweetFields = new HashSet<>(); - tweetFields.add("author_id"); - tweetFields.add("id"); - tweetFields.add("created_at"); - tweetFields.add("geo"); - Set<String> expansions = new HashSet<>(); - expansions.add("geo.place_id"); - Set<String> placeFields = new HashSet<>(); - placeFields.add("geo"); - placeFields.add("id"); - placeFields.add("name"); - placeFields.add("place_type"); - - return this.apiInstance.tweets().sampleStream() - .backfillMinutes(0) - .tweetFields(tweetFields).expansions(expansions).placeFields(placeFields) - .execute(); - } - - @Override - public void actionOnStreamingObject(StreamingTweetResponse streamingTweet) throws ApiException { - if(streamingTweet == null) { - System.err.println("Error: actionOnTweetsStream - streamingTweet is null "); - return; - } - - if(streamingTweet.getErrors() != null) { - streamingTweet.getErrors().forEach(System.out::println); - } else if (streamingTweet.getData() != null) { - System.out.println("New streaming tweet: " + streamingTweet.getData().getText()); - } - } - @Override public StreamingTweetResponse getStreamingObject(String tweetString) throws Exception { - return StreamingTweetResponse.fromJson(tweetString); + return StreamingTweetResponse.fromJson(tweetString); } @Override diff --git a/examples/src/main/java/com/twitter/clientlib/StreamingTweetHandlerImpl.java b/examples/src/main/java/com/twitter/clientlib/StreamingTweetHandlerImpl.java new file mode 100644 index 0000000..26312ba --- /dev/null +++ b/examples/src/main/java/com/twitter/clientlib/StreamingTweetHandlerImpl.java @@ -0,0 +1,50 @@ +package com.twitter.clientlib; + +import java.io.InputStream; +import java.util.HashSet; +import java.util.Set; + +import com.twitter.clientlib.api.TwitterApi; +import com.twitter.clientlib.model.StreamingTweetResponse; + +public class StreamingTweetHandlerImpl extends StreamingTweetHandler { + public StreamingTweetHandlerImpl(TwitterApi apiInstance) { + super(apiInstance); + } + + @Override + public InputStream connectStream() throws ApiException { + Set<String> tweetFields = new HashSet<>(); + tweetFields.add("author_id"); + tweetFields.add("id"); + tweetFields.add("created_at"); + tweetFields.add("geo"); + Set<String> expansions = new HashSet<>(); + expansions.add("geo.place_id"); + Set<String> placeFields = new HashSet<>(); + placeFields.add("geo"); + placeFields.add("id"); + placeFields.add("name"); + placeFields.add("place_type"); + + return this.apiInstance.tweets().sampleStream() + .backfillMinutes(0) + .tweetFields(tweetFields).expansions(expansions).placeFields(placeFields) + .execute(); + } + + @Override + public void actionOnStreamingObject(StreamingTweetResponse streamingTweet) throws ApiException { + if(streamingTweet == null) { + System.err.println("Error: actionOnTweetsStream - streamingTweet is null "); + return; + } + + if(streamingTweet.getErrors() != null) { + streamingTweet.getErrors().forEach(System.out::println); + } else if (streamingTweet.getData() != null) { + System.out.println("New streaming tweet: " + streamingTweet.getData().getText()); + } + } + +} From 3c43f155a217424bb5ee0f21d45632c63271d871 Mon Sep 17 00:00:00 2001 From: tmoskovitch <tmoskovitch@twitter.com> Date: Tue, 16 Aug 2022 10:25:55 +1200 Subject: [PATCH 5/7] Another layer --- .../clientlib/HelloWorldStreaming.java | 2 +- .../twitter/clientlib/IStreamingHandler.java | 8 +++--- .../clientlib/ITweetsStreamListener.java | 27 +++++++++++++++++++ .../twitter/clientlib/StreamingHandler.java | 8 +++--- .../clientlib/StreamingTweetHandlerImpl.java | 23 +++++++++++++++- 5 files changed, 58 insertions(+), 10 deletions(-) create mode 100644 examples/src/main/java/com/twitter/clientlib/ITweetsStreamListener.java diff --git a/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java b/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java index d6d3d27..51568bd 100644 --- a/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java +++ b/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java @@ -40,7 +40,7 @@ public static void main(String[] args) { try { TweetsStreamListenersExecutor tsle = new TweetsStreamListenersExecutor(); tsle.stream() - .streamingHandler( new StreamingTweetHandlerImpl(apiInstance)) + .streamingHandler(new StreamingTweetHandlerImpl(apiInstance)) .executeListeners(); while(tsle.getError() == null) { try { diff --git a/examples/src/main/java/com/twitter/clientlib/IStreamingHandler.java b/examples/src/main/java/com/twitter/clientlib/IStreamingHandler.java index e9c937c..8a2d62a 100644 --- a/examples/src/main/java/com/twitter/clientlib/IStreamingHandler.java +++ b/examples/src/main/java/com/twitter/clientlib/IStreamingHandler.java @@ -24,10 +24,10 @@ import java.io.InputStream; -public interface IStreamingHandler<R> { +public interface IStreamingHandler<T> { InputStream connectStream() throws ApiException; - void actionOnStreamingObject(R streamingTweet) throws ApiException; + void actionOnStreamingObject(T streamingTweet) throws ApiException; boolean processAndVerifyStreamingObject(String tweetString) throws Exception; - R getStreamingObject(String tweetString) throws Exception; - boolean hasReconnectErrors(R streamingTweet); + T getStreamingObject(String tweetString) throws Exception; + boolean hasReconnectErrors(T streamingTweet); } diff --git a/examples/src/main/java/com/twitter/clientlib/ITweetsStreamListener.java b/examples/src/main/java/com/twitter/clientlib/ITweetsStreamListener.java new file mode 100644 index 0000000..2fd9638 --- /dev/null +++ b/examples/src/main/java/com/twitter/clientlib/ITweetsStreamListener.java @@ -0,0 +1,27 @@ +/* +Copyright 2020 Twitter, Inc. +SPDX-License-Identifier: Apache-2.0 + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech). +https://openapi-generator.tech +Do not edit the class manually. +*/ + + +package com.twitter.clientlib; + +public interface ITweetsStreamListener<T> { + void actionOnStreamingObject(T streamingTweet) throws ApiException; +} diff --git a/examples/src/main/java/com/twitter/clientlib/StreamingHandler.java b/examples/src/main/java/com/twitter/clientlib/StreamingHandler.java index ce04509..6d34e73 100644 --- a/examples/src/main/java/com/twitter/clientlib/StreamingHandler.java +++ b/examples/src/main/java/com/twitter/clientlib/StreamingHandler.java @@ -24,8 +24,8 @@ import com.twitter.clientlib.api.TwitterApi; -public abstract class StreamingHandler<R> implements IStreamingHandler<R> { - protected final TwitterApi apiInstance; +public abstract class StreamingHandler<T> implements IStreamingHandler<T> { +protected final TwitterApi apiInstance; public StreamingHandler(TwitterApi apiInstance) { this.apiInstance = apiInstance; @@ -33,8 +33,8 @@ public StreamingHandler(TwitterApi apiInstance) { @Override public boolean processAndVerifyStreamingObject(String tweetString) throws Exception { - R tweet = getStreamingObject(tweetString); - actionOnStreamingObject(tweet); + T tweet = getStreamingObject(tweetString); + actionOnStreamingObject(tweet); return !hasReconnectErrors(tweet); } } diff --git a/examples/src/main/java/com/twitter/clientlib/StreamingTweetHandlerImpl.java b/examples/src/main/java/com/twitter/clientlib/StreamingTweetHandlerImpl.java index 26312ba..1655122 100644 --- a/examples/src/main/java/com/twitter/clientlib/StreamingTweetHandlerImpl.java +++ b/examples/src/main/java/com/twitter/clientlib/StreamingTweetHandlerImpl.java @@ -1,3 +1,25 @@ +/* +Copyright 2020 Twitter, Inc. +SPDX-License-Identifier: Apache-2.0 + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech). +https://openapi-generator.tech +Do not edit the class manually. +*/ + + package com.twitter.clientlib; import java.io.InputStream; @@ -46,5 +68,4 @@ public void actionOnStreamingObject(StreamingTweetResponse streamingTweet) throw System.out.println("New streaming tweet: " + streamingTweet.getData().getText()); } } - } From 397a24ce1d76c6c984fc5033242d6456b3c132d2 Mon Sep 17 00:00:00 2001 From: tmoskovitch <tmoskovitch@twitter.com> Date: Mon, 29 Aug 2022 09:02:41 +1200 Subject: [PATCH 6/7] Use abstract StreamingHandler --- .../com/twitter/clientlib/HelloWorldStreaming.java | 2 +- .../java/com/twitter/clientlib/StreamingHandler.java | 10 ++++++++-- .../clientlib/TweetsStreamListenersExecutor.java | 4 ++-- 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java b/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java index 51568bd..d82e358 100644 --- a/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java +++ b/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java @@ -44,7 +44,7 @@ public static void main(String[] args) { .executeListeners(); while(tsle.getError() == null) { try { - System.err.println("==> sleeping 5 "); + System.out.println("==> sleeping 5 "); Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); diff --git a/examples/src/main/java/com/twitter/clientlib/StreamingHandler.java b/examples/src/main/java/com/twitter/clientlib/StreamingHandler.java index 6d34e73..76a9abe 100644 --- a/examples/src/main/java/com/twitter/clientlib/StreamingHandler.java +++ b/examples/src/main/java/com/twitter/clientlib/StreamingHandler.java @@ -22,16 +22,22 @@ package com.twitter.clientlib; +import java.io.InputStream; + import com.twitter.clientlib.api.TwitterApi; -public abstract class StreamingHandler<T> implements IStreamingHandler<T> { +public abstract class StreamingHandler<T> { protected final TwitterApi apiInstance; public StreamingHandler(TwitterApi apiInstance) { this.apiInstance = apiInstance; } - @Override + public abstract InputStream connectStream() throws ApiException; + public abstract void actionOnStreamingObject(T streamingTweet) throws ApiException; + public abstract T getStreamingObject(String tweetString) throws Exception; + public abstract boolean hasReconnectErrors(T streamingTweet); + public boolean processAndVerifyStreamingObject(String tweetString) throws Exception { T tweet = getStreamingObject(tweetString); actionOnStreamingObject(tweet); diff --git a/examples/src/main/java/com/twitter/clientlib/TweetsStreamListenersExecutor.java b/examples/src/main/java/com/twitter/clientlib/TweetsStreamListenersExecutor.java index c5cafb0..5226f52 100644 --- a/examples/src/main/java/com/twitter/clientlib/TweetsStreamListenersExecutor.java +++ b/examples/src/main/java/com/twitter/clientlib/TweetsStreamListenersExecutor.java @@ -41,7 +41,7 @@ public class TweetsStreamListenersExecutor { private final AtomicBoolean isRunning = new AtomicBoolean(true); private final AtomicLong tweetStreamedTime = new AtomicLong(0); private Exception caughtException; - private IStreamingHandler<?> streamingHandler; + private StreamingHandler<?> streamingHandler; private long reconnecting = 0; public StreamListenersExecutorBuilder stream() { @@ -173,7 +173,7 @@ public void checkTimes() { } public class StreamListenersExecutorBuilder { - public StreamListenersExecutorBuilder streamingHandler(IStreamingHandler<?> streamHandler) { + public StreamListenersExecutorBuilder streamingHandler(StreamingHandler<?> streamHandler) { streamingHandler = streamHandler; return this; } From b333e7bdd9f15e83d0c258389475cd3e7be185fc Mon Sep 17 00:00:00 2001 From: tmoskovitch <tmoskovitch@twitter.com> Date: Mon, 29 Aug 2022 09:04:19 +1200 Subject: [PATCH 7/7] Use abstract StreamingHandler --- .../twitter/clientlib/IStreamingHandler.java | 33 ------------------- 1 file changed, 33 deletions(-) delete mode 100644 examples/src/main/java/com/twitter/clientlib/IStreamingHandler.java diff --git a/examples/src/main/java/com/twitter/clientlib/IStreamingHandler.java b/examples/src/main/java/com/twitter/clientlib/IStreamingHandler.java deleted file mode 100644 index 8a2d62a..0000000 --- a/examples/src/main/java/com/twitter/clientlib/IStreamingHandler.java +++ /dev/null @@ -1,33 +0,0 @@ -/* -Copyright 2020 Twitter, Inc. -SPDX-License-Identifier: Apache-2.0 - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - -http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. - -NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech). -https://openapi-generator.tech -Do not edit the class manually. -*/ - - -package com.twitter.clientlib; - -import java.io.InputStream; - -public interface IStreamingHandler<T> { - InputStream connectStream() throws ApiException; - void actionOnStreamingObject(T streamingTweet) throws ApiException; - boolean processAndVerifyStreamingObject(String tweetString) throws Exception; - T getStreamingObject(String tweetString) throws Exception; - boolean hasReconnectErrors(T streamingTweet); -}