From 97c358a7001bca9cfd2244f51894fb38d5018eb5 Mon Sep 17 00:00:00 2001 From: Clement Escoffier Date: Sun, 17 Sep 2023 18:50:44 +0200 Subject: [PATCH] Adapt API to handle Vert.x 4.4.5 deprecations --- .../lambda/runtime/MockEventServer.java | 14 ++--- .../grpc/runtime/GrpcServerRecorder.java | 9 ++-- .../blocking/BlockingExecutionHandler.java | 12 ++--- .../blocking/BlockingServerInterceptor.java | 36 +++++++------ .../DevModeBlockingExecutionHandler.java | 13 +++-- .../quartz/runtime/QuartzSchedulerImpl.java | 13 ++--- .../runtime/ConnectOptionsSupplier.java | 13 +++-- .../runtime/standalone/VertxHttpRequest.java | 6 +-- .../runtime/ServletRequestContext.java | 4 +- .../reactive/RequestLeakDetectionTest.java | 4 +- .../scheduler/runtime/SimpleScheduler.java | 13 ++--- .../devui/SchedulerJsonRPCService.java | 5 +- .../WebAuthnAuthenticationMechanism.java | 2 +- .../spi/datafetcher/BlockingHelper.java | 8 +-- .../attribute/LocalServerNameAttribute.java | 2 +- .../runtime/devmode/RemoteSyncHandler.java | 24 ++++----- .../devmode/VertxHttpHotReplacementSetup.java | 52 +++++++++---------- .../security/FormAuthenticationMechanism.java | 6 +-- .../VertxBlockingSecurityExecutor.java | 13 ++--- .../quarkus/vertx/mdc/VerticleDeployer.java | 6 +-- .../java/io/quarkus/vertx/ConsumeEvent.java | 5 +- .../VertxEventBusConsumerRecorder.java | 10 ++-- .../threads/VirtualThreadsRecorder.java | 10 ++-- .../reactive/client/impl/ClientImpl.java | 3 ++ .../VertxResteasyReactiveRequestContext.java | 4 +- .../it/vertx/verticles/MdcVerticle.java | 6 +-- 26 files changed, 131 insertions(+), 162 deletions(-) diff --git a/extensions/amazon-lambda/event-server/src/main/java/io/quarkus/amazon/lambda/runtime/MockEventServer.java b/extensions/amazon-lambda/event-server/src/main/java/io/quarkus/amazon/lambda/runtime/MockEventServer.java index 72a623b06f902..2f57136b1c686 100644 --- a/extensions/amazon-lambda/event-server/src/main/java/io/quarkus/amazon/lambda/runtime/MockEventServer.java +++ b/extensions/amazon-lambda/event-server/src/main/java/io/quarkus/amazon/lambda/runtime/MockEventServer.java @@ -7,6 +7,7 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -18,8 +19,6 @@ import org.jboss.logging.Logger; import io.netty.handler.codec.http.HttpHeaderNames; -import io.vertx.core.Handler; -import io.vertx.core.Promise; import io.vertx.core.Vertx; import io.vertx.core.VertxOptions; import io.vertx.core.buffer.Buffer; @@ -132,9 +131,9 @@ public void postEvent(RoutingContext ctx) { } public void nextEvent(RoutingContext ctx) { - vertx.executeBlocking(new Handler<>() { + vertx.executeBlocking(new Callable() { @Override - public void handle(Promise event) { + public Void call() { final AtomicBoolean closed = new AtomicBoolean(false); ctx.response().closeHandler((v) -> closed.set(true)); ctx.response().exceptionHandler((v) -> closed.set(true)); @@ -149,12 +148,12 @@ public void handle(Promise event) { log.debugf("Polled message %s but connection was closed, returning to queue", request.get(AmazonLambdaApi.LAMBDA_RUNTIME_AWS_REQUEST_ID)); queue.put(request); - return; + return null; } else { break; } } else if (closed.get()) { - return; + return null; } } } catch (InterruptedException e) { @@ -180,8 +179,9 @@ public void handle(Promise event) { } else { ctx.response().setStatusCode(200).end(); } + return null; } - }, false, null); + }, false); } protected String getEventContentType(RoutingContext request) { diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/GrpcServerRecorder.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/GrpcServerRecorder.java index 53e3fb840c251..97bb245563490 100644 --- a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/GrpcServerRecorder.java +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/GrpcServerRecorder.java @@ -495,9 +495,9 @@ private Map.Entry buildServer(Vertx vertx, GrpcServerConfigurat applyNettySettings(configuration, vsBuilder); if (launchMode == LaunchMode.DEVELOPMENT) { vsBuilder.commandDecorator(command -> vertx.executeBlocking( - event -> event.complete(GrpcHotReplacementInterceptor.fire()), - false, - (Handler>) result -> devModeWrapper.run(command))); + GrpcHotReplacementInterceptor::fire, + false) + .onComplete(result -> devModeWrapper.run(command))); } builder = vsBuilder; } @@ -650,7 +650,7 @@ public void start(Promise startPromise) { }); } else { // XDS server blocks on initialStartFuture - vertx.executeBlocking((Handler>) event -> { + vertx.executeBlocking(() -> { try { grpcServer.start(); int actualPort = grpcServer.getPort(); @@ -663,6 +663,7 @@ public void start(Promise startPromise) { LOGGER.error("Unable to start gRPC server", e); startPromise.fail(e); } + return null; }); } } diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/blocking/BlockingExecutionHandler.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/blocking/BlockingExecutionHandler.java index 87fa5e1ab4442..17af296b1d638 100644 --- a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/blocking/BlockingExecutionHandler.java +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/blocking/BlockingExecutionHandler.java @@ -1,15 +1,14 @@ package io.quarkus.grpc.runtime.supports.blocking; +import java.util.concurrent.Callable; import java.util.function.Consumer; import io.grpc.Context; import io.grpc.ServerCall; import io.quarkus.arc.InjectableContext; import io.quarkus.arc.ManagedContext; -import io.vertx.core.Handler; -import io.vertx.core.Promise; -class BlockingExecutionHandler implements Handler> { +class BlockingExecutionHandler implements Callable { private final ServerCall.Listener delegate; private final Context grpcContext; private final Consumer> consumer; @@ -30,7 +29,7 @@ public BlockingExecutionHandler(Consumer> consumer, Co } @Override - public void handle(Promise event) { + public Void call() { /* * We lock here because with client side streaming different messages from the same request * might be served by different worker threads. This guarantees memory consistency. @@ -42,13 +41,10 @@ public void handle(Promise event) { requestContext.activate(state); try { consumer.accept(delegate); - } catch (Throwable any) { - event.fail(any); - return; } finally { requestContext.deactivate(); } - event.complete(); + return null; } finally { grpcContext.detach(previous); } diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/blocking/BlockingServerInterceptor.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/blocking/BlockingServerInterceptor.java index 94229f22fcea5..2aa30e397e0ca 100644 --- a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/blocking/BlockingServerInterceptor.java +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/blocking/BlockingServerInterceptor.java @@ -6,6 +6,7 @@ import java.util.Map; import java.util.Queue; import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; @@ -21,9 +22,6 @@ import io.quarkus.arc.InjectableContext; import io.quarkus.arc.InjectableContext.ContextState; import io.quarkus.arc.ManagedContext; -import io.vertx.core.AsyncResult; -import io.vertx.core.Handler; -import io.vertx.core.Promise; import io.vertx.core.Vertx; /** @@ -115,7 +113,7 @@ public ServerCall.Listener interceptCall(ServerCall replay = new ReplayListener<>(state); - vertx.executeBlocking(f -> { + vertx.executeBlocking(() -> { ServerCall.Listener listener; try { requestContext.activate(state); @@ -123,9 +121,9 @@ public ServerCall.Listener interceptCall(ServerCall>>) event -> replay.setDelegate(event.result())); + return listener; + }, false) + .onComplete(event -> replay.setDelegate(event.result())); return replay; } else { @@ -185,14 +183,14 @@ private void scheduleOrEnqueue(Consumer> consumer) { */ private void executeBlockingWithRequestContext(Consumer> consumer) { final Context grpcContext = Context.current(); - Handler> blockingHandler = new BlockingExecutionHandler<>(consumer, grpcContext, delegate, + Callable blockingHandler = new BlockingExecutionHandler<>(consumer, grpcContext, delegate, requestContextState, getRequestContext(), this); if (devMode) { blockingHandler = new DevModeBlockingExecutionHandler(Thread.currentThread().getContextClassLoader(), blockingHandler); } this.isConsumingFromIncomingEvents = true; - vertx.executeBlocking(blockingHandler, true, p -> { + vertx.executeBlocking(blockingHandler, true).onComplete(p -> { Consumer> next = incomingEvents.poll(); if (next != null) { executeBlockingWithRequestContext(next); @@ -275,21 +273,25 @@ private void scheduleOrEnqueue(Consumer> consumer) { private void executeVirtualWithRequestContext(Consumer> consumer) { final Context grpcContext = Context.current(); - Handler> blockingHandler = new BlockingExecutionHandler<>(consumer, grpcContext, delegate, + Callable blockingHandler = new BlockingExecutionHandler<>(consumer, grpcContext, delegate, requestContextState, getRequestContext(), this); if (devMode) { blockingHandler = new DevModeBlockingExecutionHandler(Thread.currentThread().getContextClassLoader(), blockingHandler); } this.isConsumingFromIncomingEvents = true; - Handler> finalBlockingHandler = blockingHandler; + var finalBlockingHandler = blockingHandler; virtualThreadExecutor.execute(() -> { - finalBlockingHandler.handle(Promise.promise()); - Consumer> next = incomingEvents.poll(); - if (next != null) { - executeVirtualWithRequestContext(next); - } else { - this.isConsumingFromIncomingEvents = false; + try { + finalBlockingHandler.call(); + Consumer> next = incomingEvents.poll(); + if (next != null) { + executeVirtualWithRequestContext(next); + } else { + this.isConsumingFromIncomingEvents = false; + } + } catch (Exception e) { + throw new RuntimeException(e); } }); } diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/blocking/DevModeBlockingExecutionHandler.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/blocking/DevModeBlockingExecutionHandler.java index b6b08ab3100d4..0041b5fef668c 100644 --- a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/blocking/DevModeBlockingExecutionHandler.java +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/blocking/DevModeBlockingExecutionHandler.java @@ -1,24 +1,23 @@ package io.quarkus.grpc.runtime.supports.blocking; -import io.vertx.core.Handler; -import io.vertx.core.Promise; +import java.util.concurrent.Callable; -class DevModeBlockingExecutionHandler implements Handler> { +class DevModeBlockingExecutionHandler implements Callable { final ClassLoader tccl; - final Handler> delegate; + final Callable delegate; - public DevModeBlockingExecutionHandler(ClassLoader tccl, Handler> delegate) { + public DevModeBlockingExecutionHandler(ClassLoader tccl, Callable delegate) { this.tccl = tccl; this.delegate = delegate; } @Override - public void handle(Promise event) { + public Void call() throws Exception { ClassLoader originalTccl = Thread.currentThread().getContextClassLoader(); Thread.currentThread().setContextClassLoader(tccl); try { - delegate.handle(event); + return delegate.call(); } finally { Thread.currentThread().setContextClassLoader(originalTccl); } diff --git a/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/QuartzSchedulerImpl.java b/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/QuartzSchedulerImpl.java index bbd7c36e9adc3..ce07d6ee5914c 100644 --- a/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/QuartzSchedulerImpl.java +++ b/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/QuartzSchedulerImpl.java @@ -12,6 +12,7 @@ import java.util.OptionalLong; import java.util.Properties; import java.util.TimeZone; +import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; @@ -92,7 +93,6 @@ import io.smallrye.common.vertx.VertxContext; import io.vertx.core.Context; import io.vertx.core.Handler; -import io.vertx.core.Promise; import io.vertx.core.Vertx; /** @@ -894,15 +894,10 @@ public void run() { } }); } else { - context.executeBlocking(new Handler>() { + context.executeBlocking(new Callable() { @Override - public void handle(Promise p) { - try { - trigger.invoker.invoke(new QuartzScheduledExecution(trigger, jobExecutionContext)); - p.complete(); - } catch (Exception e) { - p.tryFail(e); - } + public Object call() throws Exception { + return trigger.invoker.invoke(new QuartzScheduledExecution(trigger, jobExecutionContext)); } }, false); } diff --git a/extensions/reactive-datasource/runtime/src/main/java/io/quarkus/reactive/datasource/runtime/ConnectOptionsSupplier.java b/extensions/reactive-datasource/runtime/src/main/java/io/quarkus/reactive/datasource/runtime/ConnectOptionsSupplier.java index a5677b11d0a90..f54209887a1d7 100644 --- a/extensions/reactive-datasource/runtime/src/main/java/io/quarkus/reactive/datasource/runtime/ConnectOptionsSupplier.java +++ b/extensions/reactive-datasource/runtime/src/main/java/io/quarkus/reactive/datasource/runtime/ConnectOptionsSupplier.java @@ -5,6 +5,7 @@ import java.util.List; import java.util.Map; +import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.IntUnaryOperator; import java.util.function.Supplier; @@ -12,8 +13,6 @@ import io.quarkus.credentials.CredentialsProvider; import io.vertx.core.Future; -import io.vertx.core.Handler; -import io.vertx.core.Promise; import io.vertx.core.Vertx; import io.vertx.sqlclient.SqlConnectOptions; @@ -24,7 +23,7 @@ public class ConnectOptionsSupplier implements Sup private final String credentialsProviderName; private final List connectOptionsList; private final UnaryOperator connectOptionsCopy; - private final Handler> blockingCodeHandler; + private final Callable blockingCodeHandler; public ConnectOptionsSupplier(Vertx vertx, CredentialsProvider credentialsProvider, String credentialsProviderName, List connectOptionsList, UnaryOperator connectOptionsCopy) { @@ -33,7 +32,7 @@ public ConnectOptionsSupplier(Vertx vertx, CredentialsProvider credentialsProvid this.credentialsProviderName = credentialsProviderName; this.connectOptionsList = connectOptionsList; this.connectOptionsCopy = connectOptionsCopy; - blockingCodeHandler = new BlockingCodeHandler(); + this.blockingCodeHandler = new BlockingCodeHandler(); } @Override @@ -41,12 +40,12 @@ public Future get() { return vertx.executeBlocking(blockingCodeHandler, false); } - private class BlockingCodeHandler implements Handler>, IntUnaryOperator { + private class BlockingCodeHandler implements Callable, IntUnaryOperator { final AtomicInteger idx = new AtomicInteger(); @Override - public void handle(Promise promise) { + public CO call() { Map credentials = credentialsProvider.getCredentials(credentialsProviderName); String user = credentials.get(USER_PROPERTY_NAME); String password = credentials.get(PASSWORD_PROPERTY_NAME); @@ -56,7 +55,7 @@ public void handle(Promise promise) { CO connectOptions = connectOptionsCopy.apply(connectOptionsList.get(nextIdx)); connectOptions.setUser(user).setPassword(password); - promise.complete(connectOptions); + return connectOptions; } @Override diff --git a/extensions/resteasy-classic/resteasy/runtime/src/main/java/io/quarkus/resteasy/runtime/standalone/VertxHttpRequest.java b/extensions/resteasy-classic/resteasy/runtime/src/main/java/io/quarkus/resteasy/runtime/standalone/VertxHttpRequest.java index 328adaa5454da..921f04d3b3f8c 100644 --- a/extensions/resteasy-classic/resteasy/runtime/src/main/java/io/quarkus/resteasy/runtime/standalone/VertxHttpRequest.java +++ b/extensions/resteasy-classic/resteasy/runtime/src/main/java/io/quarkus/resteasy/runtime/standalone/VertxHttpRequest.java @@ -262,16 +262,16 @@ public CompletionStage executeBlockingIo(RunnableWithException f, boolean suspend(); } CompletableFuture ret = new CompletableFuture<>(); - this.request.context.executeBlocking(future -> { + this.request.context.executeBlocking(() -> { try (CloseableContext newContext = ResteasyContext.addCloseableContextDataLevel(context)) { f.run(); - future.complete(); } catch (RuntimeException e) { throw e; } catch (Exception e) { throw new RuntimeException(e); } - }, res -> { + return null; + }).onComplete(res -> { if (res.succeeded()) ret.complete(null); else diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive-servlet/runtime/src/main/java/io/quarkus/resteasy/reactive/server/servlet/runtime/ServletRequestContext.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive-servlet/runtime/src/main/java/io/quarkus/resteasy/reactive/server/servlet/runtime/ServletRequestContext.java index b3ff49016f17f..369ae603e4610 100644 --- a/extensions/resteasy-reactive/quarkus-resteasy-reactive-servlet/runtime/src/main/java/io/quarkus/resteasy/reactive/server/servlet/runtime/ServletRequestContext.java +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive-servlet/runtime/src/main/java/io/quarkus/resteasy/reactive/server/servlet/runtime/ServletRequestContext.java @@ -236,7 +236,7 @@ public String getRequestScheme() { @Override public String getRequestHost() { - return context.request().host(); + return context.request().authority().toString(); } @Override @@ -246,7 +246,7 @@ public void closeConnection() { } catch (IOException e) { //ignore } - context.response().close(); + context.request().connection().close(); } @Override diff --git a/extensions/resteasy-reactive/rest-client-reactive/deployment/src/test/java/io/quarkus/rest/client/reactive/RequestLeakDetectionTest.java b/extensions/resteasy-reactive/rest-client-reactive/deployment/src/test/java/io/quarkus/rest/client/reactive/RequestLeakDetectionTest.java index 046c9040db8c1..d7aa5c3aef4f5 100644 --- a/extensions/resteasy-reactive/rest-client-reactive/deployment/src/test/java/io/quarkus/rest/client/reactive/RequestLeakDetectionTest.java +++ b/extensions/resteasy-reactive/rest-client-reactive/deployment/src/test/java/io/quarkus/rest/client/reactive/RequestLeakDetectionTest.java @@ -269,9 +269,9 @@ private Task(Context context, Runnable runnable, boolean blocking) { void run() { if (blocking) { - context.executeBlocking(p -> { + context.executeBlocking(() -> { runnable.run(); - p.complete(); + return null; }); } else { context.runOnContext(x -> runnable.run()); diff --git a/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SimpleScheduler.java b/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SimpleScheduler.java index eca7095ac87f6..d69c97d0d3ba1 100644 --- a/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SimpleScheduler.java +++ b/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SimpleScheduler.java @@ -10,6 +10,7 @@ import java.util.Objects; import java.util.Optional; import java.util.OptionalLong; +import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; @@ -68,7 +69,6 @@ import io.smallrye.common.vertx.VertxContext; import io.vertx.core.Context; import io.vertx.core.Handler; -import io.vertx.core.Promise; import io.vertx.core.Vertx; @Typed(Scheduler.class) @@ -406,14 +406,11 @@ public void run() { } }); } else { - context.executeBlocking(new Handler>() { + context.executeBlocking(new Callable() { @Override - public void handle(Promise p) { - try { - doInvoke(now, scheduledFireTime); - } finally { - p.complete(); - } + public Void call() { + doInvoke(now, scheduledFireTime); + return null; } }, false); } diff --git a/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/devui/SchedulerJsonRPCService.java b/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/devui/SchedulerJsonRPCService.java index 331261700c7cc..9d01ffd3b6f5a 100644 --- a/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/devui/SchedulerJsonRPCService.java +++ b/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/devui/SchedulerJsonRPCService.java @@ -184,13 +184,12 @@ public JsonObject executeJob(String methodDescription) { ScheduledInvoker invoker = c .createInvoker(metadata.getInvokerClassName()); if (invoker.isBlocking()) { - vdc.executeBlocking(p -> { + vdc.executeBlocking(() -> { try { invoker.invoke(new DevUIScheduledExecution()); } catch (Exception ignored) { - } finally { - p.complete(); } + return null; }, false); } else { vdc.runOnContext(x -> { diff --git a/extensions/security-webauthn/runtime/src/main/java/io/quarkus/security/webauthn/WebAuthnAuthenticationMechanism.java b/extensions/security-webauthn/runtime/src/main/java/io/quarkus/security/webauthn/WebAuthnAuthenticationMechanism.java index fc48bc084a74d..7cfffbe779caa 100644 --- a/extensions/security-webauthn/runtime/src/main/java/io/quarkus/security/webauthn/WebAuthnAuthenticationMechanism.java +++ b/extensions/security-webauthn/runtime/src/main/java/io/quarkus/security/webauthn/WebAuthnAuthenticationMechanism.java @@ -61,7 +61,7 @@ public Uni getChallenge(RoutingContext context) { } static Uni getRedirect(final RoutingContext exchange, final String location) { - String loc = exchange.request().scheme() + "://" + exchange.request().host() + location; + String loc = exchange.request().scheme() + "://" + exchange.request().authority() + location; return Uni.createFrom().item(new ChallengeData(302, "Location", loc)); } diff --git a/extensions/smallrye-graphql/runtime/src/main/java/io/quarkus/smallrye/graphql/runtime/spi/datafetcher/BlockingHelper.java b/extensions/smallrye-graphql/runtime/src/main/java/io/quarkus/smallrye/graphql/runtime/spi/datafetcher/BlockingHelper.java index 453d63d94c7e6..440a2fbd1c9d0 100644 --- a/extensions/smallrye-graphql/runtime/src/main/java/io/quarkus/smallrye/graphql/runtime/spi/datafetcher/BlockingHelper.java +++ b/extensions/smallrye-graphql/runtime/src/main/java/io/quarkus/smallrye/graphql/runtime/spi/datafetcher/BlockingHelper.java @@ -25,12 +25,6 @@ public static boolean nonBlockingShouldExecuteBlocking(Operation operation, Cont @SuppressWarnings("unchecked") public static void runBlocking(Context vc, Callable contextualCallable, Promise result) { // Here call blocking with context - vc.executeBlocking(future -> { - try { - future.complete(contextualCallable.call()); - } catch (Exception ex) { - future.fail(ex); - } - }, result); + vc.executeBlocking(contextualCallable).onComplete(result); } } diff --git a/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/attribute/LocalServerNameAttribute.java b/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/attribute/LocalServerNameAttribute.java index 91d1ef1ade649..5f08272dc1cd4 100644 --- a/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/attribute/LocalServerNameAttribute.java +++ b/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/attribute/LocalServerNameAttribute.java @@ -19,7 +19,7 @@ private LocalServerNameAttribute() { @Override public String readAttribute(final RoutingContext exchange) { - return exchange.request().host(); + return exchange.request().authority().toString(); } @Override diff --git a/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/devmode/RemoteSyncHandler.java b/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/devmode/RemoteSyncHandler.java index bf6fd54e81bc1..059a1dcea9aaa 100644 --- a/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/devmode/RemoteSyncHandler.java +++ b/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/devmode/RemoteSyncHandler.java @@ -7,6 +7,7 @@ import java.util.Base64; import java.util.Objects; import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.RejectedExecutionException; import org.jboss.logging.Logger; @@ -17,7 +18,6 @@ import io.quarkus.runtime.util.HashUtil; import io.quarkus.vertx.core.runtime.VertxCoreRecorder; import io.vertx.core.Handler; -import io.vertx.core.Promise; import io.vertx.core.buffer.Buffer; import io.vertx.core.http.HttpMethod; import io.vertx.core.http.HttpServerRequest; @@ -80,16 +80,13 @@ public void handle(HttpServerRequest event) { final String type = event.headers().get(HttpHeaderNames.CONTENT_TYPE); if (APPLICATION_QUARKUS.equals(type)) { currentSessionTimeout = time + 60000; - VertxCoreRecorder.getVertx().get().executeBlocking(new Handler>() { + VertxCoreRecorder.getVertx().get().executeBlocking(new Callable() { @Override - public void handle(Promise promise) { - try { - handleRequest(event); - } finally { - promise.complete(); - } + public Void call() { + handleRequest(event); + return null; } - }, null); + }); return; } next.handle(event); @@ -127,9 +124,9 @@ public void handle(Buffer b) { if (checkSession(event, b.getBytes())) { return; } - VertxCoreRecorder.getVertx().get().executeBlocking(new Handler>() { + VertxCoreRecorder.getVertx().get().executeBlocking(new Callable() { @Override - public void handle(Promise promise) { + public Void call() { try { Throwable problem = (Throwable) new ObjectInputStream(new ByteArrayInputStream(b.getBytes())) .readObject(); @@ -157,11 +154,10 @@ public void handle(Promise promise) { } catch (Exception e) { log.error("Connect failed", e); event.response().setStatusCode(500).end(); - } finally { - promise.complete(); } + return null; } - }, null); + }); } }).exceptionHandler(new Handler() { @Override diff --git a/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/devmode/VertxHttpHotReplacementSetup.java b/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/devmode/VertxHttpHotReplacementSetup.java index fb9107bb0c6b3..3125c1c6bb4ff 100644 --- a/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/devmode/VertxHttpHotReplacementSetup.java +++ b/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/devmode/VertxHttpHotReplacementSetup.java @@ -5,6 +5,7 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.logging.Level; import java.util.logging.LogManager; @@ -21,8 +22,8 @@ import io.vertx.core.AsyncResult; import io.vertx.core.Handler; import io.vertx.core.MultiMap; -import io.vertx.core.Promise; import io.vertx.core.http.HttpServerResponse; +import io.vertx.core.impl.NoStackTraceException; import io.vertx.core.net.impl.ConnectionBase; import io.vertx.ext.web.RoutingContext; @@ -98,29 +99,26 @@ public void handle(AsyncResult event) { routingContext.request().endHandler(new Handler() { @Override public void handle(Void event) { - VertxCoreRecorder.getVertx().get().getOrCreateContext().executeBlocking(new Handler>() { + VertxCoreRecorder.getVertx().get().getOrCreateContext().executeBlocking(new Callable() { @Override - public void handle(Promise promise) { - try { - String redirect = "/"; - MultiMap attrs = routingContext.request().formAttributes(); - Map newVals = new HashMap<>(); - for (Map.Entry i : attrs) { - if (i.getKey().startsWith("key.")) { - newVals.put(i.getKey().substring("key.".length()), i.getValue()); - } else if (i.getKey().equals("redirect")) { - redirect = i.getValue(); - } + public Void call() { + String redirect = "/"; + MultiMap attrs = routingContext.request().formAttributes(); + Map newVals = new HashMap<>(); + for (Map.Entry i : attrs) { + if (i.getKey().startsWith("key.")) { + newVals.put(i.getKey().substring("key.".length()), i.getValue()); + } else if (i.getKey().equals("redirect")) { + redirect = i.getValue(); } - CurrentConfig.EDITOR.accept(newVals); - routingContext.response().setStatusCode(HttpResponseStatus.SEE_OTHER.code()).headers() - .set(HttpHeaderNames.LOCATION, redirect); - routingContext.response().end(); - } catch (Throwable t) { - routingContext.fail(t); } + CurrentConfig.EDITOR.accept(newVals); + routingContext.response().setStatusCode(HttpResponseStatus.SEE_OTHER.code()).headers() + .set(HttpHeaderNames.LOCATION, redirect); + routingContext.response().end(); + return null; } - }); + }).onFailure(routingContext::fail); } }); routingContext.request().resume(); @@ -136,9 +134,9 @@ public void handle(Promise promise) { return; } ClassLoader current = Thread.currentThread().getContextClassLoader(); - VertxCoreRecorder.getVertx().get().getOrCreateContext().executeBlocking(new Handler>() { + VertxCoreRecorder.getVertx().get().getOrCreateContext().executeBlocking(new Callable() { @Override - public void handle(Promise event) { + public Boolean call() { //the blocking pool may have a stale TCCL Thread.currentThread().setContextClassLoader(current); boolean restart = false; @@ -151,8 +149,7 @@ public void handle(Promise event) { try { restart = hotReplacementContext.doScan(true); } catch (Exception e) { - event.fail(new IllegalStateException("Unable to perform live reload scanning", e)); - return; + throw new IllegalStateException("Unable to perform live reload scanning", e); } if (currentState != VertxHttpRecorder.getCurrentApplicationState()) { //its possible a Kafka message or some other source triggered a reload, @@ -163,8 +160,7 @@ public void handle(Promise event) { } } if (hotReplacementContext.getDeploymentProblem() != null) { - event.fail(hotReplacementContext.getDeploymentProblem()); - return; + throw new NoStackTraceException(hotReplacementContext.getDeploymentProblem()); } if (restart) { //close all connections on close, except for this one @@ -180,9 +176,9 @@ public void handle(Promise event) { } finally { DevConsoleManager.setDoingHttpInitiatedReload(false); } - event.complete(restart); + return restart; } - }, false, new Handler>() { + }, false).onComplete(new Handler>() { @Override public void handle(AsyncResult event) { if (event.failed()) { diff --git a/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/security/FormAuthenticationMechanism.java b/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/security/FormAuthenticationMechanism.java index 39a722604257d..85927ed2f8922 100644 --- a/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/security/FormAuthenticationMechanism.java +++ b/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/security/FormAuthenticationMechanism.java @@ -144,7 +144,7 @@ protected void handleRedirectBack(final RoutingContext exchange) { throw new IllegalStateException( "Landing page is no set, please make sure 'quarkus.http.auth.form.landing-page' is configured properly."); } - location = exchange.request().scheme() + "://" + exchange.request().host() + landingPage; + location = exchange.request().scheme() + "://" + exchange.request().authority() + landingPage; } exchange.response().setStatusCode(302); exchange.response().headers().add(HttpHeaderNames.LOCATION, location); @@ -173,14 +173,14 @@ protected void servePage(final RoutingContext exchange, final String location) { } static void sendRedirect(final RoutingContext exchange, final String location) { - String loc = exchange.request().scheme() + "://" + exchange.request().host() + location; + String loc = exchange.request().scheme() + "://" + exchange.request().authority() + location; exchange.response().headers().add(HttpHeaderNames.LOCATION, loc); exchange.response().setStatusCode(302); exchange.response().end(); } static Uni getRedirect(final RoutingContext exchange, final String location) { - String loc = exchange.request().scheme() + "://" + exchange.request().host() + location; + String loc = exchange.request().scheme() + "://" + exchange.request().authority() + location; return Uni.createFrom().item(new ChallengeData(302, "Location", loc)); } diff --git a/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/security/VertxBlockingSecurityExecutor.java b/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/security/VertxBlockingSecurityExecutor.java index 8a32c22be1973..ac5e42e8e4363 100644 --- a/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/security/VertxBlockingSecurityExecutor.java +++ b/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/security/VertxBlockingSecurityExecutor.java @@ -3,6 +3,7 @@ import static io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle.setContextSafe; import static io.smallrye.common.vertx.VertxContext.getOrCreateDuplicatedContext; +import java.util.concurrent.Callable; import java.util.function.Supplier; import jakarta.inject.Inject; @@ -11,8 +12,6 @@ import io.quarkus.security.spi.runtime.BlockingSecurityExecutor; import io.smallrye.mutiny.Uni; import io.vertx.core.Context; -import io.vertx.core.Handler; -import io.vertx.core.Promise; import io.vertx.core.Vertx; public class VertxBlockingSecurityExecutor implements BlockingSecurityExecutor { @@ -38,14 +37,10 @@ public Uni get() { .createFrom() .completionStage( local - .executeBlocking(new Handler>() { + .executeBlocking(new Callable() { @Override - public void handle(Promise promise) { - try { - promise.complete(supplier.get()); - } catch (Throwable t) { - promise.fail(t); - } + public T call() { + return supplier.get(); } }) .toCompletionStage()); diff --git a/extensions/vertx/deployment/src/test/java/io/quarkus/vertx/mdc/VerticleDeployer.java b/extensions/vertx/deployment/src/test/java/io/quarkus/vertx/mdc/VerticleDeployer.java index b08d9dc8b7c98..d2010e1c00d4c 100644 --- a/extensions/vertx/deployment/src/test/java/io/quarkus/vertx/mdc/VerticleDeployer.java +++ b/extensions/vertx/deployment/src/test/java/io/quarkus/vertx/mdc/VerticleDeployer.java @@ -60,10 +60,10 @@ public void start(Promise startPromise) { vertx.setTimer(50, l -> { LOGGER.info("Timer fired ### " + MDC.get(MDC_KEY)); - vertx.executeBlocking(fut -> { + vertx.executeBlocking(() -> { LOGGER.info("Blocking task executed ### " + MDC.get(MDC_KEY)); - fut.complete(); - }, false, bar -> request.send(rar -> { + return null; + }, false).onComplete(bar -> request.send(rar -> { String value = (String) MDC.get(MDC_KEY); LOGGER.info("Received Web Client response ### " + value); req.response().end(value); diff --git a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/ConsumeEvent.java b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/ConsumeEvent.java index d49d1b17da340..7e18bfb55dfbb 100644 --- a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/ConsumeEvent.java +++ b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/ConsumeEvent.java @@ -5,6 +5,7 @@ import java.lang.annotation.Retention; import java.lang.annotation.Target; +import java.util.concurrent.Callable; import io.vertx.core.eventbus.MessageCodec; import io.vertx.mutiny.core.eventbus.Message; @@ -109,7 +110,7 @@ /** * * @return {@code true} if the consumer should be invoked as a blocking operation using a worker thread - * @see io.vertx.core.Vertx#executeBlocking(io.vertx.core.Handler, boolean, io.vertx.core.Handler) + * @see io.vertx.core.Vertx#executeBlocking(Callable, boolean) */ boolean blocking() default false; @@ -117,7 +118,7 @@ * @return {@code true} if the blocking consumption of the event must be ordered, meaning that the method * won't be called concurrently. Instead, it serializes all the invocations based on the event order. * {@code ordered} must be used in conjunction with {@code blocking=true} or {@code @Blocking}. - * @see io.vertx.core.Vertx#executeBlocking(io.vertx.core.Handler, boolean, io.vertx.core.Handler) + * @see io.vertx.core.Vertx#executeBlocking(Callable, boolean) */ boolean ordered() default false; diff --git a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/VertxEventBusConsumerRecorder.java b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/VertxEventBusConsumerRecorder.java index 5c9539b93c651..3bdad1e36f8f7 100644 --- a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/VertxEventBusConsumerRecorder.java +++ b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/VertxEventBusConsumerRecorder.java @@ -8,6 +8,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.concurrent.Callable; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.function.Supplier; @@ -26,7 +27,6 @@ import io.vertx.core.AsyncResult; import io.vertx.core.Context; import io.vertx.core.Handler; -import io.vertx.core.Promise; import io.vertx.core.Vertx; import io.vertx.core.eventbus.EventBus; import io.vertx.core.eventbus.Message; @@ -135,9 +135,9 @@ public void run() { } }); } else { - dup.executeBlocking(new Handler>() { + dup.executeBlocking(new Callable() { @Override - public void handle(Promise event) { + public Void call() { try { invoker.invoke(m); } catch (Exception e) { @@ -148,9 +148,9 @@ public void handle(Promise event) { m.fail(ConsumeEvent.FAILURE_CODE, e.toString()); } } - event.complete(); + return null; } - }, invoker.isOrdered(), null); + }, invoker.isOrdered()); } } else { // Will run on the context used for the consumer registration. diff --git a/extensions/virtual-threads/runtime/src/main/java/io/quarkus/virtual/threads/VirtualThreadsRecorder.java b/extensions/virtual-threads/runtime/src/main/java/io/quarkus/virtual/threads/VirtualThreadsRecorder.java index 2ff224e56954e..80fef790f3b1e 100644 --- a/extensions/virtual-threads/runtime/src/main/java/io/quarkus/virtual/threads/VirtualThreadsRecorder.java +++ b/extensions/virtual-threads/runtime/src/main/java/io/quarkus/virtual/threads/VirtualThreadsRecorder.java @@ -156,13 +156,9 @@ public void execute(Runnable command) { if (!(context instanceof ContextInternal)) { Infrastructure.getDefaultWorkerPool().execute(command); } else { - context.executeBlocking(fut -> { - try { - command.run(); - fut.complete(null); - } catch (Exception e) { - fut.fail(e); - } + context.executeBlocking(() -> { + command.run(); + return null; }, false); } } diff --git a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/ClientImpl.java b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/ClientImpl.java index 476a8f662b887..d8123700fda59 100644 --- a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/ClientImpl.java +++ b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/ClientImpl.java @@ -647,18 +647,21 @@ public boolean isClustered() { } @Override + @Deprecated public void executeBlocking(Handler> blockingCodeHandler, boolean ordered, Handler> asyncResultHandler) { getDelegate().executeBlocking(blockingCodeHandler, ordered, asyncResultHandler); } @Override + @Deprecated public void executeBlocking(Handler> blockingCodeHandler, Handler> asyncResultHandler) { getDelegate().executeBlocking(blockingCodeHandler, asyncResultHandler); } @Override + @Deprecated public Future executeBlocking(Handler> blockingCodeHandler, boolean ordered) { return getDelegate().executeBlocking(blockingCodeHandler, ordered); } diff --git a/independent-projects/resteasy-reactive/server/vertx/src/main/java/org/jboss/resteasy/reactive/server/vertx/VertxResteasyReactiveRequestContext.java b/independent-projects/resteasy-reactive/server/vertx/src/main/java/org/jboss/resteasy/reactive/server/vertx/VertxResteasyReactiveRequestContext.java index f80c42d785e79..0e02e22bb1b00 100644 --- a/independent-projects/resteasy-reactive/server/vertx/src/main/java/org/jboss/resteasy/reactive/server/vertx/VertxResteasyReactiveRequestContext.java +++ b/independent-projects/resteasy-reactive/server/vertx/src/main/java/org/jboss/resteasy/reactive/server/vertx/VertxResteasyReactiveRequestContext.java @@ -191,12 +191,12 @@ public String getRequestScheme() { @Override public String getRequestHost() { - return request.host(); + return request.authority().toString(); } @Override public void closeConnection() { - response.close(); + request.connection().close(); } @Override diff --git a/integration-tests/vertx/src/main/java/io/quarkus/it/vertx/verticles/MdcVerticle.java b/integration-tests/vertx/src/main/java/io/quarkus/it/vertx/verticles/MdcVerticle.java index 7902643852154..1ff3c108928df 100644 --- a/integration-tests/vertx/src/main/java/io/quarkus/it/vertx/verticles/MdcVerticle.java +++ b/integration-tests/vertx/src/main/java/io/quarkus/it/vertx/verticles/MdcVerticle.java @@ -19,10 +19,10 @@ public void start(Promise done) { LOGGER.warn("Received message ### " + MDC.get(MDC_KEY)); vertx.setTimer(50, l -> { LOGGER.warn("Timer fired ### " + MDC.get(MDC_KEY)); - vertx.executeBlocking(fut -> { + vertx.executeBlocking(() -> { LOGGER.warn("Blocking task executed ### " + MDC.get(MDC_KEY)); - fut.complete(); - }, bar -> message.reply("OK-" + MDC.get(MDC_KEY))); + return null; + }).onComplete(bar -> message.reply("OK-" + MDC.get(MDC_KEY))); }); }) .completionHandler(done);