Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 21 additions & 3 deletions client/src/main/java/io/split/client/SplitClientConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@


import io.split.client.impressions.ImpressionListener;
import io.split.client.impressions.ImpressionsManager;
import io.split.integrations.IntegrationsConfig;
import org.apache.http.HttpHost;

Expand All @@ -24,6 +25,7 @@ public class SplitClientConfig {
private final int _segmentsRefreshRate;
private final int _impressionsRefreshRate;
private final int _impressionsQueueSize;
private final ImpressionsManager.Mode _impressionsMode;
private final int _metricsRefreshRate;
private final int _connectionTimeout;
private final int _readTimeout;
Expand Down Expand Up @@ -64,6 +66,7 @@ private SplitClientConfig(String endpoint,
int segmentsRefreshRate,
int impressionsRefreshRate,
int impressionsQueueSize,
ImpressionsManager.Mode impressionsMode,
int metricsRefreshRate,
int connectionTimeout,
int readTimeout,
Expand Down Expand Up @@ -93,6 +96,7 @@ private SplitClientConfig(String endpoint,
_segmentsRefreshRate = segmentsRefreshRate;
_impressionsRefreshRate = impressionsRefreshRate;
_impressionsQueueSize = impressionsQueueSize;
_impressionsMode = impressionsMode;
_metricsRefreshRate = metricsRefreshRate;
_connectionTimeout = connectionTimeout;
_readTimeout = readTimeout;
Expand Down Expand Up @@ -158,6 +162,8 @@ public int impressionsQueueSize() {
return _impressionsQueueSize;
}

public ImpressionsManager.Mode impressionsMode() { return _impressionsMode; }

public int metricsRefreshRate() {
return _metricsRefreshRate;
}
Expand Down Expand Up @@ -250,8 +256,9 @@ public static final class Builder {
private boolean _eventsEndpointSet = false;
private int _featuresRefreshRate = 60;
private int _segmentsRefreshRate = 60;
private int _impressionsRefreshRate = 30;
private int _impressionsRefreshRate = -1; // use -1 to identify lack of a user submitted value & handle in build()
private int _impressionsQueueSize = 30000;
private ImpressionsManager.Mode _impressionsMode = ImpressionsManager.Mode.OPTIMIZED;
private int _connectionTimeout = 15000;
private int _readTimeout = 15000;
private int _numThreadsForSegmentFetch = 2;
Expand Down Expand Up @@ -380,6 +387,11 @@ public Builder impressionsRefreshRate(int seconds) {
return this;
}

public Builder impressionsMode(ImpressionsManager.Mode mode) {
_impressionsMode = mode;
return this;
}

/**
* The impression listener captures the which key saw what treatment ("on", "off", etc)
* at what time. This log is periodically pushed back to split endpoint.
Expand Down Expand Up @@ -671,8 +683,13 @@ public SplitClientConfig build() {
throw new IllegalArgumentException("segmentsRefreshRate must be >= 30: " + _segmentsRefreshRate);
}

if (_impressionsRefreshRate <= 0) {
throw new IllegalArgumentException("impressionsRefreshRate must be > 0: " + _impressionsRefreshRate);
switch (_impressionsMode) {
case OPTIMIZED:
_impressionsRefreshRate = (_impressionsRefreshRate <= 0) ? 300 : Math.max(60, _impressionsRefreshRate);
break;
case DEBUG:
_impressionsRefreshRate = (_impressionsRefreshRate <= 0) ? 60 : _impressionsRefreshRate;
break;
}

if (_eventFlushIntervalInMillis < 1000) {
Expand Down Expand Up @@ -734,6 +751,7 @@ public SplitClientConfig build() {
_segmentsRefreshRate,
_impressionsRefreshRate,
_impressionsQueueSize,
_impressionsMode,
_metricsRefreshRate,
_connectionTimeout,
_readTimeout,
Expand Down
13 changes: 7 additions & 6 deletions client/src/main/java/io/split/client/SplitClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
import io.split.client.dtos.Event;
import io.split.client.exceptions.ChangeNumberExceptionWrapper;
import io.split.client.impressions.Impression;
import io.split.client.impressions.ImpressionListener;
import io.split.client.impressions.ImpressionsManager;
import io.split.client.impressions.ImpressionsManagerImpl;
import io.split.engine.SDKReadinessGates;
import io.split.engine.experiments.ParsedCondition;
import io.split.engine.experiments.ParsedSplit;
Expand Down Expand Up @@ -50,7 +51,7 @@ public final class SplitClientImpl implements SplitClient {

private final SplitFactory _container;
private final SplitFetcher _splitFetcher;
private final ImpressionListener _impressionListener;
private final ImpressionsManager _impressionManager;
private final Metrics _metrics;
private final SplitClientConfig _config;
private final EventClient _eventClient;
Expand All @@ -59,22 +60,22 @@ public final class SplitClientImpl implements SplitClient {

public SplitClientImpl(SplitFactory container,
SplitFetcher splitFetcher,
ImpressionListener impressionListener,
ImpressionsManager impressionManager,
Metrics metrics,
EventClient eventClient,
SplitClientConfig config,
SDKReadinessGates gates) {
_container = container;
_splitFetcher = splitFetcher;
_impressionListener = impressionListener;
_impressionManager = impressionManager;
_metrics = metrics;
_eventClient = eventClient;
_config = config;
_gates = gates;

checkNotNull(gates);
checkNotNull(_splitFetcher);
checkNotNull(_impressionListener);
checkNotNull(_impressionManager);
}

@Override
Expand Down Expand Up @@ -222,7 +223,7 @@ private SplitResult getTreatmentWithConfigInternal(String label, String matching
private void recordStats(String matchingKey, String bucketingKey, String split, long start, String result,
String operation, String label, Long changeNumber, Map<String, Object> attributes) {
try {
_impressionListener.log(new Impression(matchingKey, bucketingKey, split, result, System.currentTimeMillis(), label, changeNumber, attributes));
_impressionManager.track(new Impression(matchingKey, bucketingKey, split, result, System.currentTimeMillis(), label, changeNumber, attributes));
_metrics.time(operation, System.currentTimeMillis() - start);
} catch (Throwable t) {
_log.error("Exception", t);
Expand Down
47 changes: 13 additions & 34 deletions client/src/main/java/io/split/client/SplitFactoryImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import com.google.common.collect.Multiset;
import io.split.client.impressions.AsynchronousImpressionListener;
import io.split.client.impressions.ImpressionListener;
import io.split.client.impressions.ImpressionsManager;
import io.split.client.impressions.ImpressionsManagerImpl;
import io.split.client.interceptors.AddSplitHeadersFilter;
import io.split.client.interceptors.GzipDecoderResponseInterceptor;
import io.split.client.interceptors.GzipEncoderRequestInterceptor;
Expand Down Expand Up @@ -51,6 +51,7 @@
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class SplitFactoryImpl implements SplitFactory {
private static final Logger _log = LoggerFactory.getLogger(SplitFactory.class);
Expand Down Expand Up @@ -168,43 +169,21 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn

final RefreshableSplitFetcherProvider splitFetcherProvider = new RefreshableSplitFetcherProvider(splitChangeFetcher, splitParser, findPollingPeriod(RANDOM, config.featuresRefreshRate()), gates);

// Impressions
final ImpressionsManager splitImpressionListener = ImpressionsManager.instance(httpclient, config);

List<ImpressionListener> impressionListeners = new ArrayList<>();
impressionListeners.add(splitImpressionListener);

// Setup integrations
if (config.integrationsConfig() != null) {
config.integrationsConfig().getImpressionsListeners(IntegrationsConfig.Execution.ASYNC).stream()
.map(l -> AsynchronousImpressionListener.build(l.listener(), l.queueSize()))
.collect(Collectors.toCollection(() -> impressionListeners));

// asynchronous impressions listeners
List<IntegrationsConfig.ImpressionListenerWithMeta> asyncListeners = config
.integrationsConfig()
.getImpressionsListeners(IntegrationsConfig.Execution.ASYNC);

for (IntegrationsConfig.ImpressionListenerWithMeta listener : asyncListeners) {
AsynchronousImpressionListener wrapper = AsynchronousImpressionListener
.build(listener.listener(), listener.queueSize());
impressionListeners.add(wrapper);
}

// synchronous impressions listeners
List<IntegrationsConfig.ImpressionListenerWithMeta> syncListeners = config
.integrationsConfig()
.getImpressionsListeners(IntegrationsConfig.Execution.SYNC);
for (IntegrationsConfig.ImpressionListenerWithMeta listener: syncListeners) {
impressionListeners.add(listener.listener());

}
config.integrationsConfig().getImpressionsListeners(IntegrationsConfig.Execution.SYNC).stream()
.map(IntegrationsConfig.ImpressionListenerWithMeta::listener)
.collect(Collectors.toCollection(() -> impressionListeners));
}

final ImpressionListener impressionListener;
if (impressionListeners.size() > 1) {
// since there are more than just the default integration, let's federate and add them all.
impressionListener = new ImpressionListener.FederatedImpressionListener(impressionListeners);
} else {
impressionListener = splitImpressionListener;
}
// Impressions
final ImpressionsManagerImpl impressionsManager = ImpressionsManagerImpl.instance(httpclient, config, impressionListeners);

CachedMetrics cachedMetrics = new CachedMetrics(httpMetrics, TimeUnit.SECONDS.toMillis(config.metricsRefreshRate()));
final FireAndForgetMetrics cachedFireAndForgetMetrics = FireAndForgetMetrics.instance(cachedMetrics, 2, 1000);
Expand All @@ -223,12 +202,12 @@ public void run() {
_log.info("Successful shutdown of segment fetchers");
splitFetcherProvider.close();
_log.info("Successful shutdown of splits");
impressionsManager.close();
_log.info("Successful shutdown of impressions manager");
uncachedFireAndForget.close();
_log.info("Successful shutdown of metrics 1");
cachedFireAndForgetMetrics.close();
_log.info("Successful shutdown of metrics 2");
impressionListener.close();
_log.info("Successful shutdown of ImpressionListener");
httpclient.close();
_log.info("Successful shutdown of httpclient");
eventClient.close();
Expand All @@ -253,7 +232,7 @@ public void run() {

_client = new SplitClientImpl(this,
splitFetcherProvider.getFetcher(),
impressionListener,
impressionsManager,
cachedFireAndForgetMetrics,
eventClient,
config,
Expand Down
77 changes: 77 additions & 0 deletions client/src/main/java/io/split/client/dtos/ImpressionCount.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package io.split.client.dtos;

import com.google.gson.annotations.SerializedName;
import io.split.client.impressions.ImpressionCounter;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

public class ImpressionCount {

private static final String FIELD_PER_FEATURE_COUNTS = "pf";

@SerializedName(FIELD_PER_FEATURE_COUNTS)
public final List<CountPerFeature> perFeature;

public ImpressionCount(List<CountPerFeature> cs) {
perFeature = cs;
}

public static ImpressionCount fromImpressionCounterData(Map<ImpressionCounter.Key, Integer> raw) {
return new ImpressionCount(raw.entrySet().stream()
.map(e -> new CountPerFeature(e.getKey().featureName(), e.getKey().timeFrame(), e.getValue()))
.collect(Collectors.toList()));
}

@Override
public int hashCode() {
return Objects.hash(perFeature);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

ImpressionCount c = (ImpressionCount) o;
return Objects.equals(perFeature, c.perFeature);
}

public static class CountPerFeature {

private static final String FIELD_FEATURE = "f";
private static final String FIELD_TIMEFRAME = "m";
private static final String FIELD_COUNT = "rc";

@SerializedName(FIELD_FEATURE)
public final String feature;

@SerializedName(FIELD_TIMEFRAME)
public final long timeframe;

@SerializedName(FIELD_COUNT)
public final int count;

public CountPerFeature(String f, long t, int c) {
feature = f;
timeframe = t;
count = c;
}

@Override
public int hashCode() {
return Objects.hash(feature, timeframe, count);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

CountPerFeature c = (CountPerFeature) o;
return Objects.equals(feature, c.feature) && Objects.equals(timeframe, c.timeframe) &&
Objects.equals(count, c.count);
}
}
}
46 changes: 43 additions & 3 deletions client/src/main/java/io/split/client/dtos/KeyImpression.java
Original file line number Diff line number Diff line change
@@ -1,15 +1,43 @@
package io.split.client.dtos;


import com.google.gson.annotations.SerializedName;
import io.split.client.impressions.Impression;

import java.util.Objects;

public class KeyImpression {
public String feature;

/* package private */ static final String FIELD_KEY_NAME = "k";
/* package private */ static final String FIELD_BUCKETING_KEY = "b";
/* package private */ static final String FIELD_TREATMENT = "t";
/* package private */ static final String FIELD_LABEL = "r";
/* package private */ static final String FIELD_TIME = "m";
/* package private */ static final String FIELD_CHANGE_NUMBER = "c";
/* package private */ static final String FIELD_PREVIOUS_TIME = "pt";

public transient String feature; // Non-serializable

@SerializedName(FIELD_KEY_NAME)
public String keyName;

@SerializedName(FIELD_BUCKETING_KEY)
public String bucketingKey;

@SerializedName(FIELD_TREATMENT)
public String treatment;

@SerializedName(FIELD_LABEL)
public String label;

@SerializedName(FIELD_TIME)
public long time;

@SerializedName(FIELD_CHANGE_NUMBER)
public Long changeNumber; // can be null if there is no changeNumber
public Long pt;

@SerializedName(FIELD_PREVIOUS_TIME)
public Long previousTime;

@Override
public boolean equals(Object o) {
Expand All @@ -19,7 +47,7 @@ public boolean equals(Object o) {
KeyImpression that = (KeyImpression) o;

if (time != that.time) return false;
if (feature != null ? !feature.equals(that.feature) : that.feature != null) return false;
if (!Objects.equals(feature, that.feature)) return false;
if (!keyName.equals(that.keyName)) return false;
if (!treatment.equals(that.treatment)) return false;

Expand All @@ -39,4 +67,16 @@ public int hashCode() {
result = 31 * result + (int) (time ^ (time >>> 32));
return result;
}

public static KeyImpression fromImpression(Impression i) {
KeyImpression ki = new KeyImpression();
ki.feature = i.split();
ki.keyName = i.key();
ki.bucketingKey = i.bucketingKey();
ki.time = i.time();
ki.changeNumber = i.changeNumber();
ki.treatment = i.treatment();
ki.label = i.appliedRule();
return ki;
}
}
Loading