From ecc9ed10470bfc4b0be9a4552cf736f859086bcd Mon Sep 17 00:00:00 2001 From: Lucas Echeverz Date: Mon, 29 Mar 2021 18:28:35 -0300 Subject: [PATCH 1/2] Fixing streaming retry --- .../java/io/split/engine/common/SynchronizerImp.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/client/src/main/java/io/split/engine/common/SynchronizerImp.java b/client/src/main/java/io/split/engine/common/SynchronizerImp.java index 4e8115b18..0d22ff9f2 100644 --- a/client/src/main/java/io/split/engine/common/SynchronizerImp.java +++ b/client/src/main/java/io/split/engine/common/SynchronizerImp.java @@ -19,6 +19,7 @@ public class SynchronizerImp implements Synchronizer { private static final Logger _log = LoggerFactory.getLogger(Synchronizer.class); + private static final int RETRIES_NUMBER = 10; private final SplitSynchronizationTask _splitSynchronizationTask; private final SplitFetcher _splitFetcher; @@ -69,8 +70,10 @@ public void stopPeriodicFetching() { @Override public void refreshSplits(long targetChangeNumber) { - if (targetChangeNumber > _splitCache.getChangeNumber()) { + int retries = 1; + while(targetChangeNumber > _splitCache.getChangeNumber() && retries <= RETRIES_NUMBER) { _splitFetcher.forceRefresh(true); + retries++; } } @@ -84,7 +87,8 @@ public void localKillSplit(String splitName, String defaultTreatment, long newCh @Override public void refreshSegment(String segmentName, long changeNumber) { - if (changeNumber > _segmentCache.getChangeNumber(segmentName)) { + int retries = 1; + while(changeNumber > _segmentCache.getChangeNumber(segmentName) && retries <= RETRIES_NUMBER) { SegmentFetcher fetcher = _segmentSynchronizationTaskImp.getFetcher(segmentName); try{ fetcher.fetch(true); @@ -93,6 +97,7 @@ public void refreshSegment(String segmentName, long changeNumber) { catch (NullPointerException np){ throw new NullPointerException(); } + retries++; } } } From f93ae28923db4665737e03fa17997b4e23a55fd6 Mon Sep 17 00:00:00 2001 From: Lucas Echeverz Date: Tue, 6 Apr 2021 10:42:18 -0300 Subject: [PATCH 2/2] Adding tests --- .../split/engine/common/SynchronizerTest.java | 23 ++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/client/src/test/java/io/split/engine/common/SynchronizerTest.java b/client/src/test/java/io/split/engine/common/SynchronizerTest.java index f4e0bec31..5bff9d000 100644 --- a/client/src/test/java/io/split/engine/common/SynchronizerTest.java +++ b/client/src/test/java/io/split/engine/common/SynchronizerTest.java @@ -4,6 +4,7 @@ import io.split.cache.SplitCache; import io.split.engine.experiments.SplitFetcherImp; import io.split.engine.experiments.SplitSynchronizationTask; +import io.split.engine.segments.SegmentFetcher; import io.split.engine.segments.SegmentSynchronizationTask; import org.junit.Before; import org.junit.Test; @@ -15,6 +16,7 @@ public class SynchronizerTest { private SplitFetcherImp _splitFetcher; private SplitCache _splitCache; private Synchronizer _synchronizer; + private SegmentCache _segmentCache; @Before public void beforeMethod() { @@ -22,7 +24,7 @@ public void beforeMethod() { _segmentFetcher = Mockito.mock(SegmentSynchronizationTask.class); _splitFetcher = Mockito.mock(SplitFetcherImp.class); _splitCache = Mockito.mock(SplitCache.class); - SegmentCache _segmentCache = Mockito.mock(SegmentCache.class); + _segmentCache = Mockito.mock(SegmentCache.class); _synchronizer = new SynchronizerImp(_refreshableSplitFetcherTask, _splitFetcher, _segmentFetcher, _splitCache, _segmentCache); } @@ -51,4 +53,23 @@ public void stopPeriodicFetching() { Mockito.verify(_refreshableSplitFetcherTask, Mockito.times(1)).stop(); Mockito.verify(_segmentFetcher, Mockito.times(1)).stop(); } + + @Test + public void streamingRetryOnSplit() { + Mockito.when(_splitCache.getChangeNumber()).thenReturn(0l).thenReturn(0l).thenReturn(1l); + _synchronizer.refreshSplits(1l); + + Mockito.verify(_splitCache, Mockito.times(3)).getChangeNumber(); + } + + @Test + public void streamingRetryOnSegment() { + SegmentFetcher fetcher = Mockito.mock(SegmentFetcher.class); + Mockito.when(_segmentFetcher.getFetcher(Mockito.anyString())).thenReturn(fetcher); + Mockito.when(_segmentCache.getChangeNumber(Mockito.anyString())).thenReturn(0l).thenReturn(0l).thenReturn(1l); + _synchronizer.refreshSegment("Segment",1l); + + Mockito.verify(_segmentCache, Mockito.times(3)).getChangeNumber(Mockito.anyString()); + } + }