diff --git a/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/security/AuthProvider.java b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/security/AuthProvider.java index 48065b106f..9dbfce938c 100644 --- a/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/security/AuthProvider.java +++ b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/security/AuthProvider.java @@ -18,6 +18,7 @@ import dev.knative.eventing.kafka.broker.contract.DataPlaneContract; import io.fabric8.kubernetes.client.DefaultKubernetesClient; import io.vertx.core.Future; +import io.vertx.core.Vertx; /** * AuthProvider provides auth credentials. @@ -25,8 +26,8 @@ @FunctionalInterface public interface AuthProvider { - static AuthProvider kubernetes() { - return new KubernetesAuthProvider(new DefaultKubernetesClient()); + static AuthProvider kubernetes(final Vertx vertx) { + return new KubernetesAuthProvider(vertx, new DefaultKubernetesClient()); } static AuthProvider noAuth() { diff --git a/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/security/KubernetesAuthProvider.java b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/security/KubernetesAuthProvider.java index f4eb3de722..41268c1a41 100644 --- a/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/security/KubernetesAuthProvider.java +++ b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/security/KubernetesAuthProvider.java @@ -28,14 +28,16 @@ class KubernetesAuthProvider implements AuthProvider { + private final Vertx vertx; private final KubernetesClient kubernetesClient; - KubernetesAuthProvider(final KubernetesClient client) { + KubernetesAuthProvider(final Vertx vertx, final KubernetesClient client) { + this.vertx = vertx; this.kubernetesClient = client; } private Future getCredentials(final DataPlaneContract.Reference secretReference) { - return Vertx.currentContext().executeBlocking(p -> { + return this.vertx.executeBlocking(p -> { try { final Secret secret = getSecretFromKubernetes(secretReference); final var credentials = new KubernetesCredentials(secret); @@ -52,7 +54,7 @@ private Future getCredentials(final DataPlaneContract.Reference sec } private Future getCredentials(final DataPlaneContract.MultiSecretReference secretReferences) { - return Vertx.currentContext().executeBlocking(p -> { + return this.vertx.executeBlocking(p -> { try { final var credentials = new KubernetesCredentials(secretDataOf(secretReferences)); final var error = CredentialsValidator.validate(credentials); diff --git a/data-plane/core/src/test/java/dev/knative/eventing/kafka/broker/core/security/KubernetesAuthProviderTest.java b/data-plane/core/src/test/java/dev/knative/eventing/kafka/broker/core/security/KubernetesAuthProviderTest.java index fd12393626..7bb85cbe0b 100644 --- a/data-plane/core/src/test/java/dev/knative/eventing/kafka/broker/core/security/KubernetesAuthProviderTest.java +++ b/data-plane/core/src/test/java/dev/knative/eventing/kafka/broker/core/security/KubernetesAuthProviderTest.java @@ -51,8 +51,8 @@ public class KubernetesAuthProviderTest { private static final String userPasswordSecretKey = "password"; @BeforeEach - public void setUp() { - provider = new KubernetesAuthProvider(client); + public void setUp(final Vertx vertx) { + provider = new KubernetesAuthProvider(vertx, client); } @Test diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/Main.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/Main.java index 3fc667d6b9..3b4dbb50c4 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/Main.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/Main.java @@ -113,7 +113,7 @@ public static void start( consumerConfig, webClientOptions, producerConfig, - AuthProvider.kubernetes(), + AuthProvider.kubernetes(vertx), Metrics.getRegistry(), reactiveConsumerFactory, reactiveProducerFactory), diff --git a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactory.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactory.java index 72438f31ae..719c917f1a 100644 --- a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactory.java +++ b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactory.java @@ -69,7 +69,7 @@ public Verticle get() { httpServerOptions, httpsServerOptions, v -> new IngressProducerReconcilableStore( - AuthProvider.kubernetes(), + AuthProvider.kubernetes(v), producerConfigs, properties -> kafkaProducerFactory.create(v, properties)), this.ingressRequestHandler,