Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add FilteredStreamMessage.onCancellation() #3375

Merged
merged 15 commits into from Mar 26, 2021
Expand Up @@ -45,12 +45,23 @@ final class HttpDecodedResponse extends FilteredHttpResponse {
@Nullable
private StreamDecoder responseDecoder;
private boolean headersReceived;
private boolean decoderClosed;

HttpDecodedResponse(HttpResponse delegate, Map<String, StreamDecoderFactory> availableDecoders,
ByteBufAllocator alloc) {
super(delegate, true);
this.availableDecoders = availableDecoders;
this.alloc = alloc;
whenComplete().handle((unused, cause) -> {
if (decoderClosed) {
ikhoon marked this conversation as resolved.
Show resolved Hide resolved
return null;
}
decoderClosed = true;
if (responseDecoder != null) {
responseDecoder.finish();
}
return null;
});
}

@Override
Expand Down Expand Up @@ -97,6 +108,10 @@ protected HttpObject filter(HttpObject obj) {

@Override
protected void beforeComplete(Subscriber<? super HttpObject> subscriber) {
if (decoderClosed) {
return;
}
decoderClosed = true;
if (responseDecoder == null) {
return;
}
Expand All @@ -105,12 +120,4 @@ protected void beforeComplete(Subscriber<? super HttpObject> subscriber) {
subscriber.onNext(lastData);
}
}

@Override
protected Throwable beforeError(Subscriber<? super HttpObject> subscriber, Throwable cause) {
if (responseDecoder != null) {
responseDecoder.finish();
}
return cause;
}
}
Expand Up @@ -51,7 +51,7 @@ public abstract class FilteredStreamMessage<T, U> implements StreamMessage<U> {
private final boolean filterSupportsPooledObjects;

/**
* Creates a new {@link FilteredStreamMessage} that filters objects published by {@code delegate}
* Creates a new {@link FilteredStreamMessage} that filters objects published by {@code upstream}
* before passing to a subscriber.
*/
protected FilteredStreamMessage(StreamMessage<T> upstream) {
Expand All @@ -60,7 +60,7 @@ protected FilteredStreamMessage(StreamMessage<T> upstream) {

/**
* (Advanced users only) Creates a new {@link FilteredStreamMessage} that filters objects published by
* {@code delegate} before passing to a subscriber.
* {@code upstream} before passing to a subscriber.
*
* @param withPooledObjects if {@code true}, {@link #filter(Object)} receives the pooled {@link HttpData}
* as is, without making a copy. If you don't know what this means,
Expand All @@ -69,7 +69,7 @@ protected FilteredStreamMessage(StreamMessage<T> upstream) {
*/
@UnstableApi
protected FilteredStreamMessage(StreamMessage<T> upstream, boolean withPooledObjects) {
this.upstream = requireNonNull(upstream, "delegate");
this.upstream = requireNonNull(upstream, "upstream");
filterSupportsPooledObjects = withPooledObjects;
}

Expand Down
Expand Up @@ -20,7 +20,8 @@

import javax.annotation.Nullable;

import org.reactivestreams.Subscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.linecorp.armeria.common.FilteredHttpRequest;
import com.linecorp.armeria.common.FilteredHttpResponse;
Expand All @@ -39,6 +40,8 @@

public final class ContentPreviewingUtil {

private static final Logger logger = LoggerFactory.getLogger(ContentPreviewingUtil.class);

/**
* Sets up the request {@link ContentPreviewer} to set
* {@link RequestLogBuilder#requestContentPreview(String)} when the preview is available.
Expand All @@ -60,31 +63,27 @@ public static HttpRequest setUpRequestContentPreviewer(RequestContext ctx, HttpR
logBuilder.requestContentPreview(null);
return null;
});
return new FilteredHttpRequest(req) {
final FilteredHttpRequest filteredHttpRequest = new FilteredHttpRequest(req) {
@Override
protected HttpObject filter(HttpObject obj) {
if (obj instanceof HttpData) {
requestContentPreviewer.onData((HttpData) obj);
}
return obj;
}

@Override
protected void beforeComplete(Subscriber<? super HttpObject> subscriber) {
logBuilder.requestContentPreview(requestContentPreviewer.produce());
}

@Override
protected Throwable beforeError(Subscriber<? super HttpObject> subscriber,
Throwable cause) {
// Call produce() to release the resources in the previewer. Consider adding close() method.
requestContentPreviewer.produce();

// Set null to make it sure the log is complete.
logBuilder.requestContentPreview(null);
return cause;
}
};
filteredHttpRequest.whenComplete().handle((unused, cause) -> {
String produced = null;
try {
produced = requestContentPreviewer.produce();
} catch (Exception e) {
logger.warn("Unexpected exception while producing the request content preview. " +
"previewer: {}", requestContentPreviewer, e);
}
logBuilder.requestContentPreview(produced);
return null;
});
return filteredHttpRequest;
}

/**
Expand All @@ -96,54 +95,60 @@ public static HttpResponse setUpResponseContentPreviewer(
requireNonNull(factory, "factory");
requireNonNull(ctx, "ctx");
requireNonNull(res, "res");
return new ContentPreviewerHttpResponse(res, factory, ctx);
}

return new FilteredHttpResponse(res) {
@Nullable
ContentPreviewer responseContentPreviewer;
private static class ContentPreviewerHttpResponse extends FilteredHttpResponse {

@Override
protected HttpObject filter(HttpObject obj) {
if (obj instanceof ResponseHeaders) {
final ResponseHeaders resHeaders = (ResponseHeaders) obj;

// Skip informational headers.
final String status = resHeaders.get(HttpHeaderNames.STATUS);
if (ArmeriaHttpUtil.isInformational(status)) {
return obj;
}
final ContentPreviewer contentPreviewer = factory.responseContentPreviewer(ctx, resHeaders);
if (!contentPreviewer.isDisabled()) {
responseContentPreviewer = contentPreviewer;
}
} else if (obj instanceof HttpData) {
if (responseContentPreviewer != null) {
responseContentPreviewer.onData((HttpData) obj);
}
}
return obj;
}
private final ContentPreviewerFactory factory;
private final RequestContext ctx;
@Nullable
ContentPreviewer responseContentPreviewer;

@Override
protected void beforeComplete(Subscriber<? super HttpObject> subscriber) {
protected ContentPreviewerHttpResponse(HttpResponse delegate, ContentPreviewerFactory factory,
RequestContext ctx) {
super(delegate);
this.factory = factory;
this.ctx = ctx;
whenComplete().handle((unused, cause) -> {
if (responseContentPreviewer != null) {
ctx.logBuilder().responseContentPreview(responseContentPreviewer.produce());
String produced = null;
try {
produced = responseContentPreviewer.produce();
} catch (Exception e) {
logger.warn("Unexpected exception while producing the response content preview. " +
"previewer: {}", responseContentPreviewer, e);
}
ctx.logBuilder().responseContentPreview(produced);
} else {
// Call requestContentPreview(null) to make sure that the log is complete.
ctx.logBuilder().responseContentPreview(null);
}
}
return null;
});
}

@Override
protected Throwable beforeError(Subscriber<? super HttpObject> subscriber, Throwable cause) {
@Override
protected HttpObject filter(HttpObject obj) {
if (obj instanceof ResponseHeaders) {
final ResponseHeaders resHeaders = (ResponseHeaders) obj;

// Skip informational headers.
final String status = resHeaders.get(HttpHeaderNames.STATUS);
if (ArmeriaHttpUtil.isInformational(status)) {
return obj;
}
final ContentPreviewer contentPreviewer = factory.responseContentPreviewer(ctx, resHeaders);
if (!contentPreviewer.isDisabled()) {
responseContentPreviewer = contentPreviewer;
}
} else if (obj instanceof HttpData) {
if (responseContentPreviewer != null) {
// Call produce() to release the resources in the previewer. Consider adding close() method.
responseContentPreviewer.produce();
responseContentPreviewer.onData((HttpData) obj);
}
// Set null to make it sure the log is complete.
ctx.logBuilder().responseContentPreview(null);
return cause;
}
};
return obj;
}
}

private ContentPreviewingUtil() {}
Expand Down
Expand Up @@ -34,10 +34,20 @@ final class HttpDecodedRequest extends FilteredHttpRequest {

private final StreamDecoder responseDecoder;

private boolean decoderFinished;

HttpDecodedRequest(HttpRequest delegate, StreamDecoderFactory decoderFactory,
ByteBufAllocator alloc) {
super(delegate);
responseDecoder = decoderFactory.newDecoder(alloc);
whenComplete().handle((unused, cause) -> {
if (decoderFinished) {
return null;
}
decoderFinished = true;
responseDecoder.finish();
return null;
});
}

@Override
Expand All @@ -51,15 +61,13 @@ protected HttpObject filter(HttpObject obj) {

@Override
protected void beforeComplete(Subscriber<? super HttpObject> subscriber) {
if (decoderFinished) {
return;
}
decoderFinished = true;
final HttpData lastData = responseDecoder.finish();
if (!lastData.isEmpty()) {
subscriber.onNext(lastData);
}
minwoox marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
protected Throwable beforeError(Subscriber<? super HttpObject> subscriber, Throwable cause) {
responseDecoder.finish();
return cause;
}
}
Expand Up @@ -60,6 +60,8 @@ final class HttpEncodedResponse extends FilteredHttpResponse {

private boolean headersSent;

private boolean encoderClosed;

HttpEncodedResponse(HttpResponse delegate,
HttpEncodingType encodingType,
Predicate<MediaType> encodableContentTypePredicate,
Expand All @@ -68,6 +70,10 @@ final class HttpEncodedResponse extends FilteredHttpResponse {
this.encodingType = encodingType;
this.encodableContentTypePredicate = encodableContentTypePredicate;
this.minBytesToForceChunkedAndEncoding = minBytesToForceChunkedAndEncoding;
whenComplete().handle((unused, cause) -> {
closeEncoder();
return null;
});
}

@Override
Expand Down Expand Up @@ -149,13 +155,11 @@ protected void beforeComplete(Subscriber<? super HttpObject> subscriber) {
}
}

@Override
protected Throwable beforeError(Subscriber<? super HttpObject> subscriber, Throwable cause) {
closeEncoder();
return cause;
}

private void closeEncoder() {
if (encoderClosed) {
return;
}
encoderClosed = true;
if (encodingStream == null) {
return;
}
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.