Skip to content

Commit

Permalink
[RESTEASY-2700] Address PR comments.
Browse files Browse the repository at this point in the history
Some notables: use jboss-logging, add HEAD test, deal with several
potential NPEs, use JDK `Collections`, and remove SniTest.  SNI support
can be added later (when someone needs it:)
  • Loading branch information
crankydillo authored and asoldano committed Apr 19, 2021
1 parent 7b0ada6 commit 8b292b6
Show file tree
Hide file tree
Showing 15 changed files with 48 additions and 354 deletions.
2 changes: 1 addition & 1 deletion resteasy-dependencies-bom/pom.xml
Expand Up @@ -106,7 +106,7 @@
<version.org.jboss.marshalling.jboss-marshalling>2.0.6.Final</version.org.jboss.marshalling.jboss-marshalling>
<version.rest-assured>3.3.0</version.rest-assured>
<version.org.springframework>5.2.0.RELEASE</version.org.springframework>
<version.io.projectreactor>2020.0.4</version.io.projectreactor>
<version.io.projectreactor>2020.0.5</version.io.projectreactor>

<version.asyncutil>0.1.0</version.asyncutil>
</properties>
Expand Down
13 changes: 2 additions & 11 deletions server-adapters/resteasy-reactor-netty/pom.xml
Expand Up @@ -30,15 +30,12 @@
<dependency>
<groupId>org.jboss.logging</groupId>
<artifactId>jboss-logging-annotations</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.jboss.logging</groupId>
<artifactId>jboss-logging-processor</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.jboss.resteasy</groupId>
Expand Down Expand Up @@ -81,12 +78,6 @@
<artifactId>jakarta.enterprise.cdi-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor.tools</groupId>
<artifactId>blockhound</artifactId>
<version>1.0.3.RELEASE</version>
<scope>test</scope>
</dependency>
</dependencies>

<profiles>
Expand Down
@@ -1,8 +1,6 @@
package org.jboss.resteasy.plugins.server.reactor.netty;

