Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>io.split.client</groupId>
<artifactId>java-client-parent</artifactId>
<version>4.1.7-rc3</version>
<version>4.1.7-rc4</version>
</parent>
<artifactId>java-client</artifactId>
<packaging>jar</packaging>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public final class HttpSplitChangeFetcher implements SplitChangeFetcher {
private static final Logger _log = LoggerFactory.getLogger(HttpSplitChangeFetcher.class);

private static final String SINCE = "since";
private static final String TILL = "till";
private static final String PREFIX = "splitChangeFetcher";

private static final String HEADER_CACHE_CONTROL_NAME = "Cache-Control";
Expand Down Expand Up @@ -58,6 +59,10 @@ private HttpSplitChangeFetcher(CloseableHttpClient client, URI uri, Metrics metr
checkNotNull(_target);
}

long makeRandomTill() {
return (-1)*(int)Math.floor(Math.random()*(Math.pow(2, 63)));
}

@Override
public SplitChange fetch(long since, FetchOptions options) {

Expand All @@ -66,7 +71,11 @@ public SplitChange fetch(long since, FetchOptions options) {
CloseableHttpResponse response = null;

try {
URI uri = new URIBuilder(_target).addParameter(SINCE, "" + since).build();
URIBuilder uriBuilder = new URIBuilder(_target).addParameter(SINCE, "" + since);
if (options.cdnBypass()) {
uriBuilder.addParameter(TILL, "" + makeRandomTill());
}
URI uri = uriBuilder.build();

HttpGet request = new HttpGet(uri);
if(options.cacheControlHeadersEnabled()) {
Expand Down
10 changes: 8 additions & 2 deletions client/src/main/java/io/split/engine/common/Backoff.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,26 @@
import static com.google.common.base.Preconditions.checkNotNull;

public class Backoff {
private static final long BACKOFF_MAX_SECONDS_ALLOWED = 1800;
private static final long BACKOFF_MAX_ALLOWED = 1800;

private final long _backoffBase;
private AtomicInteger _attempt;
private final long _maxAllowed;

public Backoff(long backoffBase) {
this(backoffBase, BACKOFF_MAX_ALLOWED);
}

public Backoff(long backoffBase, long maxAllowed) {
_backoffBase = checkNotNull(backoffBase);
_attempt = new AtomicInteger(0);
_maxAllowed = maxAllowed;
}

public long interval() {
long interval = _backoffBase * (long) Math.pow(2, _attempt.getAndIncrement());

return interval >= BACKOFF_MAX_SECONDS_ALLOWED ? BACKOFF_MAX_SECONDS_ALLOWED : interval;
return interval >= _maxAllowed ? BACKOFF_MAX_ALLOWED : interval;
}

public synchronized void reset() {
Expand Down
27 changes: 22 additions & 5 deletions client/src/main/java/io/split/engine/common/FetchOptions.java
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
package io.split.engine.common;

import io.split.engine.matchers.AttributeMatcher;
import org.checkerframework.checker.units.qual.A;

import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

public class FetchOptions {

public static class Builder {

public Builder() {}

public Builder(FetchOptions opts) {
_cdnBypass = opts._cdnBypass;
_cacheControlHeaders = opts._cacheControlHeaders;
_fastlyDebugHeader = opts._fastlyDebugHeader;
_responseHeadersCallback = opts._responseHeadersCallback;
}

public Builder cacheControlHeaders(boolean on) {
_cacheControlHeaders = on;
return this;
Expand All @@ -27,10 +32,16 @@ public Builder responseHeadersCallback(Function<Map<String, String>, Void> callb
return this;
}

public Builder cdnBypass(boolean bypass) {
_cdnBypass = bypass;
return this;
}

public FetchOptions build() {
return new FetchOptions(_cacheControlHeaders, _responseHeadersCallback, _fastlyDebugHeader);
return new FetchOptions(_cacheControlHeaders, _cdnBypass, _responseHeadersCallback, _fastlyDebugHeader);
}

private boolean _cdnBypass = false;
private boolean _cacheControlHeaders = false;
private boolean _fastlyDebugHeader = false;
private Function<Map<String, String>, Void> _responseHeadersCallback = null;
Expand All @@ -44,16 +55,21 @@ public boolean fastlyDebugHeaderEnabled() {
return _fastlyDebugHeader;
}

public boolean cdnBypass() { return _cdnBypass; }

public void handleResponseHeaders(Map<String, String> headers) {
if (Objects.isNull(_responseHeadersCallback) || Objects.isNull(headers)) {
return;
}
_responseHeadersCallback.apply(headers);
}

private FetchOptions(boolean cacheControlHeaders, Function<Map<String, String>, Void> responseHeadersCallback,
private FetchOptions(boolean cacheControlHeaders,
boolean cdnBypass,
Function<Map<String, String>, Void> responseHeadersCallback,
boolean fastlyDebugHeader) {
_cacheControlHeaders = cacheControlHeaders;
_cdnBypass = cdnBypass;
_responseHeadersCallback = responseHeadersCallback;
_fastlyDebugHeader = fastlyDebugHeader;
}
Expand All @@ -78,5 +94,6 @@ public int hashCode() {

private final boolean _cacheControlHeaders;
private final boolean _fastlyDebugHeader;
private final boolean _cdnBypass;
private final Function<Map<String, String>, Void> _responseHeadersCallback;
}
95 changes: 71 additions & 24 deletions client/src/main/java/io/split/engine/common/SynchronizerImp.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,21 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.function.Function;

import static com.google.common.base.Preconditions.checkNotNull;

public class SynchronizerImp implements Synchronizer {

// The boxing here IS necessary, so that the constants are not inlined by the compiler
// and can be modified for the test (we don't want to wait that much in an UT)
private static final long ON_DEMAND_FETCH_BACKOFF_BASE_MS = new Long(10000); //backoff base starting at 10 seconds (!)
private static final long ON_DEMAND_FETCH_BACKOFF_MAX_WAIT_MS = new Long(60000); // don't sleep for more than 1 second
private static final int ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES = 10;

private static final Logger _log = LoggerFactory.getLogger(Synchronizer.class);
private final SplitSynchronizationTask _splitSynchronizationTask;
private final SplitFetcher _splitFetcher;
Expand Down Expand Up @@ -49,7 +55,7 @@ public SynchronizerImp(SplitSynchronizationTask splitSynchronizationTask,
_segmentSynchronizationTaskImp = checkNotNull(segmentSynchronizationTaskImp);
_splitCache = checkNotNull(splitCache);
_segmentCache = checkNotNull(segmentCache);
_onDemandFetchRetryDelayMs = checkNotNull(onDemandFetchRetryDelayMs);
_onDemandFetchRetryDelayMs = onDemandFetchRetryDelayMs;
_cdnResponseHeadersLogging = cdnResponseHeadersLogging;
_onDemandFetchMaxRetries = onDemandFetchMaxRetries;
_failedAttemptsBeforeLogging = failedAttemptsBeforeLogging;
Expand Down Expand Up @@ -83,42 +89,83 @@ public void stopPeriodicFetching() {
_segmentSynchronizationTaskImp.stop();
}

@Override
public void refreshSplits(long targetChangeNumber) {
private static class SyncResult {

if (targetChangeNumber <= _splitCache.getChangeNumber()) {
return;
/* package private */ SyncResult(boolean success, int remainingAttempts) {
_success = success;
_remainingAttempts = remainingAttempts;
}

FastlyHeadersCaptor captor = new FastlyHeadersCaptor();
FetchOptions opts = new FetchOptions.Builder()
.cacheControlHeaders(true)
.fastlyDebugHeader(_cdnResponseHeadersLogging)
.responseHeadersCallback(_cdnResponseHeadersLogging ? captor::handle : null)
.build();
public boolean success() { return _success; }
public int remainingAttempts() { return _remainingAttempts; }

private final boolean _success;
private final int _remainingAttempts;
}

int remainingAttempts = _onDemandFetchMaxRetries;
private SyncResult attemptSync(long targetChangeNumber,
FetchOptions opts,
Function<Void, Long> nextWaitMs,
int maxRetries) {
int remainingAttempts = maxRetries;
while(true) {
remainingAttempts--;
_splitFetcher.forceRefresh(opts);
if (targetChangeNumber <= _splitCache.getChangeNumber()) {
_log.debug(String.format("Refresh completed in %s attempts.", _onDemandFetchMaxRetries - remainingAttempts));
break;
return new SyncResult(true, remainingAttempts);
} else if (remainingAttempts <= 0) {
_log.info(String.format("No changes fetched after %s attempts.", _onDemandFetchMaxRetries));
break;
_log.info(String.format("No changes fetched after %s attempts.", maxRetries));
return new SyncResult(false, remainingAttempts);
}
try {
Thread.sleep(_onDemandFetchRetryDelayMs);
long howLong = nextWaitMs.apply(null);
Thread.sleep(howLong);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
_log.debug("Error trying to sleep current Thread.");
}
}
}

private void logCdnHeaders(int maxRetries, int remainingAttempts, List<Map<String, String>> headers) {
if (maxRetries - remainingAttempts > _failedAttemptsBeforeLogging) {
_log.info(String.format("CDN Debug headers: %s", gson.toJson(headers)));
}
}

@Override
public void refreshSplits(long targetChangeNumber) {

if (targetChangeNumber <= _splitCache.getChangeNumber()) {
return;
}

FastlyHeadersCaptor captor = new FastlyHeadersCaptor();
FetchOptions opts = new FetchOptions.Builder()
.cacheControlHeaders(true)
.fastlyDebugHeader(_cdnResponseHeadersLogging)
.responseHeadersCallback(_cdnResponseHeadersLogging ? captor::handle : null)
.build();

SyncResult regularResult = attemptSync(targetChangeNumber, opts,
(discard) -> (long) _onDemandFetchRetryDelayMs, _onDemandFetchMaxRetries);

if (regularResult.success()) {
_log.debug(String.format("Refresh completed in %s attempts.", _onDemandFetchMaxRetries - regularResult.remainingAttempts()));
if (_cdnResponseHeadersLogging) {
logCdnHeaders(_onDemandFetchMaxRetries , regularResult.remainingAttempts(), captor.get());
}
return;
}

FetchOptions withCdnBypass = new FetchOptions.Builder(opts).cdnBypass(true).build();
Backoff backoff = new Backoff(ON_DEMAND_FETCH_BACKOFF_BASE_MS, ON_DEMAND_FETCH_BACKOFF_MAX_WAIT_MS);
SyncResult withCDNBypassed = attemptSync(targetChangeNumber, withCdnBypass,
(discard) -> backoff.interval(), ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES);

if (_cdnResponseHeadersLogging &&
(_onDemandFetchMaxRetries - remainingAttempts) > _failedAttemptsBeforeLogging) {
_log.info(String.format("CDN Debug headers: %s", gson.toJson(captor.get())));
if (_cdnResponseHeadersLogging) {
logCdnHeaders(_onDemandFetchMaxRetries + ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES,
withCDNBypassed.remainingAttempts(), captor.get());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,20 @@ public SplitFetcherImp(SplitChangeFetcher splitChangeFetcher, SplitParser parser
@Override
public void forceRefresh(FetchOptions options) {
_log.debug("Force Refresh splits starting ...");
final long initialCN = _splitCache.getChangeNumber();
try {
while (true) {
long start = _splitCache.getChangeNumber();
runWithoutExceptionHandling(options);
long end = _splitCache.getChangeNumber();

// If the previous execution was the first one, clear the `cdnBypass` flag
// for the next fetches. (This will clear a local copy of the fetch options,
// not the original object that was passed to this method).
if (initialCN == start) {
options = new FetchOptions.Builder(options).cdnBypass(false).build();
}

if (start >= end) {
break;
}
Expand Down
2 changes: 1 addition & 1 deletion client/src/test/java/io/split/TestHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public static CloseableHttpClient mockHttpClient(String jsonName, int httpStatus
return httpClientMock;
}

private static CloseableHttpResponse classicResponseToCloseableMock(ClassicHttpResponse mocked) throws InvocationTargetException, IllegalAccessException, NoSuchMethodException {
public static CloseableHttpResponse classicResponseToCloseableMock(ClassicHttpResponse mocked) throws InvocationTargetException, IllegalAccessException, NoSuchMethodException {
Method adaptMethod = CloseableHttpResponse.class.getDeclaredMethod("adapt", ClassicHttpResponse.class);
adaptMethod.setAccessible(true);
return (CloseableHttpResponse) adaptMethod.invoke(null, mocked);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,30 @@
import io.split.client.dtos.SplitChange;
import io.split.engine.common.FetchOptions;
import io.split.engine.metrics.Metrics;
import org.apache.hc.client5.http.classic.methods.HttpUriRequestBase;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
import org.apache.hc.client5.http.impl.classic.HttpClients;
import org.apache.hc.core5.http.HttpStatus;
import org.apache.hc.core5.http.*;
import org.apache.hc.core5.http.io.entity.StringEntity;
import org.apache.hc.core5.http.message.BasicClassicHttpResponse;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;

import java.io.Closeable;
import java.io.IOException;
import java.io.StringBufferInputStream;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Map;

import static org.mockito.Mockito.when;

public class HttpSplitChangeFetcherTest {
@Test
public void testDefaultURL() throws URISyntaxException {
Expand Down Expand Up @@ -76,4 +87,44 @@ public void testFetcherWithSpecialCharacters() throws URISyntaxException, Invoca
Assert.assertEquals("{\"test\": \"blue\",\"grüne Straße\": 13}", configs.get("on"));
Assert.assertEquals("{\"test\": \"blue\",\"size\": 15}", configs.get("off"));
}

@Test
public void testFetcherWithCDNBypassOption() throws IOException, URISyntaxException, IllegalAccessException, NoSuchMethodException, InvocationTargetException {
URI rootTarget = URI.create("https://api.split.io");

HttpEntity entityMock = Mockito.mock(HttpEntity.class);
when(entityMock.getContent()).thenReturn(new StringBufferInputStream("{\"till\": 1}"));
ClassicHttpResponse response = Mockito.mock(ClassicHttpResponse.class);
when(response.getCode()).thenReturn(200);
when(response.getEntity()).thenReturn(entityMock);
when(response.getHeaders()).thenReturn(new Header[0]);

ArgumentCaptor<ClassicHttpRequest> requestCaptor = ArgumentCaptor.forClass(ClassicHttpRequest.class);
CloseableHttpClient httpClientMock = Mockito.mock(CloseableHttpClient.class);
when(httpClientMock.execute(requestCaptor.capture())).thenReturn(TestHelper.classicResponseToCloseableMock(response));

Metrics.NoopMetrics metrics = new Metrics.NoopMetrics();
HttpSplitChangeFetcher fetcher = HttpSplitChangeFetcher.create(httpClientMock, rootTarget, metrics);

fetcher.fetch(-1, new FetchOptions.Builder().cdnBypass(true).build());
fetcher.fetch(-1, new FetchOptions.Builder().build());
List<ClassicHttpRequest> captured = requestCaptor.getAllValues();
Assert.assertEquals(captured.size(), 2);
Assert.assertTrue(captured.get(0).getUri().toString().contains("till="));
Assert.assertFalse(captured.get(1).getUri().toString().contains("till="));
}

@Test
public void testRandomNumberGeneration() throws URISyntaxException {
URI rootTarget = URI.create("https://api.split.io");
CloseableHttpClient httpClientMock = Mockito.mock(CloseableHttpClient.class);
Metrics.NoopMetrics metrics = new Metrics.NoopMetrics();
HttpSplitChangeFetcher fetcher = HttpSplitChangeFetcher.create(httpClientMock, rootTarget, metrics);

long min = (long)Math.pow(2, 63) * (-1);
for (long x = 0; x < 100000000; x++) {
long r = fetcher.makeRandomTill();
Assert.assertTrue(r < 0 && r > min);
}
}
}
Loading