Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
10204c2
[SDKS-7001] Update FeatureFlagNotification and GenericNotificationData
nmayorsplit May 17, 2023
7695ca3
[SDKS-7001] Pr suggestions
nmayorsplit May 17, 2023
38223ef
[SDKS-7001] Pr suggestions
nmayorsplit May 18, 2023
cd48c5c
[SDKS-7001] Update CompressType
nmayorsplit May 18, 2023
5cffa44
[SDKS-7001] Update data to featureFlagDefinition
nmayorsplit May 18, 2023
dc29373
[SDKS-7029] Add ZLib decompress
nmayorsplit May 18, 2023
5939189
Merge pull request #410 from splitio/SDKS-7001
nmayorsplit May 18, 2023
94fdccc
Merge branch 'incremental-ff' into SDKS-7029
nmayorsplit May 18, 2023
20782d0
Merge pull request #412 from splitio/SDKS-7029
nmayorsplit May 18, 2023
9e05847
[SDKS-7028] Add gzip decompression and logic to use it
nmayorsplit May 23, 2023
d99aaa7
[SDKS-7028] Some checks
nmayorsplit May 23, 2023
7f0fdad
[SDKS-7028] Update FeatureFlagChangeNotification to log the exceptions
nmayorsplit May 23, 2023
07baebc
[SDKS-7028] Remove imports
nmayorsplit May 23, 2023
9fa3926
[SDKS-7028] Pr suggestions
nmayorsplit May 23, 2023
9698840
[SDKS-7028] Update FFChangeNotification and DecompressionUtil
nmayorsplit May 23, 2023
9a773db
[SDKS-7028] Update log message
nmayorsplit May 23, 2023
e85f4e5
Update log message
nmayorsplit May 23, 2023
06a533f
Merge pull request #413 from splitio/SDKS-7028
nmayorsplit May 23, 2023
b890cc0
[SDKS-7030] Update FeatureFlagWorker to add in the queue the ffnotifi…
nmayorsplit May 24, 2023
96dedb8
[SDKS-7030] Pr suggestions
nmayorsplit May 29, 2023
edaf9cd
[SDKS-7030] Pr suggestions
nmayorsplit May 29, 2023
2d39ce1
[SDKS-7030] Update SplitFactoryImpl to use the same SplitParser
nmayorsplit May 30, 2023
8025e92
Merge pull request #414 from splitio/SDKS-7030
nmayorsplit May 30, 2023
f87a9d4
Update version to release tapps
nmayorsplit May 30, 2023
4910763
[SDKS-7113] Add new Archive case for instant ff
nmayorsplit Jun 21, 2023
2e30089
Merge branch 'development' into incremental-ff
nmayorsplit Jun 21, 2023
96b276a
Merge branch 'incremental-ff' into SDKS-7113
nmayorsplit Jun 21, 2023
d960aa2
[SDKS-7113] Prs suggestions
nmayorsplit Jun 21, 2023
e2f1746
[SDKS-7113] Changes in SplitCacheProducer when update it
nmayorsplit Jun 21, 2023
5a2f417
[SDKS-7113] Add changes in
nmayorsplit Jun 22, 2023
6d4feb5
Remove imports
nmayorsplit Jun 22, 2023
e88a177
[SDKS-7113] Remove some imports
nmayorsplit Jun 22, 2023
f0d1409
Merge branch 'SDKS-7113' into SDKS-7087
nmayorsplit Jun 22, 2023
cb577db
[SDKS-7087] Use SplitCache update in teh Fetcher
nmayorsplit Jun 22, 2023
6685f9c
[SDKS-7113] Check if ParsedSplit is null
nmayorsplit Jun 22, 2023
8ed8d09
Merge branch 'SDKS-7113' into SDKS-7087
nmayorsplit Jun 22, 2023
4ea57ea
[SDKS-7087] Update SplitFetcherImp
nmayorsplit Jun 22, 2023
0679afc
[SDKS-7087] Add FeatureFlagProcessor
nmayorsplit Jun 23, 2023
031418a
[SDKS-7087] Update imports
nmayorsplit Jun 23, 2023
be3a84d
Merge pull request #417 from splitio/SDKS-7087
nmayorsplit Jun 23, 2023
2bcfa5c
Merge pull request #416 from splitio/SDKS-7113
nmayorsplit Jun 23, 2023
d78fc28
[SDKS-7182] Add fetch segments and telemetry for iff
nmayorsplit Jun 26, 2023
748b392
[SDKS-7182] Some pr suggestions
nmayorsplit Jun 27, 2023
1e722d1
Merge pull request #418 from splitio/iff-segments
nmayorsplit Jun 27, 2023
9efe67b
[SDKS-7193] Update version to release iff in tapps
nmayorsplit Jun 27, 2023
ab7ef88
Merge branch 'development' into incremental-ff
nmayorsplit Jul 11, 2023
635cf05
Update version
nmayorsplit Jul 11, 2023
de14c42
[SDKS-7121] IFF Release
nmayorsplit Jul 18, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
4.8.0 (Jul 18, 2023)
- Improved streaming architecture implementation to apply feature flag updates from the notification received which is now enhanced, improving efficiency and reliability of the whole update system.
- Updated `com.google.guava` dependence to 32.0.1 for fixing a vulnerability.
- Updated SegmentFetcher for better readability.

4.7.2 (May 16, 2023)
- Updated default treatment to be control for yaml and json localhost.
- Updated terminology on the SDKs codebase to be more aligned with current standard without causing a breaking change. The core change is the term split for feature flag on things like logs and javadoc comments.
Expand Down
2 changes: 1 addition & 1 deletion client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>io.split.client</groupId>
<artifactId>java-client-parent</artifactId>
<version>4.7.2</version>
<version>4.8.0</version>
</parent>
<artifactId>java-client</artifactId>
<packaging>jar</packaging>
Expand Down
19 changes: 7 additions & 12 deletions client/src/main/java/io/split/client/SplitFactoryImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;

Expand All @@ -115,8 +114,6 @@ public class SplitFactoryImpl implements SplitFactory {
private final static long SSE_CONNECT_TIMEOUT = 30000;
private final static long SSE_SOCKET_TIMEOUT = 70000;

private static Random RANDOM = new Random();

private final SDKReadinessGates _gates;
private final ImpressionsManager _impressionsManager;
private final Evaluator _evaluator;
Expand Down Expand Up @@ -152,7 +149,6 @@ public class SplitFactoryImpl implements SplitFactory {
private final URI _eventsRootTarget;
private final UniqueKeysTracker _uniqueKeysTracker;


//Constructor for standalone mode
public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyntaxException {
_userStorageWrapper = null;
Expand Down Expand Up @@ -188,13 +184,14 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn
ImpressionsStorage impressionsStorage = new InMemoryImpressionsStorage(config.impressionsQueueSize());
_splitCache = splitCache;
_segmentCache = segmentCache;
_telemetrySynchronizer = new TelemetryInMemorySubmitter(_httpclient, URI.create(config.telemetryURL()), telemetryStorage, splitCache, segmentCache, telemetryStorage, _startTime);
_telemetrySynchronizer = new TelemetryInMemorySubmitter(_httpclient, URI.create(config.telemetryURL()), telemetryStorage, splitCache, _segmentCache, telemetryStorage, _startTime);

// Segments
_segmentSynchronizationTaskImp = buildSegments(config, segmentCache, splitCache);

SplitParser splitParser = new SplitParser();
// SplitFetcher
_splitFetcher = buildSplitFetcher(splitCache, splitCache);
_splitFetcher = buildSplitFetcher(splitCache, splitParser);

// SplitSynchronizationTask
_splitSynchronizationTask = new SplitSynchronizationTask(_splitFetcher,
Expand Down Expand Up @@ -241,7 +238,7 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn
SplitAPI splitAPI = SplitAPI.build(_httpclient, buildSSEdHttpClient(apiToken, config, _sdkMetadata));

_syncManager = SyncManagerImp.build(splitTasks, _splitFetcher, splitCache, splitAPI,
segmentCache, _gates, _telemetryStorageProducer, _telemetrySynchronizer, config);
segmentCache, _gates, _telemetryStorageProducer, _telemetrySynchronizer, config, splitParser);
_syncManager.start();

// DestroyOnShutDown
Expand All @@ -255,7 +252,6 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn
}
}


//Constructor for consumer mode
protected SplitFactoryImpl(String apiToken, SplitClientConfig config, CustomStorageWrapper customStorageWrapper) throws URISyntaxException {
//Variables that are not used in Consumer mode.
Expand Down Expand Up @@ -378,7 +374,7 @@ protected SplitFactoryImpl(SplitClientConfig config) {

SplitParser splitParser = new SplitParser();

_splitFetcher = new SplitFetcherImp(splitChangeFetcher, splitParser, _splitCache, splitCache, _telemetryStorageProducer);
_splitFetcher = new SplitFetcherImp(splitChangeFetcher, splitParser, splitCache, _telemetryStorageProducer);

// SplitSynchronizationTask
_splitSynchronizationTask = new SplitSynchronizationTask(_splitFetcher, splitCache, config.featuresRefreshRate(), config.getThreadFactory());
Expand Down Expand Up @@ -559,11 +555,10 @@ private SegmentSynchronizationTaskImp buildSegments(SplitClientConfig config, Se
config.getThreadFactory());
}

private SplitFetcher buildSplitFetcher(SplitCacheConsumer splitCacheConsumer, SplitCacheProducer splitCacheProducer) throws URISyntaxException {
private SplitFetcher buildSplitFetcher(SplitCacheProducer splitCacheProducer, SplitParser splitParser) throws URISyntaxException {
SplitChangeFetcher splitChangeFetcher = HttpSplitChangeFetcher.create(_httpclient, _rootTarget, _telemetryStorageProducer);
SplitParser splitParser = new SplitParser();

return new SplitFetcherImp(splitChangeFetcher, splitParser, splitCacheConsumer, splitCacheProducer, _telemetryStorageProducer);
return new SplitFetcherImp(splitChangeFetcher, splitParser, splitCacheProducer, _telemetryStorageProducer);
}

private ImpressionsManagerImpl buildImpressionsManager(SplitClientConfig config, ImpressionsStorageConsumer impressionsStorageConsumer, ImpressionsStorageProducer impressionsStorageProducer) throws URISyntaxException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package io.split.client.utils;

import io.split.client.dtos.Split;
import io.split.client.dtos.Status;
import io.split.engine.experiments.ParsedSplit;
import io.split.engine.experiments.SplitParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class FeatureFlagProcessor {
private static final Logger _log = LoggerFactory.getLogger(FeatureFlagProcessor.class);

public static FeatureFlagsToUpdate processFeatureFlagChanges(SplitParser splitParser, List<Split> splits) {
List<ParsedSplit> toAdd = new ArrayList<>();
List<String> toRemove = new ArrayList<>();
Set<String> segments = new HashSet<>();
for (Split split : splits) {
if (split.status != Status.ACTIVE) {
// archive.
toRemove.add(split.name);
continue;
}
ParsedSplit parsedSplit = splitParser.parse(split);
if (parsedSplit == null) {
_log.debug(String.format("We could not parse the feature flag definition for: %s", split.name));
continue;
}
segments.addAll(parsedSplit.getSegmentsNames());
toAdd.add(parsedSplit);
}
return new FeatureFlagsToUpdate(toAdd, toRemove, segments);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package io.split.client.utils;

import io.split.engine.experiments.ParsedSplit;

import java.util.List;
import java.util.Set;

public class FeatureFlagsToUpdate {
List<ParsedSplit> toAdd;
List<String> toRemove;
Set<String> segments;

public FeatureFlagsToUpdate(List<ParsedSplit> toAdd, List<String> toRemove, Set<String> segments) {
this.toAdd = toAdd;
this.toRemove = toRemove;
this.segments = segments;
}

public List<ParsedSplit> getToAdd() {
return toAdd;
}

public List<String> getToRemove() {
return toRemove;
}

public Set<String> getSegments() {
return segments;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.split.client.impressions.ImpressionsManager;
import io.split.client.impressions.UniqueKeysTracker;
import io.split.engine.sse.dtos.SplitKillNotification;
import io.split.telemetry.synchronizer.TelemetrySyncTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -39,7 +40,7 @@ public void refreshSplits(Long targetChangeNumber) {
}

@Override
public void localKillSplit(String featureFlagName, String defaultTreatment, long newChangeNumber) {
public void localKillSplit(SplitKillNotification splitKillNotification) {
//No-Op
}

Expand Down Expand Up @@ -80,4 +81,9 @@ public void stopPeriodicDataRecording() {
_telemetrySyncTask.stopScheduledTask();
_log.info("Successful shutdown of telemetry sync task");
}
}

@Override
public void forceRefreshSegment(String segmentName) {
//No-Op
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io.split.engine.experiments.SplitSynchronizationTask;
import io.split.engine.segments.SegmentFetcher;
import io.split.engine.segments.SegmentSynchronizationTask;
import io.split.engine.sse.dtos.SplitKillNotification;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -66,7 +67,7 @@ public void refreshSplits(Long targetChangeNumber) {
}

@Override
public void localKillSplit(String featureFlagName, String defaultTreatment, long newChangeNumber) {
public void localKillSplit(SplitKillNotification splitKillNotification) {
//No-Op
}

Expand All @@ -85,4 +86,9 @@ public void startPeriodicDataRecording() {
public void stopPeriodicDataRecording() {
//No-Op
}

@Override
public void forceRefreshSegment(String segmentName) {
//No-Op
}
}
26 changes: 15 additions & 11 deletions client/src/main/java/io/split/engine/common/PushManagerImp.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.split.engine.common;

import com.google.common.annotations.VisibleForTesting;
import io.split.engine.experiments.SplitParser;
import io.split.engine.sse.AuthApiClient;
import io.split.engine.sse.AuthApiClientImp;
import io.split.engine.sse.EventSourceClient;
Expand All @@ -11,10 +12,11 @@
import io.split.engine.sse.dtos.AuthenticationResponse;
import io.split.engine.sse.dtos.SegmentQueueDto;
import io.split.engine.sse.workers.SegmentsWorkerImp;
import io.split.engine.sse.workers.SplitsWorker;
import io.split.engine.sse.workers.SplitsWorkerImp;
import io.split.engine.sse.workers.FeatureFlagsWorker;
import io.split.engine.sse.workers.FeatureFlagWorkerImp;
import io.split.engine.sse.workers.Worker;

import io.split.storages.SplitCacheProducer;
import io.split.telemetry.domain.StreamingEvent;
import io.split.telemetry.domain.enums.StreamEventsEnum;
import io.split.telemetry.storage.TelemetryRuntimeProducer;
Expand All @@ -36,7 +38,7 @@ public class PushManagerImp implements PushManager {

private final AuthApiClient _authApiClient;
private final EventSourceClient _eventSourceClient;
private final SplitsWorker _splitsWorker;
private final FeatureFlagsWorker _featureFlagsWorker;
private final Worker<SegmentQueueDto> _segmentWorker;
private final PushStatusTracker _pushStatusTracker;

Expand All @@ -48,15 +50,15 @@ public class PushManagerImp implements PushManager {
@VisibleForTesting
/* package private */ PushManagerImp(AuthApiClient authApiClient,
EventSourceClient eventSourceClient,
SplitsWorker splitsWorker,
FeatureFlagsWorker featureFlagsWorker,
Worker<SegmentQueueDto> segmentWorker,
PushStatusTracker pushStatusTracker,
TelemetryRuntimeProducer telemetryRuntimeProducer,
ThreadFactory threadFactory) {

_authApiClient = checkNotNull(authApiClient);
_eventSourceClient = checkNotNull(eventSourceClient);
_splitsWorker = splitsWorker;
_featureFlagsWorker = featureFlagsWorker;
_segmentWorker = segmentWorker;
_pushStatusTracker = pushStatusTracker;
_expirationTime = new AtomicLong();
Expand All @@ -70,13 +72,15 @@ public static PushManagerImp build(Synchronizer synchronizer,
SplitAPI splitAPI,
LinkedBlockingQueue<PushManager.Status> statusMessages,
TelemetryRuntimeProducer telemetryRuntimeProducer,
ThreadFactory threadFactory) {
SplitsWorker splitsWorker = new SplitsWorkerImp(synchronizer);
ThreadFactory threadFactory,
SplitParser splitParser,
SplitCacheProducer splitCacheProducer) {
FeatureFlagsWorker featureFlagsWorker = new FeatureFlagWorkerImp(synchronizer, splitParser, splitCacheProducer, telemetryRuntimeProducer);
Worker<SegmentQueueDto> segmentWorker = new SegmentsWorkerImp(synchronizer);
PushStatusTracker pushStatusTracker = new PushStatusTrackerImp(statusMessages, telemetryRuntimeProducer);
return new PushManagerImp(new AuthApiClientImp(authUrl, splitAPI.getHttpClient(), telemetryRuntimeProducer),
EventSourceClientImp.build(streamingUrl, splitsWorker, segmentWorker, pushStatusTracker, splitAPI.getSseHttpClient(), telemetryRuntimeProducer, threadFactory),
splitsWorker,
EventSourceClientImp.build(streamingUrl, featureFlagsWorker, segmentWorker, pushStatusTracker, splitAPI.getSseHttpClient(), telemetryRuntimeProducer, threadFactory),
featureFlagsWorker,
segmentWorker,
pushStatusTracker,
telemetryRuntimeProducer,
Expand Down Expand Up @@ -134,13 +138,13 @@ private boolean startSse(String token, String channels) {

@Override
public synchronized void startWorkers() {
_splitsWorker.start();
_featureFlagsWorker.start();
_segmentWorker.start();
}

@Override
public synchronized void stopWorkers() {
_splitsWorker.stop();
_featureFlagsWorker.stop();
_segmentWorker.stop();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io.split.client.SplitClientConfig;
import io.split.engine.SDKReadinessGates;
import io.split.engine.experiments.SplitFetcher;
import io.split.engine.experiments.SplitParser;
import io.split.engine.experiments.SplitSynchronizationTask;
import io.split.engine.segments.SegmentSynchronizationTask;
import io.split.storages.SegmentCacheProducer;
Expand Down Expand Up @@ -85,7 +86,8 @@ public static SyncManagerImp build(SplitTasks splitTasks,
SDKReadinessGates gates,
TelemetryRuntimeProducer telemetryRuntimeProducer,
TelemetrySynchronizer telemetrySynchronizer,
SplitClientConfig config) {
SplitClientConfig config,
SplitParser splitParser) {
LinkedBlockingQueue<PushManager.Status> pushMessages = new LinkedBlockingQueue<>();
Synchronizer synchronizer = new SynchronizerImp(splitTasks,
splitFetcher,
Expand All @@ -102,7 +104,9 @@ public static SyncManagerImp build(SplitTasks splitTasks,
splitAPI,
pushMessages,
telemetryRuntimeProducer,
config.getThreadFactory());
config.getThreadFactory(),
splitParser,
splitCacheProducer);

return new SyncManagerImp(splitTasks,
config.streamingEnabled(),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package io.split.engine.common;

import io.split.engine.sse.dtos.SplitKillNotification;

public interface Synchronizer {
boolean syncAll();
void startPeriodicFetching();
void stopPeriodicFetching();
void refreshSplits(Long targetChangeNumber);
void localKillSplit(String featureFlagName, String defaultTreatment, long newChangeNumber);
void localKillSplit(SplitKillNotification splitKillNotification);
void refreshSegment(String segmentName, Long targetChangeNumber);
void startPeriodicDataRecording();
void stopPeriodicDataRecording();
void forceRefreshSegment(String segmentName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.split.engine.experiments.SplitSynchronizationTask;
import io.split.engine.segments.SegmentFetcher;
import io.split.engine.segments.SegmentSynchronizationTask;
import io.split.engine.sse.dtos.SplitKillNotification;
import io.split.storages.SegmentCacheProducer;
import io.split.storages.SplitCacheProducer;
import io.split.telemetry.synchronizer.TelemetrySyncTask;
Expand Down Expand Up @@ -183,10 +184,10 @@ public void refreshSplits(Long targetChangeNumber) {
}

@Override
public void localKillSplit(String featureFlagName, String defaultTreatment, long newChangeNumber) {
if (newChangeNumber > _splitCacheProducer.getChangeNumber()) {
_splitCacheProducer.kill(featureFlagName, defaultTreatment, newChangeNumber);
refreshSplits(newChangeNumber);
public void localKillSplit(SplitKillNotification splitKillNotification) {
if (splitKillNotification.getChangeNumber() > _splitCacheProducer.getChangeNumber()) {
_splitCacheProducer.kill(splitKillNotification.getSplitName(), splitKillNotification.getDefaultTreatment(), splitKillNotification.getChangeNumber());
refreshSplits(splitKillNotification.getChangeNumber());
}
}

Expand Down Expand Up @@ -303,8 +304,9 @@ public void stopPeriodicDataRecording() {
_log.info("Successful shutdown of telemetry sync task");
}

private void forceRefreshSegment(String segmentName){
@Override
public void forceRefreshSegment(String segmentName){
SegmentFetcher segmentFetcher = _segmentSynchronizationTaskImp.getFetcher(segmentName);
segmentFetcher.fetch(new FetchOptions.Builder().build());
}
}
}
Loading