From 3f3c0fc898c882784b6ee0a9b9cc4fd13298efe3 Mon Sep 17 00:00:00 2001 From: Mauro Sanz Date: Wed, 23 Sep 2020 17:36:19 -0300 Subject: [PATCH 1/5] fixed fetchers retry until changeNumber > cahce.ChangeNumber --- .../split/engine/common/SynchronizerImp.java | 9 ++-- .../engine/sse/EventSourceClientImp.java | 2 +- .../test/java/io/split/SplitMockServer.java | 4 +- .../split/engine/common/SynchronizerTest.java | 48 +++++++++++++++++++ .../engine/sse/workers/SplitsWorkerTest.java | 2 +- 5 files changed, 58 insertions(+), 7 deletions(-) diff --git a/client/src/main/java/io/split/engine/common/SynchronizerImp.java b/client/src/main/java/io/split/engine/common/SynchronizerImp.java index 6f2ebc2ce..caa375c1e 100644 --- a/client/src/main/java/io/split/engine/common/SynchronizerImp.java +++ b/client/src/main/java/io/split/engine/common/SynchronizerImp.java @@ -58,8 +58,8 @@ public void stopPeriodicFetching() { } @Override - public void refreshSplits(long targetChangeNumber) { - if (targetChangeNumber > _splitFetcher.changeNumber()) { + public synchronized void refreshSplits(long targetChangeNumber) { + while (targetChangeNumber > _splitFetcher.changeNumber()) { _splitFetcher.forceRefresh(); } } @@ -68,12 +68,13 @@ public void refreshSplits(long targetChangeNumber) { public void localKillSplit(String splitName, String defaultTreatment, long newChangeNumber) { if (newChangeNumber > _splitFetcher.changeNumber()) { _splitFetcher.killSplit(splitName, defaultTreatment, newChangeNumber); + refreshSplits(newChangeNumber); } } @Override - public void refreshSegment(String segmentName, long changeNumber) { - if (changeNumber > _segmentFetcher.getChangeNumber(segmentName)) { + public synchronized void refreshSegment(String segmentName, long changeNumber) { + while (changeNumber > _segmentFetcher.getChangeNumber(segmentName)) { _segmentFetcher.forceRefresh(segmentName); } } diff --git a/client/src/main/java/io/split/engine/sse/EventSourceClientImp.java b/client/src/main/java/io/split/engine/sse/EventSourceClientImp.java index 53f4b35f1..1b1de0b6c 100644 --- a/client/src/main/java/io/split/engine/sse/EventSourceClientImp.java +++ b/client/src/main/java/io/split/engine/sse/EventSourceClientImp.java @@ -103,7 +103,7 @@ private void onMessage(RawEvent event) { } catch (EventParsingException ex) { _log.debug(String.format("Error parsing the event: %s. Payload: %s", ex.getMessage(), ex.getPayload())); } catch (Exception e) { - _log.warn(String.format("Error onMessage: %s", e.getMessage())); + _log.debug(String.format("Error onMessage: %s", e.getMessage())); } } } \ No newline at end of file diff --git a/client/src/test/java/io/split/SplitMockServer.java b/client/src/test/java/io/split/SplitMockServer.java index 6aad00556..b08819eb7 100644 --- a/client/src/test/java/io/split/SplitMockServer.java +++ b/client/src/test/java/io/split/SplitMockServer.java @@ -51,7 +51,9 @@ public MockResponse dispatch(RecordedRequest request) { case "/api/segmentChanges/segment3?since=-1": return new MockResponse().setBody(inputStreamToString("segment3.json")); case "/api/segmentChanges/segment3?since=1585948850110": - return new MockResponse().setBody("{\"name\": \"segment3\",\"added\": [],\"removed\": [],\"since\": 1585948850110,\"till\": 1585948850110}"); + return new MockResponse().setBody("{\"name\": \"segment3\",\"added\": [],\"removed\": [],\"since\": 1585948850110,\"till\": 1585948850111}"); + case "/api/segmentChanges/segment3?since=1585948850111": + return new MockResponse().setBody("{\"name\": \"segment3\",\"added\": [],\"removed\": [],\"since\": 1585948850111,\"till\": 1585948850111}"); case "/api/metrics/time": case "api/metrics/counter": return new MockResponse().setResponseCode(200); diff --git a/client/src/test/java/io/split/engine/common/SynchronizerTest.java b/client/src/test/java/io/split/engine/common/SynchronizerTest.java index 299b83208..17de6d837 100644 --- a/client/src/test/java/io/split/engine/common/SynchronizerTest.java +++ b/client/src/test/java/io/split/engine/common/SynchronizerTest.java @@ -1,8 +1,12 @@ package io.split.engine.common; +import io.split.client.HttpSegmentChangeFetcher; +import io.split.engine.SDKReadinessGates; import io.split.engine.experiments.RefreshableSplitFetcher; import io.split.engine.experiments.RefreshableSplitFetcherProvider; import io.split.engine.segments.RefreshableSegmentFetcher; +import io.split.engine.segments.SegmentChangeFetcher; +import org.apache.http.impl.client.CloseableHttpClient; import org.junit.Test; import org.mockito.Mockito; @@ -56,4 +60,48 @@ public void stopPeriodicFetching() { Mockito.verify(refreshableSplitFetcherProvider, Mockito.times(1)).stop(); Mockito.verify(segmentFetcher, Mockito.times(1)).stop(); } + + @Test + public void refreshSplits() { + RefreshableSplitFetcherProvider refreshableSplitFetcherProvider = Mockito.mock(RefreshableSplitFetcherProvider.class); + RefreshableSegmentFetcher segmentFetcher = Mockito.mock(RefreshableSegmentFetcher.class); + RefreshableSplitFetcher splitFetcher = Mockito.mock(RefreshableSplitFetcher.class); + + Mockito.when(refreshableSplitFetcherProvider.getFetcher()) + .thenReturn(splitFetcher); + + Mockito.when(splitFetcher.changeNumber()) + .thenReturn(450L) + .thenReturn(460L) + .thenReturn(470L) + .thenReturn(480L) + .thenReturn(500L); + + Synchronizer synchronizer = new SynchronizerImp(refreshableSplitFetcherProvider, segmentFetcher); + synchronizer.refreshSplits(500); + + Mockito.verify(splitFetcher, Mockito.times(4)).forceRefresh(); + } + + @Test + public void refreshSegment() { + RefreshableSplitFetcherProvider refreshableSplitFetcherProvider = Mockito.mock(RefreshableSplitFetcherProvider.class); + RefreshableSegmentFetcher segmentFetcher = Mockito.mock(RefreshableSegmentFetcher.class); + RefreshableSplitFetcher splitFetcher = Mockito.mock(RefreshableSplitFetcher.class); + + Mockito.when(refreshableSplitFetcherProvider.getFetcher()) + .thenReturn(splitFetcher); + + Mockito.when(segmentFetcher.getChangeNumber("segment-name")) + .thenReturn(450L) + .thenReturn(460L) + .thenReturn(470L) + .thenReturn(480L) + .thenReturn(500L); + + Synchronizer synchronizer = new SynchronizerImp(refreshableSplitFetcherProvider, segmentFetcher); + synchronizer.refreshSegment("segment-name", 500); + + Mockito.verify(segmentFetcher, Mockito.times(4)).forceRefresh("segment-name"); + } } diff --git a/client/src/test/java/io/split/engine/sse/workers/SplitsWorkerTest.java b/client/src/test/java/io/split/engine/sse/workers/SplitsWorkerTest.java index 35243957c..8db49054b 100644 --- a/client/src/test/java/io/split/engine/sse/workers/SplitsWorkerTest.java +++ b/client/src/test/java/io/split/engine/sse/workers/SplitsWorkerTest.java @@ -60,7 +60,7 @@ public void killShouldTriggerFetch() { } @Test - public void messagesNotProcesedWhenWorkerStopped() throws InterruptedException { + public void messagesNotProcessedWhenWorkerStopped() throws InterruptedException { Synchronizer syncMock = Mockito.mock(Synchronizer.class); SplitsWorker splitsWorker = new SplitsWorkerImp(syncMock); splitsWorker.start(); From 36a22c60d3d7d73c75c456247a4b4bd697835185 Mon Sep 17 00:00:00 2001 From: Mauro Sanz Date: Fri, 25 Sep 2020 14:50:02 -0300 Subject: [PATCH 2/5] updated approach --- .../split/engine/common/SynchronizerImp.java | 8 +- .../experiments/RefreshableSplitFetcher.java | 166 ++++++++---------- .../split/engine/common/SynchronizerTest.java | 44 ----- 3 files changed, 80 insertions(+), 138 deletions(-) diff --git a/client/src/main/java/io/split/engine/common/SynchronizerImp.java b/client/src/main/java/io/split/engine/common/SynchronizerImp.java index caa375c1e..643e19ec7 100644 --- a/client/src/main/java/io/split/engine/common/SynchronizerImp.java +++ b/client/src/main/java/io/split/engine/common/SynchronizerImp.java @@ -58,8 +58,8 @@ public void stopPeriodicFetching() { } @Override - public synchronized void refreshSplits(long targetChangeNumber) { - while (targetChangeNumber > _splitFetcher.changeNumber()) { + public void refreshSplits(long targetChangeNumber) { + if (targetChangeNumber > _splitFetcher.changeNumber()) { _splitFetcher.forceRefresh(); } } @@ -73,8 +73,8 @@ public void localKillSplit(String splitName, String defaultTreatment, long newCh } @Override - public synchronized void refreshSegment(String segmentName, long changeNumber) { - while (changeNumber > _segmentFetcher.getChangeNumber(segmentName)) { + public void refreshSegment(String segmentName, long changeNumber) { + if (changeNumber > _segmentFetcher.getChangeNumber(segmentName)) { _segmentFetcher.forceRefresh(segmentName); } } diff --git a/client/src/main/java/io/split/engine/experiments/RefreshableSplitFetcher.java b/client/src/main/java/io/split/engine/experiments/RefreshableSplitFetcher.java index ade51595b..40c1ba837 100644 --- a/client/src/main/java/io/split/engine/experiments/RefreshableSplitFetcher.java +++ b/client/src/main/java/io/split/engine/experiments/RefreshableSplitFetcher.java @@ -160,117 +160,103 @@ public void run() { } public void runWithoutExceptionHandling() throws InterruptedException { - SplitChange change = _splitChangeFetcher.fetch(_changeNumber.get()); + while (true) { + SplitChange change = _splitChangeFetcher.fetch(_changeNumber.get()); - if (change == null) { - throw new IllegalStateException("SplitChange was null"); - } - - if (change.till == _changeNumber.get()) { - // no change. - return; - } - - if (change.since != _changeNumber.get() - || change.till < _changeNumber.get()) { - // some other thread may have updated the shared state. exit - return; - } + if (change == null) { + throw new IllegalStateException("SplitChange was null"); + } - if (change.splits.isEmpty()) { - // there are no changes. weird! - _changeNumber.set(change.till); - return; - } + if (change.till == _changeNumber.get()) { + // no change. + break; + } - synchronized (_lock) { - // check state one more time. - if (change.since != _changeNumber.get() - || change.till < _changeNumber.get()) { + if (change.since != _changeNumber.get() || change.till < _changeNumber.get()) { // some other thread may have updated the shared state. exit - return; + break; } - Set toRemove = Sets.newHashSet(); - Map toAdd = Maps.newHashMap(); - List trafficTypeNamesToRemove = Lists.newArrayList(); - List trafficTypeNamesToAdd = Lists.newArrayList(); + if (change.splits.isEmpty()) { + // there are no changes. weird! + _changeNumber.set(change.till); + break; + } - for (Split split : change.splits) { - if (Thread.currentThread().isInterrupted()) { - throw new InterruptedException(); + synchronized (_lock) { + // check state one more time. + if (change.since != _changeNumber.get() + || change.till < _changeNumber.get()) { + // some other thread may have updated the shared state. exit + break; } - if (split.status != Status.ACTIVE) { - // archive. - toRemove.add(split.name); - if (split.trafficTypeName != null) { - trafficTypeNamesToRemove.add(split.trafficTypeName); - } - continue; - } + Set toRemove = Sets.newHashSet(); + Map toAdd = Maps.newHashMap(); + List trafficTypeNamesToRemove = Lists.newArrayList(); + List trafficTypeNamesToAdd = Lists.newArrayList(); - ParsedSplit parsedSplit = _parser.parse(split); - if (parsedSplit == null) { - _log.info("We could not parse the experiment definition for: " + split.name + " so we are removing it completely to be careful"); - toRemove.add(split.name); - if (split.trafficTypeName != null) { - trafficTypeNamesToRemove.add(split.trafficTypeName); + for (Split split : change.splits) { + if (Thread.currentThread().isInterrupted()) { + throw new InterruptedException(); } - continue; - } - - toAdd.put(split.name, parsedSplit); - - // If the split already exists, this is either an update, or the split has been - // deleted and recreated (possibly with a different traffic type). - // If it's an update, the traffic type should NOT be increased. - // If it's deleted & recreated, the old one should be decreased and the new one increased. - // To handle both cases, we simply delete the old one if the split is present. - // The new one is always increased. - ParsedSplit current = _concurrentMap.get(split.name); - if (current != null && current.trafficTypeName() != null) { - trafficTypeNamesToRemove.add(current.trafficTypeName()); - } - if (split.trafficTypeName != null) { - trafficTypeNamesToAdd.add(split.trafficTypeName); - } - } + if (split.status != Status.ACTIVE) { + // archive. + toRemove.add(split.name); + if (split.trafficTypeName != null) { + trafficTypeNamesToRemove.add(split.trafficTypeName); + } + continue; + } - _concurrentMap.putAll(toAdd); - _concurrentTrafficTypeNameSet.addAll(trafficTypeNamesToAdd); - //removeAll does not work here, since it wont remove all the occurrences, just one - Multisets.removeOccurrences(_concurrentTrafficTypeNameSet, trafficTypeNamesToRemove); + ParsedSplit parsedSplit = _parser.parse(split); + if (parsedSplit == null) { + _log.info("We could not parse the experiment definition for: " + split.name + " so we are removing it completely to be careful"); + toRemove.add(split.name); + if (split.trafficTypeName != null) { + trafficTypeNamesToRemove.add(split.trafficTypeName); + } + continue; + } - for (String remove : toRemove) { - _concurrentMap.remove(remove); - } + toAdd.put(split.name, parsedSplit); + + // If the split already exists, this is either an update, or the split has been + // deleted and recreated (possibly with a different traffic type). + // If it's an update, the traffic type should NOT be increased. + // If it's deleted & recreated, the old one should be decreased and the new one increased. + // To handle both cases, we simply delete the old one if the split is present. + // The new one is always increased. + ParsedSplit current = _concurrentMap.get(split.name); + if (current != null && current.trafficTypeName() != null) { + trafficTypeNamesToRemove.add(current.trafficTypeName()); + } - if (!toAdd.isEmpty()) { - _log.debug("Updated features: " + toAdd.keySet()); - } + if (split.trafficTypeName != null) { + trafficTypeNamesToAdd.add(split.trafficTypeName); + } + } - if (!toRemove.isEmpty()) { - _log.debug("Deleted features: " + toRemove); - } + _concurrentMap.putAll(toAdd); + _concurrentTrafficTypeNameSet.addAll(trafficTypeNamesToAdd); + //removeAll does not work here, since it wont remove all the occurrences, just one + Multisets.removeOccurrences(_concurrentTrafficTypeNameSet, trafficTypeNamesToRemove); - _changeNumber.set(change.till); - } + for (String remove : toRemove) { + _concurrentMap.remove(remove); + } - } + if (!toAdd.isEmpty()) { + _log.debug("Updated features: " + toAdd.keySet()); + } - private List collectSegmentsInUse(Split split) { - List result = Lists.newArrayList(); - for (Condition condition : split.conditions) { - for (Matcher matcher : condition.matcherGroup.matchers) { - if (matcher.matcherType == MatcherType.IN_SEGMENT) { - if (matcher.userDefinedSegmentMatcherData != null && matcher.userDefinedSegmentMatcherData.segmentName != null) { - result.add(matcher.userDefinedSegmentMatcherData.segmentName); - } + if (!toRemove.isEmpty()) { + _log.debug("Deleted features: " + toRemove); } + + _changeNumber.set(change.till); } } - return result; } } diff --git a/client/src/test/java/io/split/engine/common/SynchronizerTest.java b/client/src/test/java/io/split/engine/common/SynchronizerTest.java index 17de6d837..d69a759f3 100644 --- a/client/src/test/java/io/split/engine/common/SynchronizerTest.java +++ b/client/src/test/java/io/split/engine/common/SynchronizerTest.java @@ -60,48 +60,4 @@ public void stopPeriodicFetching() { Mockito.verify(refreshableSplitFetcherProvider, Mockito.times(1)).stop(); Mockito.verify(segmentFetcher, Mockito.times(1)).stop(); } - - @Test - public void refreshSplits() { - RefreshableSplitFetcherProvider refreshableSplitFetcherProvider = Mockito.mock(RefreshableSplitFetcherProvider.class); - RefreshableSegmentFetcher segmentFetcher = Mockito.mock(RefreshableSegmentFetcher.class); - RefreshableSplitFetcher splitFetcher = Mockito.mock(RefreshableSplitFetcher.class); - - Mockito.when(refreshableSplitFetcherProvider.getFetcher()) - .thenReturn(splitFetcher); - - Mockito.when(splitFetcher.changeNumber()) - .thenReturn(450L) - .thenReturn(460L) - .thenReturn(470L) - .thenReturn(480L) - .thenReturn(500L); - - Synchronizer synchronizer = new SynchronizerImp(refreshableSplitFetcherProvider, segmentFetcher); - synchronizer.refreshSplits(500); - - Mockito.verify(splitFetcher, Mockito.times(4)).forceRefresh(); - } - - @Test - public void refreshSegment() { - RefreshableSplitFetcherProvider refreshableSplitFetcherProvider = Mockito.mock(RefreshableSplitFetcherProvider.class); - RefreshableSegmentFetcher segmentFetcher = Mockito.mock(RefreshableSegmentFetcher.class); - RefreshableSplitFetcher splitFetcher = Mockito.mock(RefreshableSplitFetcher.class); - - Mockito.when(refreshableSplitFetcherProvider.getFetcher()) - .thenReturn(splitFetcher); - - Mockito.when(segmentFetcher.getChangeNumber("segment-name")) - .thenReturn(450L) - .thenReturn(460L) - .thenReturn(470L) - .thenReturn(480L) - .thenReturn(500L); - - Synchronizer synchronizer = new SynchronizerImp(refreshableSplitFetcherProvider, segmentFetcher); - synchronizer.refreshSegment("segment-name", 500); - - Mockito.verify(segmentFetcher, Mockito.times(4)).forceRefresh("segment-name"); - } } From b3217be108856b19ae3b3a69af2eab4189373792 Mon Sep 17 00:00:00 2001 From: Mauro Sanz Date: Mon, 28 Sep 2020 17:58:26 -0300 Subject: [PATCH 3/5] updated approach --- .../split/engine/common/SynchronizerImp.java | 2 +- .../experiments/RefreshableSplitFetcher.java | 168 ++++++++++-------- .../engine/segments/RefreshableSegment.java | 15 +- .../test/java/io/split/SplitMockServer.java | 4 +- .../client/SplitClientIntegrationTest.java | 27 +-- .../split/engine/common/SynchronizerTest.java | 2 +- 6 files changed, 112 insertions(+), 106 deletions(-) diff --git a/client/src/main/java/io/split/engine/common/SynchronizerImp.java b/client/src/main/java/io/split/engine/common/SynchronizerImp.java index 643e19ec7..9d370391c 100644 --- a/client/src/main/java/io/split/engine/common/SynchronizerImp.java +++ b/client/src/main/java/io/split/engine/common/SynchronizerImp.java @@ -38,7 +38,7 @@ public SynchronizerImp(RefreshableSplitFetcherProvider refreshableSplitFetcherPr @Override public void syncAll() { _syncAllScheduledExecutorService.schedule(() -> { - _splitFetcher.forceRefresh(); + _splitFetcher.run(); _segmentFetcher.forceRefreshAll(); }, 0, TimeUnit.SECONDS); } diff --git a/client/src/main/java/io/split/engine/experiments/RefreshableSplitFetcher.java b/client/src/main/java/io/split/engine/experiments/RefreshableSplitFetcher.java index 40c1ba837..4ef49750f 100644 --- a/client/src/main/java/io/split/engine/experiments/RefreshableSplitFetcher.java +++ b/client/src/main/java/io/split/engine/experiments/RefreshableSplitFetcher.java @@ -82,7 +82,23 @@ public RefreshableSplitFetcher(SplitChangeFetcher splitChangeFetcher, SplitParse @Override public void forceRefresh() { - run(); + _log.debug("Force Refresh splits starting ..."); + try { + while (true) { + long start = _changeNumber.get(); + runWithoutExceptionHandling(); + long end = _changeNumber.get(); + + if (start >= end) { + break; + } + } + } catch (InterruptedException e) { + _log.warn("Interrupting split fetcher task"); + Thread.currentThread().interrupt(); + } catch (Throwable t) { + _log.error("RefreshableSplitFetcher failed: " + t.getMessage()); + } } @Override @@ -160,103 +176,101 @@ public void run() { } public void runWithoutExceptionHandling() throws InterruptedException { - while (true) { - SplitChange change = _splitChangeFetcher.fetch(_changeNumber.get()); + SplitChange change = _splitChangeFetcher.fetch(_changeNumber.get()); - if (change == null) { - throw new IllegalStateException("SplitChange was null"); - } + if (change == null) { + throw new IllegalStateException("SplitChange was null"); + } - if (change.till == _changeNumber.get()) { - // no change. - break; - } + if (change.till == _changeNumber.get()) { + // no change. + return; + } - if (change.since != _changeNumber.get() || change.till < _changeNumber.get()) { + if (change.since != _changeNumber.get() || change.till < _changeNumber.get()) { + // some other thread may have updated the shared state. exit + return; + } + + if (change.splits.isEmpty()) { + // there are no changes. weird! + _changeNumber.set(change.till); + return; + } + + synchronized (_lock) { + // check state one more time. + if (change.since != _changeNumber.get() + || change.till < _changeNumber.get()) { // some other thread may have updated the shared state. exit - break; + return; } - if (change.splits.isEmpty()) { - // there are no changes. weird! - _changeNumber.set(change.till); - break; - } + Set toRemove = Sets.newHashSet(); + Map toAdd = Maps.newHashMap(); + List trafficTypeNamesToRemove = Lists.newArrayList(); + List trafficTypeNamesToAdd = Lists.newArrayList(); - synchronized (_lock) { - // check state one more time. - if (change.since != _changeNumber.get() - || change.till < _changeNumber.get()) { - // some other thread may have updated the shared state. exit - break; + for (Split split : change.splits) { + if (Thread.currentThread().isInterrupted()) { + throw new InterruptedException(); } - Set toRemove = Sets.newHashSet(); - Map toAdd = Maps.newHashMap(); - List trafficTypeNamesToRemove = Lists.newArrayList(); - List trafficTypeNamesToAdd = Lists.newArrayList(); - - for (Split split : change.splits) { - if (Thread.currentThread().isInterrupted()) { - throw new InterruptedException(); - } - - if (split.status != Status.ACTIVE) { - // archive. - toRemove.add(split.name); - if (split.trafficTypeName != null) { - trafficTypeNamesToRemove.add(split.trafficTypeName); - } - continue; - } - - ParsedSplit parsedSplit = _parser.parse(split); - if (parsedSplit == null) { - _log.info("We could not parse the experiment definition for: " + split.name + " so we are removing it completely to be careful"); - toRemove.add(split.name); - if (split.trafficTypeName != null) { - trafficTypeNamesToRemove.add(split.trafficTypeName); - } - continue; - } - - toAdd.put(split.name, parsedSplit); - - // If the split already exists, this is either an update, or the split has been - // deleted and recreated (possibly with a different traffic type). - // If it's an update, the traffic type should NOT be increased. - // If it's deleted & recreated, the old one should be decreased and the new one increased. - // To handle both cases, we simply delete the old one if the split is present. - // The new one is always increased. - ParsedSplit current = _concurrentMap.get(split.name); - if (current != null && current.trafficTypeName() != null) { - trafficTypeNamesToRemove.add(current.trafficTypeName()); + if (split.status != Status.ACTIVE) { + // archive. + toRemove.add(split.name); + if (split.trafficTypeName != null) { + trafficTypeNamesToRemove.add(split.trafficTypeName); } + continue; + } + ParsedSplit parsedSplit = _parser.parse(split); + if (parsedSplit == null) { + _log.info("We could not parse the experiment definition for: " + split.name + " so we are removing it completely to be careful"); + toRemove.add(split.name); if (split.trafficTypeName != null) { - trafficTypeNamesToAdd.add(split.trafficTypeName); + trafficTypeNamesToRemove.add(split.trafficTypeName); } + continue; } - _concurrentMap.putAll(toAdd); - _concurrentTrafficTypeNameSet.addAll(trafficTypeNamesToAdd); - //removeAll does not work here, since it wont remove all the occurrences, just one - Multisets.removeOccurrences(_concurrentTrafficTypeNameSet, trafficTypeNamesToRemove); - - for (String remove : toRemove) { - _concurrentMap.remove(remove); + toAdd.put(split.name, parsedSplit); + + // If the split already exists, this is either an update, or the split has been + // deleted and recreated (possibly with a different traffic type). + // If it's an update, the traffic type should NOT be increased. + // If it's deleted & recreated, the old one should be decreased and the new one increased. + // To handle both cases, we simply delete the old one if the split is present. + // The new one is always increased. + ParsedSplit current = _concurrentMap.get(split.name); + if (current != null && current.trafficTypeName() != null) { + trafficTypeNamesToRemove.add(current.trafficTypeName()); } - if (!toAdd.isEmpty()) { - _log.debug("Updated features: " + toAdd.keySet()); + if (split.trafficTypeName != null) { + trafficTypeNamesToAdd.add(split.trafficTypeName); } + } - if (!toRemove.isEmpty()) { - _log.debug("Deleted features: " + toRemove); - } + _concurrentMap.putAll(toAdd); + _concurrentTrafficTypeNameSet.addAll(trafficTypeNamesToAdd); + //removeAll does not work here, since it wont remove all the occurrences, just one + Multisets.removeOccurrences(_concurrentTrafficTypeNameSet, trafficTypeNamesToRemove); - _changeNumber.set(change.till); + for (String remove : toRemove) { + _concurrentMap.remove(remove); } + + if (!toAdd.isEmpty()) { + _log.debug("Updated features: " + toAdd.keySet()); + } + + if (!toRemove.isEmpty()) { + _log.debug("Deleted features: " + toRemove); + } + + _changeNumber.set(change.till); } } } diff --git a/client/src/main/java/io/split/engine/segments/RefreshableSegment.java b/client/src/main/java/io/split/engine/segments/RefreshableSegment.java index 664aff4c1..656f5bcec 100644 --- a/client/src/main/java/io/split/engine/segments/RefreshableSegment.java +++ b/client/src/main/java/io/split/engine/segments/RefreshableSegment.java @@ -45,7 +45,20 @@ public boolean contains(String key) { @Override public void forceRefresh() { - run(); + try { + _log.debug("Force Refresh segment starting ..."); + while (true) { + long start = _changeNumber.get(); + runWithoutExceptionHandling(); + long end = _changeNumber.get(); + + if (start >= end) { + break; + } + } + } catch (Throwable t) { + _log.error("forceRefresh segment failed: " + t.getMessage()); + } } @Override diff --git a/client/src/test/java/io/split/SplitMockServer.java b/client/src/test/java/io/split/SplitMockServer.java index b08819eb7..209f26e06 100644 --- a/client/src/test/java/io/split/SplitMockServer.java +++ b/client/src/test/java/io/split/SplitMockServer.java @@ -46,12 +46,14 @@ public MockResponse dispatch(RecordedRequest request) { return new MockResponse().setBody(inputStreamToString("splits2.json")); case "/api/splitChanges?since=1585948850111": return new MockResponse().setBody(inputStreamToString("splits_killed.json")); + case "/api/splitChanges?since=1585948850112": + return new MockResponse().setBody("{\"splits\": [], \"since\":1585948850112, \"till\":1585948850112}"); case "/api/segmentChanges/segment-test?since=-1": return new MockResponse().setBody("{\"name\": \"segment3\",\"added\": [],\"removed\": [],\"since\": -1,\"till\": -1}"); case "/api/segmentChanges/segment3?since=-1": return new MockResponse().setBody(inputStreamToString("segment3.json")); case "/api/segmentChanges/segment3?since=1585948850110": - return new MockResponse().setBody("{\"name\": \"segment3\",\"added\": [],\"removed\": [],\"since\": 1585948850110,\"till\": 1585948850111}"); + return new MockResponse().setBody("{\"name\": \"segment3\",\"added\": [],\"removed\": [],\"since\": 1585948850110,\"till\": 1585948850110}"); case "/api/segmentChanges/segment3?since=1585948850111": return new MockResponse().setBody("{\"name\": \"segment3\",\"added\": [],\"removed\": [],\"since\": 1585948850111,\"till\": 1585948850111}"); case "/api/metrics/time": diff --git a/client/src/test/java/io/split/client/SplitClientIntegrationTest.java b/client/src/test/java/io/split/client/SplitClientIntegrationTest.java index 5f33bb0a6..05026ee1f 100644 --- a/client/src/test/java/io/split/client/SplitClientIntegrationTest.java +++ b/client/src/test/java/io/split/client/SplitClientIntegrationTest.java @@ -402,7 +402,7 @@ public void splitClientMultiFactory() throws IOException, TimeoutException, Inte Awaitility.await() .atMost(50L, TimeUnit.SECONDS) - .until(() -> "after_notification_received".equals(client1.getTreatment("admin", "push_test"))); + .until(() -> "split_killed".equals(client1.getTreatment("admin", "push_test"))); Awaitility.await() .atMost(50L, TimeUnit.SECONDS) @@ -420,30 +420,7 @@ public void splitClientMultiFactory() throws IOException, TimeoutException, Inte Awaitility.await() .atMost(50L, TimeUnit.SECONDS) - .until(() -> "after_notification_received".equals(client1.getTreatment("admin", "push_test"))); - - Awaitility.await() - .atMost(50L, TimeUnit.SECONDS) - .until(() -> "on_whitelist".equals(client2.getTreatment("admin", "push_test"))); - - Awaitility.await() - .atMost(50L, TimeUnit.SECONDS) - .until(() -> "after_notification_received".equals(client3.getTreatment("admin", "push_test"))); - - Awaitility.await() - .atMost(50L, TimeUnit.SECONDS) - .until(() -> "on_whitelist".equals(client4.getTreatment("admin", "push_test"))); - - OutboundSseEvent sseEventSplitUpdate3 = new OutboundEvent - .Builder() - .name("message") - .data("{\"id\":\"22\",\"clientId\":\"22\",\"timestamp\":1592590436082,\"encoding\":\"json\",\"channel\":\"xxxx_xxxx_splits\",\"data\":\"{\\\"type\\\":\\\"SPLIT_UPDATE\\\",\\\"changeNumber\\\":1585948850112}\"}") - .build(); - eventQueue3.push(sseEventSplitUpdate3); - - Awaitility.await() - .atMost(50L, TimeUnit.SECONDS) - .until(() -> "after_notification_received".equals(client1.getTreatment("admin", "push_test"))); + .until(() -> "split_killed".equals(client1.getTreatment("admin", "push_test"))); Awaitility.await() .atMost(50L, TimeUnit.SECONDS) diff --git a/client/src/test/java/io/split/engine/common/SynchronizerTest.java b/client/src/test/java/io/split/engine/common/SynchronizerTest.java index d69a759f3..544e8fca6 100644 --- a/client/src/test/java/io/split/engine/common/SynchronizerTest.java +++ b/client/src/test/java/io/split/engine/common/SynchronizerTest.java @@ -25,7 +25,7 @@ public void syncAll() throws InterruptedException { synchronizer.syncAll(); Thread.sleep(100); - Mockito.verify(splitFetcher, Mockito.times(1)).forceRefresh(); + Mockito.verify(splitFetcher, Mockito.times(1)).run(); Mockito.verify(segmentFetcher, Mockito.times(1)).forceRefreshAll(); } From 039ab80d6e96c25902116956b040e521c8259c68 Mon Sep 17 00:00:00 2001 From: Mauro Sanz Date: Tue, 29 Sep 2020 18:58:48 -0300 Subject: [PATCH 4/5] bump version and changes --- client/CHANGES.txt | 3 +++ client/pom.xml | 2 +- pom.xml | 2 +- testing/pom.xml | 2 +- 4 files changed, 6 insertions(+), 3 deletions(-) diff --git a/client/CHANGES.txt b/client/CHANGES.txt index 4fa73125c..c33c4507f 100644 --- a/client/CHANGES.txt +++ b/client/CHANGES.txt @@ -1,5 +1,8 @@ CHANGES +4.1.1 (Sep 30, 2020) +- + 4.1.0 (Sep 25, 2020) - Add local impressions deduping (enabled by default) diff --git a/client/pom.xml b/client/pom.xml index c0b4c6556..747594a03 100644 --- a/client/pom.xml +++ b/client/pom.xml @@ -5,7 +5,7 @@ io.split.client java-client-parent - 4.1.0 + 4.1.1 java-client jar diff --git a/pom.xml b/pom.xml index a46387c69..9f696ef6d 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ 4.0.0 io.split.client java-client-parent - 4.1.0 + 4.1.1 diff --git a/testing/pom.xml b/testing/pom.xml index 7cf50657e..2ae2feb11 100644 --- a/testing/pom.xml +++ b/testing/pom.xml @@ -6,7 +6,7 @@ io.split.client java-client-parent - 4.1.0 + 4.1.1 java-client-testing From 3f0e1e48505905432011d859e427df9e180e717c Mon Sep 17 00:00:00 2001 From: Mauro Sanz Date: Wed, 30 Sep 2020 10:25:38 -0300 Subject: [PATCH 5/5] updated changes --- client/CHANGES.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/CHANGES.txt b/client/CHANGES.txt index c33c4507f..43236c201 100644 --- a/client/CHANGES.txt +++ b/client/CHANGES.txt @@ -1,7 +1,7 @@ CHANGES 4.1.1 (Sep 30, 2020) -- +- Fixed fetch retries after received an SPLIT_CHANGE. 4.1.0 (Sep 25, 2020) - Add local impressions deduping (enabled by default)