Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,12 @@ public void onSuccess(Response response)
else
{
// Server either does not support 100 Continue,
// or it does and wants to refuse the request content,
// or it does but does not want to read the request content,
// or we got some other HTTP status code like a redirect.
ResponseListeners listeners = exchange.getResponseListeners();
HttpContentResponse contentResponse = new HttpContentResponse(response, getContent(), getMediaType(), getEncoding());
listeners.emitSuccess(contentResponse);
exchange.proceed(new HttpRequestException("Expectation failed", request));
exchange.proceed(null);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,12 @@ public Request getRequest()
{
return request;
}

public static class NoErrorException extends HttpRequestException
{
public NoErrorException(String message, Request request)
{
super(message, request);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

package org.eclipse.jetty.client.transport;

import org.eclipse.jetty.client.Request;
import org.eclipse.jetty.client.HttpRequestException;
import org.eclipse.jetty.client.Result;
import org.eclipse.jetty.io.CyclicTimeouts;
import org.eclipse.jetty.util.Promise;
Expand Down Expand Up @@ -75,6 +75,14 @@ public Throwable getRequestFailure()
}
}

private Throwable resolveRequestFailure()
{
assert lock.isHeldByCurrentThread();
if (requestFailure instanceof HttpRequestException.NoErrorException)
return null;
return requestFailure;
}

public ResponseListeners getResponseListeners()
{
return listeners;
Expand Down Expand Up @@ -207,7 +215,7 @@ public Result terminateRequest()
if (requestState == State.COMPLETED)
requestState = State.TERMINATED;
if (requestState == State.TERMINATED && responseState == State.TERMINATED)
result = new Result(getRequest(), requestFailure, getResponse(), responseFailure);
result = new Result(getRequest(), resolveRequestFailure(), getResponse(), responseFailure);
}

if (LOG.isDebugEnabled())
Expand All @@ -224,7 +232,7 @@ public Result terminateResponse()
if (responseState == State.COMPLETED)
responseState = State.TERMINATED;
if (requestState == State.TERMINATED && responseState == State.TERMINATED)
result = new Result(getRequest(), requestFailure, getResponse(), responseFailure);
result = new Result(getRequest(), resolveRequestFailure(), getResponse(), responseFailure);
}

if (LOG.isDebugEnabled())
Expand Down Expand Up @@ -256,12 +264,6 @@ public void abort(Throwable failure, Promise<Boolean> promise)

// We failed this exchange, deal with it.

// Applications could be blocked providing
// request content, notify them of the failure.
Request.Content body = request.getBody();
if (abortRequest && body != null)
body.fail(failure);

