Skip to content

Commit

Permalink
Http/2 Priority on CLOSED stream
Browse files Browse the repository at this point in the history
Motivation:
The encoder/decoder currently do not handle streams which have previously existed but no longer exist because they were closed. The specification requires supporting this.

Modifications:
- encoder/decoder should tolerate the frame or the dependent frame not existing in the streams map due to the fact that it may have previously existed.

Result:
encoder/decoder are more compliant with the specification.
  • Loading branch information
Scottmitch committed Mar 29, 2015
1 parent 0d3a6e0 commit ab74dcc
Show file tree
Hide file tree
Showing 7 changed files with 161 additions and 46 deletions.
Expand Up @@ -22,6 +22,7 @@
import static io.netty.handler.codec.http2.Http2CodecUtil.immediateRemovalPolicy;
import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
import static io.netty.handler.codec.http2.Http2Error.REFUSED_STREAM;
import static io.netty.handler.codec.http2.Http2Exception.closedStreamError;
import static io.netty.handler.codec.http2.Http2Exception.connectionError;
import static io.netty.handler.codec.http2.Http2Exception.streamError;
import static io.netty.handler.codec.http2.Http2Stream.State.CLOSED;
Expand Down Expand Up @@ -689,8 +690,7 @@ private final class DefaultEndpoint<F extends Http2FlowController> implements En

@Override
public int nextStreamId() {
// For manually created client-side streams, 1 is reserved for HTTP upgrade, so
// start at 3.
// For manually created client-side streams, 1 is reserved for HTTP upgrade, so start at 3.
return nextStreamId > 1 ? nextStreamId : nextStreamId + 2;
}

Expand Down Expand Up @@ -836,24 +836,22 @@ private void checkNewStreamAllowed(int streamId) throws Http2Exception {
if (isGoAway()) {
throw connectionError(PROTOCOL_ERROR, "Cannot create a stream since the connection is going away");
}
verifyStreamId(streamId);
if (!canCreateStream()) {
throw connectionError(REFUSED_STREAM, "Maximum streams exceeded for this endpoint.");
}
}

private void verifyStreamId(int streamId) throws Http2Exception {
if (streamId < 0) {
throw new Http2NoMoreStreamIdsException();
}
if (streamId < nextStreamId) {
throw connectionError(PROTOCOL_ERROR, "Request stream %d is behind the next expected stream %d",
streamId, nextStreamId);
}
if (!createdStreamId(streamId)) {
throw connectionError(PROTOCOL_ERROR, "Request stream %d is not correct for %s connection",
streamId, server ? "server" : "client");
}
// This check must be after all id validated checks, but before the max streams check because it may be
// recoverable to some degree for handling frames which can be sent on closed streams.
if (streamId < nextStreamId) {
throw closedStreamError(PROTOCOL_ERROR, "Request stream %d is behind the next expected stream %d",
streamId, nextStreamId);
}
if (!canCreateStream()) {
throw connectionError(REFUSED_STREAM, "Maximum streams exceeded for this endpoint.");
}
}

private boolean isLocal() {
Expand Down
Expand Up @@ -24,6 +24,7 @@
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http2.Http2Exception.ClosedStreamCreationException;

import java.util.List;

Expand Down Expand Up @@ -374,15 +375,20 @@ public void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDe
return;
}

if (stream == null) {
// PRIORITY frames always identify a stream. This means that if a PRIORITY frame is the
// first frame to be received for a stream that we must create the stream.
stream = connection.remote().createStream(streamId);
}
try {
if (stream == null) {
// PRIORITY frames always identify a stream. This means that if a PRIORITY frame is the
// first frame to be received for a stream that we must create the stream.
stream = connection.remote().createStream(streamId);
}

// This call will create a stream for streamDependency if necessary.
// For this reason it must be done before notifying the listener.
stream.setPriority(streamDependency, weight, exclusive);
// This call will create a stream for streamDependency if necessary.
// For this reason it must be done before notifying the listener.
stream.setPriority(streamDependency, weight, exclusive);
} catch (ClosedStreamCreationException ignored) {
// It is possible that either the stream for this frame or the parent stream is closed.
// In this case we should ignore the exception and allow the frame to be sent.
}

listener.onPriorityRead(ctx, streamId, streamDependency, weight, exclusive);
}
Expand Down
Expand Up @@ -24,6 +24,7 @@
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http2.Http2Exception.ClosedStreamCreationException;
import io.netty.util.ReferenceCountUtil;

import java.util.ArrayDeque;
Expand Down Expand Up @@ -238,9 +239,14 @@ public ChannelFuture writePriority(ChannelHandlerContext ctx, int streamId, int
stream = connection.local().createStream(streamId);
}

// The set priority operation must be done before sending the frame. The parent may not yet exist
// and the priority tree may also be modified before sending.
stream.setPriority(streamDependency, weight, exclusive);
} catch (Throwable e) {
return promise.setFailure(e);
} catch (ClosedStreamCreationException ignored) {
// It is possible that either the stream for this frame or the parent stream is closed.
// In this case we should ignore the exception and allow the frame to be sent.
} catch (Throwable t) {
return promise.setFailure(t);
}

