diff --git a/client/CHANGES.txt b/client/CHANGES.txt index 4fa73125c..43236c201 100644 --- a/client/CHANGES.txt +++ b/client/CHANGES.txt @@ -1,5 +1,8 @@ CHANGES +4.1.1 (Sep 30, 2020) +- Fixed fetch retries after received an SPLIT_CHANGE. + 4.1.0 (Sep 25, 2020) - Add local impressions deduping (enabled by default) diff --git a/client/pom.xml b/client/pom.xml index c0b4c6556..747594a03 100644 --- a/client/pom.xml +++ b/client/pom.xml @@ -5,7 +5,7 @@ io.split.client java-client-parent - 4.1.0 + 4.1.1 java-client jar diff --git a/client/src/main/java/io/split/engine/common/SynchronizerImp.java b/client/src/main/java/io/split/engine/common/SynchronizerImp.java index 6f2ebc2ce..9d370391c 100644 --- a/client/src/main/java/io/split/engine/common/SynchronizerImp.java +++ b/client/src/main/java/io/split/engine/common/SynchronizerImp.java @@ -38,7 +38,7 @@ public SynchronizerImp(RefreshableSplitFetcherProvider refreshableSplitFetcherPr @Override public void syncAll() { _syncAllScheduledExecutorService.schedule(() -> { - _splitFetcher.forceRefresh(); + _splitFetcher.run(); _segmentFetcher.forceRefreshAll(); }, 0, TimeUnit.SECONDS); } @@ -68,6 +68,7 @@ public void refreshSplits(long targetChangeNumber) { public void localKillSplit(String splitName, String defaultTreatment, long newChangeNumber) { if (newChangeNumber > _splitFetcher.changeNumber()) { _splitFetcher.killSplit(splitName, defaultTreatment, newChangeNumber); + refreshSplits(newChangeNumber); } } diff --git a/client/src/main/java/io/split/engine/experiments/RefreshableSplitFetcher.java b/client/src/main/java/io/split/engine/experiments/RefreshableSplitFetcher.java index ade51595b..4ef49750f 100644 --- a/client/src/main/java/io/split/engine/experiments/RefreshableSplitFetcher.java +++ b/client/src/main/java/io/split/engine/experiments/RefreshableSplitFetcher.java @@ -82,7 +82,23 @@ public RefreshableSplitFetcher(SplitChangeFetcher splitChangeFetcher, SplitParse @Override public void forceRefresh() { - run(); + _log.debug("Force Refresh splits starting ..."); + try { + while (true) { + long start = _changeNumber.get(); + runWithoutExceptionHandling(); + long end = _changeNumber.get(); + + if (start >= end) { + break; + } + } + } catch (InterruptedException e) { + _log.warn("Interrupting split fetcher task"); + Thread.currentThread().interrupt(); + } catch (Throwable t) { + _log.error("RefreshableSplitFetcher failed: " + t.getMessage()); + } } @Override @@ -171,8 +187,7 @@ public void runWithoutExceptionHandling() throws InterruptedException { return; } - if (change.since != _changeNumber.get() - || change.till < _changeNumber.get()) { + if (change.since != _changeNumber.get() || change.till < _changeNumber.get()) { // some other thread may have updated the shared state. exit return; } @@ -257,20 +272,5 @@ public void runWithoutExceptionHandling() throws InterruptedException { _changeNumber.set(change.till); } - - } - - private List collectSegmentsInUse(Split split) { - List result = Lists.newArrayList(); - for (Condition condition : split.conditions) { - for (Matcher matcher : condition.matcherGroup.matchers) { - if (matcher.matcherType == MatcherType.IN_SEGMENT) { - if (matcher.userDefinedSegmentMatcherData != null && matcher.userDefinedSegmentMatcherData.segmentName != null) { - result.add(matcher.userDefinedSegmentMatcherData.segmentName); - } - } - } - } - return result; } } diff --git a/client/src/main/java/io/split/engine/segments/RefreshableSegment.java b/client/src/main/java/io/split/engine/segments/RefreshableSegment.java index 664aff4c1..656f5bcec 100644 --- a/client/src/main/java/io/split/engine/segments/RefreshableSegment.java +++ b/client/src/main/java/io/split/engine/segments/RefreshableSegment.java @@ -45,7 +45,20 @@ public boolean contains(String key) { @Override public void forceRefresh() { - run(); + try { + _log.debug("Force Refresh segment starting ..."); + while (true) { + long start = _changeNumber.get(); + runWithoutExceptionHandling(); + long end = _changeNumber.get(); + + if (start >= end) { + break; + } + } + } catch (Throwable t) { + _log.error("forceRefresh segment failed: " + t.getMessage()); + } } @Override diff --git a/client/src/main/java/io/split/engine/sse/EventSourceClientImp.java b/client/src/main/java/io/split/engine/sse/EventSourceClientImp.java index 53f4b35f1..1b1de0b6c 100644 --- a/client/src/main/java/io/split/engine/sse/EventSourceClientImp.java +++ b/client/src/main/java/io/split/engine/sse/EventSourceClientImp.java @@ -103,7 +103,7 @@ private void onMessage(RawEvent event) { } catch (EventParsingException ex) { _log.debug(String.format("Error parsing the event: %s. Payload: %s", ex.getMessage(), ex.getPayload())); } catch (Exception e) { - _log.warn(String.format("Error onMessage: %s", e.getMessage())); + _log.debug(String.format("Error onMessage: %s", e.getMessage())); } } } \ No newline at end of file diff --git a/client/src/test/java/io/split/SplitMockServer.java b/client/src/test/java/io/split/SplitMockServer.java index 6aad00556..209f26e06 100644 --- a/client/src/test/java/io/split/SplitMockServer.java +++ b/client/src/test/java/io/split/SplitMockServer.java @@ -46,12 +46,16 @@ public MockResponse dispatch(RecordedRequest request) { return new MockResponse().setBody(inputStreamToString("splits2.json")); case "/api/splitChanges?since=1585948850111": return new MockResponse().setBody(inputStreamToString("splits_killed.json")); + case "/api/splitChanges?since=1585948850112": + return new MockResponse().setBody("{\"splits\": [], \"since\":1585948850112, \"till\":1585948850112}"); case "/api/segmentChanges/segment-test?since=-1": return new MockResponse().setBody("{\"name\": \"segment3\",\"added\": [],\"removed\": [],\"since\": -1,\"till\": -1}"); case "/api/segmentChanges/segment3?since=-1": return new MockResponse().setBody(inputStreamToString("segment3.json")); case "/api/segmentChanges/segment3?since=1585948850110": return new MockResponse().setBody("{\"name\": \"segment3\",\"added\": [],\"removed\": [],\"since\": 1585948850110,\"till\": 1585948850110}"); + case "/api/segmentChanges/segment3?since=1585948850111": + return new MockResponse().setBody("{\"name\": \"segment3\",\"added\": [],\"removed\": [],\"since\": 1585948850111,\"till\": 1585948850111}"); case "/api/metrics/time": case "api/metrics/counter": return new MockResponse().setResponseCode(200); diff --git a/client/src/test/java/io/split/client/SplitClientIntegrationTest.java b/client/src/test/java/io/split/client/SplitClientIntegrationTest.java index 5f33bb0a6..05026ee1f 100644 --- a/client/src/test/java/io/split/client/SplitClientIntegrationTest.java +++ b/client/src/test/java/io/split/client/SplitClientIntegrationTest.java @@ -402,7 +402,7 @@ public void splitClientMultiFactory() throws IOException, TimeoutException, Inte Awaitility.await() .atMost(50L, TimeUnit.SECONDS) - .until(() -> "after_notification_received".equals(client1.getTreatment("admin", "push_test"))); + .until(() -> "split_killed".equals(client1.getTreatment("admin", "push_test"))); Awaitility.await() .atMost(50L, TimeUnit.SECONDS) @@ -420,30 +420,7 @@ public void splitClientMultiFactory() throws IOException, TimeoutException, Inte Awaitility.await() .atMost(50L, TimeUnit.SECONDS) - .until(() -> "after_notification_received".equals(client1.getTreatment("admin", "push_test"))); - - Awaitility.await() - .atMost(50L, TimeUnit.SECONDS) - .until(() -> "on_whitelist".equals(client2.getTreatment("admin", "push_test"))); - - Awaitility.await() - .atMost(50L, TimeUnit.SECONDS) - .until(() -> "after_notification_received".equals(client3.getTreatment("admin", "push_test"))); - - Awaitility.await() - .atMost(50L, TimeUnit.SECONDS) - .until(() -> "on_whitelist".equals(client4.getTreatment("admin", "push_test"))); - - OutboundSseEvent sseEventSplitUpdate3 = new OutboundEvent - .Builder() - .name("message") - .data("{\"id\":\"22\",\"clientId\":\"22\",\"timestamp\":1592590436082,\"encoding\":\"json\",\"channel\":\"xxxx_xxxx_splits\",\"data\":\"{\\\"type\\\":\\\"SPLIT_UPDATE\\\",\\\"changeNumber\\\":1585948850112}\"}") - .build(); - eventQueue3.push(sseEventSplitUpdate3); - - Awaitility.await() - .atMost(50L, TimeUnit.SECONDS) - .until(() -> "after_notification_received".equals(client1.getTreatment("admin", "push_test"))); + .until(() -> "split_killed".equals(client1.getTreatment("admin", "push_test"))); Awaitility.await() .atMost(50L, TimeUnit.SECONDS) diff --git a/client/src/test/java/io/split/engine/common/SynchronizerTest.java b/client/src/test/java/io/split/engine/common/SynchronizerTest.java index 299b83208..544e8fca6 100644 --- a/client/src/test/java/io/split/engine/common/SynchronizerTest.java +++ b/client/src/test/java/io/split/engine/common/SynchronizerTest.java @@ -1,8 +1,12 @@ package io.split.engine.common; +import io.split.client.HttpSegmentChangeFetcher; +import io.split.engine.SDKReadinessGates; import io.split.engine.experiments.RefreshableSplitFetcher; import io.split.engine.experiments.RefreshableSplitFetcherProvider; import io.split.engine.segments.RefreshableSegmentFetcher; +import io.split.engine.segments.SegmentChangeFetcher; +import org.apache.http.impl.client.CloseableHttpClient; import org.junit.Test; import org.mockito.Mockito; @@ -21,7 +25,7 @@ public void syncAll() throws InterruptedException { synchronizer.syncAll(); Thread.sleep(100); - Mockito.verify(splitFetcher, Mockito.times(1)).forceRefresh(); + Mockito.verify(splitFetcher, Mockito.times(1)).run(); Mockito.verify(segmentFetcher, Mockito.times(1)).forceRefreshAll(); } diff --git a/client/src/test/java/io/split/engine/sse/workers/SplitsWorkerTest.java b/client/src/test/java/io/split/engine/sse/workers/SplitsWorkerTest.java index 35243957c..8db49054b 100644 --- a/client/src/test/java/io/split/engine/sse/workers/SplitsWorkerTest.java +++ b/client/src/test/java/io/split/engine/sse/workers/SplitsWorkerTest.java @@ -60,7 +60,7 @@ public void killShouldTriggerFetch() { } @Test - public void messagesNotProcesedWhenWorkerStopped() throws InterruptedException { + public void messagesNotProcessedWhenWorkerStopped() throws InterruptedException { Synchronizer syncMock = Mockito.mock(Synchronizer.class); SplitsWorker splitsWorker = new SplitsWorkerImp(syncMock); splitsWorker.start(); diff --git a/pom.xml b/pom.xml index a46387c69..9f696ef6d 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ 4.0.0 io.split.client java-client-parent - 4.1.0 + 4.1.1 diff --git a/testing/pom.xml b/testing/pom.xml index 7cf50657e..2ae2feb11 100644 --- a/testing/pom.xml +++ b/testing/pom.xml @@ -6,7 +6,7 @@ io.split.client java-client-parent - 4.1.0 + 4.1.1 java-client-testing