Skip to content

Commit

Permalink
feat!: use new eval/sync protos (requires flagd v0.7.3+) (#683)
Browse files Browse the repository at this point in the history
Signed-off-by: Todd Baert <todd.baert@dynatrace.com>
Co-authored-by: Kavindu Dodanduwa <Kavindu-Dodan@users.noreply.github.com>
Co-authored-by: Giovanni Liva <giovanni.liva@dynatrace.com>
  • Loading branch information
3 people committed Feb 20, 2024
1 parent 4de6c46 commit 20ca053
Show file tree
Hide file tree
Showing 32 changed files with 296 additions and 234 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ jobs:
- 8014:8013
# sync-testbed for flagd in-process provider e2e tests
sync:
image: ghcr.io/open-feature/sync-testbed:v0.4.11
image: ghcr.io/open-feature/sync-testbed:v0.5.1
ports:
- 9090:9090
# sync-testbed for flagd in-process provider reconnect e2e tests
sync-unstable:
image: ghcr.io/open-feature/sync-testbed-unstable:v0.4.11
image: ghcr.io/open-feature/sync-testbed-unstable:v0.5.1
ports:
- 9091:9090

Expand Down
4 changes: 2 additions & 2 deletions providers/flagd/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# flagd Provider for OpenFeature

This provider is designed to use flagd's [evaluation protocol](https://github.com/open-feature/schemas/blob/main/protobuf/schema/v1/schema.proto), or locally evaluate flags defined in a flagd [flag definition](https://github.com/open-feature/schemas/blob/main/json/flagd-definitions.json).
This provider is designed to use flagd's [evaluation protocol](https://github.com/open-feature/schemas/blob/main/protobuf/schema/v1/schema.proto), or locally evaluate flags defined in a flagd [flag definition](https://github.com/open-feature/schemas/blob/main/json/flags.json).

## Installation
<!-- x-release-please-start-version -->
Expand Down Expand Up @@ -45,7 +45,7 @@ FlagdProvider flagdProvider = new FlagdProvider(
.build());
```

In the above example, in-process handlers attempt to connect to a sync service on address `localhost:8013` to obtain [flag definitions](https://github.com/open-feature/schemas/blob/main/json/flagd-definitions.json).
In the above example, in-process handlers attempt to connect to a sync service on address `localhost:8013` to obtain [flag definitions](https://github.com/open-feature/schemas/blob/main/json/flags.json).

#### Offline mode

Expand Down
21 changes: 18 additions & 3 deletions providers/flagd/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -228,16 +228,31 @@
</configuration>
</execution>
<execution>
<id>copy-schema-json</id>
<id>copy-flags-json-schema</id>
<phase>validate</phase>
<goals>
<goal>exec</goal>
</goals>
<configuration>
<!-- run: cp schemas/json/flagd-definitions.json src/main/resources/ -->
<!-- run: cp schemas/json/flags.json src/main/resources/ -->
<executable>cp</executable>
<arguments>
<argument>schemas/json/flagd-definitions.json</argument>
<argument>schemas/json/flags.json</argument>
<argument>src/main/resources/</argument>
</arguments>
</configuration>
</execution>
<execution>
<id>copy-flags-targeting-schema</id>
<phase>validate</phase>
<goals>
<goal>exec</goal>
</goals>
<configuration>
<!-- run: cp schemas/json/targeting.json src/main/resources/ -->
<executable>cp</executable>
<arguments>
<argument>schemas/json/targeting.json</argument>
<argument>src/main/resources/</argument>
</arguments>
</configuration>
Expand Down
2 changes: 1 addition & 1 deletion providers/flagd/schemas
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import java.util.concurrent.atomic.AtomicBoolean;

import dev.openfeature.sdk.exceptions.GeneralError;

/**
* Utils for flagd resolvers.
*/
Expand All @@ -21,7 +23,7 @@ public static void busyWaitAndCheck(final Long deadline, final AtomicBoolean che

do {
if (deadline <= System.currentTimeMillis() - start) {
throw new RuntimeException(
throw new GeneralError(
String.format("Deadline exceeded. Condition did not complete within the %d deadline", deadline));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import com.google.protobuf.Value;
import dev.openfeature.contrib.providers.flagd.resolver.grpc.cache.Cache;
import dev.openfeature.flagd.grpc.Schema.EventStreamResponse;
import dev.openfeature.flagd.grpc.evaluation.Evaluation.EventStreamResponse;
import dev.openfeature.sdk.ProviderState;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.grpc.stub.StreamObserver;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
package dev.openfeature.contrib.providers.flagd.resolver.grpc;

import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

import dev.openfeature.contrib.providers.flagd.FlagdOptions;
import dev.openfeature.contrib.providers.flagd.resolver.grpc.cache.Cache;
import dev.openfeature.contrib.providers.flagd.resolver.common.ChannelBuilder;
import dev.openfeature.contrib.providers.flagd.resolver.common.Util;
import dev.openfeature.flagd.grpc.Schema;
import dev.openfeature.flagd.grpc.ServiceGrpc;
import dev.openfeature.contrib.providers.flagd.resolver.grpc.cache.Cache;
import dev.openfeature.flagd.grpc.evaluation.Evaluation.EventStreamRequest;
import dev.openfeature.flagd.grpc.evaluation.Evaluation.EventStreamResponse;
import dev.openfeature.flagd.grpc.evaluation.ServiceGrpc;
import dev.openfeature.sdk.ProviderState;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.grpc.ManagedChannel;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;

import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

/**
* Class that abstracts the gRPC communication with flagd.
*/
Expand Down Expand Up @@ -116,9 +117,9 @@ public ServiceGrpc.ServiceBlockingStub getResolver() {
*/
private void observeEventStream() {
while (this.eventStreamAttempt <= this.maxEventStreamRetries) {
final StreamObserver<Schema.EventStreamResponse> responseObserver =
final StreamObserver<EventStreamResponse> responseObserver =
new EventStreamObserver(sync, this.cache, this::grpcStateConsumer);
this.serviceStub.eventStream(Schema.EventStreamRequest.getDefaultInstance(), responseObserver);
this.serviceStub.eventStream(EventStreamRequest.getDefaultInstance(), responseObserver);

try {
synchronized (sync) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@
import dev.openfeature.contrib.providers.flagd.resolver.grpc.cache.Cache;
import dev.openfeature.contrib.providers.flagd.resolver.grpc.strategy.ResolveFactory;
import dev.openfeature.contrib.providers.flagd.resolver.grpc.strategy.ResolveStrategy;
import dev.openfeature.flagd.grpc.Schema;
import dev.openfeature.flagd.grpc.evaluation.Evaluation.ResolveBooleanRequest;
import dev.openfeature.flagd.grpc.evaluation.Evaluation.ResolveFloatRequest;
import dev.openfeature.flagd.grpc.evaluation.Evaluation.ResolveIntRequest;
import dev.openfeature.flagd.grpc.evaluation.Evaluation.ResolveObjectRequest;
import dev.openfeature.flagd.grpc.evaluation.Evaluation.ResolveStringRequest;
import dev.openfeature.sdk.EvaluationContext;
import dev.openfeature.sdk.ImmutableMetadata;
import dev.openfeature.sdk.MutableStructure;
Expand Down Expand Up @@ -92,7 +96,7 @@ public void shutdown() throws Exception {
*/
public ProviderEvaluation<Boolean> booleanEvaluation(String key, Boolean defaultValue,
EvaluationContext ctx) {
Schema.ResolveBooleanRequest request = Schema.ResolveBooleanRequest.newBuilder().buildPartial();
ResolveBooleanRequest request = ResolveBooleanRequest.newBuilder().buildPartial();

return resolve(key, ctx, request, this.connector.getResolver()::resolveBoolean, null);
}
Expand All @@ -102,7 +106,7 @@ public ProviderEvaluation<Boolean> booleanEvaluation(String key, Boolean default
*/
public ProviderEvaluation<String> stringEvaluation(String key, String defaultValue,
EvaluationContext ctx) {
Schema.ResolveStringRequest request = Schema.ResolveStringRequest.newBuilder().buildPartial();
ResolveStringRequest request = ResolveStringRequest.newBuilder().buildPartial();

return resolve(key, ctx, request, this.connector.getResolver()::resolveString, null);
}
Expand All @@ -112,7 +116,7 @@ public ProviderEvaluation<String> stringEvaluation(String key, String defaultVal
*/
public ProviderEvaluation<Double> doubleEvaluation(String key, Double defaultValue,
EvaluationContext ctx) {
Schema.ResolveFloatRequest request = Schema.ResolveFloatRequest.newBuilder().buildPartial();
ResolveFloatRequest request = ResolveFloatRequest.newBuilder().buildPartial();

return resolve(key, ctx, request, this.connector.getResolver()::resolveFloat, null);
}
Expand All @@ -123,7 +127,7 @@ public ProviderEvaluation<Double> doubleEvaluation(String key, Double defaultVal
public ProviderEvaluation<Integer> integerEvaluation(String key, Integer defaultValue,
EvaluationContext ctx) {

Schema.ResolveIntRequest request = Schema.ResolveIntRequest.newBuilder().buildPartial();
ResolveIntRequest request = ResolveIntRequest.newBuilder().buildPartial();

return resolve(key, ctx, request, this.connector.getResolver()::resolveInt,
(Object value) -> ((Long) value).intValue());
Expand All @@ -135,7 +139,7 @@ public ProviderEvaluation<Integer> integerEvaluation(String key, Integer default
public ProviderEvaluation<Value> objectEvaluation(String key, Value defaultValue,
EvaluationContext ctx) {

Schema.ResolveObjectRequest request = Schema.ResolveObjectRequest.newBuilder().buildPartial();
ResolveObjectRequest request = ResolveObjectRequest.newBuilder().buildPartial();

return resolve(key, ctx, request, this.connector.getResolver()::resolveObject,
(Object value) -> convertObjectResponse((Struct) value));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,24 +1,24 @@
package dev.openfeature.contrib.providers.flagd.resolver.process.model;

import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.TreeNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.networknt.schema.JsonSchema;
import com.networknt.schema.JsonSchemaFactory;
import com.networknt.schema.SpecVersion;
import com.networknt.schema.ValidationMessage;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import lombok.extern.slf4j.Slf4j;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

/**
* flagd feature flag configuration parser.
*/
Expand All @@ -29,7 +29,6 @@ public class FlagParser {
private static final String FLAG_KEY = "flags";
private static final String EVALUATOR_KEY = "$evaluators";
private static final String REPLACER_FORMAT = "\"\\$ref\":(\\s)*\"%s\"";
private static final String SCHEMA_RESOURCE = "flagd-definitions.json";

private static final ObjectMapper MAPPER = new ObjectMapper();
private static final Map<String, Pattern> PATTERN_MAP = new HashMap<>();
Expand All @@ -40,37 +39,39 @@ private FlagParser() {
}

static {
try (InputStream schema = FlagParser.class.getClassLoader().getResourceAsStream(SCHEMA_RESOURCE)) {
if (schema == null) {
log.warn(String.format("Resource %s not found", SCHEMA_RESOURCE));
} else {
final ByteArrayOutputStream result = new ByteArrayOutputStream();
byte[] buffer = new byte[512];
for (int size; 0 < (size = schema.read(buffer)); ) {
result.write(buffer, 0, size);
}

JsonSchemaFactory instance = JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.V7);
SCHEMA_VALIDATOR = instance.getSchema(result.toString("UTF-8"));
}
try {
// load both schemas from resources (root (flags.json) and referenced (targeting.json)
// we don't want to resolve anything from the network
Map<String, String> mappings = new HashMap<>();
mappings.put("https://flagd.dev/schema/v0/targeting.json", "classpath:targeting.json");
mappings.put("https://flagd.dev/schema/v0/flags.json", "classpath:flags.json");

SCHEMA_VALIDATOR = JsonSchemaFactory
.getInstance(SpecVersion.VersionFlag.V7,
builder -> builder
.schemaMappers(schemaMappers -> schemaMappers.mappings(mappings)))
.getSchema(new URI("https://flagd.dev/schema/v0/flags.json"));
} catch (Throwable e) {
// log only, do not throw
log.warn(String.format("Error loading resource %s, schema validation will be skipped", SCHEMA_RESOURCE), e);
log.warn(String.format("Error loading schema resources, schema validation will be skipped"));
}
}

/**
* Parse {@link String} for feature flags.
*/
public static Map<String, FeatureFlag> parseString(final String configuration) throws IOException {
public static Map<String, FeatureFlag> parseString(final String configuration, boolean throwIfInvalid)
throws IOException {
if (SCHEMA_VALIDATOR != null) {
try (JsonParser parser = MAPPER.createParser(configuration)) {
Set<ValidationMessage> validationMessages = SCHEMA_VALIDATOR.validate(parser.readValueAsTree());

if (!validationMessages.isEmpty()) {
throw new IllegalArgumentException(
String.format("Failed to parse configurations. %d validation error(s) reported.",
validationMessages.size()));
String message = String.format("Invalid flag configuration: %s", validationMessages.toArray());
log.warn(message);
if (throwIfInvalid) {
throw new IllegalArgumentException(message);
}
}
}
}
Expand Down Expand Up @@ -128,5 +129,4 @@ private static String transposeEvaluators(final String configuration) throws IOE
return replacedConfigurations;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,15 @@ public class FlagStore implements Storage {
private final Map<String, FeatureFlag> flags = new HashMap<>();

private final Connector connector;
private final boolean throwIfInvalid;

public FlagStore(final Connector connector) {
this(connector, false);
}

public FlagStore(final Connector connector, final boolean throwIfInvalid) {
this.connector = connector;
this.throwIfInvalid = throwIfInvalid;
}

/**
Expand Down Expand Up @@ -94,7 +100,7 @@ private void streamerListener(final Connector connector) throws InterruptedExcep
switch (take.getType()) {
case DATA:
try {
Map<String, FeatureFlag> flagMap = FlagParser.parseString(take.getData());
Map<String, FeatureFlag> flagMap = FlagParser.parseString(take.getData(), throwIfInvalid);
writeLock.lock();
try {
flags.clear();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,27 +1,27 @@
package dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.grpc;

import dev.openfeature.flagd.sync.SyncService;
import dev.openfeature.flagd.grpc.sync.Sync.SyncFlagsResponse;
import lombok.Getter;

@Getter
class GrpcResponseModel {
private final SyncService.SyncFlagsResponse syncFlagsResponse;
private final SyncFlagsResponse syncFlagsResponse;
private final Throwable error;
private final boolean complete;

public GrpcResponseModel(final Throwable error) {
this(null, error, false);
}

public GrpcResponseModel(final SyncService.SyncFlagsResponse syncFlagsResponse) {
public GrpcResponseModel(final SyncFlagsResponse syncFlagsResponse) {
this(syncFlagsResponse, null, false);
}

public GrpcResponseModel(final Boolean complete) {
this(null, null, complete);
}

GrpcResponseModel(SyncService.SyncFlagsResponse syncFlagsResponse, Throwable error, boolean complete) {
GrpcResponseModel(SyncFlagsResponse syncFlagsResponse, Throwable error, boolean complete) {
this.syncFlagsResponse = syncFlagsResponse;
this.error = error;
this.complete = complete;
Expand Down
Loading

0 comments on commit 20ca053

Please sign in to comment.