From 546a22001ef264934eae6c63a97e9a739659d600 Mon Sep 17 00:00:00 2001 From: Matias Melograno Date: Fri, 29 Mar 2024 11:32:40 -0300 Subject: [PATCH 1/2] forward decorator into streaming --- .../io/split/client/NoOpHeaderDecorator.java | 18 ++++++++++++++++++ .../java/io/split/client/RequestDecorator.java | 8 -------- .../java/io/split/client/SplitFactoryImpl.java | 14 ++++++++++---- .../io/split/engine/common/PushManagerImp.java | 2 +- .../java/io/split/engine/common/SplitAPI.java | 11 ++++++++--- .../split/engine/sse/EventSourceClientImp.java | 13 +++++++++---- .../io/split/engine/sse/client/SSEClient.java | 11 +++++++++-- .../engine/sse/EventSourceClientTest.java | 7 ++++--- .../io/split/engine/sse/SSEClientTest.java | 3 ++- 9 files changed, 61 insertions(+), 26 deletions(-) create mode 100644 client/src/main/java/io/split/client/NoOpHeaderDecorator.java diff --git a/client/src/main/java/io/split/client/NoOpHeaderDecorator.java b/client/src/main/java/io/split/client/NoOpHeaderDecorator.java new file mode 100644 index 000000000..8ce04fdbc --- /dev/null +++ b/client/src/main/java/io/split/client/NoOpHeaderDecorator.java @@ -0,0 +1,18 @@ +package io.split.client; + +import io.split.client.CustomHeaderDecorator; +import io.split.client.dtos.RequestContext; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +class NoOpHeaderDecorator implements CustomHeaderDecorator { + public NoOpHeaderDecorator() { + } + + @Override + public Map> getHeaderOverrides(RequestContext context) { + return new HashMap<>(); + } +} diff --git a/client/src/main/java/io/split/client/RequestDecorator.java b/client/src/main/java/io/split/client/RequestDecorator.java index 8bc9e216b..718717e59 100644 --- a/client/src/main/java/io/split/client/RequestDecorator.java +++ b/client/src/main/java/io/split/client/RequestDecorator.java @@ -13,14 +13,6 @@ import java.util.Set; import java.util.List; -class NoOpHeaderDecorator implements CustomHeaderDecorator { - public NoOpHeaderDecorator() {} - @Override - public Map> getHeaderOverrides(RequestContext context) { - return new HashMap<>(); - } -} - public final class RequestDecorator { CustomHeaderDecorator _headerDecorator; diff --git a/client/src/main/java/io/split/client/SplitFactoryImpl.java b/client/src/main/java/io/split/client/SplitFactoryImpl.java index 35bbed8c3..9625690b9 100644 --- a/client/src/main/java/io/split/client/SplitFactoryImpl.java +++ b/client/src/main/java/io/split/client/SplitFactoryImpl.java @@ -185,7 +185,8 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn _gates = new SDKReadinessGates(); // HttpClient - _splitHttpClient = buildSplitHttpClient(apiToken, config, _sdkMetadata); + RequestDecorator requestDecorator = new RequestDecorator(config.customHeaderDecorator()); + _splitHttpClient = buildSplitHttpClient(apiToken, config, _sdkMetadata, requestDecorator); // Roots _rootTarget = URI.create(config.endpoint()); @@ -252,7 +253,7 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn // SyncManager SplitTasks splitTasks = SplitTasks.build(_splitSynchronizationTask, _segmentSynchronizationTaskImp, _impressionsManager, _eventsTask, _telemetrySyncTask, _uniqueKeysTracker); - SplitAPI splitAPI = SplitAPI.build(_splitHttpClient, buildSSEdHttpClient(apiToken, config, _sdkMetadata)); + SplitAPI splitAPI = SplitAPI.build(_splitHttpClient, buildSSEdHttpClient(apiToken, config, _sdkMetadata), requestDecorator); _syncManager = SyncManagerImp.build(splitTasks, _splitFetcher, splitCache, splitAPI, segmentCache, _gates, _telemetryStorageProducer, _telemetrySynchronizer, config, splitParser, flagSetsFilter); @@ -473,7 +474,12 @@ public boolean isDestroyed() { return isTerminated; } - private static SplitHttpClient buildSplitHttpClient(String apiToken, SplitClientConfig config, SDKMetadata sdkMetadata) + private static SplitHttpClient buildSplitHttpClient( + String apiToken, + SplitClientConfig config, + SDKMetadata sdkMetadata, + RequestDecorator requestDecorator + ) throws URISyntaxException { SSLConnectionSocketFactory sslSocketFactory = SSLConnectionSocketFactoryBuilder.create() .setSslContext(SSLContexts.createSystemDefault()) @@ -508,7 +514,7 @@ private static SplitHttpClient buildSplitHttpClient(String apiToken, SplitClient httpClientbuilder = setupProxy(httpClientbuilder, config); } - return SplitHttpClientImpl.create(httpClientbuilder.build(), new RequestDecorator(config.customHeaderDecorator())); + return SplitHttpClientImpl.create(httpClientbuilder.build(), requestDecorator); } private static CloseableHttpClient buildSSEdHttpClient(String apiToken, SplitClientConfig config, SDKMetadata sdkMetadata) { diff --git a/client/src/main/java/io/split/engine/common/PushManagerImp.java b/client/src/main/java/io/split/engine/common/PushManagerImp.java index ff3343ed4..b6118efb6 100644 --- a/client/src/main/java/io/split/engine/common/PushManagerImp.java +++ b/client/src/main/java/io/split/engine/common/PushManagerImp.java @@ -83,7 +83,7 @@ public static PushManagerImp build(Synchronizer synchronizer, PushStatusTracker pushStatusTracker = new PushStatusTrackerImp(statusMessages, telemetryRuntimeProducer); return new PushManagerImp(new AuthApiClientImp(authUrl, splitAPI.getHttpClient(), telemetryRuntimeProducer), EventSourceClientImp.build(streamingUrl, featureFlagsWorker, segmentWorker, pushStatusTracker, splitAPI.getSseHttpClient(), - telemetryRuntimeProducer, threadFactory), + telemetryRuntimeProducer, threadFactory, splitAPI.getRequestDecorator()), featureFlagsWorker, segmentWorker, pushStatusTracker, diff --git a/client/src/main/java/io/split/engine/common/SplitAPI.java b/client/src/main/java/io/split/engine/common/SplitAPI.java index b3b42966f..229f34cbc 100644 --- a/client/src/main/java/io/split/engine/common/SplitAPI.java +++ b/client/src/main/java/io/split/engine/common/SplitAPI.java @@ -1,5 +1,6 @@ package io.split.engine.common; +import io.split.client.RequestDecorator; import io.split.service.SplitHttpClient; import org.apache.hc.client5.http.impl.classic.CloseableHttpClient; import org.slf4j.Logger; @@ -9,15 +10,17 @@ public class SplitAPI { private final SplitHttpClient _httpClient; private final CloseableHttpClient _sseHttpClient; + private final RequestDecorator _requestDecorator; private static final Logger _log = LoggerFactory.getLogger(SplitAPI.class); - private SplitAPI(SplitHttpClient httpClient, CloseableHttpClient sseHttpClient) { + private SplitAPI(SplitHttpClient httpClient, CloseableHttpClient sseHttpClient, RequestDecorator requestDecorator) { _httpClient = httpClient; _sseHttpClient = sseHttpClient; + _requestDecorator = requestDecorator; } - public static SplitAPI build(SplitHttpClient httpClient, CloseableHttpClient sseHttpClient){ - return new SplitAPI(httpClient,sseHttpClient); + public static SplitAPI build(SplitHttpClient httpClient, CloseableHttpClient sseHttpClient, RequestDecorator requestDecorator){ + return new SplitAPI(httpClient, sseHttpClient, requestDecorator); } public SplitHttpClient getHttpClient() { @@ -28,6 +31,8 @@ public CloseableHttpClient getSseHttpClient() { return _sseHttpClient; } + public RequestDecorator getRequestDecorator() { return _requestDecorator; } + public void close(){ try { _sseHttpClient.close(); diff --git a/client/src/main/java/io/split/engine/sse/EventSourceClientImp.java b/client/src/main/java/io/split/engine/sse/EventSourceClientImp.java index 35d1c05d7..212d929f3 100644 --- a/client/src/main/java/io/split/engine/sse/EventSourceClientImp.java +++ b/client/src/main/java/io/split/engine/sse/EventSourceClientImp.java @@ -2,6 +2,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; +import io.split.client.RequestDecorator; import io.split.engine.sse.client.RawEvent; import io.split.engine.sse.client.SSEClient; import io.split.engine.sse.dtos.SegmentQueueDto; @@ -40,7 +41,8 @@ public class EventSourceClientImp implements EventSourceClient { PushStatusTracker pushStatusTracker, CloseableHttpClient sseHttpClient, TelemetryRuntimeProducer telemetryRuntimeProducer, - ThreadFactory threadFactory) { + ThreadFactory threadFactory, + RequestDecorator requestDecorator) { _baseStreamingUrl = checkNotNull(baseStreamingUrl); _notificationParser = checkNotNull(notificationParser); _notificationProcessor = checkNotNull(notificationProcessor); @@ -51,7 +53,8 @@ public class EventSourceClientImp implements EventSourceClient { status -> { _pushStatusTracker.handleSseStatus(status); return null; }, sseHttpClient, telemetryRuntimeProducer, - threadFactory); + threadFactory, + requestDecorator); _firstEvent = new AtomicBoolean(); } @@ -61,14 +64,16 @@ public static EventSourceClientImp build(String baseStreamingUrl, PushStatusTracker pushStatusTracker, CloseableHttpClient sseHttpClient, TelemetryRuntimeProducer telemetryRuntimeProducer, - ThreadFactory threadFactory) { + ThreadFactory threadFactory, + RequestDecorator requestDecorator) { return new EventSourceClientImp(baseStreamingUrl, new NotificationParserImp(), NotificationProcessorImp.build(featureFlagsWorker, segmentWorker, pushStatusTracker), pushStatusTracker, sseHttpClient, telemetryRuntimeProducer, - threadFactory); + threadFactory, + requestDecorator); } @Override diff --git a/client/src/main/java/io/split/engine/sse/client/SSEClient.java b/client/src/main/java/io/split/engine/sse/client/SSEClient.java index 30dd16f20..9c2024d99 100644 --- a/client/src/main/java/io/split/engine/sse/client/SSEClient.java +++ b/client/src/main/java/io/split/engine/sse/client/SSEClient.java @@ -1,10 +1,12 @@ package io.split.engine.sse.client; import com.google.common.base.Strings; +import io.split.client.RequestDecorator; import io.split.telemetry.domain.StreamingEvent; import io.split.telemetry.domain.enums.StreamEventsEnum; import io.split.telemetry.storage.TelemetryRuntimeProducer; import org.apache.hc.client5.http.classic.methods.HttpGet; +import org.apache.hc.client5.http.classic.methods.HttpPost; import org.apache.hc.client5.http.impl.classic.CloseableHttpClient; import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse; import org.slf4j.Logger; @@ -56,6 +58,7 @@ private enum ConnectionState { private final AtomicReference _ongoingResponse = new AtomicReference<>(); private final AtomicReference _ongoingRequest = new AtomicReference<>(); private AtomicBoolean _forcedStop; + private final RequestDecorator _requestDecorator; private final TelemetryRuntimeProducer _telemetryRuntimeProducer; @@ -63,13 +66,15 @@ public SSEClient(Function eventCallback, Function statusCallback, CloseableHttpClient client, TelemetryRuntimeProducer telemetryRuntimeProducer, - ThreadFactory threadFactory) { + ThreadFactory threadFactory, + RequestDecorator requestDecorator) { _eventCallback = eventCallback; _statusCallback = statusCallback; _client = client; _forcedStop = new AtomicBoolean(); _telemetryRuntimeProducer = checkNotNull(telemetryRuntimeProducer); _connectionExecutor = buildExecutorService(threadFactory, "SPLIT-SSEConnection-%d"); + _requestDecorator = requestDecorator; } public synchronized boolean open(URI uri) { @@ -177,7 +182,9 @@ private void connectAndLoop(URI uri, CountDownLatch signal) { } private boolean establishConnection(URI uri, CountDownLatch signal) { - _ongoingRequest.set(new HttpGet(uri)); + HttpGet request = new HttpGet(uri); + request = (HttpGet) _requestDecorator.decorateHeaders(request); + _ongoingRequest.set(request); try { _ongoingResponse.set(_client.execute(_ongoingRequest.get())); if (_ongoingResponse.get().getCode() != 200) { diff --git a/client/src/test/java/io/split/engine/sse/EventSourceClientTest.java b/client/src/test/java/io/split/engine/sse/EventSourceClientTest.java index c5bc22b1b..604b6371f 100644 --- a/client/src/test/java/io/split/engine/sse/EventSourceClientTest.java +++ b/client/src/test/java/io/split/engine/sse/EventSourceClientTest.java @@ -1,6 +1,7 @@ package io.split.engine.sse; import io.split.SSEMockServer; +import io.split.client.RequestDecorator; import io.split.engine.sse.client.SSEClient; import io.split.engine.sse.dtos.ErrorNotification; import io.split.engine.sse.dtos.FeatureFlagChangeNotification; @@ -42,7 +43,7 @@ public void startShouldConnect() throws IOException { TelemetryRuntimeProducer telemetryRuntimeProducer = Mockito.mock(InMemoryTelemetryStorage.class); sseServer.start(); - EventSourceClient eventSourceClient = new EventSourceClientImp("http://localhost:" + sseServer.getPort(), _notificationParser, _notificationProcessor, _pushStatusTracker, buildHttpClient(), telemetryRuntimeProducer, null); + EventSourceClient eventSourceClient = new EventSourceClientImp("http://localhost:" + sseServer.getPort(), _notificationParser, _notificationProcessor, _pushStatusTracker, buildHttpClient(), telemetryRuntimeProducer, null, new RequestDecorator(null)); boolean result = eventSourceClient.start("channel-test", "token-test"); @@ -57,7 +58,7 @@ public void startShouldReconnect() throws IOException { SSEMockServer sseServer = buildSSEMockServer(eventQueue); TelemetryRuntimeProducer telemetryRuntimeProducer = Mockito.mock(InMemoryTelemetryStorage.class); sseServer.start(); - EventSourceClient eventSourceClient = new EventSourceClientImp("http://fake:" + sseServer.getPort(), _notificationParser, _notificationProcessor, _pushStatusTracker, buildHttpClient(), telemetryRuntimeProducer, null); + EventSourceClient eventSourceClient = new EventSourceClientImp("http://fake:" + sseServer.getPort(), _notificationParser, _notificationProcessor, _pushStatusTracker, buildHttpClient(), telemetryRuntimeProducer, null, new RequestDecorator(null)); boolean result = eventSourceClient.start("channel-test", "token-test"); @@ -74,7 +75,7 @@ public void startAndReceiveNotification() throws IOException { SSEMockServer sseServer = buildSSEMockServer(eventQueue); TelemetryRuntimeProducer telemetryRuntimeProducer = Mockito.mock(InMemoryTelemetryStorage.class); sseServer.start(); - EventSourceClient eventSourceClient = new EventSourceClientImp("http://localhost:" + sseServer.getPort(), _notificationParser, _notificationProcessor, _pushStatusTracker, buildHttpClient(), telemetryRuntimeProducer, null); + EventSourceClient eventSourceClient = new EventSourceClientImp("http://localhost:" + sseServer.getPort(), _notificationParser, _notificationProcessor, _pushStatusTracker, buildHttpClient(), telemetryRuntimeProducer, null, new RequestDecorator(null)); boolean result = eventSourceClient.start("channel-test", "token-test"); diff --git a/client/src/test/java/io/split/engine/sse/SSEClientTest.java b/client/src/test/java/io/split/engine/sse/SSEClientTest.java index 97aaa4de5..15f13d3b3 100644 --- a/client/src/test/java/io/split/engine/sse/SSEClientTest.java +++ b/client/src/test/java/io/split/engine/sse/SSEClientTest.java @@ -1,5 +1,6 @@ package io.split.engine.sse; +import io.split.client.RequestDecorator; import io.split.engine.sse.client.SSEClient; import io.split.telemetry.storage.InMemoryTelemetryStorage; import io.split.telemetry.storage.TelemetryRuntimeProducer; @@ -38,7 +39,7 @@ public void basicUsageTest() throws URISyntaxException, InterruptedException { CloseableHttpClient httpClient = httpClientbuilder.build(); SSEClient sse = new SSEClient(e -> null, - s -> null, httpClient, telemetryRuntimeProducer, null); + s -> null, httpClient, telemetryRuntimeProducer, null, new RequestDecorator(null)); sse.open(uri); Thread.sleep(5000); sse.close(); From 1df6d661777ed157aa4c8ec1f3bb5def321db2e1 Mon Sep 17 00:00:00 2001 From: Matias Melograno Date: Fri, 29 Mar 2024 13:05:57 -0300 Subject: [PATCH 2/2] preparing rc --- client/pom.xml | 2 +- pluggable-storage/pom.xml | 2 +- pom.xml | 2 +- redis-wrapper/pom.xml | 2 +- testing/pom.xml | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/client/pom.xml b/client/pom.xml index a0becea3c..12cda4f36 100644 --- a/client/pom.xml +++ b/client/pom.xml @@ -5,7 +5,7 @@ io.split.client java-client-parent - 4.12.0-rc1 + 4.12.0-rc2 java-client jar diff --git a/pluggable-storage/pom.xml b/pluggable-storage/pom.xml index 30b90d945..d72620667 100644 --- a/pluggable-storage/pom.xml +++ b/pluggable-storage/pom.xml @@ -6,7 +6,7 @@ java-client-parent io.split.client - 4.12.0-rc1 + 4.12.0-rc2 2.1.0 diff --git a/pom.xml b/pom.xml index 3b7e17ed1..06f94991b 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ 4.0.0 io.split.client java-client-parent - 4.12.0-rc1 + 4.12.0-rc2 diff --git a/redis-wrapper/pom.xml b/redis-wrapper/pom.xml index 962d8c51f..498ec361b 100644 --- a/redis-wrapper/pom.xml +++ b/redis-wrapper/pom.xml @@ -6,7 +6,7 @@ java-client-parent io.split.client - 4.12.0-rc1 + 4.12.0-rc2 redis-wrapper 3.1.0 diff --git a/testing/pom.xml b/testing/pom.xml index d0f537dc6..4bbe4b31d 100644 --- a/testing/pom.xml +++ b/testing/pom.xml @@ -5,7 +5,7 @@ io.split.client java-client-parent - 4.12.0-rc1 + 4.12.0-rc2 java-client-testing jar