diff --git a/CHANGELOG.md b/CHANGELOG.md index 9eb4ca16f..4e7a493b5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,10 @@ All notable changes to the LaunchDarkly Java SDK will be documented in this file. This project adheres to [Semantic Versioning](http://semver.org). +## [2.0.5] - 2016-11-09 +### Changed +- The StreamProcessor now listens for heartbeats from the streaming API, and will automatically reconnect if heartbeats are not received. + ## [2.0.4] - 2016-10-12 ### Changed - Updated GSON dependency version to 2.7 diff --git a/build.gradle b/build.gradle index 4b78d6980..c0521cead 100644 --- a/build.gradle +++ b/build.gradle @@ -19,7 +19,7 @@ repositories { allprojects { group = 'com.launchdarkly' - version = "2.0.4" + version = "2.0.5-SNAPSHOT" sourceCompatibility = 1.7 targetCompatibility = 1.7 } @@ -32,7 +32,7 @@ dependencies { compile "com.google.guava:guava:19.0" compile "joda-time:joda-time:2.9.3" compile "org.slf4j:slf4j-api:1.7.21" - compile group: "com.launchdarkly", name: "okhttp-eventsource", version: "0.2.3", changing: true + compile group: "com.launchdarkly", name: "okhttp-eventsource", version: "1.0.0", changing: true compile "redis.clients:jedis:2.8.1" testCompile "org.easymock:easymock:3.4" testCompile 'junit:junit:4.12' diff --git a/src/main/java/com/launchdarkly/client/StreamProcessor.java b/src/main/java/com/launchdarkly/client/StreamProcessor.java index bea59696a..20c7d58be 100644 --- a/src/main/java/com/launchdarkly/client/StreamProcessor.java +++ b/src/main/java/com/launchdarkly/client/StreamProcessor.java @@ -1,16 +1,19 @@ package com.launchdarkly.client; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.gson.Gson; import com.launchdarkly.eventsource.EventHandler; import com.launchdarkly.eventsource.EventSource; import com.launchdarkly.eventsource.MessageEvent; +import com.launchdarkly.eventsource.ReadyState; import okhttp3.Headers; +import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.URI; -import java.util.concurrent.Future; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; class StreamProcessor implements UpdateProcessor { @@ -20,12 +23,15 @@ class StreamProcessor implements UpdateProcessor { private static final String INDIRECT_PUT = "indirect/put"; private static final String INDIRECT_PATCH = "indirect/patch"; private static final Logger logger = LoggerFactory.getLogger(StreamProcessor.class); + private static final int DEAD_CONNECTION_INTERVAL_SECONDS = 300; private final FeatureStore store; private final LDConfig config; private final String sdkKey; private final FeatureRequestor requestor; - private EventSource es; + private final ScheduledExecutorService heartbeatDetectorService; + private volatile DateTime lastHeartbeat; + private volatile EventSource es; private AtomicBoolean initialized = new AtomicBoolean(false); @@ -34,6 +40,11 @@ class StreamProcessor implements UpdateProcessor { this.config = config; this.sdkKey = sdkKey; this.requestor = requestor; + ThreadFactory threadFactory = new ThreadFactoryBuilder() + .setNameFormat("LaunchDarkly-HeartbeatDetector-%d") + .build(); + this.heartbeatDetectorService = Executors.newSingleThreadScheduledExecutor(threadFactory); + heartbeatDetectorService.scheduleAtFixedRate(new HeartbeatDetector(), 1, 1, TimeUnit.MINUTES); } @Override @@ -50,11 +61,11 @@ public Future start() { @Override public void onOpen() throws Exception { - } @Override public void onMessage(String name, MessageEvent event) throws Exception { + lastHeartbeat = DateTime.now(); Gson gson = new Gson(); switch (name) { case PUT: @@ -100,6 +111,12 @@ public void onMessage(String name, MessageEvent event) throws Exception { } } + @Override + public void onComment(String comment) { + logger.debug("Received a heartbeat"); + lastHeartbeat = DateTime.now(); + } + @Override public void onError(Throwable throwable) { logger.error("Encountered EventSource error: " + throwable.getMessage()); @@ -125,6 +142,14 @@ public void close() throws IOException { if (store != null) { store.close(); } + if (heartbeatDetectorService != null) { + heartbeatDetectorService.shutdownNow(); + try { + heartbeatDetectorService.awaitTermination(100, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + logger.error("Encountered an exception terminating heartbeat detector: " + e.getMessage()); + } + } } @Override @@ -171,4 +196,28 @@ int version() { } } + + private final class HeartbeatDetector implements Runnable { + + @Override + public void run() { + DateTime reconnectThresholdTime = DateTime.now().minusSeconds(DEAD_CONNECTION_INTERVAL_SECONDS); + // We only want to force the reconnect if the ES connection is open. If not, it's already trying to + // connect anyway, or this processor was shut down + if (lastHeartbeat.isBefore(reconnectThresholdTime) && es.getState() == ReadyState.OPEN) { + try { + logger.info("Stream stopped receiving heartbeats- reconnecting."); + es.close(); + } catch (IOException e) { + logger.error("Encountered exception closing stream connection: " + e.getMessage()); + } finally { + if (es.getState() == ReadyState.SHUTDOWN) { + start(); + } else { + logger.error("Expected ES to be in state SHUTDOWN, but it's currently in state " + es.getState().toString()); + } + } + } + } + } }