Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New request body API #10781

Merged
merged 24 commits into from
May 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion config/accepted-api-changes.json
Original file line number Diff line number Diff line change
Expand Up @@ -1083,6 +1083,18 @@
"type": "io.micronaut.http.client.tck.tests.$Person$IntrospectionRef",
"member": "Implemented interface io.micronaut.core.beans.BeanIntrospectionReference",
"reason": "Introspection changes"
},
{
"type": "io.micronaut.http.filter.BaseFilterProcessor$RequiresRequestBodyBinder",
"member": "Implemented interface io.micronaut.core.bind.ArgumentBinder",
"reason": "Internal class"
},
{
"type": "io.micronaut.http.server.netty.$DefaultHttpContentProcessorResolver$Definition",
"reason": "Internal class removed"
},
{
"type": "io.micronaut.http.server.netty.jackson.$JsonHttpContentSubscriberFactory$Definition",
"reason": "Internal class removed"
}

]
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.netty.util.ReferenceCountUtil;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.scheduler.NonBlocking;

import java.io.Closeable;
import java.util.concurrent.locks.Condition;
Expand Down Expand Up @@ -153,6 +154,9 @@ public T take() throws InterruptedException {
return null;
}
if (demanded) {
if (Thread.currentThread() instanceof NonBlocking) {
throw new IllegalStateException("Attempted to do blocking operation on a thread marked as NonBlocking. (Maybe the netty event loop?) Please only run blocking operations on IO or virtual threads, for example by marking your controller with @ExecuteOn(TaskExecutors.BLOCKING).");
}
newDataCondition.await();
}
subscription = this.subscription;
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.netty.buffer.ByteBufHolder;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http.multipart.Attribute;
import io.netty.handler.codec.http.multipart.FileUpload;
import io.netty.handler.codec.http.multipart.HttpDataFactory;
Expand All @@ -31,10 +32,12 @@
import io.netty.handler.codec.http.multipart.HttpPostStandardRequestDecoder;
import io.netty.handler.codec.http.multipart.InterfaceHttpData;
import io.netty.handler.codec.http.multipart.InterfaceHttpPostRequestDecoder;
import io.netty.util.ReferenceCountUtil;

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicLong;

/**
* <p>Decodes {@link MediaType#MULTIPART_FORM_DATA} in a non-blocking manner.</p>
Expand All @@ -45,10 +48,14 @@
* @since 1.0
*/
@Internal
public class FormDataHttpContentProcessor extends AbstractHttpContentProcessor {
public final class FormDataHttpContentProcessor {

protected final NettyHttpRequest<?> nettyHttpRequest;
protected final long advertisedLength;
protected final long requestMaxSize;
protected final AtomicLong receivedLength = new AtomicLong();
protected final HttpServerConfiguration configuration;
private final InterfaceHttpPostRequestDecoder decoder;
private final boolean enabled;
private final long partMaxSize;

/**
Expand All @@ -63,13 +70,20 @@ public class FormDataHttpContentProcessor extends AbstractHttpContentProcessor {
* {@code true} if the decoder has been destroyed or will be destroyed in the near future.
*/
private boolean destroyed = false;
/**
* Whether we received a LastHttpContent.
*/
private boolean receivedLast = false;

/**
* @param nettyHttpRequest The {@link NettyHttpRequest}
* @param configuration The {@link NettyHttpServerConfiguration}
*/
public FormDataHttpContentProcessor(NettyHttpRequest<?> nettyHttpRequest, HttpServerConfiguration configuration) {
super(nettyHttpRequest, configuration);
this.nettyHttpRequest = nettyHttpRequest;
this.advertisedLength = nettyHttpRequest.getContentLength();
this.requestMaxSize = configuration.getMaxRequestSize();
this.configuration = configuration;
Charset characterEncoding = nettyHttpRequest.getCharacterEncoding();
HttpServerConfiguration.MultipartConfiguration multipart = configuration.getMultipart();
HttpDataFactory factory = new MicronautHttpData.Factory(multipart, characterEncoding);
Expand All @@ -80,18 +94,10 @@ public FormDataHttpContentProcessor(NettyHttpRequest<?> nettyHttpRequest, HttpSe
} else {
this.decoder = new HttpPostStandardRequestDecoder(factory, nativeRequest, characterEncoding);
}
this.enabled = nettyHttpRequest.getContentType().map(type -> type.equals(MediaType.APPLICATION_FORM_URLENCODED_TYPE)).orElse(false) ||
multipart.isEnabled();
this.partMaxSize = multipart.getMaxFileSize();
}

@Override
public boolean isEnabled() {
return enabled;
}

@Override
protected void onData(ByteBufHolder message, Collection<Object> out) {
protected void onData(ByteBufHolder message, Collection<? super InterfaceHttpData> out) {
boolean skip;
synchronized (this) {
if (destroyed) {
Expand Down Expand Up @@ -163,22 +169,32 @@ protected void onData(ByteBufHolder message, Collection<Object> out) {
}
}

@Override
public void add(ByteBufHolder message, Collection<Object> out) throws Throwable {
public void add(ByteBufHolder message, Collection<? super InterfaceHttpData> out) throws Throwable {
try {
super.add(message, out);
receivedLast |= message instanceof LastHttpContent;
long receivedLength1 = this.receivedLength.addAndGet(message.content().readableBytes());

ReferenceCountUtil.touch(message);
if (advertisedLength > requestMaxSize) {
fireExceedsLength(advertisedLength, requestMaxSize, message);
} else if (receivedLength1 > requestMaxSize) {
fireExceedsLength(receivedLength1, requestMaxSize, message);
} else {
onData(message, out);
}
} catch (Throwable e) {
cancel();
throw e;
}
}

@Override
public void complete(Collection<Object> out) {
public void complete(Collection<? super InterfaceHttpData> out) throws Throwable {
if (!receivedLast) {
add(LastHttpContent.EMPTY_LAST_CONTENT, out);
}
cancel();
}

@Override
public void cancel() {
pleaseDestroy = true;
destroyIfRequested();
Expand All @@ -199,4 +215,13 @@ private void destroyIfRequested() {
}
}

/**
* @param receivedLength The length of the content received
* @param expected The expected length of the content
* @param message The message to release
*/
protected void fireExceedsLength(long receivedLength, long expected, ByteBufHolder message) {
message.release();
throw new ContentLengthExceededException(expected, receivedLength);
}
}
Loading
Loading