From aeb88c27ea84022dc3ae8ce2d3e5946ebc4621b3 Mon Sep 17 00:00:00 2001
From: Dan Richelson
Date: Mon, 4 Apr 2016 14:07:32 -0700
Subject: [PATCH 01/13] bump eventsource dep
---
build.gradle | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/build.gradle b/build.gradle
index 676e19717..899bb477b 100644
--- a/build.gradle
+++ b/build.gradle
@@ -24,7 +24,7 @@ dependencies {
compile "com.google.code.gson:gson:2.2.4"
compile "com.google.guava:guava:19.0"
compile "org.slf4j:slf4j-api:1.7.7"
- compile "com.launchdarkly:okhttp-eventsource:0.1.0"
+ compile "com.launchdarkly:okhttp-eventsource:0.1.2"
compile "redis.clients:jedis:2.8.0"
testCompile "org.easymock:easymock:3.3"
testCompile 'junit:junit:[4.10,)'
From e1ac5f8c646c86e303a313f9aed11e8391dc0e2f Mon Sep 17 00:00:00 2001
From: Dan Richelson
Date: Tue, 5 Apr 2016 17:00:05 -0700
Subject: [PATCH 02/13] Implement bounded resource consumption.
---
.../client/InMemoryFeatureStore.java | 6 +-
.../com/launchdarkly/client/LDClient.java | 86 +++++++----
.../com/launchdarkly/client/LDConfig.java | 63 ++++++--
.../launchdarkly/client/PollingProcessor.java | 62 ++++++++
.../launchdarkly/client/StreamProcessor.java | 34 +++--
.../launchdarkly/client/UpdateProcessor.java | 23 +++
.../launchdarkly/client/VeryBasicFuture.java | 30 ++++
.../com/launchdarkly/client/LDClientTest.java | 143 +++++++++++++++---
.../com/launchdarkly/client/LDConfigTest.java | 18 ++-
.../client/PollingProcessorTest.java | 61 ++++++++
10 files changed, 445 insertions(+), 81 deletions(-)
create mode 100644 src/main/java/com/launchdarkly/client/PollingProcessor.java
create mode 100644 src/main/java/com/launchdarkly/client/UpdateProcessor.java
create mode 100644 src/main/java/com/launchdarkly/client/VeryBasicFuture.java
create mode 100644 src/test/java/com/launchdarkly/client/PollingProcessorTest.java
diff --git a/src/main/java/com/launchdarkly/client/InMemoryFeatureStore.java b/src/main/java/com/launchdarkly/client/InMemoryFeatureStore.java
index 9a5d2f6a3..a4837955f 100644
--- a/src/main/java/com/launchdarkly/client/InMemoryFeatureStore.java
+++ b/src/main/java/com/launchdarkly/client/InMemoryFeatureStore.java
@@ -12,9 +12,9 @@
*/
public class InMemoryFeatureStore implements FeatureStore {
- final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
- final Map> features = new HashMap<>();
- volatile boolean initialized = false;
+ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ private final Map> features = new HashMap<>();
+ private volatile boolean initialized = false;
/**
diff --git a/src/main/java/com/launchdarkly/client/LDClient.java b/src/main/java/com/launchdarkly/client/LDClient.java
index a5f5bdff4..3cf43ff77 100644
--- a/src/main/java/com/launchdarkly/client/LDClient.java
+++ b/src/main/java/com/launchdarkly/client/LDClient.java
@@ -1,6 +1,7 @@
package com.launchdarkly.client;
+import com.google.common.annotations.VisibleForTesting;
import com.google.gson.JsonElement;
import org.apache.http.annotation.ThreadSafe;
import org.slf4j.Logger;
@@ -9,14 +10,15 @@
import java.io.Closeable;
import java.io.IOException;
import java.net.URL;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.jar.Attributes;
import java.util.jar.Manifest;
/**
- *
* A client for the LaunchDarkly API. Client instances are thread-safe. Applications should instantiate
* a single {@code LDClient} for the lifetime of their application.
- *
*/
@ThreadSafe
public class LDClient implements Closeable {
@@ -24,7 +26,7 @@ public class LDClient implements Closeable {
private final LDConfig config;
private final FeatureRequestor requestor;
private final EventProcessor eventProcessor;
- private final StreamProcessor streamProcessor;
+ private UpdateProcessor updateProcessor;
protected static final String CLIENT_VERSION = getClientVersion();
private volatile boolean offline = false;
@@ -33,46 +35,80 @@ public class LDClient implements Closeable {
* Creates a new client instance that connects to LaunchDarkly with the default configuration. In most
* cases, you should use this constructor.
*
- * @param apiKey the API key for your account
+ * @param apiKey the API key for your account
+ * @param waitForMillis when set to greater than zero allows callers to block until the client
+ * has connected to LaunchDarkly and is properly initialized
*/
- public LDClient(String apiKey) {
- this(apiKey, LDConfig.DEFAULT);
+ public LDClient(String apiKey, Long waitForMillis) {
+ this(apiKey, LDConfig.DEFAULT, waitForMillis);
}
/**
* Creates a new client to connect to LaunchDarkly with a custom configuration. This constructor
* can be used to configure advanced client features, such as customizing the LaunchDarkly base URL.
*
- * @param apiKey the API key for your account
- * @param config a client configuration object
+ * @param apiKey the API key for your account
+ * @param config a client configuration object
+ * @param waitForMillis when set to greater than zero allows callers to block until the client
+ * has connected to LaunchDarkly and is properly initialized
*/
- public LDClient(String apiKey, LDConfig config) {
+ public LDClient(String apiKey, LDConfig config, Long waitForMillis) {
this.config = config;
this.requestor = createFeatureRequestor(apiKey, config);
this.eventProcessor = createEventProcessor(apiKey, config);
+ if (config.offline || config.useLdd) {
+ logger.info("Starting LaunchDarkly client in offline mode");
+ setOffline();
+ return;
+ }
+
if (config.stream) {
- logger.debug("Enabling streaming API");
- this.streamProcessor = createStreamProcessor(apiKey, config, requestor);
- this.streamProcessor.subscribe();
+ logger.info("Enabling streaming API");
+ this.updateProcessor = createStreamProcessor(apiKey, config, requestor);
} else {
- logger.debug("Streaming API disabled");
- this.streamProcessor = null;
+ logger.info("Disabling streaming API");
+ this.updateProcessor = createPollingProcessor(config);
+ }
+
+ Future startFuture = updateProcessor.start();
+
+ if (waitForMillis > 0L) {
+ logger.info("Waiting up to " + waitForMillis + " milliseconds for LaunchDarkly client to start...");
+ try {
+ startFuture.get(waitForMillis, TimeUnit.MILLISECONDS);
+ } catch (TimeoutException e) {
+ logger.error("Timeout encountered waiting for LaunchDarkly client initialization");
+ } catch (Exception e) {
+ logger.error("Exception encountered waiting for LaunchDarkly client initialization", e);
+ }
}
}
+ public boolean initialized() {
+ return isOffline() || config.useLdd || updateProcessor.initialized();
+ }
+
+ @VisibleForTesting
protected FeatureRequestor createFeatureRequestor(String apiKey, LDConfig config) {
return new FeatureRequestor(apiKey, config);
}
+ @VisibleForTesting
protected EventProcessor createEventProcessor(String apiKey, LDConfig config) {
return new EventProcessor(apiKey, config);
}
+ @VisibleForTesting
protected StreamProcessor createStreamProcessor(String apiKey, LDConfig config, FeatureRequestor requestor) {
return new StreamProcessor(apiKey, config, requestor);
}
+ @VisibleForTesting
+ protected PollingProcessor createPollingProcessor(LDConfig config) {
+ return new PollingProcessor(config, requestor);
+ }
+
/**
* Tracks that a user performed an event.
@@ -145,24 +181,11 @@ public boolean getFlag(String featureKey, LDUser user, boolean defaultValue) {
* @return whether or not the flag should be enabled, or {@code defaultValue} if the flag is disabled in the LaunchDarkly control panel
*/
public boolean toggle(String featureKey, LDUser user, boolean defaultValue) {
- if (this.offline) {
+ if (!initialized()) {
return defaultValue;
}
try {
- FeatureRep result;
- if (this.config.stream && this.streamProcessor != null && this.streamProcessor.initialized()) {
- logger.debug("Using feature flag stored from streaming API");
- result = (FeatureRep) this.streamProcessor.getFeature(featureKey);
- if (config.debugStreaming) {
- FeatureRep pollingResult = requestor.makeRequest(featureKey, true);
- if (!result.equals(pollingResult)) {
- logger.warn("Mismatch between streaming and polling feature! Streaming: {} Polling: {}", result, pollingResult);
- }
- }
- } else {
- // If streaming is enabled, always get the latest version of the feature while polling
- result = requestor.makeRequest(featureKey, this.config.stream);
- }
+ FeatureRep result = (FeatureRep) config.featureStore.get(featureKey);
if (result == null) {
logger.warn("Unknown feature flag " + featureKey + "; returning default value");
sendFlagRequestEvent(featureKey, user, defaultValue, defaultValue);
@@ -195,8 +218,8 @@ public boolean toggle(String featureKey, LDUser user, boolean defaultValue) {
@Override
public void close() throws IOException {
this.eventProcessor.close();
- if (this.streamProcessor != null) {
- this.streamProcessor.close();
+ if (this.updateProcessor != null) {
+ this.updateProcessor.close();
}
}
@@ -226,7 +249,6 @@ public void setOnline() {
}
/**
- *
* @return whether the client is in offline mode
*/
public boolean isOffline() {
diff --git a/src/main/java/com/launchdarkly/client/LDConfig.java b/src/main/java/com/launchdarkly/client/LDConfig.java
index d1e25928b..0a7757991 100644
--- a/src/main/java/com/launchdarkly/client/LDConfig.java
+++ b/src/main/java/com/launchdarkly/client/LDConfig.java
@@ -21,6 +21,7 @@ public final class LDConfig {
private static final int DEFAULT_CONNECT_TIMEOUT = 2000;
private static final int DEFAULT_SOCKET_TIMEOUT = 10000;
private static final int DEFAULT_FLUSH_INTERVAL = 5;
+ private static final long DEFAULT_POLLING_INTERVAL_MILLIS = 1000L;
private static final Logger logger = LoggerFactory.getLogger(LDConfig.class);
protected static final LDConfig DEFAULT = new Builder().build();
@@ -37,6 +38,8 @@ public final class LDConfig {
final boolean debugStreaming;
final FeatureStore featureStore;
final boolean useLdd;
+ final boolean offline;
+ final long pollingIntervalMillis;
protected LDConfig(Builder builder) {
this.baseURI = builder.baseURI;
@@ -51,21 +54,26 @@ protected LDConfig(Builder builder) {
this.debugStreaming = builder.debugStreaming;
this.featureStore = builder.featureStore;
this.useLdd = builder.useLdd;
+ this.offline = builder.offline;
+ if (builder.pollingIntervalMillis < DEFAULT_POLLING_INTERVAL_MILLIS) {
+ this.pollingIntervalMillis = DEFAULT_POLLING_INTERVAL_MILLIS;
+ } else {
+ this.pollingIntervalMillis = builder.pollingIntervalMillis;
+ }
}
/**
* A builder that helps construct {@link com.launchdarkly.client.LDConfig} objects. Builder
* calls can be chained, enabling the following pattern:
- *
+ *
*
* LDConfig config = new LDConfig.Builder()
* .connectTimeout(3)
* .socketTimeout(3)
* .build()
*
- *
*/
- public static class Builder{
+ public static class Builder {
private URI baseURI = DEFAULT_BASE_URI;
private URI eventsURI = DEFAULT_EVENTS_URI;
private URI streamURI = DEFAULT_STREAM_URI;
@@ -79,6 +87,8 @@ public static class Builder{
private boolean stream = true;
private boolean debugStreaming = false;
private boolean useLdd = false;
+ private boolean offline = false;
+ private long pollingIntervalMillis = DEFAULT_POLLING_INTERVAL_MILLIS;
private FeatureStore featureStore = new InMemoryFeatureStore();
/**
@@ -231,6 +241,7 @@ public Builder capacity(int capacity) {
* If none of {@link #proxyHost(String)}, {@link #proxyPort(int)} or {@link #proxyScheme(String)} are specified,
* a proxy will not be used, and {@link LDClient} will connect to LaunchDarkly directly.
*
+ *
* @param host
* @return the builder
*/
@@ -242,11 +253,12 @@ public Builder proxyHost(String host) {
/**
* Set the port to use for an HTTP proxy for making connections to LaunchDarkly. If not set (but {@link #proxyHost(String)}
* or {@link #proxyScheme(String)} are specified, the default port for the scheme will be used.
- *
+ *
*
* If none of {@link #proxyHost(String)}, {@link #proxyPort(int)} or {@link #proxyScheme(String)} are specified,
* a proxy will not be used, and {@link LDClient} will connect to LaunchDarkly directly.
*
+ *
* @param port
* @return the builder
*/
@@ -258,11 +270,12 @@ public Builder proxyPort(int port) {
/**
* Set the scheme to use for an HTTP proxy for making connections to LaunchDarkly. If not set (but {@link #proxyHost(String)}
* or {@link #proxyPort(int)} are specified, the default https scheme will be used.
- *
+ *
*
* If none of {@link #proxyHost(String)}, {@link #proxyPort(int)} or {@link #proxyScheme(String)} are specified,
* a proxy will not be used, and {@link LDClient} will connect to LaunchDarkly directly.
*
+ *
* @param scheme
* @return the builder
*/
@@ -274,6 +287,7 @@ public Builder proxyScheme(String scheme) {
/**
* Set whether this client should subscribe to the streaming API, or whether the LaunchDarkly daemon is in use
* instead
+ *
* @param useLdd
* @return the builder
*/
@@ -282,6 +296,29 @@ public Builder useLdd(boolean useLdd) {
return this;
}
+ /**
+ * Set whether this client is offline.
+ *
+ * @param offline when set to true no calls to LaunchDarkly will be made.
+ * @return the builder
+ */
+ public Builder offline(boolean offline) {
+ this.offline = offline;
+ return this;
+ }
+
+ /**
+ * Set the polling interval (when streaming is disabled). Values less than {@value #DEFAULT_POLLING_INTERVAL_MILLIS}
+ * will be set to the default of {@value #DEFAULT_POLLING_INTERVAL_MILLIS}
+ *
+ * @param pollingIntervalMillis rule update polling interval in milliseconds.
+ * @return the builder
+ */
+ public Builder pollingIntervalMillis(long pollingIntervalMillis) {
+ this.pollingIntervalMillis = pollingIntervalMillis;
+ return this;
+ }
+
HttpHost proxyHost() {
if (this.proxyHost == null && this.proxyPort == -1 && this.proxyScheme == null) {
return null;
@@ -294,6 +331,7 @@ HttpHost proxyHost() {
/**
* Build the configured {@link com.launchdarkly.client.LDConfig} object
+ *
* @return the {@link com.launchdarkly.client.LDConfig} configured by this builder
*/
public LDConfig build() {
@@ -311,9 +349,9 @@ private URIBuilder getBuilder() {
private URIBuilder getEventsBuilder() {
return new URIBuilder()
- .setScheme(eventsURI.getScheme())
- .setHost(eventsURI.getHost())
- .setPort(eventsURI.getPort());
+ .setScheme(eventsURI.getScheme())
+ .setHost(eventsURI.getHost())
+ .setPort(eventsURI.getPort());
}
HttpGet getRequest(String apiKey, String path) {
@@ -325,8 +363,7 @@ HttpGet getRequest(String apiKey, String path) {
request.addHeader("User-Agent", "JavaClient/" + LDClient.CLIENT_VERSION);
return request;
- }
- catch (Exception e) {
+ } catch (Exception e) {
logger.error("Unhandled exception in LaunchDarkly client", e);
return null;
}
@@ -341,8 +378,7 @@ HttpPost postRequest(String apiKey, String path) {
request.addHeader("User-Agent", "JavaClient/" + LDClient.CLIENT_VERSION);
return request;
- }
- catch (Exception e) {
+ } catch (Exception e) {
logger.error("Unhandled exception in LaunchDarkly client", e);
return null;
}
@@ -357,8 +393,7 @@ HttpPost postEventsRequest(String apiKey, String path) {
request.addHeader("User-Agent", "JavaClient/" + LDClient.CLIENT_VERSION);
return request;
- }
- catch (Exception e) {
+ } catch (Exception e) {
logger.error("Unhandled exception in LaunchDarkly client", e);
return null;
}
diff --git a/src/main/java/com/launchdarkly/client/PollingProcessor.java b/src/main/java/com/launchdarkly/client/PollingProcessor.java
new file mode 100644
index 000000000..abf31ae57
--- /dev/null
+++ b/src/main/java/com/launchdarkly/client/PollingProcessor.java
@@ -0,0 +1,62 @@
+package com.launchdarkly.client;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class PollingProcessor implements UpdateProcessor {
+ private static final Logger logger = LoggerFactory.getLogger(PollingProcessor.class);
+
+ private final FeatureRequestor requestor;
+ private final LDConfig config;
+ private final FeatureStore store;
+ private volatile boolean initialized = false;
+ private ScheduledExecutorService scheduler = null;
+
+ PollingProcessor(LDConfig config, FeatureRequestor requestor) {
+ this.requestor = requestor;
+ this.config = config;
+ this.store = config.featureStore;
+ }
+
+ @Override
+ public boolean initialized() {
+ return initialized && config.featureStore.initialized();
+ }
+
+ @Override
+ public void close() throws IOException {
+ scheduler.shutdown();
+ }
+
+ @Override
+ public Future start() {
+ logger.info("Starting LaunchDarkly polling client with interval: "
+ + config.pollingIntervalMillis + " milliseconds");
+ final VeryBasicFuture initFuture = new VeryBasicFuture();
+ scheduler = Executors.newScheduledThreadPool(1);
+
+ scheduler.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ store.init(requestor.makeAllRequest(true));
+ if (!initialized) {
+ logger.info("Initialized LaunchDarkly client.");
+ initialized = true;
+ initFuture.completed(null);
+ }
+ } catch (IOException e) {
+ logger.error("Encountered exception in LaunchDarkly client when retrieving update", e);
+ }
+ }
+ }, 0L, config.pollingIntervalMillis, TimeUnit.MILLISECONDS);
+
+ return initFuture;
+ }
+}
diff --git a/src/main/java/com/launchdarkly/client/StreamProcessor.java b/src/main/java/com/launchdarkly/client/StreamProcessor.java
index 951920afd..27da2f6d4 100644
--- a/src/main/java/com/launchdarkly/client/StreamProcessor.java
+++ b/src/main/java/com/launchdarkly/client/StreamProcessor.java
@@ -2,20 +2,20 @@
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
+import com.launchdarkly.eventsource.EventHandler;
+import com.launchdarkly.eventsource.EventSource;
import com.launchdarkly.eventsource.MessageEvent;
import okhttp3.Headers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.Type;
import java.net.URI;
import java.util.Map;
-import com.launchdarkly.eventsource.EventSource;
-import com.launchdarkly.eventsource.EventHandler;
+import java.util.concurrent.Future;
-class StreamProcessor implements Closeable {
+class StreamProcessor implements UpdateProcessor {
private static final String PUT = "put";
private static final String PATCH = "patch";
private static final String DELETE = "delete";
@@ -28,6 +28,7 @@ class StreamProcessor implements Closeable {
private final String apiKey;
private final FeatureRequestor requestor;
private EventSource es;
+ private volatile boolean initialized = false;
StreamProcessor(String apiKey, LDConfig config, FeatureRequestor requestor) {
@@ -37,11 +38,9 @@ class StreamProcessor implements Closeable {
this.requestor = requestor;
}
- void subscribe() {
- // If the LaunchDarkly daemon is to be used, then do not subscribe to the stream
- if (config.useLdd) {
- return;
- }
+ @Override
+ public Future start() {
+ final VeryBasicFuture initFuture = new VeryBasicFuture();
Headers headers = new Headers.Builder()
.add("Authorization", "api_key " + this.apiKey)
@@ -63,6 +62,11 @@ public void onMessage(String name, MessageEvent event) throws Exception {
Type type = new TypeToken