From 5e4a372e885d3ee5062a20399bfb51df7a4dd708 Mon Sep 17 00:00:00 2001 From: Kavindu Dodanduwa Date: Fri, 21 Apr 2023 10:16:52 -0700 Subject: [PATCH 1/5] minimal manual trace impl Signed-off-by: Kavindu Dodanduwa --- providers/flagd/pom.xml | 265 +++++++++--------- .../providers/flagd/FlagdGrpcInterceptor.java | 61 ++++ .../providers/flagd/FlagdProvider.java | 69 +++-- 3 files changed, 250 insertions(+), 145 deletions(-) create mode 100644 providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdGrpcInterceptor.java diff --git a/providers/flagd/pom.xml b/providers/flagd/pom.xml index 6086ffcb3..e9b27e935 100644 --- a/providers/flagd/pom.xml +++ b/providers/flagd/pom.xml @@ -1,144 +1,151 @@ - - 4.0.0 - - dev.openfeature.contrib - parent - 0.1.0 - ../../pom.xml - - dev.openfeature.contrib.providers - flagd - 0.5.8 + + 4.0.0 + + dev.openfeature.contrib + parent + 0.1.0 + ../../pom.xml + + dev.openfeature.contrib.providers + flagd + 0.5.8 - flagd - FlagD provider for Java - https://openfeature.dev + flagd + FlagD provider for Java + https://openfeature.dev - - - toddbaert - Todd Baert - OpenFeature - https://openfeature.dev/ - - + + + toddbaert + Todd Baert + OpenFeature + https://openfeature.dev/ + + - - + + - - io.grpc - grpc-netty - 1.54.1 - + + io.grpc + grpc-netty + 1.54.1 + - - - io.netty - netty-transport-native-epoll - 4.1.92.Final - - linux-x86_64 - + + + io.netty + netty-transport-native-epoll + 4.1.92.Final + + linux-x86_64 + - - io.grpc - grpc-protobuf - 1.54.1 - + + io.grpc + grpc-protobuf + 1.54.1 + - - io.grpc - grpc-stub - 1.54.1 - + + io.grpc + grpc-stub + 1.54.1 + - - - org.apache.tomcat - annotations-api - 6.0.53 - provided - + + + org.apache.tomcat + annotations-api + 6.0.53 + provided + - - org.apache.commons - commons-collections4 - 4.4 - - + + org.apache.commons + commons-collections4 + 4.4 + - - - - - kr.motd.maven - os-maven-plugin - 1.7.1 - - + + io.opentelemetry + opentelemetry-sdk + 1.25.0 + + - - - org.codehaus.mojo - exec-maven-plugin - 3.1.0 - - - update-schemas-submodule - validate - - exec - - - - git - - submodule - update - --init - --recursive - - - - - copy-protobuf-definition - validate - - exec - - - - cp - - schemas/protobuf/schema/v1/schema.proto - src/main/proto/ - - - - - + + + + + kr.motd.maven + os-maven-plugin + 1.7.1 + + - - org.xolstice.maven.plugins - protobuf-maven-plugin - 0.6.1 - - com.google.protobuf:protoc:3.21.1:exe:${os.detected.classifier} - grpc-java - io.grpc:protoc-gen-grpc-java:1.48.1:exe:${os.detected.classifier} - - - - - compile - compile-custom - - - - - - + + + org.codehaus.mojo + exec-maven-plugin + 3.1.0 + + + update-schemas-submodule + validate + + exec + + + + git + + submodule + update + --init + --recursive + + + + + copy-protobuf-definition + validate + + exec + + + + cp + + schemas/protobuf/schema/v1/schema.proto + src/main/proto/ + + + + + + + + org.xolstice.maven.plugins + protobuf-maven-plugin + 0.6.1 + + com.google.protobuf:protoc:3.21.1:exe:${os.detected.classifier} + grpc-java + io.grpc:protoc-gen-grpc-java:1.48.1:exe:${os.detected.classifier} + + + + + compile + compile-custom + + + + + + \ No newline at end of file diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdGrpcInterceptor.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdGrpcInterceptor.java new file mode 100644 index 000000000..f5c3af9ed --- /dev/null +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdGrpcInterceptor.java @@ -0,0 +1,61 @@ +package dev.openfeature.contrib.providers.flagd; + +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; +import io.grpc.ForwardingClientCall; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.propagation.TextMapSetter; +import io.opentelemetry.sdk.OpenTelemetrySdk; + +import javax.annotation.Nullable; + +/** + * FlagdGrpcInterceptor is an interceptor for grpc communication from java-sdk to flagd + *

