diff --git a/providers/flagd/pom.xml b/providers/flagd/pom.xml index 6086ffcb3..93329d919 100644 --- a/providers/flagd/pom.xml +++ b/providers/flagd/pom.xml @@ -1,144 +1,151 @@ - - 4.0.0 - - dev.openfeature.contrib - parent - 0.1.0 - ../../pom.xml - - dev.openfeature.contrib.providers - flagd - 0.5.8 + + 4.0.0 + + dev.openfeature.contrib + parent + 0.1.0 + ../../pom.xml + + dev.openfeature.contrib.providers + flagd + 0.5.8-snapshot - flagd - FlagD provider for Java - https://openfeature.dev + flagd + FlagD provider for Java + https://openfeature.dev - - - toddbaert - Todd Baert - OpenFeature - https://openfeature.dev/ - - + + + toddbaert + Todd Baert + OpenFeature + https://openfeature.dev/ + + - - + + - - io.grpc - grpc-netty - 1.54.1 - + + io.grpc + grpc-netty + 1.54.1 + - - - io.netty - netty-transport-native-epoll - 4.1.92.Final - - linux-x86_64 - + + + io.netty + netty-transport-native-epoll + 4.1.92.Final + + linux-x86_64 + - - io.grpc - grpc-protobuf - 1.54.1 - + + io.grpc + grpc-protobuf + 1.54.1 + - - io.grpc - grpc-stub - 1.54.1 - + + io.grpc + grpc-stub + 1.54.1 + - - - org.apache.tomcat - annotations-api - 6.0.53 - provided - + + + org.apache.tomcat + annotations-api + 6.0.53 + provided + - - org.apache.commons - commons-collections4 - 4.4 - - + + org.apache.commons + commons-collections4 + 4.4 + - - - - - kr.motd.maven - os-maven-plugin - 1.7.1 - - + + io.opentelemetry + opentelemetry-api + 1.25.0 + + - - - org.codehaus.mojo - exec-maven-plugin - 3.1.0 - - - update-schemas-submodule - validate - - exec - - - - git - - submodule - update - --init - --recursive - - - - - copy-protobuf-definition - validate - - exec - - - - cp - - schemas/protobuf/schema/v1/schema.proto - src/main/proto/ - - - - - + + + + + kr.motd.maven + os-maven-plugin + 1.7.1 + + - - org.xolstice.maven.plugins - protobuf-maven-plugin - 0.6.1 - - com.google.protobuf:protoc:3.21.1:exe:${os.detected.classifier} - grpc-java - io.grpc:protoc-gen-grpc-java:1.48.1:exe:${os.detected.classifier} - - - - - compile - compile-custom - - - - - - + + + org.codehaus.mojo + exec-maven-plugin + 3.1.0 + + + update-schemas-submodule + validate + + exec + + + + git + + submodule + update + --init + --recursive + + + + + copy-protobuf-definition + validate + + exec + + + + cp + + schemas/protobuf/schema/v1/schema.proto + src/main/proto/ + + + + + + + + org.xolstice.maven.plugins + protobuf-maven-plugin + 0.6.1 + + com.google.protobuf:protoc:3.21.1:exe:${os.detected.classifier} + grpc-java + io.grpc:protoc-gen-grpc-java:1.48.1:exe:${os.detected.classifier} + + + + + compile + compile-custom + + + + + + \ No newline at end of file diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdGrpcInterceptor.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdGrpcInterceptor.java new file mode 100644 index 000000000..42b4f6201 --- /dev/null +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdGrpcInterceptor.java @@ -0,0 +1,54 @@ +package dev.openfeature.contrib.providers.flagd; + +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; +import io.grpc.ForwardingClientCall; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.propagation.TextMapSetter; + +import javax.annotation.Nullable; + +/** + * FlagdGrpcInterceptor is an interceptor for grpc communication from java-sdk to flagd. + * credits + */ +final class FlagdGrpcInterceptor implements ClientInterceptor { + private static final TextMapSetter SETTER = new Setter(); + + private final OpenTelemetry openTelemetry; + + FlagdGrpcInterceptor(final OpenTelemetry openTelemetry) { + this.openTelemetry = openTelemetry; + } + + @Override public ClientCall interceptCall(MethodDescriptor methodDescriptor, + CallOptions callOptions, Channel channel) { + + final ClientCall call = channel.newCall(methodDescriptor, callOptions); + + return new ForwardingClientCall.SimpleForwardingClientCall(call) { + @Override public void start(Listener responseListener, Metadata headers) { + openTelemetry.getPropagators().getTextMapPropagator().inject(Context.current(), headers, SETTER); + super.start(responseListener, headers); + } + }; + } + + /** + * Setter implements TextMapSetter with carrier check. + */ + static class Setter implements TextMapSetter { + @Override public void set(@Nullable Metadata carrier, String key, String value) { + if (carrier == null) { + return; + } + + carrier.put(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER), value); + } + } +} diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java index 6ba4b6d1c..279c8f218 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java @@ -1,21 +1,7 @@ package dev.openfeature.contrib.providers.flagd; -import java.io.File; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.function.Function; -import java.util.stream.Collectors; - -import javax.net.ssl.SSLException; - import com.google.protobuf.Descriptors.FieldDescriptor; import com.google.protobuf.Message; - import com.google.protobuf.NullValue; import dev.openfeature.flagd.grpc.Schema.EventStreamRequest; import dev.openfeature.flagd.grpc.Schema.EventStreamResponse; @@ -40,8 +26,25 @@ import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.unix.DomainSocketAddress; import io.netty.handler.ssl.SslContextBuilder; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Scope; import lombok.extern.slf4j.Slf4j; +import javax.net.ssl.SSLException; +import java.io.File; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Function; +import java.util.stream.Collectors; + /** * OpenFeature provider for flagd. */ @@ -81,15 +84,18 @@ public class FlagdProvider implements FeatureProvider, EventStreamCallback { static final int BASE_EVENT_STREAM_RETRY_BACKOFF_MS = 1000; private long deadline = DEFAULT_DEADLINE; + private ServiceBlockingStub serviceBlockingStub; private ServiceStub serviceStub; + private Tracer tracer; + private Boolean eventStreamAlive; private FlagdCache cache; private int eventStreamAttempt = 1; private int eventStreamRetryBackoff = BASE_EVENT_STREAM_RETRY_BACKOFF_MS; - private int maxEventStreamRetries = DEFAULT_MAX_EVENT_STREAM_RETRIES; + private int maxEventStreamRetries; private ReadWriteLock lock = new ReentrantReadWriteLock(); private Object eventStreamAliveSync; @@ -180,6 +186,21 @@ public FlagdProvider() { fallBackToEnvOrDefault(MAX_EVENT_STREAM_RETRIES_ENV_VAR_NAME, DEFAULT_MAX_EVENT_STREAM_RETRIES)); } + + /** + * Create a new FlagdProvider instance with manual telemetry sdk. + */ + public FlagdProvider(OpenTelemetry openTelemetry) { + this( + buildServiceBlockingStub(openTelemetry), + buildServiceStub(null, null, null, null, null), + fallBackToEnvOrDefault(CACHE_ENV_VAR_NAME, DEFAULT_CACHE), + fallBackToEnvOrDefault(MAX_CACHE_SIZE_ENV_VAR_NAME, DEFAULT_MAX_CACHE_SIZE), + fallBackToEnvOrDefault(MAX_EVENT_STREAM_RETRIES_ENV_VAR_NAME, DEFAULT_MAX_EVENT_STREAM_RETRIES)); + + this.tracer = openTelemetry.getTracer("OpenFeature/dev.openfeature.contrib.providers.flagd"); + } + FlagdProvider(ServiceBlockingStub serviceBlockingStub, ServiceStub serviceStub, String cache, int maxCacheSize, int maxEventStreamRetries) { this.serviceBlockingStub = serviceBlockingStub; @@ -454,6 +475,12 @@ private static ServiceBlockingStub buildServiceBlockingStub(String host, Integer return ServiceGrpc.newBlockingStub(channelBuilder(host, port, tls, certPath, socketPath).build()); } + private static ServiceBlockingStub buildServiceBlockingStub(OpenTelemetry telemetry) { + return ServiceGrpc + .newBlockingStub(channelBuilder(null, null, null, null, null) + .intercept(new FlagdGrpcInterceptor(telemetry)).build()); + } + private static ServiceStub buildServiceStub(String host, Integer port, Boolean tls, String certPath, String socketPath) { return ServiceGrpc.newStub(channelBuilder(host, port, tls, certPath, socketPath).build()); @@ -532,7 +559,22 @@ private Provid .build(); // run the referenced resolver method - ResT response = resolverRef.apply((ReqT) req); + final ResT response; + + if (tracer != null) { + final Span span = tracer.spanBuilder("resolve") + .setSpanKind(SpanKind.CLIENT) + .startSpan(); + span.setAttribute("feature_flag.key", key); + span.setAttribute("feature_flag.provider_name", "flagd"); + try (Scope scope = span.makeCurrent()) { + response = resolverRef.apply((ReqT) req); + } finally { + span.end(); + } + } else { + response = resolverRef.apply((ReqT) req); + } // parse the response ValT value = converter == null ? this.getField(response, VALUE_FIELD)