Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions client/CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
CHANGES

4.1.1 (Sep 30, 2020)
- Fixed fetch retries after received an SPLIT_CHANGE.

4.1.0 (Sep 25, 2020)
- Add local impressions deduping (enabled by default)

Expand Down
2 changes: 1 addition & 1 deletion client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>io.split.client</groupId>
<artifactId>java-client-parent</artifactId>
<version>4.1.0</version>
<version>4.1.1</version>
</parent>
<artifactId>java-client</artifactId>
<packaging>jar</packaging>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public SynchronizerImp(RefreshableSplitFetcherProvider refreshableSplitFetcherPr
@Override
public void syncAll() {
_syncAllScheduledExecutorService.schedule(() -> {
_splitFetcher.forceRefresh();
_splitFetcher.run();
_segmentFetcher.forceRefreshAll();
}, 0, TimeUnit.SECONDS);
}
Expand Down Expand Up @@ -68,6 +68,7 @@ public void refreshSplits(long targetChangeNumber) {
public void localKillSplit(String splitName, String defaultTreatment, long newChangeNumber) {
if (newChangeNumber > _splitFetcher.changeNumber()) {
_splitFetcher.killSplit(splitName, defaultTreatment, newChangeNumber);
refreshSplits(newChangeNumber);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,23 @@ public RefreshableSplitFetcher(SplitChangeFetcher splitChangeFetcher, SplitParse

@Override
public void forceRefresh() {
run();
_log.debug("Force Refresh splits starting ...");
try {
while (true) {
long start = _changeNumber.get();
runWithoutExceptionHandling();
long end = _changeNumber.get();

if (start >= end) {
break;
}
}
} catch (InterruptedException e) {
_log.warn("Interrupting split fetcher task");
Thread.currentThread().interrupt();
} catch (Throwable t) {
_log.error("RefreshableSplitFetcher failed: " + t.getMessage());
}
}

@Override
Expand Down Expand Up @@ -171,8 +187,7 @@ public void runWithoutExceptionHandling() throws InterruptedException {
return;
}

if (change.since != _changeNumber.get()
|| change.till < _changeNumber.get()) {
if (change.since != _changeNumber.get() || change.till < _changeNumber.get()) {
// some other thread may have updated the shared state. exit
return;
}
Expand Down Expand Up @@ -257,20 +272,5 @@ public void runWithoutExceptionHandling() throws InterruptedException {

_changeNumber.set(change.till);
}

}

private List<String> collectSegmentsInUse(Split split) {
List<String> result = Lists.newArrayList();
for (Condition condition : split.conditions) {
for (Matcher matcher : condition.matcherGroup.matchers) {
if (matcher.matcherType == MatcherType.IN_SEGMENT) {
if (matcher.userDefinedSegmentMatcherData != null && matcher.userDefinedSegmentMatcherData.segmentName != null) {
result.add(matcher.userDefinedSegmentMatcherData.segmentName);
}
}
}
}
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,20 @@ public boolean contains(String key) {

@Override
public void forceRefresh() {
run();
try {
_log.debug("Force Refresh segment starting ...");
while (true) {
long start = _changeNumber.get();
runWithoutExceptionHandling();
long end = _changeNumber.get();

if (start >= end) {
break;
}
}
} catch (Throwable t) {
_log.error("forceRefresh segment failed: " + t.getMessage());
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ private void onMessage(RawEvent event) {
} catch (EventParsingException ex) {
_log.debug(String.format("Error parsing the event: %s. Payload: %s", ex.getMessage(), ex.getPayload()));
} catch (Exception e) {
_log.warn(String.format("Error onMessage: %s", e.getMessage()));
_log.debug(String.format("Error onMessage: %s", e.getMessage()));
}
}
}
4 changes: 4 additions & 0 deletions client/src/test/java/io/split/SplitMockServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,16 @@ public MockResponse dispatch(RecordedRequest request) {
return new MockResponse().setBody(inputStreamToString("splits2.json"));
case "/api/splitChanges?since=1585948850111":
return new MockResponse().setBody(inputStreamToString("splits_killed.json"));
case "/api/splitChanges?since=1585948850112":
return new MockResponse().setBody("{\"splits\": [], \"since\":1585948850112, \"till\":1585948850112}");
case "/api/segmentChanges/segment-test?since=-1":
return new MockResponse().setBody("{\"name\": \"segment3\",\"added\": [],\"removed\": [],\"since\": -1,\"till\": -1}");
case "/api/segmentChanges/segment3?since=-1":
return new MockResponse().setBody(inputStreamToString("segment3.json"));
case "/api/segmentChanges/segment3?since=1585948850110":
return new MockResponse().setBody("{\"name\": \"segment3\",\"added\": [],\"removed\": [],\"since\": 1585948850110,\"till\": 1585948850110}");
case "/api/segmentChanges/segment3?since=1585948850111":
return new MockResponse().setBody("{\"name\": \"segment3\",\"added\": [],\"removed\": [],\"since\": 1585948850111,\"till\": 1585948850111}");
case "/api/metrics/time":
case "api/metrics/counter":
return new MockResponse().setResponseCode(200);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ public void splitClientMultiFactory() throws IOException, TimeoutException, Inte

Awaitility.await()
.atMost(50L, TimeUnit.SECONDS)
.until(() -> "after_notification_received".equals(client1.getTreatment("admin", "push_test")));
.until(() -> "split_killed".equals(client1.getTreatment("admin", "push_test")));

Awaitility.await()
.atMost(50L, TimeUnit.SECONDS)
Expand All @@ -420,30 +420,7 @@ public void splitClientMultiFactory() throws IOException, TimeoutException, Inte

Awaitility.await()
.atMost(50L, TimeUnit.SECONDS)
.until(() -> "after_notification_received".equals(client1.getTreatment("admin", "push_test")));

Awaitility.await()
.atMost(50L, TimeUnit.SECONDS)
.until(() -> "on_whitelist".equals(client2.getTreatment("admin", "push_test")));

Awaitility.await()
.atMost(50L, TimeUnit.SECONDS)
.until(() -> "after_notification_received".equals(client3.getTreatment("admin", "push_test")));

Awaitility.await()
.atMost(50L, TimeUnit.SECONDS)
.until(() -> "on_whitelist".equals(client4.getTreatment("admin", "push_test")));

OutboundSseEvent sseEventSplitUpdate3 = new OutboundEvent
.Builder()
.name("message")
.data("{\"id\":\"22\",\"clientId\":\"22\",\"timestamp\":1592590436082,\"encoding\":\"json\",\"channel\":\"xxxx_xxxx_splits\",\"data\":\"{\\\"type\\\":\\\"SPLIT_UPDATE\\\",\\\"changeNumber\\\":1585948850112}\"}")
.build();
eventQueue3.push(sseEventSplitUpdate3);

Awaitility.await()
.atMost(50L, TimeUnit.SECONDS)
.until(() -> "after_notification_received".equals(client1.getTreatment("admin", "push_test")));
.until(() -> "split_killed".equals(client1.getTreatment("admin", "push_test")));

Awaitility.await()
.atMost(50L, TimeUnit.SECONDS)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package io.split.engine.common;

import io.split.client.HttpSegmentChangeFetcher;
import io.split.engine.SDKReadinessGates;
import io.split.engine.experiments.RefreshableSplitFetcher;
import io.split.engine.experiments.RefreshableSplitFetcherProvider;
import io.split.engine.segments.RefreshableSegmentFetcher;
import io.split.engine.segments.SegmentChangeFetcher;
import org.apache.http.impl.client.CloseableHttpClient;
import org.junit.Test;
import org.mockito.Mockito;

Expand All @@ -21,7 +25,7 @@ public void syncAll() throws InterruptedException {
synchronizer.syncAll();

Thread.sleep(100);
Mockito.verify(splitFetcher, Mockito.times(1)).forceRefresh();
Mockito.verify(splitFetcher, Mockito.times(1)).run();
Mockito.verify(segmentFetcher, Mockito.times(1)).forceRefreshAll();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public void killShouldTriggerFetch() {
}

@Test
public void messagesNotProcesedWhenWorkerStopped() throws InterruptedException {
public void messagesNotProcessedWhenWorkerStopped() throws InterruptedException {
Synchronizer syncMock = Mockito.mock(Synchronizer.class);
SplitsWorker splitsWorker = new SplitsWorkerImp(syncMock);
splitsWorker.start();
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>io.split.client</groupId>
<artifactId>java-client-parent</artifactId>
<version>4.1.0</version>
<version>4.1.1</version>
<dependencyManagement>
<dependencies>
<dependency>
Expand Down
2 changes: 1 addition & 1 deletion testing/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>io.split.client</groupId>
<artifactId>java-client-parent</artifactId>
<version>4.1.0</version>
<version>4.1.1</version>
</parent>

<artifactId>java-client-testing</artifactId>
Expand Down