ChannelFuture future = frameWriter.writePriority(ctx, streamId, streamDependency, weight, exclusive, promise);
Expand Down
Expand Up @@ -72,6 +72,18 @@ public static Http2Exception connectionError(Http2Error error, Throwable cause,
return new Http2Exception(error, String.format(fmt, args), cause);
}

/**
* Use if an error has occurred which can not be isolated to a single stream, but instead applies
* to the entire connection.
* @param error The type of error as defined by the HTTP/2 specification.
* @param fmt String with the content and format for the additional debug data.
* @param args Objects which fit into the format defined by {@code fmt}.
* @return An exception which can be translated into a HTTP/2 error.
*/
public static Http2Exception closedStreamError(Http2Error error, String fmt, Object... args) {
return new ClosedStreamCreationException(error, String.format(fmt, args));
}

/**
* Use if an error which can be isolated to a single stream has occurred. If the {@code id} is not
* {@link Http2CodecUtil#CONNECTION_STREAM_ID} then a {@link Http2Exception.StreamException} will be returned.
Expand Down Expand Up @@ -130,6 +142,25 @@ public static int streamId(Http2Exception e) {
return isStreamError(e) ? ((StreamException) e).streamId() : CONNECTION_STREAM_ID;
}

/**
* Used when a stream creation attempt fails but may be because the stream was previously closed.
*/
public static final class ClosedStreamCreationException extends Http2Exception {
private static final long serialVersionUID = -1911637707391622439L;

public ClosedStreamCreationException(Http2Error error) {
super(error);
}

public ClosedStreamCreationException(Http2Error error, String message) {
super(error, message);
}

public ClosedStreamCreationException(Http2Error error, String message, Throwable cause) {
super(error, message, cause);
}
}

/**
* Represents an exception that can be isolated to a single stream (as opposed to the entire connection).
*/
Expand Down
Expand Up @@ -46,10 +46,10 @@ int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int paddin
boolean endOfStream) throws Http2Exception;

/**
* Handles an inbound HEADERS frame.
* Handles an inbound {@code HEADERS} frame.
* <p>
* Only one of the following methods will be called for each HEADERS frame sequence.
* One will be called when the END_HEADERS flag has been received.
* Only one of the following methods will be called for each {@code HEADERS} frame sequence.
* One will be called when the {@code END_HEADERS} flag has been received.
* <ul>
* <li>{@link #onHeadersRead(ChannelHandlerContext, int, Http2Headers, int, boolean)}</li>
* <li>{@link #onHeadersRead(ChannelHandlerContext, int, Http2Headers, int, short, boolean, int, boolean)}</li>
Expand All @@ -70,11 +70,11 @@ void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers
boolean endOfStream) throws Http2Exception;

/**
* Handles an inbound HEADERS frame with priority information specified. Only called if END_HEADERS encountered.
*
* Handles an inbound {@code HEADERS} frame with priority information specified.
* Only called if {@code END_HEADERS} encountered.
* <p>
* Only one of the following methods will be called for each HEADERS frame sequence.
* One will be called when the END_HEADERS flag has been received.
* Only one of the following methods will be called for each {@code HEADERS} frame sequence.
* One will be called when the {@code END_HEADERS} flag has been received.
* <ul>
* <li>{@link #onHeadersRead(ChannelHandlerContext, int, Http2Headers, int, boolean)}</li>
* <li>{@link #onHeadersRead(ChannelHandlerContext, int, Http2Headers, int, short, boolean, int, boolean)}</li>
Expand All @@ -100,7 +100,11 @@ void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers
throws Http2Exception;

/**
* Handles an inbound PRIORITY frame.
* Handles an inbound {@code PRIORITY} frame.
* <p>
* Note that is it possible to have this method called and no stream object exist for either
* {@code streamId}, {@code streamDependency}, or both. This is because the {@code PRIORITY} frame can be
* sent/received when streams are in the {@code CLOSED} state.
*
* @param ctx the context from the handler where the frame was read.
* @param streamId the subject stream for the frame.
Expand All @@ -113,7 +117,7 @@ void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDependenc
short weight, boolean exclusive) throws Http2Exception;

/**
* Handles an inbound RST_STREAM frame.
* Handles an inbound {@code RST_STREAM} frame.
*
* @param ctx the context from the handler where the frame was read.
* @param streamId the stream that is terminating.
Expand All @@ -122,21 +126,21 @@ void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDependenc
void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) throws Http2Exception;

/**
* Handles an inbound SETTINGS acknowledgment frame.
* Handles an inbound {@code SETTINGS} acknowledgment frame.
* @param ctx the context from the handler where the frame was read.
*/
void onSettingsAckRead(ChannelHandlerContext ctx) throws Http2Exception;

