Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion client/prefab-cloud
Submodule prefab-cloud updated 1 files
+9 −0 prefab.proto
20 changes: 20 additions & 0 deletions client/src/main/java/cloud/prefab/client/Options.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ public enum OnInitializationFailure {

private Set<ConfigChangeListener> changeListenerSet = new HashSet<>();

private boolean contextShapeUploadEnabled = true;

public Options() {
this.apikey = System.getenv("PREFAB_API_KEY");
this.prefabApiUrl =
Expand Down Expand Up @@ -185,6 +187,24 @@ public Options setReportLogStats(boolean reportLogStats) {
return this;
}

public boolean isContextShapeUploadEnabled() {
return contextShapeUploadEnabled;
}

/**
* Configure client to report context shape data
* The captured data consists of names and types of context data, NOT the actual values
* The data allows prefab to populate options in the rule builder UI
* Defaults to true
* @param enabled
* @return
*/

public Options setContextShapeUploadEnabled(boolean enabled) {
this.contextShapeUploadEnabled = enabled;
return this;
}

public String getCDNUrl() {
String envVar = System.getenv("PREFAB_CDN_URL");
if (envVar != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,9 @@ public ConfigClientImpl(
prefabHttpClient = new PrefabHttpClient(httpClient, options);
startStreaming();
startCheckpointExecutor();
if (options.isContextShapeUploadEnabled()) {
new ContextShapeAggregator(options, prefabHttpClient, Clock.systemUTC()).start();
}
}
}

Expand Down Expand Up @@ -187,14 +190,15 @@ public Optional<Prefab.ConfigValue> get(
String configKey,
@Nullable PrefabContextSetReadable prefabContext
) {
LookupContext lookupContext = new LookupContext(
namespaceMaybe,
resolveContext(prefabContext)
);
PrefabContextSetReadable resolvedContext = resolveContext(prefabContext);
reportUsage(configKey, resolvedContext);
LookupContext lookupContext = new LookupContext(namespaceMaybe, resolvedContext);

return getInternal(configKey, lookupContext);
}

private void reportUsage(String configKey, PrefabContextSetReadable prefabContext) {}

@Override
public Map<String, Prefab.ConfigValue> getAll(
@Nullable PrefabContextSetReadable prefabContext
Expand Down Expand Up @@ -350,7 +354,6 @@ private void loadCheckpoint() {
if (cdnSuccess) {
return;
}

loadAPI();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
package cloud.prefab.client.internal;

import cloud.prefab.client.Options;
import cloud.prefab.context.PrefabContext;
import cloud.prefab.context.PrefabContextSetReadable;
import cloud.prefab.domain.Prefab;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.MoreExecutors;
import java.time.Clock;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.StreamSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Capture context "shape" and
*/
public class ContextShapeAggregator {

private static final Logger LOG = LoggerFactory.getLogger(ContextShapeAggregator.class);

static final long MILLIS_BETWEEN_UPLOADS = TimeUnit.MINUTES.toMillis(20);
static final long MILLIS_BETWEEN_UPLOADS_WITH_NEW_DATA = TimeUnit.MINUTES.toMillis(5);

private final PrefabHttpClient prefabHttpClient;
private final Clock clock;
private final ConcurrentHashMap<String, ConcurrentHashMap<String, Integer>> shapes;

private final AtomicBoolean dirtyFlag = new AtomicBoolean(true);
private final Optional<String> namespace;

private long lastUploadTime = 0;

ContextShapeAggregator(
Options options,
PrefabHttpClient prefabHttpClient,
Clock clock
) {
this.prefabHttpClient = prefabHttpClient;
this.clock = clock;
this.shapes = new ConcurrentHashMap<>();
this.namespace = options.getNamespace();
}

void start() {
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(
1,
r -> new Thread(r, "prefab-context-shapes-aggregator")
);
ScheduledExecutorService executorService = MoreExecutors.getExitingScheduledExecutorService(
executor,
100,
TimeUnit.MILLISECONDS
);
executorService.scheduleWithFixedDelay(
() -> {
try {
doUpload();
} catch (Exception e) {
LOG.debug("error uploading context shapes", e);
}
},
1,
1,
TimeUnit.MINUTES
);
}

@VisibleForTesting
void doUpload() {
if (shouldUpload()) {
LOG.debug("uploading context shapes");
prefabHttpClient.reportContextShape(buildProtoShapesFromShapeState());
lastUploadTime = clock.millis();
dirtyFlag.set(false);
}
}

private boolean shouldUpload() {
if (lastUploadTime == 0) {
return true;
}
long millisSinceLastUpload = clock.millis() - lastUploadTime;
if (millisSinceLastUpload >= MILLIS_BETWEEN_UPLOADS) {
return true;
}
if (
millisSinceLastUpload >= MILLIS_BETWEEN_UPLOADS_WITH_NEW_DATA && dirtyFlag.get()
) {
return true;
}
LOG.debug(
"Skipping upload, minutes since upload is {} and dirtyFlag is {}",
TimeUnit.MILLISECONDS.toMinutes(millisSinceLastUpload),
dirtyFlag.get()
);
return false;
}

void reportContextUsage(PrefabContextSetReadable prefabContextSetReadable) {
Prefab.ContextShapes currentShapes = extractShapes(prefabContextSetReadable);
for (Prefab.ContextShape contextShape : currentShapes.getShapesList()) {
ConcurrentHashMap<String, Integer> contextMap = shapes.computeIfAbsent(
contextShape.getName(),
key -> new ConcurrentHashMap<>()
);
contextShape
.getFieldTypesMap()
.forEach((key, value) -> {
Integer oldValue = contextMap.put(key, value);
if (!Objects.equals(oldValue, value)) {
boolean dirtyFlagRaised = dirtyFlag.compareAndSet(false, true);
if (dirtyFlagRaised && LOG.isTraceEnabled()) {
LOG.trace(
"dirty flag raised by context name: {} and property {}",
contextShape.getName(),
key
);
}
}
});
}
}

@VisibleForTesting
Prefab.ContextShapes buildProtoShapesFromShapeState() {
Prefab.ContextShapes.Builder shapesBuilder = Prefab.ContextShapes.newBuilder();
namespace.ifPresent(shapesBuilder::setNamespace);

shapes.forEach((contextName, contextMap) -> {
Prefab.ContextShape.Builder shapeBuilder = Prefab.ContextShape
.newBuilder()
.setName(contextName);
shapeBuilder.putAllFieldTypes(contextMap);
shapesBuilder.addShapes(shapeBuilder);
});

return shapesBuilder.build();
}

private Prefab.ContextShapes extractShapes(
PrefabContextSetReadable prefabContextSetReadable
) {
Prefab.ContextShapes.Builder shapesBuilder = Prefab.ContextShapes.newBuilder();
StreamSupport
.stream(prefabContextSetReadable.getContexts().spliterator(), false)
.map(PrefabContext::getShape)
.forEach(shapesBuilder::addShapes);
return shapesBuilder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,38 @@ void reportLoggers(Prefab.Loggers loggers) {
}
}

void reportContextShape(Prefab.ContextShapes contextShapes) {
HttpRequest request = getClientBuilderWithStandardHeaders()
.header("Content-Type", PROTO_MEDIA_TYPE)
.header("Accept", PROTO_MEDIA_TYPE)
.uri(URI.create(options.getPrefabApiUrl() + "/api/v1/context-shapes"))
.POST(HttpRequest.BodyPublishers.ofByteArray(contextShapes.toByteArray()))
.build();
LOG.info(
"posting to {}",
URI.create(options.getPrefabApiUrl() + "/api/v1/context-shapes")
);
try {
HttpResponse<String> response = httpClient.send(
request,
HttpResponse.BodyHandlers.ofString()
);

if (!isSuccess(response.statusCode())) {
LOG.info(
"Uploading context shapes returned unsuccessful code {} with body {}",
response.statusCode(),
response.body()
);
}
} catch (IOException e) {
LOG.warn("Error uploading context shapes via http {}", e.getMessage());
} catch (InterruptedException e) {
LOG.warn("Interrupted while uploading context shapes via http");
Thread.currentThread().interrupt();
}
}

CompletableFuture<HttpResponse<Void>> requestConfigSSE(
long offset,
Flow.Subscriber<String> lineSubscriber
Expand Down
10 changes: 10 additions & 0 deletions client/src/main/java/cloud/prefab/context/PrefabContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,16 @@ public boolean equals(Object o) {
);
}

public Prefab.ContextShape getShape() {
Prefab.ContextShape.Builder shapeBuilder = Prefab.ContextShape
.newBuilder()
.setName(getName());
properties.forEach((key, value) ->
shapeBuilder.putFieldTypes(key, value.getTypeCase().getNumber())
);
return shapeBuilder.build();
}

@Override
public int hashCode() {
return Objects.hash(name, properties);
Expand Down
Loading