// Case #1: exchange was in the destination queue.
if (destination.remove(this))
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,8 @@ private void abortRequest(HttpExchange exchange)
{
Throwable failure = this.failure.get();

// Applications could be blocked providing
// request content, notify them of the failure.
HttpRequest request = exchange.getRequest();
Content.Source content = request.getBody();
if (content != null)
Expand All @@ -243,7 +245,7 @@ private void abortRequest(HttpExchange exchange)
dispose();

if (LOG.isDebugEnabled())
LOG.debug("Request abort {} {} on {}: {}", request, exchange, getHttpChannel(), failure);
LOG.debug("Request abort {} {} on {}", request, exchange, getHttpChannel(), failure);
request.notifyFailure(failure);

// Mark atomically the request as terminated, with
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,16 +128,24 @@ public void exchangeTerminated(HttpExchange exchange, Result result)
Stream stream = getStream();
if (LOG.isDebugEnabled())
LOG.debug("exchange terminated {} {}", result, stream);
if (result.isSucceeded())
if (result.isSucceeded() || stream == null)
{
release();
}
else
{
if (stream != null)
if (result.getResponseFailure() != null)
{
// The response failed, try to send a RST_STREAM to
// the server in case it is still sending DATA frames.
stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), new ReleaseCallback());
}
else
release();
{
// The request failed, the server should send a RST_STREAM,
// but we cannot count on that, get rid of the stream here.
((HTTP2Stream)stream).dispose();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;

import org.eclipse.jetty.client.HttpRequestException;
import org.eclipse.jetty.client.HttpUpgrader;
import org.eclipse.jetty.client.Request;
import org.eclipse.jetty.client.Response;
Expand Down Expand Up @@ -217,7 +218,11 @@ void onReset(ResetFrame frame)
if (exchange == null)
return;
int error = frame.getError();
exchange.getRequest().abort(new IOException(ErrorCode.toString(error, "reset_code_" + error)));
String message = ErrorCode.toString(error, "reset_code_" + error);
Throwable failure = error == ErrorCode.NO_ERROR.code
? new HttpRequestException.NoErrorException(message, exchange.getRequest())
: new IOException(message);
exchange.getRequest().abort(failure);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,11 +335,12 @@ public void onPriority(PriorityFrame frame)
@Override
public void onReset(ResetFrame frame)
{
if (LOG.isDebugEnabled())
LOG.debug("Received {} on {}", frame, this);

int streamId = frame.getStreamId();
HTTP2Stream stream = getStream(streamId);

if (LOG.isDebugEnabled())
LOG.debug("Received {} for {} on {}", frame, stream, this);

if (stream != null)
{
stream.process(frame, new OnResetCallback());
Expand Down Expand Up @@ -703,10 +704,7 @@ void reset(HTTP2Stream stream, ResetFrame frame, Callback callback)
control(stream, Callback.from(() ->
{
if (stream != null)
{
stream.close();
removeStream(stream);
}
stream.dispose();
}, callback), frame);
}

Expand Down Expand Up @@ -1273,10 +1271,7 @@ boolean hasHighPriority()
public void failed(Throwable x)
{
if (stream != null)
{
stream.close();
stream.getSession().removeStream(stream);
}
stream.dispose();
super.failed(x);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -579,8 +579,7 @@ private void onReset(ResetFrame frame, Callback callback)
failure = new EofException("reset");
flowControlLength = drain();
}
close();
boolean removed = session.removeStream(this);
boolean removed = dispose();
session.dataConsumed(this, flowControlLength);
if (removed)
notifyReset(this, frame, callback);
Expand Down Expand Up @@ -609,8 +608,7 @@ private void onFailure(FailureFrame frame, Callback callback)
failure = frame.getFailure();
flowControlLength = drain();
}
close();
boolean removed = session.removeStream(this);
boolean removed = dispose();
session.dataConsumed(this, flowControlLength);
if (removed)
notifyFailure(this, frame, callback);
Expand Down Expand Up @@ -766,6 +764,8 @@ public int updateRecvWindow(int delta)
@Override
public void close()
{
if (LOG.isDebugEnabled())
LOG.debug("Close for {}", this);
CloseState oldState = closeState.getAndSet(CloseState.CLOSED);
if (oldState != CloseState.CLOSED)
{
Expand All @@ -775,6 +775,12 @@ public void close()
}
}

public boolean dispose()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

javadoc please.
Does it need to be public?

{
close();
return getSession().removeStream(this);
}

public void onClose()
{
notifyClosed(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,7 @@ public void succeeded()
// Send a reset to the other end so that it stops sending data.
if (LOG.isDebugEnabled())
LOG.debug("HTTP2 response #{}/{}: unconsumed request content, resetting stream", _stream.getId(), Integer.toHexString(_stream.getSession().hashCode()));
_stream.reset(new ResetFrame(_stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP);
_stream.reset(new ResetFrame(_stream.getId(), ErrorCode.NO_ERROR.code), Callback.NOOP);
}
}
_httpChannel.recycle();
Expand All @@ -619,9 +619,10 @@ public void succeeded()
@Override
public void failed(Throwable x)
{
ErrorCode errorCode = x == HttpStream.CONTENT_NOT_CONSUMED ? ErrorCode.NO_ERROR : ErrorCode.CANCEL_STREAM_ERROR;
if (LOG.isDebugEnabled())
LOG.debug("HTTP2 response #{}/{} aborted", _stream.getId(), Integer.toHexString(_stream.getSession().hashCode()));
_stream.reset(new ResetFrame(_stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP);
LOG.debug("HTTP2 response #{}/{} failed {}", _stream.getId(), Integer.toHexString(_stream.getSession().hashCode()), errorCode, x);
_stream.reset(new ResetFrame(_stream.getId(), errorCode.code), Callback.NOOP);
}

private class SendTrailers extends Callback.Nested
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -494,16 +494,17 @@ public void succeeded()
{
if (LOG.isDebugEnabled())
LOG.debug("HTTP3 Response #{}/{}: unconsumed request content, resetting stream", stream.getId(), Integer.toHexString(stream.getSession().hashCode()));
stream.reset(HTTP3ErrorCode.REQUEST_CANCELLED_ERROR.code(), new IOException("unconsumed content"));
stream.reset(HTTP3ErrorCode.NO_ERROR.code(), new IOException("unconsumed content"));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not use CONTENT_NOT_CONSUMED?

}
}

@Override
public void failed(Throwable x)
{
HTTP3ErrorCode errorCode = x == HttpStream.CONTENT_NOT_CONSUMED ? HTTP3ErrorCode.NO_ERROR : HTTP3ErrorCode.REQUEST_CANCELLED_ERROR;
if (LOG.isDebugEnabled())
LOG.debug("HTTP3 Response #{}/{} aborted", stream.getId(), Integer.toHexString(stream.getSession().hashCode()));
stream.reset(HTTP3ErrorCode.REQUEST_CANCELLED_ERROR.code(), x);
LOG.debug("HTTP3 Response #{}/{} failed {}", stream.getId(), Integer.toHexString(stream.getSession().hashCode()), errorCode, x);
stream.reset(errorCode.code(), x);
}

public void onIdleTimeout(TimeoutException failure, BiConsumer<Runnable, Boolean> consumer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
*/
public interface HttpStream extends Callback
{
Exception CONTENT_NOT_CONSUMED = new StaticException("Content not consumed");
Exception CONTENT_NOT_CONSUMED = new StaticException("Unconsumed request content");

/**
* <p>Attribute name to be used as a {@link Request} attribute to store/retrieve
Expand Down Expand Up @@ -119,7 +119,7 @@ static Throwable consumeAvailable(HttpStream stream, HttpConfiguration httpConfi

// if we cannot read to EOF then fail the stream rather than wait for unconsumed content
if (content == null)
return CONTENT_NOT_CONSUMED;
break;

// Always release any returned content. This is a noop for EOF and Error content.
content.release();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -793,7 +793,7 @@ public boolean handle(Request request, Response response, Callback callback)

assertThat(stream.isComplete(), is(true));
assertThat(stream.getFailure(), notNullValue());
assertThat(stream.getFailure().getMessage(), containsString("Content not consumed"));
assertThat(stream.getFailure().getMessage(), containsString("Unconsumed request content"));
assertThat(stream.getResponse(), notNullValue());
assertThat(stream.getResponse().getStatus(), equalTo(200));
assertThat(stream.getResponseHeaders().get(HttpHeader.CONTENT_TYPE), equalTo(MimeTypes.Type.TEXT_PLAIN_UTF_8.asString()));
Expand Down
Loading