diff --git a/client/src/main/java/io/split/client/ApiKeyCounter.java b/client/src/main/java/io/split/client/ApiKeyCounter.java index 8c39394dd..426ade17b 100644 --- a/client/src/main/java/io/split/client/ApiKeyCounter.java +++ b/client/src/main/java/io/split/client/ApiKeyCounter.java @@ -6,6 +6,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.HashMap; +import java.util.Map; + public class ApiKeyCounter { private static final Logger _log = LoggerFactory.getLogger(ApiKeyCounter.class); @@ -63,4 +66,18 @@ boolean isApiKeyPresent(String apiKey) { int getCount(String apiKey) { return USED_API_KEYS.count(apiKey); } + + public Map getFactoryInstances() { + Map factoryInstances = new HashMap<>(); + for (String factory :USED_API_KEYS) { + factoryInstances.putIfAbsent(factory, new Long(getCount(factory))); + } + + return factoryInstances; + } + + @VisibleForTesting + void clearApiKeys() { + USED_API_KEYS.clear(); + } } diff --git a/client/src/main/java/io/split/client/LocalhostSplitFactory.java b/client/src/main/java/io/split/client/LocalhostSplitFactory.java index c716ae3ce..3a2f0a14b 100644 --- a/client/src/main/java/io/split/client/LocalhostSplitFactory.java +++ b/client/src/main/java/io/split/client/LocalhostSplitFactory.java @@ -56,9 +56,9 @@ public LocalhostSplitFactory(String directory, String file) throws IOException { SplitCache splitCache = new InMemoryCacheImp(); SDKReadinessGates sdkReadinessGates = new SDKReadinessGates(); - sdkReadinessGates.splitsAreReady(); _cacheUpdaterService = new CacheUpdaterService(splitCache); _cacheUpdaterService.updateCache(splitAndKeyToTreatment); + sdkReadinessGates.sdkInternalReady(); _client = new SplitClientImpl(this, splitCache, new ImpressionsManager.NoOpImpressionsManager(), new NoopEventClient(), SplitClientConfig.builder().setBlockUntilReadyTimeout(1).build(), sdkReadinessGates, new EvaluatorImp(splitCache), new NoopTelemetryStorage(), new NoopTelemetryStorage()); diff --git a/client/src/main/java/io/split/client/SplitClientImpl.java b/client/src/main/java/io/split/client/SplitClientImpl.java index 050acb516..9c994030d 100644 --- a/client/src/main/java/io/split/client/SplitClientImpl.java +++ b/client/src/main/java/io/split/client/SplitClientImpl.java @@ -15,7 +15,6 @@ import io.split.inputValidation.KeyValidator; import io.split.inputValidation.SplitNameValidator; import io.split.inputValidation.TrafficTypeValidator; -import io.split.telemetry.domain.enums.LastSynchronizationRecordsEnum; import io.split.telemetry.domain.enums.MethodEnum; import io.split.telemetry.storage.TelemetryConfigProducer; import io.split.telemetry.storage.TelemetryEvaluationProducer; @@ -138,7 +137,7 @@ public void blockUntilReady() throws TimeoutException, InterruptedException { if (_config.blockUntilReady() <= 0) { throw new IllegalArgumentException("setBlockUntilReadyTimeout must be positive but in config was: " + _config.blockUntilReady()); } - if (!_gates.isSDKReady(_config.blockUntilReady())) { + if (!_gates.waitUntilInternalReady(_config.blockUntilReady())) { throw new TimeoutException("SDK was not ready in " + _config.blockUntilReady()+ " milliseconds"); } _log.debug(String.format("Split SDK ready in %d ms", (System.currentTimeMillis() - startTime))); @@ -188,7 +187,7 @@ private boolean track(Event event) { private SplitResult getTreatmentWithConfigInternal(String method, String matchingKey, String bucketingKey, String split, Map attributes, MethodEnum methodEnum) { long initTime = System.currentTimeMillis(); try { - if(!_gates.isSDKReadyNow()){ + if(!_gates.isSDKReady()){ _log.warn(method + ": the SDK is not ready, results may be incorrect. Make sure to wait for SDK readiness before using this method"); _telemetryConfigProducer.recordNonReadyUsage(); } @@ -215,7 +214,7 @@ private SplitResult getTreatmentWithConfigInternal(String method, String matchin EvaluatorImp.TreatmentLabelAndChangeNumber result = _evaluator.evaluateFeature(matchingKey, bucketingKey, split, attributes); - if (result.treatment.equals(Treatments.CONTROL) && result.label.equals(Labels.DEFINITION_NOT_FOUND) && _gates.isSDKReadyNow()) { + if (result.treatment.equals(Treatments.CONTROL) && result.label.equals(Labels.DEFINITION_NOT_FOUND) && _gates.isSDKReady()) { _log.warn( "getTreatment: you passed \"" + split + "\" that does not exist in this environment, " + "please double check what Splits exist in the web console."); diff --git a/client/src/main/java/io/split/client/SplitFactoryImpl.java b/client/src/main/java/io/split/client/SplitFactoryImpl.java index 87edaefda..03aaedf3c 100644 --- a/client/src/main/java/io/split/client/SplitFactoryImpl.java +++ b/client/src/main/java/io/split/client/SplitFactoryImpl.java @@ -8,7 +8,6 @@ import io.split.client.interceptors.GzipDecoderResponseInterceptor; import io.split.client.interceptors.GzipEncoderRequestInterceptor; import io.split.client.interceptors.SdkMetadataInterceptorFilter; -import io.split.client.metrics.HttpMetrics; import io.split.cache.InMemoryCacheImp; import io.split.cache.SplitCache; import io.split.engine.evaluator.Evaluator; @@ -124,8 +123,8 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn // Cache Initialisations _segmentCache = new SegmentCacheInMemoryImpl(); _splitCache = new InMemoryCacheImp(); - _telemetrySynchronizer = new SynchronizerMemory(_httpclient, URI.create(config.get_telemetryURL()), _telemetryStorage, _splitCache, _segmentCache, _telemetryStorage); - _telemetrySyncTask = new TelemetrySyncTask(config.get_telemetryRefreshRate(), _telemetrySynchronizer); + _telemetrySynchronizer = new SynchronizerMemory(_httpclient, URI.create(config.get_telemetryURL()), _telemetryStorage, _splitCache, _segmentCache, _telemetryStorage, _startTime); + // Segments _segmentSynchronizationTaskImp = buildSegments(config); @@ -142,9 +141,7 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn // EventClient _eventClient = EventClientImpl.create(_httpclient, _eventsRootTarget, config.eventsQueueSize(), config.eventFlushIntervalInMillis(), config.waitBeforeShutdown(), _telemetryStorage); - // SyncManager - _syncManager = SyncManagerImp.build(config.streamingEnabled(), _splitSynchronizationTask, _splitFetcher, _segmentSynchronizationTaskImp, _splitCache, config.authServiceURL(), _httpclient, config.streamingServiceURL(), config.authRetryBackoffBase(), buildSSEdHttpClient(apiToken, config), _segmentCache, config.streamingRetryDelay(), _gates, _telemetryStorage); - _syncManager.start(); + _telemetrySyncTask = new TelemetrySyncTask(config.get_telemetryRefreshRate(), _telemetrySynchronizer); // Evaluator _evaluator = new EvaluatorImp(_splitCache); @@ -155,6 +152,12 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn // SplitManager _manager = new SplitManagerImpl(_splitCache, config, _gates, _telemetryStorage); + // SyncManager + _syncManager = SyncManagerImp.build(config.streamingEnabled(), _splitSynchronizationTask, _splitFetcher, _segmentSynchronizationTaskImp, _splitCache, + config.authServiceURL(), _httpclient, config.streamingServiceURL(), config.authRetryBackoffBase(), buildSSEdHttpClient(apiToken, config), + _segmentCache, config.streamingRetryDelay(), _gates, _telemetryStorage, _telemetrySynchronizer,config); + _syncManager.start(); + // DestroyOnShutDown if (config.destroyOnShutDown()) { Runtime.getRuntime().addShutdownHook(new Thread(() -> { @@ -313,7 +316,7 @@ private SplitFetcher buildSplitFetcher() throws URISyntaxException { SplitChangeFetcher splitChangeFetcher = HttpSplitChangeFetcher.create(_httpclient, _rootTarget, _telemetryStorage); SplitParser splitParser = new SplitParser(_segmentSynchronizationTaskImp, _segmentCache); - return new SplitFetcherImp(splitChangeFetcher, splitParser, _gates, _splitCache, _telemetryStorage); + return new SplitFetcherImp(splitChangeFetcher, splitParser, _splitCache, _telemetryStorage); } private ImpressionsManagerImpl buildImpressionsManager(SplitClientConfig config) throws URISyntaxException { diff --git a/client/src/main/java/io/split/client/SplitManagerImpl.java b/client/src/main/java/io/split/client/SplitManagerImpl.java index 4d41ac36e..0edd88aaa 100644 --- a/client/src/main/java/io/split/client/SplitManagerImpl.java +++ b/client/src/main/java/io/split/client/SplitManagerImpl.java @@ -6,7 +6,6 @@ import io.split.cache.SplitCache; import io.split.engine.experiments.ParsedSplit; import io.split.inputValidation.SplitNameValidator; -import io.split.telemetry.domain.enums.MethodEnum; import io.split.telemetry.storage.TelemetryConfigProducer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,7 +42,7 @@ public SplitManagerImpl(SplitCache splitCache, @Override public List splits() { - if (!_gates.isSDKReadyNow()) { { + if (!_gates.isSDKReady()) { { _log.warn("splits: the SDK is not ready, results may be incorrect. Make sure to wait for SDK readiness before using this method"); _telemetryConfigProducer.recordNonReadyUsage(); }} @@ -58,7 +57,7 @@ public List splits() { @Override public SplitView split(String featureName) { - if (!_gates.isSDKReadyNow()) { { + if (!_gates.isSDKReady()) { { _log.warn("split: the SDK is not ready, results may be incorrect. Make sure to wait for SDK readiness before using this method"); _telemetryConfigProducer.recordNonReadyUsage(); }} @@ -70,7 +69,7 @@ public SplitView split(String featureName) { ParsedSplit parsedSplit = _splitCache.get(featureName); if (parsedSplit == null) { - if (_gates.isSDKReadyNow()) { + if (_gates.isSDKReady()) { _log.warn("split: you passed \"" + featureName + "\" that does not exist in this environment, " + "please double check what Splits exist in the web console."); } @@ -82,7 +81,7 @@ public SplitView split(String featureName) { @Override public List splitNames() { - if (!_gates.isSDKReadyNow()) { { + if (!_gates.isSDKReady()) { { _log.warn("splitNames: the SDK is not ready, results may be incorrect. Make sure to wait for SDK readiness before using this method"); _telemetryConfigProducer.recordNonReadyUsage(); }} @@ -100,7 +99,7 @@ public void blockUntilReady() throws TimeoutException, InterruptedException { if (_config.blockUntilReady() <= 0) { throw new IllegalArgumentException("setBlockUntilReadyTimeout must be positive but in config was: " + _config.blockUntilReady()); } - if (!_gates.isSDKReady(_config.blockUntilReady())) { + if (!_gates.waitUntilInternalReady(_config.blockUntilReady())) { _telemetryConfigProducer.recordBURTimeout(); throw new TimeoutException("SDK was not ready in " + _config.blockUntilReady()+ " milliseconds"); } diff --git a/client/src/main/java/io/split/client/metrics/BinarySearchLatencyTracker.java b/client/src/main/java/io/split/client/metrics/BinarySearchLatencyTracker.java deleted file mode 100644 index 35efff10f..000000000 --- a/client/src/main/java/io/split/client/metrics/BinarySearchLatencyTracker.java +++ /dev/null @@ -1,131 +0,0 @@ -package io.split.client.metrics; - -import java.util.Arrays; - -/** - * Tracks latencies pero bucket of time. - * Each bucket represent a latency greater than the one before - * and each number within each bucket is a number of calls in the range. - *

- * (1) 1.00 - * (2) 1.50 - * (3) 2.25 - * (4) 3.38 - * (5) 5.06 - * (6) 7.59 - * (7) 11.39 - * (8) 17.09 - * (9) 25.63 - * (10) 38.44 - * (11) 57.67 - * (12) 86.50 - * (13) 129.75 - * (14) 194.62 - * (15) 291.93 - * (16) 437.89 - * (17) 656.84 - * (18) 985.26 - * (19) 1,477.89 - * (20) 2,216.84 - * (21) 3,325.26 - * (22) 4,987.89 - * (23) 7,481.83 - *

- * Thread-safety: This class is not thread safe. - *

- * Created by patricioe on 2/10/16. - */ -public class BinarySearchLatencyTracker implements ILatencyTracker { - - static final long[] BUCKETS = { - 1000, 1500, 2250, 3375, 5063, - 7594, 11391, 17086, 25629, 38443, - 57665, 86498, 129746, 194620, 291929, - 437894, 656841, 985261, 1477892, 2216838, - 3325257, 4987885, 7481828 - }; - - static final long MAX_LATENCY = 7481828; - - long[] latencies = new long[BUCKETS.length]; - - /** - * Increment the internal counter for the bucket this latency falls into. - * - * @param millis - */ - public void addLatencyMillis(long millis) { - int index = findIndex(millis * 1000); - latencies[index]++; - } - - /** - * Increment the internal counter for the bucket this latency falls into. - * - * @param micros - */ - public void addLatencyMicros(long micros) { - int index = findIndex(micros); - latencies[index]++; - } - - /** - * Returns the list of latencies buckets as an array. - * - * @return the list of latencies buckets as an array. - */ - public long[] getLatencies() { - return latencies; - } - - @Override - public long getLatency(int index) { - return latencies[index]; - } - - public void clear() { - latencies = new long[BUCKETS.length]; - } - - /** - * Returns the counts in the bucket this latency falls into. - * The latencies will no be updated. - * - * @param latency - * @return the bucket content for the latency. - */ - public long getBucketForLatencyMillis(long latency) { - return latencies[findIndex(latency * 1000)]; - } - - /** - * Returns the counts in the bucket this latency falls into. - * The latencies will no be updated. - * - * @param latency - * @return the bucket content for the latency. - */ - public long getBucketForLatencyMicros(long latency) { - return latencies[findIndex(latency)]; - } - - - private int findIndex(long micros) { - if (micros > MAX_LATENCY) { - return BUCKETS.length - 1; - } - - int index = Arrays.binarySearch(BUCKETS, micros); - - if (index < 0) { - - // Adjust the index based on Java Array javadocs. <0 means the value wasn't found and it's module value - // is where it should be inserted (in this case, it means the counter it applies - unless it's equals to the - // length of the array). - - index = -(index + 1); - } - return index; - } - -} diff --git a/client/src/main/java/io/split/client/metrics/DTOMetrics.java b/client/src/main/java/io/split/client/metrics/DTOMetrics.java deleted file mode 100644 index 86c793cb6..000000000 --- a/client/src/main/java/io/split/client/metrics/DTOMetrics.java +++ /dev/null @@ -1,13 +0,0 @@ -package io.split.client.metrics; - -import io.split.client.dtos.Counter; -import io.split.client.dtos.Latency; - -/** - * Created by adilaijaz on 6/14/16. - */ -public interface DTOMetrics { - void time(Latency dto); - - void count(Counter dto); -} diff --git a/client/src/main/java/io/split/client/metrics/HttpMetrics.java b/client/src/main/java/io/split/client/metrics/HttpMetrics.java deleted file mode 100644 index 14d63b0f4..000000000 --- a/client/src/main/java/io/split/client/metrics/HttpMetrics.java +++ /dev/null @@ -1,135 +0,0 @@ -package io.split.client.metrics; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import io.split.client.dtos.Counter; -import io.split.client.dtos.Latency; -import io.split.client.utils.Utils; -import io.split.engine.metrics.Metrics; -import org.apache.hc.client5.http.classic.methods.HttpPost; -import org.apache.hc.client5.http.impl.classic.CloseableHttpClient; -import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse; -import org.apache.hc.core5.http.HttpEntity; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.net.URI; -import java.net.URISyntaxException; - -/** - * Created by adilaijaz on 9/4/15. - */ -public class HttpMetrics implements Metrics, DTOMetrics { - private static final Logger _log = LoggerFactory.getLogger(HttpMetrics.class); - - private final CloseableHttpClient _client; - private final URI _timeTarget; - private final URI _counterTarget; - - - public static HttpMetrics create(CloseableHttpClient client, URI root) throws URISyntaxException { - return new HttpMetrics(client, root); - } - - - public HttpMetrics(CloseableHttpClient client, URI root) throws URISyntaxException { - Preconditions.checkNotNull(root); - _client = Preconditions.checkNotNull(client); - _timeTarget = Utils.appendPath(root, "api/metrics/time"); - _counterTarget = Utils.appendPath(root, "api/metrics/counter"); - } - - - @Override - public void time(Latency dto) { - if (dto.latencies.isEmpty()) { - return; - } - - try { - post(_timeTarget, dto); - } catch (Throwable t) { - _log.warn("Exception when posting metric " + dto, t); - } - ; - - } - - @Override - public void count(Counter dto) { - try { - post(_counterTarget, dto); - } catch (Throwable t) { - _log.warn("Exception when posting metric " + dto, t); - } - - } - - private void post(URI uri, Object dto) { - - CloseableHttpResponse response = null; - - try { - HttpEntity entity = Utils.toJsonEntity(dto); - - HttpPost request = new HttpPost(uri); - request.setEntity(entity); - - response = _client.execute(request); - - int status = response.getCode(); - - if (status < 200 || status >= 300) { - _log.warn("Response status was: " + status); - } - - } catch (Throwable t) { - _log.warn("Exception when posting metrics: " + t.getMessage()); - if (_log.isDebugEnabled()) { - _log.debug("Reason: ", t); - } - } finally { - Utils.forceClose(response); - } - - } - - @Override - public void count(String counter, long delta) { - try { - Counter dto = new Counter(); - dto.name = counter; - dto.delta = delta; - - count(dto); - } catch (Throwable t) { - _log.info("Could not count metric " + counter, t); - } - - } - - @Override - public void time(String operation, long timeInMs) { - try { - Latency dto = new Latency(); - dto.name = operation; - dto.latencies = Lists.newArrayList(timeInMs); - - time(dto); - } catch (Throwable t) { - _log.info("Could not time metric " + operation, t); - } - } - - @VisibleForTesting - URI getTimeTarget() { - return _timeTarget; - } - - @VisibleForTesting - URI getCounterTarget() { - return _counterTarget; - } - -} diff --git a/client/src/main/java/io/split/client/metrics/ILatencyTracker.java b/client/src/main/java/io/split/client/metrics/ILatencyTracker.java deleted file mode 100644 index 282db06cb..000000000 --- a/client/src/main/java/io/split/client/metrics/ILatencyTracker.java +++ /dev/null @@ -1,22 +0,0 @@ -package io.split.client.metrics; - -/** - * Created by patricioe on 2/10/16. - */ -public interface ILatencyTracker { - - void addLatencyMillis(long millis); - - void addLatencyMicros(long micros); - - long[] getLatencies(); - - long getLatency(int index); - - void clear(); - - long getBucketForLatencyMillis(long latency); - - long getBucketForLatencyMicros(long latency); - -} diff --git a/client/src/main/java/io/split/client/metrics/LogarithmicSearchLatencyTracker.java b/client/src/main/java/io/split/client/metrics/LogarithmicSearchLatencyTracker.java deleted file mode 100644 index 4034d8de6..000000000 --- a/client/src/main/java/io/split/client/metrics/LogarithmicSearchLatencyTracker.java +++ /dev/null @@ -1,116 +0,0 @@ -package io.split.client.metrics; - -/** - * Tracks latencies pero bucket of time. - * Each bucket represent a latency greater than the one before - * and each number within each bucket is a number of calls in the range. - *

- * (1) 1.00 - * (2) 1.50 - * (3) 2.25 - * (4) 3.38 - * (5) 5.06 - * (6) 7.59 - * (7) 11.39 - * (8) 17.09 - * (9) 25.63 - * (10) 38.44 - * (11) 57.67 - * (12) 86.50 - * (13) 129.75 - * (14) 194.62 - * (15) 291.93 - * (16) 437.89 - * (17) 656.84 - * (18) 985.26 - * (19) 1,477.89 - * (20) 2,216.84 - * (21) 3,325.26 - * (22) 4,987.89 - * (23) 7,481.83 - *

- * Thread-safety: This class is not thread safe. - *

- * Created by patricioe on 2/10/16. - */ -public class LogarithmicSearchLatencyTracker implements ILatencyTracker { - - static final int BUCKETS = 23; - private static final double LOG_10_1000_MICROS = Math.log10(1000); - private static final double LOG_10_1_5_MICROS = Math.log10(Double.valueOf("1.5").doubleValue()); - - - long[] latencies = new long[BUCKETS]; - - /** - * Increment the internal counter for the bucket this latency falls into. - * - * @param millis - */ - public void addLatencyMillis(long millis) { - int index = findIndex(millis * 1000); - latencies[index]++; - } - - /** - * Increment the internal counter for the bucket this latency falls into. - * - * @param micros - */ - public void addLatencyMicros(long micros) { - int index = findIndex(micros); - latencies[index]++; - } - - /** - * Returns the list of latencies buckets as an array. - * - * @return the list of latencies buckets as an array. - */ - public long[] getLatencies() { - return latencies; - } - - @Override - public long getLatency(int index) { - return latencies[index]; - } - - public void clear() { - latencies = new long[BUCKETS]; - } - - /** - * Returns the counts in the bucket this latency falls into. - * The latencies will no be updated. - * - * @param latency - * @return the bucket content for the latency. - */ - public long getBucketForLatencyMillis(long latency) { - return latencies[findIndex(latency * 1000)]; - } - - /** - * Returns the counts in the bucket this latency falls into. - * The latencies will no be updated. - * - * @param latency - * @return the bucket content for the latency. - */ - public long getBucketForLatencyMicros(long latency) { - return latencies[findIndex(latency)]; - } - - - private int findIndex(long micros) { - - if (micros <= 1000) return 0; - if (micros > 4987885) return 22; - - double raw = (Math.log10(micros) - LOG_10_1000_MICROS) / LOG_10_1_5_MICROS; - double rounded = Math.round(raw * 1000000d) / 1000000d; - return (int) Math.ceil(rounded); - } - -} diff --git a/client/src/main/java/io/split/engine/SDKReadinessGates.java b/client/src/main/java/io/split/engine/SDKReadinessGates.java index e8a4e6453..10a18fbba 100644 --- a/client/src/main/java/io/split/engine/SDKReadinessGates.java +++ b/client/src/main/java/io/split/engine/SDKReadinessGates.java @@ -15,9 +15,7 @@ public class SDKReadinessGates { private static final Logger _log = LoggerFactory.getLogger(SDKReadinessGates.class); - private final CountDownLatch _splitsAreReady = new CountDownLatch(1); private final CountDownLatch _internalReady = new CountDownLatch(1); - private final ConcurrentMap _segmentsAreReady = new ConcurrentHashMap<>(); /** * Returns true if the SDK is ready. The SDK is ready when: @@ -34,144 +32,15 @@ public class SDKReadinessGates { * @return true if the sdk is ready, false otherwise. * @throws InterruptedException if this operation was interrupted. */ - public boolean isSDKReady(long milliseconds) throws InterruptedException { - long end = System.currentTimeMillis() + milliseconds; - long timeLeft = milliseconds; - - boolean splits = areSplitsReady(timeLeft); - if (!splits) { - return false; - } - - timeLeft = end - System.currentTimeMillis(); - - return areSegmentsReady(timeLeft); - } - - public boolean isSDKReadyNow() { - try { - return isSDKReady(0); - } catch (InterruptedException e) { - return false; - } - } - - /** - * Records that the SDK split initialization is done. - * This operation is atomic and idempotent. Repeated invocations - * will not have any impact on the state. - */ - public void splitsAreReady() { - long originalCount = _splitsAreReady.getCount(); - _splitsAreReady.countDown(); - if (originalCount > 0L) { - _log.info("splits are ready"); - } - } - - /** - * Registers a segment that the SDK should download before it is ready. - * This method should be called right after the first successful download - * of split definitions. - *

- * Note that if this method is called in subsequent fetches of splits, - * it will return false; meaning any segments used in new splits - * will not be able to block the SDK from being marked as complete. - * - * @param segmentName the segment to register - * @return true if the segments were registered, false otherwise. - * @throws InterruptedException - */ - public boolean registerSegment(String segmentName) throws InterruptedException { - if (segmentName == null || segmentName.isEmpty() || areSplitsReady(0L)) { - return false; - } - - _segmentsAreReady.putIfAbsent(segmentName, new CountDownLatch(1)); - _log.info("Registered segment: " + segmentName); - return true; - } - - /** - * Records that the SDK segment initialization for this segment is done. - * This operation is atomic and idempotent. Repeated invocations - * will not have any impact on the state. - */ - public void segmentIsReady(String segmentName) { - CountDownLatch cdl = _segmentsAreReady.get(segmentName); - if (cdl == null) { - return; - } - - long originalCount = cdl.getCount(); - - cdl.countDown(); - - if (originalCount > 0L) { - _log.info(segmentName + " segment is ready"); - } + public boolean waitUntilInternalReady(long milliseconds) throws InterruptedException { + return _internalReady.await(milliseconds, TimeUnit.MILLISECONDS); } - public boolean isSegmentRegistered(String segmentName) { - return _segmentsAreReady.get(segmentName) != null; - } - - /** - * Returns true if the SDK is ready w.r.t segments. In other words, this method returns true if: - *

    - *
  1. The SDK has fetched segment definitions the first time.
  2. - *
- *

- * This operation will block until the SDK is ready or 'milliseconds' have passed. If the milliseconds - * are less than or equal to zero, the operation will not block and return immediately - * - * @param milliseconds time to wait for an answer. if the value is zero or negative, we will not - * block for an answer. - * @return true if the sdk is ready w.r.t splits, false otherwise. - * @throws InterruptedException if this operation was interrupted. - */ - public boolean areSegmentsReady(long milliseconds) throws InterruptedException { - long end = System.currentTimeMillis() + milliseconds; - long timeLeft = milliseconds; - - for (Map.Entry entry : _segmentsAreReady.entrySet()) { - String segmentName = entry.getKey(); - CountDownLatch cdl = entry.getValue(); - - if (!cdl.await(timeLeft, TimeUnit.MILLISECONDS)) { - _log.error(segmentName + " is not ready yet"); - return false; - } - - timeLeft = end - System.currentTimeMillis(); - } - - return true; - } - - /** - * Returns true if the SDK is ready w.r.t splits. In other words, this method returns true if: - *

    - *
  1. The SDK has fetched Split definitions the first time.
  2. - *
- *

- * This operation will block until the SDK is ready or 'milliseconds' have passed. If the milliseconds - * are less than or equal to zero, the operation will not block and return immediately - * - * @param milliseconds time to wait for an answer. if the value is zero or negative, we will not - * block for an answer. - * @return true if the sdk is ready w.r.t splits, false otherwise. - * @throws InterruptedException if this operation was interrupted. - */ - public boolean areSplitsReady(long milliseconds) throws InterruptedException { - return _splitsAreReady.await(milliseconds, TimeUnit.MILLISECONDS); + public boolean isSDKReady() { + return _internalReady.getCount() == 0; } public void sdkInternalReady() { _internalReady.countDown(); } - - public void waitUntilInternalReady() throws InterruptedException { - _internalReady.await(); - } } diff --git a/client/src/main/java/io/split/engine/common/SyncManagerImp.java b/client/src/main/java/io/split/engine/common/SyncManagerImp.java index c532f660c..b556beb9d 100644 --- a/client/src/main/java/io/split/engine/common/SyncManagerImp.java +++ b/client/src/main/java/io/split/engine/common/SyncManagerImp.java @@ -4,6 +4,8 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import io.split.cache.SegmentCache; import io.split.cache.SplitCache; +import io.split.client.ApiKeyCounter; +import io.split.client.SplitClientConfig; import io.split.engine.SDKReadinessGates; import io.split.engine.experiments.SplitFetcher; import io.split.engine.experiments.SplitSynchronizationTask; @@ -11,10 +13,12 @@ import io.split.telemetry.domain.StreamingEvent; import io.split.telemetry.domain.enums.StreamEventsEnum; import io.split.telemetry.storage.TelemetryRuntimeProducer; +import io.split.telemetry.synchronizer.TelemetrySynchronizer; import org.apache.hc.client5.http.impl.classic.CloseableHttpClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -34,11 +38,13 @@ public class SyncManagerImp implements SyncManager { private final AtomicBoolean _shutdown; private final LinkedBlockingQueue _incomingPushStatus; private final ExecutorService _executorService; - private final ExecutorService _pollingExecutorService; + private final ExecutorService _startExecutorService; private final SDKReadinessGates _gates; private Future _pushStatusMonitorTask; private Backoff _backoff; private final TelemetryRuntimeProducer _telemetryRuntimeProducer; + private final TelemetrySynchronizer _telemetrySynchronizer; + private final SplitClientConfig _config; @VisibleForTesting /* package private */ SyncManagerImp(boolean streamingEnabledConfig, @@ -46,7 +52,9 @@ public class SyncManagerImp implements SyncManager { PushManager pushManager, LinkedBlockingQueue pushMessages, int authRetryBackOffBase, - SDKReadinessGates gates, TelemetryRuntimeProducer telemetryRuntimeProducer) { + SDKReadinessGates gates, TelemetryRuntimeProducer telemetryRuntimeProducer, + TelemetrySynchronizer telemetrySynchronizer, + SplitClientConfig config) { _streamingEnabledConfig = new AtomicBoolean(streamingEnabledConfig); _synchronizer = checkNotNull(synchronizer); _pushManager = checkNotNull(pushManager); @@ -56,13 +64,15 @@ public class SyncManagerImp implements SyncManager { .setNameFormat("SPLIT-PushStatusMonitor-%d") .setDaemon(true) .build()); - _pollingExecutorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder() + _startExecutorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder() .setNameFormat("SPLIT-PollingMode-%d") .setDaemon(true) .build()); _backoff = new Backoff(authRetryBackOffBase); _gates = checkNotNull(gates); _telemetryRuntimeProducer = checkNotNull(telemetryRuntimeProducer); + _telemetrySynchronizer = checkNotNull(telemetrySynchronizer); + _config = checkNotNull(config); } public static SyncManagerImp build(boolean streamingEnabledConfig, @@ -78,21 +88,34 @@ public static SyncManagerImp build(boolean streamingEnabledConfig, SegmentCache segmentCache, int streamingRetryDelay, SDKReadinessGates gates, - TelemetryRuntimeProducer telemetryRuntimeProducer) { + TelemetryRuntimeProducer telemetryRuntimeProducer, + TelemetrySynchronizer telemetrySynchronizer, + SplitClientConfig config) { LinkedBlockingQueue pushMessages = new LinkedBlockingQueue<>(); Synchronizer synchronizer = new SynchronizerImp(splitSynchronizationTask, splitFetcher, segmentSynchronizationTaskImp, splitCache, segmentCache, streamingRetryDelay, gates); PushManager pushManager = PushManagerImp.build(synchronizer, streamingServiceUrl, authUrl, httpClient, pushMessages, sseHttpClient, telemetryRuntimeProducer); - return new SyncManagerImp(streamingEnabledConfig, synchronizer, pushManager, pushMessages, authRetryBackOffBase, gates, telemetryRuntimeProducer); + return new SyncManagerImp(streamingEnabledConfig, synchronizer, pushManager, pushMessages, authRetryBackOffBase, gates, telemetryRuntimeProducer,telemetrySynchronizer, config); } @Override public void start() { - _synchronizer.syncAll(); - if (_streamingEnabledConfig.get()) { - startStreamingMode(); - } else { - _pollingExecutorService.submit(this::startPollingMode); - } + _startExecutorService.submit(() -> { + while(!_synchronizer.syncAll()) { + try { + Thread.currentThread().sleep(1000); + } catch (InterruptedException e) { + _log.warn("Sdk Initializer thread interrupted"); + Thread.currentThread().interrupt(); + } + } + _gates.sdkInternalReady(); + _telemetrySynchronizer.synchronizeConfig(_config, System.currentTimeMillis(), ApiKeyCounter.getApiKeyCounterInstance().getFactoryInstances(), new ArrayList<>()); + if (_streamingEnabledConfig.get()) { + startStreamingMode(); + } else { + startPollingMode(); + } + }); } @Override @@ -112,12 +135,6 @@ private void startStreamingMode() { } private void startPollingMode() { - try { - _gates.waitUntilInternalReady(); - } catch (InterruptedException ex) { - _log.debug(ex.getMessage()); - } - _log.debug("Starting in polling mode ..."); _synchronizer.startPeriodicFetching(); _telemetryRuntimeProducer.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.SYNC_MODE_UPDATE.get_type(), POLLING_STREAMING_EVENT, System.currentTimeMillis())); diff --git a/client/src/main/java/io/split/engine/common/Synchronizer.java b/client/src/main/java/io/split/engine/common/Synchronizer.java index ab8467a5c..9197baacf 100644 --- a/client/src/main/java/io/split/engine/common/Synchronizer.java +++ b/client/src/main/java/io/split/engine/common/Synchronizer.java @@ -1,7 +1,7 @@ package io.split.engine.common; public interface Synchronizer { - void syncAll(); + boolean syncAll(); void startPeriodicFetching(); void stopPeriodicFetching(); void refreshSplits(long targetChangeNumber); diff --git a/client/src/main/java/io/split/engine/common/SynchronizerImp.java b/client/src/main/java/io/split/engine/common/SynchronizerImp.java index d63f5417f..d3e800ebc 100644 --- a/client/src/main/java/io/split/engine/common/SynchronizerImp.java +++ b/client/src/main/java/io/split/engine/common/SynchronizerImp.java @@ -11,10 +11,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; import static com.google.common.base.Preconditions.checkNotNull; @@ -54,12 +52,8 @@ public SynchronizerImp(SplitSynchronizationTask splitSynchronizationTask, } @Override - public void syncAll() { - _syncAllScheduledExecutorService.schedule(() -> { - _splitFetcher.fetchAll(true); - _segmentSynchronizationTaskImp.fetchAllSynchronous(); - _gates.sdkInternalReady(); - }, 0, TimeUnit.SECONDS); + public boolean syncAll() { + return _splitFetcher.fetchAll(true) && _segmentSynchronizationTaskImp.fetchAllSynchronous(); } @Override diff --git a/client/src/main/java/io/split/engine/experiments/SplitFetcher.java b/client/src/main/java/io/split/engine/experiments/SplitFetcher.java index 4266659b1..8637feefd 100644 --- a/client/src/main/java/io/split/engine/experiments/SplitFetcher.java +++ b/client/src/main/java/io/split/engine/experiments/SplitFetcher.java @@ -14,5 +14,5 @@ public interface SplitFetcher extends Runnable { * Forces a sync of ALL splits, outside of any scheduled * syncs. This method MUST NOT throw any exceptions. */ - void fetchAll(boolean addCacheHeader); + boolean fetchAll(boolean addCacheHeader); } diff --git a/client/src/main/java/io/split/engine/experiments/SplitFetcherImp.java b/client/src/main/java/io/split/engine/experiments/SplitFetcherImp.java index 0f2e91e54..0dd56ae2c 100644 --- a/client/src/main/java/io/split/engine/experiments/SplitFetcherImp.java +++ b/client/src/main/java/io/split/engine/experiments/SplitFetcherImp.java @@ -25,7 +25,6 @@ public class SplitFetcherImp implements SplitFetcher { private final SplitParser _parser; private final SplitChangeFetcher _splitChangeFetcher; private final SplitCache _splitCache; - private final SDKReadinessGates _gates; private final Object _lock = new Object(); private final TelemetryRuntimeProducer _telemetryRuntimeProducer; @@ -39,10 +38,9 @@ public class SplitFetcherImp implements SplitFetcher { * an ARCHIVED split is received, we know if we need to remove a traffic type from the multiset. */ - public SplitFetcherImp(SplitChangeFetcher splitChangeFetcher, SplitParser parser, SDKReadinessGates gates, SplitCache splitCache, TelemetryRuntimeProducer telemetryRuntimeProducer) { + public SplitFetcherImp(SplitChangeFetcher splitChangeFetcher, SplitParser parser, SplitCache splitCache, TelemetryRuntimeProducer telemetryRuntimeProducer) { _splitChangeFetcher = checkNotNull(splitChangeFetcher); _parser = checkNotNull(parser); - _gates = checkNotNull(gates); _splitCache = checkNotNull(splitCache); _telemetryRuntimeProducer = checkNotNull(telemetryRuntimeProducer); } @@ -146,20 +144,22 @@ private void runWithoutExceptionHandling(boolean addCacheHeader) throws Interrup } } @Override - public void fetchAll(boolean addCacheHeader) { + public boolean fetchAll(boolean addCacheHeader) { _log.debug("Fetch splits starting ..."); long start = _splitCache.getChangeNumber(); try { runWithoutExceptionHandling(addCacheHeader); - _gates.splitsAreReady(); + return true; } catch (InterruptedException e) { _log.warn("Interrupting split fetcher task"); Thread.currentThread().interrupt(); + return false; } catch (Throwable t) { _log.error("RefreshableSplitFetcher failed: " + t.getMessage()); if (_log.isDebugEnabled()) { _log.debug("Reason:", t); } + return false; } finally { if (_log.isDebugEnabled()) { _log.debug("split fetch before: " + start + ", after: " + _splitCache.getChangeNumber()); diff --git a/client/src/main/java/io/split/engine/segments/SegmentFetcher.java b/client/src/main/java/io/split/engine/segments/SegmentFetcher.java index af4bbc767..bd9b19ddd 100644 --- a/client/src/main/java/io/split/engine/segments/SegmentFetcher.java +++ b/client/src/main/java/io/split/engine/segments/SegmentFetcher.java @@ -9,7 +9,7 @@ public interface SegmentFetcher { */ void fetch(boolean addCacheHeader); - void runWhitCacheHeader(); + boolean runWhitCacheHeader(); void fetchAll(); } diff --git a/client/src/main/java/io/split/engine/segments/SegmentFetcherImp.java b/client/src/main/java/io/split/engine/segments/SegmentFetcherImp.java index 6492c5740..b18180264 100644 --- a/client/src/main/java/io/split/engine/segments/SegmentFetcherImp.java +++ b/client/src/main/java/io/split/engine/segments/SegmentFetcherImp.java @@ -1,5 +1,6 @@ package io.split.engine.segments; +import com.google.common.annotations.VisibleForTesting; import io.split.cache.SegmentCache; import io.split.client.dtos.SegmentChange; import io.split.engine.SDKReadinessGates; @@ -116,7 +117,8 @@ private String summarize(List changes) { return bldr.toString(); } - private void callLoopRun(boolean isFetch, boolean addCacheHeader){ + @VisibleForTesting + void callLoopRun(boolean isFetch, boolean addCacheHeader){ while (true) { long start = _segmentCache.getChangeNumber(_segmentName); runWithoutExceptionHandling(addCacheHeader); @@ -131,27 +133,26 @@ private void callLoopRun(boolean isFetch, boolean addCacheHeader){ } @Override - public void runWhitCacheHeader(){ - this.fetchAndUpdate(true); + public boolean runWhitCacheHeader(){ + return this.fetchAndUpdate(true); } /** * Calls callLoopRun and after fetchs segment. * @param addCacheHeader indicates if CacheHeader is required */ - private void fetchAndUpdate(boolean addCacheHeader) { + @VisibleForTesting + boolean fetchAndUpdate(boolean addCacheHeader) { try { // Do this again in case the previous call errored out. - _gates.registerSegment(_segmentName); callLoopRun(true, addCacheHeader); - - _gates.segmentIsReady(_segmentName); - + return true; } catch (Throwable t) { _log.error("RefreshableSegmentFetcher failed: " + t.getMessage()); if (_log.isDebugEnabled()) { _log.debug("Reason:", t); } + return false; } } diff --git a/client/src/main/java/io/split/engine/segments/SegmentSynchronizationTask.java b/client/src/main/java/io/split/engine/segments/SegmentSynchronizationTask.java index 5ad181cf4..1a1764ed9 100644 --- a/client/src/main/java/io/split/engine/segments/SegmentSynchronizationTask.java +++ b/client/src/main/java/io/split/engine/segments/SegmentSynchronizationTask.java @@ -33,5 +33,5 @@ public interface SegmentSynchronizationTask extends Runnable { /** * fetch every Segment Synchronous */ - void fetchAllSynchronous(); + boolean fetchAllSynchronous(); } diff --git a/client/src/main/java/io/split/engine/segments/SegmentSynchronizationTaskImp.java b/client/src/main/java/io/split/engine/segments/SegmentSynchronizationTaskImp.java index d3ed0dbfe..025ca08ab 100644 --- a/client/src/main/java/io/split/engine/segments/SegmentSynchronizationTaskImp.java +++ b/client/src/main/java/io/split/engine/segments/SegmentSynchronizationTaskImp.java @@ -82,12 +82,6 @@ public void initializeSegment(String segmentName) { return; } - try { - _gates.registerSegment(segmentName); - } catch (InterruptedException e) { - _log.error("Unable to register segment " + segmentName); - } - segment = new SegmentFetcherImp(segmentName, _segmentChangeFetcher, _gates, _segmentCache, _telemetryRuntimeProducer); if (_running.get()) { @@ -165,16 +159,21 @@ public void fetchAll(boolean addCacheHeader) { } @Override - public void fetchAllSynchronous() { + public boolean fetchAllSynchronous() { + AtomicBoolean fetchAllStatus = new AtomicBoolean(true); _segmentFetchers .entrySet() .stream().map(e -> _scheduledExecutorService.submit(e.getValue()::runWhitCacheHeader)) .collect(Collectors.toList()) .stream().forEach(future -> { try { - future.get(); + if(!future.get()) { + fetchAllStatus.set(false); + }; } catch (Exception ex) { + fetchAllStatus.set(false); _log.error(ex.getMessage()); }}); + return fetchAllStatus.get(); } } diff --git a/client/src/main/java/io/split/telemetry/synchronizer/SynchronizerMemory.java b/client/src/main/java/io/split/telemetry/synchronizer/SynchronizerMemory.java index 1ffeb1178..bf075c4ed 100644 --- a/client/src/main/java/io/split/telemetry/synchronizer/SynchronizerMemory.java +++ b/client/src/main/java/io/split/telemetry/synchronizer/SynchronizerMemory.java @@ -32,18 +32,20 @@ public class SynchronizerMemory implements TelemetrySynchronizer{ private TelemetryStorageConsumer _teleTelemetryStorageConsumer; private SplitCache _splitCache; private SegmentCache _segmentCache; + private final long _initStartTime; public SynchronizerMemory(CloseableHttpClient client, URI telemetryRootEndpoint, TelemetryStorageConsumer telemetryStorageConsumer, SplitCache splitCache, - SegmentCache segmentCache, TelemetryRuntimeProducer telemetryRuntimeProducer) throws URISyntaxException { + SegmentCache segmentCache, TelemetryRuntimeProducer telemetryRuntimeProducer, long initStartTime) throws URISyntaxException { _httpHttpTelemetryMemorySender = HttpTelemetryMemorySender.create(client, telemetryRootEndpoint, telemetryRuntimeProducer); _teleTelemetryStorageConsumer = telemetryStorageConsumer; _splitCache = splitCache; _segmentCache = segmentCache; + _initStartTime = initStartTime; } @Override - public void synchronizeConfig(SplitClientConfig config, long timeUntilReady, Map factoryInstances, List tags) { - _httpHttpTelemetryMemorySender.postConfig(generateConfig(config, timeUntilReady, factoryInstances, tags)); + public void synchronizeConfig(SplitClientConfig config, long readyTimeStamp, Map factoryInstances, List tags) { + _httpHttpTelemetryMemorySender.postConfig(generateConfig(config, readyTimeStamp, factoryInstances, tags)); } @Override @@ -74,7 +76,7 @@ private Stats generateStats() throws Exception { return stats; } - private Config generateConfig(SplitClientConfig splitClientConfig, long timeUntilReady, Map factoryInstances, List tags) { + private Config generateConfig(SplitClientConfig splitClientConfig, long readyTimestamp, Map factoryInstances, List tags) { Config config = new Config(); Rates rates = new Rates(); URLOverrides urlOverrides = new URLOverrides(); @@ -110,7 +112,7 @@ private Config generateConfig(SplitClientConfig splitClientConfig, long timeUnti config.set_eventsQueueSize(splitClientConfig.eventsQueueSize()); config.set_tags(getListMaxSize(tags)); config.set_activeFactories(factoryInstances.size()); - config.set_timeUntilReady(timeUntilReady); + config.set_timeUntilReady(readyTimestamp - _initStartTime); config.set_rates(rates); config.set_urlOverrides(urlOverrides); config.set_streamingEnabled(splitClientConfig.streamingEnabled()); diff --git a/client/src/test/java/io/split/client/ApiKeyCounterTest.java b/client/src/test/java/io/split/client/ApiKeyCounterTest.java index c017127ce..1513313b8 100644 --- a/client/src/test/java/io/split/client/ApiKeyCounterTest.java +++ b/client/src/test/java/io/split/client/ApiKeyCounterTest.java @@ -1,50 +1,107 @@ package io.split.client; import junit.framework.TestCase; +import org.junit.After; +import org.junit.Assert; import org.junit.Test; +import java.util.Map; + public class ApiKeyCounterTest extends TestCase { private static final String FIRST_KEY = "KEYNUMBER1"; private static final String SECOND_KEY = "KEYNUMBER2"; + @After + public synchronized void clearApiKeys() { + ApiKeyCounter.getApiKeyCounterInstance().clearApiKeys(); + } + + @Test + public synchronized void testAddingNewToken() { + try { + ApiKeyCounter.getApiKeyCounterInstance().add(FIRST_KEY); + assertTrue(ApiKeyCounter.getApiKeyCounterInstance().isApiKeyPresent(FIRST_KEY)); + } + finally { + ApiKeyCounter.getApiKeyCounterInstance().clearApiKeys(); + } + } + @Test - public void testAddingNewToken() { - ApiKeyCounter.getApiKeyCounterInstance().add(FIRST_KEY); - assertTrue(ApiKeyCounter.getApiKeyCounterInstance().isApiKeyPresent(FIRST_KEY)); + public synchronized void testAddingExistingToken() { + try { + ApiKeyCounter.getApiKeyCounterInstance().add(FIRST_KEY); + ApiKeyCounter.getApiKeyCounterInstance().add(FIRST_KEY); - ApiKeyCounter.getApiKeyCounterInstance().remove(FIRST_KEY); + assertTrue(ApiKeyCounter.getApiKeyCounterInstance().isApiKeyPresent(FIRST_KEY)); + assertEquals(2, ApiKeyCounter.getApiKeyCounterInstance().getCount(FIRST_KEY)); + } + finally { + ApiKeyCounter.getApiKeyCounterInstance().clearApiKeys(); + } } @Test - public void testAddingExistingToken() { - ApiKeyCounter.getApiKeyCounterInstance().add(FIRST_KEY); - ApiKeyCounter.getApiKeyCounterInstance().add(FIRST_KEY); - - assertTrue(ApiKeyCounter.getApiKeyCounterInstance().isApiKeyPresent(FIRST_KEY)); - assertEquals(2, ApiKeyCounter.getApiKeyCounterInstance().getCount(FIRST_KEY)); - ApiKeyCounter.getApiKeyCounterInstance().remove(FIRST_KEY); - ApiKeyCounter.getApiKeyCounterInstance().remove(FIRST_KEY); + public synchronized void testRemovingToken() { + try { + ApiKeyCounter.getApiKeyCounterInstance().add(FIRST_KEY); + ApiKeyCounter.getApiKeyCounterInstance().remove(FIRST_KEY); + + assertFalse(ApiKeyCounter.getApiKeyCounterInstance().isApiKeyPresent(FIRST_KEY)); + assertEquals(0, ApiKeyCounter.getApiKeyCounterInstance().getCount(FIRST_KEY)); + } + finally { + ApiKeyCounter.getApiKeyCounterInstance().clearApiKeys(); + } + } + + @Test + public synchronized void testAddingNonExistingToken() { + try { + ApiKeyCounter.getApiKeyCounterInstance().add(FIRST_KEY); + ApiKeyCounter.getApiKeyCounterInstance().add(SECOND_KEY); + + assertTrue(ApiKeyCounter.getApiKeyCounterInstance().isApiKeyPresent(FIRST_KEY)); + assertEquals(1, ApiKeyCounter.getApiKeyCounterInstance().getCount(FIRST_KEY)); + assertEquals(1, ApiKeyCounter.getApiKeyCounterInstance().getCount(SECOND_KEY)); + } + finally { + ApiKeyCounter.getApiKeyCounterInstance().clearApiKeys(); + } } @Test - public void testRemovingToken() { - ApiKeyCounter.getApiKeyCounterInstance().add(FIRST_KEY); - ApiKeyCounter.getApiKeyCounterInstance().remove(FIRST_KEY); + public synchronized void testFactoryInstances() { + try { + ApiKeyCounter.getApiKeyCounterInstance().clearApiKeys(); + ApiKeyCounter.getApiKeyCounterInstance().add(FIRST_KEY); + ApiKeyCounter.getApiKeyCounterInstance().add(FIRST_KEY); + ApiKeyCounter.getApiKeyCounterInstance().add(FIRST_KEY); + ApiKeyCounter.getApiKeyCounterInstance().add(SECOND_KEY); + ApiKeyCounter.getApiKeyCounterInstance().add(SECOND_KEY); - assertFalse(ApiKeyCounter.getApiKeyCounterInstance().isApiKeyPresent(FIRST_KEY)); - assertEquals(0, ApiKeyCounter.getApiKeyCounterInstance().getCount(FIRST_KEY)); + Map factoryInstances = ApiKeyCounter.getApiKeyCounterInstance().getFactoryInstances(); + Assert.assertEquals(2, factoryInstances.size()); + Assert.assertEquals(3, factoryInstances.get(FIRST_KEY).intValue()); + Assert.assertEquals(2, factoryInstances.get(SECOND_KEY).intValue()); + } + finally { + ApiKeyCounter.getApiKeyCounterInstance().clearApiKeys(); + } } @Test - public void testAddingNonExistingToken() { - ApiKeyCounter.getApiKeyCounterInstance().add(FIRST_KEY); - ApiKeyCounter.getApiKeyCounterInstance().add(SECOND_KEY); - - assertTrue(ApiKeyCounter.getApiKeyCounterInstance().isApiKeyPresent(FIRST_KEY)); - assertEquals(1, ApiKeyCounter.getApiKeyCounterInstance().getCount(FIRST_KEY)); - assertEquals(1, ApiKeyCounter.getApiKeyCounterInstance().getCount(SECOND_KEY)); - ApiKeyCounter.getApiKeyCounterInstance().remove(FIRST_KEY); - ApiKeyCounter.getApiKeyCounterInstance().remove(SECOND_KEY); + public synchronized void testClearApiKey() { + try { + ApiKeyCounter.getApiKeyCounterInstance().add(FIRST_KEY); + ApiKeyCounter.getApiKeyCounterInstance().add(FIRST_KEY); + ApiKeyCounter.getApiKeyCounterInstance().add(FIRST_KEY); + ApiKeyCounter.getApiKeyCounterInstance().clearApiKeys(); + Assert.assertEquals(0, ApiKeyCounter.getApiKeyCounterInstance().getCount(FIRST_KEY)); + } + finally { + ApiKeyCounter.getApiKeyCounterInstance().clearApiKeys(); + } } } diff --git a/client/src/test/java/io/split/client/SplitClientImplTest.java b/client/src/test/java/io/split/client/SplitClientImplTest.java index aa508cd2f..acb1f304b 100644 --- a/client/src/test/java/io/split/client/SplitClientImplTest.java +++ b/client/src/test/java/io/split/client/SplitClientImplTest.java @@ -153,7 +153,7 @@ public void works() { SDKReadinessGates gates = mock(SDKReadinessGates.class); SplitCache splitCache = mock(InMemoryCacheImp.class); when(splitCache.get(test)).thenReturn(parsedSplit); - when(gates.isSDKReadyNow()).thenReturn(true); + when(gates.isSDKReady()).thenReturn(true); SplitClientImpl client = new SplitClientImpl( mock(SplitFactory.class), @@ -326,7 +326,7 @@ public void multiple_conditions_work() { SDKReadinessGates gates = mock(SDKReadinessGates.class); SplitCache splitCache = mock(InMemoryCacheImp.class); when(splitCache.get(test)).thenReturn(parsedSplit); - when(gates.isSDKReadyNow()).thenReturn(false); + when(gates.isSDKReady()).thenReturn(false); SplitClientImpl client = new SplitClientImpl( mock(SplitFactory.class), @@ -905,7 +905,7 @@ private Partition partition(String treatment, int size) { public void block_until_ready_does_not_time_when_sdk_is_ready() throws TimeoutException, InterruptedException { SplitCache splitCache = mock(InMemoryCacheImp.class); SDKReadinessGates ready = mock(SDKReadinessGates.class); - when(ready.isSDKReady(100)).thenReturn(true); + when(ready.waitUntilInternalReady(100)).thenReturn(true); SplitClientImpl client = new SplitClientImpl( mock(SplitFactory.class), @@ -924,7 +924,7 @@ public void block_until_ready_does_not_time_when_sdk_is_ready() throws TimeoutEx public void block_until_ready_times_when_sdk_is_not_ready() throws TimeoutException, InterruptedException { SplitCache splitCache = mock(InMemoryCacheImp.class); SDKReadinessGates ready = mock(SDKReadinessGates.class); - when(ready.isSDKReady(100)).thenReturn(false); + when(ready.waitUntilInternalReady(100)).thenReturn(false); SplitClientImpl client = new SplitClientImpl( mock(SplitFactory.class), @@ -943,7 +943,7 @@ public void block_until_ready_times_when_sdk_is_not_ready() throws TimeoutExcept public void track_with_valid_parameters() { SDKReadinessGates gates = mock(SDKReadinessGates.class); SplitCache splitCache = mock(InMemoryCacheImp.class); - when(gates.isSDKReadyNow()).thenReturn(false); + when(gates.isSDKReady()).thenReturn(false); SplitClientImpl client = new SplitClientImpl( mock(SplitFactory.class), splitCache, diff --git a/client/src/test/java/io/split/client/SplitManagerImplTest.java b/client/src/test/java/io/split/client/SplitManagerImplTest.java index a77674045..201f1fedc 100644 --- a/client/src/test/java/io/split/client/SplitManagerImplTest.java +++ b/client/src/test/java/io/split/client/SplitManagerImplTest.java @@ -100,7 +100,7 @@ public void splitsCallWithNoSplit() { SplitCache splitCache = Mockito.mock(SplitCache.class); Mockito.when(splitCache.getAll()).thenReturn(Lists.newArrayList()); SDKReadinessGates gates = Mockito.mock(SDKReadinessGates.class); - Mockito.when(gates.isSDKReadyNow()).thenReturn(false); + Mockito.when(gates.isSDKReady()).thenReturn(false); SplitManagerImpl splitManager = new SplitManagerImpl(splitCache, Mockito.mock(SplitClientConfig.class), gates, TELEMETRY_STORAGE); @@ -113,7 +113,7 @@ public void splitsCallWithSplit() { SplitCache splitCache = Mockito.mock(SplitCache.class); List parsedSplits = Lists.newArrayList(); SDKReadinessGates gates = Mockito.mock(SDKReadinessGates.class); - Mockito.when(gates.isSDKReadyNow()).thenReturn(false); + Mockito.when(gates.isSDKReady()).thenReturn(false); ParsedSplit response = ParsedSplit.createParsedSplitForTests("FeatureName", 123, true, "off", Lists.newArrayList(getTestCondition("off")), "traffic", 456L, 1); parsedSplits.add(response); @@ -137,7 +137,7 @@ public void splitNamesCallWithNoSplit() { SplitCache splitCache = Mockito.mock(SplitCache.class); Mockito.when(splitCache.getAll()).thenReturn(Lists.newArrayList()); SDKReadinessGates gates = Mockito.mock(SDKReadinessGates.class); - Mockito.when(gates.isSDKReadyNow()).thenReturn(false); + Mockito.when(gates.isSDKReady()).thenReturn(false); SplitManagerImpl splitManager = new SplitManagerImpl(splitCache, Mockito.mock(SplitClientConfig.class), gates, TELEMETRY_STORAGE); @@ -164,7 +164,7 @@ public void splitNamesCallWithSplit() { @Test public void block_until_ready_does_not_time_when_sdk_is_ready() throws TimeoutException, InterruptedException { SDKReadinessGates ready = mock(SDKReadinessGates.class); - when(ready.isSDKReady(100)).thenReturn(true); + when(ready.waitUntilInternalReady(100)).thenReturn(true); SplitManagerImpl splitManager = new SplitManagerImpl(mock(SplitCache.class), config, ready, TELEMETRY_STORAGE); @@ -175,7 +175,7 @@ public void block_until_ready_does_not_time_when_sdk_is_ready() throws TimeoutEx @Test(expected = TimeoutException.class) public void block_until_ready_times_when_sdk_is_not_ready() throws TimeoutException, InterruptedException { SDKReadinessGates ready = mock(SDKReadinessGates.class); - when(ready.isSDKReady(100)).thenReturn(false); + when(ready.waitUntilInternalReady(100)).thenReturn(false); SplitManagerImpl splitManager = new SplitManagerImpl(mock(SplitCache.class), config, diff --git a/client/src/test/java/io/split/client/metrics/BinarySearchLatencyTrackerTest.java b/client/src/test/java/io/split/client/metrics/BinarySearchLatencyTrackerTest.java deleted file mode 100644 index c04fa5b49..000000000 --- a/client/src/test/java/io/split/client/metrics/BinarySearchLatencyTrackerTest.java +++ /dev/null @@ -1,92 +0,0 @@ -package io.split.client.metrics; - -import org.junit.Before; -import org.junit.Test; - -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.is; -import static org.junit.Assert.*; - -public class BinarySearchLatencyTrackerTest { - - BinarySearchLatencyTracker tracker; - - @Before - public void before() { - tracker = new BinarySearchLatencyTracker(); - } - - /** - * Latencies of <=1 millis or <= 1000 micros correspond to the first bucket (index 0) - */ - @Test - public void testLessThanFirstBucket() { - - tracker.addLatencyMicros(750); - tracker.addLatencyMicros(450); - assertThat(tracker.getLatency(0), is(equalTo(2L))); - - tracker.addLatencyMillis(0); - assertThat(tracker.getLatency(0), is(equalTo(3L))); - } - - /** - * Latencies of 1 millis or <= 1000 micros correspond to the first bucket (index 0) - */ - @Test - public void testFirstBucket() { - - tracker.addLatencyMicros(1000); - assertThat(tracker.getLatency(0), is(equalTo(1L))); - - tracker.addLatencyMillis(1); - assertThat(tracker.getLatency(0), is(equalTo(2L))); - } - - /** - * Latencies of 7481 millis or 7481828 micros correspond to the last bucket (index 22) - */ - @Test - public void testLastBucket() { - - tracker.addLatencyMicros(7481828); - assertThat(tracker.getLatency(22), is(equalTo(1L))); - - tracker.addLatencyMillis(7481); - assertThat(tracker.getLatency(22), is(equalTo(2L))); - } - - /** - * Latencies of more than 7481 millis or 7481828 micros correspond to the last bucket (index 22) - */ - @Test - public void testGreaterThanLastBucket() { - - tracker.addLatencyMicros(7481830); - assertThat(tracker.getLatency(22), is(equalTo(1L))); - - tracker.addLatencyMicros(7999999); - assertThat(tracker.getLatency(22), is(equalTo(2L))); - - tracker.addLatencyMillis(7482); - assertThat(tracker.getLatency(22), is(equalTo(3L))); - - tracker.addLatencyMillis(8000); - assertThat(tracker.getLatency(22), is(equalTo(4L))); - } - - /** - * Latencies between 11,392 and 17,086 are in the 8th bucket. - */ - @Test - public void test8ThBucket() { - - tracker.addLatencyMicros(11392); - assertThat(tracker.getLatency(7), is(equalTo(1L))); - - tracker.addLatencyMicros(17086); - assertThat(tracker.getLatency(7), is(equalTo(2L))); - - } - -} \ No newline at end of file diff --git a/client/src/test/java/io/split/client/metrics/HttpMetricsTest.java b/client/src/test/java/io/split/client/metrics/HttpMetricsTest.java deleted file mode 100644 index 71ade9dbe..000000000 --- a/client/src/test/java/io/split/client/metrics/HttpMetricsTest.java +++ /dev/null @@ -1,48 +0,0 @@ -package io.split.client.metrics; - -import org.apache.hc.client5.http.impl.classic.CloseableHttpClient; -import org.apache.hc.client5.http.impl.classic.HttpClients; -import org.hamcrest.Matchers; -import org.junit.Assert; -import org.junit.Test; - -import java.net.URI; -import java.net.URISyntaxException; - -public class HttpMetricsTest { - @Test - public void testDefaultURL() throws URISyntaxException { - URI rootTarget = URI.create("https://api.split.io"); - CloseableHttpClient httpClient = HttpClients.custom().build(); - HttpMetrics fetcher = HttpMetrics.create(httpClient, rootTarget); - Assert.assertThat(fetcher.getTimeTarget().toString(), Matchers.is(Matchers.equalTo("https://api.split.io/api/metrics/time"))); - Assert.assertThat(fetcher.getCounterTarget().toString(), Matchers.is(Matchers.equalTo("https://api.split.io/api/metrics/counter"))); - } - - @Test - public void testCustomURLNoPathNoBackslash() throws URISyntaxException { - URI rootTarget = URI.create("https://kubernetesturl.com"); - CloseableHttpClient httpClient = HttpClients.custom().build(); - HttpMetrics fetcher = HttpMetrics.create(httpClient, rootTarget); - Assert.assertThat(fetcher.getTimeTarget().toString(), Matchers.is(Matchers.equalTo("https://kubernetesturl.com/api/metrics/time"))); - Assert.assertThat(fetcher.getCounterTarget().toString(), Matchers.is(Matchers.equalTo("https://kubernetesturl.com/api/metrics/counter"))); - } - - @Test - public void testCustomURLAppendingPath() throws URISyntaxException { - URI rootTarget = URI.create("https://kubernetesturl.com/split/"); - CloseableHttpClient httpClient = HttpClients.custom().build(); - HttpMetrics fetcher = HttpMetrics.create(httpClient, rootTarget); - Assert.assertThat(fetcher.getTimeTarget().toString(), Matchers.is(Matchers.equalTo("https://kubernetesturl.com/split/api/metrics/time"))); - Assert.assertThat(fetcher.getCounterTarget().toString(), Matchers.is(Matchers.equalTo("https://kubernetesturl.com/split/api/metrics/counter"))); - } - - @Test - public void testCustomURLAppendingPathNoBackslash() throws URISyntaxException { - URI rootTarget = URI.create("https://kubernetesturl.com/split"); - CloseableHttpClient httpClient = HttpClients.custom().build(); - HttpMetrics fetcher = HttpMetrics.create(httpClient, rootTarget); - Assert.assertThat(fetcher.getTimeTarget().toString(), Matchers.is(Matchers.equalTo("https://kubernetesturl.com/split/api/metrics/time"))); - Assert.assertThat(fetcher.getCounterTarget().toString(), Matchers.is(Matchers.equalTo("https://kubernetesturl.com/split/api/metrics/counter"))); - } -} diff --git a/client/src/test/java/io/split/client/metrics/LogarithmicSearchLatencyTrackerTest.java b/client/src/test/java/io/split/client/metrics/LogarithmicSearchLatencyTrackerTest.java deleted file mode 100644 index fedcec0a9..000000000 --- a/client/src/test/java/io/split/client/metrics/LogarithmicSearchLatencyTrackerTest.java +++ /dev/null @@ -1,112 +0,0 @@ -package io.split.client.metrics; - -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; - -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.is; -import static org.junit.Assert.assertThat; - -@Ignore -public class LogarithmicSearchLatencyTrackerTest { - - LogarithmicSearchLatencyTracker tracker; - - @Before - public void before() { - tracker = new LogarithmicSearchLatencyTracker(); - } - - /** - * Latencies of <=1 millis or <= 1000 micros correspond to the first bucket (index 0) - */ - @Test - public void testLessThanFirstBucket() { - - tracker.addLatencyMicros(750); - tracker.addLatencyMicros(450); - assertThat(tracker.getLatency(0), is(equalTo(2L))); - - tracker.addLatencyMillis(0); - assertThat(tracker.getLatency(0), is(equalTo(3L))); - } - - /** - * Latencies of 1 millis or <= 1000 micros correspond to the first bucket (index 0) - */ - @Test - public void testFirstBucket() { - - tracker.addLatencyMicros(1000); - assertThat(tracker.getLatency(0), is(equalTo(1L))); - - tracker.addLatencyMillis(1); - assertThat(tracker.getLatency(0), is(equalTo(2L))); - } - - /** - * Latencies of 7481 millis or 7481828 micros correspond to the last bucket (index 22) - */ - @Test - public void testLastBucket() { - - tracker.addLatencyMicros(7481828); - assertThat(tracker.getLatency(22), is(equalTo(1L))); - - tracker.addLatencyMillis(7481); - assertThat(tracker.getLatency(22), is(equalTo(2L))); - } - - /** - * Latencies of more than 7481 millis or 7481828 micros correspond to the last bucket (index 22) - */ - @Test - public void testGreaterThanLastBucket() { - - tracker.addLatencyMicros(7481830); - assertThat(tracker.getLatency(22), is(equalTo(1L))); - - tracker.addLatencyMicros(7999999); - assertThat(tracker.getLatency(22), is(equalTo(2L))); - - tracker.addLatencyMillis(7482); - assertThat(tracker.getLatency(22), is(equalTo(3L))); - - tracker.addLatencyMillis(8000); - assertThat(tracker.getLatency(22), is(equalTo(4L))); - } - - /** - * Latencies between 11,392 and 17,086 are in the 8th bucket. - */ - @Test - public void test8ThBucket() { - - tracker.addLatencyMicros(11392); - assertThat(tracker.getLatency(7), is(equalTo(1L))); - - tracker.addLatencyMicros(17086); - assertThat(tracker.getLatency(7), is(equalTo(2L))); - - tracker.addLatencyMillis(18); - assertThat(tracker.getLatency(7), is(equalTo(3L))); - } - - /** - * Latencies between 656,842 and 985,261 are in the 18th bucket. - */ - @Test - public void test17ThBucket() { - - tracker.addLatencyMicros(656842); - assertThat(tracker.getLatency(17), is(equalTo(1L))); - - tracker.addLatencyMicros(985261); - assertThat(tracker.getLatency(17), is(equalTo(2L))); - - tracker.addLatencyMillis(985); - assertThat(tracker.getLatency(17), is(equalTo(3L))); - } - -} \ No newline at end of file diff --git a/client/src/test/java/io/split/engine/common/SyncManagerTest.java b/client/src/test/java/io/split/engine/common/SyncManagerTest.java index d2204ec18..fafe0874c 100644 --- a/client/src/test/java/io/split/engine/common/SyncManagerTest.java +++ b/client/src/test/java/io/split/engine/common/SyncManagerTest.java @@ -1,8 +1,10 @@ package io.split.engine.common; +import io.split.client.SplitClientConfig; import io.split.engine.SDKReadinessGates; import io.split.telemetry.storage.InMemoryTelemetryStorage; import io.split.telemetry.storage.TelemetryStorage; +import io.split.telemetry.synchronizer.TelemetrySynchronizer; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; @@ -26,7 +28,10 @@ public void setUp() { public void startWithStreamingFalseShouldStartPolling() throws InterruptedException { TelemetryStorage telemetryStorage = new InMemoryTelemetryStorage(); _gates.sdkInternalReady(); - SyncManagerImp syncManager = new SyncManagerImp(false, _synchronizer, _pushManager, new LinkedBlockingQueue<>(), BACKOFF_BASE, _gates, telemetryStorage); + TelemetrySynchronizer telemetrySynchronizer = Mockito.mock(TelemetrySynchronizer.class); + SplitClientConfig config = Mockito.mock(SplitClientConfig.class); + Mockito.when(_synchronizer.syncAll()).thenReturn(true); + SyncManagerImp syncManager = new SyncManagerImp(false, _synchronizer, _pushManager, new LinkedBlockingQueue<>(), BACKOFF_BASE, _gates, telemetryStorage, telemetrySynchronizer, config); syncManager.start(); Thread.sleep(1000); Mockito.verify(_synchronizer, Mockito.times(1)).startPeriodicFetching(); @@ -35,10 +40,14 @@ public void startWithStreamingFalseShouldStartPolling() throws InterruptedExcept } @Test - public void startWithStreamingTrueShouldStartSyncAll() { + public void startWithStreamingTrueShouldStartSyncAll() throws InterruptedException { TelemetryStorage telemetryStorage = new InMemoryTelemetryStorage(); - SyncManager sm = new SyncManagerImp(true, _synchronizer, _pushManager, new LinkedBlockingQueue<>(), BACKOFF_BASE, _gates, telemetryStorage); + TelemetrySynchronizer telemetrySynchronizer = Mockito.mock(TelemetrySynchronizer.class); + SplitClientConfig config = Mockito.mock(SplitClientConfig.class); + Mockito.when(_synchronizer.syncAll()).thenReturn(true); + SyncManager sm = new SyncManagerImp(true, _synchronizer, _pushManager, new LinkedBlockingQueue<>(), BACKOFF_BASE, _gates, telemetryStorage, telemetrySynchronizer, config); sm.start(); + Thread.sleep(1000); Mockito.verify(_synchronizer, Mockito.times(0)).startPeriodicFetching(); Mockito.verify(_synchronizer, Mockito.times(1)).syncAll(); Mockito.verify(_pushManager, Mockito.times(1)).start(); @@ -48,7 +57,10 @@ public void startWithStreamingTrueShouldStartSyncAll() { public void onStreamingAvailable() throws InterruptedException { TelemetryStorage telemetryStorage = new InMemoryTelemetryStorage(); LinkedBlockingQueue messsages = new LinkedBlockingQueue<>(); - SyncManagerImp syncManager = new SyncManagerImp(true, _synchronizer, _pushManager, messsages, BACKOFF_BASE, _gates, telemetryStorage); + TelemetrySynchronizer telemetrySynchronizer = Mockito.mock(TelemetrySynchronizer.class); + SplitClientConfig config = Mockito.mock(SplitClientConfig.class); + + SyncManagerImp syncManager = new SyncManagerImp(true, _synchronizer, _pushManager, messsages, BACKOFF_BASE, _gates, telemetryStorage, telemetrySynchronizer, config); Thread t = new Thread(syncManager::incomingPushStatusHandler); t.start(); messsages.offer(PushManager.Status.STREAMING_READY); @@ -63,7 +75,9 @@ public void onStreamingAvailable() throws InterruptedException { public void onStreamingDisabled() throws InterruptedException { TelemetryStorage telemetryStorage = new InMemoryTelemetryStorage(); LinkedBlockingQueue messsages = new LinkedBlockingQueue<>(); - SyncManagerImp syncManager = new SyncManagerImp(true, _synchronizer, _pushManager, messsages, BACKOFF_BASE, _gates, telemetryStorage); + TelemetrySynchronizer telemetrySynchronizer = Mockito.mock(TelemetrySynchronizer.class); + SplitClientConfig config = Mockito.mock(SplitClientConfig.class); + SyncManagerImp syncManager = new SyncManagerImp(true, _synchronizer, _pushManager, messsages, BACKOFF_BASE, _gates, telemetryStorage, telemetrySynchronizer, config); Thread t = new Thread(syncManager::incomingPushStatusHandler); t.start(); messsages.offer(PushManager.Status.STREAMING_DOWN); @@ -78,7 +92,9 @@ public void onStreamingDisabled() throws InterruptedException { public void onStreamingShutdown() throws InterruptedException { TelemetryStorage telemetryStorage = new InMemoryTelemetryStorage(); LinkedBlockingQueue messsages = new LinkedBlockingQueue<>(); - SyncManagerImp syncManager = new SyncManagerImp(true, _synchronizer, _pushManager, messsages, BACKOFF_BASE, _gates, telemetryStorage); + TelemetrySynchronizer telemetrySynchronizer = Mockito.mock(TelemetrySynchronizer.class); + SplitClientConfig config = Mockito.mock(SplitClientConfig.class); + SyncManagerImp syncManager = new SyncManagerImp(true, _synchronizer, _pushManager, messsages, BACKOFF_BASE, _gates, telemetryStorage, telemetrySynchronizer, config); Thread t = new Thread(syncManager::incomingPushStatusHandler); t.start(); messsages.offer(PushManager.Status.STREAMING_OFF); @@ -91,7 +107,9 @@ public void onStreamingShutdown() throws InterruptedException { public void onConnected() throws InterruptedException { TelemetryStorage telemetryStorage = new InMemoryTelemetryStorage(); LinkedBlockingQueue messsages = new LinkedBlockingQueue<>(); - SyncManagerImp syncManager = new SyncManagerImp(true, _synchronizer, _pushManager, messsages, BACKOFF_BASE, _gates, telemetryStorage); + TelemetrySynchronizer telemetrySynchronizer = Mockito.mock(TelemetrySynchronizer.class); + SplitClientConfig config = Mockito.mock(SplitClientConfig.class); + SyncManagerImp syncManager = new SyncManagerImp(true, _synchronizer, _pushManager, messsages, BACKOFF_BASE, _gates, telemetryStorage, telemetrySynchronizer, config); Thread t = new Thread(syncManager::incomingPushStatusHandler); t.start(); messsages.offer(PushManager.Status.STREAMING_READY); @@ -105,7 +123,9 @@ public void onConnected() throws InterruptedException { public void onDisconnect() throws InterruptedException { TelemetryStorage telemetryStorage = new InMemoryTelemetryStorage(); LinkedBlockingQueue messsages = new LinkedBlockingQueue<>(); - SyncManagerImp syncManager = new SyncManagerImp(true, _synchronizer, _pushManager, messsages, BACKOFF_BASE, _gates, telemetryStorage); + TelemetrySynchronizer telemetrySynchronizer = Mockito.mock(TelemetrySynchronizer.class); + SplitClientConfig config = Mockito.mock(SplitClientConfig.class); + SyncManagerImp syncManager = new SyncManagerImp(true, _synchronizer, _pushManager, messsages, BACKOFF_BASE, _gates, telemetryStorage, telemetrySynchronizer, config); Thread t = new Thread(syncManager::incomingPushStatusHandler); t.start(); messsages.offer(PushManager.Status.STREAMING_OFF); @@ -118,7 +138,10 @@ public void onDisconnect() throws InterruptedException { public void onDisconnectAndReconnect() throws InterruptedException { // Check with mauro. reconnect should call pushManager.start again, right? TelemetryStorage telemetryStorage = new InMemoryTelemetryStorage(); LinkedBlockingQueue messsages = new LinkedBlockingQueue<>(); - SyncManagerImp syncManager = new SyncManagerImp(true, _synchronizer, _pushManager, messsages, BACKOFF_BASE, _gates, telemetryStorage); + TelemetrySynchronizer telemetrySynchronizer = Mockito.mock(TelemetrySynchronizer.class); + SplitClientConfig config = Mockito.mock(SplitClientConfig.class); + Mockito.when(_synchronizer.syncAll()).thenReturn(true); + SyncManagerImp syncManager = new SyncManagerImp(true, _synchronizer, _pushManager, messsages, BACKOFF_BASE, _gates, telemetryStorage, telemetrySynchronizer, config); syncManager.start(); messsages.offer(PushManager.Status.STREAMING_BACKOFF); Thread.sleep(1200); @@ -126,4 +149,20 @@ public void onDisconnectAndReconnect() throws InterruptedException { // Check wi Mockito.verify(_synchronizer, Mockito.times(1)).syncAll(); Mockito.verify(_pushManager, Mockito.times(2)).start(); } + + @Test + public void syncAllRetryThenShouldStartPolling() throws InterruptedException { + TelemetryStorage telemetryStorage = new InMemoryTelemetryStorage(); + TelemetrySynchronizer telemetrySynchronizer = Mockito.mock(TelemetrySynchronizer.class); + SplitClientConfig config = Mockito.mock(SplitClientConfig.class); + Mockito.when(_synchronizer.syncAll()).thenReturn(false).thenReturn(true); + SyncManagerImp syncManager = new SyncManagerImp(false, _synchronizer, _pushManager, new LinkedBlockingQueue<>(), BACKOFF_BASE, _gates, telemetryStorage, telemetrySynchronizer, config); + syncManager.start(); + Thread.sleep(2000); + Mockito.verify(_synchronizer, Mockito.times(1)).startPeriodicFetching(); + Mockito.verify(_synchronizer, Mockito.times(2)).syncAll(); + Mockito.verify(_pushManager, Mockito.times(0)).start(); + Mockito.verify(_gates, Mockito.times(1)).sdkInternalReady(); + Mockito.verify(telemetrySynchronizer, Mockito.times(1)).synchronizeConfig(Mockito.anyObject(), Mockito.anyLong(), Mockito.anyObject(), Mockito.anyObject()); + } } diff --git a/client/src/test/java/io/split/engine/common/SynchronizerTest.java b/client/src/test/java/io/split/engine/common/SynchronizerTest.java index 53e4175d1..bb13bcccd 100644 --- a/client/src/test/java/io/split/engine/common/SynchronizerTest.java +++ b/client/src/test/java/io/split/engine/common/SynchronizerTest.java @@ -34,12 +34,13 @@ public void beforeMethod() { @Test public void syncAll() throws InterruptedException { + Mockito.when(_splitFetcher.fetchAll(true)).thenReturn(true); + Mockito.when(_segmentFetcher.fetchAllSynchronous()).thenReturn(true); _synchronizer.syncAll(); - Thread.sleep(100); + Thread.sleep(1000); Mockito.verify(_splitFetcher, Mockito.times(1)).fetchAll(true); Mockito.verify(_segmentFetcher, Mockito.times(1)).fetchAllSynchronous(); - Mockito.verify(_gates, Mockito.times(1)).sdkInternalReady(); } @Test diff --git a/client/src/test/java/io/split/engine/experiments/SplitFetcherTest.java b/client/src/test/java/io/split/engine/experiments/SplitFetcherTest.java index bfd720216..245d07381 100644 --- a/client/src/test/java/io/split/engine/experiments/SplitFetcherTest.java +++ b/client/src/test/java/io/split/engine/experiments/SplitFetcherTest.java @@ -61,7 +61,7 @@ private void works(long startingChangeNumber) throws InterruptedException { SegmentChangeFetcher segmentChangeFetcher = Mockito.mock(SegmentChangeFetcher.class); SegmentSynchronizationTask segmentSynchronizationTask = new SegmentSynchronizationTaskImp(segmentChangeFetcher,1,10, gates, segmentCache, TELEMETRY_STORAGE); SplitCache cache = new InMemoryCacheImp(startingChangeNumber); - SplitFetcherImp fetcher = new SplitFetcherImp(splitChangeFetcher, new SplitParser(segmentSynchronizationTask, segmentCache), gates, cache, TELEMETRY_STORAGE); + SplitFetcherImp fetcher = new SplitFetcherImp(splitChangeFetcher, new SplitParser(segmentSynchronizationTask, segmentCache), cache, TELEMETRY_STORAGE); // execute the fetcher for a little bit. executeWaitAndTerminate(fetcher, 1, 3, TimeUnit.SECONDS); @@ -80,9 +80,8 @@ private void works(long startingChangeNumber) throws InterruptedException { ParsedSplit expected = ParsedSplit.createParsedSplitForTests("" + cache.getChangeNumber(), (int) cache.getChangeNumber(), false, Treatments.OFF, expectedListOfMatcherAndSplits, null, cache.getChangeNumber(), 1); ParsedSplit actual = cache.get("" + cache.getChangeNumber()); - + Thread.sleep(1000); assertThat(actual, is(equalTo(expected))); - assertThat(gates.areSplitsReady(0), is(equalTo(true))); } @Test @@ -135,7 +134,7 @@ public void when_parser_fails_we_remove_the_experiment() throws InterruptedExcep SegmentSynchronizationTask segmentSynchronizationTask = new SegmentSynchronizationTaskImp(segmentChangeFetcher, 1,10, gates, segmentCache, TELEMETRY_STORAGE); segmentSynchronizationTask.startPeriodicFetching(); SplitCache cache = new InMemoryCacheImp(-1); - SplitFetcherImp fetcher = new SplitFetcherImp(splitChangeFetcher, new SplitParser(segmentSynchronizationTask, segmentCache), gates, cache, TELEMETRY_STORAGE); + SplitFetcherImp fetcher = new SplitFetcherImp(splitChangeFetcher, new SplitParser(segmentSynchronizationTask, segmentCache), cache, TELEMETRY_STORAGE); // execute the fetcher for a little bit. executeWaitAndTerminate(fetcher, 1, 5, TimeUnit.SECONDS); @@ -157,13 +156,12 @@ public void if_there_is_a_problem_talking_to_split_change_count_down_latch_is_no SegmentChangeFetcher segmentChangeFetcher = mock(SegmentChangeFetcher.class); SegmentSynchronizationTask segmentSynchronizationTask = new SegmentSynchronizationTaskImp(segmentChangeFetcher, 1,10, gates, segmentCache, TELEMETRY_STORAGE); segmentSynchronizationTask.startPeriodicFetching(); - SplitFetcherImp fetcher = new SplitFetcherImp(splitChangeFetcher, new SplitParser(segmentSynchronizationTask, segmentCache), gates, cache, TELEMETRY_STORAGE); + SplitFetcherImp fetcher = new SplitFetcherImp(splitChangeFetcher, new SplitParser(segmentSynchronizationTask, segmentCache), cache, TELEMETRY_STORAGE); // execute the fetcher for a little bit. executeWaitAndTerminate(fetcher, 1, 5, TimeUnit.SECONDS); assertThat(cache.getChangeNumber(), is(equalTo(-1L))); - assertThat(gates.areSplitsReady(0), is(equalTo(false))); } private void executeWaitAndTerminate(Runnable runnable, long frequency, long waitInBetween, TimeUnit unit) throws InterruptedException { @@ -199,7 +197,7 @@ public void works_with_user_defined_segments() throws Exception { when(segmentChangeFetcher.fetch(anyString(), anyLong(), anyBoolean())).thenReturn(segmentChange); SegmentSynchronizationTask segmentSynchronizationTask = new SegmentSynchronizationTaskImp(segmentChangeFetcher, 1,10, gates, segmentCache, TELEMETRY_STORAGE); segmentSynchronizationTask.startPeriodicFetching(); - SplitFetcherImp fetcher = new SplitFetcherImp(experimentChangeFetcher, new SplitParser(segmentSynchronizationTask, segmentCache), gates, cache, TELEMETRY_STORAGE); + SplitFetcherImp fetcher = new SplitFetcherImp(experimentChangeFetcher, new SplitParser(segmentSynchronizationTask, segmentCache), cache, TELEMETRY_STORAGE); // execute the fetcher for a little bit. executeWaitAndTerminate(fetcher, 1, 5, TimeUnit.SECONDS); @@ -212,11 +210,6 @@ public void works_with_user_defined_segments() throws Exception { assertThat("Asking for " + i + " " + cache.getAll(), cache.get("" + i), is(not(nullValue()))); assertThat(cache.get("" + i).killed(), is(true)); } - - assertThat(gates.areSplitsReady(0), is(equalTo(true))); - assertThat(gates.isSegmentRegistered(segmentName), is(equalTo(true))); - assertThat(gates.areSegmentsReady(100), is(equalTo(true))); - assertThat(gates.isSDKReady(0), is(equalTo(true))); } private SegmentChange getSegmentChange(long since, long till, String segmentName){ diff --git a/client/src/test/java/io/split/engine/segments/SegmentFetcherImpTest.java b/client/src/test/java/io/split/engine/segments/SegmentFetcherImpTest.java index d41b9c829..c5f51afc8 100644 --- a/client/src/test/java/io/split/engine/segments/SegmentFetcherImpTest.java +++ b/client/src/test/java/io/split/engine/segments/SegmentFetcherImpTest.java @@ -49,7 +49,6 @@ public void works_when_we_start_with_state() throws InterruptedException { public void works_when_there_are_no_changes() throws InterruptedException { long startingChangeNumber = -1L; SDKReadinessGates gates = new SDKReadinessGates(); - gates.registerSegment(SEGMENT_NAME); SegmentCache segmentCache = new SegmentCacheInMemoryImpl(); SegmentChangeFetcher segmentChangeFetcher = Mockito.mock(SegmentChangeFetcher.class); @@ -79,14 +78,12 @@ public void works_when_there_are_no_changes() throws InterruptedException { assertNotNull(segmentCache.getChangeNumber(SEGMENT_NAME)); assertEquals(10L, segmentCache.getChangeNumber(SEGMENT_NAME)); - assertThat(gates.areSegmentsReady(10), is(true)); } private void works(long startingChangeNumber) throws InterruptedException { SDKReadinessGates gates = new SDKReadinessGates(); String segmentName = SEGMENT_NAME; - gates.registerSegment(segmentName); SegmentCache segmentCache = Mockito.mock(SegmentCache.class); Mockito.when(segmentCache.getChangeNumber(SEGMENT_NAME)).thenReturn(-1L).thenReturn(-1L) .thenReturn(-1L) @@ -116,7 +113,6 @@ private void works(long startingChangeNumber) throws InterruptedException { Thread.currentThread().interrupt(); } Mockito.verify(segmentChangeFetcher, Mockito.times(2)).fetch(Mockito.anyString(), Mockito.anyLong(), Mockito.anyBoolean()); - assertThat(gates.areSegmentsReady(10), is(true)); } diff --git a/client/src/test/java/io/split/engine/segments/SegmentSynchronizationTaskImpTest.java b/client/src/test/java/io/split/engine/segments/SegmentSynchronizationTaskImpTest.java index 722458e3c..db3740565 100644 --- a/client/src/test/java/io/split/engine/segments/SegmentSynchronizationTaskImpTest.java +++ b/client/src/test/java/io/split/engine/segments/SegmentSynchronizationTaskImpTest.java @@ -1,16 +1,21 @@ package io.split.engine.segments; +import com.google.common.collect.Maps; import io.split.engine.SDKReadinessGates; import io.split.cache.SegmentCache; import io.split.telemetry.storage.InMemoryTelemetryStorage; import io.split.telemetry.storage.TelemetryStorage; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; import java.util.List; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -75,11 +80,57 @@ public void run() { Thread.currentThread().interrupt(); } - gates.splitsAreReady(); - assertThat(fetcher1.get(), is(notNullValue())); assertThat(fetcher1.get(), is(sameInstance(fetcher2.get()))); } + @Test + public void testFetchAllAsynchronousAndGetFalse() throws NoSuchFieldException, IllegalAccessException { + SDKReadinessGates gates = new SDKReadinessGates(); + SegmentCache segmentCache = Mockito.mock(SegmentCache.class); + ConcurrentMap _segmentFetchers = Maps.newConcurrentMap(); + + SegmentChangeFetcher segmentChangeFetcher = Mockito.mock(SegmentChangeFetcher.class); + SegmentFetcherImp segmentFetcher = Mockito.mock(SegmentFetcherImp.class); + _segmentFetchers.put("SF", segmentFetcher); + final SegmentSynchronizationTaskImp fetchers = new SegmentSynchronizationTaskImp(segmentChangeFetcher, 1L, 1, gates, segmentCache, TELEMETRY_STORAGE); + Mockito.doNothing().when(segmentFetcher).callLoopRun(Mockito.anyBoolean(),Mockito.anyBoolean()); + Mockito.when(segmentFetcher.runWhitCacheHeader()).thenReturn(false); + Mockito.when(segmentFetcher.fetchAndUpdate(Mockito.anyBoolean())).thenReturn(false); + Mockito.doNothing().when(segmentFetcher).callLoopRun(Mockito.anyBoolean(),Mockito.anyBoolean()); + + // Before executing, we'll update the map of segmentFecthers via reflection. + Field segmentFetchersForced = SegmentSynchronizationTaskImp.class.getDeclaredField("_segmentFetchers"); + segmentFetchersForced.setAccessible(true); + Field modifiersField = Field.class.getDeclaredField("modifiers"); + modifiersField.setAccessible(true); + modifiersField.setInt(segmentFetchersForced, segmentFetchersForced.getModifiers() & ~Modifier.FINAL); + + segmentFetchersForced.set(fetchers, _segmentFetchers); // 1ms + fetcher1.set(segmentFetcher); + boolean fetch = fetchers.fetchAllSynchronous(); + Assert.assertEquals(false, fetch); + } + + @Test + public void testFetchAllAsynchronousAndGetTrue() throws NoSuchFieldException, IllegalAccessException { + SDKReadinessGates gates = new SDKReadinessGates(); + SegmentCache segmentCache = Mockito.mock(SegmentCache.class); + + SegmentChangeFetcher segmentChangeFetcher = Mockito.mock(SegmentChangeFetcher.class); + SegmentFetcherImp segmentFetcher = Mockito.mock(SegmentFetcherImp.class); + final SegmentSynchronizationTaskImp fetchers = new SegmentSynchronizationTaskImp(segmentChangeFetcher, 1L, 1, gates, segmentCache, TELEMETRY_STORAGE); + // Before executing, we'll update the map of segmentFecthers via reflection. + Field segmentFetchersForced = SegmentSynchronizationTaskImp.class.getDeclaredField("_segmentFetchers"); + segmentFetchersForced.setAccessible(true); + Field modifiersField = Field.class.getDeclaredField("modifiers"); + modifiersField.setAccessible(true); + modifiersField.setInt(segmentFetchersForced, segmentFetchersForced.getModifiers() & ~Modifier.FINAL); + Mockito.doNothing().when(segmentFetcher).callLoopRun(Mockito.anyBoolean(),Mockito.anyBoolean()); + Mockito.when(segmentFetcher.runWhitCacheHeader()).thenReturn(true); + Mockito.when(segmentFetcher.fetchAndUpdate(Mockito.anyBoolean())).thenReturn(true); + boolean fetch = fetchers.fetchAllSynchronous(); + Assert.assertEquals(true, fetch); + } } diff --git a/client/src/test/java/io/split/telemetry/synchronizer/SynchronizerMemoryTest.java b/client/src/test/java/io/split/telemetry/synchronizer/SynchronizerMemoryTest.java index ff45f1ba7..0afab5ddf 100644 --- a/client/src/test/java/io/split/telemetry/synchronizer/SynchronizerMemoryTest.java +++ b/client/src/test/java/io/split/telemetry/synchronizer/SynchronizerMemoryTest.java @@ -50,7 +50,8 @@ private TelemetrySynchronizer getTelemetrySynchronizer(CloseableHttpClient httpC TelemetryRuntimeProducer telemetryRuntimeProducer = Mockito.mock(TelemetryRuntimeProducer.class); SplitCache splitCache = Mockito.mock(SplitCache.class); SegmentCache segmentCache = Mockito.mock(SegmentCacheInMemoryImpl.class); - TelemetrySynchronizer telemetrySynchronizer = new SynchronizerMemory(httpClient, URI.create(TELEMETRY_ENDPOINT), consumer, splitCache, segmentCache, telemetryRuntimeProducer); + SplitClientConfig config = Mockito.mock(SplitClientConfig.class); + TelemetrySynchronizer telemetrySynchronizer = new SynchronizerMemory(httpClient, URI.create(TELEMETRY_ENDPOINT), consumer, splitCache, segmentCache, telemetryRuntimeProducer, 0l); return telemetrySynchronizer; }