From f5b39c5b1cf2fe227adafe2e6c357712fad7b111 Mon Sep 17 00:00:00 2001 From: Lucas Echeverz Date: Wed, 12 May 2021 18:35:35 -0300 Subject: [PATCH 1/6] Cleaning metrics and adding Telemetry config initializer --- .../java/io/split/client/ApiKeyCounter.java | 12 ++ .../io/split/client/SplitFactoryImpl.java | 5 +- .../metrics/BinarySearchLatencyTracker.java | 131 ----------------- .../io/split/client/metrics/DTOMetrics.java | 13 -- .../io/split/client/metrics/HttpMetrics.java | 135 ------------------ .../split/client/metrics/ILatencyTracker.java | 22 --- .../LogarithmicSearchLatencyTracker.java | 116 --------------- .../TelemetryConfigInitializer.java | 33 +++++ .../io/split/client/ApiKeyCounterTest.java | 16 +++ .../BinarySearchLatencyTrackerTest.java | 92 ------------ .../split/client/metrics/HttpMetricsTest.java | 48 ------- .../LogarithmicSearchLatencyTrackerTest.java | 112 --------------- .../TelemetryConfigInitializerTest.java | 20 +++ 13 files changed, 85 insertions(+), 670 deletions(-) delete mode 100644 client/src/main/java/io/split/client/metrics/BinarySearchLatencyTracker.java delete mode 100644 client/src/main/java/io/split/client/metrics/DTOMetrics.java delete mode 100644 client/src/main/java/io/split/client/metrics/HttpMetrics.java delete mode 100644 client/src/main/java/io/split/client/metrics/ILatencyTracker.java delete mode 100644 client/src/main/java/io/split/client/metrics/LogarithmicSearchLatencyTracker.java create mode 100644 client/src/main/java/io/split/telemetry/synchronizer/TelemetryConfigInitializer.java delete mode 100644 client/src/test/java/io/split/client/metrics/BinarySearchLatencyTrackerTest.java delete mode 100644 client/src/test/java/io/split/client/metrics/HttpMetricsTest.java delete mode 100644 client/src/test/java/io/split/client/metrics/LogarithmicSearchLatencyTrackerTest.java create mode 100644 client/src/test/java/io/split/telemetry/synchronizer/TelemetryConfigInitializerTest.java diff --git a/client/src/main/java/io/split/client/ApiKeyCounter.java b/client/src/main/java/io/split/client/ApiKeyCounter.java index 8c39394dd..546e9c78b 100644 --- a/client/src/main/java/io/split/client/ApiKeyCounter.java +++ b/client/src/main/java/io/split/client/ApiKeyCounter.java @@ -6,6 +6,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.HashMap; +import java.util.Map; + public class ApiKeyCounter { private static final Logger _log = LoggerFactory.getLogger(ApiKeyCounter.class); @@ -63,4 +66,13 @@ boolean isApiKeyPresent(String apiKey) { int getCount(String apiKey) { return USED_API_KEYS.count(apiKey); } + + public Map getFactoryInstances() { + Map factoryInstances = new HashMap<>(); + for (String factory :USED_API_KEYS) { + factoryInstances.putIfAbsent(factory, new Long(getCount(factory))); + } + + return factoryInstances; + } } diff --git a/client/src/main/java/io/split/client/SplitFactoryImpl.java b/client/src/main/java/io/split/client/SplitFactoryImpl.java index 87edaefda..e3455d4da 100644 --- a/client/src/main/java/io/split/client/SplitFactoryImpl.java +++ b/client/src/main/java/io/split/client/SplitFactoryImpl.java @@ -8,7 +8,6 @@ import io.split.client.interceptors.GzipDecoderResponseInterceptor; import io.split.client.interceptors.GzipEncoderRequestInterceptor; import io.split.client.interceptors.SdkMetadataInterceptorFilter; -import io.split.client.metrics.HttpMetrics; import io.split.cache.InMemoryCacheImp; import io.split.cache.SplitCache; import io.split.engine.evaluator.Evaluator; @@ -29,6 +28,7 @@ import io.split.telemetry.storage.InMemoryTelemetryStorage; import io.split.telemetry.storage.TelemetryStorage; import io.split.telemetry.synchronizer.SynchronizerMemory; +import io.split.telemetry.synchronizer.TelemetryConfigInitializer; import io.split.telemetry.synchronizer.TelemetrySyncTask; import io.split.telemetry.synchronizer.TelemetrySynchronizer; import org.apache.hc.client5.http.auth.AuthScope; @@ -95,6 +95,7 @@ public class SplitFactoryImpl implements SplitFactory { private final TelemetrySynchronizer _telemetrySynchronizer; private final TelemetrySyncTask _telemetrySyncTask; private final long _startTime; + private final TelemetryConfigInitializer _telemetryConfigInitializer; public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyntaxException { _startTime = System.currentTimeMillis(); @@ -126,6 +127,8 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn _splitCache = new InMemoryCacheImp(); _telemetrySynchronizer = new SynchronizerMemory(_httpclient, URI.create(config.get_telemetryURL()), _telemetryStorage, _splitCache, _segmentCache, _telemetryStorage); _telemetrySyncTask = new TelemetrySyncTask(config.get_telemetryRefreshRate(), _telemetrySynchronizer); + _telemetryConfigInitializer = new TelemetryConfigInitializer(_telemetrySynchronizer,_gates,config); + // Segments _segmentSynchronizationTaskImp = buildSegments(config); diff --git a/client/src/main/java/io/split/client/metrics/BinarySearchLatencyTracker.java b/client/src/main/java/io/split/client/metrics/BinarySearchLatencyTracker.java deleted file mode 100644 index 35efff10f..000000000 --- a/client/src/main/java/io/split/client/metrics/BinarySearchLatencyTracker.java +++ /dev/null @@ -1,131 +0,0 @@ -package io.split.client.metrics; - -import java.util.Arrays; - -/** - * Tracks latencies pero bucket of time. - * Each bucket represent a latency greater than the one before - * and each number within each bucket is a number of calls in the range. - *

- * (1) 1.00 - * (2) 1.50 - * (3) 2.25 - * (4) 3.38 - * (5) 5.06 - * (6) 7.59 - * (7) 11.39 - * (8) 17.09 - * (9) 25.63 - * (10) 38.44 - * (11) 57.67 - * (12) 86.50 - * (13) 129.75 - * (14) 194.62 - * (15) 291.93 - * (16) 437.89 - * (17) 656.84 - * (18) 985.26 - * (19) 1,477.89 - * (20) 2,216.84 - * (21) 3,325.26 - * (22) 4,987.89 - * (23) 7,481.83 - *

- * Thread-safety: This class is not thread safe. - *

- * Created by patricioe on 2/10/16. - */ -public class BinarySearchLatencyTracker implements ILatencyTracker { - - static final long[] BUCKETS = { - 1000, 1500, 2250, 3375, 5063, - 7594, 11391, 17086, 25629, 38443, - 57665, 86498, 129746, 194620, 291929, - 437894, 656841, 985261, 1477892, 2216838, - 3325257, 4987885, 7481828 - }; - - static final long MAX_LATENCY = 7481828; - - long[] latencies = new long[BUCKETS.length]; - - /** - * Increment the internal counter for the bucket this latency falls into. - * - * @param millis - */ - public void addLatencyMillis(long millis) { - int index = findIndex(millis * 1000); - latencies[index]++; - } - - /** - * Increment the internal counter for the bucket this latency falls into. - * - * @param micros - */ - public void addLatencyMicros(long micros) { - int index = findIndex(micros); - latencies[index]++; - } - - /** - * Returns the list of latencies buckets as an array. - * - * @return the list of latencies buckets as an array. - */ - public long[] getLatencies() { - return latencies; - } - - @Override - public long getLatency(int index) { - return latencies[index]; - } - - public void clear() { - latencies = new long[BUCKETS.length]; - } - - /** - * Returns the counts in the bucket this latency falls into. - * The latencies will no be updated. - * - * @param latency - * @return the bucket content for the latency. - */ - public long getBucketForLatencyMillis(long latency) { - return latencies[findIndex(latency * 1000)]; - } - - /** - * Returns the counts in the bucket this latency falls into. - * The latencies will no be updated. - * - * @param latency - * @return the bucket content for the latency. - */ - public long getBucketForLatencyMicros(long latency) { - return latencies[findIndex(latency)]; - } - - - private int findIndex(long micros) { - if (micros > MAX_LATENCY) { - return BUCKETS.length - 1; - } - - int index = Arrays.binarySearch(BUCKETS, micros); - - if (index < 0) { - - // Adjust the index based on Java Array javadocs. <0 means the value wasn't found and it's module value - // is where it should be inserted (in this case, it means the counter it applies - unless it's equals to the - // length of the array). - - index = -(index + 1); - } - return index; - } - -} diff --git a/client/src/main/java/io/split/client/metrics/DTOMetrics.java b/client/src/main/java/io/split/client/metrics/DTOMetrics.java deleted file mode 100644 index 86c793cb6..000000000 --- a/client/src/main/java/io/split/client/metrics/DTOMetrics.java +++ /dev/null @@ -1,13 +0,0 @@ -package io.split.client.metrics; - -import io.split.client.dtos.Counter; -import io.split.client.dtos.Latency; - -/** - * Created by adilaijaz on 6/14/16. - */ -public interface DTOMetrics { - void time(Latency dto); - - void count(Counter dto); -} diff --git a/client/src/main/java/io/split/client/metrics/HttpMetrics.java b/client/src/main/java/io/split/client/metrics/HttpMetrics.java deleted file mode 100644 index 14d63b0f4..000000000 --- a/client/src/main/java/io/split/client/metrics/HttpMetrics.java +++ /dev/null @@ -1,135 +0,0 @@ -package io.split.client.metrics; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import io.split.client.dtos.Counter; -import io.split.client.dtos.Latency; -import io.split.client.utils.Utils; -import io.split.engine.metrics.Metrics; -import org.apache.hc.client5.http.classic.methods.HttpPost; -import org.apache.hc.client5.http.impl.classic.CloseableHttpClient; -import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse; -import org.apache.hc.core5.http.HttpEntity; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.net.URI; -import java.net.URISyntaxException; - -/** - * Created by adilaijaz on 9/4/15. - */ -public class HttpMetrics implements Metrics, DTOMetrics { - private static final Logger _log = LoggerFactory.getLogger(HttpMetrics.class); - - private final CloseableHttpClient _client; - private final URI _timeTarget; - private final URI _counterTarget; - - - public static HttpMetrics create(CloseableHttpClient client, URI root) throws URISyntaxException { - return new HttpMetrics(client, root); - } - - - public HttpMetrics(CloseableHttpClient client, URI root) throws URISyntaxException { - Preconditions.checkNotNull(root); - _client = Preconditions.checkNotNull(client); - _timeTarget = Utils.appendPath(root, "api/metrics/time"); - _counterTarget = Utils.appendPath(root, "api/metrics/counter"); - } - - - @Override - public void time(Latency dto) { - if (dto.latencies.isEmpty()) { - return; - } - - try { - post(_timeTarget, dto); - } catch (Throwable t) { - _log.warn("Exception when posting metric " + dto, t); - } - ; - - } - - @Override - public void count(Counter dto) { - try { - post(_counterTarget, dto); - } catch (Throwable t) { - _log.warn("Exception when posting metric " + dto, t); - } - - } - - private void post(URI uri, Object dto) { - - CloseableHttpResponse response = null; - - try { - HttpEntity entity = Utils.toJsonEntity(dto); - - HttpPost request = new HttpPost(uri); - request.setEntity(entity); - - response = _client.execute(request); - - int status = response.getCode(); - - if (status < 200 || status >= 300) { - _log.warn("Response status was: " + status); - } - - } catch (Throwable t) { - _log.warn("Exception when posting metrics: " + t.getMessage()); - if (_log.isDebugEnabled()) { - _log.debug("Reason: ", t); - } - } finally { - Utils.forceClose(response); - } - - } - - @Override - public void count(String counter, long delta) { - try { - Counter dto = new Counter(); - dto.name = counter; - dto.delta = delta; - - count(dto); - } catch (Throwable t) { - _log.info("Could not count metric " + counter, t); - } - - } - - @Override - public void time(String operation, long timeInMs) { - try { - Latency dto = new Latency(); - dto.name = operation; - dto.latencies = Lists.newArrayList(timeInMs); - - time(dto); - } catch (Throwable t) { - _log.info("Could not time metric " + operation, t); - } - } - - @VisibleForTesting - URI getTimeTarget() { - return _timeTarget; - } - - @VisibleForTesting - URI getCounterTarget() { - return _counterTarget; - } - -} diff --git a/client/src/main/java/io/split/client/metrics/ILatencyTracker.java b/client/src/main/java/io/split/client/metrics/ILatencyTracker.java deleted file mode 100644 index 282db06cb..000000000 --- a/client/src/main/java/io/split/client/metrics/ILatencyTracker.java +++ /dev/null @@ -1,22 +0,0 @@ -package io.split.client.metrics; - -/** - * Created by patricioe on 2/10/16. - */ -public interface ILatencyTracker { - - void addLatencyMillis(long millis); - - void addLatencyMicros(long micros); - - long[] getLatencies(); - - long getLatency(int index); - - void clear(); - - long getBucketForLatencyMillis(long latency); - - long getBucketForLatencyMicros(long latency); - -} diff --git a/client/src/main/java/io/split/client/metrics/LogarithmicSearchLatencyTracker.java b/client/src/main/java/io/split/client/metrics/LogarithmicSearchLatencyTracker.java deleted file mode 100644 index 4034d8de6..000000000 --- a/client/src/main/java/io/split/client/metrics/LogarithmicSearchLatencyTracker.java +++ /dev/null @@ -1,116 +0,0 @@ -package io.split.client.metrics; - -/** - * Tracks latencies pero bucket of time. - * Each bucket represent a latency greater than the one before - * and each number within each bucket is a number of calls in the range. - *

- * (1) 1.00 - * (2) 1.50 - * (3) 2.25 - * (4) 3.38 - * (5) 5.06 - * (6) 7.59 - * (7) 11.39 - * (8) 17.09 - * (9) 25.63 - * (10) 38.44 - * (11) 57.67 - * (12) 86.50 - * (13) 129.75 - * (14) 194.62 - * (15) 291.93 - * (16) 437.89 - * (17) 656.84 - * (18) 985.26 - * (19) 1,477.89 - * (20) 2,216.84 - * (21) 3,325.26 - * (22) 4,987.89 - * (23) 7,481.83 - *

- * Thread-safety: This class is not thread safe. - *

- * Created by patricioe on 2/10/16. - */ -public class LogarithmicSearchLatencyTracker implements ILatencyTracker { - - static final int BUCKETS = 23; - private static final double LOG_10_1000_MICROS = Math.log10(1000); - private static final double LOG_10_1_5_MICROS = Math.log10(Double.valueOf("1.5").doubleValue()); - - - long[] latencies = new long[BUCKETS]; - - /** - * Increment the internal counter for the bucket this latency falls into. - * - * @param millis - */ - public void addLatencyMillis(long millis) { - int index = findIndex(millis * 1000); - latencies[index]++; - } - - /** - * Increment the internal counter for the bucket this latency falls into. - * - * @param micros - */ - public void addLatencyMicros(long micros) { - int index = findIndex(micros); - latencies[index]++; - } - - /** - * Returns the list of latencies buckets as an array. - * - * @return the list of latencies buckets as an array. - */ - public long[] getLatencies() { - return latencies; - } - - @Override - public long getLatency(int index) { - return latencies[index]; - } - - public void clear() { - latencies = new long[BUCKETS]; - } - - /** - * Returns the counts in the bucket this latency falls into. - * The latencies will no be updated. - * - * @param latency - * @return the bucket content for the latency. - */ - public long getBucketForLatencyMillis(long latency) { - return latencies[findIndex(latency * 1000)]; - } - - /** - * Returns the counts in the bucket this latency falls into. - * The latencies will no be updated. - * - * @param latency - * @return the bucket content for the latency. - */ - public long getBucketForLatencyMicros(long latency) { - return latencies[findIndex(latency)]; - } - - - private int findIndex(long micros) { - - if (micros <= 1000) return 0; - if (micros > 4987885) return 22; - - double raw = (Math.log10(micros) - LOG_10_1000_MICROS) / LOG_10_1_5_MICROS; - double rounded = Math.round(raw * 1000000d) / 1000000d; - return (int) Math.ceil(rounded); - } - -} diff --git a/client/src/main/java/io/split/telemetry/synchronizer/TelemetryConfigInitializer.java b/client/src/main/java/io/split/telemetry/synchronizer/TelemetryConfigInitializer.java new file mode 100644 index 000000000..b53dce7ae --- /dev/null +++ b/client/src/main/java/io/split/telemetry/synchronizer/TelemetryConfigInitializer.java @@ -0,0 +1,33 @@ +package io.split.telemetry.synchronizer; + +import io.split.client.ApiKeyCounter; +import io.split.client.SplitClientConfig; +import io.split.engine.SDKReadinessGates; + +import java.util.ArrayList; + +import static com.google.common.base.Preconditions.checkNotNull; + +public class TelemetryConfigInitializer { + + private final TelemetrySynchronizer _telemetrySynchronizer; + private final SDKReadinessGates _gates; + private final SplitClientConfig _config; + + public TelemetryConfigInitializer(TelemetrySynchronizer _telemetrySynchronizer, SDKReadinessGates _gates, SplitClientConfig config) { + this._telemetrySynchronizer = checkNotNull(_telemetrySynchronizer); + this._gates = checkNotNull(_gates); + _config = checkNotNull(config); + this.waitForSDKReady(); + } + + private void waitForSDKReady() { + long initTime = System.currentTimeMillis(); + while(true) { + if (_gates.isSDKReadyNow()) { + _telemetrySynchronizer.synchronizeConfig(_config,System.currentTimeMillis()-initTime, ApiKeyCounter.getApiKeyCounterInstance().getFactoryInstances(),new ArrayList<>()); + break; + } + } + } +} diff --git a/client/src/test/java/io/split/client/ApiKeyCounterTest.java b/client/src/test/java/io/split/client/ApiKeyCounterTest.java index c017127ce..bf1f21f11 100644 --- a/client/src/test/java/io/split/client/ApiKeyCounterTest.java +++ b/client/src/test/java/io/split/client/ApiKeyCounterTest.java @@ -1,8 +1,11 @@ package io.split.client; import junit.framework.TestCase; +import org.junit.Assert; import org.junit.Test; +import java.util.Map; + public class ApiKeyCounterTest extends TestCase { private static final String FIRST_KEY = "KEYNUMBER1"; @@ -47,4 +50,17 @@ public void testAddingNonExistingToken() { ApiKeyCounter.getApiKeyCounterInstance().remove(FIRST_KEY); ApiKeyCounter.getApiKeyCounterInstance().remove(SECOND_KEY); } + + @Test + public void testFactoryInstances() { + ApiKeyCounter.getApiKeyCounterInstance().add(FIRST_KEY); + ApiKeyCounter.getApiKeyCounterInstance().add(FIRST_KEY); + ApiKeyCounter.getApiKeyCounterInstance().add(FIRST_KEY); + ApiKeyCounter.getApiKeyCounterInstance().add(SECOND_KEY); + ApiKeyCounter.getApiKeyCounterInstance().add(SECOND_KEY); + + Map factoryInstances = ApiKeyCounter.getApiKeyCounterInstance().getFactoryInstances(); + Assert.assertEquals(2, factoryInstances.size()); + Assert.assertEquals(3, factoryInstances.get(FIRST_KEY).intValue()); + } } diff --git a/client/src/test/java/io/split/client/metrics/BinarySearchLatencyTrackerTest.java b/client/src/test/java/io/split/client/metrics/BinarySearchLatencyTrackerTest.java deleted file mode 100644 index c04fa5b49..000000000 --- a/client/src/test/java/io/split/client/metrics/BinarySearchLatencyTrackerTest.java +++ /dev/null @@ -1,92 +0,0 @@ -package io.split.client.metrics; - -import org.junit.Before; -import org.junit.Test; - -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.is; -import static org.junit.Assert.*; - -public class BinarySearchLatencyTrackerTest { - - BinarySearchLatencyTracker tracker; - - @Before - public void before() { - tracker = new BinarySearchLatencyTracker(); - } - - /** - * Latencies of <=1 millis or <= 1000 micros correspond to the first bucket (index 0) - */ - @Test - public void testLessThanFirstBucket() { - - tracker.addLatencyMicros(750); - tracker.addLatencyMicros(450); - assertThat(tracker.getLatency(0), is(equalTo(2L))); - - tracker.addLatencyMillis(0); - assertThat(tracker.getLatency(0), is(equalTo(3L))); - } - - /** - * Latencies of 1 millis or <= 1000 micros correspond to the first bucket (index 0) - */ - @Test - public void testFirstBucket() { - - tracker.addLatencyMicros(1000); - assertThat(tracker.getLatency(0), is(equalTo(1L))); - - tracker.addLatencyMillis(1); - assertThat(tracker.getLatency(0), is(equalTo(2L))); - } - - /** - * Latencies of 7481 millis or 7481828 micros correspond to the last bucket (index 22) - */ - @Test - public void testLastBucket() { - - tracker.addLatencyMicros(7481828); - assertThat(tracker.getLatency(22), is(equalTo(1L))); - - tracker.addLatencyMillis(7481); - assertThat(tracker.getLatency(22), is(equalTo(2L))); - } - - /** - * Latencies of more than 7481 millis or 7481828 micros correspond to the last bucket (index 22) - */ - @Test - public void testGreaterThanLastBucket() { - - tracker.addLatencyMicros(7481830); - assertThat(tracker.getLatency(22), is(equalTo(1L))); - - tracker.addLatencyMicros(7999999); - assertThat(tracker.getLatency(22), is(equalTo(2L))); - - tracker.addLatencyMillis(7482); - assertThat(tracker.getLatency(22), is(equalTo(3L))); - - tracker.addLatencyMillis(8000); - assertThat(tracker.getLatency(22), is(equalTo(4L))); - } - - /** - * Latencies between 11,392 and 17,086 are in the 8th bucket. - */ - @Test - public void test8ThBucket() { - - tracker.addLatencyMicros(11392); - assertThat(tracker.getLatency(7), is(equalTo(1L))); - - tracker.addLatencyMicros(17086); - assertThat(tracker.getLatency(7), is(equalTo(2L))); - - } - -} \ No newline at end of file diff --git a/client/src/test/java/io/split/client/metrics/HttpMetricsTest.java b/client/src/test/java/io/split/client/metrics/HttpMetricsTest.java deleted file mode 100644 index 71ade9dbe..000000000 --- a/client/src/test/java/io/split/client/metrics/HttpMetricsTest.java +++ /dev/null @@ -1,48 +0,0 @@ -package io.split.client.metrics; - -import org.apache.hc.client5.http.impl.classic.CloseableHttpClient; -import org.apache.hc.client5.http.impl.classic.HttpClients; -import org.hamcrest.Matchers; -import org.junit.Assert; -import org.junit.Test; - -import java.net.URI; -import java.net.URISyntaxException; - -public class HttpMetricsTest { - @Test - public void testDefaultURL() throws URISyntaxException { - URI rootTarget = URI.create("https://api.split.io"); - CloseableHttpClient httpClient = HttpClients.custom().build(); - HttpMetrics fetcher = HttpMetrics.create(httpClient, rootTarget); - Assert.assertThat(fetcher.getTimeTarget().toString(), Matchers.is(Matchers.equalTo("https://api.split.io/api/metrics/time"))); - Assert.assertThat(fetcher.getCounterTarget().toString(), Matchers.is(Matchers.equalTo("https://api.split.io/api/metrics/counter"))); - } - - @Test - public void testCustomURLNoPathNoBackslash() throws URISyntaxException { - URI rootTarget = URI.create("https://kubernetesturl.com"); - CloseableHttpClient httpClient = HttpClients.custom().build(); - HttpMetrics fetcher = HttpMetrics.create(httpClient, rootTarget); - Assert.assertThat(fetcher.getTimeTarget().toString(), Matchers.is(Matchers.equalTo("https://kubernetesturl.com/api/metrics/time"))); - Assert.assertThat(fetcher.getCounterTarget().toString(), Matchers.is(Matchers.equalTo("https://kubernetesturl.com/api/metrics/counter"))); - } - - @Test - public void testCustomURLAppendingPath() throws URISyntaxException { - URI rootTarget = URI.create("https://kubernetesturl.com/split/"); - CloseableHttpClient httpClient = HttpClients.custom().build(); - HttpMetrics fetcher = HttpMetrics.create(httpClient, rootTarget); - Assert.assertThat(fetcher.getTimeTarget().toString(), Matchers.is(Matchers.equalTo("https://kubernetesturl.com/split/api/metrics/time"))); - Assert.assertThat(fetcher.getCounterTarget().toString(), Matchers.is(Matchers.equalTo("https://kubernetesturl.com/split/api/metrics/counter"))); - } - - @Test - public void testCustomURLAppendingPathNoBackslash() throws URISyntaxException { - URI rootTarget = URI.create("https://kubernetesturl.com/split"); - CloseableHttpClient httpClient = HttpClients.custom().build(); - HttpMetrics fetcher = HttpMetrics.create(httpClient, rootTarget); - Assert.assertThat(fetcher.getTimeTarget().toString(), Matchers.is(Matchers.equalTo("https://kubernetesturl.com/split/api/metrics/time"))); - Assert.assertThat(fetcher.getCounterTarget().toString(), Matchers.is(Matchers.equalTo("https://kubernetesturl.com/split/api/metrics/counter"))); - } -} diff --git a/client/src/test/java/io/split/client/metrics/LogarithmicSearchLatencyTrackerTest.java b/client/src/test/java/io/split/client/metrics/LogarithmicSearchLatencyTrackerTest.java deleted file mode 100644 index fedcec0a9..000000000 --- a/client/src/test/java/io/split/client/metrics/LogarithmicSearchLatencyTrackerTest.java +++ /dev/null @@ -1,112 +0,0 @@ -package io.split.client.metrics; - -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; - -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.is; -import static org.junit.Assert.assertThat; - -@Ignore -public class LogarithmicSearchLatencyTrackerTest { - - LogarithmicSearchLatencyTracker tracker; - - @Before - public void before() { - tracker = new LogarithmicSearchLatencyTracker(); - } - - /** - * Latencies of <=1 millis or <= 1000 micros correspond to the first bucket (index 0) - */ - @Test - public void testLessThanFirstBucket() { - - tracker.addLatencyMicros(750); - tracker.addLatencyMicros(450); - assertThat(tracker.getLatency(0), is(equalTo(2L))); - - tracker.addLatencyMillis(0); - assertThat(tracker.getLatency(0), is(equalTo(3L))); - } - - /** - * Latencies of 1 millis or <= 1000 micros correspond to the first bucket (index 0) - */ - @Test - public void testFirstBucket() { - - tracker.addLatencyMicros(1000); - assertThat(tracker.getLatency(0), is(equalTo(1L))); - - tracker.addLatencyMillis(1); - assertThat(tracker.getLatency(0), is(equalTo(2L))); - } - - /** - * Latencies of 7481 millis or 7481828 micros correspond to the last bucket (index 22) - */ - @Test - public void testLastBucket() { - - tracker.addLatencyMicros(7481828); - assertThat(tracker.getLatency(22), is(equalTo(1L))); - - tracker.addLatencyMillis(7481); - assertThat(tracker.getLatency(22), is(equalTo(2L))); - } - - /** - * Latencies of more than 7481 millis or 7481828 micros correspond to the last bucket (index 22) - */ - @Test - public void testGreaterThanLastBucket() { - - tracker.addLatencyMicros(7481830); - assertThat(tracker.getLatency(22), is(equalTo(1L))); - - tracker.addLatencyMicros(7999999); - assertThat(tracker.getLatency(22), is(equalTo(2L))); - - tracker.addLatencyMillis(7482); - assertThat(tracker.getLatency(22), is(equalTo(3L))); - - tracker.addLatencyMillis(8000); - assertThat(tracker.getLatency(22), is(equalTo(4L))); - } - - /** - * Latencies between 11,392 and 17,086 are in the 8th bucket. - */ - @Test - public void test8ThBucket() { - - tracker.addLatencyMicros(11392); - assertThat(tracker.getLatency(7), is(equalTo(1L))); - - tracker.addLatencyMicros(17086); - assertThat(tracker.getLatency(7), is(equalTo(2L))); - - tracker.addLatencyMillis(18); - assertThat(tracker.getLatency(7), is(equalTo(3L))); - } - - /** - * Latencies between 656,842 and 985,261 are in the 18th bucket. - */ - @Test - public void test17ThBucket() { - - tracker.addLatencyMicros(656842); - assertThat(tracker.getLatency(17), is(equalTo(1L))); - - tracker.addLatencyMicros(985261); - assertThat(tracker.getLatency(17), is(equalTo(2L))); - - tracker.addLatencyMillis(985); - assertThat(tracker.getLatency(17), is(equalTo(3L))); - } - -} \ No newline at end of file diff --git a/client/src/test/java/io/split/telemetry/synchronizer/TelemetryConfigInitializerTest.java b/client/src/test/java/io/split/telemetry/synchronizer/TelemetryConfigInitializerTest.java new file mode 100644 index 000000000..12fd8621d --- /dev/null +++ b/client/src/test/java/io/split/telemetry/synchronizer/TelemetryConfigInitializerTest.java @@ -0,0 +1,20 @@ +package io.split.telemetry.synchronizer; + +import io.split.client.SplitClientConfig; +import io.split.engine.SDKReadinessGates; +import org.junit.Test; +import org.mockito.Mockito; + +public class TelemetryConfigInitializerTest { + + @Test + public void testRun() { + SynchronizerMemory synchronizerMemory = Mockito.mock(SynchronizerMemory.class); + SDKReadinessGates gates = Mockito.mock(SDKReadinessGates.class); + SplitClientConfig config = Mockito.mock(SplitClientConfig.class); + Mockito.when(gates.isSDKReadyNow()).thenReturn(true); + TelemetryConfigInitializer telemetryConfigInitializer = new TelemetryConfigInitializer(synchronizerMemory, gates, config); + Mockito.verify(synchronizerMemory, Mockito.times(1)).synchronizeConfig(Mockito.anyObject(),Mockito.anyLong(), Mockito.anyObject(), Mockito.anyObject()); + } + +} \ No newline at end of file From 2dd455cb04c817646aca780dc130adc3a33255d8 Mon Sep 17 00:00:00 2001 From: Lucas Echeverz Date: Tue, 18 May 2021 15:01:38 -0300 Subject: [PATCH 2/6] Making syncManager start async and telemetry init --- .../io/split/client/SplitFactoryImpl.java | 18 +++--- .../io/split/engine/SDKReadinessGates.java | 12 +--- .../split/engine/common/SyncManagerImp.java | 51 +++++++++++------ .../io/split/engine/common/Synchronizer.java | 2 +- .../split/engine/common/SynchronizerImp.java | 19 +++---- .../engine/experiments/SplitFetcher.java | 2 +- .../engine/experiments/SplitFetcherImp.java | 11 ++-- .../segments/SegmentSynchronizationTask.java | 2 +- .../SegmentSynchronizationTaskImp.java | 5 +- .../synchronizer/SynchronizerMemory.java | 12 ++-- .../TelemetryConfigInitializer.java | 33 ----------- .../io/split/client/ApiKeyCounterTest.java | 5 ++ .../split/engine/common/SyncManagerTest.java | 57 ++++++++++++++++--- .../split/engine/common/SynchronizerTest.java | 5 +- .../engine/experiments/SplitFetcherTest.java | 16 ++---- .../synchronizer/SynchronizerMemoryTest.java | 3 +- .../TelemetryConfigInitializerTest.java | 20 ------- 17 files changed, 136 insertions(+), 137 deletions(-) delete mode 100644 client/src/main/java/io/split/telemetry/synchronizer/TelemetryConfigInitializer.java delete mode 100644 client/src/test/java/io/split/telemetry/synchronizer/TelemetryConfigInitializerTest.java diff --git a/client/src/main/java/io/split/client/SplitFactoryImpl.java b/client/src/main/java/io/split/client/SplitFactoryImpl.java index e3455d4da..03aaedf3c 100644 --- a/client/src/main/java/io/split/client/SplitFactoryImpl.java +++ b/client/src/main/java/io/split/client/SplitFactoryImpl.java @@ -28,7 +28,6 @@ import io.split.telemetry.storage.InMemoryTelemetryStorage; import io.split.telemetry.storage.TelemetryStorage; import io.split.telemetry.synchronizer.SynchronizerMemory; -import io.split.telemetry.synchronizer.TelemetryConfigInitializer; import io.split.telemetry.synchronizer.TelemetrySyncTask; import io.split.telemetry.synchronizer.TelemetrySynchronizer; import org.apache.hc.client5.http.auth.AuthScope; @@ -95,7 +94,6 @@ public class SplitFactoryImpl implements SplitFactory { private final TelemetrySynchronizer _telemetrySynchronizer; private final TelemetrySyncTask _telemetrySyncTask; private final long _startTime; - private final TelemetryConfigInitializer _telemetryConfigInitializer; public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyntaxException { _startTime = System.currentTimeMillis(); @@ -125,9 +123,7 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn // Cache Initialisations _segmentCache = new SegmentCacheInMemoryImpl(); _splitCache = new InMemoryCacheImp(); - _telemetrySynchronizer = new SynchronizerMemory(_httpclient, URI.create(config.get_telemetryURL()), _telemetryStorage, _splitCache, _segmentCache, _telemetryStorage); - _telemetrySyncTask = new TelemetrySyncTask(config.get_telemetryRefreshRate(), _telemetrySynchronizer); - _telemetryConfigInitializer = new TelemetryConfigInitializer(_telemetrySynchronizer,_gates,config); + _telemetrySynchronizer = new SynchronizerMemory(_httpclient, URI.create(config.get_telemetryURL()), _telemetryStorage, _splitCache, _segmentCache, _telemetryStorage, _startTime); // Segments @@ -145,9 +141,7 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn // EventClient _eventClient = EventClientImpl.create(_httpclient, _eventsRootTarget, config.eventsQueueSize(), config.eventFlushIntervalInMillis(), config.waitBeforeShutdown(), _telemetryStorage); - // SyncManager - _syncManager = SyncManagerImp.build(config.streamingEnabled(), _splitSynchronizationTask, _splitFetcher, _segmentSynchronizationTaskImp, _splitCache, config.authServiceURL(), _httpclient, config.streamingServiceURL(), config.authRetryBackoffBase(), buildSSEdHttpClient(apiToken, config), _segmentCache, config.streamingRetryDelay(), _gates, _telemetryStorage); - _syncManager.start(); + _telemetrySyncTask = new TelemetrySyncTask(config.get_telemetryRefreshRate(), _telemetrySynchronizer); // Evaluator _evaluator = new EvaluatorImp(_splitCache); @@ -158,6 +152,12 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn // SplitManager _manager = new SplitManagerImpl(_splitCache, config, _gates, _telemetryStorage); + // SyncManager + _syncManager = SyncManagerImp.build(config.streamingEnabled(), _splitSynchronizationTask, _splitFetcher, _segmentSynchronizationTaskImp, _splitCache, + config.authServiceURL(), _httpclient, config.streamingServiceURL(), config.authRetryBackoffBase(), buildSSEdHttpClient(apiToken, config), + _segmentCache, config.streamingRetryDelay(), _gates, _telemetryStorage, _telemetrySynchronizer,config); + _syncManager.start(); + // DestroyOnShutDown if (config.destroyOnShutDown()) { Runtime.getRuntime().addShutdownHook(new Thread(() -> { @@ -316,7 +316,7 @@ private SplitFetcher buildSplitFetcher() throws URISyntaxException { SplitChangeFetcher splitChangeFetcher = HttpSplitChangeFetcher.create(_httpclient, _rootTarget, _telemetryStorage); SplitParser splitParser = new SplitParser(_segmentSynchronizationTaskImp, _segmentCache); - return new SplitFetcherImp(splitChangeFetcher, splitParser, _gates, _splitCache, _telemetryStorage); + return new SplitFetcherImp(splitChangeFetcher, splitParser, _splitCache, _telemetryStorage); } private ImpressionsManagerImpl buildImpressionsManager(SplitClientConfig config) throws URISyntaxException { diff --git a/client/src/main/java/io/split/engine/SDKReadinessGates.java b/client/src/main/java/io/split/engine/SDKReadinessGates.java index e8a4e6453..fefcc0466 100644 --- a/client/src/main/java/io/split/engine/SDKReadinessGates.java +++ b/client/src/main/java/io/split/engine/SDKReadinessGates.java @@ -35,17 +35,7 @@ public class SDKReadinessGates { * @throws InterruptedException if this operation was interrupted. */ public boolean isSDKReady(long milliseconds) throws InterruptedException { - long end = System.currentTimeMillis() + milliseconds; - long timeLeft = milliseconds; - - boolean splits = areSplitsReady(timeLeft); - if (!splits) { - return false; - } - - timeLeft = end - System.currentTimeMillis(); - - return areSegmentsReady(timeLeft); + return _internalReady.await(milliseconds, TimeUnit.MILLISECONDS); } public boolean isSDKReadyNow() { diff --git a/client/src/main/java/io/split/engine/common/SyncManagerImp.java b/client/src/main/java/io/split/engine/common/SyncManagerImp.java index c532f660c..8ac52b425 100644 --- a/client/src/main/java/io/split/engine/common/SyncManagerImp.java +++ b/client/src/main/java/io/split/engine/common/SyncManagerImp.java @@ -4,6 +4,8 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import io.split.cache.SegmentCache; import io.split.cache.SplitCache; +import io.split.client.ApiKeyCounter; +import io.split.client.SplitClientConfig; import io.split.engine.SDKReadinessGates; import io.split.engine.experiments.SplitFetcher; import io.split.engine.experiments.SplitSynchronizationTask; @@ -11,10 +13,12 @@ import io.split.telemetry.domain.StreamingEvent; import io.split.telemetry.domain.enums.StreamEventsEnum; import io.split.telemetry.storage.TelemetryRuntimeProducer; +import io.split.telemetry.synchronizer.TelemetrySynchronizer; import org.apache.hc.client5.http.impl.classic.CloseableHttpClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -34,11 +38,13 @@ public class SyncManagerImp implements SyncManager { private final AtomicBoolean _shutdown; private final LinkedBlockingQueue _incomingPushStatus; private final ExecutorService _executorService; - private final ExecutorService _pollingExecutorService; + private final ExecutorService _startExecutorService; private final SDKReadinessGates _gates; private Future _pushStatusMonitorTask; private Backoff _backoff; private final TelemetryRuntimeProducer _telemetryRuntimeProducer; + private final TelemetrySynchronizer _telemetrySynchronizer; + private final SplitClientConfig _config; @VisibleForTesting /* package private */ SyncManagerImp(boolean streamingEnabledConfig, @@ -46,7 +52,9 @@ public class SyncManagerImp implements SyncManager { PushManager pushManager, LinkedBlockingQueue pushMessages, int authRetryBackOffBase, - SDKReadinessGates gates, TelemetryRuntimeProducer telemetryRuntimeProducer) { + SDKReadinessGates gates, TelemetryRuntimeProducer telemetryRuntimeProducer, + TelemetrySynchronizer telemetrySynchronizer, + SplitClientConfig config) { _streamingEnabledConfig = new AtomicBoolean(streamingEnabledConfig); _synchronizer = checkNotNull(synchronizer); _pushManager = checkNotNull(pushManager); @@ -56,13 +64,15 @@ public class SyncManagerImp implements SyncManager { .setNameFormat("SPLIT-PushStatusMonitor-%d") .setDaemon(true) .build()); - _pollingExecutorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder() + _startExecutorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder() .setNameFormat("SPLIT-PollingMode-%d") .setDaemon(true) .build()); _backoff = new Backoff(authRetryBackOffBase); _gates = checkNotNull(gates); _telemetryRuntimeProducer = checkNotNull(telemetryRuntimeProducer); + _telemetrySynchronizer = checkNotNull(telemetrySynchronizer); + _config = checkNotNull(config); } public static SyncManagerImp build(boolean streamingEnabledConfig, @@ -78,21 +88,34 @@ public static SyncManagerImp build(boolean streamingEnabledConfig, SegmentCache segmentCache, int streamingRetryDelay, SDKReadinessGates gates, - TelemetryRuntimeProducer telemetryRuntimeProducer) { + TelemetryRuntimeProducer telemetryRuntimeProducer, + TelemetrySynchronizer telemetrySynchronizer, + SplitClientConfig config) { LinkedBlockingQueue pushMessages = new LinkedBlockingQueue<>(); Synchronizer synchronizer = new SynchronizerImp(splitSynchronizationTask, splitFetcher, segmentSynchronizationTaskImp, splitCache, segmentCache, streamingRetryDelay, gates); PushManager pushManager = PushManagerImp.build(synchronizer, streamingServiceUrl, authUrl, httpClient, pushMessages, sseHttpClient, telemetryRuntimeProducer); - return new SyncManagerImp(streamingEnabledConfig, synchronizer, pushManager, pushMessages, authRetryBackOffBase, gates, telemetryRuntimeProducer); + return new SyncManagerImp(streamingEnabledConfig, synchronizer, pushManager, pushMessages, authRetryBackOffBase, gates, telemetryRuntimeProducer,telemetrySynchronizer, config); } @Override public void start() { - _synchronizer.syncAll(); - if (_streamingEnabledConfig.get()) { - startStreamingMode(); - } else { - _pollingExecutorService.submit(this::startPollingMode); - } + _startExecutorService.submit(() -> { + while(!_synchronizer.syncAll()) { + try { + Thread.currentThread().sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + Thread.currentThread().interrupt(); + } + } + _gates.sdkInternalReady(); + _telemetrySynchronizer.synchronizeConfig(_config, System.currentTimeMillis(), ApiKeyCounter.getApiKeyCounterInstance().getFactoryInstances(), new ArrayList<>()); + if (_streamingEnabledConfig.get()) { + startStreamingMode(); + } else { + startPollingMode(); + } + }); } @Override @@ -112,12 +135,6 @@ private void startStreamingMode() { } private void startPollingMode() { - try { - _gates.waitUntilInternalReady(); - } catch (InterruptedException ex) { - _log.debug(ex.getMessage()); - } - _log.debug("Starting in polling mode ..."); _synchronizer.startPeriodicFetching(); _telemetryRuntimeProducer.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.SYNC_MODE_UPDATE.get_type(), POLLING_STREAMING_EVENT, System.currentTimeMillis())); diff --git a/client/src/main/java/io/split/engine/common/Synchronizer.java b/client/src/main/java/io/split/engine/common/Synchronizer.java index ab8467a5c..9197baacf 100644 --- a/client/src/main/java/io/split/engine/common/Synchronizer.java +++ b/client/src/main/java/io/split/engine/common/Synchronizer.java @@ -1,7 +1,7 @@ package io.split.engine.common; public interface Synchronizer { - void syncAll(); + boolean syncAll(); void startPeriodicFetching(); void stopPeriodicFetching(); void refreshSplits(long targetChangeNumber); diff --git a/client/src/main/java/io/split/engine/common/SynchronizerImp.java b/client/src/main/java/io/split/engine/common/SynchronizerImp.java index d63f5417f..5ad718073 100644 --- a/client/src/main/java/io/split/engine/common/SynchronizerImp.java +++ b/client/src/main/java/io/split/engine/common/SynchronizerImp.java @@ -11,10 +11,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; import static com.google.common.base.Preconditions.checkNotNull; @@ -54,12 +52,13 @@ public SynchronizerImp(SplitSynchronizationTask splitSynchronizationTask, } @Override - public void syncAll() { - _syncAllScheduledExecutorService.schedule(() -> { - _splitFetcher.fetchAll(true); - _segmentSynchronizationTaskImp.fetchAllSynchronous(); - _gates.sdkInternalReady(); - }, 0, TimeUnit.SECONDS); + public boolean syncAll() { + AtomicBoolean syncStatus = new AtomicBoolean(false); + if(_splitFetcher.fetchAll(true) && + _segmentSynchronizationTaskImp.fetchAllSynchronous()) { + syncStatus.set(true); + } + return syncStatus.get(); } @Override diff --git a/client/src/main/java/io/split/engine/experiments/SplitFetcher.java b/client/src/main/java/io/split/engine/experiments/SplitFetcher.java index 4266659b1..8637feefd 100644 --- a/client/src/main/java/io/split/engine/experiments/SplitFetcher.java +++ b/client/src/main/java/io/split/engine/experiments/SplitFetcher.java @@ -14,5 +14,5 @@ public interface SplitFetcher extends Runnable { * Forces a sync of ALL splits, outside of any scheduled * syncs. This method MUST NOT throw any exceptions. */ - void fetchAll(boolean addCacheHeader); + boolean fetchAll(boolean addCacheHeader); } diff --git a/client/src/main/java/io/split/engine/experiments/SplitFetcherImp.java b/client/src/main/java/io/split/engine/experiments/SplitFetcherImp.java index 0f2e91e54..1d5e60e5c 100644 --- a/client/src/main/java/io/split/engine/experiments/SplitFetcherImp.java +++ b/client/src/main/java/io/split/engine/experiments/SplitFetcherImp.java @@ -25,7 +25,6 @@ public class SplitFetcherImp implements SplitFetcher { private final SplitParser _parser; private final SplitChangeFetcher _splitChangeFetcher; private final SplitCache _splitCache; - private final SDKReadinessGates _gates; private final Object _lock = new Object(); private final TelemetryRuntimeProducer _telemetryRuntimeProducer; @@ -39,10 +38,9 @@ public class SplitFetcherImp implements SplitFetcher { * an ARCHIVED split is received, we know if we need to remove a traffic type from the multiset. */ - public SplitFetcherImp(SplitChangeFetcher splitChangeFetcher, SplitParser parser, SDKReadinessGates gates, SplitCache splitCache, TelemetryRuntimeProducer telemetryRuntimeProducer) { + public SplitFetcherImp(SplitChangeFetcher splitChangeFetcher, SplitParser parser, SplitCache splitCache, TelemetryRuntimeProducer telemetryRuntimeProducer) { _splitChangeFetcher = checkNotNull(splitChangeFetcher); _parser = checkNotNull(parser); - _gates = checkNotNull(gates); _splitCache = checkNotNull(splitCache); _telemetryRuntimeProducer = checkNotNull(telemetryRuntimeProducer); } @@ -146,16 +144,18 @@ private void runWithoutExceptionHandling(boolean addCacheHeader) throws Interrup } } @Override - public void fetchAll(boolean addCacheHeader) { + public boolean fetchAll(boolean addCacheHeader) { + boolean fetchAllStatus = true; _log.debug("Fetch splits starting ..."); long start = _splitCache.getChangeNumber(); try { runWithoutExceptionHandling(addCacheHeader); - _gates.splitsAreReady(); } catch (InterruptedException e) { + fetchAllStatus = false; _log.warn("Interrupting split fetcher task"); Thread.currentThread().interrupt(); } catch (Throwable t) { + fetchAllStatus = false; _log.error("RefreshableSplitFetcher failed: " + t.getMessage()); if (_log.isDebugEnabled()) { _log.debug("Reason:", t); @@ -165,5 +165,6 @@ public void fetchAll(boolean addCacheHeader) { _log.debug("split fetch before: " + start + ", after: " + _splitCache.getChangeNumber()); } } + return fetchAllStatus; } } diff --git a/client/src/main/java/io/split/engine/segments/SegmentSynchronizationTask.java b/client/src/main/java/io/split/engine/segments/SegmentSynchronizationTask.java index 5ad181cf4..1a1764ed9 100644 --- a/client/src/main/java/io/split/engine/segments/SegmentSynchronizationTask.java +++ b/client/src/main/java/io/split/engine/segments/SegmentSynchronizationTask.java @@ -33,5 +33,5 @@ public interface SegmentSynchronizationTask extends Runnable { /** * fetch every Segment Synchronous */ - void fetchAllSynchronous(); + boolean fetchAllSynchronous(); } diff --git a/client/src/main/java/io/split/engine/segments/SegmentSynchronizationTaskImp.java b/client/src/main/java/io/split/engine/segments/SegmentSynchronizationTaskImp.java index d3ed0dbfe..c6433c7ec 100644 --- a/client/src/main/java/io/split/engine/segments/SegmentSynchronizationTaskImp.java +++ b/client/src/main/java/io/split/engine/segments/SegmentSynchronizationTaskImp.java @@ -165,7 +165,8 @@ public void fetchAll(boolean addCacheHeader) { } @Override - public void fetchAllSynchronous() { + public boolean fetchAllSynchronous() { + AtomicBoolean fetchAllStatus = new AtomicBoolean(true); _segmentFetchers .entrySet() .stream().map(e -> _scheduledExecutorService.submit(e.getValue()::runWhitCacheHeader)) @@ -174,7 +175,9 @@ public void fetchAllSynchronous() { try { future.get(); } catch (Exception ex) { + fetchAllStatus.set(false); _log.error(ex.getMessage()); }}); + return fetchAllStatus.get(); } } diff --git a/client/src/main/java/io/split/telemetry/synchronizer/SynchronizerMemory.java b/client/src/main/java/io/split/telemetry/synchronizer/SynchronizerMemory.java index 1ffeb1178..bf075c4ed 100644 --- a/client/src/main/java/io/split/telemetry/synchronizer/SynchronizerMemory.java +++ b/client/src/main/java/io/split/telemetry/synchronizer/SynchronizerMemory.java @@ -32,18 +32,20 @@ public class SynchronizerMemory implements TelemetrySynchronizer{ private TelemetryStorageConsumer _teleTelemetryStorageConsumer; private SplitCache _splitCache; private SegmentCache _segmentCache; + private final long _initStartTime; public SynchronizerMemory(CloseableHttpClient client, URI telemetryRootEndpoint, TelemetryStorageConsumer telemetryStorageConsumer, SplitCache splitCache, - SegmentCache segmentCache, TelemetryRuntimeProducer telemetryRuntimeProducer) throws URISyntaxException { + SegmentCache segmentCache, TelemetryRuntimeProducer telemetryRuntimeProducer, long initStartTime) throws URISyntaxException { _httpHttpTelemetryMemorySender = HttpTelemetryMemorySender.create(client, telemetryRootEndpoint, telemetryRuntimeProducer); _teleTelemetryStorageConsumer = telemetryStorageConsumer; _splitCache = splitCache; _segmentCache = segmentCache; + _initStartTime = initStartTime; } @Override - public void synchronizeConfig(SplitClientConfig config, long timeUntilReady, Map factoryInstances, List tags) { - _httpHttpTelemetryMemorySender.postConfig(generateConfig(config, timeUntilReady, factoryInstances, tags)); + public void synchronizeConfig(SplitClientConfig config, long readyTimeStamp, Map factoryInstances, List tags) { + _httpHttpTelemetryMemorySender.postConfig(generateConfig(config, readyTimeStamp, factoryInstances, tags)); } @Override @@ -74,7 +76,7 @@ private Stats generateStats() throws Exception { return stats; } - private Config generateConfig(SplitClientConfig splitClientConfig, long timeUntilReady, Map factoryInstances, List tags) { + private Config generateConfig(SplitClientConfig splitClientConfig, long readyTimestamp, Map factoryInstances, List tags) { Config config = new Config(); Rates rates = new Rates(); URLOverrides urlOverrides = new URLOverrides(); @@ -110,7 +112,7 @@ private Config generateConfig(SplitClientConfig splitClientConfig, long timeUnti config.set_eventsQueueSize(splitClientConfig.eventsQueueSize()); config.set_tags(getListMaxSize(tags)); config.set_activeFactories(factoryInstances.size()); - config.set_timeUntilReady(timeUntilReady); + config.set_timeUntilReady(readyTimestamp - _initStartTime); config.set_rates(rates); config.set_urlOverrides(urlOverrides); config.set_streamingEnabled(splitClientConfig.streamingEnabled()); diff --git a/client/src/main/java/io/split/telemetry/synchronizer/TelemetryConfigInitializer.java b/client/src/main/java/io/split/telemetry/synchronizer/TelemetryConfigInitializer.java deleted file mode 100644 index b53dce7ae..000000000 --- a/client/src/main/java/io/split/telemetry/synchronizer/TelemetryConfigInitializer.java +++ /dev/null @@ -1,33 +0,0 @@ -package io.split.telemetry.synchronizer; - -import io.split.client.ApiKeyCounter; -import io.split.client.SplitClientConfig; -import io.split.engine.SDKReadinessGates; - -import java.util.ArrayList; - -import static com.google.common.base.Preconditions.checkNotNull; - -public class TelemetryConfigInitializer { - - private final TelemetrySynchronizer _telemetrySynchronizer; - private final SDKReadinessGates _gates; - private final SplitClientConfig _config; - - public TelemetryConfigInitializer(TelemetrySynchronizer _telemetrySynchronizer, SDKReadinessGates _gates, SplitClientConfig config) { - this._telemetrySynchronizer = checkNotNull(_telemetrySynchronizer); - this._gates = checkNotNull(_gates); - _config = checkNotNull(config); - this.waitForSDKReady(); - } - - private void waitForSDKReady() { - long initTime = System.currentTimeMillis(); - while(true) { - if (_gates.isSDKReadyNow()) { - _telemetrySynchronizer.synchronizeConfig(_config,System.currentTimeMillis()-initTime, ApiKeyCounter.getApiKeyCounterInstance().getFactoryInstances(),new ArrayList<>()); - break; - } - } - } -} diff --git a/client/src/test/java/io/split/client/ApiKeyCounterTest.java b/client/src/test/java/io/split/client/ApiKeyCounterTest.java index bf1f21f11..64304f462 100644 --- a/client/src/test/java/io/split/client/ApiKeyCounterTest.java +++ b/client/src/test/java/io/split/client/ApiKeyCounterTest.java @@ -62,5 +62,10 @@ public void testFactoryInstances() { Map factoryInstances = ApiKeyCounter.getApiKeyCounterInstance().getFactoryInstances(); Assert.assertEquals(2, factoryInstances.size()); Assert.assertEquals(3, factoryInstances.get(FIRST_KEY).intValue()); + ApiKeyCounter.getApiKeyCounterInstance().remove(FIRST_KEY); + ApiKeyCounter.getApiKeyCounterInstance().remove(FIRST_KEY); + ApiKeyCounter.getApiKeyCounterInstance().remove(FIRST_KEY); + ApiKeyCounter.getApiKeyCounterInstance().remove(SECOND_KEY); + ApiKeyCounter.getApiKeyCounterInstance().remove(SECOND_KEY); } } diff --git a/client/src/test/java/io/split/engine/common/SyncManagerTest.java b/client/src/test/java/io/split/engine/common/SyncManagerTest.java index d2204ec18..fafe0874c 100644 --- a/client/src/test/java/io/split/engine/common/SyncManagerTest.java +++ b/client/src/test/java/io/split/engine/common/SyncManagerTest.java @@ -1,8 +1,10 @@ package io.split.engine.common; +import io.split.client.SplitClientConfig; import io.split.engine.SDKReadinessGates; import io.split.telemetry.storage.InMemoryTelemetryStorage; import io.split.telemetry.storage.TelemetryStorage; +import io.split.telemetry.synchronizer.TelemetrySynchronizer; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; @@ -26,7 +28,10 @@ public void setUp() { public void startWithStreamingFalseShouldStartPolling() throws InterruptedException { TelemetryStorage telemetryStorage = new InMemoryTelemetryStorage(); _gates.sdkInternalReady(); - SyncManagerImp syncManager = new SyncManagerImp(false, _synchronizer, _pushManager, new LinkedBlockingQueue<>(), BACKOFF_BASE, _gates, telemetryStorage); + TelemetrySynchronizer telemetrySynchronizer = Mockito.mock(TelemetrySynchronizer.class); + SplitClientConfig config = Mockito.mock(SplitClientConfig.class); + Mockito.when(_synchronizer.syncAll()).thenReturn(true); + SyncManagerImp syncManager = new SyncManagerImp(false, _synchronizer, _pushManager, new LinkedBlockingQueue<>(), BACKOFF_BASE, _gates, telemetryStorage, telemetrySynchronizer, config); syncManager.start(); Thread.sleep(1000); Mockito.verify(_synchronizer, Mockito.times(1)).startPeriodicFetching(); @@ -35,10 +40,14 @@ public void startWithStreamingFalseShouldStartPolling() throws InterruptedExcept } @Test - public void startWithStreamingTrueShouldStartSyncAll() { + public void startWithStreamingTrueShouldStartSyncAll() throws InterruptedException { TelemetryStorage telemetryStorage = new InMemoryTelemetryStorage(); - SyncManager sm = new SyncManagerImp(true, _synchronizer, _pushManager, new LinkedBlockingQueue<>(), BACKOFF_BASE, _gates, telemetryStorage); + TelemetrySynchronizer telemetrySynchronizer = Mockito.mock(TelemetrySynchronizer.class); + SplitClientConfig config = Mockito.mock(SplitClientConfig.class); + Mockito.when(_synchronizer.syncAll()).thenReturn(true); + SyncManager sm = new SyncManagerImp(true, _synchronizer, _pushManager, new LinkedBlockingQueue<>(), BACKOFF_BASE, _gates, telemetryStorage, telemetrySynchronizer, config); sm.start(); + Thread.sleep(1000); Mockito.verify(_synchronizer, Mockito.times(0)).startPeriodicFetching(); Mockito.verify(_synchronizer, Mockito.times(1)).syncAll(); Mockito.verify(_pushManager, Mockito.times(1)).start(); @@ -48,7 +57,10 @@ public void startWithStreamingTrueShouldStartSyncAll() { public void onStreamingAvailable() throws InterruptedException { TelemetryStorage telemetryStorage = new InMemoryTelemetryStorage(); LinkedBlockingQueue messsages = new LinkedBlockingQueue<>(); - SyncManagerImp syncManager = new SyncManagerImp(true, _synchronizer, _pushManager, messsages, BACKOFF_BASE, _gates, telemetryStorage); + TelemetrySynchronizer telemetrySynchronizer = Mockito.mock(TelemetrySynchronizer.class); + SplitClientConfig config = Mockito.mock(SplitClientConfig.class); + + SyncManagerImp syncManager = new SyncManagerImp(true, _synchronizer, _pushManager, messsages, BACKOFF_BASE, _gates, telemetryStorage, telemetrySynchronizer, config); Thread t = new Thread(syncManager::incomingPushStatusHandler); t.start(); messsages.offer(PushManager.Status.STREAMING_READY); @@ -63,7 +75,9 @@ public void onStreamingAvailable() throws InterruptedException { public void onStreamingDisabled() throws InterruptedException { TelemetryStorage telemetryStorage = new InMemoryTelemetryStorage(); LinkedBlockingQueue messsages = new LinkedBlockingQueue<>(); - SyncManagerImp syncManager = new SyncManagerImp(true, _synchronizer, _pushManager, messsages, BACKOFF_BASE, _gates, telemetryStorage); + TelemetrySynchronizer telemetrySynchronizer = Mockito.mock(TelemetrySynchronizer.class); + SplitClientConfig config = Mockito.mock(SplitClientConfig.class); + SyncManagerImp syncManager = new SyncManagerImp(true, _synchronizer, _pushManager, messsages, BACKOFF_BASE, _gates, telemetryStorage, telemetrySynchronizer, config); Thread t = new Thread(syncManager::incomingPushStatusHandler); t.start(); messsages.offer(PushManager.Status.STREAMING_DOWN); @@ -78,7 +92,9 @@ public void onStreamingDisabled() throws InterruptedException { public void onStreamingShutdown() throws InterruptedException { TelemetryStorage telemetryStorage = new InMemoryTelemetryStorage(); LinkedBlockingQueue messsages = new LinkedBlockingQueue<>(); - SyncManagerImp syncManager = new SyncManagerImp(true, _synchronizer, _pushManager, messsages, BACKOFF_BASE, _gates, telemetryStorage); + TelemetrySynchronizer telemetrySynchronizer = Mockito.mock(TelemetrySynchronizer.class); + SplitClientConfig config = Mockito.mock(SplitClientConfig.class); + SyncManagerImp syncManager = new SyncManagerImp(true, _synchronizer, _pushManager, messsages, BACKOFF_BASE, _gates, telemetryStorage, telemetrySynchronizer, config); Thread t = new Thread(syncManager::incomingPushStatusHandler); t.start(); messsages.offer(PushManager.Status.STREAMING_OFF); @@ -91,7 +107,9 @@ public void onStreamingShutdown() throws InterruptedException { public void onConnected() throws InterruptedException { TelemetryStorage telemetryStorage = new InMemoryTelemetryStorage(); LinkedBlockingQueue messsages = new LinkedBlockingQueue<>(); - SyncManagerImp syncManager = new SyncManagerImp(true, _synchronizer, _pushManager, messsages, BACKOFF_BASE, _gates, telemetryStorage); + TelemetrySynchronizer telemetrySynchronizer = Mockito.mock(TelemetrySynchronizer.class); + SplitClientConfig config = Mockito.mock(SplitClientConfig.class); + SyncManagerImp syncManager = new SyncManagerImp(true, _synchronizer, _pushManager, messsages, BACKOFF_BASE, _gates, telemetryStorage, telemetrySynchronizer, config); Thread t = new Thread(syncManager::incomingPushStatusHandler); t.start(); messsages.offer(PushManager.Status.STREAMING_READY); @@ -105,7 +123,9 @@ public void onConnected() throws InterruptedException { public void onDisconnect() throws InterruptedException { TelemetryStorage telemetryStorage = new InMemoryTelemetryStorage(); LinkedBlockingQueue messsages = new LinkedBlockingQueue<>(); - SyncManagerImp syncManager = new SyncManagerImp(true, _synchronizer, _pushManager, messsages, BACKOFF_BASE, _gates, telemetryStorage); + TelemetrySynchronizer telemetrySynchronizer = Mockito.mock(TelemetrySynchronizer.class); + SplitClientConfig config = Mockito.mock(SplitClientConfig.class); + SyncManagerImp syncManager = new SyncManagerImp(true, _synchronizer, _pushManager, messsages, BACKOFF_BASE, _gates, telemetryStorage, telemetrySynchronizer, config); Thread t = new Thread(syncManager::incomingPushStatusHandler); t.start(); messsages.offer(PushManager.Status.STREAMING_OFF); @@ -118,7 +138,10 @@ public void onDisconnect() throws InterruptedException { public void onDisconnectAndReconnect() throws InterruptedException { // Check with mauro. reconnect should call pushManager.start again, right? TelemetryStorage telemetryStorage = new InMemoryTelemetryStorage(); LinkedBlockingQueue messsages = new LinkedBlockingQueue<>(); - SyncManagerImp syncManager = new SyncManagerImp(true, _synchronizer, _pushManager, messsages, BACKOFF_BASE, _gates, telemetryStorage); + TelemetrySynchronizer telemetrySynchronizer = Mockito.mock(TelemetrySynchronizer.class); + SplitClientConfig config = Mockito.mock(SplitClientConfig.class); + Mockito.when(_synchronizer.syncAll()).thenReturn(true); + SyncManagerImp syncManager = new SyncManagerImp(true, _synchronizer, _pushManager, messsages, BACKOFF_BASE, _gates, telemetryStorage, telemetrySynchronizer, config); syncManager.start(); messsages.offer(PushManager.Status.STREAMING_BACKOFF); Thread.sleep(1200); @@ -126,4 +149,20 @@ public void onDisconnectAndReconnect() throws InterruptedException { // Check wi Mockito.verify(_synchronizer, Mockito.times(1)).syncAll(); Mockito.verify(_pushManager, Mockito.times(2)).start(); } + + @Test + public void syncAllRetryThenShouldStartPolling() throws InterruptedException { + TelemetryStorage telemetryStorage = new InMemoryTelemetryStorage(); + TelemetrySynchronizer telemetrySynchronizer = Mockito.mock(TelemetrySynchronizer.class); + SplitClientConfig config = Mockito.mock(SplitClientConfig.class); + Mockito.when(_synchronizer.syncAll()).thenReturn(false).thenReturn(true); + SyncManagerImp syncManager = new SyncManagerImp(false, _synchronizer, _pushManager, new LinkedBlockingQueue<>(), BACKOFF_BASE, _gates, telemetryStorage, telemetrySynchronizer, config); + syncManager.start(); + Thread.sleep(2000); + Mockito.verify(_synchronizer, Mockito.times(1)).startPeriodicFetching(); + Mockito.verify(_synchronizer, Mockito.times(2)).syncAll(); + Mockito.verify(_pushManager, Mockito.times(0)).start(); + Mockito.verify(_gates, Mockito.times(1)).sdkInternalReady(); + Mockito.verify(telemetrySynchronizer, Mockito.times(1)).synchronizeConfig(Mockito.anyObject(), Mockito.anyLong(), Mockito.anyObject(), Mockito.anyObject()); + } } diff --git a/client/src/test/java/io/split/engine/common/SynchronizerTest.java b/client/src/test/java/io/split/engine/common/SynchronizerTest.java index 53e4175d1..bb13bcccd 100644 --- a/client/src/test/java/io/split/engine/common/SynchronizerTest.java +++ b/client/src/test/java/io/split/engine/common/SynchronizerTest.java @@ -34,12 +34,13 @@ public void beforeMethod() { @Test public void syncAll() throws InterruptedException { + Mockito.when(_splitFetcher.fetchAll(true)).thenReturn(true); + Mockito.when(_segmentFetcher.fetchAllSynchronous()).thenReturn(true); _synchronizer.syncAll(); - Thread.sleep(100); + Thread.sleep(1000); Mockito.verify(_splitFetcher, Mockito.times(1)).fetchAll(true); Mockito.verify(_segmentFetcher, Mockito.times(1)).fetchAllSynchronous(); - Mockito.verify(_gates, Mockito.times(1)).sdkInternalReady(); } @Test diff --git a/client/src/test/java/io/split/engine/experiments/SplitFetcherTest.java b/client/src/test/java/io/split/engine/experiments/SplitFetcherTest.java index bfd720216..595b0669d 100644 --- a/client/src/test/java/io/split/engine/experiments/SplitFetcherTest.java +++ b/client/src/test/java/io/split/engine/experiments/SplitFetcherTest.java @@ -61,7 +61,7 @@ private void works(long startingChangeNumber) throws InterruptedException { SegmentChangeFetcher segmentChangeFetcher = Mockito.mock(SegmentChangeFetcher.class); SegmentSynchronizationTask segmentSynchronizationTask = new SegmentSynchronizationTaskImp(segmentChangeFetcher,1,10, gates, segmentCache, TELEMETRY_STORAGE); SplitCache cache = new InMemoryCacheImp(startingChangeNumber); - SplitFetcherImp fetcher = new SplitFetcherImp(splitChangeFetcher, new SplitParser(segmentSynchronizationTask, segmentCache), gates, cache, TELEMETRY_STORAGE); + SplitFetcherImp fetcher = new SplitFetcherImp(splitChangeFetcher, new SplitParser(segmentSynchronizationTask, segmentCache), cache, TELEMETRY_STORAGE); // execute the fetcher for a little bit. executeWaitAndTerminate(fetcher, 1, 3, TimeUnit.SECONDS); @@ -80,9 +80,8 @@ private void works(long startingChangeNumber) throws InterruptedException { ParsedSplit expected = ParsedSplit.createParsedSplitForTests("" + cache.getChangeNumber(), (int) cache.getChangeNumber(), false, Treatments.OFF, expectedListOfMatcherAndSplits, null, cache.getChangeNumber(), 1); ParsedSplit actual = cache.get("" + cache.getChangeNumber()); - + Thread.sleep(1000); assertThat(actual, is(equalTo(expected))); - assertThat(gates.areSplitsReady(0), is(equalTo(true))); } @Test @@ -135,7 +134,7 @@ public void when_parser_fails_we_remove_the_experiment() throws InterruptedExcep SegmentSynchronizationTask segmentSynchronizationTask = new SegmentSynchronizationTaskImp(segmentChangeFetcher, 1,10, gates, segmentCache, TELEMETRY_STORAGE); segmentSynchronizationTask.startPeriodicFetching(); SplitCache cache = new InMemoryCacheImp(-1); - SplitFetcherImp fetcher = new SplitFetcherImp(splitChangeFetcher, new SplitParser(segmentSynchronizationTask, segmentCache), gates, cache, TELEMETRY_STORAGE); + SplitFetcherImp fetcher = new SplitFetcherImp(splitChangeFetcher, new SplitParser(segmentSynchronizationTask, segmentCache), cache, TELEMETRY_STORAGE); // execute the fetcher for a little bit. executeWaitAndTerminate(fetcher, 1, 5, TimeUnit.SECONDS); @@ -157,7 +156,7 @@ public void if_there_is_a_problem_talking_to_split_change_count_down_latch_is_no SegmentChangeFetcher segmentChangeFetcher = mock(SegmentChangeFetcher.class); SegmentSynchronizationTask segmentSynchronizationTask = new SegmentSynchronizationTaskImp(segmentChangeFetcher, 1,10, gates, segmentCache, TELEMETRY_STORAGE); segmentSynchronizationTask.startPeriodicFetching(); - SplitFetcherImp fetcher = new SplitFetcherImp(splitChangeFetcher, new SplitParser(segmentSynchronizationTask, segmentCache), gates, cache, TELEMETRY_STORAGE); + SplitFetcherImp fetcher = new SplitFetcherImp(splitChangeFetcher, new SplitParser(segmentSynchronizationTask, segmentCache), cache, TELEMETRY_STORAGE); // execute the fetcher for a little bit. executeWaitAndTerminate(fetcher, 1, 5, TimeUnit.SECONDS); @@ -199,7 +198,7 @@ public void works_with_user_defined_segments() throws Exception { when(segmentChangeFetcher.fetch(anyString(), anyLong(), anyBoolean())).thenReturn(segmentChange); SegmentSynchronizationTask segmentSynchronizationTask = new SegmentSynchronizationTaskImp(segmentChangeFetcher, 1,10, gates, segmentCache, TELEMETRY_STORAGE); segmentSynchronizationTask.startPeriodicFetching(); - SplitFetcherImp fetcher = new SplitFetcherImp(experimentChangeFetcher, new SplitParser(segmentSynchronizationTask, segmentCache), gates, cache, TELEMETRY_STORAGE); + SplitFetcherImp fetcher = new SplitFetcherImp(experimentChangeFetcher, new SplitParser(segmentSynchronizationTask, segmentCache), cache, TELEMETRY_STORAGE); // execute the fetcher for a little bit. executeWaitAndTerminate(fetcher, 1, 5, TimeUnit.SECONDS); @@ -212,11 +211,6 @@ public void works_with_user_defined_segments() throws Exception { assertThat("Asking for " + i + " " + cache.getAll(), cache.get("" + i), is(not(nullValue()))); assertThat(cache.get("" + i).killed(), is(true)); } - - assertThat(gates.areSplitsReady(0), is(equalTo(true))); - assertThat(gates.isSegmentRegistered(segmentName), is(equalTo(true))); - assertThat(gates.areSegmentsReady(100), is(equalTo(true))); - assertThat(gates.isSDKReady(0), is(equalTo(true))); } private SegmentChange getSegmentChange(long since, long till, String segmentName){ diff --git a/client/src/test/java/io/split/telemetry/synchronizer/SynchronizerMemoryTest.java b/client/src/test/java/io/split/telemetry/synchronizer/SynchronizerMemoryTest.java index ff45f1ba7..0afab5ddf 100644 --- a/client/src/test/java/io/split/telemetry/synchronizer/SynchronizerMemoryTest.java +++ b/client/src/test/java/io/split/telemetry/synchronizer/SynchronizerMemoryTest.java @@ -50,7 +50,8 @@ private TelemetrySynchronizer getTelemetrySynchronizer(CloseableHttpClient httpC TelemetryRuntimeProducer telemetryRuntimeProducer = Mockito.mock(TelemetryRuntimeProducer.class); SplitCache splitCache = Mockito.mock(SplitCache.class); SegmentCache segmentCache = Mockito.mock(SegmentCacheInMemoryImpl.class); - TelemetrySynchronizer telemetrySynchronizer = new SynchronizerMemory(httpClient, URI.create(TELEMETRY_ENDPOINT), consumer, splitCache, segmentCache, telemetryRuntimeProducer); + SplitClientConfig config = Mockito.mock(SplitClientConfig.class); + TelemetrySynchronizer telemetrySynchronizer = new SynchronizerMemory(httpClient, URI.create(TELEMETRY_ENDPOINT), consumer, splitCache, segmentCache, telemetryRuntimeProducer, 0l); return telemetrySynchronizer; } diff --git a/client/src/test/java/io/split/telemetry/synchronizer/TelemetryConfigInitializerTest.java b/client/src/test/java/io/split/telemetry/synchronizer/TelemetryConfigInitializerTest.java deleted file mode 100644 index 12fd8621d..000000000 --- a/client/src/test/java/io/split/telemetry/synchronizer/TelemetryConfigInitializerTest.java +++ /dev/null @@ -1,20 +0,0 @@ -package io.split.telemetry.synchronizer; - -import io.split.client.SplitClientConfig; -import io.split.engine.SDKReadinessGates; -import org.junit.Test; -import org.mockito.Mockito; - -public class TelemetryConfigInitializerTest { - - @Test - public void testRun() { - SynchronizerMemory synchronizerMemory = Mockito.mock(SynchronizerMemory.class); - SDKReadinessGates gates = Mockito.mock(SDKReadinessGates.class); - SplitClientConfig config = Mockito.mock(SplitClientConfig.class); - Mockito.when(gates.isSDKReadyNow()).thenReturn(true); - TelemetryConfigInitializer telemetryConfigInitializer = new TelemetryConfigInitializer(synchronizerMemory, gates, config); - Mockito.verify(synchronizerMemory, Mockito.times(1)).synchronizeConfig(Mockito.anyObject(),Mockito.anyLong(), Mockito.anyObject(), Mockito.anyObject()); - } - -} \ No newline at end of file From bed5bfd0d6b7b464fbd482c6758526808e432137 Mon Sep 17 00:00:00 2001 From: Lucas Echeverz Date: Wed, 19 May 2021 17:34:50 -0300 Subject: [PATCH 3/6] Fix PR comments --- .../java/io/split/client/ApiKeyCounter.java | 5 + .../split/client/LocalhostSplitFactory.java | 2 +- .../java/io/split/client/SplitClientImpl.java | 7 +- .../io/split/client/SplitManagerImpl.java | 11 +- .../io/split/engine/SDKReadinessGates.java | 127 +----------------- .../engine/experiments/SplitFetcherImp.java | 7 +- .../engine/segments/SegmentFetcherImp.java | 4 - .../SegmentSynchronizationTaskImp.java | 6 - .../io/split/client/ApiKeyCounterTest.java | 112 +++++++++------ .../io/split/client/SplitClientImplTest.java | 10 +- .../io/split/client/SplitManagerImplTest.java | 10 +- .../engine/experiments/SplitFetcherTest.java | 1 - .../segments/SegmentFetcherImpTest.java | 4 - .../SegmentSynchronizationTaskImpTest.java | 2 - 14 files changed, 103 insertions(+), 205 deletions(-) diff --git a/client/src/main/java/io/split/client/ApiKeyCounter.java b/client/src/main/java/io/split/client/ApiKeyCounter.java index 546e9c78b..426ade17b 100644 --- a/client/src/main/java/io/split/client/ApiKeyCounter.java +++ b/client/src/main/java/io/split/client/ApiKeyCounter.java @@ -75,4 +75,9 @@ public Map getFactoryInstances() { return factoryInstances; } + + @VisibleForTesting + void clearApiKeys() { + USED_API_KEYS.clear(); + } } diff --git a/client/src/main/java/io/split/client/LocalhostSplitFactory.java b/client/src/main/java/io/split/client/LocalhostSplitFactory.java index c716ae3ce..3a2f0a14b 100644 --- a/client/src/main/java/io/split/client/LocalhostSplitFactory.java +++ b/client/src/main/java/io/split/client/LocalhostSplitFactory.java @@ -56,9 +56,9 @@ public LocalhostSplitFactory(String directory, String file) throws IOException { SplitCache splitCache = new InMemoryCacheImp(); SDKReadinessGates sdkReadinessGates = new SDKReadinessGates(); - sdkReadinessGates.splitsAreReady(); _cacheUpdaterService = new CacheUpdaterService(splitCache); _cacheUpdaterService.updateCache(splitAndKeyToTreatment); + sdkReadinessGates.sdkInternalReady(); _client = new SplitClientImpl(this, splitCache, new ImpressionsManager.NoOpImpressionsManager(), new NoopEventClient(), SplitClientConfig.builder().setBlockUntilReadyTimeout(1).build(), sdkReadinessGates, new EvaluatorImp(splitCache), new NoopTelemetryStorage(), new NoopTelemetryStorage()); diff --git a/client/src/main/java/io/split/client/SplitClientImpl.java b/client/src/main/java/io/split/client/SplitClientImpl.java index 050acb516..9c994030d 100644 --- a/client/src/main/java/io/split/client/SplitClientImpl.java +++ b/client/src/main/java/io/split/client/SplitClientImpl.java @@ -15,7 +15,6 @@ import io.split.inputValidation.KeyValidator; import io.split.inputValidation.SplitNameValidator; import io.split.inputValidation.TrafficTypeValidator; -import io.split.telemetry.domain.enums.LastSynchronizationRecordsEnum; import io.split.telemetry.domain.enums.MethodEnum; import io.split.telemetry.storage.TelemetryConfigProducer; import io.split.telemetry.storage.TelemetryEvaluationProducer; @@ -138,7 +137,7 @@ public void blockUntilReady() throws TimeoutException, InterruptedException { if (_config.blockUntilReady() <= 0) { throw new IllegalArgumentException("setBlockUntilReadyTimeout must be positive but in config was: " + _config.blockUntilReady()); } - if (!_gates.isSDKReady(_config.blockUntilReady())) { + if (!_gates.waitUntilInternalReady(_config.blockUntilReady())) { throw new TimeoutException("SDK was not ready in " + _config.blockUntilReady()+ " milliseconds"); } _log.debug(String.format("Split SDK ready in %d ms", (System.currentTimeMillis() - startTime))); @@ -188,7 +187,7 @@ private boolean track(Event event) { private SplitResult getTreatmentWithConfigInternal(String method, String matchingKey, String bucketingKey, String split, Map attributes, MethodEnum methodEnum) { long initTime = System.currentTimeMillis(); try { - if(!_gates.isSDKReadyNow()){ + if(!_gates.isSDKReady()){ _log.warn(method + ": the SDK is not ready, results may be incorrect. Make sure to wait for SDK readiness before using this method"); _telemetryConfigProducer.recordNonReadyUsage(); } @@ -215,7 +214,7 @@ private SplitResult getTreatmentWithConfigInternal(String method, String matchin EvaluatorImp.TreatmentLabelAndChangeNumber result = _evaluator.evaluateFeature(matchingKey, bucketingKey, split, attributes); - if (result.treatment.equals(Treatments.CONTROL) && result.label.equals(Labels.DEFINITION_NOT_FOUND) && _gates.isSDKReadyNow()) { + if (result.treatment.equals(Treatments.CONTROL) && result.label.equals(Labels.DEFINITION_NOT_FOUND) && _gates.isSDKReady()) { _log.warn( "getTreatment: you passed \"" + split + "\" that does not exist in this environment, " + "please double check what Splits exist in the web console."); diff --git a/client/src/main/java/io/split/client/SplitManagerImpl.java b/client/src/main/java/io/split/client/SplitManagerImpl.java index 4d41ac36e..0edd88aaa 100644 --- a/client/src/main/java/io/split/client/SplitManagerImpl.java +++ b/client/src/main/java/io/split/client/SplitManagerImpl.java @@ -6,7 +6,6 @@ import io.split.cache.SplitCache; import io.split.engine.experiments.ParsedSplit; import io.split.inputValidation.SplitNameValidator; -import io.split.telemetry.domain.enums.MethodEnum; import io.split.telemetry.storage.TelemetryConfigProducer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,7 +42,7 @@ public SplitManagerImpl(SplitCache splitCache, @Override public List splits() { - if (!_gates.isSDKReadyNow()) { { + if (!_gates.isSDKReady()) { { _log.warn("splits: the SDK is not ready, results may be incorrect. Make sure to wait for SDK readiness before using this method"); _telemetryConfigProducer.recordNonReadyUsage(); }} @@ -58,7 +57,7 @@ public List splits() { @Override public SplitView split(String featureName) { - if (!_gates.isSDKReadyNow()) { { + if (!_gates.isSDKReady()) { { _log.warn("split: the SDK is not ready, results may be incorrect. Make sure to wait for SDK readiness before using this method"); _telemetryConfigProducer.recordNonReadyUsage(); }} @@ -70,7 +69,7 @@ public SplitView split(String featureName) { ParsedSplit parsedSplit = _splitCache.get(featureName); if (parsedSplit == null) { - if (_gates.isSDKReadyNow()) { + if (_gates.isSDKReady()) { _log.warn("split: you passed \"" + featureName + "\" that does not exist in this environment, " + "please double check what Splits exist in the web console."); } @@ -82,7 +81,7 @@ public SplitView split(String featureName) { @Override public List splitNames() { - if (!_gates.isSDKReadyNow()) { { + if (!_gates.isSDKReady()) { { _log.warn("splitNames: the SDK is not ready, results may be incorrect. Make sure to wait for SDK readiness before using this method"); _telemetryConfigProducer.recordNonReadyUsage(); }} @@ -100,7 +99,7 @@ public void blockUntilReady() throws TimeoutException, InterruptedException { if (_config.blockUntilReady() <= 0) { throw new IllegalArgumentException("setBlockUntilReadyTimeout must be positive but in config was: " + _config.blockUntilReady()); } - if (!_gates.isSDKReady(_config.blockUntilReady())) { + if (!_gates.waitUntilInternalReady(_config.blockUntilReady())) { _telemetryConfigProducer.recordBURTimeout(); throw new TimeoutException("SDK was not ready in " + _config.blockUntilReady()+ " milliseconds"); } diff --git a/client/src/main/java/io/split/engine/SDKReadinessGates.java b/client/src/main/java/io/split/engine/SDKReadinessGates.java index fefcc0466..10a18fbba 100644 --- a/client/src/main/java/io/split/engine/SDKReadinessGates.java +++ b/client/src/main/java/io/split/engine/SDKReadinessGates.java @@ -15,9 +15,7 @@ public class SDKReadinessGates { private static final Logger _log = LoggerFactory.getLogger(SDKReadinessGates.class); - private final CountDownLatch _splitsAreReady = new CountDownLatch(1); private final CountDownLatch _internalReady = new CountDownLatch(1); - private final ConcurrentMap _segmentsAreReady = new ConcurrentHashMap<>(); /** * Returns true if the SDK is ready. The SDK is ready when: @@ -34,134 +32,15 @@ public class SDKReadinessGates { * @return true if the sdk is ready, false otherwise. * @throws InterruptedException if this operation was interrupted. */ - public boolean isSDKReady(long milliseconds) throws InterruptedException { + public boolean waitUntilInternalReady(long milliseconds) throws InterruptedException { return _internalReady.await(milliseconds, TimeUnit.MILLISECONDS); } - public boolean isSDKReadyNow() { - try { - return isSDKReady(0); - } catch (InterruptedException e) { - return false; - } - } - - /** - * Records that the SDK split initialization is done. - * This operation is atomic and idempotent. Repeated invocations - * will not have any impact on the state. - */ - public void splitsAreReady() { - long originalCount = _splitsAreReady.getCount(); - _splitsAreReady.countDown(); - if (originalCount > 0L) { - _log.info("splits are ready"); - } - } - - /** - * Registers a segment that the SDK should download before it is ready. - * This method should be called right after the first successful download - * of split definitions. - *

- * Note that if this method is called in subsequent fetches of splits, - * it will return false; meaning any segments used in new splits - * will not be able to block the SDK from being marked as complete. - * - * @param segmentName the segment to register - * @return true if the segments were registered, false otherwise. - * @throws InterruptedException - */ - public boolean registerSegment(String segmentName) throws InterruptedException { - if (segmentName == null || segmentName.isEmpty() || areSplitsReady(0L)) { - return false; - } - - _segmentsAreReady.putIfAbsent(segmentName, new CountDownLatch(1)); - _log.info("Registered segment: " + segmentName); - return true; - } - - /** - * Records that the SDK segment initialization for this segment is done. - * This operation is atomic and idempotent. Repeated invocations - * will not have any impact on the state. - */ - public void segmentIsReady(String segmentName) { - CountDownLatch cdl = _segmentsAreReady.get(segmentName); - if (cdl == null) { - return; - } - - long originalCount = cdl.getCount(); - - cdl.countDown(); - - if (originalCount > 0L) { - _log.info(segmentName + " segment is ready"); - } - } - - public boolean isSegmentRegistered(String segmentName) { - return _segmentsAreReady.get(segmentName) != null; - } - - /** - * Returns true if the SDK is ready w.r.t segments. In other words, this method returns true if: - *

    - *
  1. The SDK has fetched segment definitions the first time.
  2. - *
- *

- * This operation will block until the SDK is ready or 'milliseconds' have passed. If the milliseconds - * are less than or equal to zero, the operation will not block and return immediately - * - * @param milliseconds time to wait for an answer. if the value is zero or negative, we will not - * block for an answer. - * @return true if the sdk is ready w.r.t splits, false otherwise. - * @throws InterruptedException if this operation was interrupted. - */ - public boolean areSegmentsReady(long milliseconds) throws InterruptedException { - long end = System.currentTimeMillis() + milliseconds; - long timeLeft = milliseconds; - - for (Map.Entry entry : _segmentsAreReady.entrySet()) { - String segmentName = entry.getKey(); - CountDownLatch cdl = entry.getValue(); - - if (!cdl.await(timeLeft, TimeUnit.MILLISECONDS)) { - _log.error(segmentName + " is not ready yet"); - return false; - } - - timeLeft = end - System.currentTimeMillis(); - } - - return true; - } - - /** - * Returns true if the SDK is ready w.r.t splits. In other words, this method returns true if: - *

    - *
  1. The SDK has fetched Split definitions the first time.
  2. - *
- *

- * This operation will block until the SDK is ready or 'milliseconds' have passed. If the milliseconds - * are less than or equal to zero, the operation will not block and return immediately - * - * @param milliseconds time to wait for an answer. if the value is zero or negative, we will not - * block for an answer. - * @return true if the sdk is ready w.r.t splits, false otherwise. - * @throws InterruptedException if this operation was interrupted. - */ - public boolean areSplitsReady(long milliseconds) throws InterruptedException { - return _splitsAreReady.await(milliseconds, TimeUnit.MILLISECONDS); + public boolean isSDKReady() { + return _internalReady.getCount() == 0; } public void sdkInternalReady() { _internalReady.countDown(); } - - public void waitUntilInternalReady() throws InterruptedException { - _internalReady.await(); - } } diff --git a/client/src/main/java/io/split/engine/experiments/SplitFetcherImp.java b/client/src/main/java/io/split/engine/experiments/SplitFetcherImp.java index 1d5e60e5c..19adb994d 100644 --- a/client/src/main/java/io/split/engine/experiments/SplitFetcherImp.java +++ b/client/src/main/java/io/split/engine/experiments/SplitFetcherImp.java @@ -145,26 +145,25 @@ private void runWithoutExceptionHandling(boolean addCacheHeader) throws Interrup } @Override public boolean fetchAll(boolean addCacheHeader) { - boolean fetchAllStatus = true; _log.debug("Fetch splits starting ..."); long start = _splitCache.getChangeNumber(); try { runWithoutExceptionHandling(addCacheHeader); } catch (InterruptedException e) { - fetchAllStatus = false; _log.warn("Interrupting split fetcher task"); Thread.currentThread().interrupt(); + return false; } catch (Throwable t) { - fetchAllStatus = false; _log.error("RefreshableSplitFetcher failed: " + t.getMessage()); if (_log.isDebugEnabled()) { _log.debug("Reason:", t); } + return false; } finally { if (_log.isDebugEnabled()) { _log.debug("split fetch before: " + start + ", after: " + _splitCache.getChangeNumber()); } } - return fetchAllStatus; + return true; } } diff --git a/client/src/main/java/io/split/engine/segments/SegmentFetcherImp.java b/client/src/main/java/io/split/engine/segments/SegmentFetcherImp.java index 6492c5740..3bdab0125 100644 --- a/client/src/main/java/io/split/engine/segments/SegmentFetcherImp.java +++ b/client/src/main/java/io/split/engine/segments/SegmentFetcherImp.java @@ -142,11 +142,7 @@ public void runWhitCacheHeader(){ private void fetchAndUpdate(boolean addCacheHeader) { try { // Do this again in case the previous call errored out. - _gates.registerSegment(_segmentName); callLoopRun(true, addCacheHeader); - - _gates.segmentIsReady(_segmentName); - } catch (Throwable t) { _log.error("RefreshableSegmentFetcher failed: " + t.getMessage()); if (_log.isDebugEnabled()) { diff --git a/client/src/main/java/io/split/engine/segments/SegmentSynchronizationTaskImp.java b/client/src/main/java/io/split/engine/segments/SegmentSynchronizationTaskImp.java index c6433c7ec..c40915335 100644 --- a/client/src/main/java/io/split/engine/segments/SegmentSynchronizationTaskImp.java +++ b/client/src/main/java/io/split/engine/segments/SegmentSynchronizationTaskImp.java @@ -82,12 +82,6 @@ public void initializeSegment(String segmentName) { return; } - try { - _gates.registerSegment(segmentName); - } catch (InterruptedException e) { - _log.error("Unable to register segment " + segmentName); - } - segment = new SegmentFetcherImp(segmentName, _segmentChangeFetcher, _gates, _segmentCache, _telemetryRuntimeProducer); if (_running.get()) { diff --git a/client/src/test/java/io/split/client/ApiKeyCounterTest.java b/client/src/test/java/io/split/client/ApiKeyCounterTest.java index 64304f462..c0dde2379 100644 --- a/client/src/test/java/io/split/client/ApiKeyCounterTest.java +++ b/client/src/test/java/io/split/client/ApiKeyCounterTest.java @@ -1,6 +1,7 @@ package io.split.client; import junit.framework.TestCase; +import org.junit.After; import org.junit.Assert; import org.junit.Test; @@ -11,61 +12,94 @@ public class ApiKeyCounterTest extends TestCase { private static final String FIRST_KEY = "KEYNUMBER1"; private static final String SECOND_KEY = "KEYNUMBER2"; - @Test - public void testAddingNewToken() { - ApiKeyCounter.getApiKeyCounterInstance().add(FIRST_KEY); - assertTrue(ApiKeyCounter.getApiKeyCounterInstance().isApiKeyPresent(FIRST_KEY)); + @After + public synchronized void clearApiKeys() { + ApiKeyCounter.getApiKeyCounterInstance().clearApiKeys(); + } - ApiKeyCounter.getApiKeyCounterInstance().remove(FIRST_KEY); + @Test + public synchronized void testAddingNewToken() { + try { + ApiKeyCounter.getApiKeyCounterInstance().add(FIRST_KEY); + assertTrue(ApiKeyCounter.getApiKeyCounterInstance().isApiKeyPresent(FIRST_KEY)); + } + finally { + ApiKeyCounter.getApiKeyCounterInstance().clearApiKeys(); + } } @Test - public void testAddingExistingToken() { - ApiKeyCounter.getApiKeyCounterInstance().add(FIRST_KEY); - ApiKeyCounter.getApiKeyCounterInstance().add(FIRST_KEY); + public synchronized void testAddingExistingToken() { + try { + ApiKeyCounter.getApiKeyCounterInstance().add(FIRST_KEY); + ApiKeyCounter.getApiKeyCounterInstance().add(FIRST_KEY); - assertTrue(ApiKeyCounter.getApiKeyCounterInstance().isApiKeyPresent(FIRST_KEY)); - assertEquals(2, ApiKeyCounter.getApiKeyCounterInstance().getCount(FIRST_KEY)); - ApiKeyCounter.getApiKeyCounterInstance().remove(FIRST_KEY); - ApiKeyCounter.getApiKeyCounterInstance().remove(FIRST_KEY); + assertTrue(ApiKeyCounter.getApiKeyCounterInstance().isApiKeyPresent(FIRST_KEY)); + assertEquals(2, ApiKeyCounter.getApiKeyCounterInstance().getCount(FIRST_KEY)); + } + finally { + ApiKeyCounter.getApiKeyCounterInstance().clearApiKeys(); + } } @Test - public void testRemovingToken() { - ApiKeyCounter.getApiKeyCounterInstance().add(FIRST_KEY); - ApiKeyCounter.getApiKeyCounterInstance().remove(FIRST_KEY); + public synchronized void testRemovingToken() { + try { + ApiKeyCounter.getApiKeyCounterInstance().add(FIRST_KEY); + ApiKeyCounter.getApiKeyCounterInstance().remove(FIRST_KEY); - assertFalse(ApiKeyCounter.getApiKeyCounterInstance().isApiKeyPresent(FIRST_KEY)); - assertEquals(0, ApiKeyCounter.getApiKeyCounterInstance().getCount(FIRST_KEY)); + assertFalse(ApiKeyCounter.getApiKeyCounterInstance().isApiKeyPresent(FIRST_KEY)); + assertEquals(0, ApiKeyCounter.getApiKeyCounterInstance().getCount(FIRST_KEY)); + } + finally { + ApiKeyCounter.getApiKeyCounterInstance().clearApiKeys(); + } } @Test - public void testAddingNonExistingToken() { - ApiKeyCounter.getApiKeyCounterInstance().add(FIRST_KEY); - ApiKeyCounter.getApiKeyCounterInstance().add(SECOND_KEY); + public synchronized void testAddingNonExistingToken() { + try { + ApiKeyCounter.getApiKeyCounterInstance().add(FIRST_KEY); + ApiKeyCounter.getApiKeyCounterInstance().add(SECOND_KEY); - assertTrue(ApiKeyCounter.getApiKeyCounterInstance().isApiKeyPresent(FIRST_KEY)); - assertEquals(1, ApiKeyCounter.getApiKeyCounterInstance().getCount(FIRST_KEY)); - assertEquals(1, ApiKeyCounter.getApiKeyCounterInstance().getCount(SECOND_KEY)); - ApiKeyCounter.getApiKeyCounterInstance().remove(FIRST_KEY); - ApiKeyCounter.getApiKeyCounterInstance().remove(SECOND_KEY); + assertTrue(ApiKeyCounter.getApiKeyCounterInstance().isApiKeyPresent(FIRST_KEY)); + assertEquals(1, ApiKeyCounter.getApiKeyCounterInstance().getCount(FIRST_KEY)); + assertEquals(1, ApiKeyCounter.getApiKeyCounterInstance().getCount(SECOND_KEY)); + } + finally { + ApiKeyCounter.getApiKeyCounterInstance().clearApiKeys(); + } } @Test - public void testFactoryInstances() { - ApiKeyCounter.getApiKeyCounterInstance().add(FIRST_KEY); - ApiKeyCounter.getApiKeyCounterInstance().add(FIRST_KEY); - ApiKeyCounter.getApiKeyCounterInstance().add(FIRST_KEY); - ApiKeyCounter.getApiKeyCounterInstance().add(SECOND_KEY); - ApiKeyCounter.getApiKeyCounterInstance().add(SECOND_KEY); + public synchronized void testFactoryInstances() { + try { + ApiKeyCounter.getApiKeyCounterInstance().add(FIRST_KEY); + ApiKeyCounter.getApiKeyCounterInstance().add(FIRST_KEY); + ApiKeyCounter.getApiKeyCounterInstance().add(FIRST_KEY); + ApiKeyCounter.getApiKeyCounterInstance().add(SECOND_KEY); + ApiKeyCounter.getApiKeyCounterInstance().add(SECOND_KEY); - Map factoryInstances = ApiKeyCounter.getApiKeyCounterInstance().getFactoryInstances(); - Assert.assertEquals(2, factoryInstances.size()); - Assert.assertEquals(3, factoryInstances.get(FIRST_KEY).intValue()); - ApiKeyCounter.getApiKeyCounterInstance().remove(FIRST_KEY); - ApiKeyCounter.getApiKeyCounterInstance().remove(FIRST_KEY); - ApiKeyCounter.getApiKeyCounterInstance().remove(FIRST_KEY); - ApiKeyCounter.getApiKeyCounterInstance().remove(SECOND_KEY); - ApiKeyCounter.getApiKeyCounterInstance().remove(SECOND_KEY); + Map factoryInstances = ApiKeyCounter.getApiKeyCounterInstance().getFactoryInstances(); + Assert.assertEquals(2, factoryInstances.size()); + Assert.assertEquals(3, factoryInstances.get(FIRST_KEY).intValue()); + } + finally { + ApiKeyCounter.getApiKeyCounterInstance().clearApiKeys(); + } + } + + @Test + public synchronized void testClearApiKey() { + try { + ApiKeyCounter.getApiKeyCounterInstance().add(FIRST_KEY); + ApiKeyCounter.getApiKeyCounterInstance().add(FIRST_KEY); + ApiKeyCounter.getApiKeyCounterInstance().add(FIRST_KEY); + ApiKeyCounter.getApiKeyCounterInstance().clearApiKeys(); + Assert.assertEquals(0, ApiKeyCounter.getApiKeyCounterInstance().getCount(FIRST_KEY)); + } + finally { + ApiKeyCounter.getApiKeyCounterInstance().clearApiKeys(); + } } } diff --git a/client/src/test/java/io/split/client/SplitClientImplTest.java b/client/src/test/java/io/split/client/SplitClientImplTest.java index aa508cd2f..acb1f304b 100644 --- a/client/src/test/java/io/split/client/SplitClientImplTest.java +++ b/client/src/test/java/io/split/client/SplitClientImplTest.java @@ -153,7 +153,7 @@ public void works() { SDKReadinessGates gates = mock(SDKReadinessGates.class); SplitCache splitCache = mock(InMemoryCacheImp.class); when(splitCache.get(test)).thenReturn(parsedSplit); - when(gates.isSDKReadyNow()).thenReturn(true); + when(gates.isSDKReady()).thenReturn(true); SplitClientImpl client = new SplitClientImpl( mock(SplitFactory.class), @@ -326,7 +326,7 @@ public void multiple_conditions_work() { SDKReadinessGates gates = mock(SDKReadinessGates.class); SplitCache splitCache = mock(InMemoryCacheImp.class); when(splitCache.get(test)).thenReturn(parsedSplit); - when(gates.isSDKReadyNow()).thenReturn(false); + when(gates.isSDKReady()).thenReturn(false); SplitClientImpl client = new SplitClientImpl( mock(SplitFactory.class), @@ -905,7 +905,7 @@ private Partition partition(String treatment, int size) { public void block_until_ready_does_not_time_when_sdk_is_ready() throws TimeoutException, InterruptedException { SplitCache splitCache = mock(InMemoryCacheImp.class); SDKReadinessGates ready = mock(SDKReadinessGates.class); - when(ready.isSDKReady(100)).thenReturn(true); + when(ready.waitUntilInternalReady(100)).thenReturn(true); SplitClientImpl client = new SplitClientImpl( mock(SplitFactory.class), @@ -924,7 +924,7 @@ public void block_until_ready_does_not_time_when_sdk_is_ready() throws TimeoutEx public void block_until_ready_times_when_sdk_is_not_ready() throws TimeoutException, InterruptedException { SplitCache splitCache = mock(InMemoryCacheImp.class); SDKReadinessGates ready = mock(SDKReadinessGates.class); - when(ready.isSDKReady(100)).thenReturn(false); + when(ready.waitUntilInternalReady(100)).thenReturn(false); SplitClientImpl client = new SplitClientImpl( mock(SplitFactory.class), @@ -943,7 +943,7 @@ public void block_until_ready_times_when_sdk_is_not_ready() throws TimeoutExcept public void track_with_valid_parameters() { SDKReadinessGates gates = mock(SDKReadinessGates.class); SplitCache splitCache = mock(InMemoryCacheImp.class); - when(gates.isSDKReadyNow()).thenReturn(false); + when(gates.isSDKReady()).thenReturn(false); SplitClientImpl client = new SplitClientImpl( mock(SplitFactory.class), splitCache, diff --git a/client/src/test/java/io/split/client/SplitManagerImplTest.java b/client/src/test/java/io/split/client/SplitManagerImplTest.java index a77674045..201f1fedc 100644 --- a/client/src/test/java/io/split/client/SplitManagerImplTest.java +++ b/client/src/test/java/io/split/client/SplitManagerImplTest.java @@ -100,7 +100,7 @@ public void splitsCallWithNoSplit() { SplitCache splitCache = Mockito.mock(SplitCache.class); Mockito.when(splitCache.getAll()).thenReturn(Lists.newArrayList()); SDKReadinessGates gates = Mockito.mock(SDKReadinessGates.class); - Mockito.when(gates.isSDKReadyNow()).thenReturn(false); + Mockito.when(gates.isSDKReady()).thenReturn(false); SplitManagerImpl splitManager = new SplitManagerImpl(splitCache, Mockito.mock(SplitClientConfig.class), gates, TELEMETRY_STORAGE); @@ -113,7 +113,7 @@ public void splitsCallWithSplit() { SplitCache splitCache = Mockito.mock(SplitCache.class); List parsedSplits = Lists.newArrayList(); SDKReadinessGates gates = Mockito.mock(SDKReadinessGates.class); - Mockito.when(gates.isSDKReadyNow()).thenReturn(false); + Mockito.when(gates.isSDKReady()).thenReturn(false); ParsedSplit response = ParsedSplit.createParsedSplitForTests("FeatureName", 123, true, "off", Lists.newArrayList(getTestCondition("off")), "traffic", 456L, 1); parsedSplits.add(response); @@ -137,7 +137,7 @@ public void splitNamesCallWithNoSplit() { SplitCache splitCache = Mockito.mock(SplitCache.class); Mockito.when(splitCache.getAll()).thenReturn(Lists.newArrayList()); SDKReadinessGates gates = Mockito.mock(SDKReadinessGates.class); - Mockito.when(gates.isSDKReadyNow()).thenReturn(false); + Mockito.when(gates.isSDKReady()).thenReturn(false); SplitManagerImpl splitManager = new SplitManagerImpl(splitCache, Mockito.mock(SplitClientConfig.class), gates, TELEMETRY_STORAGE); @@ -164,7 +164,7 @@ public void splitNamesCallWithSplit() { @Test public void block_until_ready_does_not_time_when_sdk_is_ready() throws TimeoutException, InterruptedException { SDKReadinessGates ready = mock(SDKReadinessGates.class); - when(ready.isSDKReady(100)).thenReturn(true); + when(ready.waitUntilInternalReady(100)).thenReturn(true); SplitManagerImpl splitManager = new SplitManagerImpl(mock(SplitCache.class), config, ready, TELEMETRY_STORAGE); @@ -175,7 +175,7 @@ public void block_until_ready_does_not_time_when_sdk_is_ready() throws TimeoutEx @Test(expected = TimeoutException.class) public void block_until_ready_times_when_sdk_is_not_ready() throws TimeoutException, InterruptedException { SDKReadinessGates ready = mock(SDKReadinessGates.class); - when(ready.isSDKReady(100)).thenReturn(false); + when(ready.waitUntilInternalReady(100)).thenReturn(false); SplitManagerImpl splitManager = new SplitManagerImpl(mock(SplitCache.class), config, diff --git a/client/src/test/java/io/split/engine/experiments/SplitFetcherTest.java b/client/src/test/java/io/split/engine/experiments/SplitFetcherTest.java index 595b0669d..245d07381 100644 --- a/client/src/test/java/io/split/engine/experiments/SplitFetcherTest.java +++ b/client/src/test/java/io/split/engine/experiments/SplitFetcherTest.java @@ -162,7 +162,6 @@ public void if_there_is_a_problem_talking_to_split_change_count_down_latch_is_no executeWaitAndTerminate(fetcher, 1, 5, TimeUnit.SECONDS); assertThat(cache.getChangeNumber(), is(equalTo(-1L))); - assertThat(gates.areSplitsReady(0), is(equalTo(false))); } private void executeWaitAndTerminate(Runnable runnable, long frequency, long waitInBetween, TimeUnit unit) throws InterruptedException { diff --git a/client/src/test/java/io/split/engine/segments/SegmentFetcherImpTest.java b/client/src/test/java/io/split/engine/segments/SegmentFetcherImpTest.java index d41b9c829..c5f51afc8 100644 --- a/client/src/test/java/io/split/engine/segments/SegmentFetcherImpTest.java +++ b/client/src/test/java/io/split/engine/segments/SegmentFetcherImpTest.java @@ -49,7 +49,6 @@ public void works_when_we_start_with_state() throws InterruptedException { public void works_when_there_are_no_changes() throws InterruptedException { long startingChangeNumber = -1L; SDKReadinessGates gates = new SDKReadinessGates(); - gates.registerSegment(SEGMENT_NAME); SegmentCache segmentCache = new SegmentCacheInMemoryImpl(); SegmentChangeFetcher segmentChangeFetcher = Mockito.mock(SegmentChangeFetcher.class); @@ -79,14 +78,12 @@ public void works_when_there_are_no_changes() throws InterruptedException { assertNotNull(segmentCache.getChangeNumber(SEGMENT_NAME)); assertEquals(10L, segmentCache.getChangeNumber(SEGMENT_NAME)); - assertThat(gates.areSegmentsReady(10), is(true)); } private void works(long startingChangeNumber) throws InterruptedException { SDKReadinessGates gates = new SDKReadinessGates(); String segmentName = SEGMENT_NAME; - gates.registerSegment(segmentName); SegmentCache segmentCache = Mockito.mock(SegmentCache.class); Mockito.when(segmentCache.getChangeNumber(SEGMENT_NAME)).thenReturn(-1L).thenReturn(-1L) .thenReturn(-1L) @@ -116,7 +113,6 @@ private void works(long startingChangeNumber) throws InterruptedException { Thread.currentThread().interrupt(); } Mockito.verify(segmentChangeFetcher, Mockito.times(2)).fetch(Mockito.anyString(), Mockito.anyLong(), Mockito.anyBoolean()); - assertThat(gates.areSegmentsReady(10), is(true)); } diff --git a/client/src/test/java/io/split/engine/segments/SegmentSynchronizationTaskImpTest.java b/client/src/test/java/io/split/engine/segments/SegmentSynchronizationTaskImpTest.java index 722458e3c..c49d29962 100644 --- a/client/src/test/java/io/split/engine/segments/SegmentSynchronizationTaskImpTest.java +++ b/client/src/test/java/io/split/engine/segments/SegmentSynchronizationTaskImpTest.java @@ -75,8 +75,6 @@ public void run() { Thread.currentThread().interrupt(); } - gates.splitsAreReady(); - assertThat(fetcher1.get(), is(notNullValue())); assertThat(fetcher1.get(), is(sameInstance(fetcher2.get()))); } From 9d8dd475a2b4d40ea9b30a3d561c665e3e5e217f Mon Sep 17 00:00:00 2001 From: Lucas Echeverz Date: Wed, 19 May 2021 18:44:22 -0300 Subject: [PATCH 4/6] Travis fix --- client/src/test/java/io/split/client/ApiKeyCounterTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/client/src/test/java/io/split/client/ApiKeyCounterTest.java b/client/src/test/java/io/split/client/ApiKeyCounterTest.java index c0dde2379..1513313b8 100644 --- a/client/src/test/java/io/split/client/ApiKeyCounterTest.java +++ b/client/src/test/java/io/split/client/ApiKeyCounterTest.java @@ -74,6 +74,7 @@ public synchronized void testAddingNonExistingToken() { @Test public synchronized void testFactoryInstances() { try { + ApiKeyCounter.getApiKeyCounterInstance().clearApiKeys(); ApiKeyCounter.getApiKeyCounterInstance().add(FIRST_KEY); ApiKeyCounter.getApiKeyCounterInstance().add(FIRST_KEY); ApiKeyCounter.getApiKeyCounterInstance().add(FIRST_KEY); @@ -83,6 +84,7 @@ public synchronized void testFactoryInstances() { Map factoryInstances = ApiKeyCounter.getApiKeyCounterInstance().getFactoryInstances(); Assert.assertEquals(2, factoryInstances.size()); Assert.assertEquals(3, factoryInstances.get(FIRST_KEY).intValue()); + Assert.assertEquals(2, factoryInstances.get(SECOND_KEY).intValue()); } finally { ApiKeyCounter.getApiKeyCounterInstance().clearApiKeys(); From a0c4fd0317a42ea7d603bff9dd7eb0b0537a9deb Mon Sep 17 00:00:00 2001 From: Lucas Echeverz Date: Fri, 21 May 2021 11:39:10 -0300 Subject: [PATCH 5/6] Fixing PR comments --- .../split/engine/common/SyncManagerImp.java | 1 + .../split/engine/common/SynchronizerImp.java | 7 +-- .../engine/experiments/SplitFetcherImp.java | 2 +- .../split/engine/segments/SegmentFetcher.java | 2 +- .../engine/segments/SegmentFetcherImp.java | 13 +++-- .../SegmentSynchronizationTaskImp.java | 4 +- .../SegmentSynchronizationTaskImpTest.java | 53 +++++++++++++++++++ 7 files changed, 69 insertions(+), 13 deletions(-) diff --git a/client/src/main/java/io/split/engine/common/SyncManagerImp.java b/client/src/main/java/io/split/engine/common/SyncManagerImp.java index 8ac52b425..529d02914 100644 --- a/client/src/main/java/io/split/engine/common/SyncManagerImp.java +++ b/client/src/main/java/io/split/engine/common/SyncManagerImp.java @@ -104,6 +104,7 @@ public void start() { try { Thread.currentThread().sleep(1000); } catch (InterruptedException e) { + _log.warn("Sdk Initializer thread interrupted"); e.printStackTrace(); Thread.currentThread().interrupt(); } diff --git a/client/src/main/java/io/split/engine/common/SynchronizerImp.java b/client/src/main/java/io/split/engine/common/SynchronizerImp.java index 5ad718073..d3e800ebc 100644 --- a/client/src/main/java/io/split/engine/common/SynchronizerImp.java +++ b/client/src/main/java/io/split/engine/common/SynchronizerImp.java @@ -53,12 +53,7 @@ public SynchronizerImp(SplitSynchronizationTask splitSynchronizationTask, @Override public boolean syncAll() { - AtomicBoolean syncStatus = new AtomicBoolean(false); - if(_splitFetcher.fetchAll(true) && - _segmentSynchronizationTaskImp.fetchAllSynchronous()) { - syncStatus.set(true); - } - return syncStatus.get(); + return _splitFetcher.fetchAll(true) && _segmentSynchronizationTaskImp.fetchAllSynchronous(); } @Override diff --git a/client/src/main/java/io/split/engine/experiments/SplitFetcherImp.java b/client/src/main/java/io/split/engine/experiments/SplitFetcherImp.java index 19adb994d..0dd56ae2c 100644 --- a/client/src/main/java/io/split/engine/experiments/SplitFetcherImp.java +++ b/client/src/main/java/io/split/engine/experiments/SplitFetcherImp.java @@ -149,6 +149,7 @@ public boolean fetchAll(boolean addCacheHeader) { long start = _splitCache.getChangeNumber(); try { runWithoutExceptionHandling(addCacheHeader); + return true; } catch (InterruptedException e) { _log.warn("Interrupting split fetcher task"); Thread.currentThread().interrupt(); @@ -164,6 +165,5 @@ public boolean fetchAll(boolean addCacheHeader) { _log.debug("split fetch before: " + start + ", after: " + _splitCache.getChangeNumber()); } } - return true; } } diff --git a/client/src/main/java/io/split/engine/segments/SegmentFetcher.java b/client/src/main/java/io/split/engine/segments/SegmentFetcher.java index af4bbc767..bd9b19ddd 100644 --- a/client/src/main/java/io/split/engine/segments/SegmentFetcher.java +++ b/client/src/main/java/io/split/engine/segments/SegmentFetcher.java @@ -9,7 +9,7 @@ public interface SegmentFetcher { */ void fetch(boolean addCacheHeader); - void runWhitCacheHeader(); + boolean runWhitCacheHeader(); void fetchAll(); } diff --git a/client/src/main/java/io/split/engine/segments/SegmentFetcherImp.java b/client/src/main/java/io/split/engine/segments/SegmentFetcherImp.java index 3bdab0125..b18180264 100644 --- a/client/src/main/java/io/split/engine/segments/SegmentFetcherImp.java +++ b/client/src/main/java/io/split/engine/segments/SegmentFetcherImp.java @@ -1,5 +1,6 @@ package io.split.engine.segments; +import com.google.common.annotations.VisibleForTesting; import io.split.cache.SegmentCache; import io.split.client.dtos.SegmentChange; import io.split.engine.SDKReadinessGates; @@ -116,7 +117,8 @@ private String summarize(List changes) { return bldr.toString(); } - private void callLoopRun(boolean isFetch, boolean addCacheHeader){ + @VisibleForTesting + void callLoopRun(boolean isFetch, boolean addCacheHeader){ while (true) { long start = _segmentCache.getChangeNumber(_segmentName); runWithoutExceptionHandling(addCacheHeader); @@ -131,23 +133,26 @@ private void callLoopRun(boolean isFetch, boolean addCacheHeader){ } @Override - public void runWhitCacheHeader(){ - this.fetchAndUpdate(true); + public boolean runWhitCacheHeader(){ + return this.fetchAndUpdate(true); } /** * Calls callLoopRun and after fetchs segment. * @param addCacheHeader indicates if CacheHeader is required */ - private void fetchAndUpdate(boolean addCacheHeader) { + @VisibleForTesting + boolean fetchAndUpdate(boolean addCacheHeader) { try { // Do this again in case the previous call errored out. callLoopRun(true, addCacheHeader); + return true; } catch (Throwable t) { _log.error("RefreshableSegmentFetcher failed: " + t.getMessage()); if (_log.isDebugEnabled()) { _log.debug("Reason:", t); } + return false; } } diff --git a/client/src/main/java/io/split/engine/segments/SegmentSynchronizationTaskImp.java b/client/src/main/java/io/split/engine/segments/SegmentSynchronizationTaskImp.java index c40915335..025ca08ab 100644 --- a/client/src/main/java/io/split/engine/segments/SegmentSynchronizationTaskImp.java +++ b/client/src/main/java/io/split/engine/segments/SegmentSynchronizationTaskImp.java @@ -167,7 +167,9 @@ public boolean fetchAllSynchronous() { .collect(Collectors.toList()) .stream().forEach(future -> { try { - future.get(); + if(!future.get()) { + fetchAllStatus.set(false); + }; } catch (Exception ex) { fetchAllStatus.set(false); _log.error(ex.getMessage()); diff --git a/client/src/test/java/io/split/engine/segments/SegmentSynchronizationTaskImpTest.java b/client/src/test/java/io/split/engine/segments/SegmentSynchronizationTaskImpTest.java index c49d29962..946cba066 100644 --- a/client/src/test/java/io/split/engine/segments/SegmentSynchronizationTaskImpTest.java +++ b/client/src/test/java/io/split/engine/segments/SegmentSynchronizationTaskImpTest.java @@ -1,16 +1,21 @@ package io.split.engine.segments; +import com.google.common.collect.Maps; import io.split.engine.SDKReadinessGates; import io.split.cache.SegmentCache; import io.split.telemetry.storage.InMemoryTelemetryStorage; import io.split.telemetry.storage.TelemetryStorage; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; import java.util.List; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -79,5 +84,53 @@ public void run() { assertThat(fetcher1.get(), is(sameInstance(fetcher2.get()))); } + @Test + public void testFetchAllAsynchronousAndGetFalse() throws NoSuchFieldException, IllegalAccessException { + SDKReadinessGates gates = new SDKReadinessGates(); + SegmentCache segmentCache = Mockito.mock(SegmentCache.class); + ConcurrentMap _segmentFetchers = Maps.newConcurrentMap(); + + SegmentChangeFetcher segmentChangeFetcher = Mockito.mock(SegmentChangeFetcher.class); + SegmentFetcherImp segmentFetcher = Mockito.mock(SegmentFetcherImp.class); + _segmentFetchers.put("SF", segmentFetcher); + final SegmentSynchronizationTaskImp fetchers = new SegmentSynchronizationTaskImp(segmentChangeFetcher, 1L, 1, gates, segmentCache, TELEMETRY_STORAGE); + Mockito.doNothing().when(segmentFetcher).callLoopRun(Mockito.anyBoolean(),Mockito.anyBoolean()); + Mockito.when(segmentFetcher.runWhitCacheHeader()).thenReturn(false); + Mockito.when(segmentFetcher.fetchAndUpdate(Mockito.anyBoolean())).thenReturn(false); + Mockito.doNothing().when(segmentFetcher).callLoopRun(Mockito.anyBoolean(),Mockito.anyBoolean()); + + // Before executing, we'll update the map of segmentFecthers via reflection. + Field backoffBase = SegmentSynchronizationTaskImp.class.getDeclaredField("_segmentFetchers"); + backoffBase.setAccessible(true); + Field modifiersField = Field.class.getDeclaredField("modifiers"); + modifiersField.setAccessible(true); + modifiersField.setInt(backoffBase, backoffBase.getModifiers() & ~Modifier.FINAL); + + backoffBase.set(fetchers, _segmentFetchers); // 1ms + fetcher1.set(segmentFetcher); + boolean fetch = fetchers.fetchAllSynchronous(); + Assert.assertEquals(false, fetch); + } + @Test + public void testFetchAllAsynchronousAndGetTrue() throws NoSuchFieldException, IllegalAccessException { + SDKReadinessGates gates = new SDKReadinessGates(); + SegmentCache segmentCache = Mockito.mock(SegmentCache.class); + + SegmentChangeFetcher segmentChangeFetcher = Mockito.mock(SegmentChangeFetcher.class); + SegmentFetcherImp segmentFetcher = Mockito.mock(SegmentFetcherImp.class); + final SegmentSynchronizationTaskImp fetchers = new SegmentSynchronizationTaskImp(segmentChangeFetcher, 1L, 1, gates, segmentCache, TELEMETRY_STORAGE); + + // Before executing, we'll update the map of segmentFecthers via reflection. + Field backoffBase = SegmentSynchronizationTaskImp.class.getDeclaredField("_segmentFetchers"); + backoffBase.setAccessible(true); + Field modifiersField = Field.class.getDeclaredField("modifiers"); + modifiersField.setAccessible(true); + modifiersField.setInt(backoffBase, backoffBase.getModifiers() & ~Modifier.FINAL); + Mockito.doNothing().when(segmentFetcher).callLoopRun(Mockito.anyBoolean(),Mockito.anyBoolean()); + Mockito.when(segmentFetcher.runWhitCacheHeader()).thenReturn(true); + Mockito.when(segmentFetcher.fetchAndUpdate(Mockito.anyBoolean())).thenReturn(true); + boolean fetch = fetchers.fetchAllSynchronous(); + Assert.assertEquals(true, fetch); + } } From 78540db9fc06c864ee28e03d5b0c05f1988f7d11 Mon Sep 17 00:00:00 2001 From: Lucas Echeverz Date: Fri, 21 May 2021 12:24:08 -0300 Subject: [PATCH 6/6] Final fixes --- .../io/split/engine/common/SyncManagerImp.java | 1 - .../SegmentSynchronizationTaskImpTest.java | 14 +++++++------- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/client/src/main/java/io/split/engine/common/SyncManagerImp.java b/client/src/main/java/io/split/engine/common/SyncManagerImp.java index 529d02914..b556beb9d 100644 --- a/client/src/main/java/io/split/engine/common/SyncManagerImp.java +++ b/client/src/main/java/io/split/engine/common/SyncManagerImp.java @@ -105,7 +105,6 @@ public void start() { Thread.currentThread().sleep(1000); } catch (InterruptedException e) { _log.warn("Sdk Initializer thread interrupted"); - e.printStackTrace(); Thread.currentThread().interrupt(); } } diff --git a/client/src/test/java/io/split/engine/segments/SegmentSynchronizationTaskImpTest.java b/client/src/test/java/io/split/engine/segments/SegmentSynchronizationTaskImpTest.java index 946cba066..db3740565 100644 --- a/client/src/test/java/io/split/engine/segments/SegmentSynchronizationTaskImpTest.java +++ b/client/src/test/java/io/split/engine/segments/SegmentSynchronizationTaskImpTest.java @@ -100,13 +100,13 @@ public void testFetchAllAsynchronousAndGetFalse() throws NoSuchFieldException, I Mockito.doNothing().when(segmentFetcher).callLoopRun(Mockito.anyBoolean(),Mockito.anyBoolean()); // Before executing, we'll update the map of segmentFecthers via reflection. - Field backoffBase = SegmentSynchronizationTaskImp.class.getDeclaredField("_segmentFetchers"); - backoffBase.setAccessible(true); + Field segmentFetchersForced = SegmentSynchronizationTaskImp.class.getDeclaredField("_segmentFetchers"); + segmentFetchersForced.setAccessible(true); Field modifiersField = Field.class.getDeclaredField("modifiers"); modifiersField.setAccessible(true); - modifiersField.setInt(backoffBase, backoffBase.getModifiers() & ~Modifier.FINAL); + modifiersField.setInt(segmentFetchersForced, segmentFetchersForced.getModifiers() & ~Modifier.FINAL); - backoffBase.set(fetchers, _segmentFetchers); // 1ms + segmentFetchersForced.set(fetchers, _segmentFetchers); // 1ms fetcher1.set(segmentFetcher); boolean fetch = fetchers.fetchAllSynchronous(); Assert.assertEquals(false, fetch); @@ -122,11 +122,11 @@ public void testFetchAllAsynchronousAndGetTrue() throws NoSuchFieldException, Il final SegmentSynchronizationTaskImp fetchers = new SegmentSynchronizationTaskImp(segmentChangeFetcher, 1L, 1, gates, segmentCache, TELEMETRY_STORAGE); // Before executing, we'll update the map of segmentFecthers via reflection. - Field backoffBase = SegmentSynchronizationTaskImp.class.getDeclaredField("_segmentFetchers"); - backoffBase.setAccessible(true); + Field segmentFetchersForced = SegmentSynchronizationTaskImp.class.getDeclaredField("_segmentFetchers"); + segmentFetchersForced.setAccessible(true); Field modifiersField = Field.class.getDeclaredField("modifiers"); modifiersField.setAccessible(true); - modifiersField.setInt(backoffBase, backoffBase.getModifiers() & ~Modifier.FINAL); + modifiersField.setInt(segmentFetchersForced, segmentFetchersForced.getModifiers() & ~Modifier.FINAL); Mockito.doNothing().when(segmentFetcher).callLoopRun(Mockito.anyBoolean(),Mockito.anyBoolean()); Mockito.when(segmentFetcher.runWhitCacheHeader()).thenReturn(true); Mockito.when(segmentFetcher.fetchAndUpdate(Mockito.anyBoolean())).thenReturn(true);