diff --git a/client/src/main/java/io/split/client/HttpSegmentChangeFetcher.java b/client/src/main/java/io/split/client/HttpSegmentChangeFetcher.java index d8118a052..7d7d735f6 100644 --- a/client/src/main/java/io/split/client/HttpSegmentChangeFetcher.java +++ b/client/src/main/java/io/split/client/HttpSegmentChangeFetcher.java @@ -28,6 +28,8 @@ public final class HttpSegmentChangeFetcher implements SegmentChangeFetcher { private static final String SINCE = "since"; private static final String PREFIX = "segmentChangeFetcher"; + private static final String NAME_CACHE = "Cache-Control"; + private static final String VALUE_CACHE = "no-cache"; private final CloseableHttpClient _client; private final URI _target; @@ -49,7 +51,7 @@ private HttpSegmentChangeFetcher(CloseableHttpClient client, URI uri, Metrics me } @Override - public SegmentChange fetch(String segmentName, long since) { + public SegmentChange fetch(String segmentName, long since, boolean addCacheHeader) { long start = System.currentTimeMillis(); CloseableHttpResponse response = null; @@ -58,6 +60,9 @@ public SegmentChange fetch(String segmentName, long since) { String path = _target.getPath() + "/" + segmentName; URI uri = new URIBuilder(_target).setPath(path).addParameter(SINCE, "" + since).build(); HttpGet request = new HttpGet(uri); + if(addCacheHeader) { + request.setHeader(NAME_CACHE, VALUE_CACHE); + } response = _client.execute(request); int statusCode = response.getCode(); diff --git a/client/src/main/java/io/split/client/HttpSplitChangeFetcher.java b/client/src/main/java/io/split/client/HttpSplitChangeFetcher.java index 50ab2dd2c..3c5f9b8fc 100644 --- a/client/src/main/java/io/split/client/HttpSplitChangeFetcher.java +++ b/client/src/main/java/io/split/client/HttpSplitChangeFetcher.java @@ -28,6 +28,8 @@ public final class HttpSplitChangeFetcher implements SplitChangeFetcher { private static final String SINCE = "since"; private static final String PREFIX = "splitChangeFetcher"; + private static final String NAME_CACHE = "Cache-Control"; + private static final String VALUE_CACHE = "no-cache"; private final CloseableHttpClient _client; private final URI _target; @@ -49,7 +51,7 @@ private HttpSplitChangeFetcher(CloseableHttpClient client, URI uri, Metrics metr } @Override - public SplitChange fetch(long since) { + public SplitChange fetch(long since, boolean addCacheHeader) { long start = System.currentTimeMillis(); @@ -59,6 +61,9 @@ public SplitChange fetch(long since) { URI uri = new URIBuilder(_target).addParameter(SINCE, "" + since).build(); HttpGet request = new HttpGet(uri); + if(addCacheHeader) { + request.setHeader(NAME_CACHE, VALUE_CACHE); + } response = _client.execute(request); int statusCode = response.getCode(); diff --git a/client/src/main/java/io/split/client/jmx/SplitJmxMonitor.java b/client/src/main/java/io/split/client/jmx/SplitJmxMonitor.java index 8640d4a63..e5d49e115 100644 --- a/client/src/main/java/io/split/client/jmx/SplitJmxMonitor.java +++ b/client/src/main/java/io/split/client/jmx/SplitJmxMonitor.java @@ -34,7 +34,7 @@ public SplitJmxMonitor(SplitClient splitClient, SplitFetcher featureFetcher, Spl @Override public boolean forceSyncFeatures() { - _featureFetcher.forceRefresh(); + _featureFetcher.forceRefresh(true); _log.info("Features successfully refreshed via JMX"); return true; } @@ -43,7 +43,7 @@ public boolean forceSyncFeatures() { public boolean forceSyncSegment(String segmentName) { SegmentFetcher fetcher = _segmentSynchronizationTask.getFetcher(segmentName); try{ - fetcher.fetch(); + fetcher.fetch(true); } //We are sure this will never happen because getFetcher firts initiate the segment. This try/catch is for safe only. catch (NullPointerException np){ diff --git a/client/src/main/java/io/split/engine/common/SynchronizerImp.java b/client/src/main/java/io/split/engine/common/SynchronizerImp.java index 9f228e8c7..4e8115b18 100644 --- a/client/src/main/java/io/split/engine/common/SynchronizerImp.java +++ b/client/src/main/java/io/split/engine/common/SynchronizerImp.java @@ -48,8 +48,8 @@ public SynchronizerImp(SplitSynchronizationTask splitSynchronizationTask, @Override public void syncAll() { _syncAllScheduledExecutorService.schedule(() -> { - _splitFetcher.run(); - _segmentSynchronizationTaskImp.run(); + _splitFetcher.fetchAll(true); + _segmentSynchronizationTaskImp.fetchAll(true); }, 0, TimeUnit.SECONDS); } @@ -70,7 +70,7 @@ public void stopPeriodicFetching() { @Override public void refreshSplits(long targetChangeNumber) { if (targetChangeNumber > _splitCache.getChangeNumber()) { - _splitFetcher.forceRefresh(); + _splitFetcher.forceRefresh(true); } } @@ -87,7 +87,7 @@ public void refreshSegment(String segmentName, long changeNumber) { if (changeNumber > _segmentCache.getChangeNumber(segmentName)) { SegmentFetcher fetcher = _segmentSynchronizationTaskImp.getFetcher(segmentName); try{ - fetcher.fetch(); + fetcher.fetch(true); } //We are sure this will never happen because getFetcher firts initiate the segment. This try/catch is for safe only. catch (NullPointerException np){ diff --git a/client/src/main/java/io/split/engine/experiments/SplitChangeFetcher.java b/client/src/main/java/io/split/engine/experiments/SplitChangeFetcher.java index b05fea930..63298a5e7 100644 --- a/client/src/main/java/io/split/engine/experiments/SplitChangeFetcher.java +++ b/client/src/main/java/io/split/engine/experiments/SplitChangeFetcher.java @@ -31,5 +31,5 @@ public interface SplitChangeFetcher { * @return SegmentChange * @throws java.lang.RuntimeException if there was a problem computing split changes */ - SplitChange fetch(long since); + SplitChange fetch(long since, boolean addCacheHeader); } diff --git a/client/src/main/java/io/split/engine/experiments/SplitFetcher.java b/client/src/main/java/io/split/engine/experiments/SplitFetcher.java index f428317c9..4266659b1 100644 --- a/client/src/main/java/io/split/engine/experiments/SplitFetcher.java +++ b/client/src/main/java/io/split/engine/experiments/SplitFetcher.java @@ -8,5 +8,11 @@ public interface SplitFetcher extends Runnable { * Forces a sync of splits, outside of any scheduled * syncs. This method MUST NOT throw any exceptions. */ - void forceRefresh(); + void forceRefresh(boolean addCacheHeader); + + /** + * Forces a sync of ALL splits, outside of any scheduled + * syncs. This method MUST NOT throw any exceptions. + */ + void fetchAll(boolean addCacheHeader); } diff --git a/client/src/main/java/io/split/engine/experiments/SplitFetcherImp.java b/client/src/main/java/io/split/engine/experiments/SplitFetcherImp.java index 79d387ee4..510001153 100644 --- a/client/src/main/java/io/split/engine/experiments/SplitFetcherImp.java +++ b/client/src/main/java/io/split/engine/experiments/SplitFetcherImp.java @@ -43,12 +43,12 @@ public SplitFetcherImp(SplitChangeFetcher splitChangeFetcher, SplitParser parser } @Override - public void forceRefresh() { + public void forceRefresh(boolean addCacheHeader) { _log.debug("Force Refresh splits starting ..."); try { while (true) { long start = _splitCache.getChangeNumber(); - runWithoutExceptionHandling(); + runWithoutExceptionHandling(addCacheHeader); long end = _splitCache.getChangeNumber(); if (start >= end) { @@ -65,28 +65,11 @@ public void forceRefresh() { @Override public void run() { - _log.debug("Fetch splits starting ..."); - long start = _splitCache.getChangeNumber(); - try { - runWithoutExceptionHandling(); - _gates.splitsAreReady(); - } catch (InterruptedException e) { - _log.warn("Interrupting split fetcher task"); - Thread.currentThread().interrupt(); - } catch (Throwable t) { - _log.error("RefreshableSplitFetcher failed: " + t.getMessage()); - if (_log.isDebugEnabled()) { - _log.debug("Reason:", t); - } - } finally { - if (_log.isDebugEnabled()) { - _log.debug("split fetch before: " + start + ", after: " + _splitCache.getChangeNumber()); - } - } + this.fetchAll(false); } - private void runWithoutExceptionHandling() throws InterruptedException { - SplitChange change = _splitChangeFetcher.fetch(_splitCache.getChangeNumber()); + private void runWithoutExceptionHandling(boolean addCacheHeader) throws InterruptedException { + SplitChange change = _splitChangeFetcher.fetch(_splitCache.getChangeNumber(), addCacheHeader); if (change == null) { throw new IllegalStateException("SplitChange was null"); @@ -155,4 +138,25 @@ private void runWithoutExceptionHandling() throws InterruptedException { _splitCache.setChangeNumber(change.till); } } + @Override + public void fetchAll(boolean addCacheHeader) { + _log.debug("Fetch splits starting ..."); + long start = _splitCache.getChangeNumber(); + try { + runWithoutExceptionHandling(addCacheHeader); + _gates.splitsAreReady(); + } catch (InterruptedException e) { + _log.warn("Interrupting split fetcher task"); + Thread.currentThread().interrupt(); + } catch (Throwable t) { + _log.error("RefreshableSplitFetcher failed: " + t.getMessage()); + if (_log.isDebugEnabled()) { + _log.debug("Reason:", t); + } + } finally { + if (_log.isDebugEnabled()) { + _log.debug("split fetch before: " + start + ", after: " + _splitCache.getChangeNumber()); + } + } + } } diff --git a/client/src/main/java/io/split/engine/segments/SegmentChangeFetcher.java b/client/src/main/java/io/split/engine/segments/SegmentChangeFetcher.java index 8b72f0ae6..f4d46ed13 100644 --- a/client/src/main/java/io/split/engine/segments/SegmentChangeFetcher.java +++ b/client/src/main/java/io/split/engine/segments/SegmentChangeFetcher.java @@ -25,5 +25,5 @@ public interface SegmentChangeFetcher { * @return SegmentChange * @throws java.lang.RuntimeException if there was a problem fetching segment changes */ - SegmentChange fetch(String segmentName, long changesSinceThisChangeNumber); + SegmentChange fetch(String segmentName, long changesSinceThisChangeNumber, boolean addCacheHeader); } diff --git a/client/src/main/java/io/split/engine/segments/SegmentFetcher.java b/client/src/main/java/io/split/engine/segments/SegmentFetcher.java index e55515e22..af4bbc767 100644 --- a/client/src/main/java/io/split/engine/segments/SegmentFetcher.java +++ b/client/src/main/java/io/split/engine/segments/SegmentFetcher.java @@ -7,5 +7,9 @@ public interface SegmentFetcher { /** * fetch */ - void fetch(); + void fetch(boolean addCacheHeader); + + void runWhitCacheHeader(); + + void fetchAll(); } diff --git a/client/src/main/java/io/split/engine/segments/SegmentFetcherImp.java b/client/src/main/java/io/split/engine/segments/SegmentFetcherImp.java index 44a128243..ac21e8461 100644 --- a/client/src/main/java/io/split/engine/segments/SegmentFetcherImp.java +++ b/client/src/main/java/io/split/engine/segments/SegmentFetcherImp.java @@ -11,7 +11,7 @@ import static com.google.common.base.Preconditions.checkNotNull; -public class SegmentFetcherImp implements Runnable, SegmentFetcher { +public class SegmentFetcherImp implements SegmentFetcher { private static final Logger _log = LoggerFactory.getLogger(SegmentFetcherImp.class); private final String _segmentName; @@ -31,14 +31,9 @@ public SegmentFetcherImp(String segmentName, SegmentChangeFetcher segmentChangeF } @Override - public void run() { + public void fetch(boolean addCacheHeader){ try { - // Do this again in case the previous call errored out. - _gates.registerSegment(_segmentName); - callLoopRun(true); - - _gates.segmentIsReady(_segmentName); - + callLoopRun(false, addCacheHeader); } catch (Throwable t) { _log.error("RefreshableSegmentFetcher failed: " + t.getMessage()); if (_log.isDebugEnabled()) { @@ -47,20 +42,8 @@ public void run() { } } - @Override - public void fetch(){ - try { - callLoopRun(false); - } catch (Throwable t) { - _log.error("RefreshableSegmentFetcher failed: " + t.getMessage()); - if (_log.isDebugEnabled()) { - _log.debug("Reason:", t); - } - } - } - - private void runWithoutExceptionHandling() { - SegmentChange change = _segmentChangeFetcher.fetch(_segmentName, _segmentCache.getChangeNumber(_segmentName)); + private void runWithoutExceptionHandling(boolean addCacheHeader) { + SegmentChange change = _segmentChangeFetcher.fetch(_segmentName, _segmentCache.getChangeNumber(_segmentName), addCacheHeader); if (change == null) { throw new IllegalStateException("SegmentChange was null"); @@ -126,10 +109,10 @@ private String summarize(List changes) { return bldr.toString(); } - private void callLoopRun(boolean isFetch){ + private void callLoopRun(boolean isFetch, boolean addCacheHeader){ while (true) { long start = _segmentCache.getChangeNumber(_segmentName); - runWithoutExceptionHandling(); + runWithoutExceptionHandling(addCacheHeader); long end = _segmentCache.getChangeNumber(_segmentName); if (isFetch && _log.isDebugEnabled()) { _log.debug(_segmentName + " segment fetch before: " + start + ", after: " + _segmentCache.getChangeNumber(_segmentName) /*+ " size: " + _concurrentKeySet.size()*/); @@ -139,4 +122,36 @@ private void callLoopRun(boolean isFetch){ } } } + + @Override + public void runWhitCacheHeader(){ + this.fetchAndUpdate(true); + } + + /** + * Calls callLoopRun and after fetchs segment. + * @param addCacheHeader indicates if CacheHeader is required + */ + private void fetchAndUpdate(boolean addCacheHeader) { + try { + // Do this again in case the previous call errored out. + _gates.registerSegment(_segmentName); + callLoopRun(true, addCacheHeader); + + _gates.segmentIsReady(_segmentName); + + } catch (Throwable t) { + _log.error("RefreshableSegmentFetcher failed: " + t.getMessage()); + if (_log.isDebugEnabled()) { + _log.debug("Reason:", t); + } + } + } + + @Override + public void fetchAll() { + this.fetchAndUpdate(false); + } + + } diff --git a/client/src/main/java/io/split/engine/segments/SegmentSynchronizationTask.java b/client/src/main/java/io/split/engine/segments/SegmentSynchronizationTask.java index 08b020f7a..0bed99225 100644 --- a/client/src/main/java/io/split/engine/segments/SegmentSynchronizationTask.java +++ b/client/src/main/java/io/split/engine/segments/SegmentSynchronizationTask.java @@ -23,4 +23,10 @@ public interface SegmentSynchronizationTask extends Runnable { * stops the thread */ void stop(); + + /** + * fetch every Segment + * @param addCacheHeader + */ + void fetchAll(boolean addCacheHeader); } diff --git a/client/src/main/java/io/split/engine/segments/SegmentSynchronizationTaskImp.java b/client/src/main/java/io/split/engine/segments/SegmentSynchronizationTaskImp.java index 442c8b859..3db365ba9 100644 --- a/client/src/main/java/io/split/engine/segments/SegmentSynchronizationTaskImp.java +++ b/client/src/main/java/io/split/engine/segments/SegmentSynchronizationTaskImp.java @@ -29,7 +29,7 @@ public class SegmentSynchronizationTaskImp implements SegmentSynchronizationTask private final AtomicLong _refreshEveryNSeconds; private final AtomicBoolean _running; private final Object _lock = new Object(); - private final ConcurrentMap _segmentFetchers = Maps.newConcurrentMap(); + private final ConcurrentMap _segmentFetchers = Maps.newConcurrentMap(); private final SegmentCache _segmentCache; private final SDKReadinessGates _gates; private final ScheduledExecutorService _scheduledExecutorService; @@ -58,20 +58,12 @@ public SegmentSynchronizationTaskImp(SegmentChangeFetcher segmentChangeFetcher, @Override public void run() { - for (Map.Entry entry : _segmentFetchers.entrySet()) { - SegmentFetcherImp fetcher = entry.getValue(); - - if (fetcher == null) { - continue; - } - - _scheduledExecutorService.submit(fetcher); - } + this.fetchAll(false); } @Override public void initializeSegment(String segmentName) { - SegmentFetcherImp segment = _segmentFetchers.get(segmentName); + SegmentFetcher segment = _segmentFetchers.get(segmentName); if (segment != null) { return; } @@ -94,7 +86,7 @@ public void initializeSegment(String segmentName) { segment = new SegmentFetcherImp(segmentName, _segmentChangeFetcher, _gates, _segmentCache); if (_running.get()) { - _scheduledExecutorService.submit(segment); + _scheduledExecutorService.submit(segment::fetchAll); } _segmentFetchers.putIfAbsent(segmentName, segment); @@ -148,4 +140,21 @@ public void close() { Thread.currentThread().interrupt(); } } + + @Override + public void fetchAll(boolean addCacheHeader) { + for (Map.Entry entry : _segmentFetchers.entrySet()) { + SegmentFetcher fetcher = entry.getValue(); + + if (fetcher == null) { + continue; + } + + if(addCacheHeader) { + _scheduledExecutorService.submit(fetcher::runWhitCacheHeader); + continue; + } + _scheduledExecutorService.submit(fetcher::fetchAll); + } + } } diff --git a/client/src/test/java/io/split/client/HttpSegmentChangeFetcherTest.java b/client/src/test/java/io/split/client/HttpSegmentChangeFetcherTest.java index 6e0a70b6e..afb238552 100644 --- a/client/src/test/java/io/split/client/HttpSegmentChangeFetcherTest.java +++ b/client/src/test/java/io/split/client/HttpSegmentChangeFetcherTest.java @@ -61,7 +61,7 @@ public void testFetcherWithSpecialCharacters() throws URISyntaxException, IOExce Metrics.NoopMetrics metrics = new Metrics.NoopMetrics(); HttpSegmentChangeFetcher fetcher = HttpSegmentChangeFetcher.create(httpClientMock, rootTarget, metrics); - SegmentChange change = fetcher.fetch("some_segment", 1234567); + SegmentChange change = fetcher.fetch("some_segment", 1234567, true); Assert.assertNotNull(change); Assert.assertEquals(1, change.added.size()); diff --git a/client/src/test/java/io/split/client/HttpSplitChangeFetcherTest.java b/client/src/test/java/io/split/client/HttpSplitChangeFetcherTest.java index 1c7887681..564339db7 100644 --- a/client/src/test/java/io/split/client/HttpSplitChangeFetcherTest.java +++ b/client/src/test/java/io/split/client/HttpSplitChangeFetcherTest.java @@ -63,7 +63,7 @@ public void testFetcherWithSpecialCharacters() throws URISyntaxException, Invoca Metrics.NoopMetrics metrics = new Metrics.NoopMetrics(); HttpSplitChangeFetcher fetcher = HttpSplitChangeFetcher.create(httpClientMock, rootTarget, metrics); - SplitChange change = fetcher.fetch(1234567); + SplitChange change = fetcher.fetch(1234567, true); Assert.assertNotNull(change); Assert.assertEquals(1, change.splits.size()); diff --git a/client/src/test/java/io/split/engine/common/SynchronizerTest.java b/client/src/test/java/io/split/engine/common/SynchronizerTest.java index ad3cf68c4..f4e0bec31 100644 --- a/client/src/test/java/io/split/engine/common/SynchronizerTest.java +++ b/client/src/test/java/io/split/engine/common/SynchronizerTest.java @@ -32,8 +32,8 @@ public void syncAll() throws InterruptedException { _synchronizer.syncAll(); Thread.sleep(100); - Mockito.verify(_splitFetcher, Mockito.times(1)).run(); - Mockito.verify(_segmentFetcher, Mockito.times(1)).run(); + Mockito.verify(_splitFetcher, Mockito.times(1)).fetchAll(true); + Mockito.verify(_segmentFetcher, Mockito.times(1)).fetchAll(true); } @Test diff --git a/client/src/test/java/io/split/engine/experiments/AChangePerCallSplitChangeFetcher.java b/client/src/test/java/io/split/engine/experiments/AChangePerCallSplitChangeFetcher.java index 2b98e223c..6b0114566 100644 --- a/client/src/test/java/io/split/engine/experiments/AChangePerCallSplitChangeFetcher.java +++ b/client/src/test/java/io/split/engine/experiments/AChangePerCallSplitChangeFetcher.java @@ -31,7 +31,7 @@ public AChangePerCallSplitChangeFetcher(String segmentName) { @Override - public SplitChange fetch(long since) { + public SplitChange fetch(long since, boolean addCacheHeader) { long latestChangeNumber = since + 1; Condition condition = null; diff --git a/client/src/test/java/io/split/engine/experiments/SplitChangeFetcherWithTrafficTypeNames.java b/client/src/test/java/io/split/engine/experiments/SplitChangeFetcherWithTrafficTypeNames.java deleted file mode 100644 index 2630758a2..000000000 --- a/client/src/test/java/io/split/engine/experiments/SplitChangeFetcherWithTrafficTypeNames.java +++ /dev/null @@ -1,85 +0,0 @@ -package io.split.engine.experiments; - -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import io.split.client.dtos.Condition; -import io.split.client.dtos.Split; -import io.split.client.dtos.SplitChange; -import io.split.client.dtos.Status; -import io.split.engine.ConditionsTestUtil; -import io.split.grammar.Treatments; - -import java.util.List; -import java.util.Map; - -/** - * Mock Class of SplitChangeFetcher for testing. - * - * Every time you run this inside RefreshableSplitFetcher it will add +1 to the since. - * So the first time you run this, RefreshableSplitFetcher will send since -1 and next time will be run with 0 - * So next time you run it, it will receive since 0 and will set to 1 - * Next 1 and prepare for 2, etc. - * - * This is important since you can mock the changes that it will return given a specific since - * with addSplitForSince and removeSplitForSince. - * With those methods you can mock what changes (ACTIVE and ARCHIVED) are goint to be returned for since -1, 0, etc - * - */ -public class SplitChangeFetcherWithTrafficTypeNames implements SplitChangeFetcher { - - private final Map> _trafficTypesToAdd = Maps.newHashMap(); - private final Map> _trafficTypesToRemove = Maps.newHashMap(); - - public SplitChangeFetcherWithTrafficTypeNames() { } - - public void addSplitForSince(Long since, String name, String trafficTypeName) { - modifyTrafficTypeMap(_trafficTypesToAdd, since, name, trafficTypeName, Status.ACTIVE); - } - - public void removeSplitForSince(Long since, String name, String trafficTypeName) { - modifyTrafficTypeMap(_trafficTypesToRemove, since, name, trafficTypeName, Status.ARCHIVED); - } - - @Override - public SplitChange fetch(long since) { - long latestChangeNumber = since + 1; - - SplitChange splitChange = new SplitChange(); - splitChange.splits = Lists.newArrayList(); - splitChange.since = since; - splitChange.till = latestChangeNumber; - - if (_trafficTypesToAdd.get(since) != null) { - splitChange.splits.addAll(_trafficTypesToAdd.get(since)); - } - - if (_trafficTypesToRemove.get(since) != null) { - splitChange.splits.addAll(_trafficTypesToRemove.get(since)); - } - return splitChange; - } - - private void modifyTrafficTypeMap(Map> map, Long since, String name, String trafficTypeName, Status status) { - List splits = map.get(since); - if (splits == null) { - splits = Lists.newArrayList(); - } - splits.add(stubSplit(name, trafficTypeName, status, since)); - map.put(since, splits); - } - - private Split stubSplit(String name, String trafficTypeName, Status status, Long changeNumber) { - Split add = new Split(); - Condition condition = ConditionsTestUtil.makeAllKeysCondition(Lists.newArrayList(ConditionsTestUtil.partition("on", 10))); - add.status = status; - add.trafficAllocation = 100; - add.trafficAllocationSeed = changeNumber.intValue(); - add.seed = changeNumber.intValue(); - add.conditions = Lists.newArrayList(condition); - add.name = name; - add.trafficTypeName = trafficTypeName; - add.defaultTreatment = Treatments.OFF; - add.changeNumber = changeNumber; - return add; - } -} diff --git a/client/src/test/java/io/split/engine/experiments/SplitFetcherTest.java b/client/src/test/java/io/split/engine/experiments/SplitFetcherTest.java index 10c73fff9..cb9352994 100644 --- a/client/src/test/java/io/split/engine/experiments/SplitFetcherTest.java +++ b/client/src/test/java/io/split/engine/experiments/SplitFetcherTest.java @@ -31,8 +31,7 @@ import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.nullValue; import static org.junit.Assert.assertThat; -import static org.mockito.Matchers.anyLong; -import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.*; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -123,9 +122,9 @@ public void when_parser_fails_we_remove_the_experiment() throws InterruptedExcep noReturn.till = 1L; SplitChangeFetcher splitChangeFetcher = mock(SplitChangeFetcher.class); - when(splitChangeFetcher.fetch(-1L)).thenReturn(validReturn); - when(splitChangeFetcher.fetch(0L)).thenReturn(invalidReturn); - when(splitChangeFetcher.fetch(1L)).thenReturn(noReturn); + when(splitChangeFetcher.fetch(-1L, false)).thenReturn(validReturn); + when(splitChangeFetcher.fetch(0L, false)).thenReturn(invalidReturn); + when(splitChangeFetcher.fetch(1L, false)).thenReturn(noReturn); SegmentCache segmentCache = new SegmentCacheInMemoryImpl(); @@ -149,7 +148,7 @@ public void if_there_is_a_problem_talking_to_split_change_count_down_latch_is_no SplitCache cache = new InMemoryCacheImp(-1); SplitChangeFetcher splitChangeFetcher = mock(SplitChangeFetcher.class); - when(splitChangeFetcher.fetch(-1L)).thenThrow(new RuntimeException()); + when(splitChangeFetcher.fetch(-1L, false)).thenThrow(new RuntimeException()); SegmentCache segmentCache = new SegmentCacheInMemoryImpl(); SegmentChangeFetcher segmentChangeFetcher = mock(SegmentChangeFetcher.class); @@ -194,7 +193,7 @@ public void works_with_user_defined_segments() throws Exception { SegmentChangeFetcher segmentChangeFetcher = mock(SegmentChangeFetcher.class); SegmentChange segmentChange = getSegmentChange(0L, 0L, segmentName); - when(segmentChangeFetcher.fetch(anyString(), anyLong())).thenReturn(segmentChange); + when(segmentChangeFetcher.fetch(anyString(), anyLong(), anyBoolean())).thenReturn(segmentChange); SegmentSynchronizationTask segmentSynchronizationTask = new SegmentSynchronizationTaskImp(segmentChangeFetcher, 1,10, gates, segmentCache); segmentSynchronizationTask.startPeriodicFetching(); SplitFetcherImp fetcher = new SplitFetcherImp(experimentChangeFetcher, new SplitParser(segmentSynchronizationTask, segmentCache), gates, cache); diff --git a/client/src/test/java/io/split/engine/experiments/SplitParserTest.java b/client/src/test/java/io/split/engine/experiments/SplitParserTest.java index 862d95397..86beed261 100644 --- a/client/src/test/java/io/split/engine/experiments/SplitParserTest.java +++ b/client/src/test/java/io/split/engine/experiments/SplitParserTest.java @@ -58,7 +58,7 @@ public void works() { SegmentChangeFetcher segmentChangeFetcher = Mockito.mock(SegmentChangeFetcher.class); SegmentChange segmentChangeEmployee = getSegmentChange(-1L, -1L, EMPLOYEES); SegmentChange segmentChangeSalesPeople = getSegmentChange(-1L, -1L, SALES_PEOPLE); - Mockito.when(segmentChangeFetcher.fetch(Mockito.anyString(), Mockito.anyLong())).thenReturn(segmentChangeEmployee).thenReturn(segmentChangeSalesPeople); + Mockito.when(segmentChangeFetcher.fetch(Mockito.anyString(), Mockito.anyLong(), Mockito.anyBoolean())).thenReturn(segmentChangeEmployee).thenReturn(segmentChangeSalesPeople); SegmentSynchronizationTask segmentFetcher = new SegmentSynchronizationTaskImp(segmentChangeFetcher,1L, 1, gates, segmentCache); SplitParser parser = new SplitParser(segmentFetcher, segmentCache); @@ -97,7 +97,7 @@ public void worksWithConfig() { SegmentChangeFetcher segmentChangeFetcher = Mockito.mock(SegmentChangeFetcher.class); SegmentChange segmentChangeEmployee = getSegmentChange(-1L, -1L, EMPLOYEES); SegmentChange segmentChangeSalesPeople = getSegmentChange(-1L, -1L, SALES_PEOPLE); - Mockito.when(segmentChangeFetcher.fetch(Mockito.anyString(), Mockito.anyLong())).thenReturn(segmentChangeEmployee).thenReturn(segmentChangeSalesPeople); + Mockito.when(segmentChangeFetcher.fetch(Mockito.anyString(), Mockito.anyLong(), Mockito.anyBoolean())).thenReturn(segmentChangeEmployee).thenReturn(segmentChangeSalesPeople); SegmentSynchronizationTask segmentFetcher = new SegmentSynchronizationTaskImp(segmentChangeFetcher,1L, 1, gates, segmentCache); @@ -141,7 +141,7 @@ public void works_for_two_conditions() { SegmentChangeFetcher segmentChangeFetcher = Mockito.mock(SegmentChangeFetcher.class); SegmentChange segmentChangeEmployee = getSegmentChange(-1L, -1L, EMPLOYEES); SegmentChange segmentChangeSalesPeople = getSegmentChange(-1L, -1L, SALES_PEOPLE); - Mockito.when(segmentChangeFetcher.fetch(Mockito.anyString(), Mockito.anyLong())).thenReturn(segmentChangeEmployee).thenReturn(segmentChangeSalesPeople); + Mockito.when(segmentChangeFetcher.fetch(Mockito.anyString(), Mockito.anyLong(), Mockito.anyBoolean())).thenReturn(segmentChangeEmployee).thenReturn(segmentChangeSalesPeople); SegmentSynchronizationTask segmentFetcher = new SegmentSynchronizationTaskImp(segmentChangeFetcher,1L, 1, gates, segmentCache); @@ -180,7 +180,7 @@ public void fails_for_long_conditions() { segmentCache.updateSegment(SALES_PEOPLE, Stream.of("kunal").collect(Collectors.toList()), new ArrayList<>()); SegmentChangeFetcher segmentChangeFetcher = Mockito.mock(SegmentChangeFetcher.class); SegmentChange segmentChangeEmployee = getSegmentChange(-1L, -1L, EMPLOYEES); - Mockito.when(segmentChangeFetcher.fetch(Mockito.anyString(), Mockito.anyLong())).thenReturn(segmentChangeEmployee); + Mockito.when(segmentChangeFetcher.fetch(Mockito.anyString(), Mockito.anyLong(), Mockito.anyBoolean())).thenReturn(segmentChangeEmployee); SegmentSynchronizationTask segmentFetcher = new SegmentSynchronizationTaskImp(segmentChangeFetcher,1L, 1, gates, segmentCache); @@ -210,7 +210,7 @@ public void works_with_attributes() { SegmentChangeFetcher segmentChangeFetcher = Mockito.mock(SegmentChangeFetcher.class); SegmentChange segmentChangeEmployee = getSegmentChange(-1L, -1L, EMPLOYEES); SegmentChange segmentChangeSalesPeople = getSegmentChange(-1L, -1L, SALES_PEOPLE); - Mockito.when(segmentChangeFetcher.fetch(Mockito.anyString(), Mockito.anyLong())).thenReturn(segmentChangeEmployee).thenReturn(segmentChangeSalesPeople); + Mockito.when(segmentChangeFetcher.fetch(Mockito.anyString(), Mockito.anyLong(), Mockito.anyBoolean())).thenReturn(segmentChangeEmployee).thenReturn(segmentChangeSalesPeople); SegmentSynchronizationTask segmentFetcher = new SegmentSynchronizationTaskImp(segmentChangeFetcher,1L, 1, gates, segmentCache); @@ -256,7 +256,7 @@ public void less_than_or_equal_to() { SegmentChangeFetcher segmentChangeFetcher = Mockito.mock(SegmentChangeFetcher.class); SegmentChange segmentChangeEmployee = getSegmentChange(-1L, -1L, EMPLOYEES); SegmentChange segmentChangeSalesPeople = getSegmentChange(-1L, -1L, SALES_PEOPLE); - Mockito.when(segmentChangeFetcher.fetch(Mockito.anyString(), Mockito.anyLong())).thenReturn(segmentChangeEmployee).thenReturn(segmentChangeSalesPeople); + Mockito.when(segmentChangeFetcher.fetch(Mockito.anyString(), Mockito.anyLong(), Mockito.anyBoolean())).thenReturn(segmentChangeEmployee).thenReturn(segmentChangeSalesPeople); SegmentSynchronizationTask segmentFetcher = new SegmentSynchronizationTaskImp(segmentChangeFetcher,1L, 1, gates, segmentCache); @@ -295,7 +295,7 @@ public void equal_to() { SegmentChangeFetcher segmentChangeFetcher = Mockito.mock(SegmentChangeFetcher.class); SegmentChange segmentChangeEmployee = getSegmentChange(-1L, -1L, EMPLOYEES); SegmentChange segmentChangeSalesPeople = getSegmentChange(-1L, -1L, SALES_PEOPLE); - Mockito.when(segmentChangeFetcher.fetch(Mockito.anyString(), Mockito.anyLong())).thenReturn(segmentChangeEmployee).thenReturn(segmentChangeSalesPeople); + Mockito.when(segmentChangeFetcher.fetch(Mockito.anyString(), Mockito.anyLong(), Mockito.anyBoolean())).thenReturn(segmentChangeEmployee).thenReturn(segmentChangeSalesPeople); SegmentSynchronizationTask segmentFetcher = new SegmentSynchronizationTaskImp(segmentChangeFetcher,1L, 1, gates, segmentCache); @@ -334,7 +334,7 @@ public void equal_to_negative_number() { SegmentChangeFetcher segmentChangeFetcher = Mockito.mock(SegmentChangeFetcher.class); SegmentChange segmentChangeEmployee = getSegmentChange(-1L, -1L, EMPLOYEES); SegmentChange segmentChangeSalesPeople = getSegmentChange(-1L, -1L, SALES_PEOPLE); - Mockito.when(segmentChangeFetcher.fetch(Mockito.anyString(), Mockito.anyLong())).thenReturn(segmentChangeEmployee).thenReturn(segmentChangeSalesPeople); + Mockito.when(segmentChangeFetcher.fetch(Mockito.anyString(), Mockito.anyLong(), Mockito.anyBoolean())).thenReturn(segmentChangeEmployee).thenReturn(segmentChangeSalesPeople); SegmentSynchronizationTask segmentFetcher = new SegmentSynchronizationTaskImp(segmentChangeFetcher,1L, 1, gates, segmentCache); @@ -373,7 +373,7 @@ public void between() { SegmentChangeFetcher segmentChangeFetcher = Mockito.mock(SegmentChangeFetcher.class); SegmentChange segmentChangeEmployee = getSegmentChange(-1L, -1L, EMPLOYEES); SegmentChange segmentChangeSalesPeople = getSegmentChange(-1L, -1L, SALES_PEOPLE); - Mockito.when(segmentChangeFetcher.fetch(Mockito.anyString(), Mockito.anyLong())).thenReturn(segmentChangeEmployee).thenReturn(segmentChangeSalesPeople); + Mockito.when(segmentChangeFetcher.fetch(Mockito.anyString(), Mockito.anyLong(), Mockito.anyBoolean())).thenReturn(segmentChangeEmployee).thenReturn(segmentChangeSalesPeople); SegmentSynchronizationTask segmentFetcher = new SegmentSynchronizationTaskImp(segmentChangeFetcher,1L, 1, gates, segmentCache); @@ -550,7 +550,7 @@ public void set_matcher_test(Condition c, io.split.engine.matchers.Matcher m) { SegmentChangeFetcher segmentChangeFetcher = Mockito.mock(SegmentChangeFetcher.class); SegmentChange segmentChangeEmployee = getSegmentChange(-1L, -1L, EMPLOYEES); SegmentChange segmentChangeSalesPeople = getSegmentChange(-1L, -1L, SALES_PEOPLE); - Mockito.when(segmentChangeFetcher.fetch(Mockito.anyString(), Mockito.anyLong())).thenReturn(segmentChangeEmployee).thenReturn(segmentChangeSalesPeople); + Mockito.when(segmentChangeFetcher.fetch(Mockito.anyString(), Mockito.anyLong(), Mockito.anyBoolean())).thenReturn(segmentChangeEmployee).thenReturn(segmentChangeSalesPeople); SegmentSynchronizationTask segmentFetcher = new SegmentSynchronizationTaskImp(segmentChangeFetcher,1L, 1, gates, segmentCache); diff --git a/client/src/test/java/io/split/engine/segments/SegmentFetcherImpTest.java b/client/src/test/java/io/split/engine/segments/SegmentFetcherImpTest.java index dc9602b94..79786bdd6 100644 --- a/client/src/test/java/io/split/engine/segments/SegmentFetcherImpTest.java +++ b/client/src/test/java/io/split/engine/segments/SegmentFetcherImpTest.java @@ -51,13 +51,13 @@ public void works_when_there_are_no_changes() throws InterruptedException { SegmentChangeFetcher segmentChangeFetcher = Mockito.mock(SegmentChangeFetcher.class); SegmentChange segmentChange = getSegmentChange(-1L, 10L); - Mockito.when(segmentChangeFetcher.fetch(Mockito.anyString(), Mockito.anyLong())).thenReturn(segmentChange); + Mockito.when(segmentChangeFetcher.fetch(Mockito.anyString(), Mockito.anyLong(), Mockito.anyBoolean())).thenReturn(segmentChange); SegmentFetcherImp fetcher = new SegmentFetcherImp(SEGMENT_NAME, segmentChangeFetcher, gates, segmentCache); // execute the fetcher for a little bit. ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); - scheduledExecutorService.scheduleWithFixedDelay(fetcher, 0L, 100, TimeUnit.MICROSECONDS); + scheduledExecutorService.scheduleWithFixedDelay(fetcher::fetchAll, 0L, 100, TimeUnit.MICROSECONDS); Thread.currentThread().sleep(5 * 100); scheduledExecutorService.shutdown(); @@ -92,13 +92,13 @@ private void works(long startingChangeNumber) throws InterruptedException { SegmentChangeFetcher segmentChangeFetcher = Mockito.mock(SegmentChangeFetcher.class); SegmentChange segmentChange = getSegmentChange(-1L, -1L); - Mockito.when(segmentChangeFetcher.fetch(SEGMENT_NAME, -1L)).thenReturn(segmentChange); - Mockito.when(segmentChangeFetcher.fetch(SEGMENT_NAME, 0L)).thenReturn(segmentChange); - SegmentFetcherImp fetcher = new SegmentFetcherImp(segmentName, segmentChangeFetcher, gates, segmentCache); + Mockito.when(segmentChangeFetcher.fetch(SEGMENT_NAME, -1L, false)).thenReturn(segmentChange); + Mockito.when(segmentChangeFetcher.fetch(SEGMENT_NAME, 0L, false)).thenReturn(segmentChange); + SegmentFetcher fetcher = new SegmentFetcherImp(segmentName, segmentChangeFetcher, gates, segmentCache); // execute the fetcher for a little bit. ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); - scheduledExecutorService.scheduleWithFixedDelay(fetcher, 0L, Integer.MAX_VALUE, TimeUnit.SECONDS); + scheduledExecutorService.scheduleWithFixedDelay(fetcher::fetchAll, 0L, Integer.MAX_VALUE, TimeUnit.SECONDS); Thread.currentThread().sleep(5 * 100); scheduledExecutorService.shutdown(); @@ -112,7 +112,7 @@ private void works(long startingChangeNumber) throws InterruptedException { // reset the interrupt. Thread.currentThread().interrupt(); } - Mockito.verify(segmentChangeFetcher, Mockito.times(2)).fetch(Mockito.anyString(), Mockito.anyLong()); + Mockito.verify(segmentChangeFetcher, Mockito.times(2)).fetch(Mockito.anyString(), Mockito.anyLong(), Mockito.anyBoolean()); assertThat(gates.areSegmentsReady(10), is(true)); } diff --git a/client/src/test/java/io/split/engine/segments/StaticSegmentChangeFetcher.java b/client/src/test/java/io/split/engine/segments/StaticSegmentChangeFetcher.java deleted file mode 100644 index f27f50e66..000000000 --- a/client/src/test/java/io/split/engine/segments/StaticSegmentChangeFetcher.java +++ /dev/null @@ -1,44 +0,0 @@ -package io.split.engine.segments; - -import com.google.common.collect.Lists; -import io.split.client.dtos.SegmentChange; - -import java.util.Collections; -import java.util.List; -import java.util.Set; - -import static com.google.common.base.Preconditions.checkNotNull; - -/** - * A wrapper around a set of keys. We return the entire set of keys, everytime - * we are requested to fetch the latest set of changes. - * - * @author adil - */ -public class StaticSegmentChangeFetcher implements SegmentChangeFetcher { - - private final String _segmentName; - private final List _keys; - - public StaticSegmentChangeFetcher(String segmentName, Set keys) { - checkNotNull(keys); - - _segmentName = segmentName; - _keys = Lists.newArrayList(keys); - - checkNotNull(_segmentName); - } - - @Override - public SegmentChange fetch(String segmentName, long changesSinceThisChangeNumber) { - SegmentChange segmentChange = new SegmentChange(); - segmentChange.name = segmentName; - segmentChange.since = changesSinceThisChangeNumber; - segmentChange.till = 0; - segmentChange.added = _keys; - segmentChange.removed = Collections.emptyList(); - - return segmentChange; - - } -}