Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,9 @@ public void stopPeriodicDataRecording() {
_telemetrySyncTask.stopScheduledTask();
_log.info("Successful shutdown of telemetry sync task");
}
}

@Override
public void forceRefreshSegment(String segmentName) {
//No-Op
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,9 @@ public void startPeriodicDataRecording() {
public void stopPeriodicDataRecording() {
//No-Op
}

@Override
public void forceRefreshSegment(String segmentName) {
//No-Op
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public static PushManagerImp build(Synchronizer synchronizer,
ThreadFactory threadFactory,
SplitParser splitParser,
SplitCacheProducer splitCacheProducer) {
FeatureFlagsWorker featureFlagsWorker = new FeatureFlagWorkerImp(synchronizer, splitParser, splitCacheProducer);
FeatureFlagsWorker featureFlagsWorker = new FeatureFlagWorkerImp(synchronizer, splitParser, splitCacheProducer, telemetryRuntimeProducer);
Worker<SegmentQueueDto> segmentWorker = new SegmentsWorkerImp(synchronizer);
PushStatusTracker pushStatusTracker = new PushStatusTrackerImp(statusMessages, telemetryRuntimeProducer);
return new PushManagerImp(new AuthApiClientImp(authUrl, splitAPI.getHttpClient(), telemetryRuntimeProducer),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ public interface Synchronizer {
void refreshSegment(String segmentName, Long targetChangeNumber);
void startPeriodicDataRecording();
void stopPeriodicDataRecording();
void forceRefreshSegment(String segmentName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,8 @@ public void stopPeriodicDataRecording() {
_log.info("Successful shutdown of telemetry sync task");
}

private void forceRefreshSegment(String segmentName){
@Override
public void forceRefreshSegment(String segmentName){
SegmentFetcher segmentFetcher = _segmentSynchronizationTaskImp.getFetcher(segmentName);
segmentFetcher.fetch(new FetchOptions.Builder().build());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@
import io.split.engine.sse.dtos.FeatureFlagChangeNotification;
import io.split.engine.sse.dtos.SplitKillNotification;
import io.split.storages.SplitCacheProducer;
import io.split.telemetry.domain.enums.UpdatesFromSSEEnum;
import io.split.telemetry.storage.TelemetryRuntimeProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.Set;

import static com.google.common.base.Preconditions.checkNotNull;
import static io.split.client.utils.FeatureFlagProcessor.processFeatureFlagChanges;
Expand All @@ -20,12 +23,14 @@ public class FeatureFlagWorkerImp extends Worker<FeatureFlagChangeNotification>
private final Synchronizer _synchronizer;
private final SplitParser _splitParser;
private final SplitCacheProducer _splitCacheProducer;
private final TelemetryRuntimeProducer _telemetryRuntimeProducer;

public FeatureFlagWorkerImp(Synchronizer synchronizer, SplitParser splitParser, SplitCacheProducer splitCacheProducer) {
public FeatureFlagWorkerImp(Synchronizer synchronizer, SplitParser splitParser, SplitCacheProducer splitCacheProducer, TelemetryRuntimeProducer telemetryRuntimeProducer) {
super("Feature flags");
_synchronizer = checkNotNull(synchronizer);
_splitParser = splitParser;
_splitCacheProducer = splitCacheProducer;
_telemetryRuntimeProducer = telemetryRuntimeProducer;
}

@Override
Expand Down Expand Up @@ -57,6 +62,11 @@ private boolean addOrUpdateFeatureFlag(FeatureFlagChangeNotification featureFlag
Split featureFlag = featureFlagChangeNotification.getFeatureFlagDefinition();
FeatureFlagsToUpdate featureFlagsToUpdate = processFeatureFlagChanges(_splitParser, Collections.singletonList(featureFlag));
_splitCacheProducer.update(featureFlagsToUpdate.getToAdd(), featureFlagsToUpdate.getToRemove(), featureFlagChangeNotification.getChangeNumber());
Set<String> segments = featureFlagsToUpdate.getSegments();
for (String segmentName: segments) {
_synchronizer.forceRefreshSegment(segmentName);
}
_telemetryRuntimeProducer.recordUpdatesFromSSE(UpdatesFromSSEEnum.SPLITS);
return true;
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.split.telemetry.domain.enums.LastSynchronizationRecordsEnum;
import io.split.telemetry.domain.enums.MethodEnum;
import io.split.telemetry.domain.enums.ResourceEnum;
import io.split.telemetry.domain.enums.UpdatesFromSSEEnum;
import io.split.telemetry.storage.TelemetryStorageProducer;
import io.split.telemetry.utils.BucketCalculator;
import pluggable.CustomStorageWrapper;
Expand Down Expand Up @@ -99,4 +100,9 @@ public void recordStreamingEvents(StreamingEvent streamingEvent) {
public void recordSessionLength(long sessionLength) {
//No-op
}

@Override
public void recordUpdatesFromSSE(UpdatesFromSSEEnum updatesFromSSEEnum) {
//No-op
}
}
11 changes: 11 additions & 0 deletions client/src/main/java/io/split/telemetry/domain/Stats.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public class Stats {
/* package private */ static final String FIELD_EVENTS_DROPPED = "eD";
/* package private */ static final String FIELD_STREAMING_EVENT = "sE";
/* package private */ static final String FIELD_TAGS = "t";
/* package private */ static final String FIELD_UPDATES_FROM_SSE = "ufs";

@SerializedName(FIELD_LAST_SYNCHRONIZATION)
private LastSynchronization _lastSynchronization;
Expand Down Expand Up @@ -60,6 +61,8 @@ public class Stats {
private List<StreamingEvent> _streamingEvents;
@SerializedName(FIELD_TAGS)
private List<String> _tags;
@SerializedName(FIELD_UPDATES_FROM_SSE)
private UpdatesFromSSE _updatesFromSSE;

public LastSynchronization get_lastSynchronization() {
return _lastSynchronization;
Expand Down Expand Up @@ -204,4 +207,12 @@ public List<String> get_tags() {
public void set_tags(List<String> _tags) {
this._tags = _tags;
}

public UpdatesFromSSE get_updatesFromSSE() {
return _updatesFromSSE;
}

public void set_updatesFromSSE(UpdatesFromSSE _updatesFromSSE) {
this._updatesFromSSE = _updatesFromSSE;
}
}
19 changes: 19 additions & 0 deletions client/src/main/java/io/split/telemetry/domain/UpdatesFromSSE.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package io.split.telemetry.domain;

import com.google.gson.annotations.SerializedName;

public class UpdatesFromSSE {

/* package private */ static final String FIELD_FEATURE_FLAGS = "sp";

@SerializedName(FIELD_FEATURE_FLAGS)
private long splits;

public long getSplits() {
return splits;
}

public void setSplits(long splits) {
this.splits = splits;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package io.split.telemetry.domain.enums;

public enum UpdatesFromSSEEnum {
SPLITS
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,24 @@
package io.split.telemetry.storage;

import com.google.common.collect.Maps;
import io.split.telemetry.domain.*;
import io.split.telemetry.domain.enums.*;

import io.split.telemetry.domain.HTTPErrors;
import io.split.telemetry.domain.HTTPLatencies;
import io.split.telemetry.domain.LastSynchronization;
import io.split.telemetry.domain.MethodExceptions;
import io.split.telemetry.domain.MethodLatencies;
import io.split.telemetry.domain.StreamingEvent;
import io.split.telemetry.domain.UpdatesFromSSE;
import io.split.telemetry.domain.enums.EventsDataRecordsEnum;
import io.split.telemetry.domain.enums.FactoryCountersEnum;
import io.split.telemetry.domain.enums.HTTPLatenciesEnum;
import io.split.telemetry.domain.enums.ImpressionsDataTypeEnum;
import io.split.telemetry.domain.enums.LastSynchronizationRecordsEnum;
import io.split.telemetry.domain.enums.MethodEnum;
import io.split.telemetry.domain.enums.PushCountersEnum;
import io.split.telemetry.domain.enums.ResourceEnum;
import io.split.telemetry.domain.enums.SdkRecordsEnum;
import io.split.telemetry.domain.enums.UpdatesFromSSEEnum;
import io.split.telemetry.utils.AtomicLongArray;
import io.split.telemetry.utils.BucketCalculator;

Expand All @@ -13,7 +29,7 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

public class InMemoryTelemetryStorage implements TelemetryStorage{
public class InMemoryTelemetryStorage implements TelemetryStorage{
public static final int MAX_LATENCY_BUCKET_COUNT = 23;
public static final int MAX_STREAMING_EVENTS = 20;
public static final int MAX_TAGS = 10;
Expand All @@ -32,6 +48,7 @@ public class InMemoryTelemetryStorage implements TelemetryStorage{
private final ConcurrentMap<EventsDataRecordsEnum, AtomicLong> _eventsDataRecords = Maps.newConcurrentMap();
private final ConcurrentMap<LastSynchronizationRecordsEnum, AtomicLong> _lastSynchronizationRecords = Maps.newConcurrentMap();
private final ConcurrentMap<SdkRecordsEnum, AtomicLong> _sdkRecords = Maps.newConcurrentMap();
private final ConcurrentMap<UpdatesFromSSEEnum, AtomicLong> _updatesFromSSERecords = Maps.newConcurrentMap();

//HTTPErrors
private final ConcurrentMap<ResourceEnum, ConcurrentMap<Long, Long>> _httpErrors = Maps.newConcurrentMap();
Expand All @@ -55,6 +72,7 @@ public InMemoryTelemetryStorage() {
initSdkRecords();
initLastSynchronizationRecords();
initEventDataRecords();
initUpdatesFromSEE();
}

@Override
Expand Down Expand Up @@ -209,6 +227,13 @@ public long getSessionLength() {
return _sdkRecords.get(SdkRecordsEnum.SESSION).get();
}

@Override
public UpdatesFromSSE popUpdatesFromSSE() {
UpdatesFromSSE updatesFromSSE = new UpdatesFromSSE();
updatesFromSSE.setSplits(_updatesFromSSERecords.get(UpdatesFromSSEEnum.SPLITS).getAndSet(0L));
return updatesFromSSE;
}

@Override
public void addTag(String tag) {
synchronized (_tagsLock) {
Expand Down Expand Up @@ -271,6 +296,11 @@ public void recordSessionLength(long sessionLength) {
_sdkRecords.replace(SdkRecordsEnum.SESSION, new AtomicLong(sessionLength));
}

@Override
public void recordUpdatesFromSSE(UpdatesFromSSEEnum updatesFromSSEEnum) {
_updatesFromSSERecords.get(UpdatesFromSSEEnum.SPLITS).incrementAndGet();
}

private void initMethodLatencies() {
_methodLatencies.put(MethodEnum.TREATMENT, new AtomicLongArray(MAX_LATENCY_BUCKET_COUNT));
_methodLatencies.put(MethodEnum.TREATMENTS, new AtomicLongArray(MAX_LATENCY_BUCKET_COUNT));
Expand Down Expand Up @@ -341,4 +371,8 @@ private void initEventDataRecords() {
_eventsDataRecords.put(EventsDataRecordsEnum.EVENTS_DROPPED, new AtomicLong());
_eventsDataRecords.put(EventsDataRecordsEnum.EVENTS_QUEUED, new AtomicLong());
}
}

private void initUpdatesFromSEE() {
_updatesFromSSERecords.put(UpdatesFromSSEEnum.SPLITS, new AtomicLong());
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,19 @@
package io.split.telemetry.storage;

import io.split.telemetry.domain.*;
import io.split.telemetry.domain.enums.*;
import io.split.telemetry.domain.HTTPErrors;
import io.split.telemetry.domain.HTTPLatencies;
import io.split.telemetry.domain.LastSynchronization;
import io.split.telemetry.domain.MethodExceptions;
import io.split.telemetry.domain.MethodLatencies;
import io.split.telemetry.domain.StreamingEvent;
import io.split.telemetry.domain.UpdatesFromSSE;
import io.split.telemetry.domain.enums.EventsDataRecordsEnum;
import io.split.telemetry.domain.enums.HTTPLatenciesEnum;
import io.split.telemetry.domain.enums.ImpressionsDataTypeEnum;
import io.split.telemetry.domain.enums.LastSynchronizationRecordsEnum;
import io.split.telemetry.domain.enums.MethodEnum;
import io.split.telemetry.domain.enums.ResourceEnum;
import io.split.telemetry.domain.enums.UpdatesFromSSEEnum;

import java.util.List;

Expand Down Expand Up @@ -77,6 +89,11 @@ public void recordSessionLength(long sessionLength) {

}

@Override
public void recordUpdatesFromSSE(UpdatesFromSSEEnum updatesFromSSEEnum) {

}

@Override
public long getBURTimeouts() {
return 0;
Expand Down Expand Up @@ -146,4 +163,9 @@ public List<String> popTags() {
public long getSessionLength() {
return 0;
}

@Override
public UpdatesFromSSE popUpdatesFromSSE() {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.split.telemetry.domain.HTTPLatencies;
import io.split.telemetry.domain.LastSynchronization;
import io.split.telemetry.domain.StreamingEvent;
import io.split.telemetry.domain.UpdatesFromSSE;
import io.split.telemetry.domain.enums.EventsDataRecordsEnum;
import io.split.telemetry.domain.enums.ImpressionsDataTypeEnum;

Expand All @@ -20,4 +21,5 @@ public interface TelemetryRuntimeConsumer {
List<StreamingEvent> popStreamingEvents();
List<String> popTags();
long getSessionLength();
UpdatesFromSSE popUpdatesFromSSE();
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.split.telemetry.storage;

import io.split.telemetry.domain.StreamingEvent;
import io.split.telemetry.domain.UpdatesFromSSE;
import io.split.telemetry.domain.enums.*;

public interface TelemetryRuntimeProducer {
Expand All @@ -14,4 +15,5 @@ public interface TelemetryRuntimeProducer {
void recordTokenRefreshes();
void recordStreamingEvents(StreamingEvent streamingEvent);
void recordSessionLength(long sessionLength);
}
void recordUpdatesFromSSE(UpdatesFromSSEEnum updatesFromSSEEnum);
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ Stats generateStats() throws Exception {
stats.set_eventsDropped(_teleTelemetryStorageConsumer.getEventStats(EventsDataRecordsEnum.EVENTS_DROPPED));
stats.set_streamingEvents(_teleTelemetryStorageConsumer.popStreamingEvents());
stats.set_tags(_teleTelemetryStorageConsumer.popTags());
stats.set_updatesFromSSE(_teleTelemetryStorageConsumer.popUpdatesFromSSE());
return stats;
}

Expand Down
Loading