Skip to content

Commit

Permalink
Merge branch 'rel-1.4.2'
Browse files Browse the repository at this point in the history
Conflicts:
	ratpack.gradle
  • Loading branch information
ldaley committed Sep 13, 2016
2 parents f20b567 + e6036c3 commit 2a3811e
Show file tree
Hide file tree
Showing 25 changed files with 887 additions and 611 deletions.
Expand Up @@ -111,8 +111,8 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpObject httpObject) th
httpContent.release();
}
if (httpObject instanceof LastHttpContent) {
write.complete();
dispose(ctx.pipeline(), response);
write.complete();
} else {
if (write.getRequested() > 0) {
ctx.read();
Expand All @@ -122,6 +122,11 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpObject httpObject) th
}
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
forceDispose(ctx.pipeline());
Expand Down Expand Up @@ -200,6 +205,7 @@ public void forwardTo(Response response, Action<? super MutableHeaders> headerMu
outgoingHeaders.remove(HttpHeaderNames.CONNECTION);
Exceptions.uncheck(() -> headerMutator.execute(outgoingHeaders));
response.status(status);

response.sendStream(getBody().bindExec());
}

Expand Down
Expand Up @@ -177,30 +177,33 @@ private void addCommonResponseHandlers(ChannelPipeline p, Downstream<? super T>
p.addLast(SSL_HANDLER_NAME, createSslHandler());
}

p.addLast(CLIENT_CODEC_HANDLER_NAME, new HttpClientCodec(4096, 8192, 8192, true));
p.addLast(CLIENT_CODEC_HANDLER_NAME, new HttpClientCodec(4096, 8192, 8192, false));

p.addLast(READ_TIMEOUT_HANDLER_NAME, new ReadTimeoutHandler(requestConfig.readTimeout.toNanos(), TimeUnit.NANOSECONDS));

p.addLast(REDIRECT_HANDLER_NAME, new SimpleChannelInboundHandler<HttpObject>(false) {
boolean redirected;
HttpResponse response;

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (cause instanceof PrematureChannelClosureException) {
Exception e = new PrematureChannelClosureException("Server " + requestConfig.uri + " closed the connection prematurely");
e.setStackTrace(cause.getStackTrace());
cause = e;
} else if (cause instanceof ReadTimeoutException) {
if (cause instanceof ReadTimeoutException) {
cause = new HttpClientReadTimeoutException("Read timeout (" + requestConfig.readTimeout + ") waiting on HTTP server at " + requestConfig.uri);
}

error(downstream, cause);
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Exception e = new PrematureChannelClosureException("Server " + requestConfig.uri + " closed the connection prematurely");
error(downstream, e);
}

@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
if (msg instanceof HttpResponse) {
final HttpResponse response = (HttpResponse) msg;
this.response = (HttpResponse) msg;
int maxRedirects = requestConfig.maxRedirects;
int status = response.status().code();
String locationValue = response.headers().getAsString(HttpHeaderConstants.LOCATION);
Expand Down
Expand Up @@ -95,6 +95,9 @@ private ChannelFuture pre(HttpResponseStatus responseStatus) {
}

HttpResponse headersResponse = new CustomHttpResponse(responseStatus, responseHeaders);
if (isKeepAlive && HttpUtil.getContentLength(headersResponse, -1) == -1 && !HttpUtil.isTransferEncodingChunked(headersResponse)) {
HttpUtil.setTransferEncodingChunked(headersResponse, true);
}

if (channel.isOpen()) {
return channel.writeAndFlush(headersResponse).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
Expand Down Expand Up @@ -231,6 +234,7 @@ public void onError(Throwable t) {
@Override
public void onComplete() {
if (done.compareAndSet(false, true)) {
channel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT).addListener(cancelOnFailure);
post(responseStatus);
}
}
Expand All @@ -257,7 +261,7 @@ private void post(HttpResponseStatus responseStatus, ChannelFuture lastContentFu
private void notifyListeners(final HttpResponseStatus responseStatus, ChannelFuture future) {
if (outcomeListeners != null) {
future.addListener(ignore -> {
channel.attr(ATTRIBUTE_KEY).remove();
channel.attr(ATTRIBUTE_KEY).set(null);
SentResponse sentResponse = new DefaultSentResponse(new NettyHeadersBackedHeaders(responseHeaders), new DefaultStatus(responseStatus));
RequestOutcome requestOutcome = new DefaultRequestOutcome(ratpackRequest, sentResponse, stopTime);
for (Action<? super RequestOutcome> outcomeListener : outcomeListeners) {
Expand Down
Expand Up @@ -161,7 +161,7 @@ private void newRequest(final ChannelHandlerContext ctx, final HttpRequest netty
DefaultContext.start(channel.eventLoop(), requestConstants, serverRegistry, handlers, execution -> {
if (requestBody != null) {
requestBody.close();
channel.attr(BODY_ACCUMULATOR_KEY).remove();
channel.attr(BODY_ACCUMULATOR_KEY).set(null);
}

if (!transmitted.get()) {
Expand Down
4 changes: 2 additions & 2 deletions ratpack-core/src/main/java/ratpack/sse/ServerSentEvents.java
Expand Up @@ -68,7 +68,7 @@
*
* String expectedOutput = Arrays.asList(0, 1, 2, 3, 4)
* .stream()
* .map(i -> "event: counter\ndata: event " + i + "\nid: " + i + "\n")
* .map(i -> "id: " + i + "\nevent: counter\ndata: event " + i + "\n")
* .collect(joining("\n"))
* + "\n";
*
Expand Down Expand Up @@ -130,9 +130,9 @@ public void render(Context context) throws Exception {
ByteBufAllocator bufferAllocator = context.get(ByteBufAllocator.class);
Response response = context.getResponse();
response.getHeaders().add(HttpHeaderConstants.CONTENT_TYPE, HttpHeaderConstants.TEXT_EVENT_STREAM_CHARSET_UTF_8);
response.getHeaders().add(HttpHeaderConstants.TRANSFER_ENCODING, HttpHeaderConstants.CHUNKED);
response.getHeaders().add(HttpHeaderConstants.CACHE_CONTROL, HttpHeaderConstants.NO_CACHE_FULL);
response.getHeaders().add(HttpHeaderConstants.PRAGMA, HttpHeaderConstants.NO_CACHE);
response.forceCloseConnection();
response.sendStream(Streams.map(publisher, i -> ServerSentEventEncoder.INSTANCE.encode(i, bufferAllocator)));
}

Expand Down
Expand Up @@ -90,4 +90,9 @@ public String getEvent() {
public String getData() {
return data;
}

@Override
public String toString() {
return "Event{id='" + id + '\'' + ", event='" + event + '\'' + ", data='" + data + '\'' + '}';
}
}
Expand Up @@ -36,8 +36,9 @@ public DefaultServerSentEventStreamClient(HttpClient httpClient) {

@Override
public Promise<TransformablePublisher<Event<?>>> request(URI uri, Action<? super RequestSpec> action) {
return httpClient.requestStream(uri, action).
map(stream -> stream.getBody().streamMap(new ServerSentEventStreamMapDecoder(httpClient.getByteBufAllocator())));
return httpClient.requestStream(uri, action).map(r ->
new ServerSentEventDecodingPublisher(r.getBody(), httpClient.getByteBufAllocator())
);
}

}

0 comments on commit 2a3811e

Please sign in to comment.