From a0aa42b690fabe4fb85060e9e2789f11047d052b Mon Sep 17 00:00:00 2001 From: John Kodumal Date: Mon, 17 Aug 2015 14:47:42 -0700 Subject: [PATCH 01/15] Initial support for streaming mode --- build.gradle | 3 +- .../com/launchdarkly/client/EventSource.java | 664 ++++++++++++++++++ .../com/launchdarkly/client/FeatureRep.java | 16 + .../com/launchdarkly/client/FeatureStore.java | 15 + .../client/InMemoryFeatureStore.java | 97 +++ .../com/launchdarkly/client/LDClient.java | 110 +-- .../com/launchdarkly/client/LDConfig.java | 27 + .../launchdarkly/client/StreamProcessor.java | 123 ++++ 8 files changed, 1014 insertions(+), 41 deletions(-) create mode 100644 src/main/java/com/launchdarkly/client/EventSource.java create mode 100644 src/main/java/com/launchdarkly/client/FeatureStore.java create mode 100644 src/main/java/com/launchdarkly/client/InMemoryFeatureStore.java create mode 100644 src/main/java/com/launchdarkly/client/StreamProcessor.java diff --git a/build.gradle b/build.gradle index 8c3f07667..8b614f25c 100644 --- a/build.gradle +++ b/build.gradle @@ -11,7 +11,7 @@ repositories { allprojects { group = 'com.launchdarkly' - version = "0.10.0" + version = "0.11.0" sourceCompatibility = 1.6 targetCompatibility = 1.6 } @@ -22,6 +22,7 @@ dependencies { compile "commons-codec:commons-codec:1.5" compile "com.google.code.gson:gson:2.2.4" compile "org.slf4j:slf4j-api:1.7.7" + compile "org.glassfish.jersey.media:jersey-media-sse:2.20" testCompile "org.easymock:easymock:3.3" testCompile 'junit:junit:[4.10,)' testRuntime "org.slf4j:slf4j-simple:1.7.7" diff --git a/src/main/java/com/launchdarkly/client/EventSource.java b/src/main/java/com/launchdarkly/client/EventSource.java new file mode 100644 index 000000000..63fed2d54 --- /dev/null +++ b/src/main/java/com/launchdarkly/client/EventSource.java @@ -0,0 +1,664 @@ +package com.launchdarkly.client; + +import org.glassfish.jersey.internal.util.collection.StringKeyIgnoreCaseMultivaluedMap; +import org.glassfish.jersey.media.sse.*; +import org.slf4j.LoggerFactory; + +import javax.ws.rs.ServiceUnavailableException; +import javax.ws.rs.client.Invocation; +import javax.ws.rs.client.WebTarget; +import javax.ws.rs.core.MultivaluedMap; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.List; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicReference; +import java.util.logging.Level; + +// EventSource class modified from +// https://github.com/jersey/jersey/blob/master/media/sse/src/main/java/org/glassfish/jersey/media/sse/EventSource.java +// Modifications: +// - support for custom headers +// - set spawned thread as a daemon to permit application shutdown +public class EventSource implements EventListener { + + /** + * Default SSE {@link EventSource} reconnect delay value in milliseconds. + * + * @since 2.3 + */ + public static final long RECONNECT_DEFAULT = 500; + + private static enum State { + READY, OPEN, CLOSED + } + + private static final Level CONNECTION_ERROR_LEVEL = Level.FINE; + + private static final org.slf4j.Logger logger = LoggerFactory.getLogger(EventSource.class); + + /** + * SSE streaming resource target. + */ + private final WebTarget target; + /** + * Default reconnect delay. + */ + private final long reconnectDelay; + /** + * Flag indicating if the persistent HTTP connections should be disabled. + */ + private final boolean disableKeepAlive; + /** + * Incoming SSE event processing task executor. + */ + private final ScheduledExecutorService executor; + /** + * Event source internal state. + */ + private final AtomicReference state = new AtomicReference(State.READY); + /** + * List of all listeners not bound to receive only events of a particular name. + */ + private final List unboundListeners = new CopyOnWriteArrayList(); + /** + * A map of listeners bound to receive only events of a particular name. + */ + private final ConcurrentMap> boundListeners = new ConcurrentHashMap>(); + + private final MultivaluedMap headers; + + /** + * Jersey {@link EventSource} builder class. + * + * Event source builder provides methods that let you conveniently configure and subsequently build + * a new {@code EventSource} instance. You can obtain a new event source builder instance using + * a static {@link EventSource#target(javax.ws.rs.client.WebTarget) EventSource.target(endpoint)} factory method. + *

+ * For example: + *

+   * EventSource es = EventSource.target(endpoint).named("my source")
+   *                             .reconnectingEvery(5, SECONDS)
+   *                             .open();
+   * 
+ *

+ * + * @since 2.3 + */ + public static class Builder { + + private final WebTarget endpoint; + + private long reconnect = EventSource.RECONNECT_DEFAULT; + private String name = null; + private boolean disableKeepAlive = true; + + private Builder(final WebTarget endpoint) { + this.endpoint = endpoint; + } + + private MultivaluedMap headers = new StringKeyIgnoreCaseMultivaluedMap(); + + /** + * Set a custom name for the event source. + *

+ * At present, custom event source name is mainly useful to be able to distinguish different event source + * event processing threads from one another. If not set, a default name will be generated using the + * SSE endpoint URI. + *

+ * + * @param name custom event source name. + * @return updated event source builder instance. + */ + public Builder named(String name) { + this.name = name; + return this; + } + + /** + * Instruct event source to use + * persistent HTTP connections when connecting + * (or reconnecting) to the SSE endpoint, provided the mechanism is supported by the underlying client + * {@link org.glassfish.jersey.client.spi.Connector}. + *

+ * By default, the persistent HTTP connections are disabled for the reasons discussed in the {@link EventSource} + * javadoc. + *

+ * + * @return updated event source builder instance. + */ + public Builder usePersistentConnections() { + disableKeepAlive = false; + return this; + } + + public Builder header(String name, Object value) { + if (value == null) { + headers.remove(name); + } else { + headers.add(name, value); + } + return this; + } + + /** + * Set the initial reconnect delay to be used by the event source. + *

+ * Note that this value may be later overridden by the SSE endpoint using either a {@code retry} SSE event field + * or HTTP 503 + {@value javax.ws.rs.core.HttpHeaders#RETRY_AFTER} mechanism as described + * in the {@link EventSource} javadoc. + *

+ * + * @param delay the default time to wait before attempting to recover from a connection loss. + * @param unit time unit of the reconnect delay parameter. + * @return updated event source builder instance. + */ + public Builder reconnectingEvery(final long delay, TimeUnit unit) { + reconnect = unit.toMillis(delay); + return this; + } + + /** + * Build new SSE event source pointing at a SSE streaming {@link WebTarget web target}. + *

+ * The returned event source is ready, but not {@link EventSource#open() connected} to the SSE endpoint. + * It is expected that you will manually invoke its {@link #open()} method once you are ready to start + * receiving SSE events. In case you want to build an event source instance that is already connected + * to the SSE endpoint, use the event source builder {@link #open()} method instead. + *

+ *

+ * Once the event source is open, the incoming events are processed by the event source in an + * asynchronous task that runs in an internal single-threaded {@link ScheduledExecutorService + * scheduled executor service}. + *

+ * + * @return new event source instance, ready to be connected to the SSE endpoint. + * @see #open() + */ + public EventSource build() { + return new EventSource(endpoint, name, reconnect, disableKeepAlive, false, headers); + } + + /** + * Build new SSE event source pointing at a SSE streaming {@link WebTarget web target}. + *

+ * The returned event source is already {@link EventSource#open() connected} to the SSE endpoint + * and is processing any new incoming events. In case you want to build an event source instance + * that is already ready, but not automatically connected to the SSE endpoint, use the event source + * builder {@link #build()} method instead. + *

+ *

+ * The incoming events are processed by the event source in an asynchronous task that runs in an + * internal single-threaded {@link ScheduledExecutorService scheduled executor service}. + *

+ * + * @return new event source instance, already connected to the SSE endpoint. + * @see #build() + */ + public EventSource open() { + // opening directly in the constructor is just plain ugly... + final EventSource source = new EventSource(endpoint, name, reconnect, disableKeepAlive, false, headers); + source.open(); + return source; + } + } + + /** + * Create a new {@link EventSource.Builder event source builder} that provides convenient way how to + * configure and fine-tune various aspects of a newly prepared event source instance. + * + * @param endpoint SSE streaming endpoint. Must not be {@code null}. + * @return a builder of a new event source instance pointing at the specified SSE streaming endpoint. + * @throws NullPointerException in case the supplied web target is {@code null}. + * @since 2.3 + */ + public static Builder target(WebTarget endpoint) { + return new Builder(endpoint); + } + + /** + * Create new SSE event source and open a connection it to the supplied SSE streaming {@link WebTarget web target}. + * + * This constructor is performs the same series of actions as a call to: + *
EventSource.target(endpoint).open()
+ *

+ * The created event source instance automatically {@link #open opens a connection} to the supplied SSE streaming + * web target and starts processing incoming {@link org.glassfish.jersey.media.sse.InboundEvent events}. + *

+ *

+ * The incoming events are processed by the event source in an asynchronous task that runs in an + * internal single-threaded {@link ScheduledExecutorService scheduled executor service}. + *

+ * + * @param endpoint SSE streaming endpoint. Must not be {@code null}. + * @throws NullPointerException in case the supplied web target is {@code null}. + */ + public EventSource(final WebTarget endpoint) { + this(endpoint, true, new StringKeyIgnoreCaseMultivaluedMap()); + } + + /** + * Create new SSE event source pointing at a SSE streaming {@link WebTarget web target}. + * + * This constructor is performs the same series of actions as a call to: + *
+   * if (open) {
+   *     EventSource.target(endpoint).open();
+   * } else {
+   *     EventSource.target(endpoint).build();
+   * }
+ *

+ * If the supplied {@code open} flag is {@code true}, the created event source instance automatically + * {@link #open opens a connection} to the supplied SSE streaming web target and starts processing incoming + * {@link org.glassfish.jersey.media.sse.InboundEvent events}. + * Otherwise, if the {@code open} flag is set to {@code false}, the created event source instance + * is not automatically connected to the web target. In this case it is expected that the user who + * created the event source will manually invoke its {@link #open()} method. + *

+ *

+ * Once the event source is open, the incoming events are processed by the event source in an + * asynchronous task that runs in an internal single-threaded {@link ScheduledExecutorService + * scheduled executor service}. + *

+ * + * @param endpoint SSE streaming endpoint. Must not be {@code null}. + * @param open if {@code true}, the event source will immediately connect to the SSE endpoint, + * if {@code false}, the connection will not be established until {@link #open()} method is + * called explicitly on the event stream. + * @throws NullPointerException in case the supplied web target is {@code null}. + */ + public EventSource(final WebTarget endpoint, final boolean open, final MultivaluedMap headers) { + this(endpoint, null, RECONNECT_DEFAULT, true, open, headers); + } + + private EventSource(final WebTarget target, + final String name, + final long reconnectDelay, + final boolean disableKeepAlive, + final boolean open, + final MultivaluedMap headers) { + if (target == null) { + throw new NullPointerException("Web target is 'null'."); + } + this.target = target; // SseFeature.register(target); + this.reconnectDelay = reconnectDelay; + this.disableKeepAlive = disableKeepAlive; + + final String esName = (name == null) ? createDefaultName(target) : name; + this.executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r, esName); + t.setDaemon(true); + return t; + } + }); + + this.headers = new StringKeyIgnoreCaseMultivaluedMap(); + this.headers.putAll(headers); + + if (open) { + open(); + } + } + + private static String createDefaultName(WebTarget target) { + return String.format("jersey-sse-event-source-[%s]", target.getUri().toASCIIString()); + } + + /** + * Open the connection to the supplied SSE underlying {@link WebTarget web target} and start processing incoming + * {@link org.glassfish.jersey.media.sse.InboundEvent events}. + * + * @throws IllegalStateException in case the event source has already been opened earlier. + */ + public void open() { + if (!state.compareAndSet(State.READY, State.OPEN)) { + switch (state.get()) { + case OPEN: + throw new IllegalStateException(LocalizationMessages.EVENT_SOURCE_ALREADY_CONNECTED()); + case CLOSED: + throw new IllegalStateException(LocalizationMessages.EVENT_SOURCE_ALREADY_CLOSED()); + } + } + + EventProcessor processor = new EventProcessor(reconnectDelay, null, headers); + executor.submit(processor); + + // return only after the first request to the SSE endpoint has been made + //processor.awaitFirstContact(); + } + + /** + * Check if this event source instance has already been {@link #open() opened}. + * + * @return {@code true} if this event source is open, {@code false} otherwise. + */ + public boolean isOpen() { + return state.get() == State.OPEN; + } + + /** + * Register new {@link EventListener event listener} to receive all streamed {@link org.glassfish.jersey.media.sse.InboundEvent SSE events}. + * + * @param listener event listener to be registered with the event source. + * @see #register(EventListener, String, String...) + */ + public void register(final EventListener listener) { + register(listener, null); + } + + /** + * Add name-bound {@link EventListener event listener} which will be called only for incoming SSE + * {@link org.glassfish.jersey.media.sse.InboundEvent events} whose {@link org.glassfish.jersey.media.sse.InboundEvent#getName() name} is equal to the specified + * name(s). + * + * @param listener event listener to register with this event source. + * @param eventName inbound event name. + * @param eventNames additional event names. + * @see #register(EventListener) + */ + public void register(final EventListener listener, final String eventName, final String... eventNames) { + if (eventName == null) { + unboundListeners.add(listener); + } else { + addBoundListener(eventName, listener); + + if (eventNames != null) { + for (String name : eventNames) { + addBoundListener(name, listener); + } + } + } + } + + private void addBoundListener(final String name, final EventListener listener) { + List listeners = boundListeners.putIfAbsent(name, + new CopyOnWriteArrayList(Collections.singleton(listener))); + if (listeners != null) { + // alas, new listener collection registration conflict: + // need to add the new listener to the existing listener collection + listeners.add(listener); + } + } + + /** + * {@inheritDoc} + *

+ * The default {@code EventSource} implementation is empty, users can override this method to handle + * incoming {@link org.glassfish.jersey.media.sse.InboundEvent}s. + *

+ *

+ * Note that overriding this method may be necessary to make sure no {@code InboundEvent incoming events} + * are lost in case the event source is constructed using {@link #EventSource(javax.ws.rs.client.WebTarget)} + * constructor or in case a {@code true} flag is passed to the {@link #EventSource(javax.ws.rs.client.WebTarget, boolean, javax.ws.rs.core.MultivaluedMap)} + * constructor, since the connection is opened as as part of the constructor call and the event processing starts + * immediately. Therefore any {@link EventListener}s registered later after the event source has been constructed + * may miss the notifications about the one or more events that arrive immediately after the connection to the + * event source is established. + *

+ * + * @param inboundEvent received inbound event. + */ + @Override + public void onEvent(final InboundEvent inboundEvent) { + // do nothing + } + + /** + * Close this event source. + * + * The method will wait up to 5 seconds for the internal event processing task to complete. + */ + public void close() { + close(5, TimeUnit.SECONDS); + } + + /** + * Close this event source and wait for the internal event processing task to complete + * for up to the specified amount of wait time. + *

+ * The method blocks until the event processing task has completed execution after a shutdown + * request, or until the timeout occurs, or the current thread is interrupted, whichever happens + * first. + *

+ *

+ * In case the waiting for the event processing task has been interrupted, this method restores + * the {@link Thread#interrupted() interrupt} flag on the thread before returning {@code false}. + *

+ * + * @param timeout the maximum time to wait. + * @param unit the time unit of the timeout argument. + * @return {@code true} if this executor terminated and {@code false} if the timeout elapsed + * before termination or the termination was interrupted. + */ + public boolean close(final long timeout, final TimeUnit unit) { + shutdown(); + try { + if (!executor.awaitTermination(timeout, unit)) { + return false; + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return false; + } + return true; + } + + private void shutdown() { + if (state.getAndSet(State.CLOSED) != State.CLOSED) { + executor.shutdownNow(); + } + } + + /** + * Private event processor task responsible for connecting to the SSE stream and processing + * incoming SSE events as well as handling any connection issues. + */ + private class EventProcessor implements Runnable, EventListener { + + /** + * Open connection response arrival synchronization latch. + */ + private final CountDownLatch firstContactSignal; + private final MultivaluedMap headers; + /** + * Last received event id. + */ + private String lastEventId; + /** + * Re-connect delay. + */ + private long reconnectDelay; + + public EventProcessor(final long reconnectDelay, final String lastEventId, final MultivaluedMap headers) { + /** + * Synchronization barrier used to signal that the initial contact with SSE endpoint + * has been made. + */ + this.firstContactSignal = new CountDownLatch(1); + + this.reconnectDelay = reconnectDelay; + this.lastEventId = lastEventId; + this.headers = headers; + } + + private EventProcessor(final EventProcessor that) { + this.firstContactSignal = null; + + this.reconnectDelay = that.reconnectDelay; + this.lastEventId = that.lastEventId; + this.headers = that.headers; + } + + @Override + public void run() { + logger.debug("Listener task started."); + + EventInput eventInput = null; + try { + try { + final Invocation.Builder request = prepareHandshakeRequest(); + if (state.get() == State.OPEN) { // attempt to connect only if even source is open + logger.debug("Connecting..."); + eventInput = request.get(EventInput.class); + logger.debug("Connected!"); + } + } finally { + if (firstContactSignal != null) { + // release the signal regardless of event source state or connection request outcome + firstContactSignal.countDown(); + } + } + + final Thread execThread = Thread.currentThread(); + + while (state.get() == State.OPEN && !execThread.isInterrupted()) { + if (eventInput == null || eventInput.isClosed()) { + logger.debug("Connection lost - scheduling reconnect in {0} ms", reconnectDelay); + scheduleReconnect(reconnectDelay); + break; + } else { + this.onEvent(eventInput.read()); + } + } + } catch (ServiceUnavailableException ex) { + logger.debug("Received HTTP 503"); + long delay = reconnectDelay; + if (ex.hasRetryAfter()) { + logger.debug("Recovering from HTTP 503 using HTTP Retry-After header value as a reconnect delay"); + final Date requestTime = new Date(); + delay = ex.getRetryTime(requestTime).getTime() - requestTime.getTime(); + delay = (delay > 0) ? delay : 0; + } + + logger.debug("Recovering from HTTP 503 - scheduling to reconnect in {0} ms", delay); + scheduleReconnect(delay); + } catch (Exception ex) { + logger.warn(String.format("Unable to connect - closing the event source to %s.", + target.getUri().toASCIIString()), ex); + // if we're here, an unrecoverable error has occurred - just turn off the lights... + EventSource.this.shutdown(); + } finally { + if (eventInput != null && !eventInput.isClosed()) { + eventInput.close(); + } + logger.debug("Listener task finished."); + } + } + + /** + * Called by the event source when an inbound event is received. + * + * This listener aggregator method is responsible for invoking {@link EventSource#onEvent(InboundEvent)} + * method on the owning event source as well as for notifying all registered {@link EventListener event listeners}. + * + * @param event incoming {@link InboundEvent inbound event}. + */ + @Override + public void onEvent(final InboundEvent event) { + if (event == null) { + return; + } + + logger.debug("New event received."); + + if (event.getId() != null) { + lastEventId = event.getId(); + } + if (event.isReconnectDelaySet()) { + reconnectDelay = event.getReconnectDelay(); + } + + notify(EventSource.this, event); + notify(unboundListeners, event); + + final String eventName = event.getName(); + if (eventName != null) { + final List eventListeners = boundListeners.get(eventName); + if (eventListeners != null) { + notify(eventListeners, event); + } + } + } + + private void notify(final Collection listeners, final InboundEvent event) { + for (EventListener listener : listeners) { + notify(listener, event); + } + } + + private void notify(final EventListener listener, final InboundEvent event) { + try { + listener.onEvent(event); + } catch (Exception ex) { + logger.warn(String.format("Event notification in a listener of %s class failed.", + listener.getClass().getName()), ex); + } + } + + /** + * Schedule a new event processor task to reconnect after the specified {@code delay} [milliseconds]. + * + * If the {@code delay} is zero or negative, the new reconnect task will be scheduled immediately. + * The {@code reconnectDelay} and {@code lastEventId} field values are propagated into the newly + * scheduled task. + *

+ * The method will silently abort in case the event source is not {@link EventSource#isOpen() open}. + *

+ * + * @param delay specifies the amount of time [milliseconds] to wait before attempting a reconnect. + * If zero or negative, the new reconnect task will be scheduled immediately. + */ + private void scheduleReconnect(final long delay) { + final State s = state.get(); + if (s != State.OPEN) { + logger.debug("Aborting reconnect of event source in {0} state", state); + return; + } + + // propagate the current reconnectDelay, but schedule based on the delay parameter + final EventProcessor processor = new EventProcessor(this); + if (delay > 0) { + executor.schedule(processor, delay, TimeUnit.MILLISECONDS); + } else { + executor.submit(processor); + } + } + + private Invocation.Builder prepareHandshakeRequest() { + final Invocation.Builder request = target.request(SseFeature.SERVER_SENT_EVENTS_TYPE); + // TODO add the SERVER_SENT_EVENTS_TYPE header + request.headers(this.headers); + if (lastEventId != null && !lastEventId.isEmpty()) { + request.header(SseFeature.LAST_EVENT_ID_HEADER, lastEventId); + } + if (disableKeepAlive) { + request.header("Connection", "close"); + } + return request; + } + + /** + * Await the initial contact with the SSE endpoint. + */ + public void awaitFirstContact() { + logger.debug("Awaiting first contact signal."); + try { + if (firstContactSignal == null) { + return; + } + + try { + firstContactSignal.await(); + } catch (InterruptedException ex) { + logger.warn(LocalizationMessages.EVENT_SOURCE_OPEN_CONNECTION_INTERRUPTED(), ex); + Thread.currentThread().interrupt(); + } + } finally { + logger.debug("First contact signal released."); + } + } + } +} \ No newline at end of file diff --git a/src/main/java/com/launchdarkly/client/FeatureRep.java b/src/main/java/com/launchdarkly/client/FeatureRep.java index bb803d18f..0f13b2733 100644 --- a/src/main/java/com/launchdarkly/client/FeatureRep.java +++ b/src/main/java/com/launchdarkly/client/FeatureRep.java @@ -13,6 +13,8 @@ class FeatureRep { String salt; boolean on; List> variations; + boolean deleted; + int version; private static final float long_scale = (float)0xFFFFFFFFFFFFFFFL; @@ -25,6 +27,8 @@ public FeatureRep() { this.key = b.key; this.salt = b.salt; this.on = b.on; + this.deleted = b.deleted; + this.version = b.version; this.variations = new ArrayList>(b.variations); } @@ -93,6 +97,8 @@ static class Builder { private String key; private boolean on; private String salt; + private boolean deleted; + private int version; private List> variations; Builder(String name, String key) { @@ -118,6 +124,16 @@ Builder variation(Variation v) { return this; } + Builder deleted(boolean d) { + this.deleted = d; + return this; + } + + Builder version(int v) { + this.version = v; + return this; + } + FeatureRep build() { return new FeatureRep(this); } diff --git a/src/main/java/com/launchdarkly/client/FeatureStore.java b/src/main/java/com/launchdarkly/client/FeatureStore.java new file mode 100644 index 000000000..30809121b --- /dev/null +++ b/src/main/java/com/launchdarkly/client/FeatureStore.java @@ -0,0 +1,15 @@ +package com.launchdarkly.client; + +import java.util.Map; + +/** + * Created by jkodumal on 8/13/15. + */ +public interface FeatureStore { + FeatureRep get(String key); + Map> all(); + void init(Map> features); + void delete(String key, int version); + void upsert(String key, FeatureRep feature); + boolean initialized(); +} diff --git a/src/main/java/com/launchdarkly/client/InMemoryFeatureStore.java b/src/main/java/com/launchdarkly/client/InMemoryFeatureStore.java new file mode 100644 index 000000000..4e012fed9 --- /dev/null +++ b/src/main/java/com/launchdarkly/client/InMemoryFeatureStore.java @@ -0,0 +1,97 @@ +package com.launchdarkly.client; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * Created by jkodumal on 8/13/15. + */ +public class InMemoryFeatureStore implements FeatureStore { + + final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + final Map> features = new HashMap>(); + boolean initialized = false; + + @Override + public FeatureRep get(String key) { + try { + lock.readLock().lock(); + FeatureRep rep = features.get(key); + if (rep == null || rep.deleted) { + return null; + } + return rep; + } finally { + lock.readLock().unlock(); + } + } + + @Override + public Map> all() { + try { + lock.readLock().lock(); + Map> fs = new HashMap>(); + + for (Map.Entry> entry : features.entrySet()) { + if (!entry.getValue().deleted) { + fs.put(entry.getKey(), entry.getValue()); + } + } + return fs; + } finally { + lock.readLock().unlock(); + } + } + + @Override + public void init(Map> fs) { + try { + lock.writeLock().lock(); + this.features.clear(); + this.features.putAll(fs); + initialized = true; + } finally { + lock.writeLock().unlock(); + } + } + + @Override + public void delete(String key, int version) { + try { + lock.writeLock().lock(); + FeatureRep f = features.get(key); + if (f != null && f.version < version) { + f.deleted = true; + f.version = version; + features.put(key, f); + } + else if (f == null) { + f = new FeatureRep.Builder(key, key).deleted(true).version(version).build(); + features.put(key, f); + } + } finally { + lock.writeLock().unlock(); + } + } + + @Override + public void upsert(String key, FeatureRep feature) { + try { + lock.writeLock().lock(); + FeatureRep old = features.get(key); + + if (old == null || old.version < feature.version) { + features.put(key, feature); + } + } + finally { + lock.writeLock().unlock(); + } + } + + @Override + public boolean initialized() { + return initialized; + } +} diff --git a/src/main/java/com/launchdarkly/client/LDClient.java b/src/main/java/com/launchdarkly/client/LDClient.java index 29d4b4db2..93f540d65 100644 --- a/src/main/java/com/launchdarkly/client/LDClient.java +++ b/src/main/java/com/launchdarkly/client/LDClient.java @@ -3,7 +3,6 @@ import com.google.gson.Gson; import com.google.gson.JsonElement; -import com.google.gson.JsonObject; import com.google.gson.reflect.TypeToken; import org.apache.http.HttpStatus; import org.apache.http.annotation.ThreadSafe; @@ -40,6 +39,7 @@ public class LDClient implements Closeable { private final LDConfig config; private final CloseableHttpClient client; private final EventProcessor eventProcessor; + private final StreamProcessor streamProcessor; private final String apiKey; protected static final String CLIENT_VERSION = getClientVersion(); private volatile boolean offline = false; @@ -67,12 +67,25 @@ public LDClient(String apiKey, LDConfig config) { this.config = config; this.client = createClient(); this.eventProcessor = createEventProcessor(apiKey, config); + + if (config.stream) { + logger.debug("Enabling streaming API"); + this.streamProcessor = createStreamProcessor(apiKey, config); + this.streamProcessor.subscribe(); + } else { + logger.debug("Streaming API disabled"); + this.streamProcessor = null; + } } protected EventProcessor createEventProcessor(String apiKey, LDConfig config) { return new EventProcessor(apiKey, config); } + protected StreamProcessor createStreamProcessor(String apiKey, LDConfig config) { + return new StreamProcessor(apiKey, config); + } + protected CloseableHttpClient createClient() { CloseableHttpClient client; PoolingHttpClientConnectionManager manager = new PoolingHttpClientConnectionManager(); @@ -172,7 +185,37 @@ public boolean toggle(String featureKey, LDUser user, boolean defaultValue) { if (this.offline) { return defaultValue; } + try { + FeatureRep result; + if (this.config.stream && this.streamProcessor != null && this.streamProcessor.initialized()) { + logger.debug("Using feature flag stored from streaming API"); + result = (FeatureRep) this.streamProcessor.getFeature(featureKey); + } else { + result = fetchFeature(featureKey); + } + if (result == null) { + logger.warn("Unknown feature flag " + featureKey + "; returning default value"); + sendFlagRequestEvent(featureKey, user, defaultValue); + return defaultValue; + } + + Boolean val = result.evaluate(user); + if (val == null) { + sendFlagRequestEvent(featureKey, user, defaultValue); + return defaultValue; + } else { + boolean value = val.booleanValue(); + sendFlagRequestEvent(featureKey, user, value); + return value; + } + } catch (Exception e) { + logger.error("Encountered exception in LaunchDarkly client", e); + sendFlagRequestEvent(featureKey, user, defaultValue); + return defaultValue; + } + } + private FeatureRep fetchFeature(String featureKey) throws IOException { Gson gson = new Gson(); HttpCacheContext context = HttpCacheContext.create(); HttpGet request = config.getRequest(apiKey, "/api/eval/features/" + featureKey); @@ -183,23 +226,23 @@ public boolean toggle(String featureKey, LDUser user, boolean defaultValue) { CacheResponseStatus responseStatus = context.getCacheResponseStatus(); - switch (responseStatus) { - case CACHE_HIT: - logger.debug("A response was generated from the cache with " + - "no requests sent upstream"); - break; - case CACHE_MODULE_RESPONSE: - logger.debug("The response was generated directly by the " + - "caching module"); - break; - case CACHE_MISS: - logger.debug("The response came from an upstream server"); - break; - case VALIDATED: - logger.debug("The response was generated from the cache " + - "after validating the entry with the origin server"); - break; - } + switch (responseStatus) { + case CACHE_HIT: + logger.debug("A response was generated from the cache with " + + "no requests sent upstream"); + break; + case CACHE_MODULE_RESPONSE: + logger.debug("The response was generated directly by the " + + "caching module"); + break; + case CACHE_MISS: + logger.debug("The response came from an upstream server"); + break; + case VALIDATED: + logger.debug("The response was generated from the cache " + + "after validating the entry with the origin server"); + break; + } int status = response.getStatusLine().getStatusCode(); @@ -211,34 +254,18 @@ public boolean toggle(String featureKey, LDUser user, boolean defaultValue) { } else { logger.error("Unexpected status code: " + status); } - sendFlagRequestEvent(featureKey, user, defaultValue); - return defaultValue; + throw new IOException("Failed to fetch flag"); } - Type boolType = new TypeToken>(){}.getType(); + Type boolType = new TypeToken>() {}.getType(); FeatureRep result = gson.fromJson(EntityUtils.toString(response.getEntity()), boolType); - - Boolean val = result.evaluate(user); - - if (val == null) { - sendFlagRequestEvent(featureKey, user, defaultValue); - return defaultValue; - } else { - boolean value = val.booleanValue(); - sendFlagRequestEvent(featureKey, user, value); - return value; - } - - } catch (Exception e) { - logger.error("Unhandled exception in LaunchDarkly client", e); - sendFlagRequestEvent(featureKey, user, defaultValue); - return defaultValue; - } finally { + return result; + } + finally { try { if (response != null) response.close(); } catch (IOException e) { - logger.error("Unhandled exception in LaunchDarkly client", e); } } } @@ -252,6 +279,9 @@ public boolean toggle(String featureKey, LDUser user, boolean defaultValue) { @Override public void close() throws IOException { this.eventProcessor.close(); + if (this.streamProcessor != null) { + this.streamProcessor.close(); + } } /** @@ -263,7 +293,7 @@ public void flush() { /** * Puts the LaunchDarkly client in offline mode. - * In offline mode, all calls to {@link #getFlag(String, LDUser, boolean)} will return the default value, and + * In offline mode, all calls to {@link #toggle(String, LDUser, boolean)} will return the default value, and * {@link #track(String, LDUser, com.google.gson.JsonElement)} will be a no-op. * */ diff --git a/src/main/java/com/launchdarkly/client/LDConfig.java b/src/main/java/com/launchdarkly/client/LDConfig.java index dbe9a0c8c..f411b0467 100644 --- a/src/main/java/com/launchdarkly/client/LDConfig.java +++ b/src/main/java/com/launchdarkly/client/LDConfig.java @@ -15,6 +15,7 @@ */ public final class LDConfig { private static final URI DEFAULT_BASE_URI = URI.create("https://app.launchdarkly.com"); + private static final URI DEFAULT_STREAM_URI = URI.create("https://stream.launchdarkly.com"); private static final int DEFAULT_CAPACITY = 10000; private static final int DEFAULT_CONNECT_TIMEOUT = 2000; private static final int DEFAULT_SOCKET_TIMEOUT = 10000; @@ -24,11 +25,13 @@ public final class LDConfig { protected static final LDConfig DEFAULT = new Builder().build(); final URI baseURI; + final URI streamURI; final int capacity; final int connectTimeout; final int socketTimeout; final int flushInterval; final HttpHost proxyHost; + final boolean stream; protected LDConfig(Builder builder) { this.baseURI = builder.baseURI; @@ -37,6 +40,8 @@ protected LDConfig(Builder builder) { this.socketTimeout = builder.socketTimeout; this.flushInterval = builder.flushInterval; this.proxyHost = builder.proxyHost(); + this.streamURI = builder.streamURI; + this.stream = builder.stream; } /** @@ -53,6 +58,7 @@ protected LDConfig(Builder builder) { */ public static class Builder{ private URI baseURI = DEFAULT_BASE_URI; + private URI streamURI = DEFAULT_STREAM_URI; private int connectTimeout = DEFAULT_CONNECT_TIMEOUT; private int socketTimeout = DEFAULT_SOCKET_TIMEOUT; private int capacity = DEFAULT_CAPACITY; @@ -60,6 +66,7 @@ public static class Builder{ private String proxyHost; private int proxyPort = -1; private String proxyScheme; + private boolean stream = false; /** * Creates a builder with all configuration parameters set to the default @@ -77,6 +84,26 @@ public Builder baseURI(URI baseURI) { return this; } + /** + * Set the base URL of the LaunchDarkly streaming server for this configuration + * @param streamURI the base URL of the LaunchDarkly streaming server + * @return the builder + */ + public Builder streamURI(URI streamURI) { + this.streamURI = streamURI; + return this; + } + + /** + * Set whether streaming mode should be enabled + * @param stream whether streaming mode should be enabled + * @return the builder + */ + public Builder stream(boolean stream) { + this.stream = stream; + return this; + } + /** * Set the connection timeout in seconds for the configuration. This is the time allowed for the underlying HTTP client to connect * to the LaunchDarkly server. The default is 2 seconds. diff --git a/src/main/java/com/launchdarkly/client/StreamProcessor.java b/src/main/java/com/launchdarkly/client/StreamProcessor.java new file mode 100644 index 000000000..24a91b139 --- /dev/null +++ b/src/main/java/com/launchdarkly/client/StreamProcessor.java @@ -0,0 +1,123 @@ +package com.launchdarkly.client; + +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; +import org.glassfish.jersey.internal.util.collection.StringKeyIgnoreCaseMultivaluedMap; +import org.glassfish.jersey.media.sse.InboundEvent; +import org.glassfish.jersey.media.sse.SseFeature; + +import javax.ws.rs.client.Client; +import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.client.WebTarget; +import javax.ws.rs.core.MultivaluedMap; +import java.io.Closeable; +import java.io.IOException; +import java.lang.reflect.Type; +import java.util.Map; + +/** + * Created by jkodumal on 8/13/15. + */ +class StreamProcessor implements Closeable { + private static final String PUT_FEATURES = "put/features"; + private static final String PATCH_FEATURE = "patch/features"; + private static final String DELETE_FEATURE = "delete/features"; + + private final Client client; + private final FeatureStore store; + private final LDConfig config; + private final String apiKey; + private EventSource es; + + StreamProcessor(String apiKey, LDConfig config) { + this.client = ClientBuilder.newBuilder().register(SseFeature.class).build(); + this.store = new InMemoryFeatureStore(); + this.config = config; + this.apiKey = apiKey; + } + + void subscribe() { + MultivaluedMap headers = new StringKeyIgnoreCaseMultivaluedMap(); + headers.putSingle("Authorization", "api_key " + this.apiKey); + headers.putSingle("User-Agent", "JavaClient/" + LDClient.CLIENT_VERSION); + headers.putSingle("Accept", SseFeature.SERVER_SENT_EVENTS_TYPE); + + WebTarget target = client.target(config.streamURI.toASCIIString() + "/"); + + es = new EventSource(target, true, headers) { + @Override + public void onEvent(InboundEvent event) { + Gson gson = new Gson(); + System.out.println("Received an event: " + event.getName()); + if (event.getName().equals(PUT_FEATURES)) { + Type type = new TypeToken>>(){}.getType(); + Map> features = gson.fromJson(event.readData(), type); + store.init(features); + } + else if (event.getName().equals(PATCH_FEATURE)) { + FeaturePatchData data = gson.fromJson(event.readData(), FeaturePatchData.class); + store.upsert(data.key(), data.feature()); + } + else if (event.getName().equals(DELETE_FEATURE)) { + FeatureDeleteData data = gson.fromJson(event.readData(), FeatureDeleteData.class); + store.delete(data.key(), data.version()); + } + else { + // TODO log an error + } + } + }; + + } + + @Override + public void close() throws IOException { + if (es != null) { + es.close(); + } + } + + boolean initialized() { + return store.initialized(); + } + + FeatureRep getFeature(String key) { + return store.get(key); + } + + private static final class FeaturePatchData { + String path; + FeatureRep data; + + public FeaturePatchData() { + + } + + String key() { + return path.substring(1); + } + + FeatureRep feature() { + return data; + } + + } + + private static final class FeatureDeleteData { + String path; + int version; + + public FeatureDeleteData() { + + } + + String key() { + return path.substring(1); + } + + int version() { + return version; + } + + } +} From ce091e83025ca6172335f29c8936f90fdaa65d60 Mon Sep 17 00:00:00 2001 From: John Kodumal Date: Tue, 18 Aug 2015 10:34:13 -0700 Subject: [PATCH 02/15] Remove unused variable --- src/main/java/com/launchdarkly/client/EventSource.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/main/java/com/launchdarkly/client/EventSource.java b/src/main/java/com/launchdarkly/client/EventSource.java index 63fed2d54..8317fc821 100644 --- a/src/main/java/com/launchdarkly/client/EventSource.java +++ b/src/main/java/com/launchdarkly/client/EventSource.java @@ -34,8 +34,6 @@ private static enum State { READY, OPEN, CLOSED } - private static final Level CONNECTION_ERROR_LEVEL = Level.FINE; - private static final org.slf4j.Logger logger = LoggerFactory.getLogger(EventSource.class); /** From ff8c51d39ba199cd6b44d7c01150441f816b85d6 Mon Sep 17 00:00:00 2001 From: John Kodumal Date: Tue, 18 Aug 2015 10:35:00 -0700 Subject: [PATCH 03/15] Change version to SNAPSHOT --- build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index 8b614f25c..c4848f49c 100644 --- a/build.gradle +++ b/build.gradle @@ -11,7 +11,7 @@ repositories { allprojects { group = 'com.launchdarkly' - version = "0.11.0" + version = "0.11.0-SNAPSHOT" sourceCompatibility = 1.6 targetCompatibility = 1.6 } From c8b0ffb7caaa33e0bfc54d52b7d349af93cbda39 Mon Sep 17 00:00:00 2001 From: John Kodumal Date: Tue, 18 Aug 2015 20:49:00 -0700 Subject: [PATCH 04/15] Update javadocs --- .../com/launchdarkly/client/FeatureStore.java | 61 ++++++++++++++++++- .../com/launchdarkly/client/LDClient.java | 2 +- .../launchdarkly/client/StreamProcessor.java | 3 - 3 files changed, 61 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/launchdarkly/client/FeatureStore.java b/src/main/java/com/launchdarkly/client/FeatureStore.java index 30809121b..c17c6fc62 100644 --- a/src/main/java/com/launchdarkly/client/FeatureStore.java +++ b/src/main/java/com/launchdarkly/client/FeatureStore.java @@ -3,13 +3,72 @@ import java.util.Map; /** - * Created by jkodumal on 8/13/15. + * A thread-safe, versioned store for {@link com.launchdarkly.client.FeatureRep} objects. + * Implementations should permit concurrent access and updates. + * + * Delete and upsert requests are versioned-- if the version number in the request is less than + * the currently stored version of the feature, the request should be ignored. + * + * These semantics support the primary use case for the store, which synchronizes a collection + * of features based on update messages that may be received out-of-order. + * */ public interface FeatureStore { + /** + * + * Returns the {@link com.launchdarkly.client.FeatureRep} to which the specified key is mapped, or + * null if the key is not associated or the associated {@link com.launchdarkly.client.FeatureRep} has + * been deleted. + * + * @param key the key whose associated {@link com.launchdarkly.client.FeatureRep} is to be returned + * @return the {@link com.launchdarkly.client.FeatureRep} to which the specified key is mapped, or + * null if the key is not associated or the associated {@link com.launchdarkly.client.FeatureRep} has + * been deleted. + */ FeatureRep get(String key); + + /** + * Returns a {@link java.util.Map} of all associated features. + * + * + * @return a map of all associated features. + */ Map> all(); + + /** + * Initializes (or re-initializes) the store with the specified set of features. Any existing entries + * will be removed. Implementations can assume that this set of features is up to date-- there is no + * need to perform individual version comparisons between the existing features and the supplied + * features. + * + * + * @param features the features to set the store + */ void init(Map> features); + + /** + * + * Deletes the feature associated with the specified key, if it exists and its version + * is less than or equal to the specified version. + * + * @param key the key of the feature to be deleted + * @param version the version for the delete operation + */ void delete(String key, int version); + + /** + * Update or insert the feature associated with the specified key, if its version + * is less than or equal to the version specified in the argument feature. + * + * @param key + * @param feature + */ void upsert(String key, FeatureRep feature); + + /** + * Returns true if this store has been initialized (i.e. if {@link #init(java.util.Map)} has been called) + * + * @return true if this store has been initialized + */ boolean initialized(); } diff --git a/src/main/java/com/launchdarkly/client/LDClient.java b/src/main/java/com/launchdarkly/client/LDClient.java index 93f540d65..bb63e2444 100644 --- a/src/main/java/com/launchdarkly/client/LDClient.java +++ b/src/main/java/com/launchdarkly/client/LDClient.java @@ -29,7 +29,7 @@ /** * - * A client for the LaunchDarkly API. Client instances are thread-safe. Applications can safely instantiate + * A client for the LaunchDarkly API. Client instances are thread-safe. Applications should instantiate * a single {@code LDClient} for the lifetime of their application. * */ diff --git a/src/main/java/com/launchdarkly/client/StreamProcessor.java b/src/main/java/com/launchdarkly/client/StreamProcessor.java index 24a91b139..385c6db4e 100644 --- a/src/main/java/com/launchdarkly/client/StreamProcessor.java +++ b/src/main/java/com/launchdarkly/client/StreamProcessor.java @@ -15,9 +15,6 @@ import java.lang.reflect.Type; import java.util.Map; -/** - * Created by jkodumal on 8/13/15. - */ class StreamProcessor implements Closeable { private static final String PUT_FEATURES = "put/features"; private static final String PATCH_FEATURE = "patch/features"; From 5147809d760f696dd9956f0d114cd59907cd92b8 Mon Sep 17 00:00:00 2001 From: John Kodumal Date: Tue, 18 Aug 2015 20:54:44 -0700 Subject: [PATCH 05/15] Initialized should be volatile --- .../java/com/launchdarkly/client/InMemoryFeatureStore.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/main/java/com/launchdarkly/client/InMemoryFeatureStore.java b/src/main/java/com/launchdarkly/client/InMemoryFeatureStore.java index 4e012fed9..aaf1c7d7f 100644 --- a/src/main/java/com/launchdarkly/client/InMemoryFeatureStore.java +++ b/src/main/java/com/launchdarkly/client/InMemoryFeatureStore.java @@ -4,14 +4,11 @@ import java.util.Map; import java.util.concurrent.locks.ReentrantReadWriteLock; -/** - * Created by jkodumal on 8/13/15. - */ public class InMemoryFeatureStore implements FeatureStore { final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); final Map> features = new HashMap>(); - boolean initialized = false; + volatile boolean initialized = false; @Override public FeatureRep get(String key) { From 1351052f8f51c482650c82e30e7d757f76d4be09 Mon Sep 17 00:00:00 2001 From: John Kodumal Date: Tue, 25 Aug 2015 17:05:35 -0700 Subject: [PATCH 06/15] When reconnecting, pass in any custom headers that may have been specified --- src/main/java/com/launchdarkly/client/EventSource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/launchdarkly/client/EventSource.java b/src/main/java/com/launchdarkly/client/EventSource.java index 8317fc821..8931871fd 100644 --- a/src/main/java/com/launchdarkly/client/EventSource.java +++ b/src/main/java/com/launchdarkly/client/EventSource.java @@ -617,7 +617,7 @@ private void scheduleReconnect(final long delay) { } // propagate the current reconnectDelay, but schedule based on the delay parameter - final EventProcessor processor = new EventProcessor(this); + final EventProcessor processor = new EventProcessor(reconnectDelay, null, headers); if (delay > 0) { executor.schedule(processor, delay, TimeUnit.MILLISECONDS); } else { From b27fd8f6a3752423a10d6a843c9372dcc45daa3b Mon Sep 17 00:00:00 2001 From: John Kodumal Date: Tue, 25 Aug 2015 20:59:55 -0700 Subject: [PATCH 07/15] Bump snapshot version --- build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index c4848f49c..2ce02c7cb 100644 --- a/build.gradle +++ b/build.gradle @@ -11,7 +11,7 @@ repositories { allprojects { group = 'com.launchdarkly' - version = "0.11.0-SNAPSHOT" + version = "0.12.0-SNAPSHOT" sourceCompatibility = 1.6 targetCompatibility = 1.6 } From 2cdedb506174879016590ec2173af363442c2f04 Mon Sep 17 00:00:00 2001 From: John Kodumal Date: Wed, 2 Sep 2015 17:09:30 -0700 Subject: [PATCH 08/15] Don't shutdown in the case of an exception-- just keep trying --- build.gradle | 2 +- src/main/java/com/launchdarkly/client/EventSource.java | 7 +++---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/build.gradle b/build.gradle index 2ce02c7cb..2450c6c57 100644 --- a/build.gradle +++ b/build.gradle @@ -11,7 +11,7 @@ repositories { allprojects { group = 'com.launchdarkly' - version = "0.12.0-SNAPSHOT" + version = "0.13.0-SNAPSHOT" sourceCompatibility = 1.6 targetCompatibility = 1.6 } diff --git a/src/main/java/com/launchdarkly/client/EventSource.java b/src/main/java/com/launchdarkly/client/EventSource.java index 8931871fd..fb82feb97 100644 --- a/src/main/java/com/launchdarkly/client/EventSource.java +++ b/src/main/java/com/launchdarkly/client/EventSource.java @@ -534,10 +534,9 @@ public void run() { logger.debug("Recovering from HTTP 503 - scheduling to reconnect in {0} ms", delay); scheduleReconnect(delay); } catch (Exception ex) { - logger.warn(String.format("Unable to connect - closing the event source to %s.", - target.getUri().toASCIIString()), ex); - // if we're here, an unrecoverable error has occurred - just turn off the lights... - EventSource.this.shutdown(); + logger.debug("Recovering from exception " + ex.getMessage() + "-- scheduling reconnect"); + scheduleReconnect(reconnectDelay); + } finally { if (eventInput != null && !eventInput.isClosed()) { eventInput.close(); From d3b157754c3f6682f6b523c0e4a5b7a92c072fbd Mon Sep 17 00:00:00 2001 From: Patrick Kaeding Date: Wed, 2 Sep 2015 17:16:43 -0700 Subject: [PATCH 09/15] added logging around state during reconnect --- src/main/java/com/launchdarkly/client/EventSource.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/launchdarkly/client/EventSource.java b/src/main/java/com/launchdarkly/client/EventSource.java index fb82feb97..f52317e21 100644 --- a/src/main/java/com/launchdarkly/client/EventSource.java +++ b/src/main/java/com/launchdarkly/client/EventSource.java @@ -75,12 +75,12 @@ private static enum State { * a static {@link EventSource#target(javax.ws.rs.client.WebTarget) EventSource.target(endpoint)} factory method. *

* For example: + *

*
    * EventSource es = EventSource.target(endpoint).named("my source")
    *                             .reconnectingEvery(5, SECONDS)
    *                             .open();
    * 
- *

* * @since 2.3 */ @@ -497,12 +497,15 @@ public void run() { EventInput eventInput = null; try { try { + logger.debug("current state is {0}", state.get()); final Invocation.Builder request = prepareHandshakeRequest(); if (state.get() == State.OPEN) { // attempt to connect only if even source is open logger.debug("Connecting..."); eventInput = request.get(EventInput.class); logger.debug("Connected!"); } + } catch (Exception e) { + logger.warn("Encountered error trying to connect", e); } finally { if (firstContactSignal != null) { // release the signal regardless of event source state or connection request outcome From 2b13ab2583d7c3f85a2dfbd27ec7dbf1585e2bec Mon Sep 17 00:00:00 2001 From: Patrick Kaeding Date: Wed, 2 Sep 2015 19:38:36 -0700 Subject: [PATCH 10/15] fixed log formatting --- src/main/java/com/launchdarkly/client/EventSource.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/launchdarkly/client/EventSource.java b/src/main/java/com/launchdarkly/client/EventSource.java index f52317e21..d26abae22 100644 --- a/src/main/java/com/launchdarkly/client/EventSource.java +++ b/src/main/java/com/launchdarkly/client/EventSource.java @@ -497,7 +497,7 @@ public void run() { EventInput eventInput = null; try { try { - logger.debug("current state is {0}", state.get()); + logger.debug("current state is {}", state.get()); final Invocation.Builder request = prepareHandshakeRequest(); if (state.get() == State.OPEN) { // attempt to connect only if even source is open logger.debug("Connecting..."); @@ -517,7 +517,7 @@ public void run() { while (state.get() == State.OPEN && !execThread.isInterrupted()) { if (eventInput == null || eventInput.isClosed()) { - logger.debug("Connection lost - scheduling reconnect in {0} ms", reconnectDelay); + logger.debug("Connection lost - scheduling reconnect in {} ms", reconnectDelay); scheduleReconnect(reconnectDelay); break; } else { @@ -534,7 +534,7 @@ public void run() { delay = (delay > 0) ? delay : 0; } - logger.debug("Recovering from HTTP 503 - scheduling to reconnect in {0} ms", delay); + logger.debug("Recovering from HTTP 503 - scheduling to reconnect in {} ms", delay); scheduleReconnect(delay); } catch (Exception ex) { logger.debug("Recovering from exception " + ex.getMessage() + "-- scheduling reconnect"); @@ -614,7 +614,7 @@ private void notify(final EventListener listener, final InboundEvent event) { private void scheduleReconnect(final long delay) { final State s = state.get(); if (s != State.OPEN) { - logger.debug("Aborting reconnect of event source in {0} state", state); + logger.debug("Aborting reconnect of event source in {} state", state); return; } From 9574ae18aaadcfdbb29557e976093dd99ec7b099 Mon Sep 17 00:00:00 2001 From: Patrick Kaeding Date: Thu, 3 Sep 2015 10:20:55 -0700 Subject: [PATCH 11/15] concatenate log messages --- src/main/java/com/launchdarkly/client/EventSource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/launchdarkly/client/EventSource.java b/src/main/java/com/launchdarkly/client/EventSource.java index d26abae22..cd67ba4f5 100644 --- a/src/main/java/com/launchdarkly/client/EventSource.java +++ b/src/main/java/com/launchdarkly/client/EventSource.java @@ -497,7 +497,7 @@ public void run() { EventInput eventInput = null; try { try { - logger.debug("current state is {}", state.get()); + logger.debug("current state is " + state.get()); final Invocation.Builder request = prepareHandshakeRequest(); if (state.get() == State.OPEN) { // attempt to connect only if even source is open logger.debug("Connecting..."); From 3e069d184995a41b0462083179cfe45d53c77928 Mon Sep 17 00:00:00 2001 From: Patrick Kaeding Date: Thu, 3 Sep 2015 16:03:53 -0700 Subject: [PATCH 12/15] moar logging --- src/main/java/com/launchdarkly/client/EventSource.java | 4 ++-- src/main/java/com/launchdarkly/client/StreamProcessor.java | 3 +++ 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/launchdarkly/client/EventSource.java b/src/main/java/com/launchdarkly/client/EventSource.java index cd67ba4f5..ca673d1d1 100644 --- a/src/main/java/com/launchdarkly/client/EventSource.java +++ b/src/main/java/com/launchdarkly/client/EventSource.java @@ -497,7 +497,7 @@ public void run() { EventInput eventInput = null; try { try { - logger.debug("current state is " + state.get()); + logger.debug("current state is {}", state.get()); final Invocation.Builder request = prepareHandshakeRequest(); if (state.get() == State.OPEN) { // attempt to connect only if even source is open logger.debug("Connecting..."); @@ -537,7 +537,7 @@ public void run() { logger.debug("Recovering from HTTP 503 - scheduling to reconnect in {} ms", delay); scheduleReconnect(delay); } catch (Exception ex) { - logger.debug("Recovering from exception " + ex.getMessage() + "-- scheduling reconnect"); + logger.debug("Recovering from exception -- scheduling reconnect in {} ms", reconnectDelay, ex); scheduleReconnect(reconnectDelay); } finally { diff --git a/src/main/java/com/launchdarkly/client/StreamProcessor.java b/src/main/java/com/launchdarkly/client/StreamProcessor.java index 385c6db4e..dc67e1ba0 100644 --- a/src/main/java/com/launchdarkly/client/StreamProcessor.java +++ b/src/main/java/com/launchdarkly/client/StreamProcessor.java @@ -50,14 +50,17 @@ public void onEvent(InboundEvent event) { Type type = new TypeToken>>(){}.getType(); Map> features = gson.fromJson(event.readData(), type); store.init(features); + System.out.println("replaced all features: " + gson.toJson(features)); } else if (event.getName().equals(PATCH_FEATURE)) { FeaturePatchData data = gson.fromJson(event.readData(), FeaturePatchData.class); store.upsert(data.key(), data.feature()); + System.out.println("replaced single feature: " + gson.toJson(data.feature())); } else if (event.getName().equals(DELETE_FEATURE)) { FeatureDeleteData data = gson.fromJson(event.readData(), FeatureDeleteData.class); store.delete(data.key(), data.version()); + System.out.println("deleted feature " + data.key()); } else { // TODO log an error From edcb4e2c7d858f55f3f4bd290398c69941eb355a Mon Sep 17 00:00:00 2001 From: Patrick Kaeding Date: Fri, 4 Sep 2015 16:21:15 -0700 Subject: [PATCH 13/15] added debug check to ensure streaming and polling agree --- .../com/launchdarkly/client/FeatureRep.java | 43 ++++++++++++++++++- .../com/launchdarkly/client/LDClient.java | 6 +++ .../com/launchdarkly/client/Variation.java | 33 ++++++++++++++ 3 files changed, 81 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/launchdarkly/client/FeatureRep.java b/src/main/java/com/launchdarkly/client/FeatureRep.java index 0f13b2733..1c20631c4 100644 --- a/src/main/java/com/launchdarkly/client/FeatureRep.java +++ b/src/main/java/com/launchdarkly/client/FeatureRep.java @@ -1,6 +1,5 @@ package com.launchdarkly.client; - import org.apache.commons.codec.digest.DigestUtils; import java.util.ArrayList; @@ -22,6 +21,48 @@ public FeatureRep() { } + @Override + public String toString() { + return "FeatureRep{" + + "name='" + name + '\'' + + ", key='" + key + '\'' + + ", salt='" + salt + '\'' + + ", on=" + on + + ", variations=" + variations + + ", deleted=" + deleted + + ", version=" + version + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + FeatureRep that = (FeatureRep) o; + + if (on != that.on) return false; + if (deleted != that.deleted) return false; + if (version != that.version) return false; + if (!name.equals(that.name)) return false; + if (!key.equals(that.key)) return false; + if (!salt.equals(that.salt)) return false; + return variations.equals(that.variations); + + } + + @Override + public int hashCode() { + int result = name.hashCode(); + result = 31 * result + key.hashCode(); + result = 31 * result + salt.hashCode(); + result = 31 * result + (on ? 1 : 0); + result = 31 * result + variations.hashCode(); + result = 31 * result + (deleted ? 1 : 0); + result = 31 * result + version; + return result; + } + FeatureRep(Builder b) { this.name = b.name; this.key = b.key; diff --git a/src/main/java/com/launchdarkly/client/LDClient.java b/src/main/java/com/launchdarkly/client/LDClient.java index bb63e2444..3b54a3f8d 100644 --- a/src/main/java/com/launchdarkly/client/LDClient.java +++ b/src/main/java/com/launchdarkly/client/LDClient.java @@ -190,6 +190,12 @@ public boolean toggle(String featureKey, LDUser user, boolean defaultValue) { if (this.config.stream && this.streamProcessor != null && this.streamProcessor.initialized()) { logger.debug("Using feature flag stored from streaming API"); result = (FeatureRep) this.streamProcessor.getFeature(featureKey); + if (logger.isDebugEnabled()) { + FeatureRep pollingResult = fetchFeature(featureKey); + if (!result.equals(pollingResult)) { + logger.debug("Mismatch between streaming and polling feature! Streaming: {} Polling: {}", result, pollingResult); + } + } } else { result = fetchFeature(featureKey); } diff --git a/src/main/java/com/launchdarkly/client/Variation.java b/src/main/java/com/launchdarkly/client/Variation.java index 179767208..21d0dd939 100644 --- a/src/main/java/com/launchdarkly/client/Variation.java +++ b/src/main/java/com/launchdarkly/client/Variation.java @@ -21,6 +21,39 @@ public Variation() { } + @Override + public String toString() { + return "Variation{" + + "value=" + value + + ", weight=" + weight + + ", userTarget=" + userTarget + + ", targets=" + targets + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + Variation variation = (Variation) o; + + if (weight != variation.weight) return false; + if (!value.equals(variation.value)) return false; + if (!userTarget.equals(variation.userTarget)) return false; + return targets.equals(variation.targets); + + } + + @Override + public int hashCode() { + int result = value.hashCode(); + result = 31 * result + weight; + result = 31 * result + userTarget.hashCode(); + result = 31 * result + targets.hashCode(); + return result; + } + Variation(Builder b) { this.value = b.value; this.weight = b.weight; From 655133e70ca25eb999e2083625bafb47e5a50d1e Mon Sep 17 00:00:00 2001 From: Patrick Kaeding Date: Fri, 4 Sep 2015 16:24:02 -0700 Subject: [PATCH 14/15] removed extra logging statements --- src/main/java/com/launchdarkly/client/EventSource.java | 1 - src/main/java/com/launchdarkly/client/StreamProcessor.java | 4 ---- 2 files changed, 5 deletions(-) diff --git a/src/main/java/com/launchdarkly/client/EventSource.java b/src/main/java/com/launchdarkly/client/EventSource.java index ca673d1d1..c0e4b0d00 100644 --- a/src/main/java/com/launchdarkly/client/EventSource.java +++ b/src/main/java/com/launchdarkly/client/EventSource.java @@ -497,7 +497,6 @@ public void run() { EventInput eventInput = null; try { try { - logger.debug("current state is {}", state.get()); final Invocation.Builder request = prepareHandshakeRequest(); if (state.get() == State.OPEN) { // attempt to connect only if even source is open logger.debug("Connecting..."); diff --git a/src/main/java/com/launchdarkly/client/StreamProcessor.java b/src/main/java/com/launchdarkly/client/StreamProcessor.java index dc67e1ba0..4213abe74 100644 --- a/src/main/java/com/launchdarkly/client/StreamProcessor.java +++ b/src/main/java/com/launchdarkly/client/StreamProcessor.java @@ -45,22 +45,18 @@ void subscribe() { @Override public void onEvent(InboundEvent event) { Gson gson = new Gson(); - System.out.println("Received an event: " + event.getName()); if (event.getName().equals(PUT_FEATURES)) { Type type = new TypeToken>>(){}.getType(); Map> features = gson.fromJson(event.readData(), type); store.init(features); - System.out.println("replaced all features: " + gson.toJson(features)); } else if (event.getName().equals(PATCH_FEATURE)) { FeaturePatchData data = gson.fromJson(event.readData(), FeaturePatchData.class); store.upsert(data.key(), data.feature()); - System.out.println("replaced single feature: " + gson.toJson(data.feature())); } else if (event.getName().equals(DELETE_FEATURE)) { FeatureDeleteData data = gson.fromJson(event.readData(), FeatureDeleteData.class); store.delete(data.key(), data.version()); - System.out.println("deleted feature " + data.key()); } else { // TODO log an error From a6bdfa361dfb58c69004a9f6b30e0d6a2f7f870d Mon Sep 17 00:00:00 2001 From: Patrick Kaeding Date: Fri, 4 Sep 2015 17:24:49 -0700 Subject: [PATCH 15/15] added debugStreaming config parameter --- .../java/com/launchdarkly/client/LDClient.java | 4 ++-- .../java/com/launchdarkly/client/LDConfig.java | 15 +++++++++++++++ 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/launchdarkly/client/LDClient.java b/src/main/java/com/launchdarkly/client/LDClient.java index 3b54a3f8d..c0f34db02 100644 --- a/src/main/java/com/launchdarkly/client/LDClient.java +++ b/src/main/java/com/launchdarkly/client/LDClient.java @@ -190,10 +190,10 @@ public boolean toggle(String featureKey, LDUser user, boolean defaultValue) { if (this.config.stream && this.streamProcessor != null && this.streamProcessor.initialized()) { logger.debug("Using feature flag stored from streaming API"); result = (FeatureRep) this.streamProcessor.getFeature(featureKey); - if (logger.isDebugEnabled()) { + if (config.debugStreaming) { FeatureRep pollingResult = fetchFeature(featureKey); if (!result.equals(pollingResult)) { - logger.debug("Mismatch between streaming and polling feature! Streaming: {} Polling: {}", result, pollingResult); + logger.warn("Mismatch between streaming and polling feature! Streaming: {} Polling: {}", result, pollingResult); } } } else { diff --git a/src/main/java/com/launchdarkly/client/LDConfig.java b/src/main/java/com/launchdarkly/client/LDConfig.java index f411b0467..7143586e2 100644 --- a/src/main/java/com/launchdarkly/client/LDConfig.java +++ b/src/main/java/com/launchdarkly/client/LDConfig.java @@ -32,6 +32,7 @@ public final class LDConfig { final int flushInterval; final HttpHost proxyHost; final boolean stream; + final boolean debugStreaming; protected LDConfig(Builder builder) { this.baseURI = builder.baseURI; @@ -42,6 +43,7 @@ protected LDConfig(Builder builder) { this.proxyHost = builder.proxyHost(); this.streamURI = builder.streamURI; this.stream = builder.stream; + this.debugStreaming = builder.debugStreaming; } /** @@ -67,6 +69,7 @@ public static class Builder{ private int proxyPort = -1; private String proxyScheme; private boolean stream = false; + private boolean debugStreaming = false; /** * Creates a builder with all configuration parameters set to the default @@ -94,6 +97,18 @@ public Builder streamURI(URI streamURI) { return this; } + /** + * Set whether we should debug streaming mode. If set, the client will fetch features via polling and compare the + * retrieved feature with the value in the feature store. There is a performance cost to this, so it is not + * recommended for production use. + * @param debugStreaming whether streaming mode should be debugged + * @return + */ + public Builder debugStreaming(boolean debugStreaming) { + this.debugStreaming = debugStreaming; + return this; + } + /** * Set whether streaming mode should be enabled * @param stream whether streaming mode should be enabled