From 032089d1423ba11c48715935cb37946a7b50642b Mon Sep 17 00:00:00 2001
From: tmoskovitch <tmoskovitch@twitter.com>
Date: Thu, 4 Aug 2022 11:46:42 +1200
Subject: [PATCH 1/7] Check streaming timeout. Added `StreamTimeoutChecker` to
 check if there is a timeout with the streaming. To avoid bottleneck due to
 slow data flow, `TweetsQueuer` sets the tweets Json strings into the
 `tweetsQueue` and `TweetsListenersExecutor` creates the tweets objects.

---
 .../clientlib/HelloWorldStreaming.java        | 12 +++
 .../TweetsStreamListenersExecutor.java        | 95 ++++++++++++++-----
 2 files changed, 84 insertions(+), 23 deletions(-)

diff --git a/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java b/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java
index c9ed12a..9d8abde 100644
--- a/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java
+++ b/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java
@@ -61,6 +61,18 @@ public static void main(String[] args) {
       tsle.addListener(responder);
       tsle.executeListeners();
 
+      while(tsle.getError() == null) {
+        try {
+          System.err.println("==> sleeping 5 ");
+          Thread.sleep(5000);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+      }
+      tsle.shutdown();
+      if(tsle.getError() != null) {
+        System.err.println("==> Ended with error: " + tsle.getError());
+      }
 //      // Shutdown TweetsStreamListenersExecutor
 //      try {
 //        Thread.sleep(20000);
diff --git a/examples/src/main/java/com/twitter/clientlib/TweetsStreamListenersExecutor.java b/examples/src/main/java/com/twitter/clientlib/TweetsStreamListenersExecutor.java
index c3e9439..cfb443c 100644
--- a/examples/src/main/java/com/twitter/clientlib/TweetsStreamListenersExecutor.java
+++ b/examples/src/main/java/com/twitter/clientlib/TweetsStreamListenersExecutor.java
@@ -26,21 +26,26 @@
 import java.io.BufferedReader;
 import java.io.InputStream;
 import java.io.InputStreamReader;
-import java.lang.reflect.Type;
 import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Queue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.gson.reflect.TypeToken;
 
 import com.twitter.clientlib.model.StreamingTweetResponse;
 
 public class TweetsStreamListenersExecutor {
+  private final static int TIMEOUT_MILLIS = 60000;
+  private final static int SLEEP_MILLIS = 100;
   private final ITweetsQueue tweetsQueue;
   private final List<TweetsStreamListener> listeners = new ArrayList<>();
   private final InputStream stream;
-  private volatile boolean isRunning = true;
+  private final AtomicBoolean isRunning = new AtomicBoolean(true);
+  private final AtomicLong tweetStreamedTime = new AtomicLong(0);
+  private Exception caughtException;
 
   public TweetsStreamListenersExecutor(InputStream stream) {
     this.tweetsQueue = new LinkedListTweetsQueue();
@@ -67,15 +72,34 @@ public void executeListeners() {
 
     TweetsQueuer tweetsQueuer = new TweetsQueuer();
     TweetsListenersExecutor tweetsListenersExecutor = new TweetsListenersExecutor();
+    StreamTimeoutChecker timeoutChecker = new StreamTimeoutChecker();
     tweetsListenersExecutor.start();
     tweetsQueuer.start();
+    timeoutChecker.start();
   }
 
-  public synchronized void shutdown() {
-    isRunning = false;
+  public void shutdown(Exception e) {
+    caughtException = e;
+    shutdown();
+  }
+
+  public void shutdown() {
+    isRunning.set(false);
     System.out.println("TweetsStreamListenersExecutor is shutting down.");
   }
 
+  public Exception getError() {
+    return caughtException;
+  }
+
+  private void resetTweetStreamedTime() {
+    tweetStreamedTime.set(System.currentTimeMillis());
+  }
+
+  private boolean isTweetStreamedError() {
+    return System.currentTimeMillis() - tweetStreamedTime.get() > TIMEOUT_MILLIS;
+  }
+
   private class TweetsListenersExecutor extends Thread {
     @Override
     public void run() {
@@ -84,19 +108,26 @@ public void run() {
 
     private void processTweets() {
       StreamingTweetResponse streamingTweet;
+      String tweetString;
       try {
-        while (isRunning) {
-          streamingTweet = tweetsQueue.poll();
-          if (streamingTweet == null) {
-            Thread.sleep(100);
+        while (isRunning.get()) {
+          tweetString = tweetsQueue.poll();
+          if (tweetString == null) {
+            Thread.sleep(SLEEP_MILLIS);
             continue;
           }
-          for (TweetsStreamListener listener : listeners) {
-            listener.actionOnTweetsStream(streamingTweet);
+          try {
+            streamingTweet = StreamingTweetResponse.fromJson(tweetString);
+            for (TweetsStreamListener listener : listeners) {
+              listener.actionOnTweetsStream(streamingTweet);
+            }
+          } catch (Exception interExcep) {
+            interExcep.printStackTrace();
           }
         }
       } catch (Exception e) {
         e.printStackTrace();
+        shutdown(e);
       }
     }
   }
@@ -110,41 +141,59 @@ public void run() {
     public void queueTweets() {
       String line = null;
       try (BufferedReader reader = new BufferedReader(new InputStreamReader(stream))) {
-        while (isRunning) {
+        while (isRunning.get()) {
           line = reader.readLine();
+          resetTweetStreamedTime();
           if(line == null || line.isEmpty()) {
-            Thread.sleep(100);
+            Thread.sleep(SLEEP_MILLIS);
             continue;
           }
-          try {
-            tweetsQueue.add(StreamingTweetResponse.fromJson(line));
-          } catch (Exception interExcep) {
-            interExcep.printStackTrace();
-          }
+          tweetsQueue.add(line);
         }
       } catch (Exception e) {
         e.printStackTrace();
-        shutdown();
+        shutdown(e);
+      }
+    }
+  }
+
+  private class StreamTimeoutChecker extends Thread {
+    @Override
+    public void run() {
+      checkTimes();
+    }
+
+    public void checkTimes()  {
+      resetTweetStreamedTime();
+      while (isRunning.get()) {
+        if(isTweetStreamedError()) {
+          shutdown(new ApiException("Tweets are not streaming"));
+        }
+        try {
+          Thread.sleep(SLEEP_MILLIS);
+        } catch (InterruptedException interExcep) {
+          interExcep.printStackTrace();
+        }
       }
     }
   }
 }
 
 interface ITweetsQueue {
-  StreamingTweetResponse poll();
-  void add(StreamingTweetResponse streamingTweet);
+  String poll();
+  void add(String streamingTweet);
 }
 
 class LinkedListTweetsQueue implements ITweetsQueue {
-  private final Queue<StreamingTweetResponse> tweetsQueue = new LinkedList<>();
+  private final Queue<String> tweetsQueue = new LinkedList<>();
 
   @Override
-  public StreamingTweetResponse poll() {
+  public String poll() {
     return tweetsQueue.poll();
   }
 
   @Override
-  public void add(StreamingTweetResponse streamingTweet) {
+  public void add(String streamingTweet) {
     tweetsQueue.add(streamingTweet);
   }
 }

From b4f3c50e667ac8a343c0b5c6f603243eb2ce2f77 Mon Sep 17 00:00:00 2001
From: tmoskovitch <tmoskovitch@twitter.com>
Date: Thu, 11 Aug 2022 14:34:01 +1200
Subject: [PATCH 2/7] Check for streaming tweets for reconnect errors. In case
 of reconnect restart the `tweetsQueuer` thread.

---
 .../clientlib/HelloWorldStreaming.java        |  47 ++-----
 ...amListener.java => IStreamingHandler.java} |  12 +-
 .../twitter/clientlib/StreamingHandler.java   |  40 ++++++
 .../clientlib/StreamingTweetHandler.java      |  92 ++++++++++++++
 .../TweetsStreamListenersExecutor.java        | 118 ++++++++++--------
 5 files changed, 215 insertions(+), 94 deletions(-)
 rename examples/src/main/java/com/twitter/clientlib/{TweetsStreamListener.java => IStreamingHandler.java} (67%)
 create mode 100644 examples/src/main/java/com/twitter/clientlib/StreamingHandler.java
 create mode 100644 examples/src/main/java/com/twitter/clientlib/StreamingTweetHandler.java

diff --git a/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java b/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java
index 9d8abde..57de8a4 100644
--- a/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java
+++ b/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java
@@ -23,13 +23,6 @@
 package com.twitter.clientlib;
 
 
-import java.util.HashSet;
-import java.util.Set;
-import java.io.InputStream;
-
-import com.twitter.clientlib.ApiException;
-import com.twitter.clientlib.TwitterCredentialsBearer;
-import com.twitter.clientlib.TweetsStreamListenersExecutor;
 import com.twitter.clientlib.api.TwitterApi;
 import com.twitter.clientlib.model.*;
 
@@ -44,23 +37,11 @@ public static void main(String[] args) {
      * to use the right credential object.
      */
     TwitterApi apiInstance = new TwitterApi(new TwitterCredentialsBearer(System.getenv("TWITTER_BEARER_TOKEN")));
-
-    Set<String> tweetFields = new HashSet<>();
-    tweetFields.add("author_id");
-    tweetFields.add("id");
-    tweetFields.add("created_at");
-
     try {
-      InputStream streamResult = apiInstance.tweets().sampleStream()
-        .backfillMinutes(0)
-        .tweetFields(tweetFields)
-        .execute();
-      // sampleStream with TweetsStreamListenersExecutor
-      Responder responder = new Responder();
-      TweetsStreamListenersExecutor tsle = new TweetsStreamListenersExecutor(streamResult);
-      tsle.addListener(responder);
-      tsle.executeListeners();
-
+      TweetsStreamListenersExecutor tsle = new TweetsStreamListenersExecutor();
+      tsle.stream()
+          .streamingHandler(new StreamingTweetHandler(apiInstance))
+          .executeListeners();
       while(tsle.getError() == null) {
         try {
           System.err.println("==> sleeping 5 ");
@@ -69,10 +50,11 @@ public static void main(String[] args) {
           e.printStackTrace();
         }
       }
-      tsle.shutdown();
+
       if(tsle.getError() != null) {
-        System.err.println("==> Ended with error: " + tsle.getError());
+        System.err.println("==> Finished " + tsle.getError());
       }
+
 //      // Shutdown TweetsStreamListenersExecutor
 //      try {
 //        Thread.sleep(20000);
@@ -110,18 +92,3 @@ public static void main(String[] args) {
   }
 }
 
-class Responder implements com.twitter.clientlib.TweetsStreamListener {
-  @Override
-  public void actionOnTweetsStream(StreamingTweetResponse streamingTweet) {
-    if(streamingTweet == null) {
-      System.err.println("Error: actionOnTweetsStream - streamingTweet is null ");
-      return;
-    }
-
-    if(streamingTweet.getErrors() != null) {
-      streamingTweet.getErrors().forEach(System.out::println);
-    } else if (streamingTweet.getData() != null) {
-      System.out.println("New streaming tweet: " + streamingTweet.getData().getText());
-    }
-  }
-}
diff --git a/examples/src/main/java/com/twitter/clientlib/TweetsStreamListener.java b/examples/src/main/java/com/twitter/clientlib/IStreamingHandler.java
similarity index 67%
rename from examples/src/main/java/com/twitter/clientlib/TweetsStreamListener.java
rename to examples/src/main/java/com/twitter/clientlib/IStreamingHandler.java
index 5df0650..e9c937c 100644
--- a/examples/src/main/java/com/twitter/clientlib/TweetsStreamListener.java
+++ b/examples/src/main/java/com/twitter/clientlib/IStreamingHandler.java
@@ -22,8 +22,12 @@
 
 package com.twitter.clientlib;
 
-import com.twitter.clientlib.model.StreamingTweetResponse;
-
-public interface TweetsStreamListener {
-  void actionOnTweetsStream(StreamingTweetResponse streamingTweet);
+import java.io.InputStream;
+
+public interface IStreamingHandler<R> {
+  InputStream connectStream() throws ApiException;
+  void actionOnStreamingObject(R streamingTweet) throws ApiException;
+  boolean processAndVerifyStreamingObject(String tweetString) throws Exception;
+  R getStreamingObject(String tweetString) throws Exception;
+  boolean hasReconnectErrors(R streamingTweet);
 }
diff --git a/examples/src/main/java/com/twitter/clientlib/StreamingHandler.java b/examples/src/main/java/com/twitter/clientlib/StreamingHandler.java
new file mode 100644
index 0000000..89359dd
--- /dev/null
+++ b/examples/src/main/java/com/twitter/clientlib/StreamingHandler.java
@@ -0,0 +1,40 @@
+/*
+Copyright 2020 Twitter, Inc.
+SPDX-License-Identifier: Apache-2.0
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech).
+https://openapi-generator.tech
+Do not edit the class manually.
+*/
+
+
+package com.twitter.clientlib;
+
+import com.twitter.clientlib.api.TwitterApi;
+
+public abstract class StreamingHandler<R> implements IStreamingHandler<R> {
+  protected final TwitterApi apiInstance;
+
+  public StreamingHandler(TwitterApi apiInstance) {
+    this.apiInstance = apiInstance;
+  }
+
+  @Override
+  public boolean processAndVerifyStreamingObject(String tweetString) throws Exception {
+    R tweet = getStreamingObject(tweetString);
+    actionOnStreamingObject(tweet);
+    return !hasReconnectErrors(tweet);
+  }
+}
diff --git a/examples/src/main/java/com/twitter/clientlib/StreamingTweetHandler.java b/examples/src/main/java/com/twitter/clientlib/StreamingTweetHandler.java
new file mode 100644
index 0000000..79def97
--- /dev/null
+++ b/examples/src/main/java/com/twitter/clientlib/StreamingTweetHandler.java
@@ -0,0 +1,92 @@
+/*
+Copyright 2020 Twitter, Inc.
+SPDX-License-Identifier: Apache-2.0
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech).
+https://openapi-generator.tech
+Do not edit the class manually.
+*/
+
+
+package com.twitter.clientlib;
+
+import java.io.InputStream;
+import java.util.HashSet;
+import java.util.Set;
+
+import com.twitter.clientlib.api.TwitterApi;
+import com.twitter.clientlib.model.*;
+
+public class StreamingTweetHandler extends StreamingHandler<StreamingTweetResponse> {
+  public StreamingTweetHandler(TwitterApi apiInstance) {
+    super(apiInstance);
+  }
+
+  @Override
+  public InputStream connectStream() throws ApiException {
+    Set<String> tweetFields = new HashSet<>();
+    tweetFields.add("author_id");
+    tweetFields.add("id");
+    tweetFields.add("created_at");
+    tweetFields.add("geo");
+    Set<String> expansions = new HashSet<>();
+    expansions.add("geo.place_id");
+    Set<String> placeFields = new HashSet<>();
+    placeFields.add("geo");
+    placeFields.add("id");
+    placeFields.add("name");
+    placeFields.add("place_type");
+
+    return this.apiInstance.tweets().sampleStream()
+        .backfillMinutes(0)
+        .tweetFields(tweetFields).expansions(expansions).placeFields(placeFields)
+        .execute();
+  }
+
+  @Override
+  public void actionOnStreamingObject(StreamingTweetResponse streamingTweet) throws ApiException {
+    if(streamingTweet == null) {
+      System.err.println("Error: actionOnTweetsStream - streamingTweet is null ");
+      return;
+    }
+
+    if(streamingTweet.getErrors() != null) {
+      streamingTweet.getErrors().forEach(System.out::println);
+    } else if (streamingTweet.getData() != null) {
+      System.out.println("New streaming tweet: " + streamingTweet.getData().getText());
+    }
+  }
+
+  @Override
+  public StreamingTweetResponse getStreamingObject(String tweetString) throws Exception {
+    return  StreamingTweetResponse.fromJson(tweetString);
+  }
+
+  @Override
+  public boolean hasReconnectErrors(StreamingTweetResponse streamingTweet) {
+    boolean needToReconnect = false;
+    if (streamingTweet.getErrors() != null) {
+      for (Problem problem : streamingTweet.getErrors()) {
+        if (problem instanceof OperationalDisconnectProblem || problem instanceof ConnectionExceptionProblem) {
+          System.err.println("Re-connecting to the stream due to: " + problem);
+          needToReconnect = true;
+          break;
+        }
+      }
+    }
+    return needToReconnect;
+  }
+}
+
diff --git a/examples/src/main/java/com/twitter/clientlib/TweetsStreamListenersExecutor.java b/examples/src/main/java/com/twitter/clientlib/TweetsStreamListenersExecutor.java
index cfb443c..b78f8a0 100644
--- a/examples/src/main/java/com/twitter/clientlib/TweetsStreamListenersExecutor.java
+++ b/examples/src/main/java/com/twitter/clientlib/TweetsStreamListenersExecutor.java
@@ -26,60 +26,29 @@
 import java.io.BufferedReader;
 import java.io.InputStream;
 import java.io.InputStreamReader;
-import java.util.ArrayList;
+import java.io.InterruptedIOException;
 import java.util.LinkedList;
-import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
-import com.google.gson.reflect.TypeToken;
-
-import com.twitter.clientlib.model.StreamingTweetResponse;
-
 public class TweetsStreamListenersExecutor {
   private final static int TIMEOUT_MILLIS = 60000;
   private final static int SLEEP_MILLIS = 100;
-  private final ITweetsQueue tweetsQueue;
-  private final List<TweetsStreamListener> listeners = new ArrayList<>();
-  private final InputStream stream;
+  private TweetsQueuer tweetsQueuer;
+  private ITweetsQueue tweetsQueue = new LinkedListTweetsQueue();
   private final AtomicBoolean isRunning = new AtomicBoolean(true);
   private final AtomicLong tweetStreamedTime = new AtomicLong(0);
   private Exception caughtException;
+  private IStreamingHandler<?> streamingHandler;
 
-  public TweetsStreamListenersExecutor(InputStream stream) {
-    this.tweetsQueue = new LinkedListTweetsQueue();
-    this.stream = stream;
-  }
-
-  public TweetsStreamListenersExecutor(ITweetsQueue tweetsQueue, InputStream stream) {
-    this.tweetsQueue = tweetsQueue;
-    this.stream = stream;
-  }
-
-  public void addListener(TweetsStreamListener toAdd) {
-    listeners.add(toAdd);
-  }
-
-  public void executeListeners() {
-    if (stream == null) {
-      System.out.println("Error: stream is null.");
-      return;
-    } else if (this.tweetsQueue == null) {
-      System.out.println("Error: tweetsQueue is null.");
-      return;
-    }
-
-    TweetsQueuer tweetsQueuer = new TweetsQueuer();
-    TweetsListenersExecutor tweetsListenersExecutor = new TweetsListenersExecutor();
-    StreamTimeoutChecker timeoutChecker = new StreamTimeoutChecker();
-    tweetsListenersExecutor.start();
-    tweetsQueuer.start();
-    timeoutChecker.start();
+  public StreamListenersExecutorBuilder stream() {
+    return new StreamListenersExecutorBuilder();
   }
 
-  public void shutdown(Exception e) {
+  private void shutdown(Exception e) {
     caughtException = e;
+    e.printStackTrace();
     shutdown();
   }
 
@@ -92,6 +61,10 @@ public Exception getError() {
     return caughtException;
   }
 
+  private InputStream connectStream() throws ApiException {
+    return streamingHandler.connectStream();
+  }
+
   private void resetTweetStreamedTime() {
     tweetStreamedTime.set(System.currentTimeMillis());
   }
@@ -100,6 +73,18 @@ private boolean isTweetStreamedError() {
     return System.currentTimeMillis() - tweetStreamedTime.get() > TIMEOUT_MILLIS;
   }
 
+  private void restartTweetsQueuer() {
+    tweetsQueuer.shutdownQueuer();
+    try {
+      Thread.sleep(SLEEP_MILLIS); // Wait a bit before starting the TweetsQueuer and calling the API again.
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+    tweetsQueuer.interrupt();
+    tweetsQueuer = new TweetsQueuer();
+    tweetsQueuer.start();
+  }
+
   private class TweetsListenersExecutor extends Thread {
     @Override
     public void run() {
@@ -107,7 +92,6 @@ public void run() {
     }
 
     private void processTweets() {
-      StreamingTweetResponse streamingTweet;
       String tweetString;
       try {
         while (isRunning.get()) {
@@ -117,41 +101,45 @@ private void processTweets() {
             continue;
           }
           try {
-            streamingTweet = StreamingTweetResponse.fromJson(tweetString);
-            for (TweetsStreamListener listener : listeners) {
-              listener.actionOnTweetsStream(streamingTweet);
+            if(!streamingHandler.processAndVerifyStreamingObject(tweetString)) {
+              restartTweetsQueuer();
             }
           } catch (Exception interExcep) {
             interExcep.printStackTrace();
           }
         }
       } catch (Exception e) {
-        e.printStackTrace();
         shutdown(e);
       }
     }
   }
 
   private class TweetsQueuer extends Thread {
+    private boolean isReconnecting;
     @Override
     public void run() {
+      isReconnecting = false;
       queueTweets();
     }
 
-    public void queueTweets() {
-      String line = null;
-      try (BufferedReader reader = new BufferedReader(new InputStreamReader(stream))) {
-        while (isRunning.get()) {
+    public void shutdownQueuer() {
+      isReconnecting = true;
+    }
+    private void queueTweets() {
+      String line;
+      try (BufferedReader reader = new BufferedReader(new InputStreamReader(connectStream()))) {
+        while (isRunning.get() && !isReconnecting) {
           line = reader.readLine();
           resetTweetStreamedTime();
-          if(line == null || line.isEmpty()) {
+          if (line == null || line.isEmpty()) {
             Thread.sleep(SLEEP_MILLIS);
             continue;
           }
           tweetsQueue.add(line);
         }
-      } catch (Exception e) {
+      } catch (InterruptedIOException e) {
         e.printStackTrace();
+      } catch (Exception e) {
         shutdown(e);
       }
     }
@@ -177,6 +165,36 @@ public void checkTimes()  {
       }
     }
   }
+
+  public class StreamListenersExecutorBuilder {
+    public StreamListenersExecutorBuilder streamingHandler(IStreamingHandler<?> streamHandler) {
+      streamingHandler = streamHandler;
+      return this;
+    }
+
+    public StreamListenersExecutorBuilder tweetsQueue(ITweetsQueue queue) {
+      tweetsQueue = queue;
+      return this;
+    }
+
+    public void executeListeners() throws ApiException {
+      if (streamingHandler == null) {
+        throw new ApiException("Please set a streamingHandler");
+      } else if (tweetsQueue == null) {
+        System.out.println("Error: tweetsQueue is null.");
+        return;
+      }
+
+      TweetsListenersExecutor tweetsListenersExecutor;
+      StreamTimeoutChecker timeoutChecker;
+      tweetsQueuer = new TweetsQueuer();
+      tweetsListenersExecutor = new TweetsListenersExecutor();
+      timeoutChecker = new StreamTimeoutChecker();
+      tweetsListenersExecutor.start();
+      tweetsQueuer.start();
+      timeoutChecker.start();
+    }
+  }
 }
 
 interface ITweetsQueue {

From eba1e6ae32b7207ca2b060213547149772e7e722 Mon Sep 17 00:00:00 2001
From: tmoskovitch <tmoskovitch@twitter.com>
Date: Fri, 12 Aug 2022 10:57:32 +1200
Subject: [PATCH 3/7] Check for streaming tweets for reconnect errors. In case
 of reconnect restart the `tweetsQueuer` thread.

---
 .../java/com/twitter/clientlib/HelloWorldStreaming.java   | 2 +-
 .../twitter/clientlib/TweetsStreamListenersExecutor.java  | 8 +++++++-
 2 files changed, 8 insertions(+), 2 deletions(-)

diff --git a/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java b/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java
index 57de8a4..d62dfb6 100644
--- a/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java
+++ b/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java
@@ -52,7 +52,7 @@ public static void main(String[] args) {
       }
 
       if(tsle.getError() != null) {
-        System.err.println("==> Finished " + tsle.getError());
+        System.err.println("==> Ended with error: " + tsle.getError());
       }
 
 //      // Shutdown TweetsStreamListenersExecutor
diff --git a/examples/src/main/java/com/twitter/clientlib/TweetsStreamListenersExecutor.java b/examples/src/main/java/com/twitter/clientlib/TweetsStreamListenersExecutor.java
index b78f8a0..c5cafb0 100644
--- a/examples/src/main/java/com/twitter/clientlib/TweetsStreamListenersExecutor.java
+++ b/examples/src/main/java/com/twitter/clientlib/TweetsStreamListenersExecutor.java
@@ -35,12 +35,14 @@
 public class TweetsStreamListenersExecutor {
   private final static int TIMEOUT_MILLIS = 60000;
   private final static int SLEEP_MILLIS = 100;
+  private final static int BACKOFF_SLEEP_INTERVAL_MILLIS = 5000;
   private TweetsQueuer tweetsQueuer;
   private ITweetsQueue tweetsQueue = new LinkedListTweetsQueue();
   private final AtomicBoolean isRunning = new AtomicBoolean(true);
   private final AtomicLong tweetStreamedTime = new AtomicLong(0);
   private Exception caughtException;
   private IStreamingHandler<?> streamingHandler;
+  private long reconnecting = 0;
 
   public StreamListenersExecutorBuilder stream() {
     return new StreamListenersExecutorBuilder();
@@ -75,8 +77,12 @@ private boolean isTweetStreamedError() {
 
   private void restartTweetsQueuer() {
     tweetsQueuer.shutdownQueuer();
+    if(reconnecting < 7) {
+      reconnecting++;
+    }
     try {
-      Thread.sleep(SLEEP_MILLIS); // Wait a bit before starting the TweetsQueuer and calling the API again.
+      System.out.println("sleeping " + BACKOFF_SLEEP_INTERVAL_MILLIS * reconnecting);
+      Thread.sleep(BACKOFF_SLEEP_INTERVAL_MILLIS * reconnecting); // Wait a bit before starting the TweetsQueuer and calling the API again.
     } catch (InterruptedException e) {
       e.printStackTrace();
     }

From 0dee721d6e79ad7d496241a7c2099829662ae256 Mon Sep 17 00:00:00 2001
From: tmoskovitch <tmoskovitch@twitter.com>
Date: Mon, 15 Aug 2022 10:34:35 +1200
Subject: [PATCH 4/7] Another layer

---
 .../clientlib/HelloWorldStreaming.java        |  2 +-
 .../twitter/clientlib/StreamingHandler.java   |  2 +-
 .../clientlib/StreamingTweetHandler.java      | 42 +---------------
 .../clientlib/StreamingTweetHandlerImpl.java  | 50 +++++++++++++++++++
 4 files changed, 54 insertions(+), 42 deletions(-)
 create mode 100644 examples/src/main/java/com/twitter/clientlib/StreamingTweetHandlerImpl.java

diff --git a/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java b/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java
index d62dfb6..d6d3d27 100644
--- a/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java
+++ b/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java
@@ -40,7 +40,7 @@ public static void main(String[] args) {
     try {
       TweetsStreamListenersExecutor tsle = new TweetsStreamListenersExecutor();
       tsle.stream()
-          .streamingHandler(new StreamingTweetHandler(apiInstance))
+          .streamingHandler( new StreamingTweetHandlerImpl(apiInstance))
           .executeListeners();
       while(tsle.getError() == null) {
         try {
diff --git a/examples/src/main/java/com/twitter/clientlib/StreamingHandler.java b/examples/src/main/java/com/twitter/clientlib/StreamingHandler.java
index 89359dd..ce04509 100644
--- a/examples/src/main/java/com/twitter/clientlib/StreamingHandler.java
+++ b/examples/src/main/java/com/twitter/clientlib/StreamingHandler.java
@@ -34,7 +34,7 @@ public StreamingHandler(TwitterApi apiInstance) {
   @Override
   public boolean processAndVerifyStreamingObject(String tweetString) throws Exception {
     R tweet = getStreamingObject(tweetString);
-    actionOnStreamingObject(tweet);
+      actionOnStreamingObject(tweet);
     return !hasReconnectErrors(tweet);
   }
 }
diff --git a/examples/src/main/java/com/twitter/clientlib/StreamingTweetHandler.java b/examples/src/main/java/com/twitter/clientlib/StreamingTweetHandler.java
index 79def97..e437054 100644
--- a/examples/src/main/java/com/twitter/clientlib/StreamingTweetHandler.java
+++ b/examples/src/main/java/com/twitter/clientlib/StreamingTweetHandler.java
@@ -22,56 +22,18 @@
 
 package com.twitter.clientlib;
 
-import java.io.InputStream;
-import java.util.HashSet;
-import java.util.Set;
 
 import com.twitter.clientlib.api.TwitterApi;
 import com.twitter.clientlib.model.*;
 
-public class StreamingTweetHandler extends StreamingHandler<StreamingTweetResponse> {
+public abstract class StreamingTweetHandler extends StreamingHandler<StreamingTweetResponse> {
   public StreamingTweetHandler(TwitterApi apiInstance) {
     super(apiInstance);
   }
 
-  @Override
-  public InputStream connectStream() throws ApiException {
-    Set<String> tweetFields = new HashSet<>();
-    tweetFields.add("author_id");
-    tweetFields.add("id");
-    tweetFields.add("created_at");
-    tweetFields.add("geo");
-    Set<String> expansions = new HashSet<>();
-    expansions.add("geo.place_id");
-    Set<String> placeFields = new HashSet<>();
-    placeFields.add("geo");
-    placeFields.add("id");
-    placeFields.add("name");
-    placeFields.add("place_type");
-
-    return this.apiInstance.tweets().sampleStream()
-        .backfillMinutes(0)
-        .tweetFields(tweetFields).expansions(expansions).placeFields(placeFields)
-        .execute();
-  }
-
-  @Override
-  public void actionOnStreamingObject(StreamingTweetResponse streamingTweet) throws ApiException {
-    if(streamingTweet == null) {
-      System.err.println("Error: actionOnTweetsStream - streamingTweet is null ");
-      return;
-    }
-
-    if(streamingTweet.getErrors() != null) {
-      streamingTweet.getErrors().forEach(System.out::println);
-    } else if (streamingTweet.getData() != null) {
-      System.out.println("New streaming tweet: " + streamingTweet.getData().getText());
-    }
-  }
-
   @Override
   public StreamingTweetResponse getStreamingObject(String tweetString) throws Exception {
-    return  StreamingTweetResponse.fromJson(tweetString);
+    return StreamingTweetResponse.fromJson(tweetString);
   }
 
   @Override
diff --git a/examples/src/main/java/com/twitter/clientlib/StreamingTweetHandlerImpl.java b/examples/src/main/java/com/twitter/clientlib/StreamingTweetHandlerImpl.java
new file mode 100644
index 0000000..26312ba
--- /dev/null
+++ b/examples/src/main/java/com/twitter/clientlib/StreamingTweetHandlerImpl.java
@@ -0,0 +1,50 @@
+package com.twitter.clientlib;
+
+import java.io.InputStream;
+import java.util.HashSet;
+import java.util.Set;
+
+import com.twitter.clientlib.api.TwitterApi;
+import com.twitter.clientlib.model.StreamingTweetResponse;
+
+public class StreamingTweetHandlerImpl extends StreamingTweetHandler {
+  public StreamingTweetHandlerImpl(TwitterApi apiInstance) {
+    super(apiInstance);
+  }
+
+  @Override
+  public InputStream connectStream() throws ApiException {
+    Set<String> tweetFields = new HashSet<>();
+    tweetFields.add("author_id");
+    tweetFields.add("id");
+    tweetFields.add("created_at");
+    tweetFields.add("geo");
+    Set<String> expansions = new HashSet<>();
+    expansions.add("geo.place_id");
+    Set<String> placeFields = new HashSet<>();
+    placeFields.add("geo");
+    placeFields.add("id");
+    placeFields.add("name");
+    placeFields.add("place_type");
+
+    return this.apiInstance.tweets().sampleStream()
+        .backfillMinutes(0)
+        .tweetFields(tweetFields).expansions(expansions).placeFields(placeFields)
+        .execute();
+  }
+
+  @Override
+  public void actionOnStreamingObject(StreamingTweetResponse streamingTweet) throws ApiException {
+    if(streamingTweet == null) {
+      System.err.println("Error: actionOnTweetsStream - streamingTweet is null ");
+      return;
+    }
+
+    if(streamingTweet.getErrors() != null) {
+      streamingTweet.getErrors().forEach(System.out::println);
+    } else if (streamingTweet.getData() != null) {
+      System.out.println("New streaming tweet: " + streamingTweet.getData().getText());
+    }
+  }
+
+}

From 3c43f155a217424bb5ee0f21d45632c63271d871 Mon Sep 17 00:00:00 2001
From: tmoskovitch <tmoskovitch@twitter.com>
Date: Tue, 16 Aug 2022 10:25:55 +1200
Subject: [PATCH 5/7] Another layer

---
 .../clientlib/HelloWorldStreaming.java        |  2 +-
 .../twitter/clientlib/IStreamingHandler.java  |  8 +++---
 .../clientlib/ITweetsStreamListener.java      | 27 +++++++++++++++++++
 .../twitter/clientlib/StreamingHandler.java   |  8 +++---
 .../clientlib/StreamingTweetHandlerImpl.java  | 23 +++++++++++++++-
 5 files changed, 58 insertions(+), 10 deletions(-)
 create mode 100644 examples/src/main/java/com/twitter/clientlib/ITweetsStreamListener.java

diff --git a/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java b/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java
index d6d3d27..51568bd 100644
--- a/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java
+++ b/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java
@@ -40,7 +40,7 @@ public static void main(String[] args) {
     try {
       TweetsStreamListenersExecutor tsle = new TweetsStreamListenersExecutor();
       tsle.stream()
-          .streamingHandler( new StreamingTweetHandlerImpl(apiInstance))
+          .streamingHandler(new StreamingTweetHandlerImpl(apiInstance))
           .executeListeners();
       while(tsle.getError() == null) {
         try {
diff --git a/examples/src/main/java/com/twitter/clientlib/IStreamingHandler.java b/examples/src/main/java/com/twitter/clientlib/IStreamingHandler.java
index e9c937c..8a2d62a 100644
--- a/examples/src/main/java/com/twitter/clientlib/IStreamingHandler.java
+++ b/examples/src/main/java/com/twitter/clientlib/IStreamingHandler.java
@@ -24,10 +24,10 @@
 
 import java.io.InputStream;
 
-public interface IStreamingHandler<R> {
+public interface IStreamingHandler<T> {
   InputStream connectStream() throws ApiException;
-  void actionOnStreamingObject(R streamingTweet) throws ApiException;
+  void actionOnStreamingObject(T streamingTweet) throws ApiException;
   boolean processAndVerifyStreamingObject(String tweetString) throws Exception;
-  R getStreamingObject(String tweetString) throws Exception;
-  boolean hasReconnectErrors(R streamingTweet);
+  T getStreamingObject(String tweetString) throws Exception;
+  boolean hasReconnectErrors(T streamingTweet);
 }
diff --git a/examples/src/main/java/com/twitter/clientlib/ITweetsStreamListener.java b/examples/src/main/java/com/twitter/clientlib/ITweetsStreamListener.java
new file mode 100644
index 0000000..2fd9638
--- /dev/null
+++ b/examples/src/main/java/com/twitter/clientlib/ITweetsStreamListener.java
@@ -0,0 +1,27 @@
+/*
+Copyright 2020 Twitter, Inc.
+SPDX-License-Identifier: Apache-2.0
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech).
+https://openapi-generator.tech
+Do not edit the class manually.
+*/
+
+
+package com.twitter.clientlib;
+
+public interface ITweetsStreamListener<T> {
+  void actionOnStreamingObject(T streamingTweet)  throws ApiException;
+}
diff --git a/examples/src/main/java/com/twitter/clientlib/StreamingHandler.java b/examples/src/main/java/com/twitter/clientlib/StreamingHandler.java
index ce04509..6d34e73 100644
--- a/examples/src/main/java/com/twitter/clientlib/StreamingHandler.java
+++ b/examples/src/main/java/com/twitter/clientlib/StreamingHandler.java
@@ -24,8 +24,8 @@
 
 import com.twitter.clientlib.api.TwitterApi;
 
-public abstract class StreamingHandler<R> implements IStreamingHandler<R> {
-  protected final TwitterApi apiInstance;
+public abstract class StreamingHandler<T> implements IStreamingHandler<T> {
+protected final TwitterApi apiInstance;
 
   public StreamingHandler(TwitterApi apiInstance) {
     this.apiInstance = apiInstance;
@@ -33,8 +33,8 @@ public StreamingHandler(TwitterApi apiInstance) {
 
   @Override
   public boolean processAndVerifyStreamingObject(String tweetString) throws Exception {
-    R tweet = getStreamingObject(tweetString);
-      actionOnStreamingObject(tweet);
+    T tweet = getStreamingObject(tweetString);
+    actionOnStreamingObject(tweet);
     return !hasReconnectErrors(tweet);
   }
 }
diff --git a/examples/src/main/java/com/twitter/clientlib/StreamingTweetHandlerImpl.java b/examples/src/main/java/com/twitter/clientlib/StreamingTweetHandlerImpl.java
index 26312ba..1655122 100644
--- a/examples/src/main/java/com/twitter/clientlib/StreamingTweetHandlerImpl.java
+++ b/examples/src/main/java/com/twitter/clientlib/StreamingTweetHandlerImpl.java
@@ -1,3 +1,25 @@
+/*
+Copyright 2020 Twitter, Inc.
+SPDX-License-Identifier: Apache-2.0
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech).
+https://openapi-generator.tech
+Do not edit the class manually.
+*/
+
+
 package com.twitter.clientlib;
 
 import java.io.InputStream;
@@ -46,5 +68,4 @@ public void actionOnStreamingObject(StreamingTweetResponse streamingTweet) throw
       System.out.println("New streaming tweet: " + streamingTweet.getData().getText());
     }
   }
-
 }

From 397a24ce1d76c6c984fc5033242d6456b3c132d2 Mon Sep 17 00:00:00 2001
From: tmoskovitch <tmoskovitch@twitter.com>
Date: Mon, 29 Aug 2022 09:02:41 +1200
Subject: [PATCH 6/7] Use abstract StreamingHandler

---
 .../com/twitter/clientlib/HelloWorldStreaming.java     |  2 +-
 .../java/com/twitter/clientlib/StreamingHandler.java   | 10 ++++++++--
 .../clientlib/TweetsStreamListenersExecutor.java       |  4 ++--
 3 files changed, 11 insertions(+), 5 deletions(-)

diff --git a/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java b/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java
index 51568bd..d82e358 100644
--- a/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java
+++ b/examples/src/main/java/com/twitter/clientlib/HelloWorldStreaming.java
@@ -44,7 +44,7 @@ public static void main(String[] args) {
           .executeListeners();
       while(tsle.getError() == null) {
         try {
-          System.err.println("==> sleeping 5 ");
+          System.out.println("==> sleeping 5 ");
           Thread.sleep(5000);
         } catch (InterruptedException e) {
           e.printStackTrace();
diff --git a/examples/src/main/java/com/twitter/clientlib/StreamingHandler.java b/examples/src/main/java/com/twitter/clientlib/StreamingHandler.java
index 6d34e73..76a9abe 100644
--- a/examples/src/main/java/com/twitter/clientlib/StreamingHandler.java
+++ b/examples/src/main/java/com/twitter/clientlib/StreamingHandler.java
@@ -22,16 +22,22 @@
 
 package com.twitter.clientlib;
 
+import java.io.InputStream;
+
 import com.twitter.clientlib.api.TwitterApi;
 
-public abstract class StreamingHandler<T> implements IStreamingHandler<T> {
+public abstract class StreamingHandler<T> {
 protected final TwitterApi apiInstance;
 
   public StreamingHandler(TwitterApi apiInstance) {
     this.apiInstance = apiInstance;
   }
 
-  @Override
+  public abstract InputStream connectStream() throws ApiException;
+  public abstract void actionOnStreamingObject(T streamingTweet) throws ApiException;
+  public abstract T getStreamingObject(String tweetString) throws Exception;
+  public abstract boolean hasReconnectErrors(T streamingTweet);
+
   public boolean processAndVerifyStreamingObject(String tweetString) throws Exception {
     T tweet = getStreamingObject(tweetString);
     actionOnStreamingObject(tweet);
diff --git a/examples/src/main/java/com/twitter/clientlib/TweetsStreamListenersExecutor.java b/examples/src/main/java/com/twitter/clientlib/TweetsStreamListenersExecutor.java
index c5cafb0..5226f52 100644
--- a/examples/src/main/java/com/twitter/clientlib/TweetsStreamListenersExecutor.java
+++ b/examples/src/main/java/com/twitter/clientlib/TweetsStreamListenersExecutor.java
@@ -41,7 +41,7 @@ public class TweetsStreamListenersExecutor {
   private final AtomicBoolean isRunning = new AtomicBoolean(true);
   private final AtomicLong tweetStreamedTime = new AtomicLong(0);
   private Exception caughtException;
-  private IStreamingHandler<?> streamingHandler;
+  private StreamingHandler<?> streamingHandler;
   private long reconnecting = 0;
 
   public StreamListenersExecutorBuilder stream() {
@@ -173,7 +173,7 @@ public void checkTimes()  {
   }
 
   public class StreamListenersExecutorBuilder {
-    public StreamListenersExecutorBuilder streamingHandler(IStreamingHandler<?> streamHandler) {
+    public StreamListenersExecutorBuilder streamingHandler(StreamingHandler<?> streamHandler) {
       streamingHandler = streamHandler;
       return this;
     }

From b333e7bdd9f15e83d0c258389475cd3e7be185fc Mon Sep 17 00:00:00 2001
From: tmoskovitch <tmoskovitch@twitter.com>
Date: Mon, 29 Aug 2022 09:04:19 +1200
Subject: [PATCH 7/7] Use abstract StreamingHandler

---
 .../twitter/clientlib/IStreamingHandler.java  | 33 -------------------
 1 file changed, 33 deletions(-)
 delete mode 100644 examples/src/main/java/com/twitter/clientlib/IStreamingHandler.java

diff --git a/examples/src/main/java/com/twitter/clientlib/IStreamingHandler.java b/examples/src/main/java/com/twitter/clientlib/IStreamingHandler.java
deleted file mode 100644
index 8a2d62a..0000000
--- a/examples/src/main/java/com/twitter/clientlib/IStreamingHandler.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
-Copyright 2020 Twitter, Inc.
-SPDX-License-Identifier: Apache-2.0
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-
-NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech).
-https://openapi-generator.tech
-Do not edit the class manually.
-*/
-
-
-package com.twitter.clientlib;
-
-import java.io.InputStream;
-
-public interface IStreamingHandler<T> {
-  InputStream connectStream() throws ApiException;
-  void actionOnStreamingObject(T streamingTweet) throws ApiException;
-  boolean processAndVerifyStreamingObject(String tweetString) throws Exception;
-  T getStreamingObject(String tweetString) throws Exception;
-  boolean hasReconnectErrors(T streamingTweet);
-}