From bbe9de8823e178ad4a0ec29c2eb125f419f696a4 Mon Sep 17 00:00:00 2001 From: Peter Palaga Date: Fri, 17 Nov 2023 23:11:53 +0100 Subject: [PATCH] Keep AppendBuffer and VertxServletOutputStream in sync with the originals in Quarkus fix #1111 --- extensions/core/runtime/pom.xml | 26 ++ .../transport/VertxHttpServletResponse.java | 8 +- .../VertxReactiveRequestContext.java | 59 ++++ .../{ => generated}/AppendBuffer.java | 19 +- .../VertxServletOutputStream.java | 54 ++-- pom.xml | 21 ++ src/build/scripts/sync-quarkus-classes.groovy | 249 +++++++++++++++ .../cxf/transport/generated/AppendBuffer.java | 202 ++++++++++++ .../generated/VertxServletOutputStream.java | 294 ++++++++++++++++++ 9 files changed, 889 insertions(+), 43 deletions(-) create mode 100644 extensions/core/runtime/src/main/java/io/quarkiverse/cxf/transport/VertxReactiveRequestContext.java rename extensions/core/runtime/src/main/java/io/quarkiverse/cxf/transport/{ => generated}/AppendBuffer.java (91%) rename extensions/core/runtime/src/main/java/io/quarkiverse/cxf/transport/{ => generated}/VertxServletOutputStream.java (88%) create mode 100644 src/build/scripts/sync-quarkus-classes.groovy create mode 100644 src/main/java/io/quarkiverse/cxf/transport/generated/AppendBuffer.java create mode 100644 src/main/java/io/quarkiverse/cxf/transport/generated/VertxServletOutputStream.java diff --git a/extensions/core/runtime/pom.xml b/extensions/core/runtime/pom.xml index 7e116b38b..9d75e3181 100644 --- a/extensions/core/runtime/pom.xml +++ b/extensions/core/runtime/pom.xml @@ -129,6 +129,18 @@ org.codehaus.gmaven groovy-maven-plugin + + sync-quarkus-classes + + execute + + generate-sources + + file:${maven.multiModuleProjectDirectory}/src/build/scripts/sync-quarkus-classes.groovy + + + + filter-and-move-config-options-docs @@ -144,6 +156,20 @@ + + + io.quarkus.resteasy.reactive + resteasy-reactive-vertx + jar + sources + ${quarkus.version} + + + com.github.javaparser + javaparser-symbol-solver-core + ${javaparser.version} + + diff --git a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/transport/VertxHttpServletResponse.java b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/transport/VertxHttpServletResponse.java index bc5a56e60..85eaeedf6 100644 --- a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/transport/VertxHttpServletResponse.java +++ b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/transport/VertxHttpServletResponse.java @@ -9,13 +9,12 @@ import jakarta.servlet.http.Cookie; import jakarta.servlet.http.HttpServletResponse; -import io.vertx.core.http.HttpServerRequest; +import io.quarkiverse.cxf.transport.generated.VertxServletOutputStream; import io.vertx.core.http.HttpServerResponse; import io.vertx.ext.web.RoutingContext; public class VertxHttpServletResponse implements HttpServletResponse { protected final RoutingContext context; - private final HttpServerRequest request; protected final HttpServerResponse response; private final int outputBufferSize; private final int minChunkSize; @@ -23,12 +22,11 @@ public class VertxHttpServletResponse implements HttpServletResponse { private PrintWriter printWriter; public VertxHttpServletResponse(RoutingContext context, int outputBufferSize, int minChunkSize) { - this.request = context.request(); this.response = context.response(); this.context = context; this.outputBufferSize = outputBufferSize; this.minChunkSize = minChunkSize; - this.os = new VertxServletOutputStream(request, response, context, outputBufferSize, minChunkSize); + this.os = new VertxServletOutputStream(new VertxReactiveRequestContext(context, minChunkSize, outputBufferSize)); } @Override @@ -191,7 +189,7 @@ public void resetBuffer() { } catch (IOException e) { } } - os = new VertxServletOutputStream(request, response, context, outputBufferSize, minChunkSize); + os = new VertxServletOutputStream(new VertxReactiveRequestContext(context, minChunkSize, outputBufferSize)); } @Override diff --git a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/transport/VertxReactiveRequestContext.java b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/transport/VertxReactiveRequestContext.java new file mode 100644 index 000000000..c48a29ede --- /dev/null +++ b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/transport/VertxReactiveRequestContext.java @@ -0,0 +1,59 @@ +package io.quarkiverse.cxf.transport; + +import io.vertx.ext.web.RoutingContext; + +public class VertxReactiveRequestContext { + private final RoutingContext context; + private final Deployment deployment; + + public VertxReactiveRequestContext( + RoutingContext context, + int minChunkSize, + int outputBufferSize) { + super(); + this.context = context; + this.deployment = new Deployment(minChunkSize, outputBufferSize); + } + + public Deployment getDeployment() { + return deployment; + } + + public RoutingContext getContext() { + return context; + } + + public static class Deployment { + + private final ResteasyReactiveConfig resteasyReactiveConfig; + + public Deployment(int minChunkSize, int outputBufferSize) { + super(); + this.resteasyReactiveConfig = new ResteasyReactiveConfig(minChunkSize, outputBufferSize); + } + + public ResteasyReactiveConfig getResteasyReactiveConfig() { + return resteasyReactiveConfig; + } + + } + + public static class ResteasyReactiveConfig { + private final int minChunkSize; + private final int outputBufferSize; + + public ResteasyReactiveConfig(int minChunkSize, int outputBufferSize) { + super(); + this.minChunkSize = minChunkSize; + this.outputBufferSize = outputBufferSize; + } + + public int getOutputBufferSize() { + return outputBufferSize; + } + + public int getMinChunkSize() { + return minChunkSize; + } + } +} diff --git a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/transport/AppendBuffer.java b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/transport/generated/AppendBuffer.java similarity index 91% rename from extensions/core/runtime/src/main/java/io/quarkiverse/cxf/transport/AppendBuffer.java rename to extensions/core/runtime/src/main/java/io/quarkiverse/cxf/transport/generated/AppendBuffer.java index 81bae1e61..3b8414d0f 100644 --- a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/transport/AppendBuffer.java +++ b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/transport/generated/AppendBuffer.java @@ -1,28 +1,34 @@ -package io.quarkiverse.cxf.transport; +package io.quarkiverse.cxf.transport.generated; import java.util.ArrayDeque; import java.util.Objects; - import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.CompositeByteBuf; /** - * Adapted from + * Adapted by sync-quarkus-classes.groovy from * AppendBuffer + * 'https://github.com/quarkusio/quarkus/blob/main/independent-projects/resteasy-reactive/server/vertx/src/main/java/org/jboss/resteasy/reactive/server/vertx/ResteasyReactiveOutputStream.java'>ResteasyReactiveOutputStream * from Quarkus. * - * It is a bounded (direct) buffer container that can keep on accepting data till {@link #capacity} is exhausted.
- * In order to keep appending on it, it can {@link #clear} and consolidate its content as a {@link ByteBuf}. + *

+ * + * It is a bounded (direct) buffer container that can keep on accepting data till {@link #capacity} is exhausted.
+ * In order to keep appending on it, it can {@link #clear} and consolidate its content as a {@link ByteBuf}. */ final class AppendBuffer { + private final ByteBufAllocator allocator; private final int minChunkSize; + private final int capacity; + private ByteBuf buffer; + private ArrayDeque otherBuffers; + private int size; private AppendBuffer(ByteBufAllocator allocator, int minChunkSize, int capacity) { @@ -193,5 +199,4 @@ public int capacity() { public int availableCapacity() { return capacity - size; } - } diff --git a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/transport/VertxServletOutputStream.java b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/transport/generated/VertxServletOutputStream.java similarity index 88% rename from extensions/core/runtime/src/main/java/io/quarkiverse/cxf/transport/VertxServletOutputStream.java rename to extensions/core/runtime/src/main/java/io/quarkiverse/cxf/transport/generated/VertxServletOutputStream.java index 9380da4d5..1a92283e7 100644 --- a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/transport/VertxServletOutputStream.java +++ b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/transport/generated/VertxServletOutputStream.java @@ -1,17 +1,15 @@ -package io.quarkiverse.cxf.transport; +package io.quarkiverse.cxf.transport.generated; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InterruptedIOException; - import jakarta.servlet.ServletOutputStream; import jakarta.servlet.WriteListener; - import org.jboss.logging.Logger; - import io.netty.buffer.ByteBuf; import io.netty.buffer.PooledByteBufAllocator; import io.netty.handler.codec.http.HttpHeaderNames; +import io.quarkiverse.cxf.transport.VertxReactiveRequestContext; import io.quarkus.vertx.core.runtime.VertxBufferImpl; import io.vertx.core.AsyncResult; import io.vertx.core.Context; @@ -19,43 +17,43 @@ import io.vertx.core.buffer.Buffer; import io.vertx.core.http.HttpServerRequest; import io.vertx.core.http.HttpServerResponse; -import io.vertx.ext.web.RoutingContext; /** - * Adapted from + * Adapted by sync-quarkus-classes.groovy from * ResteasyReactiveOutputStream + * 'https://github.com/quarkusio/quarkus/blob/main/independent-projects/resteasy-reactive/server/vertx/src/main/java/org/jboss/resteasy/reactive/server/vertx/ResteasyReactiveOutputStream.java'>ResteasyReactiveOutputStream * from Quarkus. */ public class VertxServletOutputStream extends ServletOutputStream { - private static final Logger log = Logger.getLogger(VertxServletOutputStream.class); + private static final Logger log = Logger.getLogger("org.jboss.resteasy.reactive.server.vertx.ResteasyReactiveOutputStream"); - private final HttpServerRequest request; - private final HttpServerResponse response; + private final VertxReactiveRequestContext context; + + protected final HttpServerRequest request; private final AppendBuffer appendBuffer; + private boolean committed; private boolean closed; + protected boolean waitingForDrain; + protected boolean drainHandlerRegistered; + protected boolean first = true; + protected Throwable throwable; + private ByteArrayOutputStream overflow; - public VertxServletOutputStream( - HttpServerRequest request, - HttpServerResponse response, - RoutingContext context, - int outputBufferSize, - int minChunkSize) { - this.request = request; - this.response = response; - this.appendBuffer = AppendBuffer.withMinChunks(PooledByteBufAllocator.DEFAULT, - minChunkSize, - outputBufferSize); + public VertxServletOutputStream(VertxReactiveRequestContext context) { + this.context = context; + this.request = context.getContext().request(); + this.appendBuffer = AppendBuffer.withMinChunks(PooledByteBufAllocator.DEFAULT, context.getDeployment().getResteasyReactiveConfig().getMinChunkSize(), context.getDeployment().getResteasyReactiveConfig().getOutputBufferSize()); request.response().exceptionHandler(new Handler() { + @Override public void handle(Throwable event) { throwable = event; @@ -70,8 +68,8 @@ public void handle(Throwable event) { } } }); + context.getContext().addEndHandler(new Handler>() { - context.addEndHandler(new Handler>() { @Override public void handle(AsyncResult event) { synchronized (request.connection()) { @@ -85,7 +83,6 @@ public void handle(AsyncResult event) { } public void terminateResponse() { - } Buffer createBuffer(ByteBuf data) { @@ -165,6 +162,7 @@ private void registerDrainHandler() { if (!drainHandlerRegistered) { drainHandlerRegistered = true; Handler handler = new Handler() { + @Override public void handle(Void event) { synchronized (request.connection()) { @@ -192,7 +190,6 @@ public void handle(Void event) { /** * {@inheritDoc} */ - @Override public void write(final int b) throws IOException { write(new byte[] { (byte) b }, 0, 1); } @@ -200,7 +197,6 @@ public void write(final int b) throws IOException { /** * {@inheritDoc} */ - @Override public void write(final byte[] b) throws IOException { write(b, 0, b.length); } @@ -208,7 +204,6 @@ public void write(final byte[] b) throws IOException { /** * {@inheritDoc} */ - @Override public void write(final byte[] b, final int off, final int len) throws IOException { if (len < 1) { return; @@ -216,7 +211,6 @@ public void write(final byte[] b, final int off, final int len) throws IOExcepti if (closed) { throw new IOException("Stream is closed"); } - int rem = len; int idx = off; try { @@ -242,11 +236,12 @@ private void prepareWrite(ByteBuf buffer, boolean finished) throws IOException { if (!committed) { committed = true; if (finished) { + final HttpServerResponse response = request.response(); if (!response.headWritten()) { if (buffer == null) { response.headers().set(HttpHeaderNames.CONTENT_LENGTH, "0"); } else { - response.headers().set(HttpHeaderNames.CONTENT_LENGTH, "" + buffer.readableBytes()); + response.headers().set(HttpHeaderNames.CONTENT_LENGTH, String.valueOf(buffer.readableBytes())); } } } else { @@ -258,7 +253,6 @@ private void prepareWrite(ByteBuf buffer, boolean finished) throws IOException { /** * {@inheritDoc} */ - @Override public void flush() throws IOException { if (closed) { throw new IOException("Stream is closed"); @@ -276,7 +270,6 @@ public void flush() throws IOException { /** * {@inheritDoc} */ - @Override public void close() throws IOException { if (closed) return; @@ -298,5 +291,4 @@ public boolean isReady() { public void setWriteListener(WriteListener writeListener) { throw new UnsupportedOperationException(); } - } diff --git a/pom.xml b/pom.xml index c2ee6f3f7..39bf3707e 100644 --- a/pom.xml +++ b/pom.xml @@ -67,6 +67,9 @@ [3.6.2,) + + 3.25.6 + 4.4.7 2.1.1 3.0.19 @@ -336,6 +339,24 @@ + + net.revelc.code.formatter + formatter-maven-plugin + + + **/generated/*.java + + + + + net.revelc.code + impsort-maven-plugin + + + **/generated/*.java + + + diff --git a/src/build/scripts/sync-quarkus-classes.groovy b/src/build/scripts/sync-quarkus-classes.groovy new file mode 100644 index 000000000..2028f79d8 --- /dev/null +++ b/src/build/scripts/sync-quarkus-classes.groovy @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the 'License'); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an 'AS IS' BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Get AppendBuffer and ResteasyReactiveOutputStream from Quarkus and adapt them for Quarkus CXF + */ + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import com.github.javaparser.JavaParser; +import com.github.javaparser.StaticJavaParser; +import com.github.javaparser.ast.CompilationUnit; +import com.github.javaparser.ast.ImportDeclaration; +import com.github.javaparser.ast.Modifier.Keyword; +import com.github.javaparser.ast.NodeList; +import com.github.javaparser.ast.body.ConstructorDeclaration; +import com.github.javaparser.ast.body.EnumDeclaration; +import com.github.javaparser.ast.body.MethodDeclaration; +import com.github.javaparser.ast.body.Parameter; +import com.github.javaparser.ast.body.TypeDeclaration; +import com.github.javaparser.ast.comments.JavadocComment; +import com.github.javaparser.ast.expr.ObjectCreationExpr; +import com.github.javaparser.ast.stmt.BlockStmt; +import com.github.javaparser.ast.stmt.ThrowStmt; +import com.github.javaparser.ast.type.ClassOrInterfaceType; + +new Transform() + +public class Transform { + + public Transform() throws IOException { + final Path destinationDir = Paths.get('src/main/java/io/quarkiverse/cxf/transport/generated'); + + JavaParser parser = new JavaParser(StaticJavaParser.getParserConfiguration()); + final CompilationUnit appendBuffer = parse('org/jboss/resteasy/reactive/server/vertx/AppendBuffer.java', parser); + String quarkusVersion = 'main'; + transformCommon(appendBuffer, quarkusVersion); + store(appendBuffer, destinationDir); + + final CompilationUnit resteasyReactiveOutputStream = parse( + 'org/jboss/resteasy/reactive/server/vertx/ResteasyReactiveOutputStream.java', parser); + transformCommon(resteasyReactiveOutputStream, quarkusVersion); + transformStream(resteasyReactiveOutputStream, parser); + store(resteasyReactiveOutputStream, destinationDir); + } + + private void transformCommon(final CompilationUnit unit, String quarkusVersion) { + TypeDeclaration primaryType = unit.getType(0); + String cmt = 'Adapted by sync-quarkus-classes.groovy from\n' + + 'ResteasyReactiveOutputStream\n' + + 'from Quarkus.\n'; + Optional javaDoc = primaryType.getJavadocComment(); + if (javaDoc.isEmpty()) { + JavadocComment newJavaDoc = new JavadocComment(cmt); + primaryType.setComment(newJavaDoc); + } else { + javaDoc.get().setContent(cmt + '\n

\n' + javaDoc.get().getContent()); + } + unit.getPackageDeclaration().get().setName('io.quarkiverse.cxf.transport.generated'); + + } + + private void transformStream(final CompilationUnit unit, JavaParser parser) { + + /* Remove some imports */ + NodeList imports = unit.getImports(); + for (Iterator i = imports.iterator(); i.hasNext();) { + ImportDeclaration imp = i.next(); + String cl = imp.getNameAsString(); + if (cl.startsWith('jakarta.ws.rs.') + || cl.startsWith('org.jboss.resteasy.') + || cl.contains('java.io.OutputStream')) { + i.remove(); + } + } + + Stream.of( + 'io.quarkus.vertx.core.runtime.VertxBufferImpl', + 'io.quarkiverse.cxf.transport.VertxReactiveRequestContext', + 'jakarta.servlet.ServletOutputStream', + 'jakarta.servlet.WriteListener', + 'io.vertx.core.http.HttpServerResponse') + .map(cl -> new ImportDeclaration(cl, false, false)) + .forEach(imports::add); + final String[] prefixOrder = new String[]{ 'java.', 'jakarta.', 'org.' }; + Comparator importComparator = new Comparator() { + @Override + public int compare(ImportDeclaration i1, ImportDeclaration i2) { + String s1 = i1.getNameAsString(); + String s2 = i2.getNameAsString(); + // Define the order of prefixes + + // Get the index of the prefixes in the array or -1 if not found + int indexS1 = -1; + int indexS2 = -1; + for (int i = 0; i < prefixOrder.length; i++) { + if (s1.startsWith(prefixOrder[i])) { + indexS1 = i; + break; + } + } + for (int i = 0; i < prefixOrder.length; i++) { + if (s2.startsWith(prefixOrder[i])) { + indexS2 = i; + break; + } + } + + // Compare based on prefix order + if (indexS1 != -1 && indexS2 != -1) { + return Integer.compare(indexS1, indexS2); // Compare based on prefix order + } else if (indexS1 != -1) { + return -1; // s1 starts with a known prefix, so it comes first + } else if (indexS2 != -1) { + return 1; // s2 starts with a known prefix, so it comes first + } else { + return s1.compareTo(s2); // Compare alphabetically if none of the prefixes match + } + } + }; + Collections.sort(imports, importComparator); + + /* Rename to VertxServletOutputStream */ + TypeDeclaration primaryType = unit.getType(0); + primaryType.setName('VertxServletOutputStream'); + for (ConstructorDeclaration constructor : primaryType.getConstructors()) { + constructor.setName('VertxServletOutputStream'); + constructor.getParameter(0).setType('VertxReactiveRequestContext'); + } + /* Change the supertype */ + primaryType.asClassOrInterfaceDeclaration().getExtendedTypes(0).setName('ServletOutputStream'); + + /* Use our fake context */ + primaryType.getFieldByName('context').get().getVariable(0).setType('VertxReactiveRequestContext'); + + /* Remove contentLengthSet() */ + primaryType.getMethodsByName('contentLengthSet').forEach(m -> m.remove()); + + /* Remove ContentLengthSetResult */ + primaryType.getMembers().stream() + .filter(m -> m.isEnumDeclaration()) + .map(m -> (EnumDeclaration) m) + .filter(m -> m.getNameAsString().equals('ContentLengthSetResult')) + .collect(Collectors.toList()).stream() + .forEach(m -> m.remove()); + + /* Replace prepareWrite() */ + final String body = + ' private void prepareWrite(ByteBuf buffer, boolean finished) throws IOException {\n' + + ' if (!committed) {\n' + + ' committed = true;\n' + + ' if (finished) {\n' + + ' final HttpServerResponse response = request.response();\n' + + ' if (!response.headWritten()) {\n' + + ' if (buffer == null) {\n' + + ' response.headers().set(HttpHeaderNames.CONTENT_LENGTH, \"0\");\n' + + ' } else {\n' + + ' response.headers().set(HttpHeaderNames.CONTENT_LENGTH, String.valueOf(buffer.readableBytes()));\n' + + ' }\n' + + ' }\n' + + ' } else {\n' + + ' request.response().setChunked(true);\n' + + ' }\n' + + ' }\n' + + ' }\n'; + final MethodDeclaration newPrepareWrite = parser.parseMethodDeclaration(body).getResult().get(); + primaryType.getMethodsByName('prepareWrite').forEach(m -> m.setBody(newPrepareWrite.getBody().get())); + + + /* Add @Override where needed */ + addOverride(primaryType, 'close', 'flush', 'write'); + + /* Make ServletOutputStream happy */ + MethodDeclaration isReady = primaryType.addMethod('isReady', Keyword.PUBLIC); + isReady.addAnnotation('Override'); + isReady.setType('boolean'); + BlockStmt isReadyBody = new BlockStmt(); + isReadyBody.addStatement(new ThrowStmt(new ObjectCreationExpr(null, new ClassOrInterfaceType(null, 'UnsupportedOperationException'), new NodeList<>()))); + isReady.setBody(isReadyBody); + + + MethodDeclaration setWriteListener = primaryType.addMethod('setWriteListener', Keyword.PUBLIC); + setWriteListener.addAnnotation(Override.class); + // Add the writeListener parameter + setWriteListener.addParameter(new Parameter(parser.parseClassOrInterfaceType('WriteListener').getResult().get(), 'writeListener')); + BlockStmt setWriteListenerBody = new BlockStmt(); + setWriteListenerBody.addStatement(new ThrowStmt(new ObjectCreationExpr(null, new ClassOrInterfaceType(null, 'UnsupportedOperationException'), new NodeList<>()))); + setWriteListener.setBody(setWriteListenerBody); + + } + + private void addOverride(TypeDeclaration primaryType, String... methodNames) { + Stream.of(methodNames) + .map(methodName -> primaryType.getMethodsByName(methodName)) + .flatMap(List::stream) + .filter(methodDeclaration -> !methodDeclaration.getNameAsString().equals('write') + && methodDeclaration.getParameters().size() > 0 + && !methodDeclaration.getParameter(0).getTypeAsString().equals('ByteBuf')) + .forEach(methodDeclaration -> methodDeclaration.addAnnotation('Override')); + } + + private void store(CompilationUnit unit, Path destinationDir) throws IOException { + String name = unit.getType(0).getNameAsString(); + Path file = destinationDir.resolve(name + '.java'); + final String oldContent = Files.exists(file) ? Files.readString(file) : null; + final String newContent = unit.toString().replace('@Override()', '@Override'); + if (!newContent.equals(oldContent)) { + println('Updating ' + name + '.java') + Files.createDirectories(destinationDir); + Files.write(file, newContent.getBytes(StandardCharsets.UTF_8)); + } else { + println(name + '.java up to date') + } + } + + CompilationUnit parse(String resourcePath, JavaParser parser) throws IOException { + final String src = new String(getClass().getClassLoader().getResourceAsStream(resourcePath).readAllBytes(), + StandardCharsets.UTF_8); + return parser.parse(src).getResult().get(); + } + +} \ No newline at end of file diff --git a/src/main/java/io/quarkiverse/cxf/transport/generated/AppendBuffer.java b/src/main/java/io/quarkiverse/cxf/transport/generated/AppendBuffer.java new file mode 100644 index 000000000..3b8414d0f --- /dev/null +++ b/src/main/java/io/quarkiverse/cxf/transport/generated/AppendBuffer.java @@ -0,0 +1,202 @@ +package io.quarkiverse.cxf.transport.generated; + +import java.util.ArrayDeque; +import java.util.Objects; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.CompositeByteBuf; + +/** + * Adapted by sync-quarkus-classes.groovy from + * ResteasyReactiveOutputStream + * from Quarkus. + * + *

+ * + * It is a bounded (direct) buffer container that can keep on accepting data till {@link #capacity} is exhausted.
+ * In order to keep appending on it, it can {@link #clear} and consolidate its content as a {@link ByteBuf}. + */ +final class AppendBuffer { + + private final ByteBufAllocator allocator; + + private final int minChunkSize; + + private final int capacity; + + private ByteBuf buffer; + + private ArrayDeque otherBuffers; + + private int size; + + private AppendBuffer(ByteBufAllocator allocator, int minChunkSize, int capacity) { + this.allocator = allocator; + this.minChunkSize = Math.min(minChunkSize, capacity); + this.capacity = capacity; + } + + /** + * This buffer append data in a single eagerly allocated {@link ByteBuf}. + */ + public static AppendBuffer eager(ByteBufAllocator allocator, int capacity) { + return new AppendBuffer(allocator, capacity, capacity); + } + + /** + * This buffer append data in multiples {@link ByteBuf}s sized as each {@code len} in {@link #append}.
+ * The data is consolidated in a single {@link CompositeByteBuf} on {@link #clear}. + */ + public static AppendBuffer exact(ByteBufAllocator allocator, int capacity) { + return new AppendBuffer(allocator, 0, capacity); + } + + /** + * This buffer append data in multiples {@link ByteBuf}s which minimum capacity is {@code minChunkSize} or + * as each {@code len}, if greater than it.
+ * The data is consolidated in a single {@link CompositeByteBuf} on {@link #clear}. + */ + public static AppendBuffer withMinChunks(ByteBufAllocator allocator, int minChunkSize, int capacity) { + return new AppendBuffer(allocator, minChunkSize, capacity); + } + + private ByteBuf lastBuffer() { + if (otherBuffers == null || otherBuffers.isEmpty()) { + return buffer; + } + return otherBuffers.peekLast(); + } + + /** + * It returns how many bytes have been appended
+ * If returns a value different from {@code len}, is it required to invoke {@link #clear} + * that would refill the available capacity till {@link #capacity()} + */ + public int append(byte[] bytes, int off, int len) { + Objects.requireNonNull(bytes); + if (len == 0) { + return 0; + } + int alreadyWritten = 0; + if (minChunkSize > 0) { + var lastBuffer = lastBuffer(); + if (lastBuffer != null) { + int availableOnLast = lastBuffer.writableBytes(); + if (availableOnLast > 0) { + int toWrite = Math.min(len, availableOnLast); + lastBuffer.writeBytes(bytes, off, toWrite); + size += toWrite; + len -= toWrite; + // we stop if there's no more to append + if (len == 0) { + return toWrite; + } + off += toWrite; + alreadyWritten = toWrite; + } + } + } + final int availableCapacity = capacity - size; + if (availableCapacity == 0) { + return alreadyWritten; + } + // we can still write some + int toWrite = Math.min(len, availableCapacity); + assert toWrite > 0; + final int chunkCapacity; + if (minChunkSize > 0) { + // Cannot allocate less than minChunkSize, till the limit of capacity left + chunkCapacity = Math.min(Math.max(minChunkSize, toWrite), availableCapacity); + } else { + chunkCapacity = toWrite; + } + var tmpBuf = allocator.directBuffer(chunkCapacity); + try { + tmpBuf.writeBytes(bytes, off, toWrite); + } catch (Throwable t) { + tmpBuf.release(); + throw t; + } + if (buffer == null) { + buffer = tmpBuf; + } else { + boolean resetOthers = false; + try { + if (otherBuffers == null) { + otherBuffers = new ArrayDeque<>(); + resetOthers = true; + } + otherBuffers.add(tmpBuf); + } catch (Throwable t) { + rollback(alreadyWritten, tmpBuf, resetOthers); + throw t; + } + } + size += toWrite; + return toWrite + alreadyWritten; + } + + private void rollback(int alreadyWritten, ByteBuf tmpBuf, boolean resetOthers) { + tmpBuf.release(); + if (resetOthers) { + otherBuffers = null; + } + if (alreadyWritten > 0) { + var last = lastBuffer(); + last.writerIndex(last.writerIndex() - alreadyWritten); + size -= alreadyWritten; + assert last.writerIndex() > 0; + } + } + + public ByteBuf clear() { + var firstBuf = buffer; + if (firstBuf == null) { + return null; + } + var others = otherBuffers; + if (others == null || others.isEmpty()) { + size = 0; + buffer = null; + // super fast-path + return firstBuf; + } + return clearBuffers(); + } + + private CompositeByteBuf clearBuffers() { + var firstBuf = buffer; + var others = otherBuffers; + var batch = allocator.compositeDirectBuffer(1 + others.size()); + try { + buffer = null; + size = 0; + batch.addComponent(true, 0, firstBuf); + for (int i = 0, othersCount = others.size(); i < othersCount; i++) { + // if addComponent fail, it takes care of releasing curr and throwing the exception: + batch.addComponent(true, 1 + i, others.poll()); + } + return batch; + } catch (Throwable anyError) { + batch.release(); + releaseOthers(others); + throw anyError; + } + } + + private static void releaseOthers(ArrayDeque others) { + ByteBuf buf; + while ((buf = others.poll()) != null) { + buf.release(); + } + } + + public int capacity() { + return capacity; + } + + public int availableCapacity() { + return capacity - size; + } +} diff --git a/src/main/java/io/quarkiverse/cxf/transport/generated/VertxServletOutputStream.java b/src/main/java/io/quarkiverse/cxf/transport/generated/VertxServletOutputStream.java new file mode 100644 index 000000000..1a92283e7 --- /dev/null +++ b/src/main/java/io/quarkiverse/cxf/transport/generated/VertxServletOutputStream.java @@ -0,0 +1,294 @@ +package io.quarkiverse.cxf.transport.generated; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InterruptedIOException; +import jakarta.servlet.ServletOutputStream; +import jakarta.servlet.WriteListener; +import org.jboss.logging.Logger; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.handler.codec.http.HttpHeaderNames; +import io.quarkiverse.cxf.transport.VertxReactiveRequestContext; +import io.quarkus.vertx.core.runtime.VertxBufferImpl; +import io.vertx.core.AsyncResult; +import io.vertx.core.Context; +import io.vertx.core.Handler; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.http.HttpServerResponse; + +/** + * Adapted by sync-quarkus-classes.groovy from + * ResteasyReactiveOutputStream + * from Quarkus. + */ +public class VertxServletOutputStream extends ServletOutputStream { + + private static final Logger log = Logger.getLogger("org.jboss.resteasy.reactive.server.vertx.ResteasyReactiveOutputStream"); + + private final VertxReactiveRequestContext context; + + protected final HttpServerRequest request; + + private final AppendBuffer appendBuffer; + + private boolean committed; + + private boolean closed; + + protected boolean waitingForDrain; + + protected boolean drainHandlerRegistered; + + protected boolean first = true; + + protected Throwable throwable; + + private ByteArrayOutputStream overflow; + + public VertxServletOutputStream(VertxReactiveRequestContext context) { + this.context = context; + this.request = context.getContext().request(); + this.appendBuffer = AppendBuffer.withMinChunks(PooledByteBufAllocator.DEFAULT, context.getDeployment().getResteasyReactiveConfig().getMinChunkSize(), context.getDeployment().getResteasyReactiveConfig().getOutputBufferSize()); + request.response().exceptionHandler(new Handler() { + + @Override + public void handle(Throwable event) { + throwable = event; + log.debugf(event, "IO Exception "); + //TODO: do we need this? + terminateResponse(); + request.connection().close(); + synchronized (request.connection()) { + if (waitingForDrain) { + request.connection().notifyAll(); + } + } + } + }); + context.getContext().addEndHandler(new Handler>() { + + @Override + public void handle(AsyncResult event) { + synchronized (request.connection()) { + if (waitingForDrain) { + request.connection().notifyAll(); + } + } + terminateResponse(); + } + }); + } + + public void terminateResponse() { + } + + Buffer createBuffer(ByteBuf data) { + return new VertxBufferImpl(data); + } + + public void write(ByteBuf data, boolean last) throws IOException { + if (last && data == null) { + request.response().end((Handler>) null); + return; + } + //do all this in the same lock + synchronized (request.connection()) { + try { + boolean bufferRequired = awaitWriteable() || (overflow != null && overflow.size() > 0); + if (bufferRequired) { + //just buffer everything + registerDrainHandler(); + if (overflow == null) { + overflow = new ByteArrayOutputStream(); + } + if (data.hasArray()) { + overflow.write(data.array(), data.arrayOffset() + data.readerIndex(), data.readableBytes()); + } else { + data.getBytes(data.readerIndex(), overflow, data.readableBytes()); + } + if (last) { + closed = true; + } + data.release(); + } else { + if (last) { + request.response().end(createBuffer(data), null); + } else { + request.response().write(createBuffer(data), null); + } + } + } catch (Exception e) { + if (data != null && data.refCnt() > 0) { + data.release(); + } + throw new IOException("Failed to write", e); + } + } + } + + private boolean awaitWriteable() throws IOException { + if (Context.isOnEventLoopThread()) { + return request.response().writeQueueFull(); + } + if (first) { + first = false; + return false; + } + assert Thread.holdsLock(request.connection()); + while (request.response().writeQueueFull()) { + if (throwable != null) { + throw new IOException(throwable); + } + if (request.response().closed()) { + throw new IOException("Connection has been closed"); + } + registerDrainHandler(); + try { + waitingForDrain = true; + request.connection().wait(); + } catch (InterruptedException e) { + throw new InterruptedIOException(e.getMessage()); + } finally { + waitingForDrain = false; + } + } + return false; + } + + private void registerDrainHandler() { + if (!drainHandlerRegistered) { + drainHandlerRegistered = true; + Handler handler = new Handler() { + + @Override + public void handle(Void event) { + synchronized (request.connection()) { + if (waitingForDrain) { + request.connection().notifyAll(); + } + if (overflow != null) { + if (overflow.size() > 0) { + if (closed) { + request.response().end(Buffer.buffer(overflow.toByteArray()), null); + } else { + request.response().write(Buffer.buffer(overflow.toByteArray()), null); + } + overflow.reset(); + } + } + } + } + }; + request.response().drainHandler(handler); + request.response().closeHandler(handler); + } + } + + /** + * {@inheritDoc} + */ + public void write(final int b) throws IOException { + write(new byte[] { (byte) b }, 0, 1); + } + + /** + * {@inheritDoc} + */ + public void write(final byte[] b) throws IOException { + write(b, 0, b.length); + } + + /** + * {@inheritDoc} + */ + public void write(final byte[] b, final int off, final int len) throws IOException { + if (len < 1) { + return; + } + if (closed) { + throw new IOException("Stream is closed"); + } + int rem = len; + int idx = off; + try { + while (rem > 0) { + final int written = appendBuffer.append(b, idx, rem); + if (written < rem) { + writeBlocking(appendBuffer.clear(), false); + } + rem -= written; + idx += written; + } + } catch (Exception e) { + throw new IOException(e); + } + } + + public void writeBlocking(ByteBuf buffer, boolean finished) throws IOException { + prepareWrite(buffer, finished); + write(buffer, finished); + } + + private void prepareWrite(ByteBuf buffer, boolean finished) throws IOException { + if (!committed) { + committed = true; + if (finished) { + final HttpServerResponse response = request.response(); + if (!response.headWritten()) { + if (buffer == null) { + response.headers().set(HttpHeaderNames.CONTENT_LENGTH, "0"); + } else { + response.headers().set(HttpHeaderNames.CONTENT_LENGTH, String.valueOf(buffer.readableBytes())); + } + } + } else { + request.response().setChunked(true); + } + } + } + + /** + * {@inheritDoc} + */ + public void flush() throws IOException { + if (closed) { + throw new IOException("Stream is closed"); + } + try { + var toFlush = appendBuffer.clear(); + if (toFlush != null) { + writeBlocking(toFlush, false); + } + } catch (Exception e) { + throw new IOException(e); + } + } + + /** + * {@inheritDoc} + */ + public void close() throws IOException { + if (closed) + return; + try { + writeBlocking(appendBuffer.clear(), true); + } catch (Exception e) { + throw new IOException(e); + } finally { + closed = true; + } + } + + @Override + public boolean isReady() { + throw new UnsupportedOperationException(); + } + + @Override + public void setWriteListener(WriteListener writeListener) { + throw new UnsupportedOperationException(); + } +}