From 9abc1a399d64a34d6ed792df9a5be8f46361e312 Mon Sep 17 00:00:00 2001 From: kohlerpop1 Date: Mon, 22 Sep 2025 22:30:52 -0400 Subject: [PATCH] Change static schedulers to AsyncHandler to hold for heartbeat and reconnect logic. --- .../jwdeveloper/tiktok/TikTokLiveClient.java | 14 +++--- .../tiktok/TikTokLiveClientBuilder.java | 2 +- .../tiktok/common/AsyncHandler.java | 22 ++++++++++ .../websocket/TikTokWebSocketClient.java | 2 +- .../websocket/WebSocketHeartbeatTask.java | 44 ++++++------------- 5 files changed, 45 insertions(+), 39 deletions(-) create mode 100644 Client/src/main/java/io/github/jwdeveloper/tiktok/common/AsyncHandler.java diff --git a/Client/src/main/java/io/github/jwdeveloper/tiktok/TikTokLiveClient.java b/Client/src/main/java/io/github/jwdeveloper/tiktok/TikTokLiveClient.java index d38c5c63..bb58426c 100644 --- a/Client/src/main/java/io/github/jwdeveloper/tiktok/TikTokLiveClient.java +++ b/Client/src/main/java/io/github/jwdeveloper/tiktok/TikTokLiveClient.java @@ -23,6 +23,7 @@ package io.github.jwdeveloper.tiktok; import com.google.protobuf.ByteString; +import io.github.jwdeveloper.tiktok.common.AsyncHandler; import io.github.jwdeveloper.tiktok.data.events.*; import io.github.jwdeveloper.tiktok.data.events.common.TikTokEvent; import io.github.jwdeveloper.tiktok.data.events.control.*; @@ -39,7 +40,7 @@ import lombok.Getter; import java.util.Base64; -import java.util.concurrent.CompletableFuture; +import java.util.concurrent.*; import java.util.function.Consumer; import java.util.logging.Logger; @@ -89,12 +90,11 @@ public void connect() { tikTokEventHandler.publish(this, new TikTokDisconnectedEvent("Exception: " + e.getMessage())); if (e instanceof TikTokLiveOfflineHostException && clientSettings.isRetryOnConnectionFailure()) { - try { - Thread.sleep(clientSettings.getRetryConnectionTimeout().toMillis()); - } catch (Exception ignored) {} - logger.info("Reconnecting"); - tikTokEventHandler.publish(this, new TikTokReconnectingEvent()); - this.connect(); + AsyncHandler.getReconnectScheduler().schedule(() -> { + logger.info("Reconnecting"); + tikTokEventHandler.publish(this, new TikTokReconnectingEvent()); + this.connect(); + }, clientSettings.getRetryConnectionTimeout().toMillis(), TimeUnit.MILLISECONDS); } throw e; } catch (Exception e) { diff --git a/Client/src/main/java/io/github/jwdeveloper/tiktok/TikTokLiveClientBuilder.java b/Client/src/main/java/io/github/jwdeveloper/tiktok/TikTokLiveClientBuilder.java index ff1316cf..66aa9428 100644 --- a/Client/src/main/java/io/github/jwdeveloper/tiktok/TikTokLiveClientBuilder.java +++ b/Client/src/main/java/io/github/jwdeveloper/tiktok/TikTokLiveClientBuilder.java @@ -133,7 +133,7 @@ public LiveClient build() { //networking dependance.registerSingleton(HttpClientFactory.class); - dependance.registerSingleton(WebSocketHeartbeatTask.class); // True global singleton - Static objects are located to serve as global + dependance.registerSingleton(WebSocketHeartbeatTask.class); if (clientSettings.isOffline()) { dependance.registerSingleton(LiveSocketClient.class, TikTokWebSocketOfflineClient.class); dependance.registerSingleton(LiveHttpClient.class, TikTokLiveHttpOfflineClient.class); diff --git a/Client/src/main/java/io/github/jwdeveloper/tiktok/common/AsyncHandler.java b/Client/src/main/java/io/github/jwdeveloper/tiktok/common/AsyncHandler.java new file mode 100644 index 00000000..172845ed --- /dev/null +++ b/Client/src/main/java/io/github/jwdeveloper/tiktok/common/AsyncHandler.java @@ -0,0 +1,22 @@ +package io.github.jwdeveloper.tiktok.common; + +import lombok.Getter; + +import java.util.concurrent.*; + +public class AsyncHandler +{ + @Getter + private static final ScheduledExecutorService heartBeatScheduler = Executors.newScheduledThreadPool(1, r -> { + Thread t = new Thread(r, "heartbeat-pool"); + t.setDaemon(true); + return t; + }); + + @Getter + private static final ScheduledExecutorService reconnectScheduler = Executors.newScheduledThreadPool(0, r -> { + Thread t = new Thread(r, "reconnect-pool"); + t.setDaemon(true); + return t; + }); +} \ No newline at end of file diff --git a/Client/src/main/java/io/github/jwdeveloper/tiktok/websocket/TikTokWebSocketClient.java b/Client/src/main/java/io/github/jwdeveloper/tiktok/websocket/TikTokWebSocketClient.java index 6a2fc601..61d336d0 100644 --- a/Client/src/main/java/io/github/jwdeveloper/tiktok/websocket/TikTokWebSocketClient.java +++ b/Client/src/main/java/io/github/jwdeveloper/tiktok/websocket/TikTokWebSocketClient.java @@ -142,7 +142,7 @@ public void stop(LiveClientStopType type) { case DISCONNECT -> webSocketClient.closeConnection(CloseFrame.NORMAL, ""); default -> webSocketClient.close(); } - heartbeatTask.stop(webSocketClient); + heartbeatTask.stop(); } webSocketClient = null; } diff --git a/Client/src/main/java/io/github/jwdeveloper/tiktok/websocket/WebSocketHeartbeatTask.java b/Client/src/main/java/io/github/jwdeveloper/tiktok/websocket/WebSocketHeartbeatTask.java index 6c196b33..375dba7a 100644 --- a/Client/src/main/java/io/github/jwdeveloper/tiktok/websocket/WebSocketHeartbeatTask.java +++ b/Client/src/main/java/io/github/jwdeveloper/tiktok/websocket/WebSocketHeartbeatTask.java @@ -22,6 +22,7 @@ */ package io.github.jwdeveloper.tiktok.websocket; +import io.github.jwdeveloper.tiktok.common.AsyncHandler; import org.java_websocket.WebSocket; import java.util.*; @@ -29,47 +30,30 @@ public class WebSocketHeartbeatTask { - // Single shared pool for all heartbeat tasks - private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, r -> { - Thread t = new Thread(r, "heartbeat-pool"); - t.setDaemon(true); - return t; - }); - private static final Map> tasks = new ConcurrentHashMap<>(); - private static final Map commTime = new ConcurrentHashMap<>(); + private ScheduledFuture task; + private Long commTime; - private final byte[] heartbeatBytes = Base64.getDecoder().decode("MgJwYjoCaGI="); // Used to be '3A026862' aka ':\x02hb', now is '2\x02pb:\x02hb'. + private final static byte[] heartbeatBytes = Base64.getDecoder().decode("MgJwYjoCaGI="); // Used to be '3A026862' aka ':\x02hb', now is '2\x02pb:\x02hb'. public void run(WebSocket webSocket, long pingTaskTime) { - stop(webSocket); // remove existing task if any + stop(); // remove existing task if any - tasks.put(webSocket, scheduler.scheduleAtFixedRate(() -> { + task = AsyncHandler.getHeartBeatScheduler().scheduleAtFixedRate(() -> { try { if (webSocket.isOpen()) { webSocket.send(heartbeatBytes); - commTime.put(webSocket, System.currentTimeMillis()); - } else { - Long time = commTime.get(webSocket); - if (time != null && System.currentTimeMillis() - time >= 60_000) // Stop if disconnected longer than 60s - stop(webSocket); - } + commTime = System.currentTimeMillis(); + } else if (commTime != null && System.currentTimeMillis() - commTime >= 60_000) // Stop if disconnected longer than 60s + stop(); } catch (Exception e) { e.printStackTrace(); - stop(webSocket); + stop(); } - }, 0, pingTaskTime, TimeUnit.MILLISECONDS)); + }, 0, pingTaskTime, TimeUnit.MILLISECONDS); } - public void stop(WebSocket webSocket) { - ScheduledFuture future = tasks.remove(webSocket); - if (future != null) - future.cancel(true); - commTime.remove(webSocket); - } - - public void shutdown() { - tasks.values().forEach(f -> f.cancel(true)); - commTime.clear(); - scheduler.shutdownNow(); + public void stop() { + if (task != null) + task.cancel(true); } } \ No newline at end of file