From cbf6ed3d27b7ba19344033c95abf7c4c2ad100e2 Mon Sep 17 00:00:00 2001 From: Skye Gill Date: Fri, 16 Dec 2022 09:37:39 +0000 Subject: [PATCH 1/9] feat!: implement caching of flagd provider Signed-off-by: Skye Gill --- providers/flagd/pom.xml | 9 +- providers/flagd/schemas | 2 +- .../providers/flagd/EventStreamCallback.java | 10 + .../providers/flagd/EventStreamObserver.java | 101 ++++++ .../providers/flagd/FlagdProvider.java | 298 ++++++++++++++++-- 5 files changed, 394 insertions(+), 26 deletions(-) create mode 100644 providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/EventStreamCallback.java create mode 100644 providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/EventStreamObserver.java diff --git a/providers/flagd/pom.xml b/providers/flagd/pom.xml index 2331d45fc..15d486619 100644 --- a/providers/flagd/pom.xml +++ b/providers/flagd/pom.xml @@ -61,6 +61,12 @@ 6.0.53 provided + + + org.apache.commons + commons-collections4 + 4.4 + @@ -86,13 +92,14 @@ exec - + git submodule update --init --recursive + --remote diff --git a/providers/flagd/schemas b/providers/flagd/schemas index 910fa3391..aa5714e2c 160000 --- a/providers/flagd/schemas +++ b/providers/flagd/schemas @@ -1 +1 @@ -Subproject commit 910fa3391adcddbbd8056879c3d7e1b465686bf6 +Subproject commit aa5714e2ce011457a5ca18029b0036cc32905c0f diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/EventStreamCallback.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/EventStreamCallback.java new file mode 100644 index 000000000..73822176f --- /dev/null +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/EventStreamCallback.java @@ -0,0 +1,10 @@ +package dev.openfeature.contrib.providers.flagd; + +/** + * Defines behaviour required of event stream callbacks. + */ +public interface EventStreamCallback { + void setEventStreamAlive(Boolean alive); + + void restartEventStream() throws Exception; +} diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/EventStreamObserver.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/EventStreamObserver.java new file mode 100644 index 000000000..71e457785 --- /dev/null +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/EventStreamObserver.java @@ -0,0 +1,101 @@ +package dev.openfeature.contrib.providers.flagd; + +import java.util.Map; +import io.grpc.stub.StreamObserver; +import dev.openfeature.flagd.grpc.Schema.EventStreamResponse; +import dev.openfeature.sdk.ProviderEvaluation; +import com.google.protobuf.Value; + +/** + * EventStreamObserver handles events emitted by flagd. + */ +public class EventStreamObserver implements StreamObserver { + private EventStreamCallback callback; + private Boolean cacheEnabled; + private Map> booleanCache; + private Map> stringCache; + private Map> doubleCache; + private Map> integerCache; + private Map> objectCache; + + + static final String CONFIGURATION_CHANGE = "configuration_change"; + static final String PROVIDER_READY = "provider_ready"; + + EventStreamObserver( + Boolean cacheEnabled, Map> booleanCache, + Map> stringCache, Map> doubleCache, + Map> integerCache, + Map> objectCache, EventStreamCallback callback + ) { + this.cacheEnabled = cacheEnabled; + this.booleanCache = booleanCache; + this.stringCache = stringCache; + this.doubleCache = doubleCache; + this.integerCache = integerCache; + this.objectCache = objectCache; + this.callback = callback; + } + + @Override + public void onNext(EventStreamResponse value) { + switch (value.getType()) { + case CONFIGURATION_CHANGE: + this.handleConfigurationChangeEvent(value); + break; + case PROVIDER_READY: + this.handleProviderReadyEvent(); + break; + default: + // log + return; + } + } + + @Override + public void onError(Throwable t) { + t.printStackTrace(); + this.purgeCache(); + this.callback.setEventStreamAlive(false); + try { + this.callback.restartEventStream(); + } catch (Exception e) { + e.printStackTrace(); + } + } + + @Override + public void onCompleted() { + this.purgeCache(); + this.callback.setEventStreamAlive(false); + } + + private void handleConfigurationChangeEvent(EventStreamResponse value) { + if (!this.cacheEnabled) { + return; + } + + Map data = value.getData().getFieldsMap(); + Value flagKeyValue = data.get("flagKey"); + String flagKey = flagKeyValue.getStringValue(); + + this.booleanCache.remove(flagKey); + this.stringCache.remove(flagKey); + this.doubleCache.remove(flagKey); + this.integerCache.remove(flagKey); + this.objectCache.remove(flagKey); + } + + private void handleProviderReadyEvent() { + this.purgeCache(); + this.callback.setEventStreamAlive(true); + } + + private void purgeCache() { + this.booleanCache.clear(); + this.stringCache.clear(); + this.doubleCache.clear(); + this.integerCache.clear(); + this.objectCache.clear(); + } +} diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java index 014ff303e..130eb607e 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java @@ -5,6 +5,9 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; import javax.net.ssl.SSLException; @@ -21,6 +24,7 @@ import dev.openfeature.flagd.grpc.Schema.ResolveStringResponse; import dev.openfeature.flagd.grpc.ServiceGrpc; import dev.openfeature.flagd.grpc.ServiceGrpc.ServiceBlockingStub; +import dev.openfeature.flagd.grpc.ServiceGrpc.ServiceStub; import dev.openfeature.sdk.EvaluationContext; import dev.openfeature.sdk.FeatureProvider; import dev.openfeature.sdk.Metadata; @@ -29,17 +33,23 @@ import dev.openfeature.sdk.Value; import io.grpc.netty.GrpcSslContexts; import io.grpc.netty.NettyChannelBuilder; +import io.grpc.stub.StreamObserver; import io.netty.channel.epoll.EpollDomainSocketChannel; import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.unix.DomainSocketAddress; import io.netty.handler.ssl.SslContextBuilder; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections4.map.LRUMap; +import java.util.Collections; +import dev.openfeature.flagd.grpc.Schema.EventStreamRequest; +import dev.openfeature.flagd.grpc.Schema.EventStreamResponse; + /** * OpenFeature provider for flagd. */ @Slf4j -public class FlagdProvider implements FeatureProvider { +public class FlagdProvider implements FeatureProvider, EventStreamCallback { static final String PROVIDER_NAME = "flagD Provider"; static final String DEFAULT_PORT = "8013"; @@ -52,9 +62,35 @@ public class FlagdProvider implements FeatureProvider { static final String TLS_ENV_VAR_NAME = "FLAGD_TLS"; static final String SOCKET_PATH_ENV_VAR_NAME = "FLAGD_SOCKET_PATH"; static final String SERVER_CERT_PATH_ENV_VAR_NAME = "FLAGD_SERVER_CERT_PATH"; + static final String CACHE_ENV_VAR_NAME = "FLAGD_CACHE"; + + static final String STATIC_REASON = "STATIC"; + static final String CACHED_REASON = "CACHED"; + + static final String LRU_CACHE = "lru"; + static final String DEFAULT_CACHE = LRU_CACHE; + static final int DEFAULT_MAX_CACHE_SIZE = 1000; + + static final int DEFAULT_MAX_EVENT_STREAM_RETRIES = 5; + static final int BASE_EVENT_STREAM_RETRY_BACKOFF_MS = 1000; private long deadline = DEFAULT_DEADLINE; - private ServiceBlockingStub serviceStub; + private ServiceBlockingStub serviceBlockingStub; + private ServiceStub serviceStub; + + private Boolean cacheEnabled; + private Boolean eventStreamAlive; + private Map> booleanCache; + private Map> stringCache; + private Map> doubleCache; + private Map> integerCache; + private Map> objectCache; + + private int eventStreamAttempt = 1; + private int eventStreamRetryBackoff = BASE_EVENT_STREAM_RETRY_BACKOFF_MS; + private int maxEventStreamRetries = DEFAULT_MAX_EVENT_STREAM_RETRIES; + + private ReadWriteLock lock = new ReentrantReadWriteLock(); /** * Create a new FlagdProvider instance. @@ -62,7 +98,29 @@ public class FlagdProvider implements FeatureProvider { * @param socketPath unix socket path */ public FlagdProvider(String socketPath) { - this(buildServiceStub(null, null, null, null, socketPath)); + this( + buildServiceBlockingStub(null, null, null, null, socketPath), + buildServiceStub(null, null, null, null, socketPath), + fallBackToEnvOrDefault(CACHE_ENV_VAR_NAME, DEFAULT_CACHE), DEFAULT_MAX_CACHE_SIZE, + DEFAULT_MAX_EVENT_STREAM_RETRIES + ); + } + + /** + * Create a new FlagdProvider instance. + * + * @param socketPath unix socket path + * @param cache caching implementation to use (lru) + * @param maxCacheSize limit of the number of cached values for each type of flag + * @param maxEventStreamRetries limit of the number of attempts to connect to flagd's event stream, + * on successful connection the attempts are reset + */ + public FlagdProvider(String socketPath, String cache, int maxCacheSize, int maxEventStreamRetries) { + this( + buildServiceBlockingStub(null, null, null, null, socketPath), + buildServiceStub(null, null, null, null, socketPath), + cache, maxCacheSize, maxEventStreamRetries + ); } /** @@ -76,18 +134,86 @@ public FlagdProvider(String socketPath) { * */ public FlagdProvider(String host, int port, boolean tls, String certPath) { - this(buildServiceStub(host, port, tls, certPath, null)); + this( + buildServiceBlockingStub(host, port, tls, certPath, null), + buildServiceStub(host, port, tls, certPath, null), + fallBackToEnvOrDefault(CACHE_ENV_VAR_NAME, DEFAULT_CACHE), DEFAULT_MAX_CACHE_SIZE, + DEFAULT_MAX_EVENT_STREAM_RETRIES + ); + } + + /** + * Create a new FlagdProvider instance. + * + * @param host flagd server host, defaults to "localhost" + * @param port flagd server port, defaults to 8013 + * @param tls use TLS, defaults to false + * @param certPath path for server certificate, defaults to null to, using + * system certs + * @param cache caching implementation to use (lru) + * @param maxCacheSize limit of the number of cached values for each type of flag + * @param maxEventStreamRetries limit of the number of attempts to connect to flagd's event stream, + * on successful connection the attempts are reset + * + */ + public FlagdProvider(String host, int port, boolean tls, String certPath, String cache, + int maxCacheSize, int maxEventStreamRetries) { + this( + buildServiceBlockingStub(host, port, tls, certPath, null), + buildServiceStub(host, port, tls, certPath, null), + cache, maxCacheSize, maxEventStreamRetries + ); } /** * Create a new FlagdProvider instance. */ public FlagdProvider() { - this(buildServiceStub(null, null, null, null, null)); + this( + buildServiceBlockingStub(null, null, null, null, null), + buildServiceStub(null, null, null, null, null), + fallBackToEnvOrDefault(CACHE_ENV_VAR_NAME, DEFAULT_CACHE), DEFAULT_MAX_CACHE_SIZE, + DEFAULT_MAX_EVENT_STREAM_RETRIES + ); } - FlagdProvider(ServiceBlockingStub serviceStub) { + FlagdProvider(ServiceBlockingStub serviceBlockingStub, ServiceStub serviceStub, String cache, + int maxCacheSize, int maxEventStreamRetries) { + this.serviceBlockingStub = serviceBlockingStub; this.serviceStub = serviceStub; + this.eventStreamAlive = false; + if (cache != null) { + initCache(cache, maxCacheSize); + this.cacheEnabled = true; + } + this.maxEventStreamRetries = maxEventStreamRetries; + this.handleEvents(); + } + + private void initCache(String cache, int maxSize) { + switch (cache) { + case LRU_CACHE: + this.booleanCache = Collections.synchronizedMap( + new LRUMap>(maxSize)); + this.stringCache = Collections.synchronizedMap(new LRUMap>(maxSize)); + this.doubleCache = Collections.synchronizedMap(new LRUMap>(maxSize)); + this.integerCache = Collections.synchronizedMap( + new LRUMap>(maxSize)); + this.objectCache = Collections.synchronizedMap(new LRUMap>(maxSize)); + break; + default: + initCache(DEFAULT_CACHE, maxSize); + return; + } + } + + private Boolean cacheAvailable() { + Lock l = this.lock.readLock(); + l.lock(); + Boolean available = this.cacheEnabled && this.eventStreamAlive; + l.unlock(); + + return available; } @Override @@ -103,80 +229,155 @@ public String getName() { @Override public ProviderEvaluation getBooleanEvaluation(String key, Boolean defaultValue, EvaluationContext ctx) { + + if (this.cacheAvailable()) { + ProviderEvaluation fromCache = this.booleanCache.get(key); + if (fromCache != null) { + fromCache.setReason(CACHED_REASON); + return fromCache; + } + } ResolveBooleanRequest request = ResolveBooleanRequest.newBuilder() .setFlagKey(key) .setContext(this.convertContext(ctx)) .build(); - ResolveBooleanResponse r = this.serviceStub.withDeadlineAfter(this.deadline, TimeUnit.MILLISECONDS) + ResolveBooleanResponse r = this.serviceBlockingStub.withDeadlineAfter(this.deadline, TimeUnit.MILLISECONDS) .resolveBoolean(request); - return ProviderEvaluation.builder() + ProviderEvaluation result = ProviderEvaluation.builder() .value(r.getValue()) .variant(r.getVariant()) .reason(r.getReason()) .build(); + + + if (this.isEvaluationCacheable(result)) { + this.booleanCache.put(key, result); + } + + return result; } @Override public ProviderEvaluation getStringEvaluation(String key, String defaultValue, EvaluationContext ctx) { + + if (this.cacheAvailable()) { + ProviderEvaluation fromCache = this.stringCache.get(key); + if (fromCache != null) { + fromCache.setReason(CACHED_REASON); + return fromCache; + } + } + ResolveStringRequest request = ResolveStringRequest.newBuilder() .setFlagKey(key) .setContext(this.convertContext(ctx)).build(); - ResolveStringResponse r = this.serviceStub.withDeadlineAfter(this.deadline, TimeUnit.MILLISECONDS) + ResolveStringResponse r = this.serviceBlockingStub.withDeadlineAfter(this.deadline, TimeUnit.MILLISECONDS) .resolveString(request); - return ProviderEvaluation.builder().value(r.getValue()) + ProviderEvaluation result = ProviderEvaluation.builder().value(r.getValue()) .variant(r.getVariant()) .reason(r.getReason()) .build(); + + if (this.isEvaluationCacheable(result)) { + this.stringCache.put(key, result); + } + + return result; } @Override public ProviderEvaluation getDoubleEvaluation(String key, Double defaultValue, EvaluationContext ctx) { + + if (this.cacheAvailable()) { + ProviderEvaluation fromCache = this.doubleCache.get(key); + if (fromCache != null) { + fromCache.setReason(CACHED_REASON); + return fromCache; + } + } + ResolveFloatRequest request = ResolveFloatRequest.newBuilder() .setFlagKey(key) .setContext(this.convertContext(ctx)) .build(); - ResolveFloatResponse r = this.serviceStub.withDeadlineAfter(this.deadline, TimeUnit.MILLISECONDS) + ResolveFloatResponse r = this.serviceBlockingStub.withDeadlineAfter(this.deadline, TimeUnit.MILLISECONDS) .resolveFloat(request); - return ProviderEvaluation.builder() + ProviderEvaluation result = ProviderEvaluation.builder() .value(r.getValue()) .variant(r.getVariant()) .reason(r.getReason()) .build(); + + if (this.isEvaluationCacheable(result)) { + this.doubleCache.put(key, result); + } + + return result; } @Override public ProviderEvaluation getIntegerEvaluation(String key, Integer defaultValue, EvaluationContext ctx) { + + if (this.cacheAvailable()) { + ProviderEvaluation fromCache = this.integerCache.get(key); + if (fromCache != null) { + fromCache.setReason(CACHED_REASON); + return fromCache; + } + } + ResolveIntRequest request = ResolveIntRequest.newBuilder() .setFlagKey(key) .setContext(this.convertContext(ctx)) .build(); - ResolveIntResponse r = this.serviceStub.withDeadlineAfter(this.deadline, TimeUnit.MILLISECONDS) + ResolveIntResponse r = this.serviceBlockingStub.withDeadlineAfter(this.deadline, TimeUnit.MILLISECONDS) .resolveInt(request); - return ProviderEvaluation.builder() + ProviderEvaluation result = ProviderEvaluation.builder() .value((int) r.getValue()) .variant(r.getVariant()) .reason(r.getReason()) .build(); + + if (this.isEvaluationCacheable(result)) { + this.integerCache.put(key, result); + } + + return result; } @Override public ProviderEvaluation getObjectEvaluation(String key, Value defaultValue, EvaluationContext ctx) { + + if (this.cacheAvailable()) { + ProviderEvaluation fromCache = this.objectCache.get(key); + if (fromCache != null) { + fromCache.setReason(CACHED_REASON); + return fromCache; + } + } + ResolveObjectRequest request = ResolveObjectRequest.newBuilder() .setFlagKey(key) .setContext(this.convertContext(ctx)) .build(); - ResolveObjectResponse r = this.serviceStub.withDeadlineAfter(this.deadline, TimeUnit.MILLISECONDS) + ResolveObjectResponse r = this.serviceBlockingStub.withDeadlineAfter(this.deadline, TimeUnit.MILLISECONDS) .resolveObject(request); - return ProviderEvaluation.builder() + ProviderEvaluation result = ProviderEvaluation.builder() .value(this.convertObjectResponse(r.getValue())) .variant(r.getVariant()) .reason(r.getReason()) .build(); + + if (this.isEvaluationCacheable(result)) { + this.objectCache.put(key, result); + } + + return result; } /** @@ -311,9 +512,8 @@ private Value convertPrimitive(com.google.protobuf.Value protobuf) { return value; } - private static ServiceBlockingStub buildServiceStub(String host, Integer port, Boolean tls, String certPath, - String socketPath) { - + private static NettyChannelBuilder channelBuilder(String host, Integer port, Boolean tls, String certPath, + String socketPath) { host = host != null ? host : fallBackToEnvOrDefault(HOST_ENV_VAR_NAME, DEFAULT_HOST); port = port != null ? port : Integer.parseInt(fallBackToEnvOrDefault(PORT_ENV_VAR_NAME, DEFAULT_PORT)); tls = tls != null ? tls : Boolean.parseBoolean(fallBackToEnvOrDefault(TLS_ENV_VAR_NAME, DEFAULT_TLS)); @@ -322,12 +522,11 @@ private static ServiceBlockingStub buildServiceStub(String host, Integer port, B // we have a socket path specified, build a channel with a unix socket if (socketPath != null) { - return ServiceGrpc.newBlockingStub(NettyChannelBuilder + return NettyChannelBuilder .forAddress(new DomainSocketAddress(socketPath)) .eventLoopGroup(new EpollEventLoopGroup()) .channelType(EpollDomainSocketChannel.class) - .usePlaintext() - .build()); + .usePlaintext(); } // build a TCP socket @@ -345,8 +544,7 @@ private static ServiceBlockingStub buildServiceStub(String host, Integer port, B builder.usePlaintext(); } - return ServiceGrpc - .newBlockingStub(builder.build()); + return builder; } catch (SSLException ssle) { SslConfigException sslConfigException = new SslConfigException("Error with SSL configuration."); sslConfigException.initCause(ssle); @@ -354,7 +552,59 @@ private static ServiceBlockingStub buildServiceStub(String host, Integer port, B } } + private static ServiceBlockingStub buildServiceBlockingStub(String host, Integer port, Boolean tls, String certPath, + String socketPath) { + return ServiceGrpc.newBlockingStub(channelBuilder(host, port, tls, certPath, socketPath).build()); + } + + private static ServiceStub buildServiceStub(String host, Integer port, Boolean tls, String certPath, + String socketPath) { + return ServiceGrpc.newStub(channelBuilder(host, port, tls, certPath, socketPath).build()); + } + private static String fallBackToEnvOrDefault(String key, String defaultValue) { return System.getenv(key) != null ? System.getenv(key) : defaultValue; } + + private void handleEvents() { + StreamObserver responseObserver = + new EventStreamObserver(this.cacheEnabled, this.booleanCache, this.stringCache, + this.doubleCache, this.integerCache, this.objectCache, this); + this.serviceStub.eventStream(EventStreamRequest.getDefaultInstance(), responseObserver); + } + + @Override + public void setEventStreamAlive(Boolean alive) { + Lock l = this.lock.writeLock(); + try { + l.lock(); + this.eventStreamAlive = alive; + if (alive) { + // reset attempts on successful connection + this.eventStreamAttempt = 1; + this.eventStreamRetryBackoff = BASE_EVENT_STREAM_RETRY_BACKOFF_MS; + } + } finally { + l.unlock(); + } + } + + private Boolean isEvaluationCacheable(ProviderEvaluation evaluation) { + String reason = evaluation.getReason(); + + return reason != null && reason.equals(STATIC_REASON) && this.cacheAvailable(); + } + + @Override + public void restartEventStream() throws Exception { + this.eventStreamAttempt++; + if (this.eventStreamAttempt > this.maxEventStreamRetries) { + // log + return; + } + this.eventStreamRetryBackoff = 2 * this.eventStreamRetryBackoff; + Thread.sleep(this.eventStreamRetryBackoff); + + this.handleEvents(); + } } From 182e5d98a1e15eba428da89929b1ef2c0e4c6d86 Mon Sep 17 00:00:00 2001 From: Skye Gill Date: Fri, 16 Dec 2022 17:44:11 +0000 Subject: [PATCH 2/9] caching testing & documentation Signed-off-by: Skye Gill --- providers/flagd/README.md | 25 ++- .../providers/flagd/FlagdProvider.java | 33 +++- .../providers/flagd/FlagdProviderTest.java | 158 +++++++++++++++--- 3 files changed, 183 insertions(+), 33 deletions(-) diff --git a/providers/flagd/README.md b/providers/flagd/README.md index abc812211..a262d6ba7 100644 --- a/providers/flagd/README.md +++ b/providers/flagd/README.md @@ -26,13 +26,16 @@ OpenFeatureAPI.getInstance().setProvider(provider); Options can be defined in the constructor or as environment variables, with constructor options having the highest precedence. -| Option name | Environment variable name | Type | Default | -| ----------- | ------------------------- | ------- | --------- | -| host | FLAGD_HOST | string | localhost | -| port | FLAGD_PORT | number | 8013 | -| tls | FLAGD_TLS | boolean | false | -| socketPath | FLAGD_SOCKET_PATH | string | - | -| certPath | FLAGD_SERVER_CERT_PATH | string | - | +| Option name | Environment variable name | Type | Default | Values | +| --------------------- | ------------------------------- | ------- | --------- | ------------- | +| host | FLAGD_HOST | string | localhost | | +| port | FLAGD_PORT | number | 8013 | | +| tls | FLAGD_TLS | boolean | false | | +| socketPath | FLAGD_SOCKET_PATH | string | - | | +| certPath | FLAGD_SERVER_CERT_PATH | string | - | | +| cache | FLAGD_CACHE | string | lru | lru,disabled | +| maxCacheSize | FLAGD_MAX_CACHE_SIZE | int | 1000 | | +| maxEventStreamRetries | FLAGD_MAX_EVENT_STREAM_RETRIES | int | 5 | | ### Unix socket support @@ -53,3 +56,11 @@ The default deadline is 500ms, though evaluations typically take on the order of Though not required in deployments where flagd runs on the same host as the workload, TLS is available. :warning: Note that there's a [vulnerability](https://security.snyk.io/vuln/SNYK-JAVA-IONETTY-1042268) in [netty](https://github.com/netty/netty), a transitive dependency of the underlying gRPC libraries used in the flagd-provider that fails to correctly validate certificates. This will be addressed in netty v5. + +## Caching + +The provider attempts to establish a connection to flagd's event stream (up to 5 times by default). If the connection is successful and caching is enabled each flag returned with reason `STATIC` is cached until an event is received concerning the cached flag (at which point it is removed from cache). + +On invocation of a flag evaluation (if caching is available) an attempt is made to retrieve the entry from cache, if found the flag is returned with reason `CACHED`. + +By default, the provider is configured to use [least recently used (lru)](https://commons.apache.org/proper/commons-collections/apidocs/org/apache/commons/collections4/map/LRUMap.html) caching with up to 1000 entries per flag type. diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java index 130eb607e..447c61bd7 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java @@ -38,6 +38,7 @@ import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.unix.DomainSocketAddress; import io.netty.handler.ssl.SslContextBuilder; +import lombok.val; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.map.LRUMap; @@ -63,11 +64,14 @@ public class FlagdProvider implements FeatureProvider, EventStreamCallback { static final String SOCKET_PATH_ENV_VAR_NAME = "FLAGD_SOCKET_PATH"; static final String SERVER_CERT_PATH_ENV_VAR_NAME = "FLAGD_SERVER_CERT_PATH"; static final String CACHE_ENV_VAR_NAME = "FLAGD_CACHE"; + static final String MAX_CACHE_SIZE_ENV_VAR_NAME = "FLAGD_MAX_CACHE_SIZE"; + static final String MAX_EVENT_STREAM_RETRIES_ENV_VAR_NAME = "FLAGD_MAX_EVENT_STREAM_RETRIES"; static final String STATIC_REASON = "STATIC"; static final String CACHED_REASON = "CACHED"; static final String LRU_CACHE = "lru"; + static final String DISABLED = "disabled"; static final String DEFAULT_CACHE = LRU_CACHE; static final int DEFAULT_MAX_CACHE_SIZE = 1000; @@ -101,8 +105,9 @@ public FlagdProvider(String socketPath) { this( buildServiceBlockingStub(null, null, null, null, socketPath), buildServiceStub(null, null, null, null, socketPath), - fallBackToEnvOrDefault(CACHE_ENV_VAR_NAME, DEFAULT_CACHE), DEFAULT_MAX_CACHE_SIZE, - DEFAULT_MAX_EVENT_STREAM_RETRIES + fallBackToEnvOrDefault(CACHE_ENV_VAR_NAME, DEFAULT_CACHE), + fallBackToEnvOrDefault(MAX_CACHE_SIZE_ENV_VAR_NAME, DEFAULT_MAX_CACHE_SIZE), + fallBackToEnvOrDefault(MAX_EVENT_STREAM_RETRIES_ENV_VAR_NAME, DEFAULT_MAX_EVENT_STREAM_RETRIES) ); } @@ -137,8 +142,9 @@ public FlagdProvider(String host, int port, boolean tls, String certPath) { this( buildServiceBlockingStub(host, port, tls, certPath, null), buildServiceStub(host, port, tls, certPath, null), - fallBackToEnvOrDefault(CACHE_ENV_VAR_NAME, DEFAULT_CACHE), DEFAULT_MAX_CACHE_SIZE, - DEFAULT_MAX_EVENT_STREAM_RETRIES + fallBackToEnvOrDefault(CACHE_ENV_VAR_NAME, DEFAULT_CACHE), + fallBackToEnvOrDefault(MAX_CACHE_SIZE_ENV_VAR_NAME, DEFAULT_MAX_CACHE_SIZE), + fallBackToEnvOrDefault(MAX_EVENT_STREAM_RETRIES_ENV_VAR_NAME, DEFAULT_MAX_EVENT_STREAM_RETRIES) ); } @@ -172,8 +178,9 @@ public FlagdProvider() { this( buildServiceBlockingStub(null, null, null, null, null), buildServiceStub(null, null, null, null, null), - fallBackToEnvOrDefault(CACHE_ENV_VAR_NAME, DEFAULT_CACHE), DEFAULT_MAX_CACHE_SIZE, - DEFAULT_MAX_EVENT_STREAM_RETRIES + fallBackToEnvOrDefault(CACHE_ENV_VAR_NAME, DEFAULT_CACHE), + fallBackToEnvOrDefault(MAX_CACHE_SIZE_ENV_VAR_NAME, DEFAULT_MAX_CACHE_SIZE), + fallBackToEnvOrDefault(MAX_EVENT_STREAM_RETRIES_ENV_VAR_NAME, DEFAULT_MAX_EVENT_STREAM_RETRIES) ); } @@ -184,7 +191,6 @@ public FlagdProvider() { this.eventStreamAlive = false; if (cache != null) { initCache(cache, maxCacheSize); - this.cacheEnabled = true; } this.maxEventStreamRetries = maxEventStreamRetries; this.handleEvents(); @@ -201,10 +207,14 @@ private void initCache(String cache, int maxSize) { new LRUMap>(maxSize)); this.objectCache = Collections.synchronizedMap(new LRUMap>(maxSize)); break; + case DISABLED: + return; default: initCache(DEFAULT_CACHE, maxSize); return; } + + this.cacheEnabled = true; } private Boolean cacheAvailable() { @@ -566,6 +576,15 @@ private static String fallBackToEnvOrDefault(String key, String defaultValue) { return System.getenv(key) != null ? System.getenv(key) : defaultValue; } + private static int fallBackToEnvOrDefault(String key, int defaultValue) { + try { + int value = System.getenv(key) != null ? Integer.parseInt(System.getenv(key)) : defaultValue; + return value; + } catch (Exception e) { + return defaultValue; + } + } + private void handleEvents() { StreamObserver responseObserver = new EventStreamObserver(this.cacheEnabled, this.booleanCache, this.stringCache, diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/FlagdProviderTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/FlagdProviderTest.java index 3d2487146..c995f2ab5 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/FlagdProviderTest.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/FlagdProviderTest.java @@ -10,6 +10,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mockConstruction; import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -30,6 +31,7 @@ import dev.openfeature.flagd.grpc.Schema.ResolveStringResponse; import dev.openfeature.flagd.grpc.ServiceGrpc; import dev.openfeature.flagd.grpc.ServiceGrpc.ServiceBlockingStub; +import dev.openfeature.flagd.grpc.ServiceGrpc.ServiceStub; import dev.openfeature.sdk.FlagEvaluationDetails; import dev.openfeature.sdk.MutableContext; import dev.openfeature.sdk.MutableStructure; @@ -37,7 +39,6 @@ import dev.openfeature.sdk.Reason; import dev.openfeature.sdk.Structure; import dev.openfeature.sdk.Value; -import io.grpc.ManagedChannel; import io.grpc.netty.NettyChannelBuilder; import io.netty.channel.EventLoopGroup; import io.netty.channel.epoll.EpollEventLoopGroup; @@ -79,11 +80,14 @@ public static void init() { void path_arg_should_build_domain_socket_with_correct_path() { final String path = "/some/path"; - ServiceBlockingStub mockStub = mock(ServiceBlockingStub.class); + ServiceBlockingStub mockBlockingStub = mock(ServiceBlockingStub.class); + ServiceStub mockStub = mock(ServiceStub.class); NettyChannelBuilder mockChannelBuilder = getMockChannelBuilderSocket(); try (MockedStatic mockStaticService = mockStatic(ServiceGrpc.class)) { - mockStaticService.when(() -> ServiceGrpc.newBlockingStub(any(ManagedChannel.class))) + mockStaticService.when(() -> ServiceGrpc.newBlockingStub(any())) // why does any(ManagedChannel.class) fail? + .thenReturn(mockBlockingStub); + mockStaticService.when(() -> ServiceGrpc.newStub(any())) .thenReturn(mockStub); try (MockedStatic mockStaticChannelBuilder = mockStatic(NettyChannelBuilder.class)) { @@ -92,8 +96,7 @@ void path_arg_should_build_domain_socket_with_correct_path() { EpollEventLoopGroup.class, (mock, context) -> { })) { - mockStaticChannelBuilder.when(() -> NettyChannelBuilder - .forAddress(any(DomainSocketAddress.class))).thenReturn(mockChannelBuilder); + when(NettyChannelBuilder.forAddress(any(DomainSocketAddress.class))).thenReturn(mockChannelBuilder); new FlagdProvider(path); // verify path matches @@ -101,7 +104,7 @@ void path_arg_should_build_domain_socket_with_correct_path() { .forAddress(argThat((DomainSocketAddress d) -> { assertEquals(d.path(), path); // path should match return true; - }))); + })), times(2)); } } } @@ -113,11 +116,14 @@ void no_args_socket_env_should_build_domain_socket_with_correct_path() throws Ex new EnvironmentVariables("FLAGD_SOCKET_PATH", path).execute(() -> { - ServiceBlockingStub mockStub = mock(ServiceBlockingStub.class); + ServiceBlockingStub mockBlockingStub = mock(ServiceBlockingStub.class); + ServiceStub mockStub = mock(ServiceStub.class); NettyChannelBuilder mockChannelBuilder = getMockChannelBuilderSocket(); try (MockedStatic mockStaticService = mockStatic(ServiceGrpc.class)) { - mockStaticService.when(() -> ServiceGrpc.newBlockingStub(any(ManagedChannel.class))) + mockStaticService.when(() -> ServiceGrpc.newBlockingStub(any())) + .thenReturn(mockBlockingStub); + mockStaticService.when(() -> ServiceGrpc.newStub(any())) .thenReturn(mockStub); try (MockedStatic mockStaticChannelBuilder = mockStatic( @@ -135,7 +141,7 @@ void no_args_socket_env_should_build_domain_socket_with_correct_path() throws Ex mockStaticChannelBuilder.verify(() -> NettyChannelBuilder .forAddress(argThat((DomainSocketAddress d) -> { return d.path() == path; - }))); + })), times(2)); } } } @@ -147,11 +153,14 @@ void host_and_port_arg_should_build_tcp_socket() { final String host = "host.com"; final int port = 1234; - ServiceBlockingStub mockStub = mock(ServiceBlockingStub.class); + ServiceBlockingStub mockBlockingStub = mock(ServiceBlockingStub.class); + ServiceStub mockStub = mock(ServiceStub.class); NettyChannelBuilder mockChannelBuilder = getMockChannelBuilderSocket(); try (MockedStatic mockStaticService = mockStatic(ServiceGrpc.class)) { - mockStaticService.when(() -> ServiceGrpc.newBlockingStub(any(ManagedChannel.class))) + mockStaticService.when(() -> ServiceGrpc.newBlockingStub(any())) + .thenReturn(mockBlockingStub); + mockStaticService.when(() -> ServiceGrpc.newStub(any())) .thenReturn(mockStub); try (MockedStatic mockStaticChannelBuilder = mockStatic(NettyChannelBuilder.class)) { @@ -162,7 +171,7 @@ void host_and_port_arg_should_build_tcp_socket() { // verify host/port matches mockStaticChannelBuilder.verify(() -> NettyChannelBuilder - .forAddress(host, port)); + .forAddress(host, port), times(2)); } } } @@ -173,11 +182,14 @@ void no_args_host_and_port_env_set_should_build_tcp_socket() throws Exception { final int port = 4321; new EnvironmentVariables("FLAGD_HOST", host, "FLAGD_PORT", String.valueOf(port)).execute(() -> { - ServiceBlockingStub mockStub = mock(ServiceBlockingStub.class); + ServiceBlockingStub mockBlockingStub = mock(ServiceBlockingStub.class); + ServiceStub mockStub = mock(ServiceStub.class); NettyChannelBuilder mockChannelBuilder = getMockChannelBuilderSocket(); try (MockedStatic mockStaticService = mockStatic(ServiceGrpc.class)) { - mockStaticService.when(() -> ServiceGrpc.newBlockingStub(any(ManagedChannel.class))) + mockStaticService.when(() -> ServiceGrpc.newBlockingStub(any())) + .thenReturn(mockBlockingStub); + mockStaticService.when(() -> ServiceGrpc.newStub(any())) .thenReturn(mockStub); try (MockedStatic mockStaticChannelBuilder = mockStatic( @@ -189,7 +201,7 @@ void no_args_host_and_port_env_set_should_build_tcp_socket() throws Exception { // verify host/port matches mockStaticChannelBuilder.verify(() -> NettyChannelBuilder - .forAddress(host, port)); + .forAddress(host, port), times(2)); } } }); @@ -228,6 +240,7 @@ void resolvers_call_grpc_service_and_return_details() { .build(); ServiceBlockingStub serviceBlockingStubMock = mock(ServiceBlockingStub.class); + ServiceStub serviceStubMock = mock(ServiceStub.class); when(serviceBlockingStubMock.withDeadlineAfter(anyLong(), any(TimeUnit.class))) .thenReturn(serviceBlockingStubMock); when(serviceBlockingStubMock @@ -241,7 +254,8 @@ void resolvers_call_grpc_service_and_return_details() { when(serviceBlockingStubMock .resolveObject(argThat(x -> FLAG_KEY.equals(x.getFlagKey())))).thenReturn(objectResponse); - OpenFeatureAPI.getInstance().setProvider(new FlagdProvider(serviceBlockingStubMock)); + OpenFeatureAPI.getInstance().setProvider(new FlagdProvider(serviceBlockingStubMock, serviceStubMock, "lru", + 100, 5 )); FlagEvaluationDetails booleanDetails = api.getClient().getBooleanDetails(FLAG_KEY, false); assertTrue(booleanDetails.getValue()); @@ -270,6 +284,21 @@ void resolvers_call_grpc_service_and_return_details() { assertEquals(DEFAULT.toString(), objectDetails.getReason()); } + @Test + void resolvers_cache_responses_if_static_and_event_stream_alive() { + do_resolvers_cache_responses(FlagdProvider.STATIC_REASON, true, true); + } + + @Test + void resolvers_should_not_cache_responses_if_not_static() { + do_resolvers_cache_responses(DEFAULT.toString(), true, false); + } + + @Test + void resolvers_should_not_cache_responses_if_event_stream_not_alive() { + do_resolvers_cache_responses(FlagdProvider.STATIC_REASON, false, false); + } + @Test void context_is_parsed_and_passed_to_grpc_service() { @@ -301,6 +330,7 @@ void context_is_parsed_and_passed_to_grpc_service() { .build(); ServiceBlockingStub serviceBlockingStubMock = mock(ServiceBlockingStub.class); + ServiceStub serviceStubMock = mock(ServiceStub.class); when(serviceBlockingStubMock.withDeadlineAfter(anyLong(), any(TimeUnit.class))) .thenReturn(serviceBlockingStubMock); when(serviceBlockingStubMock.resolveBoolean(argThat( @@ -315,7 +345,8 @@ void context_is_parsed_and_passed_to_grpc_service() { .getStringValue())))) .thenReturn(booleanResponse); - OpenFeatureAPI.getInstance().setProvider(new FlagdProvider(serviceBlockingStubMock)); + OpenFeatureAPI.getInstance().setProvider(new FlagdProvider(serviceBlockingStubMock, serviceStubMock, "lru", + 100, 5)); MutableContext context = new MutableContext(); context.add(BOOLEAN_ATTR_KEY, BOOLEAN_ATTR_VALUE); @@ -340,11 +371,12 @@ void set_deadline_deadline_send_in_grpc() { .build(); ServiceBlockingStub serviceBlockingStubMock = mock(ServiceBlockingStub.class); + ServiceStub serviceStubMock = mock(ServiceStub.class); when(serviceBlockingStubMock.withDeadlineAfter(anyLong(), any(TimeUnit.class))) .thenReturn(serviceBlockingStubMock); when(serviceBlockingStubMock.resolveBoolean(any(ResolveBooleanRequest.class))).thenReturn(badReasonResponse); - FlagdProvider provider = new FlagdProvider(serviceBlockingStubMock); + FlagdProvider provider = new FlagdProvider(serviceBlockingStubMock, serviceStubMock, "lru", 100, 5); provider.setDeadline(deadline); OpenFeatureAPI.getInstance().setProvider(provider); @@ -362,11 +394,12 @@ void reason_mapped_correctly_if_unknown() { .build(); ServiceBlockingStub serviceBlockingStubMock = mock(ServiceBlockingStub.class); + ServiceStub serviceStubMock = mock(ServiceStub.class); when(serviceBlockingStubMock.withDeadlineAfter(anyLong(), any(TimeUnit.class))) .thenReturn(serviceBlockingStubMock); when(serviceBlockingStubMock.resolveBoolean(any(ResolveBooleanRequest.class))).thenReturn(badReasonResponse); - OpenFeatureAPI.getInstance().setProvider(new FlagdProvider(serviceBlockingStubMock)); + OpenFeatureAPI.getInstance().setProvider(new FlagdProvider(serviceBlockingStubMock, serviceStubMock, "lru", 100, 5)); FlagEvaluationDetails booleanDetails = api.getClient() .getBooleanDetails(FLAG_KEY, false, new MutableContext()); @@ -381,4 +414,91 @@ private NettyChannelBuilder getMockChannelBuilderSocket() { when(mockChannelBuilder.build()).thenReturn(null); return mockChannelBuilder; } + + private void do_resolvers_cache_responses(String reason, Boolean eventStreamAlive, Boolean shouldCache) { + String expectedReason = FlagdProvider.CACHED_REASON; + if (!shouldCache) { + expectedReason = reason; + } + + ResolveBooleanResponse booleanResponse = ResolveBooleanResponse.newBuilder() + .setValue(true) + .setVariant(BOOL_VARIANT) + .setReason(reason) + .build(); + + ResolveStringResponse stringResponse = ResolveStringResponse.newBuilder() + .setValue(STRING_VALUE) + .setVariant(STRING_VARIANT) + .setReason(reason) + .build(); + + ResolveIntResponse intResponse = ResolveIntResponse.newBuilder() + .setValue(INT_VALUE) + .setVariant(INT_VARIANT) + .setReason(reason) + .build(); + + ResolveFloatResponse floatResponse = ResolveFloatResponse.newBuilder() + .setValue(DOUBLE_VALUE) + .setVariant(DOUBLE_VARIANT) + .setReason(reason) + .build(); + + ResolveObjectResponse objectResponse = ResolveObjectResponse.newBuilder() + .setValue(PROTOBUF_STRUCTURE_VALUE) + .setVariant(OBJECT_VARIANT) + .setReason(reason) + .build(); + + ServiceBlockingStub serviceBlockingStubMock = mock(ServiceBlockingStub.class); + ServiceStub serviceStubMock = mock(ServiceStub.class); + when(serviceBlockingStubMock.withDeadlineAfter(anyLong(), any(TimeUnit.class))) + .thenReturn(serviceBlockingStubMock); + when(serviceBlockingStubMock + .resolveBoolean(argThat(x -> FLAG_KEY.equals(x.getFlagKey())))).thenReturn(booleanResponse); + when(serviceBlockingStubMock + .resolveFloat(argThat(x -> FLAG_KEY.equals(x.getFlagKey())))).thenReturn(floatResponse); + when(serviceBlockingStubMock + .resolveInt(argThat(x -> FLAG_KEY.equals(x.getFlagKey())))).thenReturn(intResponse); + when(serviceBlockingStubMock + .resolveString(argThat(x -> FLAG_KEY.equals(x.getFlagKey())))).thenReturn(stringResponse); + when(serviceBlockingStubMock + .resolveObject(argThat(x -> FLAG_KEY.equals(x.getFlagKey())))).thenReturn(objectResponse); + + FlagdProvider provider = new FlagdProvider(serviceBlockingStubMock, serviceStubMock, "lru", 100, 5); + provider.setEventStreamAlive(eventStreamAlive); // caching only available when event stream is alive + OpenFeatureAPI.getInstance().setProvider(provider); + + FlagEvaluationDetails booleanDetails = api.getClient().getBooleanDetails(FLAG_KEY, false); + booleanDetails = api.getClient().getBooleanDetails(FLAG_KEY, false); // should retrieve from cache on second invocation + assertTrue(booleanDetails.getValue()); + assertEquals(BOOL_VARIANT, booleanDetails.getVariant()); + assertEquals(expectedReason, booleanDetails.getReason()); + + FlagEvaluationDetails stringDetails = api.getClient().getStringDetails(FLAG_KEY, "wrong"); + stringDetails = api.getClient().getStringDetails(FLAG_KEY, "wrong"); + assertEquals(STRING_VALUE, stringDetails.getValue()); + assertEquals(STRING_VARIANT, stringDetails.getVariant()); + assertEquals(expectedReason, stringDetails.getReason()); + + FlagEvaluationDetails intDetails = api.getClient().getIntegerDetails(FLAG_KEY, 0); + intDetails = api.getClient().getIntegerDetails(FLAG_KEY, 0); + assertEquals(INT_VALUE, intDetails.getValue()); + assertEquals(INT_VARIANT, intDetails.getVariant()); + assertEquals(expectedReason, intDetails.getReason()); + + FlagEvaluationDetails floatDetails = api.getClient().getDoubleDetails(FLAG_KEY, 0.1); + floatDetails = api.getClient().getDoubleDetails(FLAG_KEY, 0.1); + assertEquals(DOUBLE_VALUE, floatDetails.getValue()); + assertEquals(DOUBLE_VARIANT, floatDetails.getVariant()); + assertEquals(expectedReason, floatDetails.getReason()); + + FlagEvaluationDetails objectDetails = api.getClient().getObjectDetails(FLAG_KEY, new Value()); + objectDetails = api.getClient().getObjectDetails(FLAG_KEY, new Value()); + assertEquals(INNER_STRUCT_VALUE, objectDetails.getValue().asStructure() + .asMap().get(INNER_STRUCT_KEY).asString()); + assertEquals(OBJECT_VARIANT, objectDetails.getVariant()); + assertEquals(expectedReason, objectDetails.getReason()); + } } \ No newline at end of file From 345947950db115c6c11fcdcf77fb27169fb21f48 Mon Sep 17 00:00:00 2001 From: Skye Gill Date: Thu, 22 Dec 2022 09:17:31 +0000 Subject: [PATCH 3/9] Update providers/flagd/pom.xml Co-authored-by: Todd Baert Signed-off-by: Skye Gill --- providers/flagd/pom.xml | 1 - 1 file changed, 1 deletion(-) diff --git a/providers/flagd/pom.xml b/providers/flagd/pom.xml index 15d486619..827feab28 100644 --- a/providers/flagd/pom.xml +++ b/providers/flagd/pom.xml @@ -99,7 +99,6 @@ update --init --recursive - --remote From d2cb4e84ecb1c63fd89d784d3f124b1d455855e9 Mon Sep 17 00:00:00 2001 From: Skye Gill Date: Thu, 22 Dec 2022 09:17:41 +0000 Subject: [PATCH 4/9] Update providers/flagd/pom.xml Co-authored-by: Todd Baert Signed-off-by: Skye Gill --- providers/flagd/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/providers/flagd/pom.xml b/providers/flagd/pom.xml index 827feab28..96f99c902 100644 --- a/providers/flagd/pom.xml +++ b/providers/flagd/pom.xml @@ -92,7 +92,7 @@ exec - + git submodule From 9af472b438f2756673e3271a28e05d1abf6c35fe Mon Sep 17 00:00:00 2001 From: Skye Gill Date: Thu, 22 Dec 2022 11:16:27 +0000 Subject: [PATCH 5/9] amendments from code review Signed-off-by: Skye Gill --- .../providers/flagd/EventStreamCallback.java | 4 ++-- .../providers/flagd/EventStreamObserver.java | 17 +++++++++-------- .../contrib/providers/flagd/FlagdProvider.java | 10 +++------- .../providers/flagd/FlagdProviderTest.java | 9 +++++---- 4 files changed, 19 insertions(+), 21 deletions(-) diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/EventStreamCallback.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/EventStreamCallback.java index 73822176f..25ae2dd4f 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/EventStreamCallback.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/EventStreamCallback.java @@ -3,8 +3,8 @@ /** * Defines behaviour required of event stream callbacks. */ -public interface EventStreamCallback { +interface EventStreamCallback { void setEventStreamAlive(Boolean alive); - + void restartEventStream() throws Exception; } diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/EventStreamObserver.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/EventStreamObserver.java index 71e457785..74fda2349 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/EventStreamObserver.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/EventStreamObserver.java @@ -2,6 +2,7 @@ import java.util.Map; import io.grpc.stub.StreamObserver; +import lombok.extern.slf4j.Slf4j; import dev.openfeature.flagd.grpc.Schema.EventStreamResponse; import dev.openfeature.sdk.ProviderEvaluation; import com.google.protobuf.Value; @@ -9,6 +10,7 @@ /** * EventStreamObserver handles events emitted by flagd. */ +@Slf4j public class EventStreamObserver implements StreamObserver { private EventStreamCallback callback; private Boolean cacheEnabled; @@ -18,9 +20,8 @@ public class EventStreamObserver implements StreamObserver private Map> integerCache; private Map> objectCache; - - static final String CONFIGURATION_CHANGE = "configuration_change"; - static final String PROVIDER_READY = "provider_ready"; + private static final String configurationChange = "configuration_change"; + private static final String providerReady = "provider_ready"; EventStreamObserver( Boolean cacheEnabled, Map> booleanCache, @@ -40,27 +41,27 @@ public class EventStreamObserver implements StreamObserver @Override public void onNext(EventStreamResponse value) { switch (value.getType()) { - case CONFIGURATION_CHANGE: + case configurationChange: this.handleConfigurationChangeEvent(value); break; - case PROVIDER_READY: + case providerReady: this.handleProviderReadyEvent(); break; default: - // log + log.warn("unhandled event type {}", value.getType()); return; } } @Override public void onError(Throwable t) { - t.printStackTrace(); + log.error("event stream", t); this.purgeCache(); this.callback.setEventStreamAlive(false); try { this.callback.restartEventStream(); } catch (Exception e) { - e.printStackTrace(); + log.error("restart event stream", e); } } diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java index 447c61bd7..7144e471a 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java @@ -38,7 +38,6 @@ import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.unix.DomainSocketAddress; import io.netty.handler.ssl.SslContextBuilder; -import lombok.val; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.map.LRUMap; @@ -198,7 +197,10 @@ public FlagdProvider() { private void initCache(String cache, int maxSize) { switch (cache) { + case DISABLED: + return; case LRU_CACHE: + default: this.booleanCache = Collections.synchronizedMap( new LRUMap>(maxSize)); this.stringCache = Collections.synchronizedMap(new LRUMap>(maxSize)); @@ -206,12 +208,6 @@ private void initCache(String cache, int maxSize) { this.integerCache = Collections.synchronizedMap( new LRUMap>(maxSize)); this.objectCache = Collections.synchronizedMap(new LRUMap>(maxSize)); - break; - case DISABLED: - return; - default: - initCache(DEFAULT_CACHE, maxSize); - return; } this.cacheEnabled = true; diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/FlagdProviderTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/FlagdProviderTest.java index c995f2ab5..7d25aeb4e 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/FlagdProviderTest.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/FlagdProviderTest.java @@ -39,6 +39,7 @@ import dev.openfeature.sdk.Reason; import dev.openfeature.sdk.Structure; import dev.openfeature.sdk.Value; +import io.grpc.Channel; import io.grpc.netty.NettyChannelBuilder; import io.netty.channel.EventLoopGroup; import io.netty.channel.epoll.EpollEventLoopGroup; @@ -85,7 +86,7 @@ void path_arg_should_build_domain_socket_with_correct_path() { NettyChannelBuilder mockChannelBuilder = getMockChannelBuilderSocket(); try (MockedStatic mockStaticService = mockStatic(ServiceGrpc.class)) { - mockStaticService.when(() -> ServiceGrpc.newBlockingStub(any())) // why does any(ManagedChannel.class) fail? + mockStaticService.when(() -> ServiceGrpc.newBlockingStub(any(Channel.class))) .thenReturn(mockBlockingStub); mockStaticService.when(() -> ServiceGrpc.newStub(any())) .thenReturn(mockStub); @@ -121,7 +122,7 @@ void no_args_socket_env_should_build_domain_socket_with_correct_path() throws Ex NettyChannelBuilder mockChannelBuilder = getMockChannelBuilderSocket(); try (MockedStatic mockStaticService = mockStatic(ServiceGrpc.class)) { - mockStaticService.when(() -> ServiceGrpc.newBlockingStub(any())) + mockStaticService.when(() -> ServiceGrpc.newBlockingStub(any(Channel.class))) .thenReturn(mockBlockingStub); mockStaticService.when(() -> ServiceGrpc.newStub(any())) .thenReturn(mockStub); @@ -158,7 +159,7 @@ void host_and_port_arg_should_build_tcp_socket() { NettyChannelBuilder mockChannelBuilder = getMockChannelBuilderSocket(); try (MockedStatic mockStaticService = mockStatic(ServiceGrpc.class)) { - mockStaticService.when(() -> ServiceGrpc.newBlockingStub(any())) + mockStaticService.when(() -> ServiceGrpc.newBlockingStub(any(Channel.class))) .thenReturn(mockBlockingStub); mockStaticService.when(() -> ServiceGrpc.newStub(any())) .thenReturn(mockStub); @@ -187,7 +188,7 @@ void no_args_host_and_port_env_set_should_build_tcp_socket() throws Exception { NettyChannelBuilder mockChannelBuilder = getMockChannelBuilderSocket(); try (MockedStatic mockStaticService = mockStatic(ServiceGrpc.class)) { - mockStaticService.when(() -> ServiceGrpc.newBlockingStub(any())) + mockStaticService.when(() -> ServiceGrpc.newBlockingStub(any(Channel.class))) .thenReturn(mockBlockingStub); mockStaticService.when(() -> ServiceGrpc.newStub(any())) .thenReturn(mockStub); From a0350dd567faf9ad6d4b7d102a56ef3c9450f1f4 Mon Sep 17 00:00:00 2001 From: Skye Gill Date: Fri, 23 Dec 2022 16:57:34 +0000 Subject: [PATCH 6/9] extracted caching implementation into FlagdCache class. consolidated cache stores into singular map Signed-off-by: Skye Gill --- providers/flagd/README.md | 2 +- .../providers/flagd/EventStreamObserver.java | 44 ++----- .../contrib/providers/flagd/FlagdCache.java | 51 ++++++++ .../providers/flagd/FlagdProvider.java | 114 ++++++++++-------- .../providers/flagd/FlagdProviderTest.java | 55 +++++---- 5 files changed, 151 insertions(+), 115 deletions(-) create mode 100644 providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdCache.java diff --git a/providers/flagd/README.md b/providers/flagd/README.md index a262d6ba7..40d8092e3 100644 --- a/providers/flagd/README.md +++ b/providers/flagd/README.md @@ -63,4 +63,4 @@ The provider attempts to establish a connection to flagd's event stream (up to 5 On invocation of a flag evaluation (if caching is available) an attempt is made to retrieve the entry from cache, if found the flag is returned with reason `CACHED`. -By default, the provider is configured to use [least recently used (lru)](https://commons.apache.org/proper/commons-collections/apidocs/org/apache/commons/collections4/map/LRUMap.html) caching with up to 1000 entries per flag type. +By default, the provider is configured to use [least recently used (lru)](https://commons.apache.org/proper/commons-collections/apidocs/org/apache/commons/collections4/map/LRUMap.html) caching with up to 1000 entries. diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/EventStreamObserver.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/EventStreamObserver.java index 74fda2349..5bff7e640 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/EventStreamObserver.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/EventStreamObserver.java @@ -4,7 +4,6 @@ import io.grpc.stub.StreamObserver; import lombok.extern.slf4j.Slf4j; import dev.openfeature.flagd.grpc.Schema.EventStreamResponse; -import dev.openfeature.sdk.ProviderEvaluation; import com.google.protobuf.Value; /** @@ -13,28 +12,13 @@ @Slf4j public class EventStreamObserver implements StreamObserver { private EventStreamCallback callback; - private Boolean cacheEnabled; - private Map> booleanCache; - private Map> stringCache; - private Map> doubleCache; - private Map> integerCache; - private Map> objectCache; + private FlagdCache cache; private static final String configurationChange = "configuration_change"; private static final String providerReady = "provider_ready"; - EventStreamObserver( - Boolean cacheEnabled, Map> booleanCache, - Map> stringCache, Map> doubleCache, - Map> integerCache, - Map> objectCache, EventStreamCallback callback - ) { - this.cacheEnabled = cacheEnabled; - this.booleanCache = booleanCache; - this.stringCache = stringCache; - this.doubleCache = doubleCache; - this.integerCache = integerCache; - this.objectCache = objectCache; + EventStreamObserver(FlagdCache cache, EventStreamCallback callback) { + this.cache = cache; this.callback = callback; } @@ -56,7 +40,7 @@ public void onNext(EventStreamResponse value) { @Override public void onError(Throwable t) { log.error("event stream", t); - this.purgeCache(); + this.cache.clear(); this.callback.setEventStreamAlive(false); try { this.callback.restartEventStream(); @@ -67,12 +51,12 @@ public void onError(Throwable t) { @Override public void onCompleted() { - this.purgeCache(); + this.cache.clear(); this.callback.setEventStreamAlive(false); } private void handleConfigurationChangeEvent(EventStreamResponse value) { - if (!this.cacheEnabled) { + if (!this.cache.getEnabled()) { return; } @@ -80,23 +64,11 @@ private void handleConfigurationChangeEvent(EventStreamResponse value) { Value flagKeyValue = data.get("flagKey"); String flagKey = flagKeyValue.getStringValue(); - this.booleanCache.remove(flagKey); - this.stringCache.remove(flagKey); - this.doubleCache.remove(flagKey); - this.integerCache.remove(flagKey); - this.objectCache.remove(flagKey); + this.cache.remove(flagKey); } private void handleProviderReadyEvent() { - this.purgeCache(); + this.cache.clear(); this.callback.setEventStreamAlive(true); } - - private void purgeCache() { - this.booleanCache.clear(); - this.stringCache.clear(); - this.doubleCache.clear(); - this.integerCache.clear(); - this.objectCache.clear(); - } } diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdCache.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdCache.java new file mode 100644 index 000000000..458b3cfe0 --- /dev/null +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdCache.java @@ -0,0 +1,51 @@ +package dev.openfeature.contrib.providers.flagd; + +import dev.openfeature.sdk.ProviderEvaluation; +import dev.openfeature.sdk.Value; + +import java.util.Map; +import org.apache.commons.collections4.map.LRUMap; +import java.util.Collections; + +/** + * Exposes caching mechanism for flag evaluations. + */ +public class FlagdCache { + private Map> store; + private Boolean enabled; + + static final String LRU_CACHE = "lru"; + static final String DISABLED = "disabled"; + + FlagdCache(String cache, int maxCacheSize) { + switch (cache) { + case DISABLED: + return; + case LRU_CACHE: + default: + this.store = Collections.synchronizedMap(new LRUMap>(maxCacheSize)); + } + + this.enabled = true; + } + + public Boolean getEnabled() { + return this.enabled; + } + + public void put(String key, ProviderEvaluation value) { + this.store.put(key, value); + } + + public ProviderEvaluation get(String key) { + return this.store.get(key); + } + + public void remove(String key) { + this.store.remove(key); + } + + public void clear() { + this.store.clear(); + } +} diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java index 7144e471a..2a662f048 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java @@ -40,8 +40,6 @@ import io.netty.handler.ssl.SslContextBuilder; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.collections4.map.LRUMap; -import java.util.Collections; import dev.openfeature.flagd.grpc.Schema.EventStreamRequest; import dev.openfeature.flagd.grpc.Schema.EventStreamResponse; @@ -81,13 +79,8 @@ public class FlagdProvider implements FeatureProvider, EventStreamCallback { private ServiceBlockingStub serviceBlockingStub; private ServiceStub serviceStub; - private Boolean cacheEnabled; private Boolean eventStreamAlive; - private Map> booleanCache; - private Map> stringCache; - private Map> doubleCache; - private Map> integerCache; - private Map> objectCache; + private FlagdCache cache; private int eventStreamAttempt = 1; private int eventStreamRetryBackoff = BASE_EVENT_STREAM_RETRY_BACKOFF_MS; @@ -115,7 +108,7 @@ public FlagdProvider(String socketPath) { * * @param socketPath unix socket path * @param cache caching implementation to use (lru) - * @param maxCacheSize limit of the number of cached values for each type of flag + * @param maxCacheSize limit of the number of cached values * @param maxEventStreamRetries limit of the number of attempts to connect to flagd's event stream, * on successful connection the attempts are reset */ @@ -156,7 +149,7 @@ public FlagdProvider(String host, int port, boolean tls, String certPath) { * @param certPath path for server certificate, defaults to null to, using * system certs * @param cache caching implementation to use (lru) - * @param maxCacheSize limit of the number of cached values for each type of flag + * @param maxCacheSize limit of the number of cached values * @param maxEventStreamRetries limit of the number of attempts to connect to flagd's event stream, * on successful connection the attempts are reset * @@ -188,35 +181,15 @@ public FlagdProvider() { this.serviceBlockingStub = serviceBlockingStub; this.serviceStub = serviceStub; this.eventStreamAlive = false; - if (cache != null) { - initCache(cache, maxCacheSize); - } + this.cache = new FlagdCache(cache, maxCacheSize); this.maxEventStreamRetries = maxEventStreamRetries; this.handleEvents(); } - private void initCache(String cache, int maxSize) { - switch (cache) { - case DISABLED: - return; - case LRU_CACHE: - default: - this.booleanCache = Collections.synchronizedMap( - new LRUMap>(maxSize)); - this.stringCache = Collections.synchronizedMap(new LRUMap>(maxSize)); - this.doubleCache = Collections.synchronizedMap(new LRUMap>(maxSize)); - this.integerCache = Collections.synchronizedMap( - new LRUMap>(maxSize)); - this.objectCache = Collections.synchronizedMap(new LRUMap>(maxSize)); - } - - this.cacheEnabled = true; - } - private Boolean cacheAvailable() { Lock l = this.lock.readLock(); l.lock(); - Boolean available = this.cacheEnabled && this.eventStreamAlive; + Boolean available = this.cache.getEnabled() && this.eventStreamAlive; l.unlock(); return available; @@ -237,10 +210,14 @@ public ProviderEvaluation getBooleanEvaluation(String key, Boolean defa EvaluationContext ctx) { if (this.cacheAvailable()) { - ProviderEvaluation fromCache = this.booleanCache.get(key); + ProviderEvaluation fromCache = this.cache.get(key); if (fromCache != null) { - fromCache.setReason(CACHED_REASON); - return fromCache; + ProviderEvaluation result = ProviderEvaluation.builder() + .value(fromCache.getValue().asBoolean()) + .variant(fromCache.getVariant()) + .reason(CACHED_REASON) + .build(); + return result; } } @@ -258,7 +235,12 @@ public ProviderEvaluation getBooleanEvaluation(String key, Boolean defa if (this.isEvaluationCacheable(result)) { - this.booleanCache.put(key, result); + ProviderEvaluation value = ProviderEvaluation.builder() + .value(new Value(r.getValue())) + .variant(r.getVariant()) + .reason(r.getReason()) + .build(); + this.cache.put(key, value); } return result; @@ -269,10 +251,14 @@ public ProviderEvaluation getStringEvaluation(String key, String default EvaluationContext ctx) { if (this.cacheAvailable()) { - ProviderEvaluation fromCache = this.stringCache.get(key); + ProviderEvaluation fromCache = this.cache.get(key); if (fromCache != null) { - fromCache.setReason(CACHED_REASON); - return fromCache; + ProviderEvaluation result = ProviderEvaluation.builder() + .value(fromCache.getValue().asString()) + .variant(fromCache.getVariant()) + .reason(CACHED_REASON) + .build(); + return result; } } @@ -287,7 +273,12 @@ public ProviderEvaluation getStringEvaluation(String key, String default .build(); if (this.isEvaluationCacheable(result)) { - this.stringCache.put(key, result); + ProviderEvaluation value = ProviderEvaluation.builder() + .value(new Value(r.getValue())) + .variant(r.getVariant()) + .reason(r.getReason()) + .build(); + this.cache.put(key, value); } return result; @@ -298,10 +289,14 @@ public ProviderEvaluation getDoubleEvaluation(String key, Double default EvaluationContext ctx) { if (this.cacheAvailable()) { - ProviderEvaluation fromCache = this.doubleCache.get(key); + ProviderEvaluation fromCache = this.cache.get(key); if (fromCache != null) { - fromCache.setReason(CACHED_REASON); - return fromCache; + ProviderEvaluation result = ProviderEvaluation.builder() + .value(fromCache.getValue().asDouble()) + .variant(fromCache.getVariant()) + .reason(CACHED_REASON) + .build(); + return result; } } @@ -318,7 +313,12 @@ public ProviderEvaluation getDoubleEvaluation(String key, Double default .build(); if (this.isEvaluationCacheable(result)) { - this.doubleCache.put(key, result); + ProviderEvaluation value = ProviderEvaluation.builder() + .value(new Value(r.getValue())) + .variant(r.getVariant()) + .reason(r.getReason()) + .build(); + this.cache.put(key, value); } return result; @@ -329,10 +329,14 @@ public ProviderEvaluation getIntegerEvaluation(String key, Integer defa EvaluationContext ctx) { if (this.cacheAvailable()) { - ProviderEvaluation fromCache = this.integerCache.get(key); + ProviderEvaluation fromCache = this.cache.get(key); if (fromCache != null) { - fromCache.setReason(CACHED_REASON); - return fromCache; + ProviderEvaluation result = ProviderEvaluation.builder() + .value(fromCache.getValue().asInteger()) + .variant(fromCache.getVariant()) + .reason(CACHED_REASON) + .build(); + return result; } } @@ -349,7 +353,12 @@ public ProviderEvaluation getIntegerEvaluation(String key, Integer defa .build(); if (this.isEvaluationCacheable(result)) { - this.integerCache.put(key, result); + ProviderEvaluation value = ProviderEvaluation.builder() + .value(new Value(result.getValue())) + .variant(r.getVariant()) + .reason(r.getReason()) + .build(); + this.cache.put(key, value); } return result; @@ -360,7 +369,7 @@ public ProviderEvaluation getObjectEvaluation(String key, Value defaultVa EvaluationContext ctx) { if (this.cacheAvailable()) { - ProviderEvaluation fromCache = this.objectCache.get(key); + ProviderEvaluation fromCache = this.cache.get(key); if (fromCache != null) { fromCache.setReason(CACHED_REASON); return fromCache; @@ -380,7 +389,7 @@ public ProviderEvaluation getObjectEvaluation(String key, Value defaultVa .build(); if (this.isEvaluationCacheable(result)) { - this.objectCache.put(key, result); + this.cache.put(key, result); } return result; @@ -583,8 +592,7 @@ private static int fallBackToEnvOrDefault(String key, int defaultValue) { private void handleEvents() { StreamObserver responseObserver = - new EventStreamObserver(this.cacheEnabled, this.booleanCache, this.stringCache, - this.doubleCache, this.integerCache, this.objectCache, this); + new EventStreamObserver(this.cache, this); this.serviceStub.eventStream(EventStreamRequest.getDefaultInstance(), responseObserver); } @@ -614,7 +622,7 @@ private Boolean isEvaluationCacheable(ProviderEvaluation evaluation) { public void restartEventStream() throws Exception { this.eventStreamAttempt++; if (this.eventStreamAttempt > this.maxEventStreamRetries) { - // log + log.error("failed to connect to event stream, exhausted retries"); return; } this.eventStreamRetryBackoff = 2 * this.eventStreamRetryBackoff; diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/FlagdProviderTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/FlagdProviderTest.java index 7d25aeb4e..aae1e44c6 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/FlagdProviderTest.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/FlagdProviderTest.java @@ -49,6 +49,11 @@ class FlagdProviderTest { static final String FLAG_KEY = "some-key"; + static final String FLAG_KEY_BOOLEAN = "some-key-boolean"; + static final String FLAG_KEY_INTEGER = "some-key-integer"; + static final String FLAG_KEY_DOUBLE = "some-key-double"; + static final String FLAG_KEY_STRING = "some-key-string"; + static final String FLAG_KEY_OBJECT = "some-key-object"; static final String BOOL_VARIANT = "on"; static final String DOUBLE_VARIANT = "half"; static final String INT_VARIANT = "one-hundred"; @@ -245,40 +250,40 @@ void resolvers_call_grpc_service_and_return_details() { when(serviceBlockingStubMock.withDeadlineAfter(anyLong(), any(TimeUnit.class))) .thenReturn(serviceBlockingStubMock); when(serviceBlockingStubMock - .resolveBoolean(argThat(x -> FLAG_KEY.equals(x.getFlagKey())))).thenReturn(booleanResponse); + .resolveBoolean(argThat(x -> FLAG_KEY_BOOLEAN.equals(x.getFlagKey())))).thenReturn(booleanResponse); when(serviceBlockingStubMock - .resolveFloat(argThat(x -> FLAG_KEY.equals(x.getFlagKey())))).thenReturn(floatResponse); + .resolveFloat(argThat(x -> FLAG_KEY_DOUBLE.equals(x.getFlagKey())))).thenReturn(floatResponse); when(serviceBlockingStubMock - .resolveInt(argThat(x -> FLAG_KEY.equals(x.getFlagKey())))).thenReturn(intResponse); + .resolveInt(argThat(x -> FLAG_KEY_INTEGER.equals(x.getFlagKey())))).thenReturn(intResponse); when(serviceBlockingStubMock - .resolveString(argThat(x -> FLAG_KEY.equals(x.getFlagKey())))).thenReturn(stringResponse); + .resolveString(argThat(x -> FLAG_KEY_STRING.equals(x.getFlagKey())))).thenReturn(stringResponse); when(serviceBlockingStubMock - .resolveObject(argThat(x -> FLAG_KEY.equals(x.getFlagKey())))).thenReturn(objectResponse); + .resolveObject(argThat(x -> FLAG_KEY_OBJECT.equals(x.getFlagKey())))).thenReturn(objectResponse); OpenFeatureAPI.getInstance().setProvider(new FlagdProvider(serviceBlockingStubMock, serviceStubMock, "lru", 100, 5 )); - FlagEvaluationDetails booleanDetails = api.getClient().getBooleanDetails(FLAG_KEY, false); + FlagEvaluationDetails booleanDetails = api.getClient().getBooleanDetails(FLAG_KEY_BOOLEAN, false); assertTrue(booleanDetails.getValue()); assertEquals(BOOL_VARIANT, booleanDetails.getVariant()); assertEquals(DEFAULT.toString(), booleanDetails.getReason()); - FlagEvaluationDetails stringDetails = api.getClient().getStringDetails(FLAG_KEY, "wrong"); + FlagEvaluationDetails stringDetails = api.getClient().getStringDetails(FLAG_KEY_STRING, "wrong"); assertEquals(STRING_VALUE, stringDetails.getValue()); assertEquals(STRING_VARIANT, stringDetails.getVariant()); assertEquals(DEFAULT.toString(), stringDetails.getReason()); - FlagEvaluationDetails intDetails = api.getClient().getIntegerDetails(FLAG_KEY, 0); + FlagEvaluationDetails intDetails = api.getClient().getIntegerDetails(FLAG_KEY_INTEGER, 0); assertEquals(INT_VALUE, intDetails.getValue()); assertEquals(INT_VARIANT, intDetails.getVariant()); assertEquals(DEFAULT.toString(), intDetails.getReason()); - FlagEvaluationDetails floatDetails = api.getClient().getDoubleDetails(FLAG_KEY, 0.1); + FlagEvaluationDetails floatDetails = api.getClient().getDoubleDetails(FLAG_KEY_DOUBLE, 0.1); assertEquals(DOUBLE_VALUE, floatDetails.getValue()); assertEquals(DOUBLE_VARIANT, floatDetails.getVariant()); assertEquals(DEFAULT.toString(), floatDetails.getReason()); - FlagEvaluationDetails objectDetails = api.getClient().getObjectDetails(FLAG_KEY, new Value()); + FlagEvaluationDetails objectDetails = api.getClient().getObjectDetails(FLAG_KEY_OBJECT, new Value()); assertEquals(INNER_STRUCT_VALUE, objectDetails.getValue().asStructure() .asMap().get(INNER_STRUCT_KEY).asString()); assertEquals(OBJECT_VARIANT, objectDetails.getVariant()); @@ -457,46 +462,46 @@ private void do_resolvers_cache_responses(String reason, Boolean eventStreamAliv when(serviceBlockingStubMock.withDeadlineAfter(anyLong(), any(TimeUnit.class))) .thenReturn(serviceBlockingStubMock); when(serviceBlockingStubMock - .resolveBoolean(argThat(x -> FLAG_KEY.equals(x.getFlagKey())))).thenReturn(booleanResponse); + .resolveBoolean(argThat(x -> FLAG_KEY_BOOLEAN.equals(x.getFlagKey())))).thenReturn(booleanResponse); when(serviceBlockingStubMock - .resolveFloat(argThat(x -> FLAG_KEY.equals(x.getFlagKey())))).thenReturn(floatResponse); + .resolveFloat(argThat(x -> FLAG_KEY_DOUBLE.equals(x.getFlagKey())))).thenReturn(floatResponse); when(serviceBlockingStubMock - .resolveInt(argThat(x -> FLAG_KEY.equals(x.getFlagKey())))).thenReturn(intResponse); + .resolveInt(argThat(x -> FLAG_KEY_INTEGER.equals(x.getFlagKey())))).thenReturn(intResponse); when(serviceBlockingStubMock - .resolveString(argThat(x -> FLAG_KEY.equals(x.getFlagKey())))).thenReturn(stringResponse); + .resolveString(argThat(x -> FLAG_KEY_STRING.equals(x.getFlagKey())))).thenReturn(stringResponse); when(serviceBlockingStubMock - .resolveObject(argThat(x -> FLAG_KEY.equals(x.getFlagKey())))).thenReturn(objectResponse); + .resolveObject(argThat(x -> FLAG_KEY_OBJECT.equals(x.getFlagKey())))).thenReturn(objectResponse); FlagdProvider provider = new FlagdProvider(serviceBlockingStubMock, serviceStubMock, "lru", 100, 5); provider.setEventStreamAlive(eventStreamAlive); // caching only available when event stream is alive OpenFeatureAPI.getInstance().setProvider(provider); - FlagEvaluationDetails booleanDetails = api.getClient().getBooleanDetails(FLAG_KEY, false); - booleanDetails = api.getClient().getBooleanDetails(FLAG_KEY, false); // should retrieve from cache on second invocation + FlagEvaluationDetails booleanDetails = api.getClient().getBooleanDetails(FLAG_KEY_BOOLEAN, false); + booleanDetails = api.getClient().getBooleanDetails(FLAG_KEY_BOOLEAN, false); // should retrieve from cache on second invocation assertTrue(booleanDetails.getValue()); assertEquals(BOOL_VARIANT, booleanDetails.getVariant()); assertEquals(expectedReason, booleanDetails.getReason()); - FlagEvaluationDetails stringDetails = api.getClient().getStringDetails(FLAG_KEY, "wrong"); - stringDetails = api.getClient().getStringDetails(FLAG_KEY, "wrong"); + FlagEvaluationDetails stringDetails = api.getClient().getStringDetails(FLAG_KEY_STRING, "wrong"); + stringDetails = api.getClient().getStringDetails(FLAG_KEY_STRING, "wrong"); assertEquals(STRING_VALUE, stringDetails.getValue()); assertEquals(STRING_VARIANT, stringDetails.getVariant()); assertEquals(expectedReason, stringDetails.getReason()); - FlagEvaluationDetails intDetails = api.getClient().getIntegerDetails(FLAG_KEY, 0); - intDetails = api.getClient().getIntegerDetails(FLAG_KEY, 0); + FlagEvaluationDetails intDetails = api.getClient().getIntegerDetails(FLAG_KEY_INTEGER, 0); + intDetails = api.getClient().getIntegerDetails(FLAG_KEY_INTEGER, 0); assertEquals(INT_VALUE, intDetails.getValue()); assertEquals(INT_VARIANT, intDetails.getVariant()); assertEquals(expectedReason, intDetails.getReason()); - FlagEvaluationDetails floatDetails = api.getClient().getDoubleDetails(FLAG_KEY, 0.1); - floatDetails = api.getClient().getDoubleDetails(FLAG_KEY, 0.1); + FlagEvaluationDetails floatDetails = api.getClient().getDoubleDetails(FLAG_KEY_DOUBLE, 0.1); + floatDetails = api.getClient().getDoubleDetails(FLAG_KEY_DOUBLE, 0.1); assertEquals(DOUBLE_VALUE, floatDetails.getValue()); assertEquals(DOUBLE_VARIANT, floatDetails.getVariant()); assertEquals(expectedReason, floatDetails.getReason()); - FlagEvaluationDetails objectDetails = api.getClient().getObjectDetails(FLAG_KEY, new Value()); - objectDetails = api.getClient().getObjectDetails(FLAG_KEY, new Value()); + FlagEvaluationDetails objectDetails = api.getClient().getObjectDetails(FLAG_KEY_OBJECT, new Value()); + objectDetails = api.getClient().getObjectDetails(FLAG_KEY_OBJECT, new Value()); assertEquals(INNER_STRUCT_VALUE, objectDetails.getValue().asStructure() .asMap().get(INNER_STRUCT_KEY).asString()); assertEquals(OBJECT_VARIANT, objectDetails.getVariant()); From 37d64b0801fc4a58d6e1bca5d19093d322925b68 Mon Sep 17 00:00:00 2001 From: Skye Gill Date: Wed, 4 Jan 2023 09:11:43 +0000 Subject: [PATCH 7/9] Update providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/EventStreamObserver.java Co-authored-by: Todd Baert Signed-off-by: Skye Gill --- .../contrib/providers/flagd/EventStreamObserver.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/EventStreamObserver.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/EventStreamObserver.java index 5bff7e640..bf4bf7360 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/EventStreamObserver.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/EventStreamObserver.java @@ -32,7 +32,7 @@ public void onNext(EventStreamResponse value) { this.handleProviderReadyEvent(); break; default: - log.warn("unhandled event type {}", value.getType()); + log.debug("unhandled event type {}", value.getType()); return; } } From 2584c0e11315d18d39f43fa7e4133064815d10c4 Mon Sep 17 00:00:00 2001 From: Skye Gill Date: Fri, 6 Jan 2023 17:32:20 +0000 Subject: [PATCH 8/9] parse consolidated configuration_change event Signed-off-by: Skye Gill --- .../contrib/providers/flagd/EventStreamObserver.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/EventStreamObserver.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/EventStreamObserver.java index bf4bf7360..b264fd8fb 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/EventStreamObserver.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/EventStreamObserver.java @@ -61,10 +61,12 @@ private void handleConfigurationChangeEvent(EventStreamResponse value) { } Map data = value.getData().getFieldsMap(); - Value flagKeyValue = data.get("flagKey"); - String flagKey = flagKeyValue.getStringValue(); + Value flagsValue = data.get("flags"); + Map flags = flagsValue.getStructValue().getFieldsMap(); - this.cache.remove(flagKey); + for (String flagKey : flags.keySet()) { + this.cache.remove(flagKey); + } } private void handleProviderReadyEvent() { From 7edaef7222795cbeafefcdff9280452cf77526b1 Mon Sep 17 00:00:00 2001 From: Skye Gill Date: Tue, 10 Jan 2023 12:39:21 +0000 Subject: [PATCH 9/9] cache invalidation test & NPE check Signed-off-by: Skye Gill --- .../providers/flagd/EventStreamObserver.java | 8 +- .../providers/flagd/FlagdProviderTest.java | 118 ++++++++++++++++++ 2 files changed, 125 insertions(+), 1 deletion(-) diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/EventStreamObserver.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/EventStreamObserver.java index b264fd8fb..e6e7c18e0 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/EventStreamObserver.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/EventStreamObserver.java @@ -16,6 +16,7 @@ public class EventStreamObserver implements StreamObserver private static final String configurationChange = "configuration_change"; private static final String providerReady = "provider_ready"; + private static final String flagsKey = "flags"; EventStreamObserver(FlagdCache cache, EventStreamCallback callback) { this.cache = cache; @@ -61,7 +62,12 @@ private void handleConfigurationChangeEvent(EventStreamResponse value) { } Map data = value.getData().getFieldsMap(); - Value flagsValue = data.get("flags"); + Value flagsValue = data.get(flagsKey); + if (flagsValue == null) { + this.cache.clear(); + return; + } + Map flags = flagsValue.getStructValue().getFieldsMap(); for (String flagKey : flags.keySet()) { diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/FlagdProviderTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/FlagdProviderTest.java index aae1e44c6..f4bc45429 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/FlagdProviderTest.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/FlagdProviderTest.java @@ -17,12 +17,18 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.HashMap; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; import org.mockito.MockedConstruction; import org.mockito.MockedStatic; +import com.google.protobuf.Struct; + +import dev.openfeature.flagd.grpc.Schema.EventStreamResponse; +import dev.openfeature.flagd.grpc.Schema.EventStreamRequest; import dev.openfeature.flagd.grpc.Schema.ResolveBooleanRequest; import dev.openfeature.flagd.grpc.Schema.ResolveBooleanResponse; import dev.openfeature.flagd.grpc.Schema.ResolveFloatResponse; @@ -41,6 +47,7 @@ import dev.openfeature.sdk.Value; import io.grpc.Channel; import io.grpc.netty.NettyChannelBuilder; +import io.grpc.stub.StreamObserver; import io.netty.channel.EventLoopGroup; import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.unix.DomainSocketAddress; @@ -412,6 +419,117 @@ void reason_mapped_correctly_if_unknown() { assertEquals(Reason.UNKNOWN.toString(), booleanDetails.getReason()); // reason should be converted to UNKNOWN } + + @Test + void invalidate_cache() { + ResolveBooleanResponse booleanResponse = ResolveBooleanResponse.newBuilder() + .setValue(true) + .setVariant(BOOL_VARIANT) + .setReason(FlagdProvider.STATIC_REASON) + .build(); + + ResolveStringResponse stringResponse = ResolveStringResponse.newBuilder() + .setValue(STRING_VALUE) + .setVariant(STRING_VARIANT) + .setReason(FlagdProvider.STATIC_REASON) + .build(); + + ResolveIntResponse intResponse = ResolveIntResponse.newBuilder() + .setValue(INT_VALUE) + .setVariant(INT_VARIANT) + .setReason(FlagdProvider.STATIC_REASON) + .build(); + + ResolveFloatResponse floatResponse = ResolveFloatResponse.newBuilder() + .setValue(DOUBLE_VALUE) + .setVariant(DOUBLE_VARIANT) + .setReason(FlagdProvider.STATIC_REASON) + .build(); + + ResolveObjectResponse objectResponse = ResolveObjectResponse.newBuilder() + .setValue(PROTOBUF_STRUCTURE_VALUE) + .setVariant(OBJECT_VARIANT) + .setReason(FlagdProvider.STATIC_REASON) + .build(); + + + ServiceBlockingStub serviceBlockingStubMock = mock(ServiceBlockingStub.class); + ServiceStub serviceStubMock = mock(ServiceStub.class); + when(serviceBlockingStubMock.withDeadlineAfter(anyLong(), any(TimeUnit.class))) + .thenReturn(serviceBlockingStubMock); + when(serviceBlockingStubMock + .resolveBoolean(argThat(x -> FLAG_KEY_BOOLEAN.equals(x.getFlagKey())))).thenReturn(booleanResponse); + when(serviceBlockingStubMock + .resolveFloat(argThat(x -> FLAG_KEY_DOUBLE.equals(x.getFlagKey())))).thenReturn(floatResponse); + when(serviceBlockingStubMock + .resolveInt(argThat(x -> FLAG_KEY_INTEGER.equals(x.getFlagKey())))).thenReturn(intResponse); + when(serviceBlockingStubMock + .resolveString(argThat(x -> FLAG_KEY_STRING.equals(x.getFlagKey())))).thenReturn(stringResponse); + when(serviceBlockingStubMock + .resolveObject(argThat(x -> FLAG_KEY_OBJECT.equals(x.getFlagKey())))).thenReturn(objectResponse); + + FlagdProvider provider = new FlagdProvider(serviceBlockingStubMock, serviceStubMock, "lru", 100, 5); + ArgumentCaptor> streamObserverCaptor = ArgumentCaptor.forClass(StreamObserver.class); + verify(serviceStubMock).eventStream(any(EventStreamRequest.class), streamObserverCaptor.capture()); + + provider.setEventStreamAlive(true); + OpenFeatureAPI.getInstance().setProvider(provider); + + HashMap flagsMap = new HashMap(); + HashMap structMap = new HashMap(); + + flagsMap.put(FLAG_KEY_BOOLEAN, com.google.protobuf.Value.newBuilder().setStringValue("foo").build()); + flagsMap.put(FLAG_KEY_STRING, com.google.protobuf.Value.newBuilder().setStringValue("foo").build()); + flagsMap.put(FLAG_KEY_INTEGER, com.google.protobuf.Value.newBuilder().setStringValue("foo").build()); + flagsMap.put(FLAG_KEY_DOUBLE, com.google.protobuf.Value.newBuilder().setStringValue("foo").build()); + flagsMap.put(FLAG_KEY_OBJECT, com.google.protobuf.Value.newBuilder().setStringValue("foo").build()); + + structMap.put("flags", com.google.protobuf.Value.newBuilder(). + setStructValue(Struct.newBuilder().putAllFields(flagsMap)).build()); + + EventStreamResponse eResponse = EventStreamResponse.newBuilder() + .setType("configuration_change") + .setData(Struct.newBuilder().putAllFields(structMap).build()) + .build(); + + // should cache results + FlagEvaluationDetails booleanDetails = api.getClient().getBooleanDetails(FLAG_KEY_BOOLEAN, false); + FlagEvaluationDetails stringDetails = api.getClient().getStringDetails(FLAG_KEY_STRING, "wrong"); + FlagEvaluationDetails intDetails = api.getClient().getIntegerDetails(FLAG_KEY_INTEGER, 0); + FlagEvaluationDetails floatDetails = api.getClient().getDoubleDetails(FLAG_KEY_DOUBLE, 0.1); + FlagEvaluationDetails objectDetails = api.getClient().getObjectDetails(FLAG_KEY_OBJECT, new Value()); + + // should clear cache + streamObserverCaptor.getValue().onNext(eResponse); + + // assert cache has been invalidated + booleanDetails = api.getClient().getBooleanDetails(FLAG_KEY_BOOLEAN, false); + assertTrue(booleanDetails.getValue()); + assertEquals(BOOL_VARIANT, booleanDetails.getVariant()); + assertEquals(FlagdProvider.STATIC_REASON, booleanDetails.getReason()); + + stringDetails = api.getClient().getStringDetails(FLAG_KEY_STRING, "wrong"); + assertEquals(STRING_VALUE, stringDetails.getValue()); + assertEquals(STRING_VARIANT, stringDetails.getVariant()); + assertEquals(FlagdProvider.STATIC_REASON, stringDetails.getReason()); + + intDetails = api.getClient().getIntegerDetails(FLAG_KEY_INTEGER, 0); + assertEquals(INT_VALUE, intDetails.getValue()); + assertEquals(INT_VARIANT, intDetails.getVariant()); + assertEquals(FlagdProvider.STATIC_REASON, intDetails.getReason()); + + floatDetails = api.getClient().getDoubleDetails(FLAG_KEY_DOUBLE, 0.1); + assertEquals(DOUBLE_VALUE, floatDetails.getValue()); + assertEquals(DOUBLE_VARIANT, floatDetails.getVariant()); + assertEquals(FlagdProvider.STATIC_REASON, floatDetails.getReason()); + + objectDetails = api.getClient().getObjectDetails(FLAG_KEY_OBJECT, new Value()); + assertEquals(INNER_STRUCT_VALUE, objectDetails.getValue().asStructure() + .asMap().get(INNER_STRUCT_KEY).asString()); + assertEquals(OBJECT_VARIANT, objectDetails.getVariant()); + assertEquals(FlagdProvider.STATIC_REASON, objectDetails.getReason()); + } + private NettyChannelBuilder getMockChannelBuilderSocket() { NettyChannelBuilder mockChannelBuilder = mock(NettyChannelBuilder.class); when(mockChannelBuilder.eventLoopGroup(any(EventLoopGroup.class))).thenReturn(mockChannelBuilder);