From 770169bec0fba81471683e6f301d35d2058c7bb7 Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Wed, 21 Feb 2018 19:05:39 -0800 Subject: [PATCH 1/4] fix unmarshaling of stream put data --- .../com/launchdarkly/client/StreamProcessor.java | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/launchdarkly/client/StreamProcessor.java b/src/main/java/com/launchdarkly/client/StreamProcessor.java index a3008fb88..105d39145 100644 --- a/src/main/java/com/launchdarkly/client/StreamProcessor.java +++ b/src/main/java/com/launchdarkly/client/StreamProcessor.java @@ -83,8 +83,8 @@ public void onMessage(String name, MessageEvent event) throws Exception { Gson gson = new Gson(); switch (name) { case PUT: { - FeatureRequestor.AllData allData = gson.fromJson(event.getData(), FeatureRequestor.AllData.class); - store.init(FeatureRequestor.toVersionedDataMap(allData)); + PutData putData = gson.fromJson(event.getData(), PutData.class); + store.init(FeatureRequestor.toVersionedDataMap(putData.data)); if (!initialized.getAndSet(true)) { initFuture.set(null); logger.info("Initialized LaunchDarkly client."); @@ -200,6 +200,14 @@ public boolean initialized() { return initialized.get(); } + private static final class PutData { + FeatureRequestor.AllData data; + + public PutData() { + + } + } + private static final class PatchData { String path; JsonElement data; From 28ca2aff7f6af1a8a132dbeaad364d186963d455 Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Wed, 21 Feb 2018 20:18:38 -0800 Subject: [PATCH 2/4] add unit tests for StreamProcessor --- .../launchdarkly/client/StreamProcessor.java | 20 +- .../client/StreamProcessorTest.java | 338 ++++++++++++++++++ 2 files changed, 353 insertions(+), 5 deletions(-) create mode 100644 src/test/java/com/launchdarkly/client/StreamProcessorTest.java diff --git a/src/main/java/com/launchdarkly/client/StreamProcessor.java b/src/main/java/com/launchdarkly/client/StreamProcessor.java index 105d39145..15fce2a04 100644 --- a/src/main/java/com/launchdarkly/client/StreamProcessor.java +++ b/src/main/java/com/launchdarkly/client/StreamProcessor.java @@ -11,6 +11,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.SettableFuture; import com.google.gson.Gson; import com.google.gson.JsonElement; @@ -161,8 +162,19 @@ public void onError(Throwable throwable) { } }; + es = createEventSource(handler, + URI.create(config.streamURI.toASCIIString() + "/all"), + connectionErrorHandler, + headers); + es.start(); + return initFuture; + } + + @VisibleForTesting + protected EventSource createEventSource(EventHandler handler, URI streamUri, ConnectionErrorHandler errorHandler, + Headers headers) { EventSource.Builder builder = new EventSource.Builder(handler, URI.create(config.streamURI.toASCIIString() + "/all")) - .connectionErrorHandler(connectionErrorHandler) + .connectionErrorHandler(errorHandler) .headers(headers) .reconnectTimeMs(config.reconnectTimeMs) .connectTimeoutMs(config.connectTimeoutMillis) @@ -179,11 +191,9 @@ public void onError(Throwable throwable) { } } - es = builder.build(); - es.start(); - return initFuture; + return builder.build(); } - + @Override public void close() throws IOException { logger.info("Closing LaunchDarkly StreamProcessor"); diff --git a/src/test/java/com/launchdarkly/client/StreamProcessorTest.java b/src/test/java/com/launchdarkly/client/StreamProcessorTest.java new file mode 100644 index 000000000..94382d1c1 --- /dev/null +++ b/src/test/java/com/launchdarkly/client/StreamProcessorTest.java @@ -0,0 +1,338 @@ +package com.launchdarkly.client; + +import com.launchdarkly.eventsource.ConnectionErrorHandler; +import com.launchdarkly.eventsource.EventHandler; +import com.launchdarkly.eventsource.EventSource; +import com.launchdarkly.eventsource.MessageEvent; +import com.launchdarkly.eventsource.UnsuccessfulResponseException; + +import org.easymock.EasyMockSupport; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.net.URI; +import java.util.Collections; +import java.util.concurrent.Future; + +import static com.launchdarkly.client.VersionedDataKind.FEATURES; +import static com.launchdarkly.client.VersionedDataKind.SEGMENTS; +import static org.easymock.EasyMock.expect; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import okhttp3.Headers; + +public class StreamProcessorTest extends EasyMockSupport { + + private static final String SDK_KEY = "sdk_key"; + private static final URI STREAM_URI = URI.create("http://stream.test.com"); + private static final String FEATURE1_KEY = "feature1"; + private static final int FEATURE1_VERSION = 11; + private static final FeatureFlag FEATURE = new FeatureFlagBuilder(FEATURE1_KEY).version(FEATURE1_VERSION).build(); + private static final String SEGMENT1_KEY = "segment1"; + private static final int SEGMENT1_VERSION = 22; + private static final Segment SEGMENT = new Segment.Builder(SEGMENT1_KEY).version(SEGMENT1_VERSION).build(); + + private InMemoryFeatureStore featureStore; + private LDConfig.Builder configBuilder; + private FeatureRequestor mockRequestor; + private EventSource mockEventSource; + private EventHandler eventHandler; + private URI actualStreamUri; + private ConnectionErrorHandler errorHandler; + private Headers headers; + + @Before + public void setup() { + featureStore = new InMemoryFeatureStore(); + configBuilder = new LDConfig.Builder().featureStore(featureStore); + mockRequestor = createStrictMock(FeatureRequestor.class); + mockEventSource = createStrictMock(EventSource.class); + } + + @Test + public void streamUriHasCorrectEndpoint() { + LDConfig config = configBuilder.streamURI(STREAM_URI).build(); + createStreamProcessor(SDK_KEY, config).start(); + assertEquals(URI.create(STREAM_URI.toString() + "/all"), actualStreamUri); + } + + @Test + public void headersHaveAuthorization() { + createStreamProcessor(SDK_KEY, configBuilder.build()).start(); + assertEquals(SDK_KEY, headers.get("Authorization")); + } + + @Test + public void headersHaveUserAgent() { + createStreamProcessor(SDK_KEY, configBuilder.build()).start(); + assertEquals("JavaClient/" + LDClient.CLIENT_VERSION, headers.get("User-Agent")); + } + + @Test + public void headersHaveAccept() { + createStreamProcessor(SDK_KEY, configBuilder.build()).start(); + assertEquals("text/event-stream", headers.get("Accept")); + } + + @Test + public void putCausesFeatureToBeStored() throws Exception { + createStreamProcessor(SDK_KEY, configBuilder.build()).start(); + MessageEvent event = new MessageEvent("{\"data\":{\"flags\":{\"" + + FEATURE1_KEY + "\":" + featureJson(FEATURE1_KEY, FEATURE1_VERSION) + "}," + + "\"segments\":{}}}"); + eventHandler.onMessage("put", event); + + assertFeatureInStore(FEATURE); + } + + @Test + public void putCausesSegmentToBeStored() throws Exception { + createStreamProcessor(SDK_KEY, configBuilder.build()).start(); + MessageEvent event = new MessageEvent("{\"data\":{\"flags\":{},\"segments\":{\"" + + SEGMENT1_KEY + "\":" + segmentJson(SEGMENT1_KEY, SEGMENT1_VERSION) + "}}}"); + eventHandler.onMessage("put", event); + + assertSegmentInStore(SEGMENT); + } + + @Test + public void storeNotInitializedByDefault() throws Exception { + createStreamProcessor(SDK_KEY, configBuilder.build()).start(); + assertFalse(featureStore.initialized()); + } + + @Test + public void putCausesStoreToBeInitialized() throws Exception { + createStreamProcessor(SDK_KEY, configBuilder.build()).start(); + eventHandler.onMessage("put", emptyPutEvent()); + assertTrue(featureStore.initialized()); + } + + @Test + public void processorNotInitializedByDefault() throws Exception { + StreamProcessor sp = createStreamProcessor(SDK_KEY, configBuilder.build()); + sp.start(); + assertFalse(sp.initialized()); + } + + @Test + public void putCausesProcessorToBeInitialized() throws Exception { + StreamProcessor sp = createStreamProcessor(SDK_KEY, configBuilder.build()); + sp.start(); + eventHandler.onMessage("put", emptyPutEvent()); + assertTrue(sp.initialized()); + } + + @Test + public void futureIsNotSetByDefault() throws Exception { + StreamProcessor sp = createStreamProcessor(SDK_KEY, configBuilder.build()); + Future future = sp.start(); + assertFalse(future.isDone()); + } + + @Test + public void putCausesFutureToBeSet() throws Exception { + StreamProcessor sp = createStreamProcessor(SDK_KEY, configBuilder.build()); + Future future = sp.start(); + eventHandler.onMessage("put", emptyPutEvent()); + assertTrue(future.isDone()); + } + + @Test + public void patchUpdatesFeature() throws Exception { + createStreamProcessor(SDK_KEY, configBuilder.build()).start(); + eventHandler.onMessage("put", emptyPutEvent()); + + String path = "/flags/" + FEATURE1_KEY; + MessageEvent event = new MessageEvent("{\"path\":\"" + path + "\",\"data\":" + + featureJson(FEATURE1_KEY, FEATURE1_VERSION) + "}"); + eventHandler.onMessage("patch", event); + + assertFeatureInStore(FEATURE); + } + + @Test + public void patchUpdatesSegment() throws Exception { + createStreamProcessor(SDK_KEY, configBuilder.build()).start(); + eventHandler.onMessage("put", emptyPutEvent()); + + String path = "/segments/" + SEGMENT1_KEY; + MessageEvent event = new MessageEvent("{\"path\":\"" + path + "\",\"data\":" + + segmentJson(SEGMENT1_KEY, SEGMENT1_VERSION) + "}"); + eventHandler.onMessage("patch", event); + + assertSegmentInStore(SEGMENT); + } + + @Test + public void deleteDeletesFeature() throws Exception { + createStreamProcessor(SDK_KEY, configBuilder.build()).start(); + eventHandler.onMessage("put", emptyPutEvent()); + featureStore.upsert(FEATURES, FEATURE); + + String path = "/flags/" + FEATURE1_KEY; + MessageEvent event = new MessageEvent("{\"path\":\"" + path + "\",\"version\":" + + (FEATURE1_VERSION + 1) + "}"); + eventHandler.onMessage("delete", event); + + assertNull(featureStore.get(FEATURES, FEATURE1_KEY)); + } + + @Test + public void deleteDeletesSegment() throws Exception { + createStreamProcessor(SDK_KEY, configBuilder.build()).start(); + eventHandler.onMessage("put", emptyPutEvent()); + featureStore.upsert(SEGMENTS, SEGMENT); + + String path = "/segments/" + SEGMENT1_KEY; + MessageEvent event = new MessageEvent("{\"path\":\"" + path + "\",\"version\":" + + (SEGMENT1_VERSION + 1) + "}"); + eventHandler.onMessage("delete", event); + + assertNull(featureStore.get(SEGMENTS, SEGMENT1_KEY)); + } + + @Test + public void indirectPutRequestsAndStoresFeature() throws Exception { + createStreamProcessor(SDK_KEY, configBuilder.build()).start(); + setupRequestorToReturnAllDataWithFlag(FEATURE); + replayAll(); + + eventHandler.onMessage("indirect/put", new MessageEvent("")); + + assertFeatureInStore(FEATURE); + } + + @Test + public void indirectPutInitializesStore() throws Exception { + createStreamProcessor(SDK_KEY, configBuilder.build()).start(); + setupRequestorToReturnAllDataWithFlag(FEATURE); + replayAll(); + + eventHandler.onMessage("indirect/put", new MessageEvent("")); + + assertTrue(featureStore.initialized()); + } + + @Test + public void indirectPutInitializesProcessor() throws Exception { + StreamProcessor sp = createStreamProcessor(SDK_KEY, configBuilder.build()); + sp.start(); + setupRequestorToReturnAllDataWithFlag(FEATURE); + replayAll(); + + eventHandler.onMessage("indirect/put", new MessageEvent("")); + + assertTrue(featureStore.initialized()); + } + + @Test + public void indirectPutSetsFuture() throws Exception { + StreamProcessor sp = createStreamProcessor(SDK_KEY, configBuilder.build()); + Future future = sp.start(); + setupRequestorToReturnAllDataWithFlag(FEATURE); + replayAll(); + + eventHandler.onMessage("indirect/put", new MessageEvent("")); + + assertTrue(future.isDone()); + } + + @Test + public void indirectPatchRequestsAndUpdatesFeature() throws Exception { + createStreamProcessor(SDK_KEY, configBuilder.build()).start(); + expect(mockRequestor.getFlag(FEATURE1_KEY)).andReturn(FEATURE); + replayAll(); + + eventHandler.onMessage("put", emptyPutEvent()); + eventHandler.onMessage("indirect/patch", new MessageEvent("/flags/" + FEATURE1_KEY)); + + assertFeatureInStore(FEATURE); + } + + @Test + public void indirectPatchRequestsAndUpdatesSegment() throws Exception { + createStreamProcessor(SDK_KEY, configBuilder.build()).start(); + expect(mockRequestor.getSegment(SEGMENT1_KEY)).andReturn(SEGMENT); + replayAll(); + + eventHandler.onMessage("put", emptyPutEvent()); + eventHandler.onMessage("indirect/patch", new MessageEvent("/segments/" + SEGMENT1_KEY)); + + assertSegmentInStore(SEGMENT); + } + + @Test + public void unknownEventTypeDoesNotThrowException() throws Exception { + createStreamProcessor(SDK_KEY, configBuilder.build()).start(); + eventHandler.onMessage("what", new MessageEvent("")); + } + + @Test + public void streamWillReconnectAfterGeneralIOException() throws Exception { + createStreamProcessor(SDK_KEY, configBuilder.build()).start(); + ConnectionErrorHandler.Action action = errorHandler.onConnectionError(new IOException()); + assertEquals(ConnectionErrorHandler.Action.PROCEED, action); + } + + @Test + public void streamWillReconnectAfterHttp500Error() throws Exception { + createStreamProcessor(SDK_KEY, configBuilder.build()).start(); + UnsuccessfulResponseException e = new UnsuccessfulResponseException(500); + ConnectionErrorHandler.Action action = errorHandler.onConnectionError(e); + assertEquals(ConnectionErrorHandler.Action.PROCEED, action); + } + + @Test + public void streamWillCloseAfterHttp401Error() throws Exception { + createStreamProcessor(SDK_KEY, configBuilder.build()).start(); + UnsuccessfulResponseException e = new UnsuccessfulResponseException(401); + ConnectionErrorHandler.Action action = errorHandler.onConnectionError(e); + assertEquals(ConnectionErrorHandler.Action.SHUTDOWN, action); + } + + private StreamProcessor createStreamProcessor(String sdkKey, LDConfig config) { + return new StreamProcessor(sdkKey, config, mockRequestor) { + @Override + protected EventSource createEventSource(EventHandler handler, URI streamUri, ConnectionErrorHandler errorHandler, + Headers headers) { + + StreamProcessorTest.this.eventHandler = handler; + StreamProcessorTest.this.actualStreamUri = streamUri; + StreamProcessorTest.this.errorHandler = errorHandler; + StreamProcessorTest.this.headers = headers; + return mockEventSource; + } + }; + } + + private String featureJson(String key, int version) { + return "{\"key\":\"" + key + "\",\"version\":" + version + ",\"on\":true}"; + } + + private String segmentJson(String key, int version) { + return "{\"key\":\"" + key + "\",\"version\":" + version + ",\"includes\":[],\"excludes\":[],\"rules\":[]}"; + } + + private MessageEvent emptyPutEvent() { + return new MessageEvent("{\"data\":{\"flags\":{},\"segments\":{}}}"); + } + + private void setupRequestorToReturnAllDataWithFlag(FeatureFlag feature) throws Exception { + FeatureRequestor.AllData data = new FeatureRequestor.AllData( + Collections.singletonMap(feature.getKey(), feature), Collections.emptyMap()); + expect(mockRequestor.getAllData()).andReturn(data); + } + + private void assertFeatureInStore(FeatureFlag feature) { + assertEquals(feature.getVersion(), featureStore.get(FEATURES, feature.getKey()).getVersion()); + } + + private void assertSegmentInStore(Segment segment) { + assertEquals(segment.getVersion(), featureStore.get(SEGMENTS, segment.getKey()).getVersion()); + } +} From 75de70ee537eebab04f62d33d944d678a7fc2b12 Mon Sep 17 00:00:00 2001 From: Eli Bishop <35503443+eli-darkly@users.noreply.github.com> Date: Wed, 21 Feb 2018 22:59:04 -0800 Subject: [PATCH 3/4] deprecate 3.0.0 --- CHANGELOG.md | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index eb4b7fb65..dadd0abf0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,12 +4,8 @@ All notable changes to the LaunchDarkly Java SDK will be documented in this file. This project adheres to [Semantic Versioning](http://semver.org). ## [3.0.0] - 2018-02-21 -### Added -- Support for a new LaunchDarkly feature: reusable user segments. -### Changed -- The `FeatureStore` interface has been changed to support user segment data as well as feature flags. Existing code that uses `InMemoryFeatureStore` or `RedisFeatureStore` should work as before, but custom feature store implementations will need to be updated. -- Removed deprecated methods. +_This release was broken and should not be used._ ## [2.6.0] - 2018-02-12 From f61acd661147d983b011ee236c62cc7778bf3892 Mon Sep 17 00:00:00 2001 From: Eli Bishop <35503443+eli-darkly@users.noreply.github.com> Date: Wed, 21 Feb 2018 22:59:45 -0800 Subject: [PATCH 4/4] Update README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index acfc0a73d..b5c6c53c5 100644 --- a/README.md +++ b/README.md @@ -13,7 +13,7 @@ Quick setup com.launchdarkly launchdarkly-client - 3.0.0 + 2.6.0 1. Import the LaunchDarkly package: