Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 5 additions & 7 deletions client/src/main/java/io/split/client/SplitFactoryImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,6 @@ public class SplitFactoryImpl implements SplitFactory {
private final URI _eventsRootTarget;
private final UniqueKeysTracker _uniqueKeysTracker;


//Constructor for standalone mode
public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyntaxException {
_userStorageWrapper = null;
Expand Down Expand Up @@ -188,13 +187,14 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn
ImpressionsStorage impressionsStorage = new InMemoryImpressionsStorage(config.impressionsQueueSize());
_splitCache = splitCache;
_segmentCache = segmentCache;
_telemetrySynchronizer = new TelemetryInMemorySubmitter(_httpclient, URI.create(config.telemetryURL()), telemetryStorage, splitCache, segmentCache, telemetryStorage, _startTime);
_telemetrySynchronizer = new TelemetryInMemorySubmitter(_httpclient, URI.create(config.telemetryURL()), telemetryStorage, splitCache, _segmentCache, telemetryStorage, _startTime);

// Segments
_segmentSynchronizationTaskImp = buildSegments(config, segmentCache, splitCache);

SplitParser splitParser = new SplitParser();
// SplitFetcher
_splitFetcher = buildSplitFetcher(splitCache, splitCache);
_splitFetcher = buildSplitFetcher(splitCache, splitCache, splitParser);

// SplitSynchronizationTask
_splitSynchronizationTask = new SplitSynchronizationTask(_splitFetcher,
Expand Down Expand Up @@ -241,7 +241,7 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn
SplitAPI splitAPI = SplitAPI.build(_httpclient, buildSSEdHttpClient(apiToken, config, _sdkMetadata));

_syncManager = SyncManagerImp.build(splitTasks, _splitFetcher, splitCache, splitAPI,
segmentCache, _gates, _telemetryStorageProducer, _telemetrySynchronizer, config);
segmentCache, _gates, _telemetryStorageProducer, _telemetrySynchronizer, config, splitParser);
_syncManager.start();

// DestroyOnShutDown
Expand All @@ -255,7 +255,6 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn
}
}


//Constructor for consumer mode
protected SplitFactoryImpl(String apiToken, SplitClientConfig config, CustomStorageWrapper customStorageWrapper) throws URISyntaxException {
//Variables that are not used in Consumer mode.
Expand Down Expand Up @@ -559,9 +558,8 @@ private SegmentSynchronizationTaskImp buildSegments(SplitClientConfig config, Se
config.getThreadFactory());
}

private SplitFetcher buildSplitFetcher(SplitCacheConsumer splitCacheConsumer, SplitCacheProducer splitCacheProducer) throws URISyntaxException {
private SplitFetcher buildSplitFetcher(SplitCacheConsumer splitCacheConsumer, SplitCacheProducer splitCacheProducer, SplitParser splitParser) throws URISyntaxException {
SplitChangeFetcher splitChangeFetcher = HttpSplitChangeFetcher.create(_httpclient, _rootTarget, _telemetryStorageProducer);
SplitParser splitParser = new SplitParser();

return new SplitFetcherImp(splitChangeFetcher, splitParser, splitCacheConsumer, splitCacheProducer, _telemetryStorageProducer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.split.client.impressions.ImpressionsManager;
import io.split.client.impressions.UniqueKeysTracker;
import io.split.engine.sse.dtos.SplitKillNotification;
import io.split.telemetry.synchronizer.TelemetrySyncTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -39,7 +40,7 @@ public void refreshSplits(Long targetChangeNumber) {
}

@Override
public void localKillSplit(String featureFlagName, String defaultTreatment, long newChangeNumber) {
public void localKillSplit(SplitKillNotification splitKillNotification) {
//No-Op
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io.split.engine.experiments.SplitSynchronizationTask;
import io.split.engine.segments.SegmentFetcher;
import io.split.engine.segments.SegmentSynchronizationTask;
import io.split.engine.sse.dtos.SplitKillNotification;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -66,7 +67,7 @@ public void refreshSplits(Long targetChangeNumber) {
}

@Override
public void localKillSplit(String featureFlagName, String defaultTreatment, long newChangeNumber) {
public void localKillSplit(SplitKillNotification splitKillNotification) {
//No-Op
}

Expand Down
26 changes: 15 additions & 11 deletions client/src/main/java/io/split/engine/common/PushManagerImp.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.split.engine.common;

import com.google.common.annotations.VisibleForTesting;
import io.split.engine.experiments.SplitParser;
import io.split.engine.sse.AuthApiClient;
import io.split.engine.sse.AuthApiClientImp;
import io.split.engine.sse.EventSourceClient;
Expand All @@ -11,10 +12,11 @@
import io.split.engine.sse.dtos.AuthenticationResponse;
import io.split.engine.sse.dtos.SegmentQueueDto;
import io.split.engine.sse.workers.SegmentsWorkerImp;
import io.split.engine.sse.workers.SplitsWorker;
import io.split.engine.sse.workers.SplitsWorkerImp;
import io.split.engine.sse.workers.FeatureFlagsWorker;
import io.split.engine.sse.workers.FeatureFlagWorkerImp;
import io.split.engine.sse.workers.Worker;

import io.split.storages.SplitCacheProducer;
import io.split.telemetry.domain.StreamingEvent;
import io.split.telemetry.domain.enums.StreamEventsEnum;
import io.split.telemetry.storage.TelemetryRuntimeProducer;
Expand All @@ -36,7 +38,7 @@ public class PushManagerImp implements PushManager {

private final AuthApiClient _authApiClient;
private final EventSourceClient _eventSourceClient;
private final SplitsWorker _splitsWorker;
private final FeatureFlagsWorker _featureFlagsWorker;
private final Worker<SegmentQueueDto> _segmentWorker;
private final PushStatusTracker _pushStatusTracker;

Expand All @@ -48,15 +50,15 @@ public class PushManagerImp implements PushManager {
@VisibleForTesting
/* package private */ PushManagerImp(AuthApiClient authApiClient,
EventSourceClient eventSourceClient,
SplitsWorker splitsWorker,
FeatureFlagsWorker featureFlagsWorker,
Worker<SegmentQueueDto> segmentWorker,
PushStatusTracker pushStatusTracker,
TelemetryRuntimeProducer telemetryRuntimeProducer,
ThreadFactory threadFactory) {

_authApiClient = checkNotNull(authApiClient);
_eventSourceClient = checkNotNull(eventSourceClient);
_splitsWorker = splitsWorker;
_featureFlagsWorker = featureFlagsWorker;
_segmentWorker = segmentWorker;
_pushStatusTracker = pushStatusTracker;
_expirationTime = new AtomicLong();
Expand All @@ -70,13 +72,15 @@ public static PushManagerImp build(Synchronizer synchronizer,
SplitAPI splitAPI,
LinkedBlockingQueue<PushManager.Status> statusMessages,
TelemetryRuntimeProducer telemetryRuntimeProducer,
ThreadFactory threadFactory) {
SplitsWorker splitsWorker = new SplitsWorkerImp(synchronizer);
ThreadFactory threadFactory,
SplitParser splitParser,
SplitCacheProducer splitCacheProducer) {
FeatureFlagsWorker featureFlagsWorker = new FeatureFlagWorkerImp(synchronizer, splitParser, splitCacheProducer);
Worker<SegmentQueueDto> segmentWorker = new SegmentsWorkerImp(synchronizer);
PushStatusTracker pushStatusTracker = new PushStatusTrackerImp(statusMessages, telemetryRuntimeProducer);
return new PushManagerImp(new AuthApiClientImp(authUrl, splitAPI.getHttpClient(), telemetryRuntimeProducer),
EventSourceClientImp.build(streamingUrl, splitsWorker, segmentWorker, pushStatusTracker, splitAPI.getSseHttpClient(), telemetryRuntimeProducer, threadFactory),
splitsWorker,
EventSourceClientImp.build(streamingUrl, featureFlagsWorker, segmentWorker, pushStatusTracker, splitAPI.getSseHttpClient(), telemetryRuntimeProducer, threadFactory),
featureFlagsWorker,
segmentWorker,
pushStatusTracker,
telemetryRuntimeProducer,
Expand Down Expand Up @@ -134,13 +138,13 @@ private boolean startSse(String token, String channels) {

@Override
public synchronized void startWorkers() {
_splitsWorker.start();
_featureFlagsWorker.start();
_segmentWorker.start();
}

@Override
public synchronized void stopWorkers() {
_splitsWorker.stop();
_featureFlagsWorker.stop();
_segmentWorker.stop();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io.split.client.SplitClientConfig;
import io.split.engine.SDKReadinessGates;
import io.split.engine.experiments.SplitFetcher;
import io.split.engine.experiments.SplitParser;
import io.split.engine.experiments.SplitSynchronizationTask;
import io.split.engine.segments.SegmentSynchronizationTask;
import io.split.storages.SegmentCacheProducer;
Expand Down Expand Up @@ -85,7 +86,8 @@ public static SyncManagerImp build(SplitTasks splitTasks,
SDKReadinessGates gates,
TelemetryRuntimeProducer telemetryRuntimeProducer,
TelemetrySynchronizer telemetrySynchronizer,
SplitClientConfig config) {
SplitClientConfig config,
SplitParser splitParser) {
LinkedBlockingQueue<PushManager.Status> pushMessages = new LinkedBlockingQueue<>();
Synchronizer synchronizer = new SynchronizerImp(splitTasks,
splitFetcher,
Expand All @@ -102,7 +104,9 @@ public static SyncManagerImp build(SplitTasks splitTasks,
splitAPI,
pushMessages,
telemetryRuntimeProducer,
config.getThreadFactory());
config.getThreadFactory(),
splitParser,
splitCacheProducer);

return new SyncManagerImp(splitTasks,
config.streamingEnabled(),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package io.split.engine.common;

import io.split.engine.sse.dtos.SplitKillNotification;

public interface Synchronizer {
boolean syncAll();
void startPeriodicFetching();
void stopPeriodicFetching();
void refreshSplits(Long targetChangeNumber);
void localKillSplit(String featureFlagName, String defaultTreatment, long newChangeNumber);
void localKillSplit(SplitKillNotification splitKillNotification);
void refreshSegment(String segmentName, Long targetChangeNumber);
void startPeriodicDataRecording();
void stopPeriodicDataRecording();
Expand Down
11 changes: 6 additions & 5 deletions client/src/main/java/io/split/engine/common/SynchronizerImp.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.split.engine.experiments.SplitSynchronizationTask;
import io.split.engine.segments.SegmentFetcher;
import io.split.engine.segments.SegmentSynchronizationTask;
import io.split.engine.sse.dtos.SplitKillNotification;
import io.split.storages.SegmentCacheProducer;
import io.split.storages.SplitCacheProducer;
import io.split.telemetry.synchronizer.TelemetrySyncTask;
Expand Down Expand Up @@ -183,10 +184,10 @@ public void refreshSplits(Long targetChangeNumber) {
}

@Override
public void localKillSplit(String featureFlagName, String defaultTreatment, long newChangeNumber) {
if (newChangeNumber > _splitCacheProducer.getChangeNumber()) {
_splitCacheProducer.kill(featureFlagName, defaultTreatment, newChangeNumber);
refreshSplits(newChangeNumber);
public void localKillSplit(SplitKillNotification splitKillNotification) {
if (splitKillNotification.getChangeNumber() > _splitCacheProducer.getChangeNumber()) {
_splitCacheProducer.kill(splitKillNotification.getSplitName(), splitKillNotification.getDefaultTreatment(), splitKillNotification.getChangeNumber());
refreshSplits(splitKillNotification.getChangeNumber());
}
}

Expand Down Expand Up @@ -307,4 +308,4 @@ private void forceRefreshSegment(String segmentName){
SegmentFetcher segmentFetcher = _segmentSynchronizationTaskImp.getFetcher(segmentName);
segmentFetcher.fetch(new FetchOptions.Builder().build());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,4 @@ private AttributeMatcher toMatcher(Matcher matcher) {

return new AttributeMatcher(attribute, delegate, negate);
}


}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import io.split.engine.sse.client.SSEClient;
import io.split.engine.sse.dtos.SegmentQueueDto;
import io.split.engine.sse.exceptions.EventParsingException;
import io.split.engine.sse.workers.SplitsWorker;
import io.split.engine.sse.workers.FeatureFlagsWorker;
import io.split.engine.sse.workers.Worker;
import io.split.telemetry.storage.TelemetryRuntimeProducer;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
Expand Down Expand Up @@ -56,15 +56,15 @@ public class EventSourceClientImp implements EventSourceClient {
}

public static EventSourceClientImp build(String baseStreamingUrl,
SplitsWorker splitsWorker,
FeatureFlagsWorker featureFlagsWorker,
Worker<SegmentQueueDto> segmentWorker,
PushStatusTracker pushStatusTracker,
CloseableHttpClient sseHttpClient,
TelemetryRuntimeProducer telemetryRuntimeProducer,
ThreadFactory threadFactory) {
return new EventSourceClientImp(baseStreamingUrl,
new NotificationParserImp(),
NotificationProcessorImp.build(splitsWorker, segmentWorker, pushStatusTracker),
NotificationProcessorImp.build(featureFlagsWorker, segmentWorker, pushStatusTracker),
pushStatusTracker,
sseHttpClient,
telemetryRuntimeProducer,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package io.split.engine.sse;

import io.split.engine.sse.dtos.FeatureFlagChangeNotification;
import io.split.engine.sse.dtos.IncomingNotification;
import io.split.engine.sse.dtos.SplitKillNotification;
import io.split.engine.sse.dtos.StatusNotification;

public interface NotificationProcessor {
void process(IncomingNotification notification);
void processSplitUpdate(long changeNumber);
void processSplitKill(long changeNumber, String splitName, String defaultTreatment);
void processSplitUpdate(FeatureFlagChangeNotification featureFlagChangeNotification);
void processSplitKill(SplitKillNotification splitKillNotification);
void processSegmentUpdate(long changeNumber, String segmentName);
void processStatus(StatusNotification statusNotification);
}
}
Original file line number Diff line number Diff line change
@@ -1,30 +1,33 @@
package io.split.engine.sse;

import com.google.common.annotations.VisibleForTesting;
import io.split.engine.sse.dtos.FeatureFlagChangeNotification;
import io.split.engine.sse.dtos.GenericNotificationData;
import io.split.engine.sse.dtos.IncomingNotification;
import io.split.engine.sse.dtos.SplitKillNotification;
import io.split.engine.sse.dtos.StatusNotification;
import io.split.engine.sse.dtos.SegmentQueueDto;
import io.split.engine.sse.workers.SplitsWorker;
import io.split.engine.sse.workers.FeatureFlagsWorker;
import io.split.engine.sse.workers.Worker;

import static com.google.common.base.Preconditions.checkNotNull;

public class NotificationProcessorImp implements NotificationProcessor {
private final SplitsWorker _splitsWorker;
private final FeatureFlagsWorker _featureFlagsWorker;
private final Worker<SegmentQueueDto> _segmentWorker;
private final PushStatusTracker _pushStatusTracker;

@VisibleForTesting
/* package private */ NotificationProcessorImp(SplitsWorker splitsWorker,
/* package private */ NotificationProcessorImp(FeatureFlagsWorker featureFlagsWorker,
Worker<SegmentQueueDto> segmentWorker,
PushStatusTracker pushStatusTracker) {
_splitsWorker = checkNotNull(splitsWorker);
_featureFlagsWorker = checkNotNull(featureFlagsWorker);
_segmentWorker = checkNotNull(segmentWorker);
_pushStatusTracker = checkNotNull(pushStatusTracker);
}

public static NotificationProcessorImp build(SplitsWorker splitsWorker, Worker<SegmentQueueDto> segmentWorker, PushStatusTracker pushStatusTracker) {
return new NotificationProcessorImp(splitsWorker, segmentWorker, pushStatusTracker);
public static NotificationProcessorImp build(FeatureFlagsWorker featureFlagsWorker, Worker<SegmentQueueDto> segmentWorker, PushStatusTracker pushStatusTracker) {
return new NotificationProcessorImp(featureFlagsWorker, segmentWorker, pushStatusTracker);
}

@Override
Expand All @@ -33,14 +36,17 @@ public void process(IncomingNotification notification) {
}

@Override
public void processSplitUpdate(long changeNumber) {
_splitsWorker.addToQueue(changeNumber);
public void processSplitUpdate(FeatureFlagChangeNotification featureFlagChangeNotification) {
_featureFlagsWorker.addToQueue(featureFlagChangeNotification);
}

@Override
public void processSplitKill(long changeNumber, String splitName, String defaultTreatment) {
_splitsWorker.killSplit(changeNumber, splitName, defaultTreatment);
_splitsWorker.addToQueue(changeNumber);
public void processSplitKill(SplitKillNotification splitKillNotification) {
_featureFlagsWorker.kill(splitKillNotification);
_featureFlagsWorker.addToQueue(new FeatureFlagChangeNotification(GenericNotificationData.builder()
.changeNumber(splitKillNotification.getChangeNumber())
.channel(splitKillNotification.getChannel())
.build()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public CompressType getCompressType() {

@Override
public void handler(NotificationProcessor notificationProcessor) {
notificationProcessor.processSplitUpdate(getChangeNumber());
notificationProcessor.processSplitUpdate(this);
}

@Override
Expand Down
Loading