From bfc53017bb7e8abb9b04c1a685a12093aba53215 Mon Sep 17 00:00:00 2001 From: Lucas Echeverz Date: Mon, 29 Mar 2021 17:53:39 -0300 Subject: [PATCH 1/6] Interfaces --- .../split/telemetry/storage/TelemetryEvaluationConsumer.java | 4 ++++ .../split/telemetry/storage/TelemetryEvaluationProducer.java | 4 ++++ .../io/split/telemetry/storage/TelemetryInitConsumer.java | 4 ++++ .../io/split/telemetry/storage/TelemetryInitProducer.java | 4 ++++ .../io/split/telemetry/storage/TelemetryRuntimeConsumer.java | 4 ++++ .../io/split/telemetry/storage/TelemetryRuntimeProducer.java | 4 ++++ .../java/io/split/telemetry/storage/TelemetryStorage.java | 4 ++++ .../io/split/telemetry/storage/TelemetryStorageConsumer.java | 4 ++++ .../io/split/telemetry/storage/TelemetryStorageProducer.java | 4 ++++ 9 files changed, 36 insertions(+) create mode 100644 client/src/main/java/io/split/telemetry/storage/TelemetryEvaluationConsumer.java create mode 100644 client/src/main/java/io/split/telemetry/storage/TelemetryEvaluationProducer.java create mode 100644 client/src/main/java/io/split/telemetry/storage/TelemetryInitConsumer.java create mode 100644 client/src/main/java/io/split/telemetry/storage/TelemetryInitProducer.java create mode 100644 client/src/main/java/io/split/telemetry/storage/TelemetryRuntimeConsumer.java create mode 100644 client/src/main/java/io/split/telemetry/storage/TelemetryRuntimeProducer.java create mode 100644 client/src/main/java/io/split/telemetry/storage/TelemetryStorage.java create mode 100644 client/src/main/java/io/split/telemetry/storage/TelemetryStorageConsumer.java create mode 100644 client/src/main/java/io/split/telemetry/storage/TelemetryStorageProducer.java diff --git a/client/src/main/java/io/split/telemetry/storage/TelemetryEvaluationConsumer.java b/client/src/main/java/io/split/telemetry/storage/TelemetryEvaluationConsumer.java new file mode 100644 index 000000000..3537540ce --- /dev/null +++ b/client/src/main/java/io/split/telemetry/storage/TelemetryEvaluationConsumer.java @@ -0,0 +1,4 @@ +package io.split.telemetry.storage; + +public interface TelemetryEvaluationConsumer { +} diff --git a/client/src/main/java/io/split/telemetry/storage/TelemetryEvaluationProducer.java b/client/src/main/java/io/split/telemetry/storage/TelemetryEvaluationProducer.java new file mode 100644 index 000000000..9c4e59d4b --- /dev/null +++ b/client/src/main/java/io/split/telemetry/storage/TelemetryEvaluationProducer.java @@ -0,0 +1,4 @@ +package io.split.telemetry.storage; + +public interface TelemetryEvaluationProducer { +} diff --git a/client/src/main/java/io/split/telemetry/storage/TelemetryInitConsumer.java b/client/src/main/java/io/split/telemetry/storage/TelemetryInitConsumer.java new file mode 100644 index 000000000..2430ff765 --- /dev/null +++ b/client/src/main/java/io/split/telemetry/storage/TelemetryInitConsumer.java @@ -0,0 +1,4 @@ +package io.split.telemetry.storage; + +public interface TelemetryInitConsumer { +} diff --git a/client/src/main/java/io/split/telemetry/storage/TelemetryInitProducer.java b/client/src/main/java/io/split/telemetry/storage/TelemetryInitProducer.java new file mode 100644 index 000000000..9547967d4 --- /dev/null +++ b/client/src/main/java/io/split/telemetry/storage/TelemetryInitProducer.java @@ -0,0 +1,4 @@ +package io.split.telemetry.storage; + +public interface TelemetryInitProducer { +} diff --git a/client/src/main/java/io/split/telemetry/storage/TelemetryRuntimeConsumer.java b/client/src/main/java/io/split/telemetry/storage/TelemetryRuntimeConsumer.java new file mode 100644 index 000000000..661b75cdb --- /dev/null +++ b/client/src/main/java/io/split/telemetry/storage/TelemetryRuntimeConsumer.java @@ -0,0 +1,4 @@ +package io.split.telemetry.storage; + +public interface TelemetryRuntimeConsumer { +} diff --git a/client/src/main/java/io/split/telemetry/storage/TelemetryRuntimeProducer.java b/client/src/main/java/io/split/telemetry/storage/TelemetryRuntimeProducer.java new file mode 100644 index 000000000..8d6e3672f --- /dev/null +++ b/client/src/main/java/io/split/telemetry/storage/TelemetryRuntimeProducer.java @@ -0,0 +1,4 @@ +package io.split.telemetry.storage; + +public interface TelemetryRuntimeProducer { +} diff --git a/client/src/main/java/io/split/telemetry/storage/TelemetryStorage.java b/client/src/main/java/io/split/telemetry/storage/TelemetryStorage.java new file mode 100644 index 000000000..477965099 --- /dev/null +++ b/client/src/main/java/io/split/telemetry/storage/TelemetryStorage.java @@ -0,0 +1,4 @@ +package io.split.telemetry.storage; + +public interface TelemetryStorage extends TelemetryStorageConsumer, TelemetryStorageProducer{ +} diff --git a/client/src/main/java/io/split/telemetry/storage/TelemetryStorageConsumer.java b/client/src/main/java/io/split/telemetry/storage/TelemetryStorageConsumer.java new file mode 100644 index 000000000..217a6e27b --- /dev/null +++ b/client/src/main/java/io/split/telemetry/storage/TelemetryStorageConsumer.java @@ -0,0 +1,4 @@ +package io.split.telemetry.storage; + +public interface TelemetryStorageConsumer extends TelemetryInitConsumer, TelemetryRuntimeConsumer, TelemetryEvaluationConsumer{ +} diff --git a/client/src/main/java/io/split/telemetry/storage/TelemetryStorageProducer.java b/client/src/main/java/io/split/telemetry/storage/TelemetryStorageProducer.java new file mode 100644 index 000000000..382f5939f --- /dev/null +++ b/client/src/main/java/io/split/telemetry/storage/TelemetryStorageProducer.java @@ -0,0 +1,4 @@ +package io.split.telemetry.storage; + +public interface TelemetryStorageProducer extends TelemetryEvaluationProducer, TelemetryInitProducer, TelemetryRuntimeProducer{ +} From ae9da581d0c759ee5cc7ca842298b8d6f6ed969e Mon Sep 17 00:00:00 2001 From: Lucas Echeverz Date: Fri, 9 Apr 2021 12:45:55 -0300 Subject: [PATCH 2/6] Adding interfaces and Implementation --- .../io/split/telemetry/domain/HTTPErrors.java | 85 +++++ .../split/telemetry/domain/HTTPLatencies.java | 86 +++++ .../telemetry/domain/LastSynchronization.java | 73 +++++ .../telemetry/domain/MethodExceptions.java | 62 ++++ .../telemetry/domain/MethodLatencies.java | 73 +++++ .../telemetry/domain/StreamingEvent.java | 40 +++ .../domain/enums/EventsDataRecordsEnum.java | 6 + .../domain/enums/FactoryCountersEnum.java | 6 + .../domain/enums/HTTPLatenciesEnum.java | 10 + .../enums/LastSynchronizationRecordsEnum.java | 10 + .../telemetry/domain/enums/MethodEnum.java | 9 + .../domain/enums/PushCountersEnum.java | 6 + .../telemetry/domain/enums/ResourceEnum.java | 10 + .../domain/enums/SdkRecordsEnum.java | 5 + .../storage/InMemoryTelemetryStorage.java | 306 ++++++++++++++++++ .../storage/TelemetryConfigConsumer.java | 6 + .../storage/TelemetryConfigProducer.java | 7 + .../storage/TelemetryEvaluationConsumer.java | 5 + .../storage/TelemetryEvaluationProducer.java | 4 + .../storage/TelemetryInitConsumer.java | 4 - .../storage/TelemetryInitProducer.java | 4 - .../storage/TelemetryRuntimeConsumer.java | 20 ++ .../storage/TelemetryRuntimeProducer.java | 16 + .../storage/TelemetryStorageConsumer.java | 2 +- .../storage/TelemetryStorageProducer.java | 2 +- 25 files changed, 847 insertions(+), 10 deletions(-) create mode 100644 client/src/main/java/io/split/telemetry/domain/HTTPErrors.java create mode 100644 client/src/main/java/io/split/telemetry/domain/HTTPLatencies.java create mode 100644 client/src/main/java/io/split/telemetry/domain/LastSynchronization.java create mode 100644 client/src/main/java/io/split/telemetry/domain/MethodExceptions.java create mode 100644 client/src/main/java/io/split/telemetry/domain/MethodLatencies.java create mode 100644 client/src/main/java/io/split/telemetry/domain/StreamingEvent.java create mode 100644 client/src/main/java/io/split/telemetry/domain/enums/EventsDataRecordsEnum.java create mode 100644 client/src/main/java/io/split/telemetry/domain/enums/FactoryCountersEnum.java create mode 100644 client/src/main/java/io/split/telemetry/domain/enums/HTTPLatenciesEnum.java create mode 100644 client/src/main/java/io/split/telemetry/domain/enums/LastSynchronizationRecordsEnum.java create mode 100644 client/src/main/java/io/split/telemetry/domain/enums/MethodEnum.java create mode 100644 client/src/main/java/io/split/telemetry/domain/enums/PushCountersEnum.java create mode 100644 client/src/main/java/io/split/telemetry/domain/enums/ResourceEnum.java create mode 100644 client/src/main/java/io/split/telemetry/domain/enums/SdkRecordsEnum.java create mode 100644 client/src/main/java/io/split/telemetry/storage/InMemoryTelemetryStorage.java create mode 100644 client/src/main/java/io/split/telemetry/storage/TelemetryConfigConsumer.java create mode 100644 client/src/main/java/io/split/telemetry/storage/TelemetryConfigProducer.java delete mode 100644 client/src/main/java/io/split/telemetry/storage/TelemetryInitConsumer.java delete mode 100644 client/src/main/java/io/split/telemetry/storage/TelemetryInitProducer.java diff --git a/client/src/main/java/io/split/telemetry/domain/HTTPErrors.java b/client/src/main/java/io/split/telemetry/domain/HTTPErrors.java new file mode 100644 index 000000000..4b1051c56 --- /dev/null +++ b/client/src/main/java/io/split/telemetry/domain/HTTPErrors.java @@ -0,0 +1,85 @@ +package io.split.telemetry.domain; + +import com.google.gson.annotations.SerializedName; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class HTTPErrors { + /* package private */ static final String FIELD_SPLIT = "sp"; + /* package private */ static final String FIELD_SEGMENTS = "se"; + /* package private */ static final String FIELD_IMPRESSIONS = "im"; + /* package private */ static final String FIELD_EVENTS = "ev"; + /* package private */ static final String FIELD_TOKEN = "to"; + /* package private */ static final String FIELD_TELEMETRY = "te"; + + @SerializedName(FIELD_SPLIT) + private Map _splits; + @SerializedName(FIELD_SEGMENTS) + private Map _segments; + @SerializedName(FIELD_IMPRESSIONS) + private Map _impressions; + @SerializedName(FIELD_EVENTS) + private Map _events; + @SerializedName(FIELD_TOKEN) + private Map _token; + @SerializedName(FIELD_TELEMETRY) + private Map _telemetry; + + public HTTPErrors() { + _splits = new ConcurrentHashMap<>(); + _segments = new ConcurrentHashMap<>(); + _impressions = new ConcurrentHashMap<>(); + _events = new ConcurrentHashMap<>(); + _token = new ConcurrentHashMap<>(); + _telemetry = new ConcurrentHashMap<>(); + } + + public Map get_splits() { + return _splits; + } + + public void set_splits(Map _splits) { + this._splits = _splits; + } + + public Map get_segments() { + return _segments; + } + + public void set_segments(Map _segments) { + this._segments = _segments; + } + + public Map get_impressions() { + return _impressions; + } + + public void set_impressions(Map _impressions) { + this._impressions = _impressions; + } + + public Map get_events() { + return _events; + } + + public void set_events(Map _events) { + this._events = _events; + } + + public Map get_token() { + return _token; + } + + public void set_token(Map _token) { + this._token = _token; + } + + public Map get_telemetry() { + return _telemetry; + } + + public void set_telemetry(Map _telemetry) { + this._telemetry = _telemetry; + } +} diff --git a/client/src/main/java/io/split/telemetry/domain/HTTPLatencies.java b/client/src/main/java/io/split/telemetry/domain/HTTPLatencies.java new file mode 100644 index 000000000..bf32af484 --- /dev/null +++ b/client/src/main/java/io/split/telemetry/domain/HTTPLatencies.java @@ -0,0 +1,86 @@ +package io.split.telemetry.domain; + +import com.google.gson.annotations.SerializedName; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; + +public class HTTPLatencies { + /* package private */ static final String FIELD_SPLIT = "sp"; + /* package private */ static final String FIELD_SEGMENTS = "se"; + /* package private */ static final String FIELD_IMPRESSIONS = "im"; + /* package private */ static final String FIELD_EVENTS = "ev"; + /* package private */ static final String FIELD_TOKEN = "to"; + /* package private */ static final String FIELD_TELEMETRY = "te"; + + @SerializedName(FIELD_SPLIT) + private List _splits; + @SerializedName(FIELD_SEGMENTS) + private List_segments; + @SerializedName(FIELD_IMPRESSIONS) + private List _impressions; + @SerializedName(FIELD_EVENTS) + private List _events; + @SerializedName(FIELD_TOKEN) + private List _token; + @SerializedName(FIELD_TELEMETRY) + private List _telemetry; + + public HTTPLatencies() { + _splits = new ArrayList<>(); + _segments = new ArrayList<>(); + _impressions = new ArrayList<>(); + _events = new ArrayList<>(); + _token = new ArrayList<>(); + _telemetry = new ArrayList<>(); + } + + public List get_splits() { + return _splits; + } + + public void set_splits(List _splits) { + this._splits = _splits; + } + + public List get_segments() { + return _segments; + } + + public void set_segments(List _segments) { + this._segments = _segments; + } + + public List get_impressions() { + return _impressions; + } + + public void set_impressions(List _impressions) { + this._impressions = _impressions; + } + + public List get_events() { + return _events; + } + + public void set_events(List _events) { + this._events = _events; + } + + public List get_token() { + return _token; + } + + public void set_token(List _token) { + this._token = _token; + } + + public List get_telemetry() { + return _telemetry; + } + + public void set_telemetry(List _telemetry) { + this._telemetry = _telemetry; + } +} diff --git a/client/src/main/java/io/split/telemetry/domain/LastSynchronization.java b/client/src/main/java/io/split/telemetry/domain/LastSynchronization.java new file mode 100644 index 000000000..0b77c8a06 --- /dev/null +++ b/client/src/main/java/io/split/telemetry/domain/LastSynchronization.java @@ -0,0 +1,73 @@ +package io.split.telemetry.domain; + +import com.google.gson.annotations.SerializedName; + +public class LastSynchronization { + /* package private */ static final String FIELD_SPLIT = "sp"; + /* package private */ static final String FIELD_SEGMENTS = "se"; + /* package private */ static final String FIELD_IMPRESSIONS = "im"; + /* package private */ static final String FIELD_EVENTS = "ev"; + /* package private */ static final String FIELD_TOKEN = "to"; + /* package private */ static final String FIELD_TELEMETRY = "te"; + + @SerializedName(FIELD_SPLIT) + private long _splits; + @SerializedName(FIELD_SEGMENTS) + private long _segments; + @SerializedName(FIELD_IMPRESSIONS) + private long _impressions; + @SerializedName(FIELD_EVENTS) + private long _events; + @SerializedName(FIELD_TOKEN) + private long _token; + @SerializedName(FIELD_TELEMETRY) + private long _telemetry; + + public long get_splits() { + return _splits; + } + + public void set_splits(long _splits) { + this._splits = _splits; + } + + public long get_segments() { + return _segments; + } + + public void set_segments(long _segments) { + this._segments = _segments; + } + + public long get_impressions() { + return _impressions; + } + + public void set_impressions(long _impressions) { + this._impressions = _impressions; + } + + public long get_events() { + return _events; + } + + public void set_events(long _events) { + this._events = _events; + } + + public long get_token() { + return _token; + } + + public void set_token(long _token) { + this._token = _token; + } + + public long get_telemetry() { + return _telemetry; + } + + public void set_telemetry(long _telemetry) { + this._telemetry = _telemetry; + } +} diff --git a/client/src/main/java/io/split/telemetry/domain/MethodExceptions.java b/client/src/main/java/io/split/telemetry/domain/MethodExceptions.java new file mode 100644 index 000000000..c6d6561be --- /dev/null +++ b/client/src/main/java/io/split/telemetry/domain/MethodExceptions.java @@ -0,0 +1,62 @@ +package io.split.telemetry.domain; + +import com.google.gson.annotations.SerializedName; + +public class MethodExceptions { + /* package private */ static final String FIELD_TREATMENT = "t"; + /* package private */ static final String FIELD_TREATMENTS = "ts"; + /* package private */ static final String FIELD_TREATMENT_WITH_CONFIG = "tc"; + /* package private */ static final String FIELD_TREATMENTS_WITH_CONFIG = "tcs"; + /* package private */ static final String FIELD_TRACK = "tr"; + + @SerializedName(FIELD_TREATMENT) + private long _treatment; + @SerializedName(FIELD_TREATMENTS) + private long _treatments; + @SerializedName(FIELD_TREATMENT_WITH_CONFIG) + private long _treatmentWithConfig; + @SerializedName(FIELD_TREATMENTS_WITH_CONFIG) + private long _treatmentsWithConfig; + @SerializedName(FIELD_TRACK) + private long _track; + + public long get_treatment() { + return _treatment; + } + + public void set_treatment(long _treatment) { + this._treatment = _treatment; + } + + public long get_treatments() { + return _treatments; + } + + public void set_treatments(long _treatments) { + this._treatments = _treatments; + } + + public long get_treatmentsWithConfig() { + return _treatmentsWithConfig; + } + + public void set_treatmentsWithConfig(long _treatmentsWithConfig) { + this._treatmentsWithConfig = _treatmentsWithConfig; + } + + public long get_treatmentWithConfig() { + return _treatmentWithConfig; + } + + public void set_treatmentWithConfig(long _treatmentWithConfig) { + this._treatmentWithConfig = _treatmentWithConfig; + } + + public long get_track() { + return _track; + } + + public void set_track(long _track) { + this._track = _track; + } +} diff --git a/client/src/main/java/io/split/telemetry/domain/MethodLatencies.java b/client/src/main/java/io/split/telemetry/domain/MethodLatencies.java new file mode 100644 index 000000000..21aae636c --- /dev/null +++ b/client/src/main/java/io/split/telemetry/domain/MethodLatencies.java @@ -0,0 +1,73 @@ +package io.split.telemetry.domain; + +import com.google.gson.annotations.SerializedName; + +import java.util.ArrayList; +import java.util.List; + +public class MethodLatencies { + /* package private */ static final String FIELD_TREATMENT = "t"; + /* package private */ static final String FIELD_TREATMENTS = "ts"; + /* package private */ static final String FIELD_TREATMENT_WITH_CONFIG = "tc"; + /* package private */ static final String FIELD_TREATMENTS_WITH_CONFIG = "tcs"; + /* package private */ static final String FIELD_TRACK = "tr"; + + @SerializedName(FIELD_TREATMENT) + private List _treatment; + @SerializedName(FIELD_TREATMENTS) + private List _treatments; + @SerializedName(FIELD_TREATMENT_WITH_CONFIG) + private List _treatmentWithConfig; + @SerializedName(FIELD_TREATMENTS_WITH_CONFIG) + private List _treatmentsWithConfig; + @SerializedName(FIELD_TRACK) + private List _track; + + public MethodLatencies() { + _treatment = new ArrayList<>(); + _treatments = new ArrayList<>(); + _treatmentWithConfig = new ArrayList<>(); + _treatmentsWithConfig = new ArrayList<>(); + _track = new ArrayList<>(); + } + + public List get_treatment() { + return _treatment; + } + + public void set_treatment(List _treatment) { + this._treatment = _treatment; + } + + public List get_treatments() { + return _treatments; + } + + public void set_treatments(List _treatments) { + this._treatments = _treatments; + } + + public List get_treatmentsWithConfig() { + return _treatmentsWithConfig; + } + + public void set_treatmentsWithConfig(List _treatmentsWithConfig) { + this._treatmentsWithConfig = _treatmentsWithConfig; + } + + public List get_treatmentWithConfig() { + return _treatmentWithConfig; + } + + public void set_treatmentWithConfig(List _treatmentWithConfig) { + this._treatmentWithConfig = _treatmentWithConfig; + } + + public List get_track() { + return _track; + } + + public void set_track(List _track) { + this._track = _track; + } +} diff --git a/client/src/main/java/io/split/telemetry/domain/StreamingEvent.java b/client/src/main/java/io/split/telemetry/domain/StreamingEvent.java new file mode 100644 index 000000000..eb4e49e2c --- /dev/null +++ b/client/src/main/java/io/split/telemetry/domain/StreamingEvent.java @@ -0,0 +1,40 @@ +package io.split.telemetry.domain; + +import com.google.gson.annotations.SerializedName; + +public class StreamingEvent { + /* package private */ static final String FIELD_TYPE = "sp"; + /* package private */ static final String FIELD_DATA = "se"; + /* package private */ static final String FIELD_TIMESTAMP = "im"; + + @SerializedName(FIELD_TYPE) + private int _type; + @SerializedName(FIELD_DATA) + private long _data; + @SerializedName(FIELD_TIMESTAMP) + private long timestamp; + + public int get_type() { + return _type; + } + + public void set_type(int _type) { + this._type = _type; + } + + public long get_data() { + return _data; + } + + public void set_data(long _data) { + this._data = _data; + } + + public long getTimestamp() { + return timestamp; + } + + public void setTimestamp(long timestamp) { + this.timestamp = timestamp; + } +} diff --git a/client/src/main/java/io/split/telemetry/domain/enums/EventsDataRecordsEnum.java b/client/src/main/java/io/split/telemetry/domain/enums/EventsDataRecordsEnum.java new file mode 100644 index 000000000..8beb3ac48 --- /dev/null +++ b/client/src/main/java/io/split/telemetry/domain/enums/EventsDataRecordsEnum.java @@ -0,0 +1,6 @@ +package io.split.telemetry.domain.enums; + +public enum EventsDataRecordsEnum { + EVENTS_QUEUED, + EVENTS_DROPPED +} diff --git a/client/src/main/java/io/split/telemetry/domain/enums/FactoryCountersEnum.java b/client/src/main/java/io/split/telemetry/domain/enums/FactoryCountersEnum.java new file mode 100644 index 000000000..387084612 --- /dev/null +++ b/client/src/main/java/io/split/telemetry/domain/enums/FactoryCountersEnum.java @@ -0,0 +1,6 @@ +package io.split.telemetry.domain.enums; + +public enum FactoryCountersEnum { + BUR_TIMEOUTS, + NON_READY_USAGES +} diff --git a/client/src/main/java/io/split/telemetry/domain/enums/HTTPLatenciesEnum.java b/client/src/main/java/io/split/telemetry/domain/enums/HTTPLatenciesEnum.java new file mode 100644 index 000000000..88a7a7e84 --- /dev/null +++ b/client/src/main/java/io/split/telemetry/domain/enums/HTTPLatenciesEnum.java @@ -0,0 +1,10 @@ +package io.split.telemetry.domain.enums; + +public enum HTTPLatenciesEnum { + SPLITS, + SEGMENTS, + IMPRESSIONS, + EVENTS, + TELEMETRY, + TOKEN +} diff --git a/client/src/main/java/io/split/telemetry/domain/enums/LastSynchronizationRecordsEnum.java b/client/src/main/java/io/split/telemetry/domain/enums/LastSynchronizationRecordsEnum.java new file mode 100644 index 000000000..f99b43f19 --- /dev/null +++ b/client/src/main/java/io/split/telemetry/domain/enums/LastSynchronizationRecordsEnum.java @@ -0,0 +1,10 @@ +package io.split.telemetry.domain.enums; + +public enum LastSynchronizationRecordsEnum { + SPLITS, + SEGMENTS, + IMPRESSIONS, + EVENTS, + TOKEN, + TELEMETRY +} diff --git a/client/src/main/java/io/split/telemetry/domain/enums/MethodEnum.java b/client/src/main/java/io/split/telemetry/domain/enums/MethodEnum.java new file mode 100644 index 000000000..029d59477 --- /dev/null +++ b/client/src/main/java/io/split/telemetry/domain/enums/MethodEnum.java @@ -0,0 +1,9 @@ +package io.split.telemetry.domain.enums; + +public enum MethodEnum { + TREATMENT, + TREATMENTS, + TREATMENT_WITH_CONFIG, + TREATMENTS_WITH_CONFIG, + TRACK, +} diff --git a/client/src/main/java/io/split/telemetry/domain/enums/PushCountersEnum.java b/client/src/main/java/io/split/telemetry/domain/enums/PushCountersEnum.java new file mode 100644 index 000000000..53f023f74 --- /dev/null +++ b/client/src/main/java/io/split/telemetry/domain/enums/PushCountersEnum.java @@ -0,0 +1,6 @@ +package io.split.telemetry.domain.enums; + +public enum PushCountersEnum { + AUTH_REJECTIONS, + TOKEN_REFRESHES +} diff --git a/client/src/main/java/io/split/telemetry/domain/enums/ResourceEnum.java b/client/src/main/java/io/split/telemetry/domain/enums/ResourceEnum.java new file mode 100644 index 000000000..b705aa43e --- /dev/null +++ b/client/src/main/java/io/split/telemetry/domain/enums/ResourceEnum.java @@ -0,0 +1,10 @@ +package io.split.telemetry.domain.enums; + +public enum ResourceEnum { + SPLIT_SYNC, + SEGMENT_SYNC, + IMPRESSION_SYNC, + EVENT_SYNC, + TELEMETRY_SYNC, + TOKEN_SYNC +} diff --git a/client/src/main/java/io/split/telemetry/domain/enums/SdkRecordsEnum.java b/client/src/main/java/io/split/telemetry/domain/enums/SdkRecordsEnum.java new file mode 100644 index 000000000..6f8780811 --- /dev/null +++ b/client/src/main/java/io/split/telemetry/domain/enums/SdkRecordsEnum.java @@ -0,0 +1,5 @@ +package io.split.telemetry.domain.enums; + +public enum SdkRecordsEnum { + SESSION +} diff --git a/client/src/main/java/io/split/telemetry/storage/InMemoryTelemetryStorage.java b/client/src/main/java/io/split/telemetry/storage/InMemoryTelemetryStorage.java new file mode 100644 index 000000000..91e8a0d1d --- /dev/null +++ b/client/src/main/java/io/split/telemetry/storage/InMemoryTelemetryStorage.java @@ -0,0 +1,306 @@ +package io.split.telemetry.storage; + +import com.google.common.collect.Maps; +import io.split.telemetry.domain.*; +import io.split.telemetry.domain.enums.*; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; + +public class InMemoryTelemetryStorage implements TelemetryStorage{ + + //Latencies + private final ConcurrentMap> _methodLatencies = Maps.newConcurrentMap(); + private final ConcurrentMap> _httpLatencies = Maps.newConcurrentMap(); + + //Counters + private final ConcurrentMap _exceptionsCounters = Maps.newConcurrentMap(); + private final ConcurrentMap _pushCounters = Maps.newConcurrentMap(); + private final ConcurrentMap _factoryCounters = Maps.newConcurrentMap(); + + //Records + private final ConcurrentMap _impressionsDataRecords = Maps.newConcurrentMap(); + private final ConcurrentMap _eventsDataRecords = Maps.newConcurrentMap(); + private final ConcurrentMap _lastSynchronizationRecords = Maps.newConcurrentMap(); + private final ConcurrentMap _sdkRecords = Maps.newConcurrentMap(); + + //HTTPErrors + private final ConcurrentMap> _httpErrors = Maps.newConcurrentMap(); + + //StreamingEvents + private final Object _streamingEventsLock = new Object(); + private final List _streamingEvents = new ArrayList<>(); + + //Tags + private final Object _tagsLock = new Object(); + private final List _tags = new ArrayList<>(); + + public InMemoryTelemetryStorage() { + initMethodLatencies(); + initHttpLatencies(); + initHttpErrors(); + } + + @Override + public void recordConfigData() { + // No-Op. Config Data will be sent directly to Split Servers. No need to store. + } + + @Override + public long getBURTimeouts() { + long burTimeouts = _factoryCounters.getOrDefault(FactoryCountersEnum.BUR_TIMEOUTS, new AtomicLong()).get(); + return burTimeouts; + } + + @Override + public long getNonReadyUsages() { + long nonReadyUsages = _factoryCounters.getOrDefault(FactoryCountersEnum.NON_READY_USAGES, new AtomicLong()).get(); + return nonReadyUsages; + } + + @Override + public MethodExceptions popExceptions() { + MethodExceptions exceptions = new MethodExceptions(); + exceptions.set_treatment(_exceptionsCounters.getOrDefault(MethodEnum.TREATMENT, new AtomicLong()).get()); + exceptions.set_treatments(_exceptionsCounters.getOrDefault(MethodEnum.TREATMENTS, new AtomicLong()).get()); + exceptions.set_treatmentWithConfig(_exceptionsCounters.getOrDefault(MethodEnum.TREATMENT_WITH_CONFIG, new AtomicLong()).get()); + exceptions.set_treatmentsWithConfig(_exceptionsCounters.getOrDefault(MethodEnum.TREATMENTS_WITH_CONFIG, new AtomicLong()).get()); + exceptions.set_track(_exceptionsCounters.getOrDefault(MethodEnum.TRACK, new AtomicLong()).get()); + + _exceptionsCounters.clear(); + initMethodLatencies(); + + return exceptions; + } + + @Override + public MethodLatencies popLatencies() { + MethodLatencies latencies = new MethodLatencies(); + latencies.set_treatment(_methodLatencies.get(MethodEnum.TREATMENT)); + latencies.set_treatments(_methodLatencies.get(MethodEnum.TREATMENTS)); + latencies.set_treatmentWithConfig(_methodLatencies.get(MethodEnum.TREATMENT_WITH_CONFIG)); + latencies.set_treatmentsWithConfig(_methodLatencies.get(MethodEnum.TREATMENTS_WITH_CONFIG)); + latencies.set_track(_methodLatencies.get(MethodEnum.TRACK)); + + _methodLatencies.clear(); + initMethodLatencies(); + + return latencies; + } + + @Override + public void recordNonReadyUsage() { + _factoryCounters.putIfAbsent(FactoryCountersEnum.NON_READY_USAGES, new AtomicLong(0)); + _factoryCounters.get(FactoryCountersEnum.NON_READY_USAGES).incrementAndGet(); + + } + + @Override + public void recordBURTimeout() { + _factoryCounters.putIfAbsent(FactoryCountersEnum.BUR_TIMEOUTS, new AtomicLong(0)); + _factoryCounters.get(FactoryCountersEnum.BUR_TIMEOUTS).incrementAndGet(); + } + + @Override + public void recordLatency(String method, int latency) { + _methodLatencies.get(method).add(Long.valueOf(latency)); + } + + @Override + public void recordException(MethodEnum method) { + _exceptionsCounters.putIfAbsent(method, new AtomicLong(0)); + _exceptionsCounters.get(method).incrementAndGet(); + } + + @Override + public long getImpressionsStats(ImpressionsDataTypeEnum data) { + return _impressionsDataRecords.getOrDefault(data, new AtomicLong()).get(); + } + + @Override + public long getEventStats(EventsDataRecordsEnum type) { + return _eventsDataRecords.getOrDefault(type, new AtomicLong()).get(); + } + + @Override + public LastSynchronization getLastSynchronization() { + LastSynchronization lastSynchronization = new LastSynchronization(); + lastSynchronization.set_splits(_lastSynchronizationRecords.getOrDefault(LastSynchronizationRecordsEnum.SPLITS, new AtomicLong()).get()); + lastSynchronization.set_segments(_lastSynchronizationRecords.getOrDefault(LastSynchronizationRecordsEnum.SEGMENTS, new AtomicLong()).get()); + lastSynchronization.set_impressions(_lastSynchronizationRecords.getOrDefault(LastSynchronizationRecordsEnum.IMPRESSIONS, new AtomicLong()).get()); + lastSynchronization.set_events(_lastSynchronizationRecords.getOrDefault(LastSynchronizationRecordsEnum.EVENTS, new AtomicLong()).get()); + lastSynchronization.set_telemetry(_lastSynchronizationRecords.getOrDefault(LastSynchronizationRecordsEnum.TELEMETRY, new AtomicLong()).get()); + lastSynchronization.set_token(_lastSynchronizationRecords.getOrDefault(LastSynchronizationRecordsEnum.TOKEN, new AtomicLong()).get()); + + return lastSynchronization; + } + + @Override + public HTTPErrors popHTTPErrors() { + HTTPErrors errors = new HTTPErrors(); + errors.set_splits(_httpErrors.get(ResourceEnum.SPLIT_SYNC)); + errors.set_segments(_httpErrors.get(ResourceEnum.SEGMENT_SYNC)); + errors.set_impressions(_httpErrors.get(ResourceEnum.IMPRESSION_SYNC)); + errors.set_events(_httpErrors.get(ResourceEnum.EVENT_SYNC)); + errors.set_telemetry(_httpErrors.get(ResourceEnum.TELEMETRY_SYNC)); + errors.set_token(_httpErrors.get(ResourceEnum.TOKEN_SYNC)); + + _httpErrors.clear(); + initHttpErrors(); + + return errors; + } + + @Override + public HTTPLatencies popHTTPLatencies() { + HTTPLatencies latencies = new HTTPLatencies(); + latencies.set_splits(_httpLatencies.get(HTTPLatenciesEnum.SPLITS)); + latencies.set_segments(_httpLatencies.get(HTTPLatenciesEnum.SEGMENTS)); + latencies.set_impressions(_httpLatencies.get(HTTPLatenciesEnum.IMPRESSIONS)); + latencies.set_events(_httpLatencies.get(HTTPLatenciesEnum.EVENTS)); + latencies.set_telemetry(_httpLatencies.get(HTTPLatenciesEnum.TELEMETRY)); + latencies.set_token(_httpLatencies.get(HTTPLatenciesEnum.TOKEN)); + + _httpLatencies.clear(); + initHttpLatencies(); + + return latencies; + } + + @Override + public long popAuthRejections() { + long authRejections = _pushCounters.getOrDefault(PushCountersEnum.AUTH_REJECTIONS, new AtomicLong()).get(); + + _pushCounters.replace(PushCountersEnum.AUTH_REJECTIONS, new AtomicLong()); + + return authRejections; + } + + @Override + public long popTokenRefreshes() { + long tokenRefreshes = _pushCounters.getOrDefault(PushCountersEnum.TOKEN_REFRESHES, new AtomicLong()).get(); + + _pushCounters.replace(PushCountersEnum.TOKEN_REFRESHES, new AtomicLong()); + + return tokenRefreshes; + } + + @Override + public List popStreamingEvents() { + synchronized (_streamingEventsLock) { + List streamingEvents = _streamingEvents; + + _streamingEvents.clear(); + + return streamingEvents; + } + } + + @Override + public List popTags() { + synchronized (_tagsLock) { + List tags = _tags; + + _tags.clear(); + + return tags; + } + } + + @Override + public long getSessionLength() { + return _sdkRecords.getOrDefault(SdkRecordsEnum.SESSION, new AtomicLong()).get(); + } + + @Override + public void addTag(String tag) { + synchronized (_tagsLock) { + _tags.add(tag); + } + } + + @Override + public void recordImpressionStats(ImpressionsDataTypeEnum dataType, long count) { + _impressionsDataRecords.putIfAbsent(dataType, new AtomicLong()); + _impressionsDataRecords.get(dataType).incrementAndGet(); + } + + @Override + public void recordEventStats(EventsDataRecordsEnum dataType, long count) { + _eventsDataRecords.putIfAbsent(dataType, new AtomicLong()); + _eventsDataRecords.get(dataType).incrementAndGet(); + } + + @Override + public void recordSuccessfulSync(LastSynchronizationRecordsEnum resource, long time) { + _lastSynchronizationRecords.putIfAbsent(resource, new AtomicLong(time)); + + } + + @Override + public void recordSyncError(ResourceEnum resource, int status) { + ConcurrentMap errors = _httpErrors.get(resource); + errors.putIfAbsent(Long.valueOf(status), 0l); + errors.replace(Long.valueOf(status), errors.get(status) + 1); + } + + @Override + public void recordSyncLatency(String resource, long latency) { + _httpLatencies.get(resource).add(latency); + + } + + @Override + public void recordAuthRejections() { + _pushCounters.putIfAbsent(PushCountersEnum.AUTH_REJECTIONS, new AtomicLong(0)); + _pushCounters.get(PushCountersEnum.AUTH_REJECTIONS).incrementAndGet(); + + } + + @Override + public void recordTokenRefreshes() { + _pushCounters.putIfAbsent(PushCountersEnum.TOKEN_REFRESHES, new AtomicLong(0)); + _pushCounters.get(PushCountersEnum.TOKEN_REFRESHES).incrementAndGet(); + + } + + @Override + public void recordStreamingEvents(StreamingEvent streamingEvent) { + synchronized (_streamingEventsLock) { + _streamingEvents.add(streamingEvent); + } + } + + @Override + public void recordSessionLength(long sessionLength) { + _sdkRecords.putIfAbsent(SdkRecordsEnum.SESSION, new AtomicLong(sessionLength)); + } + + private void initMethodLatencies() { + _methodLatencies.put(MethodEnum.TREATMENT, new ArrayList<>()); + _methodLatencies.put(MethodEnum.TREATMENTS, new ArrayList<>()); + _methodLatencies.put(MethodEnum.TREATMENT_WITH_CONFIG, new ArrayList<>()); + _methodLatencies.put(MethodEnum.TREATMENTS_WITH_CONFIG, new ArrayList<>()); + _methodLatencies.put(MethodEnum.TRACK, new ArrayList<>()); + } + + private void initHttpLatencies() { + _httpLatencies.put(HTTPLatenciesEnum.SPLITS, new ArrayList<>()); + _httpLatencies.put(HTTPLatenciesEnum.SEGMENTS, new ArrayList<>()); + _httpLatencies.put(HTTPLatenciesEnum.IMPRESSIONS, new ArrayList<>()); + _httpLatencies.put(HTTPLatenciesEnum.EVENTS, new ArrayList<>()); + _httpLatencies.put(HTTPLatenciesEnum.TELEMETRY, new ArrayList<>()); + _httpLatencies.put(HTTPLatenciesEnum.TOKEN, new ArrayList<>()); + } + + private void initHttpErrors() { + _httpErrors.put(ResourceEnum.SPLIT_SYNC, Maps.newConcurrentMap()); + _httpErrors.put(ResourceEnum.SEGMENT_SYNC, Maps.newConcurrentMap()); + _httpErrors.put(ResourceEnum.IMPRESSION_SYNC, Maps.newConcurrentMap()); + _httpErrors.put(ResourceEnum.EVENT_SYNC, Maps.newConcurrentMap()); + _httpErrors.put(ResourceEnum.TELEMETRY_SYNC, Maps.newConcurrentMap()); + _httpErrors.put(ResourceEnum.TOKEN_SYNC, Maps.newConcurrentMap()); + } +} diff --git a/client/src/main/java/io/split/telemetry/storage/TelemetryConfigConsumer.java b/client/src/main/java/io/split/telemetry/storage/TelemetryConfigConsumer.java new file mode 100644 index 000000000..680d128b8 --- /dev/null +++ b/client/src/main/java/io/split/telemetry/storage/TelemetryConfigConsumer.java @@ -0,0 +1,6 @@ +package io.split.telemetry.storage; + +public interface TelemetryConfigConsumer { + long getBURTimeouts(); + long getNonReadyUsages(); +} diff --git a/client/src/main/java/io/split/telemetry/storage/TelemetryConfigProducer.java b/client/src/main/java/io/split/telemetry/storage/TelemetryConfigProducer.java new file mode 100644 index 000000000..913733d03 --- /dev/null +++ b/client/src/main/java/io/split/telemetry/storage/TelemetryConfigProducer.java @@ -0,0 +1,7 @@ +package io.split.telemetry.storage; + +public interface TelemetryConfigProducer { + void recordConfigData(); + void recordNonReadyUsage(); + void recordBURTimeout(); +} diff --git a/client/src/main/java/io/split/telemetry/storage/TelemetryEvaluationConsumer.java b/client/src/main/java/io/split/telemetry/storage/TelemetryEvaluationConsumer.java index 3537540ce..976fbb623 100644 --- a/client/src/main/java/io/split/telemetry/storage/TelemetryEvaluationConsumer.java +++ b/client/src/main/java/io/split/telemetry/storage/TelemetryEvaluationConsumer.java @@ -1,4 +1,9 @@ package io.split.telemetry.storage; +import io.split.telemetry.domain.MethodExceptions; +import io.split.telemetry.domain.MethodLatencies; + public interface TelemetryEvaluationConsumer { + MethodExceptions popExceptions(); + MethodLatencies popLatencies(); } diff --git a/client/src/main/java/io/split/telemetry/storage/TelemetryEvaluationProducer.java b/client/src/main/java/io/split/telemetry/storage/TelemetryEvaluationProducer.java index 9c4e59d4b..51795b054 100644 --- a/client/src/main/java/io/split/telemetry/storage/TelemetryEvaluationProducer.java +++ b/client/src/main/java/io/split/telemetry/storage/TelemetryEvaluationProducer.java @@ -1,4 +1,8 @@ package io.split.telemetry.storage; +import io.split.telemetry.domain.enums.MethodEnum; + public interface TelemetryEvaluationProducer { + void recordLatency(String method, int latency); + void recordException(MethodEnum method); } diff --git a/client/src/main/java/io/split/telemetry/storage/TelemetryInitConsumer.java b/client/src/main/java/io/split/telemetry/storage/TelemetryInitConsumer.java deleted file mode 100644 index 2430ff765..000000000 --- a/client/src/main/java/io/split/telemetry/storage/TelemetryInitConsumer.java +++ /dev/null @@ -1,4 +0,0 @@ -package io.split.telemetry.storage; - -public interface TelemetryInitConsumer { -} diff --git a/client/src/main/java/io/split/telemetry/storage/TelemetryInitProducer.java b/client/src/main/java/io/split/telemetry/storage/TelemetryInitProducer.java deleted file mode 100644 index 9547967d4..000000000 --- a/client/src/main/java/io/split/telemetry/storage/TelemetryInitProducer.java +++ /dev/null @@ -1,4 +0,0 @@ -package io.split.telemetry.storage; - -public interface TelemetryInitProducer { -} diff --git a/client/src/main/java/io/split/telemetry/storage/TelemetryRuntimeConsumer.java b/client/src/main/java/io/split/telemetry/storage/TelemetryRuntimeConsumer.java index 661b75cdb..9df890be3 100644 --- a/client/src/main/java/io/split/telemetry/storage/TelemetryRuntimeConsumer.java +++ b/client/src/main/java/io/split/telemetry/storage/TelemetryRuntimeConsumer.java @@ -1,4 +1,24 @@ package io.split.telemetry.storage; +import io.split.telemetry.domain.HTTPErrors; +import io.split.telemetry.domain.HTTPLatencies; +import io.split.telemetry.domain.LastSynchronization; +import io.split.telemetry.domain.StreamingEvent; +import io.split.telemetry.domain.enums.EventsDataRecordsEnum; +import io.split.telemetry.domain.enums.ImpressionsDataTypeEnum; + +import java.util.List; + public interface TelemetryRuntimeConsumer { + long getImpressionsStats(ImpressionsDataTypeEnum data); + long getEventStats(EventsDataRecordsEnum type); + LastSynchronization getLastSynchronization(); + HTTPErrors popHTTPErrors(); + HTTPLatencies popHTTPLatencies(); + long popAuthRejections(); + long popTokenRefreshes(); + List popStreamingEvents(); + List popTags(); + long getSessionLength(); + } diff --git a/client/src/main/java/io/split/telemetry/storage/TelemetryRuntimeProducer.java b/client/src/main/java/io/split/telemetry/storage/TelemetryRuntimeProducer.java index 8d6e3672f..ab6fe175f 100644 --- a/client/src/main/java/io/split/telemetry/storage/TelemetryRuntimeProducer.java +++ b/client/src/main/java/io/split/telemetry/storage/TelemetryRuntimeProducer.java @@ -1,4 +1,20 @@ package io.split.telemetry.storage; +import io.split.telemetry.domain.StreamingEvent; +import io.split.telemetry.domain.enums.EventsDataRecordsEnum; +import io.split.telemetry.domain.enums.ImpressionsDataTypeEnum; +import io.split.telemetry.domain.enums.LastSynchronizationRecordsEnum; +import io.split.telemetry.domain.enums.ResourceEnum; + public interface TelemetryRuntimeProducer { + void addTag(String tag); + void recordImpressionStats(ImpressionsDataTypeEnum dataType, long count); + void recordEventStats(EventsDataRecordsEnum dataType, long count); + void recordSuccessfulSync(LastSynchronizationRecordsEnum resource, long time); + void recordSyncError(ResourceEnum resource, int status); + void recordSyncLatency(String resource, long latency); + void recordAuthRejections(); + void recordTokenRefreshes(); + void recordStreamingEvents(StreamingEvent streamingEvent); + void recordSessionLength(long sessionLength); } diff --git a/client/src/main/java/io/split/telemetry/storage/TelemetryStorageConsumer.java b/client/src/main/java/io/split/telemetry/storage/TelemetryStorageConsumer.java index 217a6e27b..7efd537c5 100644 --- a/client/src/main/java/io/split/telemetry/storage/TelemetryStorageConsumer.java +++ b/client/src/main/java/io/split/telemetry/storage/TelemetryStorageConsumer.java @@ -1,4 +1,4 @@ package io.split.telemetry.storage; -public interface TelemetryStorageConsumer extends TelemetryInitConsumer, TelemetryRuntimeConsumer, TelemetryEvaluationConsumer{ +public interface TelemetryStorageConsumer extends TelemetryConfigConsumer, TelemetryRuntimeConsumer, TelemetryEvaluationConsumer{ } diff --git a/client/src/main/java/io/split/telemetry/storage/TelemetryStorageProducer.java b/client/src/main/java/io/split/telemetry/storage/TelemetryStorageProducer.java index 382f5939f..c3ef23031 100644 --- a/client/src/main/java/io/split/telemetry/storage/TelemetryStorageProducer.java +++ b/client/src/main/java/io/split/telemetry/storage/TelemetryStorageProducer.java @@ -1,4 +1,4 @@ package io.split.telemetry.storage; -public interface TelemetryStorageProducer extends TelemetryEvaluationProducer, TelemetryInitProducer, TelemetryRuntimeProducer{ +public interface TelemetryStorageProducer extends TelemetryEvaluationProducer, TelemetryConfigProducer, TelemetryRuntimeProducer{ } From 086150cbf611e9611a5b009d73adb4407708c1eb Mon Sep 17 00:00:00 2001 From: Lucas Echeverz Date: Fri, 9 Apr 2021 12:46:21 -0300 Subject: [PATCH 3/6] Changing name --- .../telemetry/domain/enums/ImpressionsDataTypeEnum.java | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 client/src/main/java/io/split/telemetry/domain/enums/ImpressionsDataTypeEnum.java diff --git a/client/src/main/java/io/split/telemetry/domain/enums/ImpressionsDataTypeEnum.java b/client/src/main/java/io/split/telemetry/domain/enums/ImpressionsDataTypeEnum.java new file mode 100644 index 000000000..3ca134c93 --- /dev/null +++ b/client/src/main/java/io/split/telemetry/domain/enums/ImpressionsDataTypeEnum.java @@ -0,0 +1,7 @@ +package io.split.telemetry.domain.enums; + +public enum ImpressionsDataTypeEnum { + IMPRESSIONS_QUEUED, + IMPRESSIONS_DROPPED, + IMPRESSIONS_DEDUPED +} From 31a82081a5be3edddc43c8afa9b2fc8b2f1f9e87 Mon Sep 17 00:00:00 2001 From: Lucas Echeverz Date: Fri, 9 Apr 2021 17:41:41 -0300 Subject: [PATCH 4/6] Fixing PR comments --- .../io/split/telemetry/domain/HTTPErrors.java | 12 ++ .../split/telemetry/domain/HTTPLatencies.java | 13 +- .../telemetry/domain/LastSynchronization.java | 11 ++ .../telemetry/domain/StreamingEvent.java | 12 +- .../domain/enums/HTTPLatenciesEnum.java | 1 + .../enums/LastSynchronizationRecordsEnum.java | 1 + .../telemetry/domain/enums/ResourceEnum.java | 1 + .../storage/InMemoryTelemetryStorage.java | 176 +++++++++++------- .../storage/TelemetryConfigProducer.java | 1 - .../storage/TelemetryEvaluationConsumer.java | 2 +- .../storage/TelemetryRuntimeConsumer.java | 2 +- .../telemetry/utils/BucketCalculator.java | 64 +++++++ .../storage/InMemoryTelemetryStorageTest.java | 21 +++ 13 files changed, 243 insertions(+), 74 deletions(-) create mode 100644 client/src/main/java/io/split/telemetry/utils/BucketCalculator.java create mode 100644 client/src/test/java/io/split/telemetry/storage/InMemoryTelemetryStorageTest.java diff --git a/client/src/main/java/io/split/telemetry/domain/HTTPErrors.java b/client/src/main/java/io/split/telemetry/domain/HTTPErrors.java index 4b1051c56..dac746117 100644 --- a/client/src/main/java/io/split/telemetry/domain/HTTPErrors.java +++ b/client/src/main/java/io/split/telemetry/domain/HTTPErrors.java @@ -9,6 +9,7 @@ public class HTTPErrors { /* package private */ static final String FIELD_SPLIT = "sp"; /* package private */ static final String FIELD_SEGMENTS = "se"; /* package private */ static final String FIELD_IMPRESSIONS = "im"; + /* package private */ static final String FIELD_IMPRESSIONS_COUNT = "ic"; /* package private */ static final String FIELD_EVENTS = "ev"; /* package private */ static final String FIELD_TOKEN = "to"; /* package private */ static final String FIELD_TELEMETRY = "te"; @@ -19,6 +20,8 @@ public class HTTPErrors { private Map _segments; @SerializedName(FIELD_IMPRESSIONS) private Map _impressions; + @SerializedName(FIELD_IMPRESSIONS_COUNT) + private Map _impressionsCount; @SerializedName(FIELD_EVENTS) private Map _events; @SerializedName(FIELD_TOKEN) @@ -30,6 +33,7 @@ public HTTPErrors() { _splits = new ConcurrentHashMap<>(); _segments = new ConcurrentHashMap<>(); _impressions = new ConcurrentHashMap<>(); + _impressionsCount = new ConcurrentHashMap<>(); _events = new ConcurrentHashMap<>(); _token = new ConcurrentHashMap<>(); _telemetry = new ConcurrentHashMap<>(); @@ -82,4 +86,12 @@ public Map get_telemetry() { public void set_telemetry(Map _telemetry) { this._telemetry = _telemetry; } + + public Map get_impressionsCount() { + return _impressionsCount; + } + + public void set_impressionsCount(Map _impressionsCount) { + this._impressionsCount = _impressionsCount; + } } diff --git a/client/src/main/java/io/split/telemetry/domain/HTTPLatencies.java b/client/src/main/java/io/split/telemetry/domain/HTTPLatencies.java index bf32af484..0e0791ed9 100644 --- a/client/src/main/java/io/split/telemetry/domain/HTTPLatencies.java +++ b/client/src/main/java/io/split/telemetry/domain/HTTPLatencies.java @@ -4,12 +4,12 @@ import java.util.ArrayList; import java.util.List; -import java.util.concurrent.ConcurrentHashMap; public class HTTPLatencies { /* package private */ static final String FIELD_SPLIT = "sp"; /* package private */ static final String FIELD_SEGMENTS = "se"; /* package private */ static final String FIELD_IMPRESSIONS = "im"; + /* package private */ static final String FIELD_IMPRESSIONS_COUNT = "ic"; /* package private */ static final String FIELD_EVENTS = "ev"; /* package private */ static final String FIELD_TOKEN = "to"; /* package private */ static final String FIELD_TELEMETRY = "te"; @@ -20,6 +20,8 @@ public class HTTPLatencies { private List_segments; @SerializedName(FIELD_IMPRESSIONS) private List _impressions; + @SerializedName(FIELD_IMPRESSIONS_COUNT) + private List _impressionsCount; @SerializedName(FIELD_EVENTS) private List _events; @SerializedName(FIELD_TOKEN) @@ -31,6 +33,7 @@ public HTTPLatencies() { _splits = new ArrayList<>(); _segments = new ArrayList<>(); _impressions = new ArrayList<>(); + _impressionsCount = new ArrayList<>(); _events = new ArrayList<>(); _token = new ArrayList<>(); _telemetry = new ArrayList<>(); @@ -83,4 +86,12 @@ public List get_telemetry() { public void set_telemetry(List _telemetry) { this._telemetry = _telemetry; } + + public List get_impressionsCount() { + return _impressionsCount; + } + + public void set_impressionsCount(List _impressionsCount) { + this._impressionsCount = _impressionsCount; + } } diff --git a/client/src/main/java/io/split/telemetry/domain/LastSynchronization.java b/client/src/main/java/io/split/telemetry/domain/LastSynchronization.java index 0b77c8a06..59586562e 100644 --- a/client/src/main/java/io/split/telemetry/domain/LastSynchronization.java +++ b/client/src/main/java/io/split/telemetry/domain/LastSynchronization.java @@ -6,6 +6,7 @@ public class LastSynchronization { /* package private */ static final String FIELD_SPLIT = "sp"; /* package private */ static final String FIELD_SEGMENTS = "se"; /* package private */ static final String FIELD_IMPRESSIONS = "im"; + /* package private */ static final String FIELD_IMPRESSIONS_COUNT = "ic"; /* package private */ static final String FIELD_EVENTS = "ev"; /* package private */ static final String FIELD_TOKEN = "to"; /* package private */ static final String FIELD_TELEMETRY = "te"; @@ -16,6 +17,8 @@ public class LastSynchronization { private long _segments; @SerializedName(FIELD_IMPRESSIONS) private long _impressions; + @SerializedName(FIELD_IMPRESSIONS_COUNT) + private long _impressionsCount; @SerializedName(FIELD_EVENTS) private long _events; @SerializedName(FIELD_TOKEN) @@ -70,4 +73,12 @@ public long get_telemetry() { public void set_telemetry(long _telemetry) { this._telemetry = _telemetry; } + + public long get_impressionsCount() { + return _impressionsCount; + } + + public void set_impressionsCount(long _impressionsCount) { + this._impressionsCount = _impressionsCount; + } } diff --git a/client/src/main/java/io/split/telemetry/domain/StreamingEvent.java b/client/src/main/java/io/split/telemetry/domain/StreamingEvent.java index eb4e49e2c..e0ebca3ac 100644 --- a/client/src/main/java/io/split/telemetry/domain/StreamingEvent.java +++ b/client/src/main/java/io/split/telemetry/domain/StreamingEvent.java @@ -3,16 +3,16 @@ import com.google.gson.annotations.SerializedName; public class StreamingEvent { - /* package private */ static final String FIELD_TYPE = "sp"; - /* package private */ static final String FIELD_DATA = "se"; - /* package private */ static final String FIELD_TIMESTAMP = "im"; + /* package private */ static final String FIELD_TYPE = "e"; + /* package private */ static final String FIELD_DATA = "d"; + /* package private */ static final String FIELD_TIMESTAMP = "t"; @SerializedName(FIELD_TYPE) private int _type; @SerializedName(FIELD_DATA) private long _data; @SerializedName(FIELD_TIMESTAMP) - private long timestamp; + private long _timestamp; public int get_type() { return _type; @@ -31,10 +31,10 @@ public void set_data(long _data) { } public long getTimestamp() { - return timestamp; + return _timestamp; } public void setTimestamp(long timestamp) { - this.timestamp = timestamp; + this._timestamp = timestamp; } } diff --git a/client/src/main/java/io/split/telemetry/domain/enums/HTTPLatenciesEnum.java b/client/src/main/java/io/split/telemetry/domain/enums/HTTPLatenciesEnum.java index 88a7a7e84..6f858397b 100644 --- a/client/src/main/java/io/split/telemetry/domain/enums/HTTPLatenciesEnum.java +++ b/client/src/main/java/io/split/telemetry/domain/enums/HTTPLatenciesEnum.java @@ -4,6 +4,7 @@ public enum HTTPLatenciesEnum { SPLITS, SEGMENTS, IMPRESSIONS, + IMPRESSIONS_COUNT, EVENTS, TELEMETRY, TOKEN diff --git a/client/src/main/java/io/split/telemetry/domain/enums/LastSynchronizationRecordsEnum.java b/client/src/main/java/io/split/telemetry/domain/enums/LastSynchronizationRecordsEnum.java index f99b43f19..d2439395b 100644 --- a/client/src/main/java/io/split/telemetry/domain/enums/LastSynchronizationRecordsEnum.java +++ b/client/src/main/java/io/split/telemetry/domain/enums/LastSynchronizationRecordsEnum.java @@ -4,6 +4,7 @@ public enum LastSynchronizationRecordsEnum { SPLITS, SEGMENTS, IMPRESSIONS, + IMPRESSIONS_COUNT, EVENTS, TOKEN, TELEMETRY diff --git a/client/src/main/java/io/split/telemetry/domain/enums/ResourceEnum.java b/client/src/main/java/io/split/telemetry/domain/enums/ResourceEnum.java index b705aa43e..04af11162 100644 --- a/client/src/main/java/io/split/telemetry/domain/enums/ResourceEnum.java +++ b/client/src/main/java/io/split/telemetry/domain/enums/ResourceEnum.java @@ -4,6 +4,7 @@ public enum ResourceEnum { SPLIT_SYNC, SEGMENT_SYNC, IMPRESSION_SYNC, + IMPRESSION_COUNT_SYNC, EVENT_SYNC, TELEMETRY_SYNC, TOKEN_SYNC diff --git a/client/src/main/java/io/split/telemetry/storage/InMemoryTelemetryStorage.java b/client/src/main/java/io/split/telemetry/storage/InMemoryTelemetryStorage.java index 91e8a0d1d..967790704 100644 --- a/client/src/main/java/io/split/telemetry/storage/InMemoryTelemetryStorage.java +++ b/client/src/main/java/io/split/telemetry/storage/InMemoryTelemetryStorage.java @@ -1,6 +1,7 @@ package io.split.telemetry.storage; import com.google.common.collect.Maps; +import io.split.telemetry.utils.BucketCalculator; import io.split.telemetry.domain.*; import io.split.telemetry.domain.enums.*; @@ -10,10 +11,11 @@ import java.util.concurrent.atomic.AtomicLong; public class InMemoryTelemetryStorage implements TelemetryStorage{ + public static final int MAX_LATENCY_BUCKET_COUNT = 23; //Latencies - private final ConcurrentMap> _methodLatencies = Maps.newConcurrentMap(); - private final ConcurrentMap> _httpLatencies = Maps.newConcurrentMap(); + private final ConcurrentMap _methodLatencies = Maps.newConcurrentMap(); + private final ConcurrentMap _httpLatencies = Maps.newConcurrentMap(); //Counters private final ConcurrentMap _exceptionsCounters = Maps.newConcurrentMap(); @@ -37,37 +39,39 @@ public class InMemoryTelemetryStorage implements TelemetryStorage{ private final Object _tagsLock = new Object(); private final List _tags = new ArrayList<>(); - public InMemoryTelemetryStorage() { + public InMemoryTelemetryStorage() throws Exception { initMethodLatencies(); initHttpLatencies(); initHttpErrors(); - } - - @Override - public void recordConfigData() { - // No-Op. Config Data will be sent directly to Split Servers. No need to store. + initMethodCounters(); + initFactoryCounters(); + initImpressionDataCounters(); + initPushCounters(); + initSdkRecords(); + initLastSynchronizationRecords(); + initEventDataRecords(); } @Override public long getBURTimeouts() { - long burTimeouts = _factoryCounters.getOrDefault(FactoryCountersEnum.BUR_TIMEOUTS, new AtomicLong()).get(); + long burTimeouts = _factoryCounters.get(FactoryCountersEnum.BUR_TIMEOUTS).get(); return burTimeouts; } @Override public long getNonReadyUsages() { - long nonReadyUsages = _factoryCounters.getOrDefault(FactoryCountersEnum.NON_READY_USAGES, new AtomicLong()).get(); + long nonReadyUsages = _factoryCounters.get(FactoryCountersEnum.NON_READY_USAGES).get(); return nonReadyUsages; } @Override - public MethodExceptions popExceptions() { + public MethodExceptions popExceptions() throws Exception { MethodExceptions exceptions = new MethodExceptions(); - exceptions.set_treatment(_exceptionsCounters.getOrDefault(MethodEnum.TREATMENT, new AtomicLong()).get()); - exceptions.set_treatments(_exceptionsCounters.getOrDefault(MethodEnum.TREATMENTS, new AtomicLong()).get()); - exceptions.set_treatmentWithConfig(_exceptionsCounters.getOrDefault(MethodEnum.TREATMENT_WITH_CONFIG, new AtomicLong()).get()); - exceptions.set_treatmentsWithConfig(_exceptionsCounters.getOrDefault(MethodEnum.TREATMENTS_WITH_CONFIG, new AtomicLong()).get()); - exceptions.set_track(_exceptionsCounters.getOrDefault(MethodEnum.TRACK, new AtomicLong()).get()); + exceptions.set_treatment(_exceptionsCounters.get(MethodEnum.TREATMENT).get()); + exceptions.set_treatments(_exceptionsCounters.get(MethodEnum.TREATMENTS).get()); + exceptions.set_treatmentWithConfig(_exceptionsCounters.get(MethodEnum.TREATMENT_WITH_CONFIG).get()); + exceptions.set_treatmentsWithConfig(_exceptionsCounters.get(MethodEnum.TREATMENTS_WITH_CONFIG).get()); + exceptions.set_track(_exceptionsCounters.get(MethodEnum.TRACK).get()); _exceptionsCounters.clear(); initMethodLatencies(); @@ -78,61 +82,60 @@ public MethodExceptions popExceptions() { @Override public MethodLatencies popLatencies() { MethodLatencies latencies = new MethodLatencies(); - latencies.set_treatment(_methodLatencies.get(MethodEnum.TREATMENT)); - latencies.set_treatments(_methodLatencies.get(MethodEnum.TREATMENTS)); - latencies.set_treatmentWithConfig(_methodLatencies.get(MethodEnum.TREATMENT_WITH_CONFIG)); - latencies.set_treatmentsWithConfig(_methodLatencies.get(MethodEnum.TREATMENTS_WITH_CONFIG)); - latencies.set_track(_methodLatencies.get(MethodEnum.TRACK)); + latencies.set_treatment(_methodLatencies.get(MethodEnum.TREATMENT).fetchAndClearAll()); + latencies.set_treatments(_methodLatencies.get(MethodEnum.TREATMENTS).fetchAndClearAll()); + latencies.set_treatmentWithConfig(_methodLatencies.get(MethodEnum.TREATMENT_WITH_CONFIG).fetchAndClearAll()); + latencies.set_treatmentsWithConfig(_methodLatencies.get(MethodEnum.TREATMENTS_WITH_CONFIG).fetchAndClearAll()); + latencies.set_track(_methodLatencies.get(MethodEnum.TRACK).fetchAndClearAll()); _methodLatencies.clear(); - initMethodLatencies(); + initMethodCounters(); return latencies; } @Override public void recordNonReadyUsage() { - _factoryCounters.putIfAbsent(FactoryCountersEnum.NON_READY_USAGES, new AtomicLong(0)); _factoryCounters.get(FactoryCountersEnum.NON_READY_USAGES).incrementAndGet(); } @Override public void recordBURTimeout() { - _factoryCounters.putIfAbsent(FactoryCountersEnum.BUR_TIMEOUTS, new AtomicLong(0)); _factoryCounters.get(FactoryCountersEnum.BUR_TIMEOUTS).incrementAndGet(); } @Override public void recordLatency(String method, int latency) { - _methodLatencies.get(method).add(Long.valueOf(latency)); + int bucket = BucketCalculator.getBucketForLatencyMillis(latency); + _methodLatencies.get(method).increment(bucket); } @Override public void recordException(MethodEnum method) { - _exceptionsCounters.putIfAbsent(method, new AtomicLong(0)); _exceptionsCounters.get(method).incrementAndGet(); } @Override public long getImpressionsStats(ImpressionsDataTypeEnum data) { - return _impressionsDataRecords.getOrDefault(data, new AtomicLong()).get(); + return _impressionsDataRecords.get(data).get(); } @Override public long getEventStats(EventsDataRecordsEnum type) { - return _eventsDataRecords.getOrDefault(type, new AtomicLong()).get(); + return _eventsDataRecords.get(type).get(); } @Override public LastSynchronization getLastSynchronization() { LastSynchronization lastSynchronization = new LastSynchronization(); - lastSynchronization.set_splits(_lastSynchronizationRecords.getOrDefault(LastSynchronizationRecordsEnum.SPLITS, new AtomicLong()).get()); - lastSynchronization.set_segments(_lastSynchronizationRecords.getOrDefault(LastSynchronizationRecordsEnum.SEGMENTS, new AtomicLong()).get()); - lastSynchronization.set_impressions(_lastSynchronizationRecords.getOrDefault(LastSynchronizationRecordsEnum.IMPRESSIONS, new AtomicLong()).get()); - lastSynchronization.set_events(_lastSynchronizationRecords.getOrDefault(LastSynchronizationRecordsEnum.EVENTS, new AtomicLong()).get()); - lastSynchronization.set_telemetry(_lastSynchronizationRecords.getOrDefault(LastSynchronizationRecordsEnum.TELEMETRY, new AtomicLong()).get()); - lastSynchronization.set_token(_lastSynchronizationRecords.getOrDefault(LastSynchronizationRecordsEnum.TOKEN, new AtomicLong()).get()); + lastSynchronization.set_splits(_lastSynchronizationRecords.get(LastSynchronizationRecordsEnum.SPLITS).get()); + lastSynchronization.set_segments(_lastSynchronizationRecords.get(LastSynchronizationRecordsEnum.SEGMENTS).get()); + lastSynchronization.set_impressions(_lastSynchronizationRecords.get(LastSynchronizationRecordsEnum.IMPRESSIONS).get()); + lastSynchronization.set_impressionsCount(_lastSynchronizationRecords.get(LastSynchronizationRecordsEnum.IMPRESSIONS_COUNT).get()); + lastSynchronization.set_events(_lastSynchronizationRecords.get(LastSynchronizationRecordsEnum.EVENTS).get()); + lastSynchronization.set_telemetry(_lastSynchronizationRecords.get(LastSynchronizationRecordsEnum.TELEMETRY).get()); + lastSynchronization.set_token(_lastSynchronizationRecords.get(LastSynchronizationRecordsEnum.TOKEN).get()); return lastSynchronization; } @@ -143,6 +146,7 @@ public HTTPErrors popHTTPErrors() { errors.set_splits(_httpErrors.get(ResourceEnum.SPLIT_SYNC)); errors.set_segments(_httpErrors.get(ResourceEnum.SEGMENT_SYNC)); errors.set_impressions(_httpErrors.get(ResourceEnum.IMPRESSION_SYNC)); + errors.set_impressionsCount(_httpErrors.get(ResourceEnum.IMPRESSION_COUNT_SYNC)); errors.set_events(_httpErrors.get(ResourceEnum.EVENT_SYNC)); errors.set_telemetry(_httpErrors.get(ResourceEnum.TELEMETRY_SYNC)); errors.set_token(_httpErrors.get(ResourceEnum.TOKEN_SYNC)); @@ -154,14 +158,15 @@ public HTTPErrors popHTTPErrors() { } @Override - public HTTPLatencies popHTTPLatencies() { + public HTTPLatencies popHTTPLatencies() throws Exception { HTTPLatencies latencies = new HTTPLatencies(); - latencies.set_splits(_httpLatencies.get(HTTPLatenciesEnum.SPLITS)); - latencies.set_segments(_httpLatencies.get(HTTPLatenciesEnum.SEGMENTS)); - latencies.set_impressions(_httpLatencies.get(HTTPLatenciesEnum.IMPRESSIONS)); - latencies.set_events(_httpLatencies.get(HTTPLatenciesEnum.EVENTS)); - latencies.set_telemetry(_httpLatencies.get(HTTPLatenciesEnum.TELEMETRY)); - latencies.set_token(_httpLatencies.get(HTTPLatenciesEnum.TOKEN)); + latencies.set_splits(_httpLatencies.get(HTTPLatenciesEnum.SPLITS).fetchAndClearAll()); + latencies.set_segments(_httpLatencies.get(HTTPLatenciesEnum.SEGMENTS).fetchAndClearAll()); + latencies.set_impressions(_httpLatencies.get(HTTPLatenciesEnum.IMPRESSIONS).fetchAndClearAll()); + latencies.set_impressionsCount(_httpLatencies.get(HTTPLatenciesEnum.IMPRESSIONS_COUNT).fetchAndClearAll()); + latencies.set_events(_httpLatencies.get(HTTPLatenciesEnum.EVENTS).fetchAndClearAll()); + latencies.set_telemetry(_httpLatencies.get(HTTPLatenciesEnum.TELEMETRY).fetchAndClearAll()); + latencies.set_token(_httpLatencies.get(HTTPLatenciesEnum.TOKEN).fetchAndClearAll()); _httpLatencies.clear(); initHttpLatencies(); @@ -171,7 +176,7 @@ public HTTPLatencies popHTTPLatencies() { @Override public long popAuthRejections() { - long authRejections = _pushCounters.getOrDefault(PushCountersEnum.AUTH_REJECTIONS, new AtomicLong()).get(); + long authRejections = _pushCounters.get(PushCountersEnum.AUTH_REJECTIONS).get(); _pushCounters.replace(PushCountersEnum.AUTH_REJECTIONS, new AtomicLong()); @@ -180,7 +185,7 @@ public long popAuthRejections() { @Override public long popTokenRefreshes() { - long tokenRefreshes = _pushCounters.getOrDefault(PushCountersEnum.TOKEN_REFRESHES, new AtomicLong()).get(); + long tokenRefreshes = _pushCounters.get(PushCountersEnum.TOKEN_REFRESHES).get(); _pushCounters.replace(PushCountersEnum.TOKEN_REFRESHES, new AtomicLong()); @@ -211,7 +216,7 @@ public List popTags() { @Override public long getSessionLength() { - return _sdkRecords.getOrDefault(SdkRecordsEnum.SESSION, new AtomicLong()).get(); + return _sdkRecords.get(SdkRecordsEnum.SESSION).get(); } @Override @@ -223,19 +228,17 @@ public void addTag(String tag) { @Override public void recordImpressionStats(ImpressionsDataTypeEnum dataType, long count) { - _impressionsDataRecords.putIfAbsent(dataType, new AtomicLong()); _impressionsDataRecords.get(dataType).incrementAndGet(); } @Override public void recordEventStats(EventsDataRecordsEnum dataType, long count) { - _eventsDataRecords.putIfAbsent(dataType, new AtomicLong()); _eventsDataRecords.get(dataType).incrementAndGet(); } @Override public void recordSuccessfulSync(LastSynchronizationRecordsEnum resource, long time) { - _lastSynchronizationRecords.putIfAbsent(resource, new AtomicLong(time)); + _lastSynchronizationRecords.replace(resource, new AtomicLong(time)); } @@ -248,20 +251,19 @@ public void recordSyncError(ResourceEnum resource, int status) { @Override public void recordSyncLatency(String resource, long latency) { - _httpLatencies.get(resource).add(latency); + int bucket = BucketCalculator.getBucketForLatencyMillis(latency); + _httpLatencies.get(resource).increment(bucket); } @Override public void recordAuthRejections() { - _pushCounters.putIfAbsent(PushCountersEnum.AUTH_REJECTIONS, new AtomicLong(0)); _pushCounters.get(PushCountersEnum.AUTH_REJECTIONS).incrementAndGet(); } @Override public void recordTokenRefreshes() { - _pushCounters.putIfAbsent(PushCountersEnum.TOKEN_REFRESHES, new AtomicLong(0)); _pushCounters.get(PushCountersEnum.TOKEN_REFRESHES).incrementAndGet(); } @@ -275,32 +277,78 @@ public void recordStreamingEvents(StreamingEvent streamingEvent) { @Override public void recordSessionLength(long sessionLength) { - _sdkRecords.putIfAbsent(SdkRecordsEnum.SESSION, new AtomicLong(sessionLength)); + _sdkRecords.replace(SdkRecordsEnum.SESSION, new AtomicLong(sessionLength)); } - private void initMethodLatencies() { - _methodLatencies.put(MethodEnum.TREATMENT, new ArrayList<>()); - _methodLatencies.put(MethodEnum.TREATMENTS, new ArrayList<>()); - _methodLatencies.put(MethodEnum.TREATMENT_WITH_CONFIG, new ArrayList<>()); - _methodLatencies.put(MethodEnum.TREATMENTS_WITH_CONFIG, new ArrayList<>()); - _methodLatencies.put(MethodEnum.TRACK, new ArrayList<>()); + private void initMethodLatencies() throws Exception { + _methodLatencies.put(MethodEnum.TREATMENT, new AtomicLongArray(MAX_LATENCY_BUCKET_COUNT)); + _methodLatencies.put(MethodEnum.TREATMENTS, new AtomicLongArray(MAX_LATENCY_BUCKET_COUNT)); + _methodLatencies.put(MethodEnum.TREATMENT_WITH_CONFIG, new AtomicLongArray(MAX_LATENCY_BUCKET_COUNT)); + _methodLatencies.put(MethodEnum.TREATMENTS_WITH_CONFIG, new AtomicLongArray(MAX_LATENCY_BUCKET_COUNT)); + _methodLatencies.put(MethodEnum.TRACK, new AtomicLongArray(MAX_LATENCY_BUCKET_COUNT)); } - private void initHttpLatencies() { - _httpLatencies.put(HTTPLatenciesEnum.SPLITS, new ArrayList<>()); - _httpLatencies.put(HTTPLatenciesEnum.SEGMENTS, new ArrayList<>()); - _httpLatencies.put(HTTPLatenciesEnum.IMPRESSIONS, new ArrayList<>()); - _httpLatencies.put(HTTPLatenciesEnum.EVENTS, new ArrayList<>()); - _httpLatencies.put(HTTPLatenciesEnum.TELEMETRY, new ArrayList<>()); - _httpLatencies.put(HTTPLatenciesEnum.TOKEN, new ArrayList<>()); + private void initHttpLatencies() throws Exception { + _httpLatencies.put(HTTPLatenciesEnum.SPLITS, new AtomicLongArray(MAX_LATENCY_BUCKET_COUNT)); + _httpLatencies.put(HTTPLatenciesEnum.SEGMENTS, new AtomicLongArray(MAX_LATENCY_BUCKET_COUNT)); + _httpLatencies.put(HTTPLatenciesEnum.IMPRESSIONS, new AtomicLongArray(MAX_LATENCY_BUCKET_COUNT)); + _httpLatencies.put(HTTPLatenciesEnum.IMPRESSIONS_COUNT, new AtomicLongArray(MAX_LATENCY_BUCKET_COUNT)); + _httpLatencies.put(HTTPLatenciesEnum.EVENTS, new AtomicLongArray(MAX_LATENCY_BUCKET_COUNT)); + _httpLatencies.put(HTTPLatenciesEnum.TELEMETRY, new AtomicLongArray(MAX_LATENCY_BUCKET_COUNT)); + _httpLatencies.put(HTTPLatenciesEnum.TOKEN, new AtomicLongArray(MAX_LATENCY_BUCKET_COUNT)); } private void initHttpErrors() { _httpErrors.put(ResourceEnum.SPLIT_SYNC, Maps.newConcurrentMap()); _httpErrors.put(ResourceEnum.SEGMENT_SYNC, Maps.newConcurrentMap()); _httpErrors.put(ResourceEnum.IMPRESSION_SYNC, Maps.newConcurrentMap()); + _httpErrors.put(ResourceEnum.IMPRESSION_COUNT_SYNC, Maps.newConcurrentMap()); _httpErrors.put(ResourceEnum.EVENT_SYNC, Maps.newConcurrentMap()); _httpErrors.put(ResourceEnum.TELEMETRY_SYNC, Maps.newConcurrentMap()); _httpErrors.put(ResourceEnum.TOKEN_SYNC, Maps.newConcurrentMap()); } + + + + private void initMethodCounters() { + _exceptionsCounters.put(MethodEnum.TREATMENT, new AtomicLong()); + _exceptionsCounters.put(MethodEnum.TREATMENTS, new AtomicLong()); + _exceptionsCounters.put(MethodEnum.TREATMENT_WITH_CONFIG, new AtomicLong()); + _exceptionsCounters.put(MethodEnum.TREATMENTS_WITH_CONFIG, new AtomicLong()); + _exceptionsCounters.put(MethodEnum.TRACK, new AtomicLong()); + } + + private void initFactoryCounters() { + _factoryCounters.put(FactoryCountersEnum.BUR_TIMEOUTS, new AtomicLong()); + _factoryCounters.put(FactoryCountersEnum.NON_READY_USAGES, new AtomicLong()); + } + + private void initImpressionDataCounters() { + _impressionsDataRecords.put(ImpressionsDataTypeEnum.IMPRESSIONS_DEDUPED, new AtomicLong()); + _impressionsDataRecords.put(ImpressionsDataTypeEnum.IMPRESSIONS_DROPPED, new AtomicLong()); + _impressionsDataRecords.put(ImpressionsDataTypeEnum.IMPRESSIONS_QUEUED, new AtomicLong()); + } + + private void initPushCounters() { + _pushCounters.put(PushCountersEnum.AUTH_REJECTIONS, new AtomicLong()); + _pushCounters.put(PushCountersEnum.TOKEN_REFRESHES, new AtomicLong()); + } + + private void initSdkRecords() { + _sdkRecords.put(SdkRecordsEnum.SESSION, new AtomicLong()); + } + + private void initLastSynchronizationRecords() { + _lastSynchronizationRecords.put(LastSynchronizationRecordsEnum.SPLITS, new AtomicLong()); + _lastSynchronizationRecords.put(LastSynchronizationRecordsEnum.SEGMENTS, new AtomicLong()); + _lastSynchronizationRecords.put(LastSynchronizationRecordsEnum.EVENTS, new AtomicLong()); + _lastSynchronizationRecords.put(LastSynchronizationRecordsEnum.IMPRESSIONS, new AtomicLong()); + _lastSynchronizationRecords.put(LastSynchronizationRecordsEnum.IMPRESSIONS_COUNT, new AtomicLong()); + _lastSynchronizationRecords.put(LastSynchronizationRecordsEnum.TOKEN, new AtomicLong()); + } + + private void initEventDataRecords() { + _eventsDataRecords.put(EventsDataRecordsEnum.EVENTS_DROPPED, new AtomicLong()); + _eventsDataRecords.put(EventsDataRecordsEnum.EVENTS_QUEUED, new AtomicLong()); + } } diff --git a/client/src/main/java/io/split/telemetry/storage/TelemetryConfigProducer.java b/client/src/main/java/io/split/telemetry/storage/TelemetryConfigProducer.java index 913733d03..64348c154 100644 --- a/client/src/main/java/io/split/telemetry/storage/TelemetryConfigProducer.java +++ b/client/src/main/java/io/split/telemetry/storage/TelemetryConfigProducer.java @@ -1,7 +1,6 @@ package io.split.telemetry.storage; public interface TelemetryConfigProducer { - void recordConfigData(); void recordNonReadyUsage(); void recordBURTimeout(); } diff --git a/client/src/main/java/io/split/telemetry/storage/TelemetryEvaluationConsumer.java b/client/src/main/java/io/split/telemetry/storage/TelemetryEvaluationConsumer.java index 976fbb623..88ed7f9ca 100644 --- a/client/src/main/java/io/split/telemetry/storage/TelemetryEvaluationConsumer.java +++ b/client/src/main/java/io/split/telemetry/storage/TelemetryEvaluationConsumer.java @@ -4,6 +4,6 @@ import io.split.telemetry.domain.MethodLatencies; public interface TelemetryEvaluationConsumer { - MethodExceptions popExceptions(); + MethodExceptions popExceptions() throws Exception; MethodLatencies popLatencies(); } diff --git a/client/src/main/java/io/split/telemetry/storage/TelemetryRuntimeConsumer.java b/client/src/main/java/io/split/telemetry/storage/TelemetryRuntimeConsumer.java index 9df890be3..f1871951c 100644 --- a/client/src/main/java/io/split/telemetry/storage/TelemetryRuntimeConsumer.java +++ b/client/src/main/java/io/split/telemetry/storage/TelemetryRuntimeConsumer.java @@ -14,7 +14,7 @@ public interface TelemetryRuntimeConsumer { long getEventStats(EventsDataRecordsEnum type); LastSynchronization getLastSynchronization(); HTTPErrors popHTTPErrors(); - HTTPLatencies popHTTPLatencies(); + HTTPLatencies popHTTPLatencies() throws Exception; long popAuthRejections(); long popTokenRefreshes(); List popStreamingEvents(); diff --git a/client/src/main/java/io/split/telemetry/utils/BucketCalculator.java b/client/src/main/java/io/split/telemetry/utils/BucketCalculator.java new file mode 100644 index 000000000..b68956c97 --- /dev/null +++ b/client/src/main/java/io/split/telemetry/utils/BucketCalculator.java @@ -0,0 +1,64 @@ +package io.split.telemetry.utils; + +import java.util.Arrays; + +/** + * Calculates buckets from latency + *

+ * (1) 1.00 + * (2) 1.50 + * (3) 2.25 + * (4) 3.38 + * (5) 5.06 + * (6) 7.59 + * (7) 11.39 + * (8) 17.09 + * (9) 25.63 + * (10) 38.44 + * (11) 57.67 + * (12) 86.50 + * (13) 129.75 + * (14) 194.62 + * (15) 291.93 + * (16) 437.89 + * (17) 656.84 + * (18) 985.26 + * (19) 1,477.89 + * (20) 2,216.84 + * (21) 3,325.26 + * (22) 4,987.89 + * (23) 7,481.83 + *

+ */ +public class BucketCalculator { + + static final long[] BUCKETS = { + 1000, 1500, 2250, 3375, 5063, + 7594, 11391, 17086, 25629, 38443, + 57665, 86498, 129746, 194620, 291929, + 437894, 656841, 985261, 1477892, 2216838, + 3325257, 4987885, 7481828 + }; + + static final long MAX_LATENCY = 7481828; + + public static int getBucketForLatencyMillis(long latency) { + long micros = latency * 1000; + if (micros > MAX_LATENCY) { + return BUCKETS.length - 1; + } + + int index = Arrays.binarySearch(BUCKETS, micros); + + if (index < 0) { + + // Adjust the index based on Java Array javadocs. <0 means the value wasn't found and it's module value + // is where it should be inserted (in this case, it means the counter it applies - unless it's equals to the + // length of the array). + + index = -(index + 1); + } + return index; + } + +} diff --git a/client/src/test/java/io/split/telemetry/storage/InMemoryTelemetryStorageTest.java b/client/src/test/java/io/split/telemetry/storage/InMemoryTelemetryStorageTest.java new file mode 100644 index 000000000..98ca18132 --- /dev/null +++ b/client/src/test/java/io/split/telemetry/storage/InMemoryTelemetryStorageTest.java @@ -0,0 +1,21 @@ +package io.split.telemetry.storage; + +import io.split.telemetry.domain.MethodExceptions; +import io.split.telemetry.domain.enums.MethodEnum; +import org.junit.Assert; +import org.junit.Test; + +public class InMemoryTelemetryStorageTest{ + + @Test + public void testInMemoryTelemetryStorage() throws Exception { + InMemoryTelemetryStorage telemetryStorage = new InMemoryTelemetryStorage(); + + telemetryStorage.recordException(MethodEnum.TREATMENT); + telemetryStorage.recordException(MethodEnum.TREATMENTS); + telemetryStorage.recordException(MethodEnum.TREATMENT); + + MethodExceptions methodExceptions = telemetryStorage.popExceptions(); + Assert.assertEquals(2, methodExceptions.get_treatment()); + } +} \ No newline at end of file From b52c519056156977004804c22cb2ac957f4f06d9 Mon Sep 17 00:00:00 2001 From: Lucas Echeverz Date: Sun, 11 Apr 2021 18:32:56 -0300 Subject: [PATCH 5/6] Adding BucketCalculator, tests and several minor changes --- .../telemetry/domain/AtomicLongArray.java | 37 ++++ .../storage/InMemoryTelemetryStorage.java | 46 ++-- .../storage/TelemetryEvaluationConsumer.java | 2 +- .../storage/TelemetryEvaluationProducer.java | 2 +- .../storage/TelemetryRuntimeConsumer.java | 1 - .../storage/TelemetryRuntimeProducer.java | 7 +- .../telemetry/utils/BucketCalculator.java | 4 +- .../storage/InMemoryTelemetryStorageTest.java | 206 +++++++++++++++++- .../telemetry/utils/BucketCalculatorTest.java | 22 ++ 9 files changed, 288 insertions(+), 39 deletions(-) create mode 100644 client/src/main/java/io/split/telemetry/domain/AtomicLongArray.java create mode 100644 client/src/test/java/io/split/telemetry/utils/BucketCalculatorTest.java diff --git a/client/src/main/java/io/split/telemetry/domain/AtomicLongArray.java b/client/src/main/java/io/split/telemetry/domain/AtomicLongArray.java new file mode 100644 index 000000000..cdd51f893 --- /dev/null +++ b/client/src/main/java/io/split/telemetry/domain/AtomicLongArray.java @@ -0,0 +1,37 @@ +package io.split.telemetry.domain; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.IntStream; + +public class AtomicLongArray { + private AtomicLong[] array; + + public AtomicLongArray(int size) throws Exception { + if(size == 0) { + throw new Exception("Invalid array size"); + } + array = new AtomicLong[size]; + IntStream.range(0, array.length).forEach(x -> array[x] = new AtomicLong()); + } + + public void increment(int index) { + if (index < 0 || index >= array.length) { + throw new ArrayIndexOutOfBoundsException(); + } + array[index].getAndIncrement(); + } + + public List fetchAndClearAll() { + List listValues = new ArrayList<>(); + for (AtomicLong a: array) { + listValues.add(a.longValue()); + } + + IntStream.range(0, array.length).forEach(x -> array[x] = new AtomicLong()); + + return listValues; + } +} diff --git a/client/src/main/java/io/split/telemetry/storage/InMemoryTelemetryStorage.java b/client/src/main/java/io/split/telemetry/storage/InMemoryTelemetryStorage.java index 967790704..927e746e4 100644 --- a/client/src/main/java/io/split/telemetry/storage/InMemoryTelemetryStorage.java +++ b/client/src/main/java/io/split/telemetry/storage/InMemoryTelemetryStorage.java @@ -9,6 +9,7 @@ import java.util.List; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; public class InMemoryTelemetryStorage implements TelemetryStorage{ public static final int MAX_LATENCY_BUCKET_COUNT = 23; @@ -54,14 +55,12 @@ public InMemoryTelemetryStorage() throws Exception { @Override public long getBURTimeouts() { - long burTimeouts = _factoryCounters.get(FactoryCountersEnum.BUR_TIMEOUTS).get(); - return burTimeouts; + return _factoryCounters.get(FactoryCountersEnum.BUR_TIMEOUTS).get(); } @Override public long getNonReadyUsages() { - long nonReadyUsages = _factoryCounters.get(FactoryCountersEnum.NON_READY_USAGES).get(); - return nonReadyUsages; + return _factoryCounters.get(FactoryCountersEnum.NON_READY_USAGES).get(); } @Override @@ -74,13 +73,13 @@ public MethodExceptions popExceptions() throws Exception { exceptions.set_track(_exceptionsCounters.get(MethodEnum.TRACK).get()); _exceptionsCounters.clear(); - initMethodLatencies(); + initMethodCounters(); return exceptions; } @Override - public MethodLatencies popLatencies() { + public MethodLatencies popLatencies() throws Exception { MethodLatencies latencies = new MethodLatencies(); latencies.set_treatment(_methodLatencies.get(MethodEnum.TREATMENT).fetchAndClearAll()); latencies.set_treatments(_methodLatencies.get(MethodEnum.TREATMENTS).fetchAndClearAll()); @@ -89,7 +88,7 @@ public MethodLatencies popLatencies() { latencies.set_track(_methodLatencies.get(MethodEnum.TRACK).fetchAndClearAll()); _methodLatencies.clear(); - initMethodCounters(); + initMethodLatencies(); return latencies; } @@ -97,7 +96,6 @@ public MethodLatencies popLatencies() { @Override public void recordNonReadyUsage() { _factoryCounters.get(FactoryCountersEnum.NON_READY_USAGES).incrementAndGet(); - } @Override @@ -106,8 +104,8 @@ public void recordBURTimeout() { } @Override - public void recordLatency(String method, int latency) { - int bucket = BucketCalculator.getBucketForLatencyMillis(latency); + public void recordLatency(MethodEnum method, long latency) { + int bucket = BucketCalculator.getBucketForLatency(latency); _methodLatencies.get(method).increment(bucket); } @@ -117,13 +115,13 @@ public void recordException(MethodEnum method) { } @Override - public long getImpressionsStats(ImpressionsDataTypeEnum data) { - return _impressionsDataRecords.get(data).get(); + public long getImpressionsStats(ImpressionsDataTypeEnum dataType) { + return _impressionsDataRecords.get(dataType).get(); } @Override - public long getEventStats(EventsDataRecordsEnum type) { - return _eventsDataRecords.get(type).get(); + public long getEventStats(EventsDataRecordsEnum dataType) { + return _eventsDataRecords.get(dataType).get(); } @Override @@ -195,7 +193,7 @@ public long popTokenRefreshes() { @Override public List popStreamingEvents() { synchronized (_streamingEventsLock) { - List streamingEvents = _streamingEvents; + List streamingEvents = _streamingEvents.stream().collect(Collectors.toList()); _streamingEvents.clear(); @@ -206,7 +204,7 @@ public List popStreamingEvents() { @Override public List popTags() { synchronized (_tagsLock) { - List tags = _tags; + List tags = _tags.stream().collect(Collectors.toList()); _tags.clear(); @@ -228,30 +226,29 @@ public void addTag(String tag) { @Override public void recordImpressionStats(ImpressionsDataTypeEnum dataType, long count) { - _impressionsDataRecords.get(dataType).incrementAndGet(); + _impressionsDataRecords.get(dataType).addAndGet(count); } @Override public void recordEventStats(EventsDataRecordsEnum dataType, long count) { - _eventsDataRecords.get(dataType).incrementAndGet(); + _eventsDataRecords.get(dataType).addAndGet(count); } @Override public void recordSuccessfulSync(LastSynchronizationRecordsEnum resource, long time) { _lastSynchronizationRecords.replace(resource, new AtomicLong(time)); - } @Override public void recordSyncError(ResourceEnum resource, int status) { ConcurrentMap errors = _httpErrors.get(resource); errors.putIfAbsent(Long.valueOf(status), 0l); - errors.replace(Long.valueOf(status), errors.get(status) + 1); + errors.replace(Long.valueOf(status), errors.get(Long.valueOf(status)) + 1); } @Override - public void recordSyncLatency(String resource, long latency) { - int bucket = BucketCalculator.getBucketForLatencyMillis(latency); + public void recordSyncLatency(HTTPLatenciesEnum resource, long latency) { + int bucket = BucketCalculator.getBucketForLatency(latency); _httpLatencies.get(resource).increment(bucket); } @@ -259,13 +256,11 @@ public void recordSyncLatency(String resource, long latency) { @Override public void recordAuthRejections() { _pushCounters.get(PushCountersEnum.AUTH_REJECTIONS).incrementAndGet(); - } @Override public void recordTokenRefreshes() { _pushCounters.get(PushCountersEnum.TOKEN_REFRESHES).incrementAndGet(); - } @Override @@ -308,8 +303,6 @@ private void initHttpErrors() { _httpErrors.put(ResourceEnum.TOKEN_SYNC, Maps.newConcurrentMap()); } - - private void initMethodCounters() { _exceptionsCounters.put(MethodEnum.TREATMENT, new AtomicLong()); _exceptionsCounters.put(MethodEnum.TREATMENTS, new AtomicLong()); @@ -345,6 +338,7 @@ private void initLastSynchronizationRecords() { _lastSynchronizationRecords.put(LastSynchronizationRecordsEnum.IMPRESSIONS, new AtomicLong()); _lastSynchronizationRecords.put(LastSynchronizationRecordsEnum.IMPRESSIONS_COUNT, new AtomicLong()); _lastSynchronizationRecords.put(LastSynchronizationRecordsEnum.TOKEN, new AtomicLong()); + _lastSynchronizationRecords.put(LastSynchronizationRecordsEnum.TELEMETRY, new AtomicLong()); } private void initEventDataRecords() { diff --git a/client/src/main/java/io/split/telemetry/storage/TelemetryEvaluationConsumer.java b/client/src/main/java/io/split/telemetry/storage/TelemetryEvaluationConsumer.java index 88ed7f9ca..ca63112ac 100644 --- a/client/src/main/java/io/split/telemetry/storage/TelemetryEvaluationConsumer.java +++ b/client/src/main/java/io/split/telemetry/storage/TelemetryEvaluationConsumer.java @@ -5,5 +5,5 @@ public interface TelemetryEvaluationConsumer { MethodExceptions popExceptions() throws Exception; - MethodLatencies popLatencies(); + MethodLatencies popLatencies() throws Exception; } diff --git a/client/src/main/java/io/split/telemetry/storage/TelemetryEvaluationProducer.java b/client/src/main/java/io/split/telemetry/storage/TelemetryEvaluationProducer.java index 51795b054..005106c30 100644 --- a/client/src/main/java/io/split/telemetry/storage/TelemetryEvaluationProducer.java +++ b/client/src/main/java/io/split/telemetry/storage/TelemetryEvaluationProducer.java @@ -3,6 +3,6 @@ import io.split.telemetry.domain.enums.MethodEnum; public interface TelemetryEvaluationProducer { - void recordLatency(String method, int latency); + void recordLatency(MethodEnum method, long latency); void recordException(MethodEnum method); } diff --git a/client/src/main/java/io/split/telemetry/storage/TelemetryRuntimeConsumer.java b/client/src/main/java/io/split/telemetry/storage/TelemetryRuntimeConsumer.java index f1871951c..7acd49afb 100644 --- a/client/src/main/java/io/split/telemetry/storage/TelemetryRuntimeConsumer.java +++ b/client/src/main/java/io/split/telemetry/storage/TelemetryRuntimeConsumer.java @@ -20,5 +20,4 @@ public interface TelemetryRuntimeConsumer { List popStreamingEvents(); List popTags(); long getSessionLength(); - } diff --git a/client/src/main/java/io/split/telemetry/storage/TelemetryRuntimeProducer.java b/client/src/main/java/io/split/telemetry/storage/TelemetryRuntimeProducer.java index ab6fe175f..2baf016f0 100644 --- a/client/src/main/java/io/split/telemetry/storage/TelemetryRuntimeProducer.java +++ b/client/src/main/java/io/split/telemetry/storage/TelemetryRuntimeProducer.java @@ -1,10 +1,7 @@ package io.split.telemetry.storage; import io.split.telemetry.domain.StreamingEvent; -import io.split.telemetry.domain.enums.EventsDataRecordsEnum; -import io.split.telemetry.domain.enums.ImpressionsDataTypeEnum; -import io.split.telemetry.domain.enums.LastSynchronizationRecordsEnum; -import io.split.telemetry.domain.enums.ResourceEnum; +import io.split.telemetry.domain.enums.*; public interface TelemetryRuntimeProducer { void addTag(String tag); @@ -12,7 +9,7 @@ public interface TelemetryRuntimeProducer { void recordEventStats(EventsDataRecordsEnum dataType, long count); void recordSuccessfulSync(LastSynchronizationRecordsEnum resource, long time); void recordSyncError(ResourceEnum resource, int status); - void recordSyncLatency(String resource, long latency); + void recordSyncLatency(HTTPLatenciesEnum resource, long latency); void recordAuthRejections(); void recordTokenRefreshes(); void recordStreamingEvents(StreamingEvent streamingEvent); diff --git a/client/src/main/java/io/split/telemetry/utils/BucketCalculator.java b/client/src/main/java/io/split/telemetry/utils/BucketCalculator.java index b68956c97..7c5ce3ced 100644 --- a/client/src/main/java/io/split/telemetry/utils/BucketCalculator.java +++ b/client/src/main/java/io/split/telemetry/utils/BucketCalculator.java @@ -42,8 +42,8 @@ public class BucketCalculator { static final long MAX_LATENCY = 7481828; - public static int getBucketForLatencyMillis(long latency) { - long micros = latency * 1000; + public static int getBucketForLatency(long latency) { + long micros = latency / 1000; //Convert to milliseconds if (micros > MAX_LATENCY) { return BUCKETS.length - 1; } diff --git a/client/src/test/java/io/split/telemetry/storage/InMemoryTelemetryStorageTest.java b/client/src/test/java/io/split/telemetry/storage/InMemoryTelemetryStorageTest.java index 98ca18132..49c1ce167 100644 --- a/client/src/test/java/io/split/telemetry/storage/InMemoryTelemetryStorageTest.java +++ b/client/src/test/java/io/split/telemetry/storage/InMemoryTelemetryStorageTest.java @@ -1,21 +1,221 @@ package io.split.telemetry.storage; -import io.split.telemetry.domain.MethodExceptions; -import io.split.telemetry.domain.enums.MethodEnum; +import io.split.telemetry.domain.*; +import io.split.telemetry.domain.enums.*; import org.junit.Assert; import org.junit.Test; +import java.util.List; + public class InMemoryTelemetryStorageTest{ @Test public void testInMemoryTelemetryStorage() throws Exception { InMemoryTelemetryStorage telemetryStorage = new InMemoryTelemetryStorage(); + //MethodLatencies + telemetryStorage.recordLatency(MethodEnum.TREATMENT, 1500l * 1000); + telemetryStorage.recordLatency(MethodEnum.TREATMENT, 2000l * 1000); + telemetryStorage.recordLatency(MethodEnum.TREATMENTS, 3000l * 1000); + telemetryStorage.recordLatency(MethodEnum.TREATMENTS, 500l * 1000); + telemetryStorage.recordLatency(MethodEnum.TREATMENT_WITH_CONFIG, 800l * 1000); + telemetryStorage.recordLatency(MethodEnum.TREATMENTS_WITH_CONFIG, 1000l * 1000); + + MethodLatencies latencies = telemetryStorage.popLatencies(); + Assert.assertEquals(2, latencies.get_treatment().stream().mapToInt(Long::intValue).sum()); + Assert.assertEquals(2, latencies.get_treatments().stream().mapToInt(Long::intValue).sum()); + Assert.assertEquals(1, latencies.get_treatmentsWithConfig().stream().mapToInt(Long::intValue).sum()); + Assert.assertEquals(1, latencies.get_treatmentWithConfig().stream().mapToInt(Long::intValue).sum()); + Assert.assertEquals(0, latencies.get_track().stream().mapToInt(Long::intValue).sum()); + + //Check empty has worked + latencies = telemetryStorage.popLatencies(); + Assert.assertEquals(0, latencies.get_treatment().stream().mapToInt(Long::intValue).sum()); + Assert.assertEquals(0, latencies.get_treatments().stream().mapToInt(Long::intValue).sum()); + Assert.assertEquals(0, latencies.get_treatmentsWithConfig().stream().mapToInt(Long::intValue).sum()); + Assert.assertEquals(0, latencies.get_treatmentWithConfig().stream().mapToInt(Long::intValue).sum()); + Assert.assertEquals(0, latencies.get_track().stream().mapToInt(Long::intValue).sum()); + + //HttpLatencies + telemetryStorage.recordSyncLatency(HTTPLatenciesEnum.TELEMETRY, 1500l * 1000); + telemetryStorage.recordSyncLatency(HTTPLatenciesEnum.TELEMETRY, 2000l * 1000); + telemetryStorage.recordSyncLatency(HTTPLatenciesEnum.EVENTS, 1500l * 1000); + telemetryStorage.recordSyncLatency(HTTPLatenciesEnum.EVENTS, 2000l * 1000); + telemetryStorage.recordSyncLatency(HTTPLatenciesEnum.SEGMENTS, 1500l * 1000); + telemetryStorage.recordSyncLatency(HTTPLatenciesEnum.SPLITS, 2000l * 1000); + telemetryStorage.recordSyncLatency(HTTPLatenciesEnum.SPLITS, 1500l * 1000); + telemetryStorage.recordSyncLatency(HTTPLatenciesEnum.SPLITS, 2000l * 1000); + telemetryStorage.recordSyncLatency(HTTPLatenciesEnum.IMPRESSIONS, 1500l * 1000); + telemetryStorage.recordSyncLatency(HTTPLatenciesEnum.IMPRESSIONS_COUNT, 2000l * 1000); + + HTTPLatencies httpLatencies = telemetryStorage.popHTTPLatencies(); + + Assert.assertEquals(3, httpLatencies.get_splits().stream().mapToInt(Long::intValue).sum()); + Assert.assertEquals(2, httpLatencies.get_telemetry().stream().mapToInt(Long::intValue).sum()); + Assert.assertEquals(2, httpLatencies.get_events().stream().mapToInt(Long::intValue).sum()); + Assert.assertEquals(1, httpLatencies.get_segments().stream().mapToInt(Long::intValue).sum()); + Assert.assertEquals(1, httpLatencies.get_impressions().stream().mapToInt(Long::intValue).sum()); + Assert.assertEquals(1, httpLatencies.get_impressionsCount().stream().mapToInt(Long::intValue).sum()); + Assert.assertEquals(0, httpLatencies.get_token().stream().mapToInt(Long::intValue).sum()); + + httpLatencies = telemetryStorage.popHTTPLatencies(); + Assert.assertEquals(0, httpLatencies.get_splits().stream().mapToInt(Long::intValue).sum()); + Assert.assertEquals(0, httpLatencies.get_telemetry().stream().mapToInt(Long::intValue).sum()); + Assert.assertEquals(0, httpLatencies.get_events().stream().mapToInt(Long::intValue).sum()); + Assert.assertEquals(0, httpLatencies.get_segments().stream().mapToInt(Long::intValue).sum()); + Assert.assertEquals(0, httpLatencies.get_impressions().stream().mapToInt(Long::intValue).sum()); + Assert.assertEquals(0, httpLatencies.get_impressionsCount().stream().mapToInt(Long::intValue).sum()); + Assert.assertEquals(0, httpLatencies.get_token().stream().mapToInt(Long::intValue).sum()); + + + //Exceptions telemetryStorage.recordException(MethodEnum.TREATMENT); telemetryStorage.recordException(MethodEnum.TREATMENTS); telemetryStorage.recordException(MethodEnum.TREATMENT); + telemetryStorage.recordException(MethodEnum.TREATMENTS); + telemetryStorage.recordException(MethodEnum.TREATMENT_WITH_CONFIG); + telemetryStorage.recordException(MethodEnum.TREATMENTS_WITH_CONFIG); MethodExceptions methodExceptions = telemetryStorage.popExceptions(); Assert.assertEquals(2, methodExceptions.get_treatment()); + Assert.assertEquals(2, methodExceptions.get_treatments()); + Assert.assertEquals(1, methodExceptions.get_treatmentsWithConfig()); + Assert.assertEquals(1, methodExceptions.get_treatmentWithConfig()); + Assert.assertEquals(0, methodExceptions.get_track()); + + //Check empty has worked + methodExceptions = telemetryStorage.popExceptions(); + Assert.assertEquals(0, methodExceptions.get_treatment()); + Assert.assertEquals(0, methodExceptions.get_treatments()); + Assert.assertEquals(0, methodExceptions.get_treatmentsWithConfig()); + Assert.assertEquals(0, methodExceptions.get_treatmentWithConfig()); + Assert.assertEquals(0, methodExceptions.get_track()); + + //AuthRejections + telemetryStorage.recordAuthRejections(); + long authRejections = telemetryStorage.popAuthRejections(); + Assert.assertEquals(1, authRejections); + + //Check amount has been reseted + authRejections = telemetryStorage.popAuthRejections(); + Assert.assertEquals(0, authRejections); + + //AuthRejections + telemetryStorage.recordTokenRefreshes(); + telemetryStorage.recordTokenRefreshes(); + long tokenRefreshes = telemetryStorage.popTokenRefreshes(); + Assert.assertEquals(2, tokenRefreshes); + + //Check amount has been reseted + tokenRefreshes = telemetryStorage.popTokenRefreshes(); + Assert.assertEquals(0, tokenRefreshes); + + //Non Ready usages + telemetryStorage.recordNonReadyUsage(); + telemetryStorage.recordNonReadyUsage(); + long nonReadyUsages = telemetryStorage.getNonReadyUsages(); + Assert.assertEquals(2, nonReadyUsages); + + //BUR Timeouts + telemetryStorage.recordBURTimeout(); + long burTimeouts = telemetryStorage.getBURTimeouts(); + Assert.assertEquals(1, burTimeouts); + + //ImpressionStats + telemetryStorage.recordImpressionStats(ImpressionsDataTypeEnum.IMPRESSIONS_DEDUPED, 3); + telemetryStorage.recordImpressionStats(ImpressionsDataTypeEnum.IMPRESSIONS_DEDUPED, 1); + telemetryStorage.recordImpressionStats(ImpressionsDataTypeEnum.IMPRESSIONS_DROPPED, 4); + telemetryStorage.recordImpressionStats(ImpressionsDataTypeEnum.IMPRESSIONS_DROPPED, 6); + telemetryStorage.recordImpressionStats(ImpressionsDataTypeEnum.IMPRESSIONS_DROPPED, 2); + + long impressionsDeduped = telemetryStorage.getImpressionsStats(ImpressionsDataTypeEnum.IMPRESSIONS_DEDUPED); + long impressionsDropped = telemetryStorage.getImpressionsStats(ImpressionsDataTypeEnum.IMPRESSIONS_DROPPED); + long impressionsQueued = telemetryStorage.getImpressionsStats(ImpressionsDataTypeEnum.IMPRESSIONS_QUEUED); + + Assert.assertEquals(4, impressionsDeduped); + Assert.assertEquals(12, impressionsDropped); + Assert.assertEquals(0, impressionsQueued); + + //Event Stats + telemetryStorage.recordEventStats(EventsDataRecordsEnum.EVENTS_DROPPED, 3); + telemetryStorage.recordEventStats(EventsDataRecordsEnum.EVENTS_DROPPED, 7); + telemetryStorage.recordEventStats(EventsDataRecordsEnum.EVENTS_QUEUED, 3); + + long eventsDropped = telemetryStorage.getEventStats(EventsDataRecordsEnum.EVENTS_DROPPED); + long eventsQueued = telemetryStorage.getEventStats(EventsDataRecordsEnum.EVENTS_QUEUED); + + Assert.assertEquals(10, eventsDropped); + Assert.assertEquals(3, eventsQueued); + + //Successfuly sync + telemetryStorage.recordSuccessfulSync(LastSynchronizationRecordsEnum.EVENTS, 1500); + telemetryStorage.recordSuccessfulSync(LastSynchronizationRecordsEnum.EVENTS, 800); + telemetryStorage.recordSuccessfulSync(LastSynchronizationRecordsEnum.IMPRESSIONS, 2500); + telemetryStorage.recordSuccessfulSync(LastSynchronizationRecordsEnum.IMPRESSIONS, 10500); + telemetryStorage.recordSuccessfulSync(LastSynchronizationRecordsEnum.IMPRESSIONS_COUNT, 1500); + telemetryStorage.recordSuccessfulSync(LastSynchronizationRecordsEnum.SEGMENTS, 1580); + telemetryStorage.recordSuccessfulSync(LastSynchronizationRecordsEnum.TELEMETRY, 265); + telemetryStorage.recordSuccessfulSync(LastSynchronizationRecordsEnum.TOKEN, 129); + + LastSynchronization lastSynchronization = telemetryStorage.getLastSynchronization(); + Assert.assertEquals(800, lastSynchronization.get_events()); + Assert.assertEquals(129, lastSynchronization.get_token()); + Assert.assertEquals(1580, lastSynchronization.get_segments()); + Assert.assertEquals(0, lastSynchronization.get_splits()); + Assert.assertEquals(10500, lastSynchronization.get_impressions()); + Assert.assertEquals(1500, lastSynchronization.get_impressionsCount()); + Assert.assertEquals(265, lastSynchronization.get_telemetry()); + + //Session length + telemetryStorage.recordSessionLength(91218); + long sessionLength = telemetryStorage.getSessionLength(); + Assert.assertEquals(91218, sessionLength); + + //Sync Error + telemetryStorage.recordSyncError(ResourceEnum.TELEMETRY_SYNC, 400); + telemetryStorage.recordSyncError(ResourceEnum.TELEMETRY_SYNC, 400); + telemetryStorage.recordSyncError(ResourceEnum.SEGMENT_SYNC, 501); + telemetryStorage.recordSyncError(ResourceEnum.IMPRESSION_SYNC, 403); + telemetryStorage.recordSyncError(ResourceEnum.IMPRESSION_SYNC, 403); + telemetryStorage.recordSyncError(ResourceEnum.EVENT_SYNC, 503); + telemetryStorage.recordSyncError(ResourceEnum.SPLIT_SYNC, 403); + telemetryStorage.recordSyncError(ResourceEnum.IMPRESSION_COUNT_SYNC, 403); + telemetryStorage.recordSyncError(ResourceEnum.TOKEN_SYNC, 403); + + HTTPErrors httpErrors = telemetryStorage.popHTTPErrors(); + Assert.assertEquals(2, httpErrors.get_telemetry().get(400l).intValue()); + Assert.assertEquals(1, httpErrors.get_segments().get(501l).intValue()); + Assert.assertEquals(2, httpErrors.get_impressions().get(403l).intValue()); + Assert.assertEquals(1, httpErrors.get_impressionsCount().get(403l).intValue()); + Assert.assertEquals(1, httpErrors.get_events().get(503l).intValue()); + Assert.assertEquals(1, httpErrors.get_splits().get(403l).intValue()); + Assert.assertEquals(1, httpErrors.get_token().get(403l).intValue()); + + //Streaming events + StreamingEvent streamingEvent = new StreamingEvent(); + streamingEvent.set_data(290); + streamingEvent.set_type(1); + streamingEvent.setTimestamp(91218); + telemetryStorage.recordStreamingEvents(streamingEvent); + + List streamingEvents = telemetryStorage.popStreamingEvents(); + Assert.assertEquals(290, streamingEvents.get(0).get_data()); + Assert.assertEquals(1, streamingEvents.get(0).get_type()); + Assert.assertEquals(91218, streamingEvents.get(0).getTimestamp()); + + //Check list has been cleared + streamingEvents = telemetryStorage.popStreamingEvents(); + Assert.assertEquals(0, streamingEvents.size()); + + //Tags + telemetryStorage.addTag("TAG_1"); + telemetryStorage.addTag("TAG_2"); + List tags = telemetryStorage.popTags(); + Assert.assertEquals(2, tags.size()); + + //Check tags have been cleared + tags = telemetryStorage.popTags(); + Assert.assertEquals(0, tags.size()); + } -} \ No newline at end of file +} diff --git a/client/src/test/java/io/split/telemetry/utils/BucketCalculatorTest.java b/client/src/test/java/io/split/telemetry/utils/BucketCalculatorTest.java new file mode 100644 index 000000000..04ba74487 --- /dev/null +++ b/client/src/test/java/io/split/telemetry/utils/BucketCalculatorTest.java @@ -0,0 +1,22 @@ +package io.split.telemetry.utils; + +import org.junit.Assert; +import org.junit.Test; + +public class BucketCalculatorTest{ + + @Test + public void testBucketCalculator() { + int bucket = BucketCalculator.getBucketForLatency(500l * 1000); + Assert.assertEquals(0, bucket); + + bucket = BucketCalculator.getBucketForLatency(1500l * 1000); + Assert.assertEquals(1, bucket); + + bucket = BucketCalculator.getBucketForLatency(8000l * 1000); + Assert.assertEquals(6, bucket); + + bucket = BucketCalculator.getBucketForLatency(7481829l * 1000); + Assert.assertEquals(22, bucket); + } +} From a963764760a6e68317f460f78c9b125a6c94d97f Mon Sep 17 00:00:00 2001 From: Lucas Echeverz Date: Mon, 12 Apr 2021 10:36:48 -0300 Subject: [PATCH 6/6] Changing name of init --- .../split/telemetry/storage/InMemoryTelemetryStorage.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/client/src/main/java/io/split/telemetry/storage/InMemoryTelemetryStorage.java b/client/src/main/java/io/split/telemetry/storage/InMemoryTelemetryStorage.java index 927e746e4..a115aa8d4 100644 --- a/client/src/main/java/io/split/telemetry/storage/InMemoryTelemetryStorage.java +++ b/client/src/main/java/io/split/telemetry/storage/InMemoryTelemetryStorage.java @@ -44,7 +44,7 @@ public InMemoryTelemetryStorage() throws Exception { initMethodLatencies(); initHttpLatencies(); initHttpErrors(); - initMethodCounters(); + initMethodExceptions(); initFactoryCounters(); initImpressionDataCounters(); initPushCounters(); @@ -73,7 +73,7 @@ public MethodExceptions popExceptions() throws Exception { exceptions.set_track(_exceptionsCounters.get(MethodEnum.TRACK).get()); _exceptionsCounters.clear(); - initMethodCounters(); + initMethodExceptions(); return exceptions; } @@ -103,6 +103,7 @@ public void recordBURTimeout() { _factoryCounters.get(FactoryCountersEnum.BUR_TIMEOUTS).incrementAndGet(); } + @Override public void recordLatency(MethodEnum method, long latency) { int bucket = BucketCalculator.getBucketForLatency(latency); @@ -303,7 +304,7 @@ private void initHttpErrors() { _httpErrors.put(ResourceEnum.TOKEN_SYNC, Maps.newConcurrentMap()); } - private void initMethodCounters() { + private void initMethodExceptions() { _exceptionsCounters.put(MethodEnum.TREATMENT, new AtomicLong()); _exceptionsCounters.put(MethodEnum.TREATMENTS, new AtomicLong()); _exceptionsCounters.put(MethodEnum.TREATMENT_WITH_CONFIG, new AtomicLong());