diff --git a/providers/flagd/README.md b/providers/flagd/README.md
index abc812211..40d8092e3 100644
--- a/providers/flagd/README.md
+++ b/providers/flagd/README.md
@@ -26,13 +26,16 @@ OpenFeatureAPI.getInstance().setProvider(provider);
Options can be defined in the constructor or as environment variables, with constructor options having the highest precedence.
-| Option name | Environment variable name | Type | Default |
-| ----------- | ------------------------- | ------- | --------- |
-| host | FLAGD_HOST | string | localhost |
-| port | FLAGD_PORT | number | 8013 |
-| tls | FLAGD_TLS | boolean | false |
-| socketPath | FLAGD_SOCKET_PATH | string | - |
-| certPath | FLAGD_SERVER_CERT_PATH | string | - |
+| Option name | Environment variable name | Type | Default | Values |
+| --------------------- | ------------------------------- | ------- | --------- | ------------- |
+| host | FLAGD_HOST | string | localhost | |
+| port | FLAGD_PORT | number | 8013 | |
+| tls | FLAGD_TLS | boolean | false | |
+| socketPath | FLAGD_SOCKET_PATH | string | - | |
+| certPath | FLAGD_SERVER_CERT_PATH | string | - | |
+| cache | FLAGD_CACHE | string | lru | lru,disabled |
+| maxCacheSize | FLAGD_MAX_CACHE_SIZE | int | 1000 | |
+| maxEventStreamRetries | FLAGD_MAX_EVENT_STREAM_RETRIES | int | 5 | |
### Unix socket support
@@ -53,3 +56,11 @@ The default deadline is 500ms, though evaluations typically take on the order of
Though not required in deployments where flagd runs on the same host as the workload, TLS is available.
:warning: Note that there's a [vulnerability](https://security.snyk.io/vuln/SNYK-JAVA-IONETTY-1042268) in [netty](https://github.com/netty/netty), a transitive dependency of the underlying gRPC libraries used in the flagd-provider that fails to correctly validate certificates. This will be addressed in netty v5.
+
+## Caching
+
+The provider attempts to establish a connection to flagd's event stream (up to 5 times by default). If the connection is successful and caching is enabled each flag returned with reason `STATIC` is cached until an event is received concerning the cached flag (at which point it is removed from cache).
+
+On invocation of a flag evaluation (if caching is available) an attempt is made to retrieve the entry from cache, if found the flag is returned with reason `CACHED`.
+
+By default, the provider is configured to use [least recently used (lru)](https://commons.apache.org/proper/commons-collections/apidocs/org/apache/commons/collections4/map/LRUMap.html) caching with up to 1000 entries.
diff --git a/providers/flagd/pom.xml b/providers/flagd/pom.xml
index 2331d45fc..96f99c902 100644
--- a/providers/flagd/pom.xml
+++ b/providers/flagd/pom.xml
@@ -61,6 +61,12 @@
6.0.53
provided
+
+
+ org.apache.commons
+ commons-collections4
+ 4.4
+
diff --git a/providers/flagd/schemas b/providers/flagd/schemas
index 910fa3391..aa5714e2c 160000
--- a/providers/flagd/schemas
+++ b/providers/flagd/schemas
@@ -1 +1 @@
-Subproject commit 910fa3391adcddbbd8056879c3d7e1b465686bf6
+Subproject commit aa5714e2ce011457a5ca18029b0036cc32905c0f
diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/EventStreamCallback.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/EventStreamCallback.java
new file mode 100644
index 000000000..25ae2dd4f
--- /dev/null
+++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/EventStreamCallback.java
@@ -0,0 +1,10 @@
+package dev.openfeature.contrib.providers.flagd;
+
+/**
+ * Defines behaviour required of event stream callbacks.
+ */
+interface EventStreamCallback {
+ void setEventStreamAlive(Boolean alive);
+
+ void restartEventStream() throws Exception;
+}
diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/EventStreamObserver.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/EventStreamObserver.java
new file mode 100644
index 000000000..e6e7c18e0
--- /dev/null
+++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/EventStreamObserver.java
@@ -0,0 +1,82 @@
+package dev.openfeature.contrib.providers.flagd;
+
+import java.util.Map;
+import io.grpc.stub.StreamObserver;
+import lombok.extern.slf4j.Slf4j;
+import dev.openfeature.flagd.grpc.Schema.EventStreamResponse;
+import com.google.protobuf.Value;
+
+/**
+ * EventStreamObserver handles events emitted by flagd.
+ */
+@Slf4j
+public class EventStreamObserver implements StreamObserver {
+ private EventStreamCallback callback;
+ private FlagdCache cache;
+
+ private static final String configurationChange = "configuration_change";
+ private static final String providerReady = "provider_ready";
+ private static final String flagsKey = "flags";
+
+ EventStreamObserver(FlagdCache cache, EventStreamCallback callback) {
+ this.cache = cache;
+ this.callback = callback;
+ }
+
+ @Override
+ public void onNext(EventStreamResponse value) {
+ switch (value.getType()) {
+ case configurationChange:
+ this.handleConfigurationChangeEvent(value);
+ break;
+ case providerReady:
+ this.handleProviderReadyEvent();
+ break;
+ default:
+ log.debug("unhandled event type {}", value.getType());
+ return;
+ }
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ log.error("event stream", t);
+ this.cache.clear();
+ this.callback.setEventStreamAlive(false);
+ try {
+ this.callback.restartEventStream();
+ } catch (Exception e) {
+ log.error("restart event stream", e);
+ }
+ }
+
+ @Override
+ public void onCompleted() {
+ this.cache.clear();
+ this.callback.setEventStreamAlive(false);
+ }
+
+ private void handleConfigurationChangeEvent(EventStreamResponse value) {
+ if (!this.cache.getEnabled()) {
+ return;
+ }
+
+ Map data = value.getData().getFieldsMap();
+ Value flagsValue = data.get(flagsKey);
+ if (flagsValue == null) {
+ this.cache.clear();
+ return;
+ }
+
+ Map flags = flagsValue.getStructValue().getFieldsMap();
+
+ for (String flagKey : flags.keySet()) {
+ this.cache.remove(flagKey);
+ }
+ }
+
+ private void handleProviderReadyEvent() {
+ this.cache.clear();
+ this.callback.setEventStreamAlive(true);
+ }
+}
diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdCache.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdCache.java
new file mode 100644
index 000000000..458b3cfe0
--- /dev/null
+++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdCache.java
@@ -0,0 +1,51 @@
+package dev.openfeature.contrib.providers.flagd;
+
+import dev.openfeature.sdk.ProviderEvaluation;
+import dev.openfeature.sdk.Value;
+
+import java.util.Map;
+import org.apache.commons.collections4.map.LRUMap;
+import java.util.Collections;
+
+/**
+ * Exposes caching mechanism for flag evaluations.
+ */
+public class FlagdCache {
+ private Map> store;
+ private Boolean enabled;
+
+ static final String LRU_CACHE = "lru";
+ static final String DISABLED = "disabled";
+
+ FlagdCache(String cache, int maxCacheSize) {
+ switch (cache) {
+ case DISABLED:
+ return;
+ case LRU_CACHE:
+ default:
+ this.store = Collections.synchronizedMap(new LRUMap>(maxCacheSize));
+ }
+
+ this.enabled = true;
+ }
+
+ public Boolean getEnabled() {
+ return this.enabled;
+ }
+
+ public void put(String key, ProviderEvaluation value) {
+ this.store.put(key, value);
+ }
+
+ public ProviderEvaluation get(String key) {
+ return this.store.get(key);
+ }
+
+ public void remove(String key) {
+ this.store.remove(key);
+ }
+
+ public void clear() {
+ this.store.clear();
+ }
+}
diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java
index 014ff303e..2a662f048 100644
--- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java
+++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java
@@ -5,6 +5,9 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import javax.net.ssl.SSLException;
@@ -21,6 +24,7 @@
import dev.openfeature.flagd.grpc.Schema.ResolveStringResponse;
import dev.openfeature.flagd.grpc.ServiceGrpc;
import dev.openfeature.flagd.grpc.ServiceGrpc.ServiceBlockingStub;
+import dev.openfeature.flagd.grpc.ServiceGrpc.ServiceStub;
import dev.openfeature.sdk.EvaluationContext;
import dev.openfeature.sdk.FeatureProvider;
import dev.openfeature.sdk.Metadata;
@@ -29,17 +33,21 @@
import dev.openfeature.sdk.Value;
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.NettyChannelBuilder;
+import io.grpc.stub.StreamObserver;
import io.netty.channel.epoll.EpollDomainSocketChannel;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.unix.DomainSocketAddress;
import io.netty.handler.ssl.SslContextBuilder;
import lombok.extern.slf4j.Slf4j;
+import dev.openfeature.flagd.grpc.Schema.EventStreamRequest;
+import dev.openfeature.flagd.grpc.Schema.EventStreamResponse;
+
/**
* OpenFeature provider for flagd.
*/
@Slf4j
-public class FlagdProvider implements FeatureProvider {
+public class FlagdProvider implements FeatureProvider, EventStreamCallback {
static final String PROVIDER_NAME = "flagD Provider";
static final String DEFAULT_PORT = "8013";
@@ -52,9 +60,33 @@ public class FlagdProvider implements FeatureProvider {
static final String TLS_ENV_VAR_NAME = "FLAGD_TLS";
static final String SOCKET_PATH_ENV_VAR_NAME = "FLAGD_SOCKET_PATH";
static final String SERVER_CERT_PATH_ENV_VAR_NAME = "FLAGD_SERVER_CERT_PATH";
+ static final String CACHE_ENV_VAR_NAME = "FLAGD_CACHE";
+ static final String MAX_CACHE_SIZE_ENV_VAR_NAME = "FLAGD_MAX_CACHE_SIZE";
+ static final String MAX_EVENT_STREAM_RETRIES_ENV_VAR_NAME = "FLAGD_MAX_EVENT_STREAM_RETRIES";
+
+ static final String STATIC_REASON = "STATIC";
+ static final String CACHED_REASON = "CACHED";
+
+ static final String LRU_CACHE = "lru";
+ static final String DISABLED = "disabled";
+ static final String DEFAULT_CACHE = LRU_CACHE;
+ static final int DEFAULT_MAX_CACHE_SIZE = 1000;
+
+ static final int DEFAULT_MAX_EVENT_STREAM_RETRIES = 5;
+ static final int BASE_EVENT_STREAM_RETRY_BACKOFF_MS = 1000;
private long deadline = DEFAULT_DEADLINE;
- private ServiceBlockingStub serviceStub;
+ private ServiceBlockingStub serviceBlockingStub;
+ private ServiceStub serviceStub;
+
+ private Boolean eventStreamAlive;
+ private FlagdCache cache;
+
+ private int eventStreamAttempt = 1;
+ private int eventStreamRetryBackoff = BASE_EVENT_STREAM_RETRY_BACKOFF_MS;
+ private int maxEventStreamRetries = DEFAULT_MAX_EVENT_STREAM_RETRIES;
+
+ private ReadWriteLock lock = new ReentrantReadWriteLock();
/**
* Create a new FlagdProvider instance.
@@ -62,7 +94,30 @@ public class FlagdProvider implements FeatureProvider {
* @param socketPath unix socket path
*/
public FlagdProvider(String socketPath) {
- this(buildServiceStub(null, null, null, null, socketPath));
+ this(
+ buildServiceBlockingStub(null, null, null, null, socketPath),
+ buildServiceStub(null, null, null, null, socketPath),
+ fallBackToEnvOrDefault(CACHE_ENV_VAR_NAME, DEFAULT_CACHE),
+ fallBackToEnvOrDefault(MAX_CACHE_SIZE_ENV_VAR_NAME, DEFAULT_MAX_CACHE_SIZE),
+ fallBackToEnvOrDefault(MAX_EVENT_STREAM_RETRIES_ENV_VAR_NAME, DEFAULT_MAX_EVENT_STREAM_RETRIES)
+ );
+ }
+
+ /**
+ * Create a new FlagdProvider instance.
+ *
+ * @param socketPath unix socket path
+ * @param cache caching implementation to use (lru)
+ * @param maxCacheSize limit of the number of cached values
+ * @param maxEventStreamRetries limit of the number of attempts to connect to flagd's event stream,
+ * on successful connection the attempts are reset
+ */
+ public FlagdProvider(String socketPath, String cache, int maxCacheSize, int maxEventStreamRetries) {
+ this(
+ buildServiceBlockingStub(null, null, null, null, socketPath),
+ buildServiceStub(null, null, null, null, socketPath),
+ cache, maxCacheSize, maxEventStreamRetries
+ );
}
/**
@@ -76,18 +131,68 @@ public FlagdProvider(String socketPath) {
*
*/
public FlagdProvider(String host, int port, boolean tls, String certPath) {
- this(buildServiceStub(host, port, tls, certPath, null));
+ this(
+ buildServiceBlockingStub(host, port, tls, certPath, null),
+ buildServiceStub(host, port, tls, certPath, null),
+ fallBackToEnvOrDefault(CACHE_ENV_VAR_NAME, DEFAULT_CACHE),
+ fallBackToEnvOrDefault(MAX_CACHE_SIZE_ENV_VAR_NAME, DEFAULT_MAX_CACHE_SIZE),
+ fallBackToEnvOrDefault(MAX_EVENT_STREAM_RETRIES_ENV_VAR_NAME, DEFAULT_MAX_EVENT_STREAM_RETRIES)
+ );
+ }
+
+ /**
+ * Create a new FlagdProvider instance.
+ *
+ * @param host flagd server host, defaults to "localhost"
+ * @param port flagd server port, defaults to 8013
+ * @param tls use TLS, defaults to false
+ * @param certPath path for server certificate, defaults to null to, using
+ * system certs
+ * @param cache caching implementation to use (lru)
+ * @param maxCacheSize limit of the number of cached values
+ * @param maxEventStreamRetries limit of the number of attempts to connect to flagd's event stream,
+ * on successful connection the attempts are reset
+ *
+ */
+ public FlagdProvider(String host, int port, boolean tls, String certPath, String cache,
+ int maxCacheSize, int maxEventStreamRetries) {
+ this(
+ buildServiceBlockingStub(host, port, tls, certPath, null),
+ buildServiceStub(host, port, tls, certPath, null),
+ cache, maxCacheSize, maxEventStreamRetries
+ );
}
/**
* Create a new FlagdProvider instance.
*/
public FlagdProvider() {
- this(buildServiceStub(null, null, null, null, null));
+ this(
+ buildServiceBlockingStub(null, null, null, null, null),
+ buildServiceStub(null, null, null, null, null),
+ fallBackToEnvOrDefault(CACHE_ENV_VAR_NAME, DEFAULT_CACHE),
+ fallBackToEnvOrDefault(MAX_CACHE_SIZE_ENV_VAR_NAME, DEFAULT_MAX_CACHE_SIZE),
+ fallBackToEnvOrDefault(MAX_EVENT_STREAM_RETRIES_ENV_VAR_NAME, DEFAULT_MAX_EVENT_STREAM_RETRIES)
+ );
}
- FlagdProvider(ServiceBlockingStub serviceStub) {
+ FlagdProvider(ServiceBlockingStub serviceBlockingStub, ServiceStub serviceStub, String cache,
+ int maxCacheSize, int maxEventStreamRetries) {
+ this.serviceBlockingStub = serviceBlockingStub;
this.serviceStub = serviceStub;
+ this.eventStreamAlive = false;
+ this.cache = new FlagdCache(cache, maxCacheSize);
+ this.maxEventStreamRetries = maxEventStreamRetries;
+ this.handleEvents();
+ }
+
+ private Boolean cacheAvailable() {
+ Lock l = this.lock.readLock();
+ l.lock();
+ Boolean available = this.cache.getEnabled() && this.eventStreamAlive;
+ l.unlock();
+
+ return available;
}
@Override
@@ -103,80 +208,191 @@ public String getName() {
@Override
public ProviderEvaluation getBooleanEvaluation(String key, Boolean defaultValue,
EvaluationContext ctx) {
+
+ if (this.cacheAvailable()) {
+ ProviderEvaluation fromCache = this.cache.get(key);
+ if (fromCache != null) {
+ ProviderEvaluation result = ProviderEvaluation.builder()
+ .value(fromCache.getValue().asBoolean())
+ .variant(fromCache.getVariant())
+ .reason(CACHED_REASON)
+ .build();
+ return result;
+ }
+ }
ResolveBooleanRequest request = ResolveBooleanRequest.newBuilder()
.setFlagKey(key)
.setContext(this.convertContext(ctx))
.build();
- ResolveBooleanResponse r = this.serviceStub.withDeadlineAfter(this.deadline, TimeUnit.MILLISECONDS)
+ ResolveBooleanResponse r = this.serviceBlockingStub.withDeadlineAfter(this.deadline, TimeUnit.MILLISECONDS)
.resolveBoolean(request);
- return ProviderEvaluation.builder()
+ ProviderEvaluation result = ProviderEvaluation.builder()
.value(r.getValue())
.variant(r.getVariant())
.reason(r.getReason())
.build();
+
+
+ if (this.isEvaluationCacheable(result)) {
+ ProviderEvaluation value = ProviderEvaluation.builder()
+ .value(new Value(r.getValue()))
+ .variant(r.getVariant())
+ .reason(r.getReason())
+ .build();
+ this.cache.put(key, value);
+ }
+
+ return result;
}
@Override
public ProviderEvaluation getStringEvaluation(String key, String defaultValue,
EvaluationContext ctx) {
+
+ if (this.cacheAvailable()) {
+ ProviderEvaluation fromCache = this.cache.get(key);
+ if (fromCache != null) {
+ ProviderEvaluation result = ProviderEvaluation.builder()
+ .value(fromCache.getValue().asString())
+ .variant(fromCache.getVariant())
+ .reason(CACHED_REASON)
+ .build();
+ return result;
+ }
+ }
+
ResolveStringRequest request = ResolveStringRequest.newBuilder()
.setFlagKey(key)
.setContext(this.convertContext(ctx)).build();
- ResolveStringResponse r = this.serviceStub.withDeadlineAfter(this.deadline, TimeUnit.MILLISECONDS)
+ ResolveStringResponse r = this.serviceBlockingStub.withDeadlineAfter(this.deadline, TimeUnit.MILLISECONDS)
.resolveString(request);
- return ProviderEvaluation.builder().value(r.getValue())
+ ProviderEvaluation result = ProviderEvaluation.builder().value(r.getValue())
+ .variant(r.getVariant())
+ .reason(r.getReason())
+ .build();
+
+ if (this.isEvaluationCacheable(result)) {
+ ProviderEvaluation value = ProviderEvaluation.builder()
+ .value(new Value(r.getValue()))
.variant(r.getVariant())
.reason(r.getReason())
.build();
+ this.cache.put(key, value);
+ }
+
+ return result;
}
@Override
public ProviderEvaluation getDoubleEvaluation(String key, Double defaultValue,
EvaluationContext ctx) {
+
+ if (this.cacheAvailable()) {
+ ProviderEvaluation fromCache = this.cache.get(key);
+ if (fromCache != null) {
+ ProviderEvaluation result = ProviderEvaluation.builder()
+ .value(fromCache.getValue().asDouble())
+ .variant(fromCache.getVariant())
+ .reason(CACHED_REASON)
+ .build();
+ return result;
+ }
+ }
+
ResolveFloatRequest request = ResolveFloatRequest.newBuilder()
.setFlagKey(key)
.setContext(this.convertContext(ctx))
.build();
- ResolveFloatResponse r = this.serviceStub.withDeadlineAfter(this.deadline, TimeUnit.MILLISECONDS)
+ ResolveFloatResponse r = this.serviceBlockingStub.withDeadlineAfter(this.deadline, TimeUnit.MILLISECONDS)
.resolveFloat(request);
- return ProviderEvaluation.builder()
+ ProviderEvaluation result = ProviderEvaluation.builder()
.value(r.getValue())
.variant(r.getVariant())
.reason(r.getReason())
.build();
+
+ if (this.isEvaluationCacheable(result)) {
+ ProviderEvaluation value = ProviderEvaluation.builder()
+ .value(new Value(r.getValue()))
+ .variant(r.getVariant())
+ .reason(r.getReason())
+ .build();
+ this.cache.put(key, value);
+ }
+
+ return result;
}
@Override
public ProviderEvaluation getIntegerEvaluation(String key, Integer defaultValue,
EvaluationContext ctx) {
+
+ if (this.cacheAvailable()) {
+ ProviderEvaluation fromCache = this.cache.get(key);
+ if (fromCache != null) {
+ ProviderEvaluation result = ProviderEvaluation.builder()
+ .value(fromCache.getValue().asInteger())
+ .variant(fromCache.getVariant())
+ .reason(CACHED_REASON)
+ .build();
+ return result;
+ }
+ }
+
ResolveIntRequest request = ResolveIntRequest.newBuilder()
.setFlagKey(key)
.setContext(this.convertContext(ctx))
.build();
- ResolveIntResponse r = this.serviceStub.withDeadlineAfter(this.deadline, TimeUnit.MILLISECONDS)
+ ResolveIntResponse r = this.serviceBlockingStub.withDeadlineAfter(this.deadline, TimeUnit.MILLISECONDS)
.resolveInt(request);
- return ProviderEvaluation.builder()
+ ProviderEvaluation result = ProviderEvaluation.builder()
.value((int) r.getValue())
.variant(r.getVariant())
.reason(r.getReason())
.build();
+
+ if (this.isEvaluationCacheable(result)) {
+ ProviderEvaluation value = ProviderEvaluation.builder()
+ .value(new Value(result.getValue()))
+ .variant(r.getVariant())
+ .reason(r.getReason())
+ .build();
+ this.cache.put(key, value);
+ }
+
+ return result;
}
@Override
public ProviderEvaluation getObjectEvaluation(String key, Value defaultValue,
EvaluationContext ctx) {
+
+ if (this.cacheAvailable()) {
+ ProviderEvaluation fromCache = this.cache.get(key);
+ if (fromCache != null) {
+ fromCache.setReason(CACHED_REASON);
+ return fromCache;
+ }
+ }
+
ResolveObjectRequest request = ResolveObjectRequest.newBuilder()
.setFlagKey(key)
.setContext(this.convertContext(ctx))
.build();
- ResolveObjectResponse r = this.serviceStub.withDeadlineAfter(this.deadline, TimeUnit.MILLISECONDS)
+ ResolveObjectResponse r = this.serviceBlockingStub.withDeadlineAfter(this.deadline, TimeUnit.MILLISECONDS)
.resolveObject(request);
- return ProviderEvaluation.builder()
+ ProviderEvaluation result = ProviderEvaluation.builder()
.value(this.convertObjectResponse(r.getValue()))
.variant(r.getVariant())
.reason(r.getReason())
.build();
+
+ if (this.isEvaluationCacheable(result)) {
+ this.cache.put(key, result);
+ }
+
+ return result;
}
/**
@@ -311,9 +527,8 @@ private Value convertPrimitive(com.google.protobuf.Value protobuf) {
return value;
}
- private static ServiceBlockingStub buildServiceStub(String host, Integer port, Boolean tls, String certPath,
- String socketPath) {
-
+ private static NettyChannelBuilder channelBuilder(String host, Integer port, Boolean tls, String certPath,
+ String socketPath) {
host = host != null ? host : fallBackToEnvOrDefault(HOST_ENV_VAR_NAME, DEFAULT_HOST);
port = port != null ? port : Integer.parseInt(fallBackToEnvOrDefault(PORT_ENV_VAR_NAME, DEFAULT_PORT));
tls = tls != null ? tls : Boolean.parseBoolean(fallBackToEnvOrDefault(TLS_ENV_VAR_NAME, DEFAULT_TLS));
@@ -322,12 +537,11 @@ private static ServiceBlockingStub buildServiceStub(String host, Integer port, B
// we have a socket path specified, build a channel with a unix socket
if (socketPath != null) {
- return ServiceGrpc.newBlockingStub(NettyChannelBuilder
+ return NettyChannelBuilder
.forAddress(new DomainSocketAddress(socketPath))
.eventLoopGroup(new EpollEventLoopGroup())
.channelType(EpollDomainSocketChannel.class)
- .usePlaintext()
- .build());
+ .usePlaintext();
}
// build a TCP socket
@@ -345,8 +559,7 @@ private static ServiceBlockingStub buildServiceStub(String host, Integer port, B
builder.usePlaintext();
}
- return ServiceGrpc
- .newBlockingStub(builder.build());
+ return builder;
} catch (SSLException ssle) {
SslConfigException sslConfigException = new SslConfigException("Error with SSL configuration.");
sslConfigException.initCause(ssle);
@@ -354,7 +567,67 @@ private static ServiceBlockingStub buildServiceStub(String host, Integer port, B
}
}
+ private static ServiceBlockingStub buildServiceBlockingStub(String host, Integer port, Boolean tls, String certPath,
+ String socketPath) {
+ return ServiceGrpc.newBlockingStub(channelBuilder(host, port, tls, certPath, socketPath).build());
+ }
+
+ private static ServiceStub buildServiceStub(String host, Integer port, Boolean tls, String certPath,
+ String socketPath) {
+ return ServiceGrpc.newStub(channelBuilder(host, port, tls, certPath, socketPath).build());
+ }
+
private static String fallBackToEnvOrDefault(String key, String defaultValue) {
return System.getenv(key) != null ? System.getenv(key) : defaultValue;
}
+
+ private static int fallBackToEnvOrDefault(String key, int defaultValue) {
+ try {
+ int value = System.getenv(key) != null ? Integer.parseInt(System.getenv(key)) : defaultValue;
+ return value;
+ } catch (Exception e) {
+ return defaultValue;
+ }
+ }
+
+ private void handleEvents() {
+ StreamObserver responseObserver =
+ new EventStreamObserver(this.cache, this);
+ this.serviceStub.eventStream(EventStreamRequest.getDefaultInstance(), responseObserver);
+ }
+
+ @Override
+ public void setEventStreamAlive(Boolean alive) {
+ Lock l = this.lock.writeLock();
+ try {
+ l.lock();
+ this.eventStreamAlive = alive;
+ if (alive) {
+ // reset attempts on successful connection
+ this.eventStreamAttempt = 1;
+ this.eventStreamRetryBackoff = BASE_EVENT_STREAM_RETRY_BACKOFF_MS;
+ }
+ } finally {
+ l.unlock();
+ }
+ }
+
+ private Boolean isEvaluationCacheable(ProviderEvaluation evaluation) {
+ String reason = evaluation.getReason();
+
+ return reason != null && reason.equals(STATIC_REASON) && this.cacheAvailable();
+ }
+
+ @Override
+ public void restartEventStream() throws Exception {
+ this.eventStreamAttempt++;
+ if (this.eventStreamAttempt > this.maxEventStreamRetries) {
+ log.error("failed to connect to event stream, exhausted retries");
+ return;
+ }
+ this.eventStreamRetryBackoff = 2 * this.eventStreamRetryBackoff;
+ Thread.sleep(this.eventStreamRetryBackoff);
+
+ this.handleEvents();
+ }
}
diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/FlagdProviderTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/FlagdProviderTest.java
index 3d2487146..f4bc45429 100644
--- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/FlagdProviderTest.java
+++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/FlagdProviderTest.java
@@ -10,18 +10,25 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockConstruction;
import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import java.util.HashMap;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
import org.mockito.MockedConstruction;
import org.mockito.MockedStatic;
+import com.google.protobuf.Struct;
+
+import dev.openfeature.flagd.grpc.Schema.EventStreamResponse;
+import dev.openfeature.flagd.grpc.Schema.EventStreamRequest;
import dev.openfeature.flagd.grpc.Schema.ResolveBooleanRequest;
import dev.openfeature.flagd.grpc.Schema.ResolveBooleanResponse;
import dev.openfeature.flagd.grpc.Schema.ResolveFloatResponse;
@@ -30,6 +37,7 @@
import dev.openfeature.flagd.grpc.Schema.ResolveStringResponse;
import dev.openfeature.flagd.grpc.ServiceGrpc;
import dev.openfeature.flagd.grpc.ServiceGrpc.ServiceBlockingStub;
+import dev.openfeature.flagd.grpc.ServiceGrpc.ServiceStub;
import dev.openfeature.sdk.FlagEvaluationDetails;
import dev.openfeature.sdk.MutableContext;
import dev.openfeature.sdk.MutableStructure;
@@ -37,8 +45,9 @@
import dev.openfeature.sdk.Reason;
import dev.openfeature.sdk.Structure;
import dev.openfeature.sdk.Value;
-import io.grpc.ManagedChannel;
+import io.grpc.Channel;
import io.grpc.netty.NettyChannelBuilder;
+import io.grpc.stub.StreamObserver;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.unix.DomainSocketAddress;
@@ -47,6 +56,11 @@
class FlagdProviderTest {
static final String FLAG_KEY = "some-key";
+ static final String FLAG_KEY_BOOLEAN = "some-key-boolean";
+ static final String FLAG_KEY_INTEGER = "some-key-integer";
+ static final String FLAG_KEY_DOUBLE = "some-key-double";
+ static final String FLAG_KEY_STRING = "some-key-string";
+ static final String FLAG_KEY_OBJECT = "some-key-object";
static final String BOOL_VARIANT = "on";
static final String DOUBLE_VARIANT = "half";
static final String INT_VARIANT = "one-hundred";
@@ -79,11 +93,14 @@ public static void init() {
void path_arg_should_build_domain_socket_with_correct_path() {
final String path = "/some/path";
- ServiceBlockingStub mockStub = mock(ServiceBlockingStub.class);
+ ServiceBlockingStub mockBlockingStub = mock(ServiceBlockingStub.class);
+ ServiceStub mockStub = mock(ServiceStub.class);
NettyChannelBuilder mockChannelBuilder = getMockChannelBuilderSocket();
try (MockedStatic mockStaticService = mockStatic(ServiceGrpc.class)) {
- mockStaticService.when(() -> ServiceGrpc.newBlockingStub(any(ManagedChannel.class)))
+ mockStaticService.when(() -> ServiceGrpc.newBlockingStub(any(Channel.class)))
+ .thenReturn(mockBlockingStub);
+ mockStaticService.when(() -> ServiceGrpc.newStub(any()))
.thenReturn(mockStub);
try (MockedStatic mockStaticChannelBuilder = mockStatic(NettyChannelBuilder.class)) {
@@ -92,8 +109,7 @@ void path_arg_should_build_domain_socket_with_correct_path() {
EpollEventLoopGroup.class,
(mock, context) -> {
})) {
- mockStaticChannelBuilder.when(() -> NettyChannelBuilder
- .forAddress(any(DomainSocketAddress.class))).thenReturn(mockChannelBuilder);
+ when(NettyChannelBuilder.forAddress(any(DomainSocketAddress.class))).thenReturn(mockChannelBuilder);
new FlagdProvider(path);
// verify path matches
@@ -101,7 +117,7 @@ void path_arg_should_build_domain_socket_with_correct_path() {
.forAddress(argThat((DomainSocketAddress d) -> {
assertEquals(d.path(), path); // path should match
return true;
- })));
+ })), times(2));
}
}
}
@@ -113,11 +129,14 @@ void no_args_socket_env_should_build_domain_socket_with_correct_path() throws Ex
new EnvironmentVariables("FLAGD_SOCKET_PATH", path).execute(() -> {
- ServiceBlockingStub mockStub = mock(ServiceBlockingStub.class);
+ ServiceBlockingStub mockBlockingStub = mock(ServiceBlockingStub.class);
+ ServiceStub mockStub = mock(ServiceStub.class);
NettyChannelBuilder mockChannelBuilder = getMockChannelBuilderSocket();
try (MockedStatic mockStaticService = mockStatic(ServiceGrpc.class)) {
- mockStaticService.when(() -> ServiceGrpc.newBlockingStub(any(ManagedChannel.class)))
+ mockStaticService.when(() -> ServiceGrpc.newBlockingStub(any(Channel.class)))
+ .thenReturn(mockBlockingStub);
+ mockStaticService.when(() -> ServiceGrpc.newStub(any()))
.thenReturn(mockStub);
try (MockedStatic mockStaticChannelBuilder = mockStatic(
@@ -135,7 +154,7 @@ void no_args_socket_env_should_build_domain_socket_with_correct_path() throws Ex
mockStaticChannelBuilder.verify(() -> NettyChannelBuilder
.forAddress(argThat((DomainSocketAddress d) -> {
return d.path() == path;
- })));
+ })), times(2));
}
}
}
@@ -147,11 +166,14 @@ void host_and_port_arg_should_build_tcp_socket() {
final String host = "host.com";
final int port = 1234;
- ServiceBlockingStub mockStub = mock(ServiceBlockingStub.class);
+ ServiceBlockingStub mockBlockingStub = mock(ServiceBlockingStub.class);
+ ServiceStub mockStub = mock(ServiceStub.class);
NettyChannelBuilder mockChannelBuilder = getMockChannelBuilderSocket();
try (MockedStatic mockStaticService = mockStatic(ServiceGrpc.class)) {
- mockStaticService.when(() -> ServiceGrpc.newBlockingStub(any(ManagedChannel.class)))
+ mockStaticService.when(() -> ServiceGrpc.newBlockingStub(any(Channel.class)))
+ .thenReturn(mockBlockingStub);
+ mockStaticService.when(() -> ServiceGrpc.newStub(any()))
.thenReturn(mockStub);
try (MockedStatic mockStaticChannelBuilder = mockStatic(NettyChannelBuilder.class)) {
@@ -162,7 +184,7 @@ void host_and_port_arg_should_build_tcp_socket() {
// verify host/port matches
mockStaticChannelBuilder.verify(() -> NettyChannelBuilder
- .forAddress(host, port));
+ .forAddress(host, port), times(2));
}
}
}
@@ -173,11 +195,14 @@ void no_args_host_and_port_env_set_should_build_tcp_socket() throws Exception {
final int port = 4321;
new EnvironmentVariables("FLAGD_HOST", host, "FLAGD_PORT", String.valueOf(port)).execute(() -> {
- ServiceBlockingStub mockStub = mock(ServiceBlockingStub.class);
+ ServiceBlockingStub mockBlockingStub = mock(ServiceBlockingStub.class);
+ ServiceStub mockStub = mock(ServiceStub.class);
NettyChannelBuilder mockChannelBuilder = getMockChannelBuilderSocket();
try (MockedStatic mockStaticService = mockStatic(ServiceGrpc.class)) {
- mockStaticService.when(() -> ServiceGrpc.newBlockingStub(any(ManagedChannel.class)))
+ mockStaticService.when(() -> ServiceGrpc.newBlockingStub(any(Channel.class)))
+ .thenReturn(mockBlockingStub);
+ mockStaticService.when(() -> ServiceGrpc.newStub(any()))
.thenReturn(mockStub);
try (MockedStatic mockStaticChannelBuilder = mockStatic(
@@ -189,7 +214,7 @@ void no_args_host_and_port_env_set_should_build_tcp_socket() throws Exception {
// verify host/port matches
mockStaticChannelBuilder.verify(() -> NettyChannelBuilder
- .forAddress(host, port));
+ .forAddress(host, port), times(2));
}
}
});
@@ -228,48 +253,65 @@ void resolvers_call_grpc_service_and_return_details() {
.build();
ServiceBlockingStub serviceBlockingStubMock = mock(ServiceBlockingStub.class);
+ ServiceStub serviceStubMock = mock(ServiceStub.class);
when(serviceBlockingStubMock.withDeadlineAfter(anyLong(), any(TimeUnit.class)))
.thenReturn(serviceBlockingStubMock);
when(serviceBlockingStubMock
- .resolveBoolean(argThat(x -> FLAG_KEY.equals(x.getFlagKey())))).thenReturn(booleanResponse);
+ .resolveBoolean(argThat(x -> FLAG_KEY_BOOLEAN.equals(x.getFlagKey())))).thenReturn(booleanResponse);
when(serviceBlockingStubMock
- .resolveFloat(argThat(x -> FLAG_KEY.equals(x.getFlagKey())))).thenReturn(floatResponse);
+ .resolveFloat(argThat(x -> FLAG_KEY_DOUBLE.equals(x.getFlagKey())))).thenReturn(floatResponse);
when(serviceBlockingStubMock
- .resolveInt(argThat(x -> FLAG_KEY.equals(x.getFlagKey())))).thenReturn(intResponse);
+ .resolveInt(argThat(x -> FLAG_KEY_INTEGER.equals(x.getFlagKey())))).thenReturn(intResponse);
when(serviceBlockingStubMock
- .resolveString(argThat(x -> FLAG_KEY.equals(x.getFlagKey())))).thenReturn(stringResponse);
+ .resolveString(argThat(x -> FLAG_KEY_STRING.equals(x.getFlagKey())))).thenReturn(stringResponse);
when(serviceBlockingStubMock
- .resolveObject(argThat(x -> FLAG_KEY.equals(x.getFlagKey())))).thenReturn(objectResponse);
+ .resolveObject(argThat(x -> FLAG_KEY_OBJECT.equals(x.getFlagKey())))).thenReturn(objectResponse);
- OpenFeatureAPI.getInstance().setProvider(new FlagdProvider(serviceBlockingStubMock));
+ OpenFeatureAPI.getInstance().setProvider(new FlagdProvider(serviceBlockingStubMock, serviceStubMock, "lru",
+ 100, 5 ));
- FlagEvaluationDetails booleanDetails = api.getClient().getBooleanDetails(FLAG_KEY, false);
+ FlagEvaluationDetails booleanDetails = api.getClient().getBooleanDetails(FLAG_KEY_BOOLEAN, false);
assertTrue(booleanDetails.getValue());
assertEquals(BOOL_VARIANT, booleanDetails.getVariant());
assertEquals(DEFAULT.toString(), booleanDetails.getReason());
- FlagEvaluationDetails stringDetails = api.getClient().getStringDetails(FLAG_KEY, "wrong");
+ FlagEvaluationDetails stringDetails = api.getClient().getStringDetails(FLAG_KEY_STRING, "wrong");
assertEquals(STRING_VALUE, stringDetails.getValue());
assertEquals(STRING_VARIANT, stringDetails.getVariant());
assertEquals(DEFAULT.toString(), stringDetails.getReason());
- FlagEvaluationDetails intDetails = api.getClient().getIntegerDetails(FLAG_KEY, 0);
+ FlagEvaluationDetails intDetails = api.getClient().getIntegerDetails(FLAG_KEY_INTEGER, 0);
assertEquals(INT_VALUE, intDetails.getValue());
assertEquals(INT_VARIANT, intDetails.getVariant());
assertEquals(DEFAULT.toString(), intDetails.getReason());
- FlagEvaluationDetails floatDetails = api.getClient().getDoubleDetails(FLAG_KEY, 0.1);
+ FlagEvaluationDetails floatDetails = api.getClient().getDoubleDetails(FLAG_KEY_DOUBLE, 0.1);
assertEquals(DOUBLE_VALUE, floatDetails.getValue());
assertEquals(DOUBLE_VARIANT, floatDetails.getVariant());
assertEquals(DEFAULT.toString(), floatDetails.getReason());
- FlagEvaluationDetails objectDetails = api.getClient().getObjectDetails(FLAG_KEY, new Value());
+ FlagEvaluationDetails objectDetails = api.getClient().getObjectDetails(FLAG_KEY_OBJECT, new Value());
assertEquals(INNER_STRUCT_VALUE, objectDetails.getValue().asStructure()
.asMap().get(INNER_STRUCT_KEY).asString());
assertEquals(OBJECT_VARIANT, objectDetails.getVariant());
assertEquals(DEFAULT.toString(), objectDetails.getReason());
}
+ @Test
+ void resolvers_cache_responses_if_static_and_event_stream_alive() {
+ do_resolvers_cache_responses(FlagdProvider.STATIC_REASON, true, true);
+ }
+
+ @Test
+ void resolvers_should_not_cache_responses_if_not_static() {
+ do_resolvers_cache_responses(DEFAULT.toString(), true, false);
+ }
+
+ @Test
+ void resolvers_should_not_cache_responses_if_event_stream_not_alive() {
+ do_resolvers_cache_responses(FlagdProvider.STATIC_REASON, false, false);
+ }
+
@Test
void context_is_parsed_and_passed_to_grpc_service() {
@@ -301,6 +343,7 @@ void context_is_parsed_and_passed_to_grpc_service() {
.build();
ServiceBlockingStub serviceBlockingStubMock = mock(ServiceBlockingStub.class);
+ ServiceStub serviceStubMock = mock(ServiceStub.class);
when(serviceBlockingStubMock.withDeadlineAfter(anyLong(), any(TimeUnit.class)))
.thenReturn(serviceBlockingStubMock);
when(serviceBlockingStubMock.resolveBoolean(argThat(
@@ -315,7 +358,8 @@ void context_is_parsed_and_passed_to_grpc_service() {
.getStringValue()))))
.thenReturn(booleanResponse);
- OpenFeatureAPI.getInstance().setProvider(new FlagdProvider(serviceBlockingStubMock));
+ OpenFeatureAPI.getInstance().setProvider(new FlagdProvider(serviceBlockingStubMock, serviceStubMock, "lru",
+ 100, 5));
MutableContext context = new MutableContext();
context.add(BOOLEAN_ATTR_KEY, BOOLEAN_ATTR_VALUE);
@@ -340,11 +384,12 @@ void set_deadline_deadline_send_in_grpc() {
.build();
ServiceBlockingStub serviceBlockingStubMock = mock(ServiceBlockingStub.class);
+ ServiceStub serviceStubMock = mock(ServiceStub.class);
when(serviceBlockingStubMock.withDeadlineAfter(anyLong(), any(TimeUnit.class)))
.thenReturn(serviceBlockingStubMock);
when(serviceBlockingStubMock.resolveBoolean(any(ResolveBooleanRequest.class))).thenReturn(badReasonResponse);
- FlagdProvider provider = new FlagdProvider(serviceBlockingStubMock);
+ FlagdProvider provider = new FlagdProvider(serviceBlockingStubMock, serviceStubMock, "lru", 100, 5);
provider.setDeadline(deadline);
OpenFeatureAPI.getInstance().setProvider(provider);
@@ -362,17 +407,129 @@ void reason_mapped_correctly_if_unknown() {
.build();
ServiceBlockingStub serviceBlockingStubMock = mock(ServiceBlockingStub.class);
+ ServiceStub serviceStubMock = mock(ServiceStub.class);
when(serviceBlockingStubMock.withDeadlineAfter(anyLong(), any(TimeUnit.class)))
.thenReturn(serviceBlockingStubMock);
when(serviceBlockingStubMock.resolveBoolean(any(ResolveBooleanRequest.class))).thenReturn(badReasonResponse);
- OpenFeatureAPI.getInstance().setProvider(new FlagdProvider(serviceBlockingStubMock));
+ OpenFeatureAPI.getInstance().setProvider(new FlagdProvider(serviceBlockingStubMock, serviceStubMock, "lru", 100, 5));
FlagEvaluationDetails booleanDetails = api.getClient()
.getBooleanDetails(FLAG_KEY, false, new MutableContext());
assertEquals(Reason.UNKNOWN.toString(), booleanDetails.getReason()); // reason should be converted to UNKNOWN
}
+
+ @Test
+ void invalidate_cache() {
+ ResolveBooleanResponse booleanResponse = ResolveBooleanResponse.newBuilder()
+ .setValue(true)
+ .setVariant(BOOL_VARIANT)
+ .setReason(FlagdProvider.STATIC_REASON)
+ .build();
+
+ ResolveStringResponse stringResponse = ResolveStringResponse.newBuilder()
+ .setValue(STRING_VALUE)
+ .setVariant(STRING_VARIANT)
+ .setReason(FlagdProvider.STATIC_REASON)
+ .build();
+
+ ResolveIntResponse intResponse = ResolveIntResponse.newBuilder()
+ .setValue(INT_VALUE)
+ .setVariant(INT_VARIANT)
+ .setReason(FlagdProvider.STATIC_REASON)
+ .build();
+
+ ResolveFloatResponse floatResponse = ResolveFloatResponse.newBuilder()
+ .setValue(DOUBLE_VALUE)
+ .setVariant(DOUBLE_VARIANT)
+ .setReason(FlagdProvider.STATIC_REASON)
+ .build();
+
+ ResolveObjectResponse objectResponse = ResolveObjectResponse.newBuilder()
+ .setValue(PROTOBUF_STRUCTURE_VALUE)
+ .setVariant(OBJECT_VARIANT)
+ .setReason(FlagdProvider.STATIC_REASON)
+ .build();
+
+
+ ServiceBlockingStub serviceBlockingStubMock = mock(ServiceBlockingStub.class);
+ ServiceStub serviceStubMock = mock(ServiceStub.class);
+ when(serviceBlockingStubMock.withDeadlineAfter(anyLong(), any(TimeUnit.class)))
+ .thenReturn(serviceBlockingStubMock);
+ when(serviceBlockingStubMock
+ .resolveBoolean(argThat(x -> FLAG_KEY_BOOLEAN.equals(x.getFlagKey())))).thenReturn(booleanResponse);
+ when(serviceBlockingStubMock
+ .resolveFloat(argThat(x -> FLAG_KEY_DOUBLE.equals(x.getFlagKey())))).thenReturn(floatResponse);
+ when(serviceBlockingStubMock
+ .resolveInt(argThat(x -> FLAG_KEY_INTEGER.equals(x.getFlagKey())))).thenReturn(intResponse);
+ when(serviceBlockingStubMock
+ .resolveString(argThat(x -> FLAG_KEY_STRING.equals(x.getFlagKey())))).thenReturn(stringResponse);
+ when(serviceBlockingStubMock
+ .resolveObject(argThat(x -> FLAG_KEY_OBJECT.equals(x.getFlagKey())))).thenReturn(objectResponse);
+
+ FlagdProvider provider = new FlagdProvider(serviceBlockingStubMock, serviceStubMock, "lru", 100, 5);
+ ArgumentCaptor> streamObserverCaptor = ArgumentCaptor.forClass(StreamObserver.class);
+ verify(serviceStubMock).eventStream(any(EventStreamRequest.class), streamObserverCaptor.capture());
+
+ provider.setEventStreamAlive(true);
+ OpenFeatureAPI.getInstance().setProvider(provider);
+
+ HashMap flagsMap = new HashMap();
+ HashMap structMap = new HashMap();
+
+ flagsMap.put(FLAG_KEY_BOOLEAN, com.google.protobuf.Value.newBuilder().setStringValue("foo").build());
+ flagsMap.put(FLAG_KEY_STRING, com.google.protobuf.Value.newBuilder().setStringValue("foo").build());
+ flagsMap.put(FLAG_KEY_INTEGER, com.google.protobuf.Value.newBuilder().setStringValue("foo").build());
+ flagsMap.put(FLAG_KEY_DOUBLE, com.google.protobuf.Value.newBuilder().setStringValue("foo").build());
+ flagsMap.put(FLAG_KEY_OBJECT, com.google.protobuf.Value.newBuilder().setStringValue("foo").build());
+
+ structMap.put("flags", com.google.protobuf.Value.newBuilder().
+ setStructValue(Struct.newBuilder().putAllFields(flagsMap)).build());
+
+ EventStreamResponse eResponse = EventStreamResponse.newBuilder()
+ .setType("configuration_change")
+ .setData(Struct.newBuilder().putAllFields(structMap).build())
+ .build();
+
+ // should cache results
+ FlagEvaluationDetails booleanDetails = api.getClient().getBooleanDetails(FLAG_KEY_BOOLEAN, false);
+ FlagEvaluationDetails stringDetails = api.getClient().getStringDetails(FLAG_KEY_STRING, "wrong");
+ FlagEvaluationDetails intDetails = api.getClient().getIntegerDetails(FLAG_KEY_INTEGER, 0);
+ FlagEvaluationDetails floatDetails = api.getClient().getDoubleDetails(FLAG_KEY_DOUBLE, 0.1);
+ FlagEvaluationDetails objectDetails = api.getClient().getObjectDetails(FLAG_KEY_OBJECT, new Value());
+
+ // should clear cache
+ streamObserverCaptor.getValue().onNext(eResponse);
+
+ // assert cache has been invalidated
+ booleanDetails = api.getClient().getBooleanDetails(FLAG_KEY_BOOLEAN, false);
+ assertTrue(booleanDetails.getValue());
+ assertEquals(BOOL_VARIANT, booleanDetails.getVariant());
+ assertEquals(FlagdProvider.STATIC_REASON, booleanDetails.getReason());
+
+ stringDetails = api.getClient().getStringDetails(FLAG_KEY_STRING, "wrong");
+ assertEquals(STRING_VALUE, stringDetails.getValue());
+ assertEquals(STRING_VARIANT, stringDetails.getVariant());
+ assertEquals(FlagdProvider.STATIC_REASON, stringDetails.getReason());
+
+ intDetails = api.getClient().getIntegerDetails(FLAG_KEY_INTEGER, 0);
+ assertEquals(INT_VALUE, intDetails.getValue());
+ assertEquals(INT_VARIANT, intDetails.getVariant());
+ assertEquals(FlagdProvider.STATIC_REASON, intDetails.getReason());
+
+ floatDetails = api.getClient().getDoubleDetails(FLAG_KEY_DOUBLE, 0.1);
+ assertEquals(DOUBLE_VALUE, floatDetails.getValue());
+ assertEquals(DOUBLE_VARIANT, floatDetails.getVariant());
+ assertEquals(FlagdProvider.STATIC_REASON, floatDetails.getReason());
+
+ objectDetails = api.getClient().getObjectDetails(FLAG_KEY_OBJECT, new Value());
+ assertEquals(INNER_STRUCT_VALUE, objectDetails.getValue().asStructure()
+ .asMap().get(INNER_STRUCT_KEY).asString());
+ assertEquals(OBJECT_VARIANT, objectDetails.getVariant());
+ assertEquals(FlagdProvider.STATIC_REASON, objectDetails.getReason());
+ }
+
private NettyChannelBuilder getMockChannelBuilderSocket() {
NettyChannelBuilder mockChannelBuilder = mock(NettyChannelBuilder.class);
when(mockChannelBuilder.eventLoopGroup(any(EventLoopGroup.class))).thenReturn(mockChannelBuilder);
@@ -381,4 +538,91 @@ private NettyChannelBuilder getMockChannelBuilderSocket() {
when(mockChannelBuilder.build()).thenReturn(null);
return mockChannelBuilder;
}
+
+ private void do_resolvers_cache_responses(String reason, Boolean eventStreamAlive, Boolean shouldCache) {
+ String expectedReason = FlagdProvider.CACHED_REASON;
+ if (!shouldCache) {
+ expectedReason = reason;
+ }
+
+ ResolveBooleanResponse booleanResponse = ResolveBooleanResponse.newBuilder()
+ .setValue(true)
+ .setVariant(BOOL_VARIANT)
+ .setReason(reason)
+ .build();
+
+ ResolveStringResponse stringResponse = ResolveStringResponse.newBuilder()
+ .setValue(STRING_VALUE)
+ .setVariant(STRING_VARIANT)
+ .setReason(reason)
+ .build();
+
+ ResolveIntResponse intResponse = ResolveIntResponse.newBuilder()
+ .setValue(INT_VALUE)
+ .setVariant(INT_VARIANT)
+ .setReason(reason)
+ .build();
+
+ ResolveFloatResponse floatResponse = ResolveFloatResponse.newBuilder()
+ .setValue(DOUBLE_VALUE)
+ .setVariant(DOUBLE_VARIANT)
+ .setReason(reason)
+ .build();
+
+ ResolveObjectResponse objectResponse = ResolveObjectResponse.newBuilder()
+ .setValue(PROTOBUF_STRUCTURE_VALUE)
+ .setVariant(OBJECT_VARIANT)
+ .setReason(reason)
+ .build();
+
+ ServiceBlockingStub serviceBlockingStubMock = mock(ServiceBlockingStub.class);
+ ServiceStub serviceStubMock = mock(ServiceStub.class);
+ when(serviceBlockingStubMock.withDeadlineAfter(anyLong(), any(TimeUnit.class)))
+ .thenReturn(serviceBlockingStubMock);
+ when(serviceBlockingStubMock
+ .resolveBoolean(argThat(x -> FLAG_KEY_BOOLEAN.equals(x.getFlagKey())))).thenReturn(booleanResponse);
+ when(serviceBlockingStubMock
+ .resolveFloat(argThat(x -> FLAG_KEY_DOUBLE.equals(x.getFlagKey())))).thenReturn(floatResponse);
+ when(serviceBlockingStubMock
+ .resolveInt(argThat(x -> FLAG_KEY_INTEGER.equals(x.getFlagKey())))).thenReturn(intResponse);
+ when(serviceBlockingStubMock
+ .resolveString(argThat(x -> FLAG_KEY_STRING.equals(x.getFlagKey())))).thenReturn(stringResponse);
+ when(serviceBlockingStubMock
+ .resolveObject(argThat(x -> FLAG_KEY_OBJECT.equals(x.getFlagKey())))).thenReturn(objectResponse);
+
+ FlagdProvider provider = new FlagdProvider(serviceBlockingStubMock, serviceStubMock, "lru", 100, 5);
+ provider.setEventStreamAlive(eventStreamAlive); // caching only available when event stream is alive
+ OpenFeatureAPI.getInstance().setProvider(provider);
+
+ FlagEvaluationDetails booleanDetails = api.getClient().getBooleanDetails(FLAG_KEY_BOOLEAN, false);
+ booleanDetails = api.getClient().getBooleanDetails(FLAG_KEY_BOOLEAN, false); // should retrieve from cache on second invocation
+ assertTrue(booleanDetails.getValue());
+ assertEquals(BOOL_VARIANT, booleanDetails.getVariant());
+ assertEquals(expectedReason, booleanDetails.getReason());
+
+ FlagEvaluationDetails stringDetails = api.getClient().getStringDetails(FLAG_KEY_STRING, "wrong");
+ stringDetails = api.getClient().getStringDetails(FLAG_KEY_STRING, "wrong");
+ assertEquals(STRING_VALUE, stringDetails.getValue());
+ assertEquals(STRING_VARIANT, stringDetails.getVariant());
+ assertEquals(expectedReason, stringDetails.getReason());
+
+ FlagEvaluationDetails intDetails = api.getClient().getIntegerDetails(FLAG_KEY_INTEGER, 0);
+ intDetails = api.getClient().getIntegerDetails(FLAG_KEY_INTEGER, 0);
+ assertEquals(INT_VALUE, intDetails.getValue());
+ assertEquals(INT_VARIANT, intDetails.getVariant());
+ assertEquals(expectedReason, intDetails.getReason());
+
+ FlagEvaluationDetails floatDetails = api.getClient().getDoubleDetails(FLAG_KEY_DOUBLE, 0.1);
+ floatDetails = api.getClient().getDoubleDetails(FLAG_KEY_DOUBLE, 0.1);
+ assertEquals(DOUBLE_VALUE, floatDetails.getValue());
+ assertEquals(DOUBLE_VARIANT, floatDetails.getVariant());
+ assertEquals(expectedReason, floatDetails.getReason());
+
+ FlagEvaluationDetails objectDetails = api.getClient().getObjectDetails(FLAG_KEY_OBJECT, new Value());
+ objectDetails = api.getClient().getObjectDetails(FLAG_KEY_OBJECT, new Value());
+ assertEquals(INNER_STRUCT_VALUE, objectDetails.getValue().asStructure()
+ .asMap().get(INNER_STRUCT_KEY).asString());
+ assertEquals(OBJECT_VARIANT, objectDetails.getVariant());
+ assertEquals(expectedReason, objectDetails.getReason());
+ }
}
\ No newline at end of file