Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion client/prefab-cloud
Submodule prefab-cloud updated 1 files
+4 −1 prefab.proto
18 changes: 18 additions & 0 deletions client/src/main/java/cloud/prefab/client/Options.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public enum OnInitializationFailure {
private Set<ConfigChangeListener> changeListenerSet = new HashSet<>();

private boolean contextShapeUploadEnabled = true;
private boolean evaluatedConfigKeyUploadEnabled = true;

public Options() {
this.apikey = System.getenv("PREFAB_API_KEY");
Expand Down Expand Up @@ -191,6 +192,23 @@ public boolean isContextShapeUploadEnabled() {
return contextShapeUploadEnabled;
}

/**
* Configure client to report the keys of evaluated configurations
* The data allows prefab to show which configs are used vs unused
* Defaults to true
* @param enabled
* @return
*/

public Options setEvaluatedConfigKeyUploadEnabled(boolean enabled) {
this.evaluatedConfigKeyUploadEnabled = enabled;
return this;
}

public boolean isEvaluatedConfigKeyUploadEnabled() {
return evaluatedConfigKeyUploadEnabled;
}

/**
* Configure client to report context shape data
* The captured data consists of names and types of context data, NOT the actual values
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ public class ConfigClientImpl implements ConfigClient {

private final ContextStore contextStore;

private ContextShapeAggregator contextShapeAggregator = null;

private EvaluatedKeysAggregator evaluatedKeysAggregator = null;

public ConfigClientImpl(
PrefabCloudClient baseClient,
ConfigChangeListener... listeners
Expand Down Expand Up @@ -147,7 +151,14 @@ public ConfigClientImpl(
startStreaming();
startCheckpointExecutor();
if (options.isContextShapeUploadEnabled()) {
new ContextShapeAggregator(options, prefabHttpClient, Clock.systemUTC()).start();
contextShapeAggregator =
new ContextShapeAggregator(options, prefabHttpClient, Clock.systemUTC());
contextShapeAggregator.start();
}
if (options.isEvaluatedConfigKeyUploadEnabled()) {
evaluatedKeysAggregator =
new EvaluatedKeysAggregator(options, prefabHttpClient, Clock.systemUTC());
evaluatedKeysAggregator.start();
}
}
}
Expand Down Expand Up @@ -197,7 +208,14 @@ public Optional<Prefab.ConfigValue> get(
return getInternal(configKey, lookupContext);
}

private void reportUsage(String configKey, PrefabContextSetReadable prefabContext) {}
private void reportUsage(String configKey, PrefabContextSetReadable prefabContext) {
if (contextShapeAggregator != null) {
contextShapeAggregator.reportContextUsage(prefabContext);
}
if (evaluatedKeysAggregator != null) {
evaluatedKeysAggregator.reportKeyUsage(configKey);
}
}

@Override
public Map<String, Prefab.ConfigValue> getAll(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package cloud.prefab.client.internal;

import cloud.prefab.client.Options;
import cloud.prefab.domain.Prefab;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.MoreExecutors;
import java.time.Clock;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EvaluatedKeysAggregator {

private static final Logger LOG = LoggerFactory.getLogger(
EvaluatedKeysAggregator.class
);

static final long MILLIS_BETWEEN_UPLOADS = TimeUnit.MINUTES.toMillis(20);
static final long MILLIS_BETWEEN_UPLOADS_WITH_NEW_DATA = TimeUnit.MINUTES.toMillis(5);

private final PrefabHttpClient prefabHttpClient;
private final Clock clock;
private final Set<String> evaluatedKeys;

private final AtomicBoolean dirtyFlag = new AtomicBoolean(true);
private final Optional<String> namespace;

private long lastUploadTime = 0;

EvaluatedKeysAggregator(
Options options,
PrefabHttpClient prefabHttpClient,
Clock clock
) {
this.prefabHttpClient = prefabHttpClient;
this.clock = clock;
// from https://www.baeldung.com/java-concurrent-hashset-concurrenthashmap
this.evaluatedKeys = ConcurrentHashMap.newKeySet();
this.namespace = options.getNamespace();
}

void start() {
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(
1,
r -> new Thread(r, "prefab-context-shapes-aggregator")
);
ScheduledExecutorService executorService = MoreExecutors.getExitingScheduledExecutorService(
executor,
100,
TimeUnit.MILLISECONDS
);
executorService.scheduleWithFixedDelay(
() -> {
try {
doUpload();
} catch (Exception e) {
LOG.debug("error uploading context shapes", e);
}
},
1,
1,
TimeUnit.MINUTES
);
}

@VisibleForTesting
void doUpload() {
if (shouldUpload()) {
Prefab.EvaluatedKeys.Builder builder = Prefab.EvaluatedKeys
.newBuilder()
.addAllKeys(evaluatedKeys);
namespace.ifPresent(builder::setNamespace);
prefabHttpClient.reportEvaluatedKeys(builder.build());
lastUploadTime = clock.millis();
dirtyFlag.set(false);
}
}

private boolean shouldUpload() {
if (lastUploadTime == 0) {
return true;
}
long millisSinceLastUpload = clock.millis() - lastUploadTime;
if (millisSinceLastUpload >= MILLIS_BETWEEN_UPLOADS) {
return true;
}
if (
millisSinceLastUpload >= MILLIS_BETWEEN_UPLOADS_WITH_NEW_DATA && dirtyFlag.get()
) {
return true;
}
LOG.debug(
"Skipping upload, minutes since upload is {} and dirtyFlag is {}",
TimeUnit.MILLISECONDS.toMinutes(millisSinceLastUpload),
dirtyFlag.get()
);
return false;
}

void reportKeyUsage(String key) {
if (evaluatedKeys.add(key)) {
boolean dirtyFlagRaised = dirtyFlag.compareAndSet(false, true);
if (dirtyFlagRaised && LOG.isTraceEnabled()) {
LOG.trace("dirty flag raised by key {}", key);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,7 @@ void reportContextShape(Prefab.ContextShapes contextShapes) {
.uri(URI.create(options.getPrefabApiUrl() + "/api/v1/context-shapes"))
.POST(HttpRequest.BodyPublishers.ofByteArray(contextShapes.toByteArray()))
.build();
LOG.info(
"posting to {}",
URI.create(options.getPrefabApiUrl() + "/api/v1/context-shapes")
);
LOG.debug("posting context shape to {}", request.uri());
try {
HttpResponse<String> response = httpClient.send(
request,
Expand All @@ -96,7 +93,36 @@ void reportContextShape(Prefab.ContextShapes contextShapes) {
} catch (IOException e) {
LOG.warn("Error uploading context shapes via http {}", e.getMessage());
} catch (InterruptedException e) {
LOG.warn("Interrupted while uploading context shapes via http");
LOG.warn("Interrupted while uploading context shapes");
Thread.currentThread().interrupt();
}
}

void reportEvaluatedKeys(Prefab.EvaluatedKeys evaluatedKeys) {
HttpRequest request = getClientBuilderWithStandardHeaders()
.header("Content-Type", PROTO_MEDIA_TYPE)
.header("Accept", PROTO_MEDIA_TYPE)
.uri(URI.create(options.getPrefabApiUrl() + "/api/v1/evaluated-keys"))
.POST(HttpRequest.BodyPublishers.ofByteArray(evaluatedKeys.toByteArray()))
.build();
LOG.debug("posting evaluated keys to {}", request.uri());
try {
HttpResponse<String> response = httpClient.send(
request,
HttpResponse.BodyHandlers.ofString()
);

if (!isSuccess(response.statusCode())) {
LOG.info(
"Uploading evaluated keys returned unsuccessful code {} with body {}",
response.statusCode(),
response.body()
);
}
} catch (IOException e) {
LOG.warn("Error uploading evaluated keys via http {}", e.getMessage());
} catch (InterruptedException e) {
LOG.warn("Interrupted while uploading evaluated keys");
Thread.currentThread().interrupt();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ private void restart(int errorCount) {
sseHandler.subscribe(flowSubscriber);
prefabHttpClient.requestConfigSSE(highwaterMarkSupplier.get(), sseHandler);
} catch (Exception e) {
LOG.warn("Unexpected exception starting SSE stream", e);
LOG.warn("Unexpected exception starting SSE config stream, will retry", e);
}
};

Expand All @@ -62,7 +62,7 @@ private void restart(int errorCount) {
TimeUnit.SECONDS.toMillis(1),
TimeUnit.SECONDS.toMillis(30)
);
LOG.info("Restarting connection in {} ms", delayMillis);
LOG.info("Restarting SSE config connection in {} ms", delayMillis);
scheduledExecutorService.schedule(starter, delayMillis, TimeUnit.MILLISECONDS);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package cloud.prefab.client.internal;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

import cloud.prefab.client.Options;
import cloud.prefab.domain.Prefab;
import java.time.Clock;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

@ExtendWith(MockitoExtension.class)
class EvaluatedKeysAggregatorTest {

@Mock
PrefabHttpClient prefabHttpClient;

@Mock
Clock clock;

@Captor
ArgumentCaptor<Prefab.EvaluatedKeys> evaluatedKeysArgumentCaptor;

private EvaluatedKeysAggregator aggregator;

@BeforeEach
void beforeEach() {
aggregator =
new EvaluatedKeysAggregator(
new Options().setNamespace("the-namespace"),
prefabHttpClient,
clock
);
}

// make sure the data comes out as expected
@Test
void sendsCorrectData() {
aggregator.reportKeyUsage("foo.bar");
aggregator.reportKeyUsage("foo.bar");
aggregator.reportKeyUsage("bar.foo");

aggregator.doUpload();
verify(prefabHttpClient).reportEvaluatedKeys(evaluatedKeysArgumentCaptor.capture());

Prefab.EvaluatedKeys reportedKeys = evaluatedKeysArgumentCaptor.getValue();

assertThat(reportedKeys.getNamespace()).isEqualTo("the-namespace");
assertThat(reportedKeys.getKeysList())
.containsExactlyInAnyOrder("foo.bar", "bar.foo");
}

@Test
void sendsDataIfNeverSentBefore() {
when(clock.millis()).thenReturn(1L);
aggregator.reportKeyUsage("foo.bar");

aggregator.doUpload();
verify(prefabHttpClient).reportEvaluatedKeys(any());

// won't send again now that last time sent is non zero
aggregator.doUpload();
verifyNoMoreInteractions(prefabHttpClient);
}

@Nested
class UnchangedDataSend {

@BeforeEach
void beforeEach() {
// this will set the last sent timestamp
when(clock.millis()).thenReturn(1L);
aggregator.reportKeyUsage("foo.bar");
aggregator.doUpload();
verify(prefabHttpClient).reportEvaluatedKeys(any());
}

@ParameterizedTest
@ValueSource(ints = { 1, 5, 10, 15, 19 })
void itDoesNotSendBefore20Min(int minutesLater) {
when(clock.millis()).thenReturn(1 + TimeUnit.MINUTES.toMillis(minutesLater));
aggregator.doUpload();
verifyNoMoreInteractions(prefabHttpClient);
}

@ParameterizedTest
@ValueSource(ints = { 20, 21, 100 })
void itSendsAtAndAfter20Min(int minutesLater) {
when(clock.millis()).thenReturn(1 + TimeUnit.MINUTES.toMillis(minutesLater));
aggregator.doUpload();
verify(prefabHttpClient, times(2)).reportEvaluatedKeys(any()); // the two counts the call in beforeEach
}
}

@Nested
class ChangedDataSend {

@BeforeEach
void beforeEach() {
// this will set the last sent timestamp
when(clock.millis()).thenReturn(1L);
aggregator.reportKeyUsage("foo.bar");
aggregator.doUpload();
verify(prefabHttpClient).reportEvaluatedKeys(any());
// set the dirty flag
aggregator.reportKeyUsage("bar.foo");
}

@ParameterizedTest
@ValueSource(ints = { 1, 2, 3, 4 })
void itDoesNotSendBefore20Min(int minutesLater) {
when(clock.millis()).thenReturn(1 + TimeUnit.MINUTES.toMillis(minutesLater));
aggregator.doUpload();
verifyNoMoreInteractions(prefabHttpClient);
}

@ParameterizedTest
@ValueSource(ints = { 5, 6, 10 })
void itSendsAtAndAfter20Min(int minutesLater) {
when(clock.millis()).thenReturn(1 + TimeUnit.MINUTES.toMillis(minutesLater));
aggregator.doUpload();
verify(prefabHttpClient, times(2)).reportEvaluatedKeys(any()); // the two counts the call in beforeEach
}
}
}