/**
* Handles an inbound SETTINGS frame.
* Handles an inbound {@code SETTINGS} frame.
*
* @param ctx the context from the handler where the frame was read.
* @param settings the settings received from the remote endpoint.
*/
void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) throws Http2Exception;

/**
* Handles an inbound PING frame.
* Handles an inbound {@code PING} frame.
*
* @param ctx the context from the handler where the frame was read.
* @param data the payload of the frame. If this buffer needs to be retained by the listener
Expand All @@ -145,7 +149,7 @@ void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDependenc
void onPingRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception;

/**
* Handles an inbound PING acknowledgment.
* Handles an inbound {@code PING} acknowledgment.
*
* @param ctx the context from the handler where the frame was read.
* @param data the payload of the frame. If this buffer needs to be retained by the listener
Expand All @@ -154,13 +158,13 @@ void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDependenc
void onPingAckRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception;

/**
* Handles an inbound PUSH_PROMISE frame. Only called if END_HEADERS encountered.
* Handles an inbound {@code PUSH_PROMISE} frame. Only called if {@code END_HEADERS} encountered.
* <p>
* Promised requests MUST be authoritative, cacheable, and safe.
* See <a href="https://tools.ietf.org/html/draft-ietf-httpbis-http2-17#section-8.2">[RFC http2], Seciton 8.2</a>.
* <p>
* Only one of the following methods will be called for each HEADERS frame sequence.
* One will be called when the END_HEADERS flag has been received.
* Only one of the following methods will be called for each {@code HEADERS} frame sequence.
* One will be called when the {@code END_HEADERS} flag has been received.
* <ul>
* <li>{@link #onHeadersRead(ChannelHandlerContext, int, Http2Headers, int, boolean)}</li>
* <li>{@link #onHeadersRead(ChannelHandlerContext, int, Http2Headers, int, short, boolean, int, boolean)}</li>
Expand All @@ -180,7 +184,7 @@ void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStre
Http2Headers headers, int padding) throws Http2Exception;

/**
* Handles an inbound GO_AWAY frame.
* Handles an inbound {@code GO_AWAY} frame.
*
* @param ctx the context from the handler where the frame was read.
* @param lastStreamId the last known stream of the remote endpoint.
Expand All @@ -192,7 +196,7 @@ void onGoAwayRead(ChannelHandlerContext ctx, int lastStreamId, long errorCode, B
throws Http2Exception;

/**
* Handles an inbound WINDOW_UPDATE frame.
* Handles an inbound {@code WINDOW_UPDATE} frame.
*
* @param ctx the context from the handler where the frame was read.
* @param streamId the stream the frame was sent on.
Expand Down
Expand Up @@ -45,6 +45,7 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelPromise;
import io.netty.handler.codec.http2.Http2Exception.ClosedStreamCreationException;

import java.util.Collections;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -379,6 +380,40 @@ public void priorityReadShouldSucceed() throws Exception {
verify(stream, never()).open(anyBoolean());
}

@Test
public void priorityReadOnPreviouslyExistingStreamShouldSucceed() throws Exception {
doAnswer(new Answer<Http2Stream>() {
@Override
public Http2Stream answer(InvocationOnMock in) throws Throwable {
throw new ClosedStreamCreationException(Http2Error.INTERNAL_ERROR);
}
}).when(remote).createStream(eq(STREAM_ID));
when(connection.stream(STREAM_ID)).thenReturn(null);
when(connection.requireStream(STREAM_ID)).thenReturn(null);
// Just return the stream object as the connection stream to ensure the dependent stream "exists"
when(connection.stream(0)).thenReturn(stream);
when(connection.requireStream(0)).thenReturn(stream);
decode().onPriorityRead(ctx, STREAM_ID, 0, (short) 255, true);
verify(stream, never()).setPriority(anyInt(), anyShort(), anyBoolean());
verify(listener).onPriorityRead(eq(ctx), eq(STREAM_ID), eq(0), eq((short) 255), eq(true));
verify(remote).createStream(STREAM_ID);
}

@Test
public void priorityReadOnPreviouslyParentExistingStreamShouldSucceed() throws Exception {
doAnswer(new Answer<Http2Stream>() {
@Override
public Http2Stream answer(InvocationOnMock in) throws Throwable {
throw new ClosedStreamCreationException(Http2Error.INTERNAL_ERROR);
}
}).when(stream).setPriority(eq(0), eq((short) 255), eq(true));
when(connection.stream(STREAM_ID)).thenReturn(stream);
when(connection.requireStream(STREAM_ID)).thenReturn(stream);
decode().onPriorityRead(ctx, STREAM_ID, 0, (short) 255, true);
verify(stream).setPriority(eq(0), eq((short) 255), eq(true));
verify(listener).onPriorityRead(eq(ctx), eq(STREAM_ID), eq(0), eq((short) 255), eq(true));
}

@Test
public void windowUpdateReadAfterGoAwayShouldBeIgnored() throws Exception {
when(connection.goAwaySent()).thenReturn(true);
Expand Down

0 comments on commit ab74dcc

Please sign in to comment.