Skip to content

Commit

Permalink
Fixed tests and code after merge of #5310 from 9.4.x.
Browse files Browse the repository at this point in the history
Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
  • Loading branch information
sbordet committed Nov 27, 2020
1 parent 963ea59 commit 8c465ac
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -611,20 +611,9 @@ public void newStream(IStream.FrameList frames, Promise<Stream> promise, Stream.
* allocated stream id, or null if not interested in the modified headers frame
* @param listener the listener that gets notified of stream events
*/
public void newUpgradeStream(HeadersFrame frame, Stream.Listener listener, Promise<Stream> promise)
public Stream newUpgradeStream(HeadersFrame frame, Stream.Listener listener, Consumer<Throwable> failFn)
{
streamsState.newUpgradeStream(frame, listener, promise);
/*
// TODO: cannot do this, we need to call StreamsState.
HeadersFrame frame = frame;
int streamId = frame.getStreamId();
if (streamId <= 0)
{
streamId = localStreamIds.getAndAdd(2);
frame = frame.withStreamId(streamId);
}
return createLocalStream(streamId, (MetaData.Request)frame.getMetaData());
*/
return streamsState.newUpgradeStream(frame, listener, failFn);
}

protected IStream newStream(int streamId, MetaData.Request request, boolean local)
Expand Down Expand Up @@ -790,15 +779,15 @@ private void frame(HTTP2Flusher.Entry entry, boolean flush)
}
}

protected IStream createLocalStream(int streamId, MetaData.Request request, Promise<Stream> promise)
protected IStream createLocalStream(int streamId, MetaData.Request request, Consumer<Throwable> failFn)
{
while (true)
{
int localCount = localStreamCount.get();
int maxCount = getMaxLocalStreams();
if (maxCount >= 0 && localCount >= maxCount)
{
promise.failed(new IllegalStateException("Max local stream count " + maxCount + " exceeded"));
failFn.accept(new IllegalStateException("Max local stream count " + maxCount + " exceeded"));
return null;
}
if (localStreamCount.compareAndSet(localCount, localCount + 1))
Expand All @@ -817,7 +806,7 @@ protected IStream createLocalStream(int streamId, MetaData.Request request, Prom
else
{
localStreamCount.decrementAndGet();
promise.failed(new IllegalStateException("Duplicate stream " + streamId));
failFn.accept(new IllegalStateException("Duplicate stream " + streamId));
return null;
}
}
Expand Down Expand Up @@ -2110,28 +2099,25 @@ private void newLocalStream(IStream.FrameList frameList, Promise<Stream> promise
}
}

private void newUpgradeStream(HeadersFrame frame, Stream.Listener listener, Promise<Stream> promise)
private Stream newUpgradeStream(HeadersFrame frame, Stream.Listener listener, Consumer<Throwable> failFn)
{
int streamId;
try (AutoLock l = lock.lock())
{
streamId = localStreamIds.getAndAdd(2);
HTTP2Session.this.onStreamCreated(streamId);
}
IStream stream = HTTP2Session.this.createLocalStream(streamId, (MetaData.Request)frame.getMetaData(), new Promise<>()
IStream stream = HTTP2Session.this.createLocalStream(streamId, (MetaData.Request)frame.getMetaData(), x ->
{
@Override
public void failed(Throwable x)
{
HTTP2Session.this.onStreamDestroyed(streamId);
promise.failed(x);
}
HTTP2Session.this.onStreamDestroyed(streamId);
failFn.accept(x);
});
if (stream != null)
{
stream.setListener(listener);
stream.updateClose(frame.isEndStream(), CloseState.Event.AFTER_SEND);
}
return stream;
}

private boolean newRemoteStream(int streamId)
Expand Down Expand Up @@ -2180,7 +2166,7 @@ private boolean createLocalStream(Slot slot, List<StreamFrame> frames, Promise<S
MetaData.Request request = extractMetaDataRequest(frames.get(0));
if (request == null)
return false;
IStream stream = HTTP2Session.this.createLocalStream(streamId, request, promise);
IStream stream = HTTP2Session.this.createLocalStream(streamId, request, promise::failed);
if (stream == null)
return false;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.thread.Sweeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -115,27 +114,21 @@ public void upgrade(Map<String, Object> context)
MetaData.Request metaData = new MetaData.Request(request.getMethod(), HttpURI.from(request.getURI()), HttpVersion.HTTP_2, request.getHeaders());
// We do not support upgrade requests with content, so endStream=true.
HeadersFrame frame = new HeadersFrame(metaData, null, true);
((HTTP2Session)session).newUpgradeStream(frame, http2Channel.getStreamListener(), new Promise<>()
Stream stream = ((HTTP2Session)session).newUpgradeStream(frame, http2Channel.getStreamListener(), failure ->
{
@Override
public void succeeded(Stream stream)
{
http2Channel.setStream(stream);
newExchange.requestComplete(null);
newExchange.terminateRequest();
if (LOG.isDebugEnabled())
LOG.debug("Upgrade succeeded for {}", HttpConnectionOverHTTP2.this);
}

@Override
public void failed(Throwable failure)
{
newExchange.requestComplete(failure);
newExchange.terminateRequest();
if (LOG.isDebugEnabled())
LOG.debug("Upgrade failed for {}", HttpConnectionOverHTTP2.this);
}
newExchange.requestComplete(failure);
newExchange.terminateRequest();
if (LOG.isDebugEnabled())
LOG.debug("Upgrade failed for {}", HttpConnectionOverHTTP2.this);
});
if (stream != null)
{
http2Channel.setStream(stream);
newExchange.requestComplete(null);
newExchange.terminateRequest();
if (LOG.isDebugEnabled())
LOG.debug("Upgrade succeeded for {}", HttpConnectionOverHTTP2.this);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -718,7 +718,7 @@ public void testHTTP11UpgradeToH2CFailedServerClose() throws Exception
@Override
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response)
{
jettyRequest.getHttpChannel().getEndPoint().getConnection().close();
jettyRequest.getHttpChannel().getEndPoint().close();
}
});
ClientConnector clientConnector = new ClientConnector();
Expand Down

0 comments on commit 8c465ac

Please sign in to comment.