Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 18 additions & 7 deletions providers/flagd/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,16 @@ OpenFeatureAPI.getInstance().setProvider(provider);

Options can be defined in the constructor or as environment variables, with constructor options having the highest precedence.

| Option name | Environment variable name | Type | Default |
| ----------- | ------------------------- | ------- | --------- |
| host | FLAGD_HOST | string | localhost |
| port | FLAGD_PORT | number | 8013 |
| tls | FLAGD_TLS | boolean | false |
| socketPath | FLAGD_SOCKET_PATH | string | - |
| certPath | FLAGD_SERVER_CERT_PATH | string | - |
| Option name | Environment variable name | Type | Default | Values |
| --------------------- | ------------------------------- | ------- | --------- | ------------- |
| host | FLAGD_HOST | string | localhost | |
| port | FLAGD_PORT | number | 8013 | |
| tls | FLAGD_TLS | boolean | false | |
| socketPath | FLAGD_SOCKET_PATH | string | - | |
| certPath | FLAGD_SERVER_CERT_PATH | string | - | |
| cache | FLAGD_CACHE | string | lru | lru,disabled |
| maxCacheSize | FLAGD_MAX_CACHE_SIZE | int | 1000 | |
| maxEventStreamRetries | FLAGD_MAX_EVENT_STREAM_RETRIES | int | 5 | |

### Unix socket support

Expand All @@ -53,3 +56,11 @@ The default deadline is 500ms, though evaluations typically take on the order of
Though not required in deployments where flagd runs on the same host as the workload, TLS is available.

:warning: Note that there's a [vulnerability](https://security.snyk.io/vuln/SNYK-JAVA-IONETTY-1042268) in [netty](https://github.com/netty/netty), a transitive dependency of the underlying gRPC libraries used in the flagd-provider that fails to correctly validate certificates. This will be addressed in netty v5.

## Caching

The provider attempts to establish a connection to flagd's event stream (up to 5 times by default). If the connection is successful and caching is enabled each flag returned with reason `STATIC` is cached until an event is received concerning the cached flag (at which point it is removed from cache).

On invocation of a flag evaluation (if caching is available) an attempt is made to retrieve the entry from cache, if found the flag is returned with reason `CACHED`.

By default, the provider is configured to use [least recently used (lru)](https://commons.apache.org/proper/commons-collections/apidocs/org/apache/commons/collections4/map/LRUMap.html) caching with up to 1000 entries.
6 changes: 6 additions & 0 deletions providers/flagd/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@
<version>6.0.53</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
<version>4.4</version>
</dependency>
</dependencies>

<build>
Expand Down
2 changes: 1 addition & 1 deletion providers/flagd/schemas
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package dev.openfeature.contrib.providers.flagd;

/**
* Defines behaviour required of event stream callbacks.
*/
interface EventStreamCallback {
void setEventStreamAlive(Boolean alive);

void restartEventStream() throws Exception;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package dev.openfeature.contrib.providers.flagd;

import java.util.Map;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
import dev.openfeature.flagd.grpc.Schema.EventStreamResponse;
import com.google.protobuf.Value;

/**
* EventStreamObserver handles events emitted by flagd.
*/
@Slf4j
public class EventStreamObserver implements StreamObserver<EventStreamResponse> {
private EventStreamCallback callback;
private FlagdCache cache;

private static final String configurationChange = "configuration_change";
private static final String providerReady = "provider_ready";
private static final String flagsKey = "flags";

EventStreamObserver(FlagdCache cache, EventStreamCallback callback) {
this.cache = cache;
this.callback = callback;
}

@Override
public void onNext(EventStreamResponse value) {
switch (value.getType()) {
case configurationChange:
this.handleConfigurationChangeEvent(value);
break;
case providerReady:
this.handleProviderReadyEvent();
break;
default:
log.debug("unhandled event type {}", value.getType());
return;
}
}

@Override
public void onError(Throwable t) {
log.error("event stream", t);
this.cache.clear();
this.callback.setEventStreamAlive(false);
try {
this.callback.restartEventStream();
} catch (Exception e) {
log.error("restart event stream", e);
}
}

@Override
public void onCompleted() {
this.cache.clear();
this.callback.setEventStreamAlive(false);
}

private void handleConfigurationChangeEvent(EventStreamResponse value) {
if (!this.cache.getEnabled()) {
return;
}

Map<String, Value> data = value.getData().getFieldsMap();
Value flagsValue = data.get(flagsKey);
if (flagsValue == null) {
this.cache.clear();
return;
}

Map<String, Value> flags = flagsValue.getStructValue().getFieldsMap();

for (String flagKey : flags.keySet()) {
this.cache.remove(flagKey);
}
}

private void handleProviderReadyEvent() {
this.cache.clear();
this.callback.setEventStreamAlive(true);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package dev.openfeature.contrib.providers.flagd;

import dev.openfeature.sdk.ProviderEvaluation;
import dev.openfeature.sdk.Value;

import java.util.Map;
import org.apache.commons.collections4.map.LRUMap;
import java.util.Collections;

/**
* Exposes caching mechanism for flag evaluations.
*/
public class FlagdCache {
private Map<String,ProviderEvaluation<Value>> store;
private Boolean enabled;

static final String LRU_CACHE = "lru";
static final String DISABLED = "disabled";

FlagdCache(String cache, int maxCacheSize) {
switch (cache) {
case DISABLED:
return;
case LRU_CACHE:
default:
this.store = Collections.synchronizedMap(new LRUMap<String, ProviderEvaluation<Value>>(maxCacheSize));
}

this.enabled = true;
}

public Boolean getEnabled() {
return this.enabled;
}

public void put(String key, ProviderEvaluation<Value> value) {
this.store.put(key, value);
}

public ProviderEvaluation<Value> get(String key) {
return this.store.get(key);
}

public void remove(String key) {
this.store.remove(key);
}

public void clear() {
this.store.clear();
}
}
Loading