diff --git a/build.gradle b/build.gradle index 939dea2e1..b2a867332 100644 --- a/build.gradle +++ b/build.gradle @@ -5,14 +5,21 @@ apply plugin: 'signing' apply plugin: 'idea' apply plugin: 'com.github.johnrengelman.shadow' +configurations.all { + // check for updates every build for dependencies with: 'changing: true' + resolutionStrategy.cacheChangingModulesFor 0, 'seconds' +} + repositories { - mavenCentral() mavenLocal() + // Before LaunchDarkly release artifacts get synced to Maven Central they are here along with snapshots: + maven { url "https://oss.sonatype.org/content/groups/public/" } + mavenCentral() } allprojects { group = 'com.launchdarkly' - version = "0.20.0" + version = "1.0.0-SNAPSHOT" sourceCompatibility = 1.7 targetCompatibility = 1.7 } @@ -24,11 +31,11 @@ dependencies { compile "com.google.code.gson:gson:2.2.4" compile "com.google.guava:guava:19.0" compile "org.slf4j:slf4j-api:1.7.7" - compile "org.glassfish.jersey.media:jersey-media-sse:2.20" + compile group: "com.launchdarkly", name: "okhttp-eventsource", version: "0.1.3-SNAPSHOT", changing: true compile "redis.clients:jedis:2.8.0" - testCompile "org.easymock:easymock:3.3" + testCompile "org.easymock:easymock:3.4" testCompile 'junit:junit:[4.10,)' - testRuntime "org.slf4j:slf4j-simple:1.7.7" + testRuntime "ch.qos.logback:logback-classic:1.1.3" } jar { @@ -50,7 +57,7 @@ buildscript { } dependencies { classpath 'org.ajoberstar:gradle-git:0.12.0' - classpath 'com.github.jengelman.gradle.plugins:shadow:1.2.2' + classpath 'com.github.jengelman.gradle.plugins:shadow:1.2.3' } } @@ -125,12 +132,6 @@ uploadArchives { description 'Official LaunchDarkly SDK for Java' url 'https://github.com/launchdarkly/java-client' - scm { - connection 'scm:svn:http://foo.googlecode.com/svn/trunk/' - developerConnection 'scm:svn:https://foo.googlecode.com/svn/trunk/' - url 'http://foo.googlecode.com/svn/trunk/' - } - licenses { license { name 'The Apache License, Version 2.0' diff --git a/src/main/java/com/launchdarkly/client/EventProcessor.java b/src/main/java/com/launchdarkly/client/EventProcessor.java index 713edb853..ab4eb1362 100644 --- a/src/main/java/com/launchdarkly/client/EventProcessor.java +++ b/src/main/java/com/launchdarkly/client/EventProcessor.java @@ -1,5 +1,6 @@ package com.launchdarkly.client; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.gson.Gson; import org.apache.http.HttpStatus; import org.apache.http.client.methods.CloseableHttpResponse; @@ -14,22 +15,34 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Random; import java.util.concurrent.*; class EventProcessor implements Closeable { - private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory()); + private final ScheduledExecutorService scheduler; + private final Random random = new Random(); private final BlockingQueue queue; private final String apiKey; + private final LDConfig config; private final Consumer consumer; EventProcessor(String apiKey, LDConfig config) { this.apiKey = apiKey; this.queue = new ArrayBlockingQueue<>(config.capacity); this.consumer = new Consumer(config); + this.config = config; + ThreadFactory threadFactory = new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("LaunchDarkly-EventProcessor-%d") + .build(); + this.scheduler = Executors.newSingleThreadScheduledExecutor(threadFactory); this.scheduler.scheduleAtFixedRate(consumer, 0, config.flushInterval, TimeUnit.SECONDS); } boolean sendEvent(Event e) { + if (config.samplingInterval > 0 && random.nextInt(config.samplingInterval) != 0) { + return true; + } return queue.offer(e); } @@ -43,18 +56,8 @@ public void flush() { this.consumer.flush(); } - static class DaemonThreadFactory implements ThreadFactory { - public Thread newThread(Runnable r) { - Thread thread = new Thread(r); - thread.setDaemon(true); - return thread; - } - } - class Consumer implements Runnable { private final Logger logger = LoggerFactory.getLogger(Consumer.class); - - private final CloseableHttpClient client; private final LDConfig config; @@ -78,6 +81,7 @@ public void flush() { } private void postEvents(List events) { + logger.debug("Posting " + events.size() + " event(s).."); CloseableHttpResponse response = null; Gson gson = new Gson(); String json = gson.toJson(events); @@ -95,16 +99,14 @@ private void postEvents(List events) { if (status >= 300) { if (status == HttpStatus.SC_UNAUTHORIZED) { logger.error("Invalid API key"); - } - else { + } else { logger.error("Unexpected status code: " + status); } - } - else { - logger.debug("Successfully processed events"); + } else { + logger.debug("Successfully posted " + events.size() + " event(s)."); } } catch (IOException e) { - logger.error("Unhandled exception in LaunchDarkly client", e); + logger.error("Unhandled exception in LaunchDarkly client attempting to connect to URI: " + config.eventsURI, e); } finally { try { if (response != null) response.close(); @@ -112,7 +114,6 @@ private void postEvents(List events) { logger.error("Unhandled exception in LaunchDarkly client", e); } } - } } } diff --git a/src/main/java/com/launchdarkly/client/EventSource.java b/src/main/java/com/launchdarkly/client/EventSource.java deleted file mode 100644 index 9d206cd80..000000000 --- a/src/main/java/com/launchdarkly/client/EventSource.java +++ /dev/null @@ -1,662 +0,0 @@ -package com.launchdarkly.client; - -import org.glassfish.jersey.internal.util.collection.StringKeyIgnoreCaseMultivaluedMap; -import org.glassfish.jersey.media.sse.*; -import org.slf4j.LoggerFactory; - -import javax.ws.rs.ServiceUnavailableException; -import javax.ws.rs.client.Invocation; -import javax.ws.rs.client.WebTarget; -import javax.ws.rs.core.MultivaluedMap; -import java.util.Collection; -import java.util.Collections; -import java.util.Date; -import java.util.List; -import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicReference; - -// EventSource class modified from -// https://github.com/jersey/jersey/blob/master/media/sse/src/main/java/org/glassfish/jersey/media/sse/EventSource.java -// Modifications: -// - support for custom headers -// - set spawned thread as a daemon to permit application shutdown -public class EventSource implements EventListener { - - /** - * Default SSE {@link EventSource} reconnect delay value in milliseconds. - * - * @since 2.3 - */ - public static final long RECONNECT_DEFAULT = 500; - - private static enum State { - READY, OPEN, CLOSED - } - - private static final org.slf4j.Logger logger = LoggerFactory.getLogger(EventSource.class); - - /** - * SSE streaming resource target. - */ - private final WebTarget target; - /** - * Default reconnect delay. - */ - private final long reconnectDelay; - /** - * Flag indicating if the persistent HTTP connections should be disabled. - */ - private final boolean disableKeepAlive; - /** - * Incoming SSE event processing task executor. - */ - private final ScheduledExecutorService executor; - /** - * Event source internal state. - */ - private final AtomicReference state = new AtomicReference<>(State.READY); - /** - * List of all listeners not bound to receive only events of a particular name. - */ - private final List unboundListeners = new CopyOnWriteArrayList<>(); - /** - * A map of listeners bound to receive only events of a particular name. - */ - private final ConcurrentMap> boundListeners = new ConcurrentHashMap<>(); - - private final MultivaluedMap headers; - - /** - * Jersey {@link EventSource} builder class. - * - * Event source builder provides methods that let you conveniently configure and subsequently build - * a new {@code EventSource} instance. You can obtain a new event source builder instance using - * a static {@link EventSource#target(javax.ws.rs.client.WebTarget) EventSource.target(endpoint)} factory method. - *

- * For example: - *

- *
-   * EventSource es = EventSource.target(endpoint).named("my source")
-   *                             .reconnectingEvery(5, SECONDS)
-   *                             .open();
-   * 
- * - * @since 2.3 - */ - public static class Builder { - - private final WebTarget endpoint; - - private long reconnect = EventSource.RECONNECT_DEFAULT; - private String name = null; - private boolean disableKeepAlive = true; - - private Builder(final WebTarget endpoint) { - this.endpoint = endpoint; - } - - private MultivaluedMap headers = new StringKeyIgnoreCaseMultivaluedMap<>(); - - /** - * Set a custom name for the event source. - *

- * At present, custom event source name is mainly useful to be able to distinguish different event source - * event processing threads from one another. If not set, a default name will be generated using the - * SSE endpoint URI. - *

- * - * @param name custom event source name. - * @return updated event source builder instance. - */ - public Builder named(String name) { - this.name = name; - return this; - } - - /** - * Instruct event source to use - * persistent HTTP connections when connecting - * (or reconnecting) to the SSE endpoint, provided the mechanism is supported by the underlying client - * {@link org.glassfish.jersey.client.spi.Connector}. - *

- * By default, the persistent HTTP connections are disabled for the reasons discussed in the {@link EventSource} - * javadoc. - *

- * - * @return updated event source builder instance. - */ - public Builder usePersistentConnections() { - disableKeepAlive = false; - return this; - } - - public Builder header(String name, Object value) { - if (value == null) { - headers.remove(name); - } else { - headers.add(name, value); - } - return this; - } - - /** - * Set the initial reconnect delay to be used by the event source. - *

- * Note that this value may be later overridden by the SSE endpoint using either a {@code retry} SSE event field - * or HTTP 503 + {@value javax.ws.rs.core.HttpHeaders#RETRY_AFTER} mechanism as described - * in the {@link EventSource} javadoc. - *

- * - * @param delay the default time to wait before attempting to recover from a connection loss. - * @param unit time unit of the reconnect delay parameter. - * @return updated event source builder instance. - */ - public Builder reconnectingEvery(final long delay, TimeUnit unit) { - reconnect = unit.toMillis(delay); - return this; - } - - /** - * Build new SSE event source pointing at a SSE streaming {@link WebTarget web target}. - *

- * The returned event source is ready, but not {@link EventSource#open() connected} to the SSE endpoint. - * It is expected that you will manually invoke its {@link #open()} method once you are ready to start - * receiving SSE events. In case you want to build an event source instance that is already connected - * to the SSE endpoint, use the event source builder {@link #open()} method instead. - *

- *

- * Once the event source is open, the incoming events are processed by the event source in an - * asynchronous task that runs in an internal single-threaded {@link ScheduledExecutorService - * scheduled executor service}. - *

- * - * @return new event source instance, ready to be connected to the SSE endpoint. - * @see #open() - */ - public EventSource build() { - return new EventSource(endpoint, name, reconnect, disableKeepAlive, false, headers); - } - - /** - * Build new SSE event source pointing at a SSE streaming {@link WebTarget web target}. - *

- * The returned event source is already {@link EventSource#open() connected} to the SSE endpoint - * and is processing any new incoming events. In case you want to build an event source instance - * that is already ready, but not automatically connected to the SSE endpoint, use the event source - * builder {@link #build()} method instead. - *

- *

- * The incoming events are processed by the event source in an asynchronous task that runs in an - * internal single-threaded {@link ScheduledExecutorService scheduled executor service}. - *

- * - * @return new event source instance, already connected to the SSE endpoint. - * @see #build() - */ - public EventSource open() { - // opening directly in the constructor is just plain ugly... - final EventSource source = new EventSource(endpoint, name, reconnect, disableKeepAlive, false, headers); - source.open(); - return source; - } - } - - /** - * Create a new {@link EventSource.Builder event source builder} that provides convenient way how to - * configure and fine-tune various aspects of a newly prepared event source instance. - * - * @param endpoint SSE streaming endpoint. Must not be {@code null}. - * @return a builder of a new event source instance pointing at the specified SSE streaming endpoint. - * @throws NullPointerException in case the supplied web target is {@code null}. - * @since 2.3 - */ - public static Builder target(WebTarget endpoint) { - return new Builder(endpoint); - } - - /** - * Create new SSE event source and open a connection it to the supplied SSE streaming {@link WebTarget web target}. - * - * This constructor is performs the same series of actions as a call to: - *
EventSource.target(endpoint).open()
- *

- * The created event source instance automatically {@link #open opens a connection} to the supplied SSE streaming - * web target and starts processing incoming {@link org.glassfish.jersey.media.sse.InboundEvent events}. - *

- *

- * The incoming events are processed by the event source in an asynchronous task that runs in an - * internal single-threaded {@link ScheduledExecutorService scheduled executor service}. - *

- * - * @param endpoint SSE streaming endpoint. Must not be {@code null}. - * @throws NullPointerException in case the supplied web target is {@code null}. - */ - public EventSource(final WebTarget endpoint) { - this(endpoint, true, new StringKeyIgnoreCaseMultivaluedMap<>()); - } - - /** - * Create new SSE event source pointing at a SSE streaming {@link WebTarget web target}. - * - * This constructor is performs the same series of actions as a call to: - *
-   * if (open) {
-   *     EventSource.target(endpoint).open();
-   * } else {
-   *     EventSource.target(endpoint).build();
-   * }
- *

- * If the supplied {@code open} flag is {@code true}, the created event source instance automatically - * {@link #open opens a connection} to the supplied SSE streaming web target and starts processing incoming - * {@link org.glassfish.jersey.media.sse.InboundEvent events}. - * Otherwise, if the {@code open} flag is set to {@code false}, the created event source instance - * is not automatically connected to the web target. In this case it is expected that the user who - * created the event source will manually invoke its {@link #open()} method. - *

- *

- * Once the event source is open, the incoming events are processed by the event source in an - * asynchronous task that runs in an internal single-threaded {@link ScheduledExecutorService - * scheduled executor service}. - *

- * - * @param endpoint SSE streaming endpoint. Must not be {@code null}. - * @param open if {@code true}, the event source will immediately connect to the SSE endpoint, - * if {@code false}, the connection will not be established until {@link #open()} method is - * called explicitly on the event stream. - * @throws NullPointerException in case the supplied web target is {@code null}. - */ - public EventSource(final WebTarget endpoint, final boolean open, final MultivaluedMap headers) { - this(endpoint, null, RECONNECT_DEFAULT, true, open, headers); - } - - private EventSource(final WebTarget target, - final String name, - final long reconnectDelay, - final boolean disableKeepAlive, - final boolean open, - final MultivaluedMap headers) { - if (target == null) { - throw new NullPointerException("Web target is 'null'."); - } - this.target = target; // SseFeature.register(target); - this.reconnectDelay = reconnectDelay; - this.disableKeepAlive = disableKeepAlive; - - final String esName = (name == null) ? createDefaultName(target) : name; - this.executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - Thread t = new Thread(r, esName); - t.setDaemon(true); - return t; - } - }); - - this.headers = new StringKeyIgnoreCaseMultivaluedMap<>(); - this.headers.putAll(headers); - - if (open) { - open(); - } - } - - private static String createDefaultName(WebTarget target) { - return String.format("jersey-sse-event-source-[%s]", target.getUri().toASCIIString()); - } - - /** - * Open the connection to the supplied SSE underlying {@link WebTarget web target} and start processing incoming - * {@link org.glassfish.jersey.media.sse.InboundEvent events}. - * - * @throws IllegalStateException in case the event source has already been opened earlier. - */ - public void open() { - if (!state.compareAndSet(State.READY, State.OPEN)) { - switch (state.get()) { - case OPEN: - throw new IllegalStateException(LocalizationMessages.EVENT_SOURCE_ALREADY_CONNECTED()); - case CLOSED: - throw new IllegalStateException(LocalizationMessages.EVENT_SOURCE_ALREADY_CLOSED()); - } - } - - EventProcessor processor = new EventProcessor(reconnectDelay, null, headers); - executor.submit(processor); - - // return only after the first request to the SSE endpoint has been made - //processor.awaitFirstContact(); - } - - /** - * Check if this event source instance has already been {@link #open() opened}. - * - * @return {@code true} if this event source is open, {@code false} otherwise. - */ - public boolean isOpen() { - return state.get() == State.OPEN; - } - - /** - * Register new {@link EventListener event listener} to receive all streamed {@link org.glassfish.jersey.media.sse.InboundEvent SSE events}. - * - * @param listener event listener to be registered with the event source. - * @see #register(EventListener, String, String...) - */ - public void register(final EventListener listener) { - register(listener, null); - } - - /** - * Add name-bound {@link EventListener event listener} which will be called only for incoming SSE - * {@link org.glassfish.jersey.media.sse.InboundEvent events} whose {@link org.glassfish.jersey.media.sse.InboundEvent#getName() name} is equal to the specified - * name(s). - * - * @param listener event listener to register with this event source. - * @param eventName inbound event name. - * @param eventNames additional event names. - * @see #register(EventListener) - */ - public void register(final EventListener listener, final String eventName, final String... eventNames) { - if (eventName == null) { - unboundListeners.add(listener); - } else { - addBoundListener(eventName, listener); - - if (eventNames != null) { - for (String name : eventNames) { - addBoundListener(name, listener); - } - } - } - } - - private void addBoundListener(final String name, final EventListener listener) { - List listeners = boundListeners.putIfAbsent(name, - new CopyOnWriteArrayList<>(Collections.singleton(listener))); - if (listeners != null) { - // alas, new listener collection registration conflict: - // need to add the new listener to the existing listener collection - listeners.add(listener); - } - } - - /** - * {@inheritDoc} - *

- * The default {@code EventSource} implementation is empty, users can override this method to handle - * incoming {@link org.glassfish.jersey.media.sse.InboundEvent}s. - *

- *

- * Note that overriding this method may be necessary to make sure no {@code InboundEvent incoming events} - * are lost in case the event source is constructed using {@link #EventSource(javax.ws.rs.client.WebTarget)} - * constructor or in case a {@code true} flag is passed to the {@link #EventSource(javax.ws.rs.client.WebTarget, boolean, javax.ws.rs.core.MultivaluedMap)} - * constructor, since the connection is opened as as part of the constructor call and the event processing starts - * immediately. Therefore any {@link EventListener}s registered later after the event source has been constructed - * may miss the notifications about the one or more events that arrive immediately after the connection to the - * event source is established. - *

- * - * @param inboundEvent received inbound event. - */ - @Override - public void onEvent(final InboundEvent inboundEvent) { - // do nothing - } - - /** - * Close this event source. - * - * The method will wait up to 5 seconds for the internal event processing task to complete. - */ - public void close() { - close(5, TimeUnit.SECONDS); - } - - /** - * Close this event source and wait for the internal event processing task to complete - * for up to the specified amount of wait time. - *

- * The method blocks until the event processing task has completed execution after a shutdown - * request, or until the timeout occurs, or the current thread is interrupted, whichever happens - * first. - *

- *

- * In case the waiting for the event processing task has been interrupted, this method restores - * the {@link Thread#interrupted() interrupt} flag on the thread before returning {@code false}. - *

- * - * @param timeout the maximum time to wait. - * @param unit the time unit of the timeout argument. - * @return {@code true} if this executor terminated and {@code false} if the timeout elapsed - * before termination or the termination was interrupted. - */ - public boolean close(final long timeout, final TimeUnit unit) { - shutdown(); - try { - if (!executor.awaitTermination(timeout, unit)) { - return false; - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return false; - } - return true; - } - - private void shutdown() { - if (state.getAndSet(State.CLOSED) != State.CLOSED) { - executor.shutdownNow(); - } - } - - /** - * Private event processor task responsible for connecting to the SSE stream and processing - * incoming SSE events as well as handling any connection issues. - */ - private class EventProcessor implements Runnable, EventListener { - - /** - * Open connection response arrival synchronization latch. - */ - private final CountDownLatch firstContactSignal; - private final MultivaluedMap headers; - /** - * Last received event id. - */ - private String lastEventId; - /** - * Re-connect delay. - */ - private long reconnectDelay; - - public EventProcessor(final long reconnectDelay, final String lastEventId, final MultivaluedMap headers) { - /** - * Synchronization barrier used to signal that the initial contact with SSE endpoint - * has been made. - */ - this.firstContactSignal = new CountDownLatch(1); - - this.reconnectDelay = reconnectDelay; - this.lastEventId = lastEventId; - this.headers = headers; - } - - private EventProcessor(final EventProcessor that) { - this.firstContactSignal = null; - - this.reconnectDelay = that.reconnectDelay; - this.lastEventId = that.lastEventId; - this.headers = that.headers; - } - - @Override - public void run() { - logger.debug("Listener task started."); - - EventInput eventInput = null; - try { - try { - final Invocation.Builder request = prepareHandshakeRequest(); - if (state.get() == State.OPEN) { // attempt to connect only if even source is open - logger.debug("Connecting..."); - eventInput = request.get(EventInput.class); - logger.debug("Connected!"); - } - } catch (Exception e) { - logger.warn("Encountered error trying to connect", e); - } finally { - if (firstContactSignal != null) { - // release the signal regardless of event source state or connection request outcome - firstContactSignal.countDown(); - } - } - - final Thread execThread = Thread.currentThread(); - - while (state.get() == State.OPEN && !execThread.isInterrupted()) { - if (eventInput == null || eventInput.isClosed()) { - logger.debug("Connection lost - scheduling reconnect in {} ms", reconnectDelay); - scheduleReconnect(reconnectDelay); - break; - } else { - this.onEvent(eventInput.read()); - } - } - } catch (ServiceUnavailableException ex) { - logger.debug("Received HTTP 503"); - long delay = reconnectDelay; - if (ex.hasRetryAfter()) { - logger.debug("Recovering from HTTP 503 using HTTP Retry-After header value as a reconnect delay"); - final Date requestTime = new Date(); - delay = ex.getRetryTime(requestTime).getTime() - requestTime.getTime(); - delay = (delay > 0) ? delay : 0; - } - - logger.debug("Recovering from HTTP 503 - scheduling to reconnect in {} ms", delay); - scheduleReconnect(delay); - } catch (Exception ex) { - logger.debug("Recovering from exception -- scheduling reconnect in {} ms", reconnectDelay, ex); - scheduleReconnect(reconnectDelay); - - } finally { - if (eventInput != null && !eventInput.isClosed()) { - eventInput.close(); - } - logger.debug("Listener task finished."); - } - } - - /** - * Called by the event source when an inbound event is received. - * - * This listener aggregator method is responsible for invoking {@link EventSource#onEvent(InboundEvent)} - * method on the owning event source as well as for notifying all registered {@link EventListener event listeners}. - * - * @param event incoming {@link InboundEvent inbound event}. - */ - @Override - public void onEvent(final InboundEvent event) { - if (event == null) { - return; - } - - logger.debug("New event received."); - - if (event.getId() != null) { - lastEventId = event.getId(); - } - if (event.isReconnectDelaySet()) { - reconnectDelay = event.getReconnectDelay(); - } - - notify(EventSource.this, event); - notify(unboundListeners, event); - - final String eventName = event.getName(); - if (eventName != null) { - final List eventListeners = boundListeners.get(eventName); - if (eventListeners != null) { - notify(eventListeners, event); - } - } - } - - private void notify(final Collection listeners, final InboundEvent event) { - for (EventListener listener : listeners) { - notify(listener, event); - } - } - - private void notify(final EventListener listener, final InboundEvent event) { - try { - listener.onEvent(event); - } catch (Exception ex) { - logger.warn(String.format("Event notification in a listener of %s class failed.", - listener.getClass().getName()), ex); - } - } - - /** - * Schedule a new event processor task to reconnect after the specified {@code delay} [milliseconds]. - * - * If the {@code delay} is zero or negative, the new reconnect task will be scheduled immediately. - * The {@code reconnectDelay} and {@code lastEventId} field values are propagated into the newly - * scheduled task. - *

- * The method will silently abort in case the event source is not {@link EventSource#isOpen() open}. - *

- * - * @param delay specifies the amount of time [milliseconds] to wait before attempting a reconnect. - * If zero or negative, the new reconnect task will be scheduled immediately. - */ - private void scheduleReconnect(final long delay) { - final State s = state.get(); - if (s != State.OPEN) { - logger.debug("Aborting reconnect of event source in {} state", state); - return; - } - - // propagate the current reconnectDelay, but schedule based on the delay parameter - final EventProcessor processor = new EventProcessor(reconnectDelay, null, headers); - if (delay > 0) { - executor.schedule(processor, delay, TimeUnit.MILLISECONDS); - } else { - executor.submit(processor); - } - } - - private Invocation.Builder prepareHandshakeRequest() { - final Invocation.Builder request = target.request(SseFeature.SERVER_SENT_EVENTS_TYPE); - // TODO add the SERVER_SENT_EVENTS_TYPE header - request.headers(this.headers); - if (lastEventId != null && !lastEventId.isEmpty()) { - request.header(SseFeature.LAST_EVENT_ID_HEADER, lastEventId); - } - if (disableKeepAlive) { - request.header("Connection", "close"); - } - return request; - } - - /** - * Await the initial contact with the SSE endpoint. - */ - public void awaitFirstContact() { - logger.debug("Awaiting first contact signal."); - try { - if (firstContactSignal == null) { - return; - } - - try { - firstContactSignal.await(); - } catch (InterruptedException ex) { - logger.warn(LocalizationMessages.EVENT_SOURCE_OPEN_CONNECTION_INTERRUPTED(), ex); - Thread.currentThread().interrupt(); - } - } finally { - logger.debug("First contact signal released."); - } - } - } -} \ No newline at end of file diff --git a/src/main/java/com/launchdarkly/client/FeatureRequestor.java b/src/main/java/com/launchdarkly/client/FeatureRequestor.java index 2bab25df6..2dc949b50 100644 --- a/src/main/java/com/launchdarkly/client/FeatureRequestor.java +++ b/src/main/java/com/launchdarkly/client/FeatureRequestor.java @@ -68,6 +68,7 @@ Map> makeAllRequest(boolean latest) throws IOException { CloseableHttpResponse response = null; try { + logger.debug("Making request: " + request); response = client.execute(request, context); logCacheResponse(context.getCacheResponseStatus()); @@ -76,7 +77,10 @@ Map> makeAllRequest(boolean latest) throws IOException { Type type = new TypeToken>>() {}.getType(); - Map> result = gson.fromJson(EntityUtils.toString(response.getEntity()), type); + String json = EntityUtils.toString(response.getEntity()); + logger.debug("Got response: " + response.toString()); + logger.debug("Got Response body: " + json); + Map> result = gson.fromJson(json, type); return result; } finally { diff --git a/src/main/java/com/launchdarkly/client/InMemoryFeatureStore.java b/src/main/java/com/launchdarkly/client/InMemoryFeatureStore.java index 9a5d2f6a3..a4837955f 100644 --- a/src/main/java/com/launchdarkly/client/InMemoryFeatureStore.java +++ b/src/main/java/com/launchdarkly/client/InMemoryFeatureStore.java @@ -12,9 +12,9 @@ */ public class InMemoryFeatureStore implements FeatureStore { - final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - final Map> features = new HashMap<>(); - volatile boolean initialized = false; + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + private final Map> features = new HashMap<>(); + private volatile boolean initialized = false; /** diff --git a/src/main/java/com/launchdarkly/client/LDClient.java b/src/main/java/com/launchdarkly/client/LDClient.java index a5f5bdff4..cc9a833c5 100644 --- a/src/main/java/com/launchdarkly/client/LDClient.java +++ b/src/main/java/com/launchdarkly/client/LDClient.java @@ -1,6 +1,7 @@ package com.launchdarkly.client; +import com.google.common.annotations.VisibleForTesting; import com.google.gson.JsonElement; import org.apache.http.annotation.ThreadSafe; import org.slf4j.Logger; @@ -9,14 +10,15 @@ import java.io.Closeable; import java.io.IOException; import java.net.URL; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.jar.Attributes; import java.util.jar.Manifest; /** - * * A client for the LaunchDarkly API. Client instances are thread-safe. Applications should instantiate * a single {@code LDClient} for the lifetime of their application. - * */ @ThreadSafe public class LDClient implements Closeable { @@ -24,16 +26,14 @@ public class LDClient implements Closeable { private final LDConfig config; private final FeatureRequestor requestor; private final EventProcessor eventProcessor; - private final StreamProcessor streamProcessor; + private UpdateProcessor updateProcessor; protected static final String CLIENT_VERSION = getClientVersion(); - private volatile boolean offline = false; - /** * Creates a new client instance that connects to LaunchDarkly with the default configuration. In most * cases, you should use this constructor. * - * @param apiKey the API key for your account + * @param apiKey the API key for your account */ public LDClient(String apiKey) { this(apiKey, LDConfig.DEFAULT); @@ -43,36 +43,70 @@ public LDClient(String apiKey) { * Creates a new client to connect to LaunchDarkly with a custom configuration. This constructor * can be used to configure advanced client features, such as customizing the LaunchDarkly base URL. * - * @param apiKey the API key for your account - * @param config a client configuration object + * @param apiKey the API key for your account + * @param config a client configuration object */ public LDClient(String apiKey, LDConfig config) { this.config = config; this.requestor = createFeatureRequestor(apiKey, config); this.eventProcessor = createEventProcessor(apiKey, config); + if (config.offline) { + logger.info("Starting LaunchDarkly client in offline mode"); + return; + } + + if (config.useLdd) { + logger.info("Starting LaunchDarkly in LDD mode. Skipping direct feature retrieval."); + return; + } + if (config.stream) { - logger.debug("Enabling streaming API"); - this.streamProcessor = createStreamProcessor(apiKey, config, requestor); - this.streamProcessor.subscribe(); + logger.info("Enabling streaming API"); + this.updateProcessor = createStreamProcessor(apiKey, config, requestor); } else { - logger.debug("Streaming API disabled"); - this.streamProcessor = null; + logger.info("Disabling streaming API"); + this.updateProcessor = createPollingProcessor(config); } + + Future startFuture = updateProcessor.start(); + + if (config.startWaitMillis > 0L) { + logger.info("Waiting up to " + config.startWaitMillis + " milliseconds for LaunchDarkly client to start..."); + try { + startFuture.get(config.startWaitMillis, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + logger.error("Timeout encountered waiting for LaunchDarkly client initialization"); + } catch (Exception e) { + logger.error("Exception encountered waiting for LaunchDarkly client initialization", e); + } + } + } + + public boolean initialized() { + return isOffline() || config.useLdd || updateProcessor.initialized(); } + @VisibleForTesting protected FeatureRequestor createFeatureRequestor(String apiKey, LDConfig config) { return new FeatureRequestor(apiKey, config); } + @VisibleForTesting protected EventProcessor createEventProcessor(String apiKey, LDConfig config) { return new EventProcessor(apiKey, config); } + @VisibleForTesting protected StreamProcessor createStreamProcessor(String apiKey, LDConfig config, FeatureRequestor requestor) { return new StreamProcessor(apiKey, config, requestor); } + @VisibleForTesting + protected PollingProcessor createPollingProcessor(LDConfig config) { + return new PollingProcessor(config, requestor); + } + /** * Tracks that a user performed an event. @@ -82,6 +116,9 @@ protected StreamProcessor createStreamProcessor(String apiKey, LDConfig config, * @param data a JSON object containing additional data associated with the event */ public void track(String eventName, LDUser user, JsonElement data) { + if (isOffline()) { + return; + } boolean processed = eventProcessor.sendEvent(new CustomEvent(eventName, user, data)); if (!processed) { logger.warn("Exceeded event queue capacity. Increase capacity to avoid dropping events."); @@ -95,7 +132,7 @@ public void track(String eventName, LDUser user, JsonElement data) { * @param user the user that performed the event */ public void track(String eventName, LDUser user) { - if (this.offline) { + if (isOffline()) { return; } track(eventName, user, null); @@ -106,7 +143,7 @@ public void track(String eventName, LDUser user) { * @param user the user to register */ public void identify(LDUser user) { - if (this.offline) { + if (isOffline()) { return; } boolean processed = eventProcessor.sendEvent(new IdentifyEvent(user)); @@ -116,6 +153,9 @@ public void identify(LDUser user) { } private void sendFlagRequestEvent(String featureKey, LDUser user, boolean value, boolean defaultValue) { + if (isOffline()) { + return; + } boolean processed = eventProcessor.sendEvent(new FeatureRequestEvent<>(featureKey, user, value, defaultValue)); if (!processed) { logger.warn("Exceeded event queue capacity. Increase capacity to avoid dropping events."); @@ -145,41 +185,34 @@ public boolean getFlag(String featureKey, LDUser user, boolean defaultValue) { * @return whether or not the flag should be enabled, or {@code defaultValue} if the flag is disabled in the LaunchDarkly control panel */ public boolean toggle(String featureKey, LDUser user, boolean defaultValue) { - if (this.offline) { + if (isOffline()) { return defaultValue; } + boolean value = evaluate(featureKey, user, defaultValue); + sendFlagRequestEvent(featureKey, user, value, defaultValue); + return value; + } + + private boolean evaluate(String featureKey, LDUser user, boolean defaultValue) { + if (!initialized()) { + return defaultValue; + } + try { - FeatureRep result; - if (this.config.stream && this.streamProcessor != null && this.streamProcessor.initialized()) { - logger.debug("Using feature flag stored from streaming API"); - result = (FeatureRep) this.streamProcessor.getFeature(featureKey); - if (config.debugStreaming) { - FeatureRep pollingResult = requestor.makeRequest(featureKey, true); - if (!result.equals(pollingResult)) { - logger.warn("Mismatch between streaming and polling feature! Streaming: {} Polling: {}", result, pollingResult); - } - } - } else { - // If streaming is enabled, always get the latest version of the feature while polling - result = requestor.makeRequest(featureKey, this.config.stream); - } + FeatureRep result = (FeatureRep) config.featureStore.get(featureKey); if (result == null) { - logger.warn("Unknown feature flag " + featureKey + "; returning default value"); - sendFlagRequestEvent(featureKey, user, defaultValue, defaultValue); + logger.warn("Unknown feature flag " + featureKey + "; returning default value: "); return defaultValue; } Boolean val = result.evaluate(user); if (val == null) { - sendFlagRequestEvent(featureKey, user, defaultValue, defaultValue); return defaultValue; } else { - sendFlagRequestEvent(featureKey, user, val, defaultValue); return val; } } catch (Exception e) { logger.error("Encountered exception in LaunchDarkly client", e); - sendFlagRequestEvent(featureKey, user, defaultValue, defaultValue); return defaultValue; } } @@ -195,8 +228,8 @@ public boolean toggle(String featureKey, LDUser user, boolean defaultValue) { @Override public void close() throws IOException { this.eventProcessor.close(); - if (this.streamProcessor != null) { - this.streamProcessor.close(); + if (this.updateProcessor != null) { + this.updateProcessor.close(); } } @@ -208,29 +241,10 @@ public void flush() { } /** - * Puts the LaunchDarkly client in offline mode. - * In offline mode, all calls to {@link #toggle(String, LDUser, boolean)} will return the default value, and - * {@link #track(String, LDUser, com.google.gson.JsonElement)} will be a no-op. - * - */ - public void setOffline() { - this.offline = true; - } - - /** - * Puts the LaunchDarkly client in online mode. - * - */ - public void setOnline() { - this.offline = false; - } - - /** - * * @return whether the client is in offline mode */ public boolean isOffline() { - return this.offline; + return config.offline; } private static String getClientVersion() { diff --git a/src/main/java/com/launchdarkly/client/LDConfig.java b/src/main/java/com/launchdarkly/client/LDConfig.java index d1e25928b..0dd0d6461 100644 --- a/src/main/java/com/launchdarkly/client/LDConfig.java +++ b/src/main/java/com/launchdarkly/client/LDConfig.java @@ -21,6 +21,9 @@ public final class LDConfig { private static final int DEFAULT_CONNECT_TIMEOUT = 2000; private static final int DEFAULT_SOCKET_TIMEOUT = 10000; private static final int DEFAULT_FLUSH_INTERVAL = 5; + private static final long DEFAULT_POLLING_INTERVAL_MILLIS = 1000L; + private static final long DEFAULT_START_WAIT_MILLIS = 5000L; + private static final int DEFAULT_SAMPLING_INTERVAL = 0; private static final Logger logger = LoggerFactory.getLogger(LDConfig.class); protected static final LDConfig DEFAULT = new Builder().build(); @@ -37,6 +40,10 @@ public final class LDConfig { final boolean debugStreaming; final FeatureStore featureStore; final boolean useLdd; + final boolean offline; + final long pollingIntervalMillis; + final long startWaitMillis; + final int samplingInterval; protected LDConfig(Builder builder) { this.baseURI = builder.baseURI; @@ -51,21 +58,28 @@ protected LDConfig(Builder builder) { this.debugStreaming = builder.debugStreaming; this.featureStore = builder.featureStore; this.useLdd = builder.useLdd; + this.offline = builder.offline; + if (builder.pollingIntervalMillis < DEFAULT_POLLING_INTERVAL_MILLIS) { + this.pollingIntervalMillis = DEFAULT_POLLING_INTERVAL_MILLIS; + } else { + this.pollingIntervalMillis = builder.pollingIntervalMillis; + } + this.startWaitMillis = builder.startWaitMillis; + this.samplingInterval = builder.samplingInterval; } /** * A builder that helps construct {@link com.launchdarkly.client.LDConfig} objects. Builder * calls can be chained, enabling the following pattern: - * + * *
    * LDConfig config = new LDConfig.Builder()
    *      .connectTimeout(3)
    *      .socketTimeout(3)
    *      .build()
    * 
- * */ - public static class Builder{ + public static class Builder { private URI baseURI = DEFAULT_BASE_URI; private URI eventsURI = DEFAULT_EVENTS_URI; private URI streamURI = DEFAULT_STREAM_URI; @@ -79,7 +93,11 @@ public static class Builder{ private boolean stream = true; private boolean debugStreaming = false; private boolean useLdd = false; + private boolean offline = false; + private long pollingIntervalMillis = DEFAULT_POLLING_INTERVAL_MILLIS; private FeatureStore featureStore = new InMemoryFeatureStore(); + private long startWaitMillis = DEFAULT_START_WAIT_MILLIS; + private int samplingInterval = DEFAULT_SAMPLING_INTERVAL; /** * Creates a builder with all configuration parameters set to the default @@ -231,6 +249,7 @@ public Builder capacity(int capacity) { * If none of {@link #proxyHost(String)}, {@link #proxyPort(int)} or {@link #proxyScheme(String)} are specified, * a proxy will not be used, and {@link LDClient} will connect to LaunchDarkly directly. *

+ * * @param host * @return the builder */ @@ -242,11 +261,12 @@ public Builder proxyHost(String host) { /** * Set the port to use for an HTTP proxy for making connections to LaunchDarkly. If not set (but {@link #proxyHost(String)} * or {@link #proxyScheme(String)} are specified, the default port for the scheme will be used. - * + *

*

* If none of {@link #proxyHost(String)}, {@link #proxyPort(int)} or {@link #proxyScheme(String)} are specified, * a proxy will not be used, and {@link LDClient} will connect to LaunchDarkly directly. *

+ * * @param port * @return the builder */ @@ -258,11 +278,12 @@ public Builder proxyPort(int port) { /** * Set the scheme to use for an HTTP proxy for making connections to LaunchDarkly. If not set (but {@link #proxyHost(String)} * or {@link #proxyPort(int)} are specified, the default https scheme will be used. - * + *

*

* If none of {@link #proxyHost(String)}, {@link #proxyPort(int)} or {@link #proxyScheme(String)} are specified, * a proxy will not be used, and {@link LDClient} will connect to LaunchDarkly directly. *

+ * * @param scheme * @return the builder */ @@ -274,6 +295,7 @@ public Builder proxyScheme(String scheme) { /** * Set whether this client should subscribe to the streaming API, or whether the LaunchDarkly daemon is in use * instead + * * @param useLdd * @return the builder */ @@ -282,6 +304,58 @@ public Builder useLdd(boolean useLdd) { return this; } + /** + * Set whether this client is offline. + * + * @param offline when set to true no calls to LaunchDarkly will be made. + * @return the builder + */ + public Builder offline(boolean offline) { + this.offline = offline; + return this; + } + + /** + * Set the polling interval (when streaming is disabled). Values less than {@value #DEFAULT_POLLING_INTERVAL_MILLIS} + * will be set to the default of {@value #DEFAULT_POLLING_INTERVAL_MILLIS} + * + * @param pollingIntervalMillis rule update polling interval in milliseconds. + * @return the builder + */ + public Builder pollingIntervalMillis(long pollingIntervalMillis) { + this.pollingIntervalMillis = pollingIntervalMillis; + return this; + } + + /** + * Set how long the constructor will block awaiting a successful connection to LaunchDarkly. + * Setting this to 0 will not block and cause the constructor to return immediately. + * Default value: {@value #DEFAULT_START_WAIT_MILLIS} + * + * + * @param startWaitMillis milliseconds to wait + * @return the builder + */ + public Builder startWaitMillis(long startWaitMillis) { + this.startWaitMillis = startWaitMillis; + return this; + } + + /** + * Enable event sampling. When set to the default of zero, sampling is disabled and all events + * are sent back to LaunchDarkly. When set to greater than zero, there is a 1 in + * samplingInterval chance events will be will be sent. + * + *

Example: if you want 5% sampling rate, set samplingInterval to 20. + * + * @param samplingInterval the sampling interval. + * @return the builder + */ + public Builder samplingInterval(int samplingInterval) { + this.samplingInterval = samplingInterval; + return this; + } + HttpHost proxyHost() { if (this.proxyHost == null && this.proxyPort == -1 && this.proxyScheme == null) { return null; @@ -294,6 +368,7 @@ HttpHost proxyHost() { /** * Build the configured {@link com.launchdarkly.client.LDConfig} object + * * @return the {@link com.launchdarkly.client.LDConfig} configured by this builder */ public LDConfig build() { @@ -311,9 +386,9 @@ private URIBuilder getBuilder() { private URIBuilder getEventsBuilder() { return new URIBuilder() - .setScheme(eventsURI.getScheme()) - .setHost(eventsURI.getHost()) - .setPort(eventsURI.getPort()); + .setScheme(eventsURI.getScheme()) + .setHost(eventsURI.getHost()) + .setPort(eventsURI.getPort()); } HttpGet getRequest(String apiKey, String path) { @@ -325,8 +400,7 @@ HttpGet getRequest(String apiKey, String path) { request.addHeader("User-Agent", "JavaClient/" + LDClient.CLIENT_VERSION); return request; - } - catch (Exception e) { + } catch (Exception e) { logger.error("Unhandled exception in LaunchDarkly client", e); return null; } @@ -341,8 +415,7 @@ HttpPost postRequest(String apiKey, String path) { request.addHeader("User-Agent", "JavaClient/" + LDClient.CLIENT_VERSION); return request; - } - catch (Exception e) { + } catch (Exception e) { logger.error("Unhandled exception in LaunchDarkly client", e); return null; } @@ -357,8 +430,7 @@ HttpPost postEventsRequest(String apiKey, String path) { request.addHeader("User-Agent", "JavaClient/" + LDClient.CLIENT_VERSION); return request; - } - catch (Exception e) { + } catch (Exception e) { logger.error("Unhandled exception in LaunchDarkly client", e); return null; } diff --git a/src/main/java/com/launchdarkly/client/PollingProcessor.java b/src/main/java/com/launchdarkly/client/PollingProcessor.java new file mode 100644 index 000000000..3b1541715 --- /dev/null +++ b/src/main/java/com/launchdarkly/client/PollingProcessor.java @@ -0,0 +1,63 @@ +package com.launchdarkly.client; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; + +public class PollingProcessor implements UpdateProcessor { + private static final Logger logger = LoggerFactory.getLogger(PollingProcessor.class); + + private final FeatureRequestor requestor; + private final LDConfig config; + private final FeatureStore store; + private AtomicBoolean initialized = new AtomicBoolean(false); + private ScheduledExecutorService scheduler = null; + + PollingProcessor(LDConfig config, FeatureRequestor requestor) { + this.requestor = requestor; + this.config = config; + this.store = config.featureStore; + } + + @Override + public boolean initialized() { + return initialized.get(); + } + + @Override + public void close() throws IOException { + scheduler.shutdown(); + } + + @Override + public Future start() { + logger.info("Starting LaunchDarkly polling client with interval: " + + config.pollingIntervalMillis + " milliseconds"); + final VeryBasicFuture initFuture = new VeryBasicFuture(); + ThreadFactory threadFactory = new ThreadFactoryBuilder() + .setNameFormat("LaunchDarkly-PollingProcessor-%d") + .build(); + scheduler = Executors.newScheduledThreadPool(1, threadFactory); + + scheduler.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + store.init(requestor.makeAllRequest(true)); + if (!initialized.getAndSet(true)) { + logger.info("Initialized LaunchDarkly client."); + initFuture.completed(null); + } + } catch (IOException e) { + logger.error("Encountered exception in LaunchDarkly client when retrieving update", e); + } + } + }, 0L, config.pollingIntervalMillis, TimeUnit.MILLISECONDS); + + return initFuture; + } +} diff --git a/src/main/java/com/launchdarkly/client/StreamProcessor.java b/src/main/java/com/launchdarkly/client/StreamProcessor.java index 8f6a5035a..cb9d90aca 100644 --- a/src/main/java/com/launchdarkly/client/StreamProcessor.java +++ b/src/main/java/com/launchdarkly/client/StreamProcessor.java @@ -2,22 +2,21 @@ import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; -import org.glassfish.jersey.internal.util.collection.StringKeyIgnoreCaseMultivaluedMap; -import org.glassfish.jersey.media.sse.InboundEvent; -import org.glassfish.jersey.media.sse.SseFeature; +import com.launchdarkly.eventsource.EventHandler; +import com.launchdarkly.eventsource.EventSource; +import com.launchdarkly.eventsource.MessageEvent; +import okhttp3.Headers; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.ws.rs.client.Client; -import javax.ws.rs.client.ClientBuilder; -import javax.ws.rs.client.WebTarget; -import javax.ws.rs.core.MultivaluedMap; -import java.io.Closeable; import java.io.IOException; import java.lang.reflect.Type; +import java.net.URI; import java.util.Map; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; -class StreamProcessor implements Closeable { +class StreamProcessor implements UpdateProcessor { private static final String PUT = "put"; private static final String PATCH = "patch"; private static final String DELETE = "delete"; @@ -25,62 +24,72 @@ class StreamProcessor implements Closeable { private static final String INDIRECT_PATCH = "indirect/patch"; private static final Logger logger = LoggerFactory.getLogger(StreamProcessor.class); - private final Client client; private final FeatureStore store; private final LDConfig config; private final String apiKey; private final FeatureRequestor requestor; private EventSource es; + private AtomicBoolean initialized = new AtomicBoolean(false); StreamProcessor(String apiKey, LDConfig config, FeatureRequestor requestor) { - this.client = ClientBuilder.newBuilder().register(SseFeature.class).build(); this.store = config.featureStore; this.config = config; this.apiKey = apiKey; this.requestor = requestor; } - void subscribe() { - // If the LaunchDarkly daemon is to be used, then do not subscribe to the stream - if (config.useLdd) { - return; - } + @Override + public Future start() { + final VeryBasicFuture initFuture = new VeryBasicFuture(); + + Headers headers = new Headers.Builder() + .add("Authorization", "api_key " + this.apiKey) + .add("User-Agent", "JavaClient/" + LDClient.CLIENT_VERSION) + .add("Accept", "text/event-stream") + .build(); - MultivaluedMap headers = new StringKeyIgnoreCaseMultivaluedMap<>(); - headers.putSingle("Authorization", "api_key " + this.apiKey); - headers.putSingle("User-Agent", "JavaClient/" + LDClient.CLIENT_VERSION); - headers.putSingle("Accept", SseFeature.SERVER_SENT_EVENTS_TYPE); + EventHandler handler = new EventHandler() { - WebTarget target = client.target(config.streamURI.toASCIIString() + "/features"); + @Override + public void onOpen() throws Exception { + + } - es = new EventSource(target, true, headers) { @Override - public void onEvent(InboundEvent event) { + public void onMessage(String name, MessageEvent event) throws Exception { Gson gson = new Gson(); - if (event.getName().equals(PUT)) { + if (name.equals(PUT)) { Type type = new TypeToken>>(){}.getType(); - Map> features = gson.fromJson(event.readData(), type); + Map> features = gson.fromJson(event.getData(), type); store.init(features); + if (!initialized.getAndSet(true)) { + initFuture.completed(null); + logger.info("Initialized LaunchDarkly client."); + } } - else if (event.getName().equals(PATCH)) { - FeaturePatchData data = gson.fromJson(event.readData(), FeaturePatchData.class); + else if (name.equals(PATCH)) { + FeaturePatchData data = gson.fromJson(event.getData(), FeaturePatchData.class); store.upsert(data.key(), data.feature()); } - else if (event.getName().equals(DELETE)) { - FeatureDeleteData data = gson.fromJson(event.readData(), FeatureDeleteData.class); + else if (name.equals(DELETE)) { + FeatureDeleteData data = gson.fromJson(event.getData(), FeatureDeleteData.class); store.delete(data.key(), data.version()); } - else if (event.getName().equals(INDIRECT_PUT)) { + else if (name.equals(INDIRECT_PUT)) { try { Map> features = requestor.makeAllRequest(true); store.init(features); + if (!initialized.getAndSet(true)) { + initFuture.completed(null); + logger.info("Initialized LaunchDarkly client."); + } } catch (IOException e) { logger.error("Encountered exception in LaunchDarkly client", e); } } - else if (event.getName().equals(INDIRECT_PATCH)) { - String key = event.readData(); + else if (name.equals(INDIRECT_PATCH)) { + String key = event.getData(); try { FeatureRep feature = requestor.makeRequest(key, true); store.upsert(key, feature); @@ -89,11 +98,22 @@ else if (event.getName().equals(INDIRECT_PATCH)) { } } else { - logger.warn("Unexpected event found in stream: " + event.getName()); + logger.warn("Unexpected event found in stream: " + event.getData()); } } + + @Override + public void onError(Throwable throwable) { + logger.warn("Encountered EventSource error", throwable); + } }; + es = new EventSource.Builder(handler, URI.create(config.streamURI.toASCIIString() + "/features")) + .headers(headers) + .build(); + + es.start(); + return initFuture; } @Override @@ -106,8 +126,9 @@ public void close() throws IOException { } } - boolean initialized() { - return store.initialized(); + @Override + public boolean initialized() { + return initialized.get(); } FeatureRep getFeature(String key) { diff --git a/src/main/java/com/launchdarkly/client/UpdateProcessor.java b/src/main/java/com/launchdarkly/client/UpdateProcessor.java new file mode 100644 index 000000000..d5d2876f7 --- /dev/null +++ b/src/main/java/com/launchdarkly/client/UpdateProcessor.java @@ -0,0 +1,23 @@ +package com.launchdarkly.client; + +import java.io.Closeable; +import java.io.IOException; +import java.util.concurrent.Future; + +public interface UpdateProcessor extends Closeable { + + /** + * Starts the client. + * @return {@link Future}'s completion status indicates the client has been initialized. + */ + Future start(); + + /** + * Returns true once the client has been initialized and will never return false again. + * @return + */ + boolean initialized(); + + + void close() throws IOException; +} diff --git a/src/main/java/com/launchdarkly/client/VeryBasicFuture.java b/src/main/java/com/launchdarkly/client/VeryBasicFuture.java new file mode 100644 index 000000000..6bcab2811 --- /dev/null +++ b/src/main/java/com/launchdarkly/client/VeryBasicFuture.java @@ -0,0 +1,30 @@ +package com.launchdarkly.client; + +import org.apache.http.concurrent.BasicFuture; +import org.apache.http.concurrent.FutureCallback; + +import java.util.concurrent.Future; + +/** + * Very Basic {@link Future} implementation extending {@link BasicFuture} with no callback or return value. + */ +public class VeryBasicFuture extends BasicFuture { + + public VeryBasicFuture() { + super(new NoOpFutureCallback()); + } + + static class NoOpFutureCallback implements FutureCallback { + @Override + public void completed(Void result) { + } + + @Override + public void failed(Exception ex) { + } + + @Override + public void cancelled() { + } + } +} diff --git a/src/test/java/com/launchdarkly/client/LDClientTest.java b/src/test/java/com/launchdarkly/client/LDClientTest.java index bf8d0a587..78e020d4a 100644 --- a/src/test/java/com/launchdarkly/client/LDClientTest.java +++ b/src/test/java/com/launchdarkly/client/LDClientTest.java @@ -1,36 +1,162 @@ package com.launchdarkly.client; -import static org.easymock.EasyMock.*; -import static org.junit.Assert.assertEquals; - -import org.apache.http.impl.client.CloseableHttpClient; -import org.easymock.*; +import org.easymock.EasyMockSupport; +import org.junit.Before; import org.junit.Test; import java.io.IOException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.easymock.EasyMock.anyObject; +import static org.easymock.EasyMock.expect; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; public class LDClientTest extends EasyMockSupport { + private FeatureRequestor requestor; + private StreamProcessor streamProcessor; + private PollingProcessor pollingProcessor; + private EventProcessor eventProcessor; + private Future initFuture; + private LDClient client; + + @Before + public void before() { + requestor = createStrictMock(FeatureRequestor.class); + streamProcessor = createStrictMock(StreamProcessor.class); + pollingProcessor = createStrictMock(PollingProcessor.class); + eventProcessor = createStrictMock(EventProcessor.class); + initFuture = createStrictMock(Future.class); + } + + @Test + public void testOffline() throws IOException { + LDConfig config = new LDConfig.Builder() + .offline(true) + .build(); + + client = createMockClient(config); + replayAll(); + + assertDefaultValueIsReturned(); + assertTrue(client.initialized()); + verifyAll(); + } + + @Test + public void testUseLdd() throws IOException { + LDConfig config = new LDConfig.Builder() + .useLdd(true) + .build(); + + client = createMockClient(config); + // Asserting 2 things here: no pollingProcessor or streamingProcessor activity + // and sending of event: + expect(eventProcessor.sendEvent(anyObject(Event.class))).andReturn(true); + replayAll(); + + assertDefaultValueIsReturned(); + assertTrue(client.initialized()); + verifyAll(); + } + + @Test + public void testStreamingNoWait() throws IOException { + LDConfig config = new LDConfig.Builder() + .startWaitMillis(0L) + .stream(true) + .build(); - private CloseableHttpClient httpClient = createMock(CloseableHttpClient.class); + expect(streamProcessor.start()).andReturn(initFuture); + expect(streamProcessor.initialized()).andReturn(false); + expect(eventProcessor.sendEvent(anyObject(Event.class))).andReturn(true); + replayAll(); - private FeatureRequestor requestor = createMock(FeatureRequestor.class); + client = createMockClient(config); + assertDefaultValueIsReturned(); - LDClient client = new LDClient("API_KEY") { + verifyAll(); + } + + @Test + public void testStreamingWait() throws Exception { + LDConfig config = new LDConfig.Builder() + .startWaitMillis(10L) + .stream(true) + .build(); + + expect(streamProcessor.start()).andReturn(initFuture); + expect(initFuture.get(10L, TimeUnit.MILLISECONDS)).andThrow(new TimeoutException()); + replayAll(); + + client = createMockClient(config); + verifyAll(); + } + + @Test + public void testPollingNoWait() throws IOException { + LDConfig config = new LDConfig.Builder() + .startWaitMillis(0L) + .stream(false) + .build(); + + expect(pollingProcessor.start()).andReturn(initFuture); + expect(pollingProcessor.initialized()).andReturn(false); + expect(eventProcessor.sendEvent(anyObject(Event.class))).andReturn(true); + replayAll(); - @Override - protected FeatureRequestor createFeatureRequestor(String apiKey, LDConfig config) { - return requestor; - } - }; + client = createMockClient(config); + assertDefaultValueIsReturned(); + + verifyAll(); + } @Test - public void testExceptionThrownByHttpClientReturnsDefaultValue() throws IOException { + public void testPollingWait() throws Exception { + LDConfig config = new LDConfig.Builder() + .startWaitMillis(10L) + .stream(false) + .build(); - expect(requestor.makeRequest(anyString(), anyBoolean())).andThrow(new IOException()); - replay(requestor); + expect(pollingProcessor.start()).andReturn(initFuture); + expect(initFuture.get(10L, TimeUnit.MILLISECONDS)).andThrow(new TimeoutException()); + expect(eventProcessor.sendEvent(anyObject(Event.class))).andReturn(true); + expect(pollingProcessor.initialized()).andReturn(false); + replayAll(); + client = createMockClient(config); + assertDefaultValueIsReturned(); + verifyAll(); + } + + private void assertDefaultValueIsReturned() { boolean result = client.toggle("test", new LDUser("test.key"), true); assertEquals(true, result); - verify(requestor); + } + + private LDClient createMockClient(LDConfig config) { + return new LDClient("API_KEY", config) { + @Override + protected FeatureRequestor createFeatureRequestor(String apiKey, LDConfig config) { + return requestor; + } + + @Override + protected StreamProcessor createStreamProcessor(String apiKey, LDConfig config, FeatureRequestor requestor) { + return streamProcessor; + } + + @Override + protected PollingProcessor createPollingProcessor(LDConfig config) { + return pollingProcessor; + } + + @Override + protected EventProcessor createEventProcessor(String apiKey, LDConfig config) { + return eventProcessor; + } + }; } } diff --git a/src/test/java/com/launchdarkly/client/LDConfigTest.java b/src/test/java/com/launchdarkly/client/LDConfigTest.java index de22de994..d5078750c 100644 --- a/src/test/java/com/launchdarkly/client/LDConfigTest.java +++ b/src/test/java/com/launchdarkly/client/LDConfigTest.java @@ -1,13 +1,13 @@ package com.launchdarkly.client; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; - import org.apache.http.client.methods.HttpPost; import org.junit.Test; import java.net.URI; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + public class LDConfigTest { @Test public void testConnectTimeoutSpecifiedInSeconds() { @@ -100,4 +100,16 @@ public void testCustomEventsUriIsConstructedProperly(){ HttpPost post = config.postEventsRequest("dummy-api-key", "/bulk"); assertEquals("http://localhost:3000/api/events/bulk", post.getURI().toString()); } + + @Test + public void testMinimumPollingIntervalIsEnforcedProperly(){ + LDConfig config = new LDConfig.Builder().pollingIntervalMillis(10L).build(); + assertEquals(1000L, config.pollingIntervalMillis); + } + + @Test + public void testPollingIntervalIsEnforcedProperly(){ + LDConfig config = new LDConfig.Builder().pollingIntervalMillis(10001L).build(); + assertEquals(10001L, config.pollingIntervalMillis); + } } \ No newline at end of file diff --git a/src/test/java/com/launchdarkly/client/PollingProcessorTest.java b/src/test/java/com/launchdarkly/client/PollingProcessorTest.java new file mode 100644 index 000000000..1b5cc05e6 --- /dev/null +++ b/src/test/java/com/launchdarkly/client/PollingProcessorTest.java @@ -0,0 +1,61 @@ +package com.launchdarkly.client; + +import org.easymock.EasyMockSupport; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.sql.Time; +import java.util.HashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.verify; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class PollingProcessorTest extends EasyMockSupport { + @Test + public void testConnectionOk() throws Exception { + FeatureRequestor requestor = createStrictMock(FeatureRequestor.class); + PollingProcessor pollingProcessor = new PollingProcessor(LDConfig.DEFAULT, requestor); + + expect(requestor.makeAllRequest(true)) + .andReturn(new HashMap>()) + .once(); + replayAll(); + + Future initFuture = pollingProcessor.start(); + initFuture.get(100, TimeUnit.MILLISECONDS); + assertTrue(pollingProcessor.initialized()); + pollingProcessor.close(); + verifyAll(); + } + + @Test + public void testConnectionProblem() throws Exception { + FeatureRequestor requestor = createStrictMock(FeatureRequestor.class); + PollingProcessor pollingProcessor = new PollingProcessor(LDConfig.DEFAULT, requestor); + + expect(requestor.makeAllRequest(true)) + .andThrow(new IOException("This exception is part of a test and yes you should be seeing it.")) + .once(); + replayAll(); + + Future initFuture = pollingProcessor.start(); + try { + initFuture.get(100L, TimeUnit.MILLISECONDS); + fail("Expected Timeout, instead initFuture.get() returned."); + } catch (TimeoutException expected) { + } + assertFalse(initFuture.isDone()); + assertFalse(pollingProcessor.initialized()); + pollingProcessor.close(); + verifyAll(); + } +} diff --git a/src/test/resources/logback.xml b/src/test/resources/logback.xml new file mode 100644 index 000000000..757bb2429 --- /dev/null +++ b/src/test/resources/logback.xml @@ -0,0 +1,13 @@ + + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36}:%line - %msg%n + + + + + + +