diff --git a/client/src/main/java/io/split/telemetry/domain/AtomicLongArray.java b/client/src/main/java/io/split/telemetry/domain/AtomicLongArray.java new file mode 100644 index 000000000..cdd51f893 --- /dev/null +++ b/client/src/main/java/io/split/telemetry/domain/AtomicLongArray.java @@ -0,0 +1,37 @@ +package io.split.telemetry.domain; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.IntStream; + +public class AtomicLongArray { + private AtomicLong[] array; + + public AtomicLongArray(int size) throws Exception { + if(size == 0) { + throw new Exception("Invalid array size"); + } + array = new AtomicLong[size]; + IntStream.range(0, array.length).forEach(x -> array[x] = new AtomicLong()); + } + + public void increment(int index) { + if (index < 0 || index >= array.length) { + throw new ArrayIndexOutOfBoundsException(); + } + array[index].getAndIncrement(); + } + + public List fetchAndClearAll() { + List listValues = new ArrayList<>(); + for (AtomicLong a: array) { + listValues.add(a.longValue()); + } + + IntStream.range(0, array.length).forEach(x -> array[x] = new AtomicLong()); + + return listValues; + } +} diff --git a/client/src/main/java/io/split/telemetry/domain/HTTPErrors.java b/client/src/main/java/io/split/telemetry/domain/HTTPErrors.java new file mode 100644 index 000000000..dac746117 --- /dev/null +++ b/client/src/main/java/io/split/telemetry/domain/HTTPErrors.java @@ -0,0 +1,97 @@ +package io.split.telemetry.domain; + +import com.google.gson.annotations.SerializedName; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class HTTPErrors { + /* package private */ static final String FIELD_SPLIT = "sp"; + /* package private */ static final String FIELD_SEGMENTS = "se"; + /* package private */ static final String FIELD_IMPRESSIONS = "im"; + /* package private */ static final String FIELD_IMPRESSIONS_COUNT = "ic"; + /* package private */ static final String FIELD_EVENTS = "ev"; + /* package private */ static final String FIELD_TOKEN = "to"; + /* package private */ static final String FIELD_TELEMETRY = "te"; + + @SerializedName(FIELD_SPLIT) + private Map _splits; + @SerializedName(FIELD_SEGMENTS) + private Map _segments; + @SerializedName(FIELD_IMPRESSIONS) + private Map _impressions; + @SerializedName(FIELD_IMPRESSIONS_COUNT) + private Map _impressionsCount; + @SerializedName(FIELD_EVENTS) + private Map _events; + @SerializedName(FIELD_TOKEN) + private Map _token; + @SerializedName(FIELD_TELEMETRY) + private Map _telemetry; + + public HTTPErrors() { + _splits = new ConcurrentHashMap<>(); + _segments = new ConcurrentHashMap<>(); + _impressions = new ConcurrentHashMap<>(); + _impressionsCount = new ConcurrentHashMap<>(); + _events = new ConcurrentHashMap<>(); + _token = new ConcurrentHashMap<>(); + _telemetry = new ConcurrentHashMap<>(); + } + + public Map get_splits() { + return _splits; + } + + public void set_splits(Map _splits) { + this._splits = _splits; + } + + public Map get_segments() { + return _segments; + } + + public void set_segments(Map _segments) { + this._segments = _segments; + } + + public Map get_impressions() { + return _impressions; + } + + public void set_impressions(Map _impressions) { + this._impressions = _impressions; + } + + public Map get_events() { + return _events; + } + + public void set_events(Map _events) { + this._events = _events; + } + + public Map get_token() { + return _token; + } + + public void set_token(Map _token) { + this._token = _token; + } + + public Map get_telemetry() { + return _telemetry; + } + + public void set_telemetry(Map _telemetry) { + this._telemetry = _telemetry; + } + + public Map get_impressionsCount() { + return _impressionsCount; + } + + public void set_impressionsCount(Map _impressionsCount) { + this._impressionsCount = _impressionsCount; + } +} diff --git a/client/src/main/java/io/split/telemetry/domain/HTTPLatencies.java b/client/src/main/java/io/split/telemetry/domain/HTTPLatencies.java new file mode 100644 index 000000000..0e0791ed9 --- /dev/null +++ b/client/src/main/java/io/split/telemetry/domain/HTTPLatencies.java @@ -0,0 +1,97 @@ +package io.split.telemetry.domain; + +import com.google.gson.annotations.SerializedName; + +import java.util.ArrayList; +import java.util.List; + +public class HTTPLatencies { + /* package private */ static final String FIELD_SPLIT = "sp"; + /* package private */ static final String FIELD_SEGMENTS = "se"; + /* package private */ static final String FIELD_IMPRESSIONS = "im"; + /* package private */ static final String FIELD_IMPRESSIONS_COUNT = "ic"; + /* package private */ static final String FIELD_EVENTS = "ev"; + /* package private */ static final String FIELD_TOKEN = "to"; + /* package private */ static final String FIELD_TELEMETRY = "te"; + + @SerializedName(FIELD_SPLIT) + private List _splits; + @SerializedName(FIELD_SEGMENTS) + private List_segments; + @SerializedName(FIELD_IMPRESSIONS) + private List _impressions; + @SerializedName(FIELD_IMPRESSIONS_COUNT) + private List _impressionsCount; + @SerializedName(FIELD_EVENTS) + private List _events; + @SerializedName(FIELD_TOKEN) + private List _token; + @SerializedName(FIELD_TELEMETRY) + private List _telemetry; + + public HTTPLatencies() { + _splits = new ArrayList<>(); + _segments = new ArrayList<>(); + _impressions = new ArrayList<>(); + _impressionsCount = new ArrayList<>(); + _events = new ArrayList<>(); + _token = new ArrayList<>(); + _telemetry = new ArrayList<>(); + } + + public List get_splits() { + return _splits; + } + + public void set_splits(List _splits) { + this._splits = _splits; + } + + public List get_segments() { + return _segments; + } + + public void set_segments(List _segments) { + this._segments = _segments; + } + + public List get_impressions() { + return _impressions; + } + + public void set_impressions(List _impressions) { + this._impressions = _impressions; + } + + public List get_events() { + return _events; + } + + public void set_events(List _events) { + this._events = _events; + } + + public List get_token() { + return _token; + } + + public void set_token(List _token) { + this._token = _token; + } + + public List get_telemetry() { + return _telemetry; + } + + public void set_telemetry(List _telemetry) { + this._telemetry = _telemetry; + } + + public List get_impressionsCount() { + return _impressionsCount; + } + + public void set_impressionsCount(List _impressionsCount) { + this._impressionsCount = _impressionsCount; + } +} diff --git a/client/src/main/java/io/split/telemetry/domain/LastSynchronization.java b/client/src/main/java/io/split/telemetry/domain/LastSynchronization.java new file mode 100644 index 000000000..59586562e --- /dev/null +++ b/client/src/main/java/io/split/telemetry/domain/LastSynchronization.java @@ -0,0 +1,84 @@ +package io.split.telemetry.domain; + +import com.google.gson.annotations.SerializedName; + +public class LastSynchronization { + /* package private */ static final String FIELD_SPLIT = "sp"; + /* package private */ static final String FIELD_SEGMENTS = "se"; + /* package private */ static final String FIELD_IMPRESSIONS = "im"; + /* package private */ static final String FIELD_IMPRESSIONS_COUNT = "ic"; + /* package private */ static final String FIELD_EVENTS = "ev"; + /* package private */ static final String FIELD_TOKEN = "to"; + /* package private */ static final String FIELD_TELEMETRY = "te"; + + @SerializedName(FIELD_SPLIT) + private long _splits; + @SerializedName(FIELD_SEGMENTS) + private long _segments; + @SerializedName(FIELD_IMPRESSIONS) + private long _impressions; + @SerializedName(FIELD_IMPRESSIONS_COUNT) + private long _impressionsCount; + @SerializedName(FIELD_EVENTS) + private long _events; + @SerializedName(FIELD_TOKEN) + private long _token; + @SerializedName(FIELD_TELEMETRY) + private long _telemetry; + + public long get_splits() { + return _splits; + } + + public void set_splits(long _splits) { + this._splits = _splits; + } + + public long get_segments() { + return _segments; + } + + public void set_segments(long _segments) { + this._segments = _segments; + } + + public long get_impressions() { + return _impressions; + } + + public void set_impressions(long _impressions) { + this._impressions = _impressions; + } + + public long get_events() { + return _events; + } + + public void set_events(long _events) { + this._events = _events; + } + + public long get_token() { + return _token; + } + + public void set_token(long _token) { + this._token = _token; + } + + public long get_telemetry() { + return _telemetry; + } + + public void set_telemetry(long _telemetry) { + this._telemetry = _telemetry; + } + + public long get_impressionsCount() { + return _impressionsCount; + } + + public void set_impressionsCount(long _impressionsCount) { + this._impressionsCount = _impressionsCount; + } +} diff --git a/client/src/main/java/io/split/telemetry/domain/MethodExceptions.java b/client/src/main/java/io/split/telemetry/domain/MethodExceptions.java new file mode 100644 index 000000000..c6d6561be --- /dev/null +++ b/client/src/main/java/io/split/telemetry/domain/MethodExceptions.java @@ -0,0 +1,62 @@ +package io.split.telemetry.domain; + +import com.google.gson.annotations.SerializedName; + +public class MethodExceptions { + /* package private */ static final String FIELD_TREATMENT = "t"; + /* package private */ static final String FIELD_TREATMENTS = "ts"; + /* package private */ static final String FIELD_TREATMENT_WITH_CONFIG = "tc"; + /* package private */ static final String FIELD_TREATMENTS_WITH_CONFIG = "tcs"; + /* package private */ static final String FIELD_TRACK = "tr"; + + @SerializedName(FIELD_TREATMENT) + private long _treatment; + @SerializedName(FIELD_TREATMENTS) + private long _treatments; + @SerializedName(FIELD_TREATMENT_WITH_CONFIG) + private long _treatmentWithConfig; + @SerializedName(FIELD_TREATMENTS_WITH_CONFIG) + private long _treatmentsWithConfig; + @SerializedName(FIELD_TRACK) + private long _track; + + public long get_treatment() { + return _treatment; + } + + public void set_treatment(long _treatment) { + this._treatment = _treatment; + } + + public long get_treatments() { + return _treatments; + } + + public void set_treatments(long _treatments) { + this._treatments = _treatments; + } + + public long get_treatmentsWithConfig() { + return _treatmentsWithConfig; + } + + public void set_treatmentsWithConfig(long _treatmentsWithConfig) { + this._treatmentsWithConfig = _treatmentsWithConfig; + } + + public long get_treatmentWithConfig() { + return _treatmentWithConfig; + } + + public void set_treatmentWithConfig(long _treatmentWithConfig) { + this._treatmentWithConfig = _treatmentWithConfig; + } + + public long get_track() { + return _track; + } + + public void set_track(long _track) { + this._track = _track; + } +} diff --git a/client/src/main/java/io/split/telemetry/domain/MethodLatencies.java b/client/src/main/java/io/split/telemetry/domain/MethodLatencies.java new file mode 100644 index 000000000..21aae636c --- /dev/null +++ b/client/src/main/java/io/split/telemetry/domain/MethodLatencies.java @@ -0,0 +1,73 @@ +package io.split.telemetry.domain; + +import com.google.gson.annotations.SerializedName; + +import java.util.ArrayList; +import java.util.List; + +public class MethodLatencies { + /* package private */ static final String FIELD_TREATMENT = "t"; + /* package private */ static final String FIELD_TREATMENTS = "ts"; + /* package private */ static final String FIELD_TREATMENT_WITH_CONFIG = "tc"; + /* package private */ static final String FIELD_TREATMENTS_WITH_CONFIG = "tcs"; + /* package private */ static final String FIELD_TRACK = "tr"; + + @SerializedName(FIELD_TREATMENT) + private List _treatment; + @SerializedName(FIELD_TREATMENTS) + private List _treatments; + @SerializedName(FIELD_TREATMENT_WITH_CONFIG) + private List _treatmentWithConfig; + @SerializedName(FIELD_TREATMENTS_WITH_CONFIG) + private List _treatmentsWithConfig; + @SerializedName(FIELD_TRACK) + private List _track; + + public MethodLatencies() { + _treatment = new ArrayList<>(); + _treatments = new ArrayList<>(); + _treatmentWithConfig = new ArrayList<>(); + _treatmentsWithConfig = new ArrayList<>(); + _track = new ArrayList<>(); + } + + public List get_treatment() { + return _treatment; + } + + public void set_treatment(List _treatment) { + this._treatment = _treatment; + } + + public List get_treatments() { + return _treatments; + } + + public void set_treatments(List _treatments) { + this._treatments = _treatments; + } + + public List get_treatmentsWithConfig() { + return _treatmentsWithConfig; + } + + public void set_treatmentsWithConfig(List _treatmentsWithConfig) { + this._treatmentsWithConfig = _treatmentsWithConfig; + } + + public List get_treatmentWithConfig() { + return _treatmentWithConfig; + } + + public void set_treatmentWithConfig(List _treatmentWithConfig) { + this._treatmentWithConfig = _treatmentWithConfig; + } + + public List get_track() { + return _track; + } + + public void set_track(List _track) { + this._track = _track; + } +} diff --git a/client/src/main/java/io/split/telemetry/domain/StreamingEvent.java b/client/src/main/java/io/split/telemetry/domain/StreamingEvent.java new file mode 100644 index 000000000..e0ebca3ac --- /dev/null +++ b/client/src/main/java/io/split/telemetry/domain/StreamingEvent.java @@ -0,0 +1,40 @@ +package io.split.telemetry.domain; + +import com.google.gson.annotations.SerializedName; + +public class StreamingEvent { + /* package private */ static final String FIELD_TYPE = "e"; + /* package private */ static final String FIELD_DATA = "d"; + /* package private */ static final String FIELD_TIMESTAMP = "t"; + + @SerializedName(FIELD_TYPE) + private int _type; + @SerializedName(FIELD_DATA) + private long _data; + @SerializedName(FIELD_TIMESTAMP) + private long _timestamp; + + public int get_type() { + return _type; + } + + public void set_type(int _type) { + this._type = _type; + } + + public long get_data() { + return _data; + } + + public void set_data(long _data) { + this._data = _data; + } + + public long getTimestamp() { + return _timestamp; + } + + public void setTimestamp(long timestamp) { + this._timestamp = timestamp; + } +} diff --git a/client/src/main/java/io/split/telemetry/domain/enums/EventsDataRecordsEnum.java b/client/src/main/java/io/split/telemetry/domain/enums/EventsDataRecordsEnum.java new file mode 100644 index 000000000..8beb3ac48 --- /dev/null +++ b/client/src/main/java/io/split/telemetry/domain/enums/EventsDataRecordsEnum.java @@ -0,0 +1,6 @@ +package io.split.telemetry.domain.enums; + +public enum EventsDataRecordsEnum { + EVENTS_QUEUED, + EVENTS_DROPPED +} diff --git a/client/src/main/java/io/split/telemetry/domain/enums/FactoryCountersEnum.java b/client/src/main/java/io/split/telemetry/domain/enums/FactoryCountersEnum.java new file mode 100644 index 000000000..387084612 --- /dev/null +++ b/client/src/main/java/io/split/telemetry/domain/enums/FactoryCountersEnum.java @@ -0,0 +1,6 @@ +package io.split.telemetry.domain.enums; + +public enum FactoryCountersEnum { + BUR_TIMEOUTS, + NON_READY_USAGES +} diff --git a/client/src/main/java/io/split/telemetry/domain/enums/HTTPLatenciesEnum.java b/client/src/main/java/io/split/telemetry/domain/enums/HTTPLatenciesEnum.java new file mode 100644 index 000000000..6f858397b --- /dev/null +++ b/client/src/main/java/io/split/telemetry/domain/enums/HTTPLatenciesEnum.java @@ -0,0 +1,11 @@ +package io.split.telemetry.domain.enums; + +public enum HTTPLatenciesEnum { + SPLITS, + SEGMENTS, + IMPRESSIONS, + IMPRESSIONS_COUNT, + EVENTS, + TELEMETRY, + TOKEN +} diff --git a/client/src/main/java/io/split/telemetry/domain/enums/ImpressionsDataTypeEnum.java b/client/src/main/java/io/split/telemetry/domain/enums/ImpressionsDataTypeEnum.java new file mode 100644 index 000000000..3ca134c93 --- /dev/null +++ b/client/src/main/java/io/split/telemetry/domain/enums/ImpressionsDataTypeEnum.java @@ -0,0 +1,7 @@ +package io.split.telemetry.domain.enums; + +public enum ImpressionsDataTypeEnum { + IMPRESSIONS_QUEUED, + IMPRESSIONS_DROPPED, + IMPRESSIONS_DEDUPED +} diff --git a/client/src/main/java/io/split/telemetry/domain/enums/LastSynchronizationRecordsEnum.java b/client/src/main/java/io/split/telemetry/domain/enums/LastSynchronizationRecordsEnum.java new file mode 100644 index 000000000..d2439395b --- /dev/null +++ b/client/src/main/java/io/split/telemetry/domain/enums/LastSynchronizationRecordsEnum.java @@ -0,0 +1,11 @@ +package io.split.telemetry.domain.enums; + +public enum LastSynchronizationRecordsEnum { + SPLITS, + SEGMENTS, + IMPRESSIONS, + IMPRESSIONS_COUNT, + EVENTS, + TOKEN, + TELEMETRY +} diff --git a/client/src/main/java/io/split/telemetry/domain/enums/MethodEnum.java b/client/src/main/java/io/split/telemetry/domain/enums/MethodEnum.java new file mode 100644 index 000000000..029d59477 --- /dev/null +++ b/client/src/main/java/io/split/telemetry/domain/enums/MethodEnum.java @@ -0,0 +1,9 @@ +package io.split.telemetry.domain.enums; + +public enum MethodEnum { + TREATMENT, + TREATMENTS, + TREATMENT_WITH_CONFIG, + TREATMENTS_WITH_CONFIG, + TRACK, +} diff --git a/client/src/main/java/io/split/telemetry/domain/enums/PushCountersEnum.java b/client/src/main/java/io/split/telemetry/domain/enums/PushCountersEnum.java new file mode 100644 index 000000000..53f023f74 --- /dev/null +++ b/client/src/main/java/io/split/telemetry/domain/enums/PushCountersEnum.java @@ -0,0 +1,6 @@ +package io.split.telemetry.domain.enums; + +public enum PushCountersEnum { + AUTH_REJECTIONS, + TOKEN_REFRESHES +} diff --git a/client/src/main/java/io/split/telemetry/domain/enums/ResourceEnum.java b/client/src/main/java/io/split/telemetry/domain/enums/ResourceEnum.java new file mode 100644 index 000000000..04af11162 --- /dev/null +++ b/client/src/main/java/io/split/telemetry/domain/enums/ResourceEnum.java @@ -0,0 +1,11 @@ +package io.split.telemetry.domain.enums; + +public enum ResourceEnum { + SPLIT_SYNC, + SEGMENT_SYNC, + IMPRESSION_SYNC, + IMPRESSION_COUNT_SYNC, + EVENT_SYNC, + TELEMETRY_SYNC, + TOKEN_SYNC +} diff --git a/client/src/main/java/io/split/telemetry/domain/enums/SdkRecordsEnum.java b/client/src/main/java/io/split/telemetry/domain/enums/SdkRecordsEnum.java new file mode 100644 index 000000000..6f8780811 --- /dev/null +++ b/client/src/main/java/io/split/telemetry/domain/enums/SdkRecordsEnum.java @@ -0,0 +1,5 @@ +package io.split.telemetry.domain.enums; + +public enum SdkRecordsEnum { + SESSION +} diff --git a/client/src/main/java/io/split/telemetry/storage/InMemoryTelemetryStorage.java b/client/src/main/java/io/split/telemetry/storage/InMemoryTelemetryStorage.java new file mode 100644 index 000000000..a115aa8d4 --- /dev/null +++ b/client/src/main/java/io/split/telemetry/storage/InMemoryTelemetryStorage.java @@ -0,0 +1,349 @@ +package io.split.telemetry.storage; + +import com.google.common.collect.Maps; +import io.split.telemetry.utils.BucketCalculator; +import io.split.telemetry.domain.*; +import io.split.telemetry.domain.enums.*; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; + +public class InMemoryTelemetryStorage implements TelemetryStorage{ + public static final int MAX_LATENCY_BUCKET_COUNT = 23; + + //Latencies + private final ConcurrentMap _methodLatencies = Maps.newConcurrentMap(); + private final ConcurrentMap _httpLatencies = Maps.newConcurrentMap(); + + //Counters + private final ConcurrentMap _exceptionsCounters = Maps.newConcurrentMap(); + private final ConcurrentMap _pushCounters = Maps.newConcurrentMap(); + private final ConcurrentMap _factoryCounters = Maps.newConcurrentMap(); + + //Records + private final ConcurrentMap _impressionsDataRecords = Maps.newConcurrentMap(); + private final ConcurrentMap _eventsDataRecords = Maps.newConcurrentMap(); + private final ConcurrentMap _lastSynchronizationRecords = Maps.newConcurrentMap(); + private final ConcurrentMap _sdkRecords = Maps.newConcurrentMap(); + + //HTTPErrors + private final ConcurrentMap> _httpErrors = Maps.newConcurrentMap(); + + //StreamingEvents + private final Object _streamingEventsLock = new Object(); + private final List _streamingEvents = new ArrayList<>(); + + //Tags + private final Object _tagsLock = new Object(); + private final List _tags = new ArrayList<>(); + + public InMemoryTelemetryStorage() throws Exception { + initMethodLatencies(); + initHttpLatencies(); + initHttpErrors(); + initMethodExceptions(); + initFactoryCounters(); + initImpressionDataCounters(); + initPushCounters(); + initSdkRecords(); + initLastSynchronizationRecords(); + initEventDataRecords(); + } + + @Override + public long getBURTimeouts() { + return _factoryCounters.get(FactoryCountersEnum.BUR_TIMEOUTS).get(); + } + + @Override + public long getNonReadyUsages() { + return _factoryCounters.get(FactoryCountersEnum.NON_READY_USAGES).get(); + } + + @Override + public MethodExceptions popExceptions() throws Exception { + MethodExceptions exceptions = new MethodExceptions(); + exceptions.set_treatment(_exceptionsCounters.get(MethodEnum.TREATMENT).get()); + exceptions.set_treatments(_exceptionsCounters.get(MethodEnum.TREATMENTS).get()); + exceptions.set_treatmentWithConfig(_exceptionsCounters.get(MethodEnum.TREATMENT_WITH_CONFIG).get()); + exceptions.set_treatmentsWithConfig(_exceptionsCounters.get(MethodEnum.TREATMENTS_WITH_CONFIG).get()); + exceptions.set_track(_exceptionsCounters.get(MethodEnum.TRACK).get()); + + _exceptionsCounters.clear(); + initMethodExceptions(); + + return exceptions; + } + + @Override + public MethodLatencies popLatencies() throws Exception { + MethodLatencies latencies = new MethodLatencies(); + latencies.set_treatment(_methodLatencies.get(MethodEnum.TREATMENT).fetchAndClearAll()); + latencies.set_treatments(_methodLatencies.get(MethodEnum.TREATMENTS).fetchAndClearAll()); + latencies.set_treatmentWithConfig(_methodLatencies.get(MethodEnum.TREATMENT_WITH_CONFIG).fetchAndClearAll()); + latencies.set_treatmentsWithConfig(_methodLatencies.get(MethodEnum.TREATMENTS_WITH_CONFIG).fetchAndClearAll()); + latencies.set_track(_methodLatencies.get(MethodEnum.TRACK).fetchAndClearAll()); + + _methodLatencies.clear(); + initMethodLatencies(); + + return latencies; + } + + @Override + public void recordNonReadyUsage() { + _factoryCounters.get(FactoryCountersEnum.NON_READY_USAGES).incrementAndGet(); + } + + @Override + public void recordBURTimeout() { + _factoryCounters.get(FactoryCountersEnum.BUR_TIMEOUTS).incrementAndGet(); + } + + + @Override + public void recordLatency(MethodEnum method, long latency) { + int bucket = BucketCalculator.getBucketForLatency(latency); + _methodLatencies.get(method).increment(bucket); + } + + @Override + public void recordException(MethodEnum method) { + _exceptionsCounters.get(method).incrementAndGet(); + } + + @Override + public long getImpressionsStats(ImpressionsDataTypeEnum dataType) { + return _impressionsDataRecords.get(dataType).get(); + } + + @Override + public long getEventStats(EventsDataRecordsEnum dataType) { + return _eventsDataRecords.get(dataType).get(); + } + + @Override + public LastSynchronization getLastSynchronization() { + LastSynchronization lastSynchronization = new LastSynchronization(); + lastSynchronization.set_splits(_lastSynchronizationRecords.get(LastSynchronizationRecordsEnum.SPLITS).get()); + lastSynchronization.set_segments(_lastSynchronizationRecords.get(LastSynchronizationRecordsEnum.SEGMENTS).get()); + lastSynchronization.set_impressions(_lastSynchronizationRecords.get(LastSynchronizationRecordsEnum.IMPRESSIONS).get()); + lastSynchronization.set_impressionsCount(_lastSynchronizationRecords.get(LastSynchronizationRecordsEnum.IMPRESSIONS_COUNT).get()); + lastSynchronization.set_events(_lastSynchronizationRecords.get(LastSynchronizationRecordsEnum.EVENTS).get()); + lastSynchronization.set_telemetry(_lastSynchronizationRecords.get(LastSynchronizationRecordsEnum.TELEMETRY).get()); + lastSynchronization.set_token(_lastSynchronizationRecords.get(LastSynchronizationRecordsEnum.TOKEN).get()); + + return lastSynchronization; + } + + @Override + public HTTPErrors popHTTPErrors() { + HTTPErrors errors = new HTTPErrors(); + errors.set_splits(_httpErrors.get(ResourceEnum.SPLIT_SYNC)); + errors.set_segments(_httpErrors.get(ResourceEnum.SEGMENT_SYNC)); + errors.set_impressions(_httpErrors.get(ResourceEnum.IMPRESSION_SYNC)); + errors.set_impressionsCount(_httpErrors.get(ResourceEnum.IMPRESSION_COUNT_SYNC)); + errors.set_events(_httpErrors.get(ResourceEnum.EVENT_SYNC)); + errors.set_telemetry(_httpErrors.get(ResourceEnum.TELEMETRY_SYNC)); + errors.set_token(_httpErrors.get(ResourceEnum.TOKEN_SYNC)); + + _httpErrors.clear(); + initHttpErrors(); + + return errors; + } + + @Override + public HTTPLatencies popHTTPLatencies() throws Exception { + HTTPLatencies latencies = new HTTPLatencies(); + latencies.set_splits(_httpLatencies.get(HTTPLatenciesEnum.SPLITS).fetchAndClearAll()); + latencies.set_segments(_httpLatencies.get(HTTPLatenciesEnum.SEGMENTS).fetchAndClearAll()); + latencies.set_impressions(_httpLatencies.get(HTTPLatenciesEnum.IMPRESSIONS).fetchAndClearAll()); + latencies.set_impressionsCount(_httpLatencies.get(HTTPLatenciesEnum.IMPRESSIONS_COUNT).fetchAndClearAll()); + latencies.set_events(_httpLatencies.get(HTTPLatenciesEnum.EVENTS).fetchAndClearAll()); + latencies.set_telemetry(_httpLatencies.get(HTTPLatenciesEnum.TELEMETRY).fetchAndClearAll()); + latencies.set_token(_httpLatencies.get(HTTPLatenciesEnum.TOKEN).fetchAndClearAll()); + + _httpLatencies.clear(); + initHttpLatencies(); + + return latencies; + } + + @Override + public long popAuthRejections() { + long authRejections = _pushCounters.get(PushCountersEnum.AUTH_REJECTIONS).get(); + + _pushCounters.replace(PushCountersEnum.AUTH_REJECTIONS, new AtomicLong()); + + return authRejections; + } + + @Override + public long popTokenRefreshes() { + long tokenRefreshes = _pushCounters.get(PushCountersEnum.TOKEN_REFRESHES).get(); + + _pushCounters.replace(PushCountersEnum.TOKEN_REFRESHES, new AtomicLong()); + + return tokenRefreshes; + } + + @Override + public List popStreamingEvents() { + synchronized (_streamingEventsLock) { + List streamingEvents = _streamingEvents.stream().collect(Collectors.toList()); + + _streamingEvents.clear(); + + return streamingEvents; + } + } + + @Override + public List popTags() { + synchronized (_tagsLock) { + List tags = _tags.stream().collect(Collectors.toList()); + + _tags.clear(); + + return tags; + } + } + + @Override + public long getSessionLength() { + return _sdkRecords.get(SdkRecordsEnum.SESSION).get(); + } + + @Override + public void addTag(String tag) { + synchronized (_tagsLock) { + _tags.add(tag); + } + } + + @Override + public void recordImpressionStats(ImpressionsDataTypeEnum dataType, long count) { + _impressionsDataRecords.get(dataType).addAndGet(count); + } + + @Override + public void recordEventStats(EventsDataRecordsEnum dataType, long count) { + _eventsDataRecords.get(dataType).addAndGet(count); + } + + @Override + public void recordSuccessfulSync(LastSynchronizationRecordsEnum resource, long time) { + _lastSynchronizationRecords.replace(resource, new AtomicLong(time)); + } + + @Override + public void recordSyncError(ResourceEnum resource, int status) { + ConcurrentMap errors = _httpErrors.get(resource); + errors.putIfAbsent(Long.valueOf(status), 0l); + errors.replace(Long.valueOf(status), errors.get(Long.valueOf(status)) + 1); + } + + @Override + public void recordSyncLatency(HTTPLatenciesEnum resource, long latency) { + int bucket = BucketCalculator.getBucketForLatency(latency); + _httpLatencies.get(resource).increment(bucket); + + } + + @Override + public void recordAuthRejections() { + _pushCounters.get(PushCountersEnum.AUTH_REJECTIONS).incrementAndGet(); + } + + @Override + public void recordTokenRefreshes() { + _pushCounters.get(PushCountersEnum.TOKEN_REFRESHES).incrementAndGet(); + } + + @Override + public void recordStreamingEvents(StreamingEvent streamingEvent) { + synchronized (_streamingEventsLock) { + _streamingEvents.add(streamingEvent); + } + } + + @Override + public void recordSessionLength(long sessionLength) { + _sdkRecords.replace(SdkRecordsEnum.SESSION, new AtomicLong(sessionLength)); + } + + private void initMethodLatencies() throws Exception { + _methodLatencies.put(MethodEnum.TREATMENT, new AtomicLongArray(MAX_LATENCY_BUCKET_COUNT)); + _methodLatencies.put(MethodEnum.TREATMENTS, new AtomicLongArray(MAX_LATENCY_BUCKET_COUNT)); + _methodLatencies.put(MethodEnum.TREATMENT_WITH_CONFIG, new AtomicLongArray(MAX_LATENCY_BUCKET_COUNT)); + _methodLatencies.put(MethodEnum.TREATMENTS_WITH_CONFIG, new AtomicLongArray(MAX_LATENCY_BUCKET_COUNT)); + _methodLatencies.put(MethodEnum.TRACK, new AtomicLongArray(MAX_LATENCY_BUCKET_COUNT)); + } + + private void initHttpLatencies() throws Exception { + _httpLatencies.put(HTTPLatenciesEnum.SPLITS, new AtomicLongArray(MAX_LATENCY_BUCKET_COUNT)); + _httpLatencies.put(HTTPLatenciesEnum.SEGMENTS, new AtomicLongArray(MAX_LATENCY_BUCKET_COUNT)); + _httpLatencies.put(HTTPLatenciesEnum.IMPRESSIONS, new AtomicLongArray(MAX_LATENCY_BUCKET_COUNT)); + _httpLatencies.put(HTTPLatenciesEnum.IMPRESSIONS_COUNT, new AtomicLongArray(MAX_LATENCY_BUCKET_COUNT)); + _httpLatencies.put(HTTPLatenciesEnum.EVENTS, new AtomicLongArray(MAX_LATENCY_BUCKET_COUNT)); + _httpLatencies.put(HTTPLatenciesEnum.TELEMETRY, new AtomicLongArray(MAX_LATENCY_BUCKET_COUNT)); + _httpLatencies.put(HTTPLatenciesEnum.TOKEN, new AtomicLongArray(MAX_LATENCY_BUCKET_COUNT)); + } + + private void initHttpErrors() { + _httpErrors.put(ResourceEnum.SPLIT_SYNC, Maps.newConcurrentMap()); + _httpErrors.put(ResourceEnum.SEGMENT_SYNC, Maps.newConcurrentMap()); + _httpErrors.put(ResourceEnum.IMPRESSION_SYNC, Maps.newConcurrentMap()); + _httpErrors.put(ResourceEnum.IMPRESSION_COUNT_SYNC, Maps.newConcurrentMap()); + _httpErrors.put(ResourceEnum.EVENT_SYNC, Maps.newConcurrentMap()); + _httpErrors.put(ResourceEnum.TELEMETRY_SYNC, Maps.newConcurrentMap()); + _httpErrors.put(ResourceEnum.TOKEN_SYNC, Maps.newConcurrentMap()); + } + + private void initMethodExceptions() { + _exceptionsCounters.put(MethodEnum.TREATMENT, new AtomicLong()); + _exceptionsCounters.put(MethodEnum.TREATMENTS, new AtomicLong()); + _exceptionsCounters.put(MethodEnum.TREATMENT_WITH_CONFIG, new AtomicLong()); + _exceptionsCounters.put(MethodEnum.TREATMENTS_WITH_CONFIG, new AtomicLong()); + _exceptionsCounters.put(MethodEnum.TRACK, new AtomicLong()); + } + + private void initFactoryCounters() { + _factoryCounters.put(FactoryCountersEnum.BUR_TIMEOUTS, new AtomicLong()); + _factoryCounters.put(FactoryCountersEnum.NON_READY_USAGES, new AtomicLong()); + } + + private void initImpressionDataCounters() { + _impressionsDataRecords.put(ImpressionsDataTypeEnum.IMPRESSIONS_DEDUPED, new AtomicLong()); + _impressionsDataRecords.put(ImpressionsDataTypeEnum.IMPRESSIONS_DROPPED, new AtomicLong()); + _impressionsDataRecords.put(ImpressionsDataTypeEnum.IMPRESSIONS_QUEUED, new AtomicLong()); + } + + private void initPushCounters() { + _pushCounters.put(PushCountersEnum.AUTH_REJECTIONS, new AtomicLong()); + _pushCounters.put(PushCountersEnum.TOKEN_REFRESHES, new AtomicLong()); + } + + private void initSdkRecords() { + _sdkRecords.put(SdkRecordsEnum.SESSION, new AtomicLong()); + } + + private void initLastSynchronizationRecords() { + _lastSynchronizationRecords.put(LastSynchronizationRecordsEnum.SPLITS, new AtomicLong()); + _lastSynchronizationRecords.put(LastSynchronizationRecordsEnum.SEGMENTS, new AtomicLong()); + _lastSynchronizationRecords.put(LastSynchronizationRecordsEnum.EVENTS, new AtomicLong()); + _lastSynchronizationRecords.put(LastSynchronizationRecordsEnum.IMPRESSIONS, new AtomicLong()); + _lastSynchronizationRecords.put(LastSynchronizationRecordsEnum.IMPRESSIONS_COUNT, new AtomicLong()); + _lastSynchronizationRecords.put(LastSynchronizationRecordsEnum.TOKEN, new AtomicLong()); + _lastSynchronizationRecords.put(LastSynchronizationRecordsEnum.TELEMETRY, new AtomicLong()); + } + + private void initEventDataRecords() { + _eventsDataRecords.put(EventsDataRecordsEnum.EVENTS_DROPPED, new AtomicLong()); + _eventsDataRecords.put(EventsDataRecordsEnum.EVENTS_QUEUED, new AtomicLong()); + } +} diff --git a/client/src/main/java/io/split/telemetry/storage/TelemetryConfigConsumer.java b/client/src/main/java/io/split/telemetry/storage/TelemetryConfigConsumer.java new file mode 100644 index 000000000..680d128b8 --- /dev/null +++ b/client/src/main/java/io/split/telemetry/storage/TelemetryConfigConsumer.java @@ -0,0 +1,6 @@ +package io.split.telemetry.storage; + +public interface TelemetryConfigConsumer { + long getBURTimeouts(); + long getNonReadyUsages(); +} diff --git a/client/src/main/java/io/split/telemetry/storage/TelemetryConfigProducer.java b/client/src/main/java/io/split/telemetry/storage/TelemetryConfigProducer.java new file mode 100644 index 000000000..64348c154 --- /dev/null +++ b/client/src/main/java/io/split/telemetry/storage/TelemetryConfigProducer.java @@ -0,0 +1,6 @@ +package io.split.telemetry.storage; + +public interface TelemetryConfigProducer { + void recordNonReadyUsage(); + void recordBURTimeout(); +} diff --git a/client/src/main/java/io/split/telemetry/storage/TelemetryEvaluationConsumer.java b/client/src/main/java/io/split/telemetry/storage/TelemetryEvaluationConsumer.java new file mode 100644 index 000000000..ca63112ac --- /dev/null +++ b/client/src/main/java/io/split/telemetry/storage/TelemetryEvaluationConsumer.java @@ -0,0 +1,9 @@ +package io.split.telemetry.storage; + +import io.split.telemetry.domain.MethodExceptions; +import io.split.telemetry.domain.MethodLatencies; + +public interface TelemetryEvaluationConsumer { + MethodExceptions popExceptions() throws Exception; + MethodLatencies popLatencies() throws Exception; +} diff --git a/client/src/main/java/io/split/telemetry/storage/TelemetryEvaluationProducer.java b/client/src/main/java/io/split/telemetry/storage/TelemetryEvaluationProducer.java new file mode 100644 index 000000000..005106c30 --- /dev/null +++ b/client/src/main/java/io/split/telemetry/storage/TelemetryEvaluationProducer.java @@ -0,0 +1,8 @@ +package io.split.telemetry.storage; + +import io.split.telemetry.domain.enums.MethodEnum; + +public interface TelemetryEvaluationProducer { + void recordLatency(MethodEnum method, long latency); + void recordException(MethodEnum method); +} diff --git a/client/src/main/java/io/split/telemetry/storage/TelemetryRuntimeConsumer.java b/client/src/main/java/io/split/telemetry/storage/TelemetryRuntimeConsumer.java new file mode 100644 index 000000000..7acd49afb --- /dev/null +++ b/client/src/main/java/io/split/telemetry/storage/TelemetryRuntimeConsumer.java @@ -0,0 +1,23 @@ +package io.split.telemetry.storage; + +import io.split.telemetry.domain.HTTPErrors; +import io.split.telemetry.domain.HTTPLatencies; +import io.split.telemetry.domain.LastSynchronization; +import io.split.telemetry.domain.StreamingEvent; +import io.split.telemetry.domain.enums.EventsDataRecordsEnum; +import io.split.telemetry.domain.enums.ImpressionsDataTypeEnum; + +import java.util.List; + +public interface TelemetryRuntimeConsumer { + long getImpressionsStats(ImpressionsDataTypeEnum data); + long getEventStats(EventsDataRecordsEnum type); + LastSynchronization getLastSynchronization(); + HTTPErrors popHTTPErrors(); + HTTPLatencies popHTTPLatencies() throws Exception; + long popAuthRejections(); + long popTokenRefreshes(); + List popStreamingEvents(); + List popTags(); + long getSessionLength(); +} diff --git a/client/src/main/java/io/split/telemetry/storage/TelemetryRuntimeProducer.java b/client/src/main/java/io/split/telemetry/storage/TelemetryRuntimeProducer.java new file mode 100644 index 000000000..2baf016f0 --- /dev/null +++ b/client/src/main/java/io/split/telemetry/storage/TelemetryRuntimeProducer.java @@ -0,0 +1,17 @@ +package io.split.telemetry.storage; + +import io.split.telemetry.domain.StreamingEvent; +import io.split.telemetry.domain.enums.*; + +public interface TelemetryRuntimeProducer { + void addTag(String tag); + void recordImpressionStats(ImpressionsDataTypeEnum dataType, long count); + void recordEventStats(EventsDataRecordsEnum dataType, long count); + void recordSuccessfulSync(LastSynchronizationRecordsEnum resource, long time); + void recordSyncError(ResourceEnum resource, int status); + void recordSyncLatency(HTTPLatenciesEnum resource, long latency); + void recordAuthRejections(); + void recordTokenRefreshes(); + void recordStreamingEvents(StreamingEvent streamingEvent); + void recordSessionLength(long sessionLength); +} diff --git a/client/src/main/java/io/split/telemetry/storage/TelemetryStorage.java b/client/src/main/java/io/split/telemetry/storage/TelemetryStorage.java new file mode 100644 index 000000000..477965099 --- /dev/null +++ b/client/src/main/java/io/split/telemetry/storage/TelemetryStorage.java @@ -0,0 +1,4 @@ +package io.split.telemetry.storage; + +public interface TelemetryStorage extends TelemetryStorageConsumer, TelemetryStorageProducer{ +} diff --git a/client/src/main/java/io/split/telemetry/storage/TelemetryStorageConsumer.java b/client/src/main/java/io/split/telemetry/storage/TelemetryStorageConsumer.java new file mode 100644 index 000000000..7efd537c5 --- /dev/null +++ b/client/src/main/java/io/split/telemetry/storage/TelemetryStorageConsumer.java @@ -0,0 +1,4 @@ +package io.split.telemetry.storage; + +public interface TelemetryStorageConsumer extends TelemetryConfigConsumer, TelemetryRuntimeConsumer, TelemetryEvaluationConsumer{ +} diff --git a/client/src/main/java/io/split/telemetry/storage/TelemetryStorageProducer.java b/client/src/main/java/io/split/telemetry/storage/TelemetryStorageProducer.java new file mode 100644 index 000000000..c3ef23031 --- /dev/null +++ b/client/src/main/java/io/split/telemetry/storage/TelemetryStorageProducer.java @@ -0,0 +1,4 @@ +package io.split.telemetry.storage; + +public interface TelemetryStorageProducer extends TelemetryEvaluationProducer, TelemetryConfigProducer, TelemetryRuntimeProducer{ +} diff --git a/client/src/main/java/io/split/telemetry/utils/BucketCalculator.java b/client/src/main/java/io/split/telemetry/utils/BucketCalculator.java new file mode 100644 index 000000000..7c5ce3ced --- /dev/null +++ b/client/src/main/java/io/split/telemetry/utils/BucketCalculator.java @@ -0,0 +1,64 @@ +package io.split.telemetry.utils; + +import java.util.Arrays; + +/** + * Calculates buckets from latency + *

+ * (1) 1.00 + * (2) 1.50 + * (3) 2.25 + * (4) 3.38 + * (5) 5.06 + * (6) 7.59 + * (7) 11.39 + * (8) 17.09 + * (9) 25.63 + * (10) 38.44 + * (11) 57.67 + * (12) 86.50 + * (13) 129.75 + * (14) 194.62 + * (15) 291.93 + * (16) 437.89 + * (17) 656.84 + * (18) 985.26 + * (19) 1,477.89 + * (20) 2,216.84 + * (21) 3,325.26 + * (22) 4,987.89 + * (23) 7,481.83 + *

+ */ +public class BucketCalculator { + + static final long[] BUCKETS = { + 1000, 1500, 2250, 3375, 5063, + 7594, 11391, 17086, 25629, 38443, + 57665, 86498, 129746, 194620, 291929, + 437894, 656841, 985261, 1477892, 2216838, + 3325257, 4987885, 7481828 + }; + + static final long MAX_LATENCY = 7481828; + + public static int getBucketForLatency(long latency) { + long micros = latency / 1000; //Convert to milliseconds + if (micros > MAX_LATENCY) { + return BUCKETS.length - 1; + } + + int index = Arrays.binarySearch(BUCKETS, micros); + + if (index < 0) { + + // Adjust the index based on Java Array javadocs. <0 means the value wasn't found and it's module value + // is where it should be inserted (in this case, it means the counter it applies - unless it's equals to the + // length of the array). + + index = -(index + 1); + } + return index; + } + +} diff --git a/client/src/test/java/io/split/telemetry/storage/InMemoryTelemetryStorageTest.java b/client/src/test/java/io/split/telemetry/storage/InMemoryTelemetryStorageTest.java new file mode 100644 index 000000000..49c1ce167 --- /dev/null +++ b/client/src/test/java/io/split/telemetry/storage/InMemoryTelemetryStorageTest.java @@ -0,0 +1,221 @@ +package io.split.telemetry.storage; + +import io.split.telemetry.domain.*; +import io.split.telemetry.domain.enums.*; +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; + +public class InMemoryTelemetryStorageTest{ + + @Test + public void testInMemoryTelemetryStorage() throws Exception { + InMemoryTelemetryStorage telemetryStorage = new InMemoryTelemetryStorage(); + + //MethodLatencies + telemetryStorage.recordLatency(MethodEnum.TREATMENT, 1500l * 1000); + telemetryStorage.recordLatency(MethodEnum.TREATMENT, 2000l * 1000); + telemetryStorage.recordLatency(MethodEnum.TREATMENTS, 3000l * 1000); + telemetryStorage.recordLatency(MethodEnum.TREATMENTS, 500l * 1000); + telemetryStorage.recordLatency(MethodEnum.TREATMENT_WITH_CONFIG, 800l * 1000); + telemetryStorage.recordLatency(MethodEnum.TREATMENTS_WITH_CONFIG, 1000l * 1000); + + MethodLatencies latencies = telemetryStorage.popLatencies(); + Assert.assertEquals(2, latencies.get_treatment().stream().mapToInt(Long::intValue).sum()); + Assert.assertEquals(2, latencies.get_treatments().stream().mapToInt(Long::intValue).sum()); + Assert.assertEquals(1, latencies.get_treatmentsWithConfig().stream().mapToInt(Long::intValue).sum()); + Assert.assertEquals(1, latencies.get_treatmentWithConfig().stream().mapToInt(Long::intValue).sum()); + Assert.assertEquals(0, latencies.get_track().stream().mapToInt(Long::intValue).sum()); + + //Check empty has worked + latencies = telemetryStorage.popLatencies(); + Assert.assertEquals(0, latencies.get_treatment().stream().mapToInt(Long::intValue).sum()); + Assert.assertEquals(0, latencies.get_treatments().stream().mapToInt(Long::intValue).sum()); + Assert.assertEquals(0, latencies.get_treatmentsWithConfig().stream().mapToInt(Long::intValue).sum()); + Assert.assertEquals(0, latencies.get_treatmentWithConfig().stream().mapToInt(Long::intValue).sum()); + Assert.assertEquals(0, latencies.get_track().stream().mapToInt(Long::intValue).sum()); + + //HttpLatencies + telemetryStorage.recordSyncLatency(HTTPLatenciesEnum.TELEMETRY, 1500l * 1000); + telemetryStorage.recordSyncLatency(HTTPLatenciesEnum.TELEMETRY, 2000l * 1000); + telemetryStorage.recordSyncLatency(HTTPLatenciesEnum.EVENTS, 1500l * 1000); + telemetryStorage.recordSyncLatency(HTTPLatenciesEnum.EVENTS, 2000l * 1000); + telemetryStorage.recordSyncLatency(HTTPLatenciesEnum.SEGMENTS, 1500l * 1000); + telemetryStorage.recordSyncLatency(HTTPLatenciesEnum.SPLITS, 2000l * 1000); + telemetryStorage.recordSyncLatency(HTTPLatenciesEnum.SPLITS, 1500l * 1000); + telemetryStorage.recordSyncLatency(HTTPLatenciesEnum.SPLITS, 2000l * 1000); + telemetryStorage.recordSyncLatency(HTTPLatenciesEnum.IMPRESSIONS, 1500l * 1000); + telemetryStorage.recordSyncLatency(HTTPLatenciesEnum.IMPRESSIONS_COUNT, 2000l * 1000); + + HTTPLatencies httpLatencies = telemetryStorage.popHTTPLatencies(); + + Assert.assertEquals(3, httpLatencies.get_splits().stream().mapToInt(Long::intValue).sum()); + Assert.assertEquals(2, httpLatencies.get_telemetry().stream().mapToInt(Long::intValue).sum()); + Assert.assertEquals(2, httpLatencies.get_events().stream().mapToInt(Long::intValue).sum()); + Assert.assertEquals(1, httpLatencies.get_segments().stream().mapToInt(Long::intValue).sum()); + Assert.assertEquals(1, httpLatencies.get_impressions().stream().mapToInt(Long::intValue).sum()); + Assert.assertEquals(1, httpLatencies.get_impressionsCount().stream().mapToInt(Long::intValue).sum()); + Assert.assertEquals(0, httpLatencies.get_token().stream().mapToInt(Long::intValue).sum()); + + httpLatencies = telemetryStorage.popHTTPLatencies(); + Assert.assertEquals(0, httpLatencies.get_splits().stream().mapToInt(Long::intValue).sum()); + Assert.assertEquals(0, httpLatencies.get_telemetry().stream().mapToInt(Long::intValue).sum()); + Assert.assertEquals(0, httpLatencies.get_events().stream().mapToInt(Long::intValue).sum()); + Assert.assertEquals(0, httpLatencies.get_segments().stream().mapToInt(Long::intValue).sum()); + Assert.assertEquals(0, httpLatencies.get_impressions().stream().mapToInt(Long::intValue).sum()); + Assert.assertEquals(0, httpLatencies.get_impressionsCount().stream().mapToInt(Long::intValue).sum()); + Assert.assertEquals(0, httpLatencies.get_token().stream().mapToInt(Long::intValue).sum()); + + + //Exceptions + telemetryStorage.recordException(MethodEnum.TREATMENT); + telemetryStorage.recordException(MethodEnum.TREATMENTS); + telemetryStorage.recordException(MethodEnum.TREATMENT); + telemetryStorage.recordException(MethodEnum.TREATMENTS); + telemetryStorage.recordException(MethodEnum.TREATMENT_WITH_CONFIG); + telemetryStorage.recordException(MethodEnum.TREATMENTS_WITH_CONFIG); + + MethodExceptions methodExceptions = telemetryStorage.popExceptions(); + Assert.assertEquals(2, methodExceptions.get_treatment()); + Assert.assertEquals(2, methodExceptions.get_treatments()); + Assert.assertEquals(1, methodExceptions.get_treatmentsWithConfig()); + Assert.assertEquals(1, methodExceptions.get_treatmentWithConfig()); + Assert.assertEquals(0, methodExceptions.get_track()); + + //Check empty has worked + methodExceptions = telemetryStorage.popExceptions(); + Assert.assertEquals(0, methodExceptions.get_treatment()); + Assert.assertEquals(0, methodExceptions.get_treatments()); + Assert.assertEquals(0, methodExceptions.get_treatmentsWithConfig()); + Assert.assertEquals(0, methodExceptions.get_treatmentWithConfig()); + Assert.assertEquals(0, methodExceptions.get_track()); + + //AuthRejections + telemetryStorage.recordAuthRejections(); + long authRejections = telemetryStorage.popAuthRejections(); + Assert.assertEquals(1, authRejections); + + //Check amount has been reseted + authRejections = telemetryStorage.popAuthRejections(); + Assert.assertEquals(0, authRejections); + + //AuthRejections + telemetryStorage.recordTokenRefreshes(); + telemetryStorage.recordTokenRefreshes(); + long tokenRefreshes = telemetryStorage.popTokenRefreshes(); + Assert.assertEquals(2, tokenRefreshes); + + //Check amount has been reseted + tokenRefreshes = telemetryStorage.popTokenRefreshes(); + Assert.assertEquals(0, tokenRefreshes); + + //Non Ready usages + telemetryStorage.recordNonReadyUsage(); + telemetryStorage.recordNonReadyUsage(); + long nonReadyUsages = telemetryStorage.getNonReadyUsages(); + Assert.assertEquals(2, nonReadyUsages); + + //BUR Timeouts + telemetryStorage.recordBURTimeout(); + long burTimeouts = telemetryStorage.getBURTimeouts(); + Assert.assertEquals(1, burTimeouts); + + //ImpressionStats + telemetryStorage.recordImpressionStats(ImpressionsDataTypeEnum.IMPRESSIONS_DEDUPED, 3); + telemetryStorage.recordImpressionStats(ImpressionsDataTypeEnum.IMPRESSIONS_DEDUPED, 1); + telemetryStorage.recordImpressionStats(ImpressionsDataTypeEnum.IMPRESSIONS_DROPPED, 4); + telemetryStorage.recordImpressionStats(ImpressionsDataTypeEnum.IMPRESSIONS_DROPPED, 6); + telemetryStorage.recordImpressionStats(ImpressionsDataTypeEnum.IMPRESSIONS_DROPPED, 2); + + long impressionsDeduped = telemetryStorage.getImpressionsStats(ImpressionsDataTypeEnum.IMPRESSIONS_DEDUPED); + long impressionsDropped = telemetryStorage.getImpressionsStats(ImpressionsDataTypeEnum.IMPRESSIONS_DROPPED); + long impressionsQueued = telemetryStorage.getImpressionsStats(ImpressionsDataTypeEnum.IMPRESSIONS_QUEUED); + + Assert.assertEquals(4, impressionsDeduped); + Assert.assertEquals(12, impressionsDropped); + Assert.assertEquals(0, impressionsQueued); + + //Event Stats + telemetryStorage.recordEventStats(EventsDataRecordsEnum.EVENTS_DROPPED, 3); + telemetryStorage.recordEventStats(EventsDataRecordsEnum.EVENTS_DROPPED, 7); + telemetryStorage.recordEventStats(EventsDataRecordsEnum.EVENTS_QUEUED, 3); + + long eventsDropped = telemetryStorage.getEventStats(EventsDataRecordsEnum.EVENTS_DROPPED); + long eventsQueued = telemetryStorage.getEventStats(EventsDataRecordsEnum.EVENTS_QUEUED); + + Assert.assertEquals(10, eventsDropped); + Assert.assertEquals(3, eventsQueued); + + //Successfuly sync + telemetryStorage.recordSuccessfulSync(LastSynchronizationRecordsEnum.EVENTS, 1500); + telemetryStorage.recordSuccessfulSync(LastSynchronizationRecordsEnum.EVENTS, 800); + telemetryStorage.recordSuccessfulSync(LastSynchronizationRecordsEnum.IMPRESSIONS, 2500); + telemetryStorage.recordSuccessfulSync(LastSynchronizationRecordsEnum.IMPRESSIONS, 10500); + telemetryStorage.recordSuccessfulSync(LastSynchronizationRecordsEnum.IMPRESSIONS_COUNT, 1500); + telemetryStorage.recordSuccessfulSync(LastSynchronizationRecordsEnum.SEGMENTS, 1580); + telemetryStorage.recordSuccessfulSync(LastSynchronizationRecordsEnum.TELEMETRY, 265); + telemetryStorage.recordSuccessfulSync(LastSynchronizationRecordsEnum.TOKEN, 129); + + LastSynchronization lastSynchronization = telemetryStorage.getLastSynchronization(); + Assert.assertEquals(800, lastSynchronization.get_events()); + Assert.assertEquals(129, lastSynchronization.get_token()); + Assert.assertEquals(1580, lastSynchronization.get_segments()); + Assert.assertEquals(0, lastSynchronization.get_splits()); + Assert.assertEquals(10500, lastSynchronization.get_impressions()); + Assert.assertEquals(1500, lastSynchronization.get_impressionsCount()); + Assert.assertEquals(265, lastSynchronization.get_telemetry()); + + //Session length + telemetryStorage.recordSessionLength(91218); + long sessionLength = telemetryStorage.getSessionLength(); + Assert.assertEquals(91218, sessionLength); + + //Sync Error + telemetryStorage.recordSyncError(ResourceEnum.TELEMETRY_SYNC, 400); + telemetryStorage.recordSyncError(ResourceEnum.TELEMETRY_SYNC, 400); + telemetryStorage.recordSyncError(ResourceEnum.SEGMENT_SYNC, 501); + telemetryStorage.recordSyncError(ResourceEnum.IMPRESSION_SYNC, 403); + telemetryStorage.recordSyncError(ResourceEnum.IMPRESSION_SYNC, 403); + telemetryStorage.recordSyncError(ResourceEnum.EVENT_SYNC, 503); + telemetryStorage.recordSyncError(ResourceEnum.SPLIT_SYNC, 403); + telemetryStorage.recordSyncError(ResourceEnum.IMPRESSION_COUNT_SYNC, 403); + telemetryStorage.recordSyncError(ResourceEnum.TOKEN_SYNC, 403); + + HTTPErrors httpErrors = telemetryStorage.popHTTPErrors(); + Assert.assertEquals(2, httpErrors.get_telemetry().get(400l).intValue()); + Assert.assertEquals(1, httpErrors.get_segments().get(501l).intValue()); + Assert.assertEquals(2, httpErrors.get_impressions().get(403l).intValue()); + Assert.assertEquals(1, httpErrors.get_impressionsCount().get(403l).intValue()); + Assert.assertEquals(1, httpErrors.get_events().get(503l).intValue()); + Assert.assertEquals(1, httpErrors.get_splits().get(403l).intValue()); + Assert.assertEquals(1, httpErrors.get_token().get(403l).intValue()); + + //Streaming events + StreamingEvent streamingEvent = new StreamingEvent(); + streamingEvent.set_data(290); + streamingEvent.set_type(1); + streamingEvent.setTimestamp(91218); + telemetryStorage.recordStreamingEvents(streamingEvent); + + List streamingEvents = telemetryStorage.popStreamingEvents(); + Assert.assertEquals(290, streamingEvents.get(0).get_data()); + Assert.assertEquals(1, streamingEvents.get(0).get_type()); + Assert.assertEquals(91218, streamingEvents.get(0).getTimestamp()); + + //Check list has been cleared + streamingEvents = telemetryStorage.popStreamingEvents(); + Assert.assertEquals(0, streamingEvents.size()); + + //Tags + telemetryStorage.addTag("TAG_1"); + telemetryStorage.addTag("TAG_2"); + List tags = telemetryStorage.popTags(); + Assert.assertEquals(2, tags.size()); + + //Check tags have been cleared + tags = telemetryStorage.popTags(); + Assert.assertEquals(0, tags.size()); + + } +} diff --git a/client/src/test/java/io/split/telemetry/utils/BucketCalculatorTest.java b/client/src/test/java/io/split/telemetry/utils/BucketCalculatorTest.java new file mode 100644 index 000000000..04ba74487 --- /dev/null +++ b/client/src/test/java/io/split/telemetry/utils/BucketCalculatorTest.java @@ -0,0 +1,22 @@ +package io.split.telemetry.utils; + +import org.junit.Assert; +import org.junit.Test; + +public class BucketCalculatorTest{ + + @Test + public void testBucketCalculator() { + int bucket = BucketCalculator.getBucketForLatency(500l * 1000); + Assert.assertEquals(0, bucket); + + bucket = BucketCalculator.getBucketForLatency(1500l * 1000); + Assert.assertEquals(1, bucket); + + bucket = BucketCalculator.getBucketForLatency(8000l * 1000); + Assert.assertEquals(6, bucket); + + bucket = BucketCalculator.getBucketForLatency(7481829l * 1000); + Assert.assertEquals(22, bucket); + } +}