From 3ad5f46feaee26dd3ca8e9001401f4e999365c3c Mon Sep 17 00:00:00 2001 From: James Kebinger Date: Thu, 1 Jun 2023 17:01:33 -0500 Subject: [PATCH] Adds capture and periodic upload of evaluated configuration keys --- client/prefab-cloud | 2 +- .../java/cloud/prefab/client/Options.java | 18 +++ .../client/internal/ConfigClientImpl.java | 22 ++- .../internal/EvaluatedKeysAggregator.java | 114 +++++++++++++++ .../client/internal/PrefabHttpClient.java | 36 ++++- .../SseConfigStreamingSubscriber.java | 4 +- .../internal/EvaluatedKeysAggregatorTest.java | 138 ++++++++++++++++++ 7 files changed, 324 insertions(+), 10 deletions(-) create mode 100644 client/src/main/java/cloud/prefab/client/internal/EvaluatedKeysAggregator.java create mode 100644 client/src/test/java/cloud/prefab/client/internal/EvaluatedKeysAggregatorTest.java diff --git a/client/prefab-cloud b/client/prefab-cloud index 146335b0..c1400647 160000 --- a/client/prefab-cloud +++ b/client/prefab-cloud @@ -1 +1 @@ -Subproject commit 146335b08d95fc192af1858b7be56d7cf33efe73 +Subproject commit c140064751bb1f237415ba7e3ea179693391c925 diff --git a/client/src/main/java/cloud/prefab/client/Options.java b/client/src/main/java/cloud/prefab/client/Options.java index fc40a7da..94409379 100644 --- a/client/src/main/java/cloud/prefab/client/Options.java +++ b/client/src/main/java/cloud/prefab/client/Options.java @@ -42,6 +42,7 @@ public enum OnInitializationFailure { private Set changeListenerSet = new HashSet<>(); private boolean contextShapeUploadEnabled = true; + private boolean evaluatedConfigKeyUploadEnabled = true; public Options() { this.apikey = System.getenv("PREFAB_API_KEY"); @@ -191,6 +192,23 @@ public boolean isContextShapeUploadEnabled() { return contextShapeUploadEnabled; } + /** + * Configure client to report the keys of evaluated configurations + * The data allows prefab to show which configs are used vs unused + * Defaults to true + * @param enabled + * @return + */ + + public Options setEvaluatedConfigKeyUploadEnabled(boolean enabled) { + this.evaluatedConfigKeyUploadEnabled = enabled; + return this; + } + + public boolean isEvaluatedConfigKeyUploadEnabled() { + return evaluatedConfigKeyUploadEnabled; + } + /** * Configure client to report context shape data * The captured data consists of names and types of context data, NOT the actual values diff --git a/client/src/main/java/cloud/prefab/client/internal/ConfigClientImpl.java b/client/src/main/java/cloud/prefab/client/internal/ConfigClientImpl.java index 2536f16b..a271a6ab 100644 --- a/client/src/main/java/cloud/prefab/client/internal/ConfigClientImpl.java +++ b/client/src/main/java/cloud/prefab/client/internal/ConfigClientImpl.java @@ -80,6 +80,10 @@ public class ConfigClientImpl implements ConfigClient { private final ContextStore contextStore; + private ContextShapeAggregator contextShapeAggregator = null; + + private EvaluatedKeysAggregator evaluatedKeysAggregator = null; + public ConfigClientImpl( PrefabCloudClient baseClient, ConfigChangeListener... listeners @@ -147,7 +151,14 @@ public ConfigClientImpl( startStreaming(); startCheckpointExecutor(); if (options.isContextShapeUploadEnabled()) { - new ContextShapeAggregator(options, prefabHttpClient, Clock.systemUTC()).start(); + contextShapeAggregator = + new ContextShapeAggregator(options, prefabHttpClient, Clock.systemUTC()); + contextShapeAggregator.start(); + } + if (options.isEvaluatedConfigKeyUploadEnabled()) { + evaluatedKeysAggregator = + new EvaluatedKeysAggregator(options, prefabHttpClient, Clock.systemUTC()); + evaluatedKeysAggregator.start(); } } } @@ -197,7 +208,14 @@ public Optional get( return getInternal(configKey, lookupContext); } - private void reportUsage(String configKey, PrefabContextSetReadable prefabContext) {} + private void reportUsage(String configKey, PrefabContextSetReadable prefabContext) { + if (contextShapeAggregator != null) { + contextShapeAggregator.reportContextUsage(prefabContext); + } + if (evaluatedKeysAggregator != null) { + evaluatedKeysAggregator.reportKeyUsage(configKey); + } + } @Override public Map getAll( diff --git a/client/src/main/java/cloud/prefab/client/internal/EvaluatedKeysAggregator.java b/client/src/main/java/cloud/prefab/client/internal/EvaluatedKeysAggregator.java new file mode 100644 index 00000000..71b34680 --- /dev/null +++ b/client/src/main/java/cloud/prefab/client/internal/EvaluatedKeysAggregator.java @@ -0,0 +1,114 @@ +package cloud.prefab.client.internal; + +import cloud.prefab.client.Options; +import cloud.prefab.domain.Prefab; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.MoreExecutors; +import java.time.Clock; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class EvaluatedKeysAggregator { + + private static final Logger LOG = LoggerFactory.getLogger( + EvaluatedKeysAggregator.class + ); + + static final long MILLIS_BETWEEN_UPLOADS = TimeUnit.MINUTES.toMillis(20); + static final long MILLIS_BETWEEN_UPLOADS_WITH_NEW_DATA = TimeUnit.MINUTES.toMillis(5); + + private final PrefabHttpClient prefabHttpClient; + private final Clock clock; + private final Set evaluatedKeys; + + private final AtomicBoolean dirtyFlag = new AtomicBoolean(true); + private final Optional namespace; + + private long lastUploadTime = 0; + + EvaluatedKeysAggregator( + Options options, + PrefabHttpClient prefabHttpClient, + Clock clock + ) { + this.prefabHttpClient = prefabHttpClient; + this.clock = clock; + // from https://www.baeldung.com/java-concurrent-hashset-concurrenthashmap + this.evaluatedKeys = ConcurrentHashMap.newKeySet(); + this.namespace = options.getNamespace(); + } + + void start() { + ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor( + 1, + r -> new Thread(r, "prefab-context-shapes-aggregator") + ); + ScheduledExecutorService executorService = MoreExecutors.getExitingScheduledExecutorService( + executor, + 100, + TimeUnit.MILLISECONDS + ); + executorService.scheduleWithFixedDelay( + () -> { + try { + doUpload(); + } catch (Exception e) { + LOG.debug("error uploading context shapes", e); + } + }, + 1, + 1, + TimeUnit.MINUTES + ); + } + + @VisibleForTesting + void doUpload() { + if (shouldUpload()) { + Prefab.EvaluatedKeys.Builder builder = Prefab.EvaluatedKeys + .newBuilder() + .addAllKeys(evaluatedKeys); + namespace.ifPresent(builder::setNamespace); + prefabHttpClient.reportEvaluatedKeys(builder.build()); + lastUploadTime = clock.millis(); + dirtyFlag.set(false); + } + } + + private boolean shouldUpload() { + if (lastUploadTime == 0) { + return true; + } + long millisSinceLastUpload = clock.millis() - lastUploadTime; + if (millisSinceLastUpload >= MILLIS_BETWEEN_UPLOADS) { + return true; + } + if ( + millisSinceLastUpload >= MILLIS_BETWEEN_UPLOADS_WITH_NEW_DATA && dirtyFlag.get() + ) { + return true; + } + LOG.debug( + "Skipping upload, minutes since upload is {} and dirtyFlag is {}", + TimeUnit.MILLISECONDS.toMinutes(millisSinceLastUpload), + dirtyFlag.get() + ); + return false; + } + + void reportKeyUsage(String key) { + if (evaluatedKeys.add(key)) { + boolean dirtyFlagRaised = dirtyFlag.compareAndSet(false, true); + if (dirtyFlagRaised && LOG.isTraceEnabled()) { + LOG.trace("dirty flag raised by key {}", key); + } + } + } +} diff --git a/client/src/main/java/cloud/prefab/client/internal/PrefabHttpClient.java b/client/src/main/java/cloud/prefab/client/internal/PrefabHttpClient.java index 366642c2..dbb7f376 100644 --- a/client/src/main/java/cloud/prefab/client/internal/PrefabHttpClient.java +++ b/client/src/main/java/cloud/prefab/client/internal/PrefabHttpClient.java @@ -76,10 +76,7 @@ void reportContextShape(Prefab.ContextShapes contextShapes) { .uri(URI.create(options.getPrefabApiUrl() + "/api/v1/context-shapes")) .POST(HttpRequest.BodyPublishers.ofByteArray(contextShapes.toByteArray())) .build(); - LOG.info( - "posting to {}", - URI.create(options.getPrefabApiUrl() + "/api/v1/context-shapes") - ); + LOG.debug("posting context shape to {}", request.uri()); try { HttpResponse response = httpClient.send( request, @@ -96,7 +93,36 @@ void reportContextShape(Prefab.ContextShapes contextShapes) { } catch (IOException e) { LOG.warn("Error uploading context shapes via http {}", e.getMessage()); } catch (InterruptedException e) { - LOG.warn("Interrupted while uploading context shapes via http"); + LOG.warn("Interrupted while uploading context shapes"); + Thread.currentThread().interrupt(); + } + } + + void reportEvaluatedKeys(Prefab.EvaluatedKeys evaluatedKeys) { + HttpRequest request = getClientBuilderWithStandardHeaders() + .header("Content-Type", PROTO_MEDIA_TYPE) + .header("Accept", PROTO_MEDIA_TYPE) + .uri(URI.create(options.getPrefabApiUrl() + "/api/v1/evaluated-keys")) + .POST(HttpRequest.BodyPublishers.ofByteArray(evaluatedKeys.toByteArray())) + .build(); + LOG.debug("posting evaluated keys to {}", request.uri()); + try { + HttpResponse response = httpClient.send( + request, + HttpResponse.BodyHandlers.ofString() + ); + + if (!isSuccess(response.statusCode())) { + LOG.info( + "Uploading evaluated keys returned unsuccessful code {} with body {}", + response.statusCode(), + response.body() + ); + } + } catch (IOException e) { + LOG.warn("Error uploading evaluated keys via http {}", e.getMessage()); + } catch (InterruptedException e) { + LOG.warn("Interrupted while uploading evaluated keys"); Thread.currentThread().interrupt(); } } diff --git a/client/src/main/java/cloud/prefab/client/internal/SseConfigStreamingSubscriber.java b/client/src/main/java/cloud/prefab/client/internal/SseConfigStreamingSubscriber.java index 4e68b07b..84601b2e 100644 --- a/client/src/main/java/cloud/prefab/client/internal/SseConfigStreamingSubscriber.java +++ b/client/src/main/java/cloud/prefab/client/internal/SseConfigStreamingSubscriber.java @@ -50,7 +50,7 @@ private void restart(int errorCount) { sseHandler.subscribe(flowSubscriber); prefabHttpClient.requestConfigSSE(highwaterMarkSupplier.get(), sseHandler); } catch (Exception e) { - LOG.warn("Unexpected exception starting SSE stream", e); + LOG.warn("Unexpected exception starting SSE config stream, will retry", e); } }; @@ -62,7 +62,7 @@ private void restart(int errorCount) { TimeUnit.SECONDS.toMillis(1), TimeUnit.SECONDS.toMillis(30) ); - LOG.info("Restarting connection in {} ms", delayMillis); + LOG.info("Restarting SSE config connection in {} ms", delayMillis); scheduledExecutorService.schedule(starter, delayMillis, TimeUnit.MILLISECONDS); } } diff --git a/client/src/test/java/cloud/prefab/client/internal/EvaluatedKeysAggregatorTest.java b/client/src/test/java/cloud/prefab/client/internal/EvaluatedKeysAggregatorTest.java new file mode 100644 index 00000000..1f6fc189 --- /dev/null +++ b/client/src/test/java/cloud/prefab/client/internal/EvaluatedKeysAggregatorTest.java @@ -0,0 +1,138 @@ +package cloud.prefab.client.internal; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import cloud.prefab.client.Options; +import cloud.prefab.domain.Prefab; +import java.time.Clock; +import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class EvaluatedKeysAggregatorTest { + + @Mock + PrefabHttpClient prefabHttpClient; + + @Mock + Clock clock; + + @Captor + ArgumentCaptor evaluatedKeysArgumentCaptor; + + private EvaluatedKeysAggregator aggregator; + + @BeforeEach + void beforeEach() { + aggregator = + new EvaluatedKeysAggregator( + new Options().setNamespace("the-namespace"), + prefabHttpClient, + clock + ); + } + + // make sure the data comes out as expected + @Test + void sendsCorrectData() { + aggregator.reportKeyUsage("foo.bar"); + aggregator.reportKeyUsage("foo.bar"); + aggregator.reportKeyUsage("bar.foo"); + + aggregator.doUpload(); + verify(prefabHttpClient).reportEvaluatedKeys(evaluatedKeysArgumentCaptor.capture()); + + Prefab.EvaluatedKeys reportedKeys = evaluatedKeysArgumentCaptor.getValue(); + + assertThat(reportedKeys.getNamespace()).isEqualTo("the-namespace"); + assertThat(reportedKeys.getKeysList()) + .containsExactlyInAnyOrder("foo.bar", "bar.foo"); + } + + @Test + void sendsDataIfNeverSentBefore() { + when(clock.millis()).thenReturn(1L); + aggregator.reportKeyUsage("foo.bar"); + + aggregator.doUpload(); + verify(prefabHttpClient).reportEvaluatedKeys(any()); + + // won't send again now that last time sent is non zero + aggregator.doUpload(); + verifyNoMoreInteractions(prefabHttpClient); + } + + @Nested + class UnchangedDataSend { + + @BeforeEach + void beforeEach() { + // this will set the last sent timestamp + when(clock.millis()).thenReturn(1L); + aggregator.reportKeyUsage("foo.bar"); + aggregator.doUpload(); + verify(prefabHttpClient).reportEvaluatedKeys(any()); + } + + @ParameterizedTest + @ValueSource(ints = { 1, 5, 10, 15, 19 }) + void itDoesNotSendBefore20Min(int minutesLater) { + when(clock.millis()).thenReturn(1 + TimeUnit.MINUTES.toMillis(minutesLater)); + aggregator.doUpload(); + verifyNoMoreInteractions(prefabHttpClient); + } + + @ParameterizedTest + @ValueSource(ints = { 20, 21, 100 }) + void itSendsAtAndAfter20Min(int minutesLater) { + when(clock.millis()).thenReturn(1 + TimeUnit.MINUTES.toMillis(minutesLater)); + aggregator.doUpload(); + verify(prefabHttpClient, times(2)).reportEvaluatedKeys(any()); // the two counts the call in beforeEach + } + } + + @Nested + class ChangedDataSend { + + @BeforeEach + void beforeEach() { + // this will set the last sent timestamp + when(clock.millis()).thenReturn(1L); + aggregator.reportKeyUsage("foo.bar"); + aggregator.doUpload(); + verify(prefabHttpClient).reportEvaluatedKeys(any()); + // set the dirty flag + aggregator.reportKeyUsage("bar.foo"); + } + + @ParameterizedTest + @ValueSource(ints = { 1, 2, 3, 4 }) + void itDoesNotSendBefore20Min(int minutesLater) { + when(clock.millis()).thenReturn(1 + TimeUnit.MINUTES.toMillis(minutesLater)); + aggregator.doUpload(); + verifyNoMoreInteractions(prefabHttpClient); + } + + @ParameterizedTest + @ValueSource(ints = { 5, 6, 10 }) + void itSendsAtAndAfter20Min(int minutesLater) { + when(clock.millis()).thenReturn(1 + TimeUnit.MINUTES.toMillis(minutesLater)); + aggregator.doUpload(); + verify(prefabHttpClient, times(2)).reportEvaluatedKeys(any()); // the two counts the call in beforeEach + } + } +}