import org.jboss.resteasy.spi.AsyncOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
Expand All @@ -28,7 +26,7 @@
*/
public class ChunkOutputStream extends AsyncOutputStream {

private static final Logger log = LoggerFactory.getLogger(ChunkOutputStream.class);
private static final EmitFailureHandler EMIT_FAILURE_HANDLER = EmitFailureHandler.FAIL_FAST;

private final ReactorNettyHttpResponse parentResponse;

Expand All @@ -54,8 +52,6 @@ public class ChunkOutputStream extends AsyncOutputStream {
*/
private final Supplier<Sinks.Many<Tuple2<byte[], CompletableFuture<Void>>>> byteSinkSupplier;

private static final EmitFailureHandler EMIT_FAILURE_HANDLER = EmitFailureHandler.FAIL_FAST;

ChunkOutputStream(
final ReactorNettyHttpResponse parentResponse,
final HttpServerResponse reactorNettyResponse,
Expand All @@ -65,19 +61,11 @@ public class ChunkOutputStream extends AsyncOutputStream {
this.completionSink = Objects.requireNonNull(completionSink);
Objects.requireNonNull(reactorNettyResponse);
this.byteSinkSupplier = () -> {
log.trace("Creating FluxSink for output.");
final Sinks.Many<Tuple2<byte[], CompletableFuture<Void>>> outSink = Sinks.many().multicast().onBackpressureBuffer();
final Flux<byte[]> byteFlux = outSink.asFlux().map(tup -> {
log.trace("Submitting bytes to downstream");
tup.getT2().complete(null);
return tup.getT1();
})
// TODO remove the log stuff below once we are confident
.doOnRequest(l -> log.trace("Requested: {}", l))
.doOnSubscribe(s -> {
log.trace("Subscription on Flux<byte[]> occurred: {}", s);
})
.doFinally(s -> log.trace("Flux<byte[]> closing with signal: {}", s));
});

SinkSubscriber.subscribe(completionSink, Mono.from(reactorNettyResponse.sendByteArray(byteFlux)));

Expand All @@ -87,14 +75,13 @@ public class ChunkOutputStream extends AsyncOutputStream {

@Override
public void write(int b) {
byteSink.emitNext(Tuples.of(new byte[] {(byte)b}, new CompletableFuture<>()), EMIT_FAILURE_HANDLER);
write(new byte[] {(byte)b}, 0, 1);
}

@Override
public void close() throws IOException {
log.trace("Closing the ChunkOutputStream.");
if (!started || byteSink == null) {
SinkSubscriber.subscribe(completionSink, Mono.<Void>empty());
SinkSubscriber.subscribe(completionSink, Mono.empty());
} else {
byteSink.emitComplete(EMIT_FAILURE_HANDLER);
}
Expand All @@ -113,8 +100,7 @@ public void write(byte[] bs, int off, int len) {
}

@Override
public void flush() throws IOException {
log.trace("Blocking flush called on ChunkOutputStream");
public void flush() {
try {
asyncFlush().get();
} catch (final InterruptedException ie) {
Expand Down Expand Up @@ -164,7 +150,6 @@ public CompletableFuture<Void> asyncWrite(final byte[] bs, int offset, int lengt
if (offset != 0 || length != bs.length) {
bytes = Arrays.copyOfRange(bs, offset, offset + length);
}
log.trace("Sending bytes to the sink");
byteSink.emitNext(Tuples.of(bytes, cf), EMIT_FAILURE_HANDLER);
return cf;
}
Expand Down

This file was deleted.

@@ -1,112 +1,9 @@
package org.jboss.resteasy.plugins.server.reactor.netty;

import io.netty.handler.codec.http.HttpRequest;
import io.netty.util.concurrent.FastThreadLocalThread;
import org.jboss.resteasy.core.Headers;
import org.jboss.resteasy.specimpl.ResteasyHttpHeaders;
import org.jboss.resteasy.specimpl.ResteasyUriInfo;
import org.jboss.resteasy.util.CookieParser;
import org.jboss.resteasy.util.HttpHeaderNames;
import org.jboss.resteasy.util.MediaTypeHelper;

import javax.ws.rs.core.Cookie;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// TODO copied from netty4. Do we make common?
// There's actually quite a bit of stuff copied from there..
public class NettyUtil
{
public static ResteasyUriInfo extractUriInfo(HttpRequest request, String contextPath, String protocol)
{
String uri = request.uri();

String uriString;

// If we appear to have an absolute URL, don't try to recreate it from the host and request line.
if (uri.startsWith(protocol + "://")) {
uriString = uri;
} else {
String host = request.headers().get(HttpHeaderNames.HOST, "unknown");
if ("".equals(host)) {
host = "unknown";
}
uriString = protocol + "://" + host + uri;
}

return new ResteasyUriInfo(uriString, contextPath);
}

public static ResteasyHttpHeaders extractHttpHeaders(HttpRequest request)
{

MultivaluedMap<String, String> requestHeaders = extractRequestHeaders(request);
ResteasyHttpHeaders headers = new ResteasyHttpHeaders(requestHeaders);

Map<String, Cookie> cookies = extractCookies(requestHeaders);
headers.setCookies(cookies);
return headers;

}

static Map<String, Cookie> extractCookies(MultivaluedMap<String, String> headers)
{
Map<String, Cookie> cookies = new HashMap<String, Cookie>();
List<String> cookieHeaders = headers.get("Cookie");
if (cookieHeaders == null) return cookies;

for (String cookieHeader : cookieHeaders)
{
for (Cookie cookie : CookieParser.parseCookies(cookieHeader))
{
cookies.put(cookie.getName(), cookie);
}
}
return cookies;
}

public static List<MediaType> extractAccepts(MultivaluedMap<String, String> requestHeaders)
{
List<MediaType> acceptableMediaTypes = new ArrayList<MediaType>();
List<String> accepts = requestHeaders.get(HttpHeaderNames.ACCEPT);
if (accepts == null) return acceptableMediaTypes;

for (String accept : accepts)
{
acceptableMediaTypes.addAll(MediaTypeHelper.parseHeader(accept));
}
return acceptableMediaTypes;
}

public static List<String> extractLanguages(MultivaluedMap<String, String> requestHeaders)
{
List<String> acceptable = new ArrayList<String>();
List<String> accepts = requestHeaders.get(HttpHeaderNames.ACCEPT_LANGUAGE);
if (accepts == null) return acceptable;

for (String accept : accepts)
{
String[] splits = accept.split(",");
for (String split : splits) acceptable.add(split.trim());
}
return acceptable;
}

public static MultivaluedMap<String, String> extractRequestHeaders(HttpRequest request)
{
Headers<String> requestHeaders = new Headers<String>();

for (Map.Entry<String, String> header : request.headers())
{
requestHeaders.add(header.getKey(), header.getValue());
}
return requestHeaders;
}

public static boolean isIoThread() {
return Thread.currentThread() instanceof FastThreadLocalThread;
}
Expand Down
@@ -1,15 +1,14 @@
package org.jboss.resteasy.plugins.server.reactor.netty;

import org.jboss.logging.Logger;
import org.jboss.resteasy.core.ResteasyDeploymentImpl;
import org.jboss.resteasy.plugins.server.embedded.SecurityDomain;
import org.jboss.resteasy.spi.ResteasyDeployment;
import org.jboss.resteasy.util.PortProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReactorNettyContainer {

private static final Logger log = LoggerFactory.getLogger(ReactorNettyContainer.class);
private static final Logger log = Logger.getLogger(ReactorNettyContainer.class);

public static ReactorNettyJaxrsServer reactorNettyJaxrsServer;

Expand Down
@@ -1,5 +1,6 @@
package org.jboss.resteasy.plugins.server.reactor.netty;

import org.jboss.logging.Logger;
import org.jboss.resteasy.core.AbstractAsynchronousResponse;
import org.jboss.resteasy.core.AbstractExecutionContext;
import org.jboss.resteasy.core.ResteasyContext;
Expand All @@ -13,8 +14,6 @@
import org.jboss.resteasy.spi.ResteasyAsynchronousResponse;
import org.jboss.resteasy.spi.RunnableWithException;
import org.jboss.resteasy.util.CaseInsensitiveMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.netty.http.server.HttpServerRequest;

import javax.ws.rs.container.AsyncResponse;
Expand All @@ -24,10 +23,10 @@
import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
import java.util.Collections;
import java.util.Date;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
Expand All @@ -42,14 +41,14 @@
* body via {@link #getInputStream}.
*/
class ReactorNettyHttpRequest extends BaseHttpRequest {
private static final Logger log = LoggerFactory.getLogger(ReactorNettyHttpRequest.class);
private static final Logger log = Logger.getLogger(ReactorNettyHttpRequest.class);

private final HttpServerRequest req;
private final ResteasyHttpHeaders resteasyHttpHeaders;
private String httpMethod;
private InputStream in;
private final NettyExecutionContext executionContext;
private final Map<String, Object> attributes = new HashMap<String, Object>();
private final Map<String, Object> attributes = new HashMap<>();
private Duration timeout;

ReactorNettyHttpRequest(
Expand Down Expand Up @@ -108,22 +107,7 @@ public void setHttpMethod(String method) {
@Override
public Enumeration<String> getAttributeNames()
{
Enumeration<String> en = new Enumeration<String>()
{
private Iterator<String> it = attributes.keySet().iterator();
@Override
public boolean hasMoreElements()
{
return it.hasNext();
}

@Override
public String nextElement()
{
return it.next();
}
};
return en;
return Collections.enumeration(attributes.keySet());
}

@Override
Expand Down

0 comments on commit 8b292b6

Please sign in to comment.