Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions client/src/main/java/io/split/client/HttpSplitChangeFetcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.google.common.annotations.VisibleForTesting;
import io.split.client.dtos.SplitChange;
import io.split.client.exceptions.UriTooLongException;
import io.split.client.utils.Json;
import io.split.client.utils.Utils;
import io.split.engine.common.FetchOptions;
Expand All @@ -12,6 +13,7 @@
import org.apache.hc.client5.http.classic.methods.HttpGet;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpStatus;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.io.entity.EntityUtils;
Expand All @@ -35,6 +37,7 @@ public final class HttpSplitChangeFetcher implements SplitChangeFetcher {

private static final String SINCE = "since";
private static final String TILL = "till";
private static final String SETS = "sets";

private static final String HEADER_CACHE_CONTROL_NAME = "Cache-Control";
private static final String HEADER_CACHE_CONTROL_VALUE = "no-cache";
Expand Down Expand Up @@ -75,6 +78,9 @@ public SplitChange fetch(long since, FetchOptions options) {
if (options.hasCustomCN()) {
uriBuilder.addParameter(TILL, "" + options.targetCN());
}
if (!options.flagSetsFilter().isEmpty()) {
uriBuilder.addParameter(SETS, "" + options.flagSetsFilter());
}
URI uri = uriBuilder.build();

HttpGet request = new HttpGet(uri);
Expand All @@ -98,6 +104,10 @@ public SplitChange fetch(long since, FetchOptions options) {

if (statusCode < HttpStatus.SC_OK || statusCode >= HttpStatus.SC_MULTIPLE_CHOICES) {
_telemetryRuntimeProducer.recordSyncError(ResourceEnum.SPLIT_SYNC, statusCode);
if (statusCode == HttpStatus.SC_REQUEST_URI_TOO_LONG) {
_log.error("The amount of flag sets provided are big causing uri length error.");
throw new UriTooLongException(String.format("Status code: %s. Message: %s", statusCode, response.getReasonPhrase()));
}
_log.warn(String.format("Response status was: %s. Reason: %s", statusCode , response.getReasonPhrase()));
throw new IllegalStateException(String.format("Could not retrieve splitChanges since %s; http return code %s", since, statusCode));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.split.client.exceptions;

public class UriTooLongException extends Exception {
public UriTooLongException (String message) {
super(message);
}
}
26 changes: 21 additions & 5 deletions client/src/main/java/io/split/engine/common/FetchOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public Builder(FetchOptions opts) {
_cacheControlHeaders = opts._cacheControlHeaders;
_fastlyDebugHeader = opts._fastlyDebugHeader;
_responseHeadersCallback = opts._responseHeadersCallback;
_flagSetsFilter = opts._flagSetsFilter;
}

public Builder cacheControlHeaders(boolean on) {
Expand All @@ -39,14 +40,20 @@ public Builder targetChangeNumber(long targetCN) {
return this;
}

public Builder flagSetsFilter(String flagSetsFilter) {
_flagSetsFilter = flagSetsFilter;
return this;
}

public FetchOptions build() {
return new FetchOptions(_cacheControlHeaders, _targetCN, _responseHeadersCallback, _fastlyDebugHeader);
return new FetchOptions(_cacheControlHeaders, _targetCN, _responseHeadersCallback, _fastlyDebugHeader, _flagSetsFilter);
}

private long _targetCN = DEFAULT_TARGET_CHANGENUMBER;
private boolean _cacheControlHeaders = false;
private boolean _fastlyDebugHeader = false;
private Function<Map<String, String>, Void> _responseHeadersCallback = null;
private String _flagSetsFilter = "";
}

public boolean cacheControlHeadersEnabled() {
Expand All @@ -61,6 +68,10 @@ public boolean fastlyDebugHeaderEnabled() {

public boolean hasCustomCN() { return _targetCN != DEFAULT_TARGET_CHANGENUMBER; }

public String flagSetsFilter() {
return _flagSetsFilter;
}

public void handleResponseHeaders(Map<String, String> headers) {
if (Objects.isNull(_responseHeadersCallback) || Objects.isNull(headers)) {
return;
Expand All @@ -71,11 +82,13 @@ public void handleResponseHeaders(Map<String, String> headers) {
private FetchOptions(boolean cacheControlHeaders,
long targetCN,
Function<Map<String, String>, Void> responseHeadersCallback,
boolean fastlyDebugHeader) {
boolean fastlyDebugHeader,
String flagSetsFilter) {
_cacheControlHeaders = cacheControlHeaders;
_targetCN = targetCN;
_responseHeadersCallback = responseHeadersCallback;
_fastlyDebugHeader = fastlyDebugHeader;
_flagSetsFilter = flagSetsFilter;
}

@Override
Expand All @@ -89,16 +102,19 @@ public boolean equals(Object obj) {
return Objects.equals(_cacheControlHeaders, other._cacheControlHeaders)
&& Objects.equals(_fastlyDebugHeader, other._fastlyDebugHeader)
&& Objects.equals(_responseHeadersCallback, other._responseHeadersCallback)
&& Objects.equals(_targetCN, other._targetCN);
&& Objects.equals(_targetCN, other._targetCN)
&& Objects.equals(_flagSetsFilter, other._flagSetsFilter);
}

@Override
public int hashCode() {
return com.google.common.base.Objects.hashCode(_cacheControlHeaders, _fastlyDebugHeader, _responseHeadersCallback, _targetCN);
return com.google.common.base.Objects.hashCode(_cacheControlHeaders, _fastlyDebugHeader, _responseHeadersCallback,
_targetCN, _flagSetsFilter);
}

private final boolean _cacheControlHeaders;
private final boolean _fastlyDebugHeader;
private final long _targetCN;
private final Function<Map<String, String>, Void> _responseHeadersCallback;
}
private final String _flagSetsFilter;
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ private SyncResult attemptSplitsSync(long targetChangeNumber,
while(true) {
remainingAttempts--;
FetchResult fetchResult = _splitFetcher.forceRefresh(opts);
if (fetchResult != null && !fetchResult.retry() && !fetchResult.isSuccess()) {
return new SyncResult(false, remainingAttempts, fetchResult);
}
if (targetChangeNumber <= _splitCacheProducer.getChangeNumber()) {
return new SyncResult(true, remainingAttempts, fetchResult);
} else if (remainingAttempts <= 0) {
Expand Down Expand Up @@ -206,9 +209,9 @@ public SyncResult attemptSegmentSync(String segmentName,
remainingAttempts--;
fetcher.fetch(opts);
if (targetChangeNumber <= segmentCacheProducer.getChangeNumber(segmentName)) {
return new SyncResult(true, remainingAttempts, new FetchResult(false, new HashSet<>()));
return new SyncResult(true, remainingAttempts, new FetchResult(false, true, new HashSet<>()));
} else if (remainingAttempts <= 0) {
return new SyncResult(false, remainingAttempts, new FetchResult(false, new HashSet<>()));
return new SyncResult(false, remainingAttempts, new FetchResult(false, true, new HashSet<>()));
}
try {
long howLong = nextWaitMs.apply(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,21 @@

public class FetchResult {
private boolean _success;
private boolean _retry;
private Set<String> _segments;

public FetchResult(boolean success, Set<String> segments) {
public FetchResult(boolean success, boolean retry, Set<String> segments) {
_success = success;
_retry = retry;
_segments = segments;
}

public boolean isSuccess() {
return _success;
}
public boolean retry() {
return _retry;
}

public Set<String> getSegments() {
return _segments;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.split.engine.experiments;

import io.split.client.dtos.SplitChange;
import io.split.client.exceptions.UriTooLongException;
import io.split.client.utils.FeatureFlagsToUpdate;
import io.split.storages.SplitCacheProducer;
import io.split.telemetry.domain.enums.LastSynchronizationRecordsEnum;
Expand Down Expand Up @@ -68,19 +69,21 @@ public FetchResult forceRefresh(FetchOptions options) {
}

if (start >= end) {
return new FetchResult(true, segments);
return new FetchResult(true, false, segments);
}
}
} catch (UriTooLongException u) {
return new FetchResult(false, false, new HashSet<>());
} catch (InterruptedException e) {
_log.warn("Interrupting split fetcher task");
Thread.currentThread().interrupt();
return new FetchResult(false, new HashSet<>());
return new FetchResult(false, true, new HashSet<>());
} catch (Exception e) {
_log.error("RefreshableSplitFetcher failed: " + e.getMessage());
if (_log.isDebugEnabled()) {
_log.debug("Reason:", e);
}
return new FetchResult(false, new HashSet<>());
return new FetchResult(false, true, new HashSet<>());
}
}

Expand All @@ -89,7 +92,7 @@ public void run() {
this.forceRefresh(new FetchOptions.Builder().cacheControlHeaders(false).build());
}

private Set<String> runWithoutExceptionHandling(FetchOptions options) throws InterruptedException {
private Set<String> runWithoutExceptionHandling(FetchOptions options) throws InterruptedException, UriTooLongException {
SplitChange change = _splitChangeFetcher.fetch(_splitCacheProducer.getChangeNumber(), options);
Set<String> segments = new HashSet<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.StringBufferInputStream;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -94,7 +95,7 @@ public void testFetcherWithCDNBypassOption() throws IOException, URISyntaxExcept
URI rootTarget = URI.create("https://api.split.io");

HttpEntity entityMock = Mockito.mock(HttpEntity.class);
when(entityMock.getContent()).thenReturn(new StringBufferInputStream("{\"till\": 1}"));
when(entityMock.getContent()).thenReturn(new ByteArrayInputStream("{\"till\": 1}".getBytes(StandardCharsets.UTF_8)));
ClassicHttpResponse response = Mockito.mock(ClassicHttpResponse.class);
when(response.getCode()).thenReturn(200);
when(response.getEntity()).thenReturn(entityMock);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,15 @@ public Void apply(Map<String, String> unused) {
.fastlyDebugHeader(true)
.responseHeadersCallback(func)
.targetChangeNumber(123)
.flagSetsFilter("set1,set2")
.build();

assertEquals(options.cacheControlHeadersEnabled(), true);
assertEquals(options.fastlyDebugHeaderEnabled(), true);
assertEquals(options.targetCN(), 123);
options.handleResponseHeaders(new HashMap<>());
assertEquals(called[0], true);
assertEquals("set1,set2", options.flagSetsFilter());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.when;

public class SynchronizerTest {
Expand Down Expand Up @@ -67,7 +68,7 @@ public void beforeMethod() {

@Test
public void syncAll() throws InterruptedException {
Mockito.when(_splitFetcher.forceRefresh(Mockito.anyObject())).thenReturn(new FetchResult(true, new HashSet<>()));
Mockito.when(_splitFetcher.forceRefresh(Mockito.anyObject())).thenReturn(new FetchResult(true, false, new HashSet<>()));
Mockito.when(_segmentFetcher.fetchAllSynchronous()).thenReturn(true);
_synchronizer.syncAll();

Expand All @@ -87,7 +88,7 @@ public void testSyncAllSegments() throws InterruptedException, NoSuchFieldExcept
modifiersField.setAccessible(true);
modifiersField.setInt(synchronizerSegmentFetcher, synchronizerSegmentFetcher.getModifiers() & ~Modifier.FINAL);
synchronizerSegmentFetcher.set(_synchronizer, segmentSynchronizationTask);
Mockito.when(_splitFetcher.forceRefresh(Mockito.anyObject())).thenReturn(new FetchResult(true, Stream.of("Segment1", "Segment2").collect(Collectors.toSet())));
Mockito.when(_splitFetcher.forceRefresh(Mockito.anyObject())).thenReturn(new FetchResult(true, false, Stream.of("Segment1", "Segment2").collect(Collectors.toSet())));
Mockito.when(_segmentFetcher.fetchAllSynchronous()).thenReturn(true);
_synchronizer.syncAll();

Expand Down Expand Up @@ -116,7 +117,7 @@ public void stopPeriodicFetching() {
@Test
public void streamingRetryOnSplit() {
when(_splitCacheProducer.getChangeNumber()).thenReturn(0l).thenReturn(0l).thenReturn(1l);
when(_splitFetcher.forceRefresh(Mockito.anyObject())).thenReturn(new FetchResult(true, new HashSet<>()));
when(_splitFetcher.forceRefresh(Mockito.anyObject())).thenReturn(new FetchResult(true, false, new HashSet<>()));
_synchronizer.refreshSplits(1L);

Mockito.verify(_splitCacheProducer, Mockito.times(3)).getChangeNumber();
Expand All @@ -138,7 +139,7 @@ public void streamingRetryOnSplitAndSegment() {
Set<String> segments = new HashSet<>();
segments.add("segment1");
segments.add("segment2");
when(_splitFetcher.forceRefresh(Mockito.anyObject())).thenReturn(new FetchResult(true, segments));
when(_splitFetcher.forceRefresh(Mockito.anyObject())).thenReturn(new FetchResult(true, false, segments));
SegmentFetcher fetcher = Mockito.mock(SegmentFetcher.class);
when(_segmentCacheProducer.getChangeNumber(Mockito.anyString())).thenReturn(0l).thenReturn(0l).thenReturn(1l);
when(_segmentFetcher.getFetcher(Mockito.anyString())).thenReturn(fetcher);
Expand Down Expand Up @@ -168,7 +169,7 @@ public void testCDNBypassIsRequestedAfterNFailures() {
switch (calls.get()) {
case 4: cache.setChangeNumber(123);
}
return new FetchResult(true, new HashSet<>());
return new FetchResult(true, false, new HashSet<>());
}).when(_splitFetcher).forceRefresh(optionsCaptor.capture());

imp.refreshSplits(123L);
Expand Down