+ * credits + */ +final class FlagdGrpcInterceptor implements ClientInterceptor { + private static final TextMapSetter SETTER = new Setter(); + + private final OpenTelemetrySdk openTelemetry; + + FlagdGrpcInterceptor(final OpenTelemetrySdk openTelemetry) { + this.openTelemetry = openTelemetry; + } + + @Override + public ClientCall interceptCall( + MethodDescriptor methodDescriptor, CallOptions callOptions, Channel channel) { + + final ClientCall call = channel.newCall(methodDescriptor, callOptions); + + return new ForwardingClientCall.SimpleForwardingClientCall(call) { + @Override + public void start(Listener responseListener, io.grpc.Metadata headers) { + openTelemetry.getPropagators() + .getTextMapPropagator() + .inject(Context.current(), headers, SETTER); + + super.start(responseListener, headers); + } + }; + } + + /** + * Setter implements TextMapSetter with carrier check + */ + static class Setter implements TextMapSetter { + @Override + public void set(@Nullable Metadata carrier, String key, String value) { + if (carrier == null) { + return; + } + + carrier.put(io.grpc.Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER), value); + } + } +} diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java index 6ba4b6d1c..eec3319b3 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java @@ -1,21 +1,7 @@ package dev.openfeature.contrib.providers.flagd; -import java.io.File; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.function.Function; -import java.util.stream.Collectors; - -import javax.net.ssl.SSLException; - import com.google.protobuf.Descriptors.FieldDescriptor; import com.google.protobuf.Message; - import com.google.protobuf.NullValue; import dev.openfeature.flagd.grpc.Schema.EventStreamRequest; import dev.openfeature.flagd.grpc.Schema.EventStreamResponse; @@ -40,8 +26,25 @@ import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.unix.DomainSocketAddress; import io.netty.handler.ssl.SslContextBuilder; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Scope; +import io.opentelemetry.sdk.OpenTelemetrySdk; import lombok.extern.slf4j.Slf4j; +import javax.net.ssl.SSLException; +import java.io.File; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Function; +import java.util.stream.Collectors; + /** * OpenFeature provider for flagd. */ @@ -81,15 +84,18 @@ public class FlagdProvider implements FeatureProvider, EventStreamCallback { static final int BASE_EVENT_STREAM_RETRY_BACKOFF_MS = 1000; private long deadline = DEFAULT_DEADLINE; + private ServiceBlockingStub serviceBlockingStub; private ServiceStub serviceStub; + private Tracer tracer; + private Boolean eventStreamAlive; private FlagdCache cache; private int eventStreamAttempt = 1; private int eventStreamRetryBackoff = BASE_EVENT_STREAM_RETRY_BACKOFF_MS; - private int maxEventStreamRetries = DEFAULT_MAX_EVENT_STREAM_RETRIES; + private int maxEventStreamRetries; private ReadWriteLock lock = new ReentrantReadWriteLock(); private Object eventStreamAliveSync; @@ -180,6 +186,17 @@ public FlagdProvider() { fallBackToEnvOrDefault(MAX_EVENT_STREAM_RETRIES_ENV_VAR_NAME, DEFAULT_MAX_EVENT_STREAM_RETRIES)); } + public FlagdProvider(OpenTelemetrySdk telemetrySdk) { + this( + buildServiceBlockingStub(telemetrySdk), + buildServiceStub(null, null, null, null, null), + fallBackToEnvOrDefault(CACHE_ENV_VAR_NAME, DEFAULT_CACHE), + fallBackToEnvOrDefault(MAX_CACHE_SIZE_ENV_VAR_NAME, DEFAULT_MAX_CACHE_SIZE), + fallBackToEnvOrDefault(MAX_EVENT_STREAM_RETRIES_ENV_VAR_NAME, DEFAULT_MAX_EVENT_STREAM_RETRIES)); + + this.tracer = telemetrySdk.getTracer("OpenFeature/dev.openfeature.contrib.providers.flagd"); + } + FlagdProvider(ServiceBlockingStub serviceBlockingStub, ServiceStub serviceStub, String cache, int maxCacheSize, int maxEventStreamRetries) { this.serviceBlockingStub = serviceBlockingStub; @@ -454,6 +471,12 @@ private static ServiceBlockingStub buildServiceBlockingStub(String host, Integer return ServiceGrpc.newBlockingStub(channelBuilder(host, port, tls, certPath, socketPath).build()); } + private static ServiceBlockingStub buildServiceBlockingStub(OpenTelemetrySdk sdk) { + return ServiceGrpc + .newBlockingStub(channelBuilder(null, null, null, null, null) + .intercept(new FlagdGrpcInterceptor(sdk)).build()); + } + private static ServiceStub buildServiceStub(String host, Integer port, Boolean tls, String certPath, String socketPath) { return ServiceGrpc.newStub(channelBuilder(host, port, tls, certPath, socketPath).build()); @@ -532,7 +555,21 @@ private Provid .build(); // run the referenced resolver method - ResT response = resolverRef.apply((ReqT) req); + final ResT response; + + if (tracer != null){ + final Span span = tracer.spanBuilder("resolve") + .setSpanKind(SpanKind.CLIENT) + .startSpan(); + span.setAttribute("feature_flag.key", key); + try (Scope scope = span.makeCurrent()) { + response = resolverRef.apply((ReqT) req); + } finally { + span.end(); + } + }else { + response = resolverRef.apply((ReqT) req); + } // parse the response ValT value = converter == null ? this.getField(response, VALUE_FIELD) From d4a12f5fb89200ff71d83b88608a45a870856657 Mon Sep 17 00:00:00 2001 From: Kavindu Dodanduwa Date: Fri, 21 Apr 2023 10:43:29 -0700 Subject: [PATCH 2/5] fix lints Signed-off-by: Kavindu Dodanduwa --- .../providers/flagd/FlagdGrpcInterceptor.java | 23 +++++++------------ .../providers/flagd/FlagdProvider.java | 8 +++++-- 2 files changed, 14 insertions(+), 17 deletions(-) diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdGrpcInterceptor.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdGrpcInterceptor.java index f5c3af9ed..6cda47e55 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdGrpcInterceptor.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdGrpcInterceptor.java @@ -14,9 +14,8 @@ import javax.annotation.Nullable; /** - * FlagdGrpcInterceptor is an interceptor for grpc communication from java-sdk to flagd - *

- * credits + * FlagdGrpcInterceptor is an interceptor for grpc communication from java-sdk to flagd. + * credits */ final class FlagdGrpcInterceptor implements ClientInterceptor { private static final TextMapSetter SETTER = new Setter(); @@ -27,30 +26,24 @@ final class FlagdGrpcInterceptor implements ClientInterceptor { this.openTelemetry = openTelemetry; } - @Override - public ClientCall interceptCall( - MethodDescriptor methodDescriptor, CallOptions callOptions, Channel channel) { + @Override public ClientCall interceptCall(MethodDescriptor methodDescriptor, + CallOptions callOptions, Channel channel) { final ClientCall call = channel.newCall(methodDescriptor, callOptions); return new ForwardingClientCall.SimpleForwardingClientCall(call) { - @Override - public void start(Listener responseListener, io.grpc.Metadata headers) { - openTelemetry.getPropagators() - .getTextMapPropagator() - .inject(Context.current(), headers, SETTER); - + @Override public void start(Listener responseListener, io.grpc.Metadata headers) { + openTelemetry.getPropagators().getTextMapPropagator().inject(Context.current(), headers, SETTER); super.start(responseListener, headers); } }; } /** - * Setter implements TextMapSetter with carrier check + * Setter implements TextMapSetter with carrier check. */ static class Setter implements TextMapSetter { - @Override - public void set(@Nullable Metadata carrier, String key, String value) { + @Override public void set(@Nullable Metadata carrier, String key, String value) { if (carrier == null) { return; } diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java index eec3319b3..9c207f2b2 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java @@ -186,6 +186,10 @@ public FlagdProvider() { fallBackToEnvOrDefault(MAX_EVENT_STREAM_RETRIES_ENV_VAR_NAME, DEFAULT_MAX_EVENT_STREAM_RETRIES)); } + + /** + * Create a new FlagdProvider instance with manual telemetry sdk. + */ public FlagdProvider(OpenTelemetrySdk telemetrySdk) { this( buildServiceBlockingStub(telemetrySdk), @@ -557,7 +561,7 @@ private Provid // run the referenced resolver method final ResT response; - if (tracer != null){ + if (tracer != null) { final Span span = tracer.spanBuilder("resolve") .setSpanKind(SpanKind.CLIENT) .startSpan(); @@ -567,7 +571,7 @@ private Provid } finally { span.end(); } - }else { + } else { response = resolverRef.apply((ReqT) req); } From 29a70410eb7325978acb602e046437ca269bd31e Mon Sep 17 00:00:00 2001 From: Kavindu Dodanduwa Date: Fri, 21 Apr 2023 11:06:58 -0700 Subject: [PATCH 3/5] pmd fix Signed-off-by: Kavindu Dodanduwa --- .../contrib/providers/flagd/FlagdGrpcInterceptor.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdGrpcInterceptor.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdGrpcInterceptor.java index 6cda47e55..14f49ab1c 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdGrpcInterceptor.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdGrpcInterceptor.java @@ -32,7 +32,7 @@ final class FlagdGrpcInterceptor implements ClientInterceptor { final ClientCall call = channel.newCall(methodDescriptor, callOptions); return new ForwardingClientCall.SimpleForwardingClientCall(call) { - @Override public void start(Listener responseListener, io.grpc.Metadata headers) { + @Override public void start(Listener responseListener, Metadata headers) { openTelemetry.getPropagators().getTextMapPropagator().inject(Context.current(), headers, SETTER); super.start(responseListener, headers); } @@ -48,7 +48,7 @@ static class Setter implements TextMapSetter { return; } - carrier.put(io.grpc.Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER), value); + carrier.put(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER), value); } } } From f6890a7fa17f7a8eaad40dde45240b7f319d8122 Mon Sep 17 00:00:00 2001 From: Kavindu Dodanduwa Date: Mon, 24 Apr 2023 15:17:22 -0700 Subject: [PATCH 4/5] review changes Signed-off-by: Kavindu Dodanduwa --- .../dev/openfeature/contrib/providers/flagd/FlagdProvider.java | 1 + 1 file changed, 1 insertion(+) diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java index 9c207f2b2..7efa9454d 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java @@ -566,6 +566,7 @@ private Provid .setSpanKind(SpanKind.CLIENT) .startSpan(); span.setAttribute("feature_flag.key", key); + span.setAttribute("feature_flag.provider_name", "flagd"); try (Scope scope = span.makeCurrent()) { response = resolverRef.apply((ReqT) req); } finally { From 0faf1b89e6adad4f7c6b80f27a6beaca10415771 Mon Sep 17 00:00:00 2001 From: Kavindu Dodanduwa Date: Tue, 25 Apr 2023 08:16:14 -0700 Subject: [PATCH 5/5] review change Signed-off-by: Kavindu Dodanduwa --- providers/flagd/pom.xml | 4 ++-- .../providers/flagd/FlagdGrpcInterceptor.java | 6 +++--- .../contrib/providers/flagd/FlagdProvider.java | 12 ++++++------ 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/providers/flagd/pom.xml b/providers/flagd/pom.xml index e9b27e935..93329d919 100644 --- a/providers/flagd/pom.xml +++ b/providers/flagd/pom.xml @@ -10,7 +10,7 @@ dev.openfeature.contrib.providers flagd - 0.5.8 + 0.5.8-snapshot flagd FlagD provider for Java @@ -71,7 +71,7 @@ io.opentelemetry - opentelemetry-sdk + opentelemetry-api 1.25.0 diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdGrpcInterceptor.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdGrpcInterceptor.java index 14f49ab1c..42b4f6201 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdGrpcInterceptor.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdGrpcInterceptor.java @@ -7,9 +7,9 @@ import io.grpc.ForwardingClientCall; import io.grpc.Metadata; import io.grpc.MethodDescriptor; +import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.context.Context; import io.opentelemetry.context.propagation.TextMapSetter; -import io.opentelemetry.sdk.OpenTelemetrySdk; import javax.annotation.Nullable; @@ -20,9 +20,9 @@ final class FlagdGrpcInterceptor implements ClientInterceptor { private static final TextMapSetter SETTER = new Setter(); - private final OpenTelemetrySdk openTelemetry; + private final OpenTelemetry openTelemetry; - FlagdGrpcInterceptor(final OpenTelemetrySdk openTelemetry) { + FlagdGrpcInterceptor(final OpenTelemetry openTelemetry) { this.openTelemetry = openTelemetry; } diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java index 7efa9454d..279c8f218 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java @@ -26,11 +26,11 @@ import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.unix.DomainSocketAddress; import io.netty.handler.ssl.SslContextBuilder; +import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.api.trace.Tracer; import io.opentelemetry.context.Scope; -import io.opentelemetry.sdk.OpenTelemetrySdk; import lombok.extern.slf4j.Slf4j; import javax.net.ssl.SSLException; @@ -190,15 +190,15 @@ public FlagdProvider() { /** * Create a new FlagdProvider instance with manual telemetry sdk. */ - public FlagdProvider(OpenTelemetrySdk telemetrySdk) { + public FlagdProvider(OpenTelemetry openTelemetry) { this( - buildServiceBlockingStub(telemetrySdk), + buildServiceBlockingStub(openTelemetry), buildServiceStub(null, null, null, null, null), fallBackToEnvOrDefault(CACHE_ENV_VAR_NAME, DEFAULT_CACHE), fallBackToEnvOrDefault(MAX_CACHE_SIZE_ENV_VAR_NAME, DEFAULT_MAX_CACHE_SIZE), fallBackToEnvOrDefault(MAX_EVENT_STREAM_RETRIES_ENV_VAR_NAME, DEFAULT_MAX_EVENT_STREAM_RETRIES)); - this.tracer = telemetrySdk.getTracer("OpenFeature/dev.openfeature.contrib.providers.flagd"); + this.tracer = openTelemetry.getTracer("OpenFeature/dev.openfeature.contrib.providers.flagd"); } FlagdProvider(ServiceBlockingStub serviceBlockingStub, ServiceStub serviceStub, String cache, @@ -475,10 +475,10 @@ private static ServiceBlockingStub buildServiceBlockingStub(String host, Integer return ServiceGrpc.newBlockingStub(channelBuilder(host, port, tls, certPath, socketPath).build()); } - private static ServiceBlockingStub buildServiceBlockingStub(OpenTelemetrySdk sdk) { + private static ServiceBlockingStub buildServiceBlockingStub(OpenTelemetry telemetry) { return ServiceGrpc .newBlockingStub(channelBuilder(null, null, null, null, null) - .intercept(new FlagdGrpcInterceptor(sdk)).build()); + .intercept(new FlagdGrpcInterceptor(telemetry)).build()); } private static ServiceStub buildServiceStub(String host, Integer port, Boolean tls, String certPath,