Skip to content

Commit

Permalink
Improved handling of the stream close state.
Browse files Browse the repository at this point in the history
Now the stream close state is updated when the frame has been
successfully written, and when it is received.
The stream is closed in case of failures.
Just after the stream close state update, if the stream is closed
then it is removed from the session.
  • Loading branch information
sbordet committed Feb 18, 2015
1 parent 4b6d024 commit d4809e9
Show file tree
Hide file tree
Showing 10 changed files with 190 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,6 @@ public void onHeaders(HeadersFrame frame)
{
stream.process(frame, Callback.Adapter.INSTANCE);
notifyHeaders(stream, frame);
if (stream.isClosed())
removeStream(stream, false);
}
}

Expand Down Expand Up @@ -97,8 +95,6 @@ public void onPushPromise(PushPromiseFrame frame)
pushStream.process(frame, Callback.Adapter.INSTANCE);
Stream.Listener listener = notifyPush(stream, pushStream, frame);
pushStream.setListener(listener);
if (pushStream.isClosed())
removeStream(pushStream, false);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,24 @@
package org.eclipse.jetty.http2.client;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.HTTP2Session;
import org.eclipse.jetty.http2.HTTP2Stream;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.PushPromiseFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.Promise;
Expand Down Expand Up @@ -199,18 +204,17 @@ public void succeeded()
}
});
}
});
}, new Stream.Listener.Adapter());
HeadersFrame response = new HeadersFrame(stream.getId(), new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields()), null, true);
stream.headers(response, Callback.Adapter.INSTANCE);
return null;
}
});

Session session = newClient(new Session.Listener.Adapter());
HeadersFrame frame = new HeadersFrame(0, newRequest("GET", new HttpFields()), null, false);
Promise<Stream> promise = new Promise.Adapter<>();
HeadersFrame frame = new HeadersFrame(0, newRequest("GET", new HttpFields()), null, true);
final CountDownLatch clientLatch = new CountDownLatch(1);
session.newStream(frame, promise, new Stream.Listener.Adapter()
session.newStream(frame, new Promise.Adapter<Stream>(), new Stream.Listener.Adapter()
{
@Override
public Stream.Listener onPush(Stream pushedStream, PushPromiseFrame frame)
Expand All @@ -231,4 +235,106 @@ public void onData(Stream pushedStream, DataFrame frame, Callback callback)
Assert.assertTrue(serverLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(clientLatch.await(5, TimeUnit.SECONDS));
}

@Test
public void testPushedStreamResetIsClosed() throws Exception
{
final CountDownLatch serverLatch = new CountDownLatch(1);
start(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(final Stream stream, HeadersFrame frame)
{
PushPromiseFrame pushFrame = new PushPromiseFrame(stream.getId(), 0, newRequest("GET", new HttpFields()));
stream.push(pushFrame, new Promise.Adapter<Stream>(), new Stream.Listener.Adapter()
{
@Override
public void onReset(Stream pushedStream, ResetFrame frame)
{
Assert.assertTrue(pushedStream.isReset());
Assert.assertTrue(pushedStream.isClosed());
HeadersFrame response = new HeadersFrame(stream.getId(), new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields()), null, true);
stream.headers(response, Callback.Adapter.INSTANCE);
serverLatch.countDown();
}
});
return null;
}
});

Session session = newClient(new Session.Listener.Adapter());
HeadersFrame frame = new HeadersFrame(0, newRequest("GET", new HttpFields()), null, true);
final CountDownLatch clientLatch = new CountDownLatch(2);
session.newStream(frame, new Promise.Adapter<Stream>(), new Stream.Listener.Adapter()
{
@Override
public Stream.Listener onPush(final Stream pushedStream, PushPromiseFrame frame)
{
pushedStream.reset(new ResetFrame(pushedStream.getId(), ErrorCode.REFUSED_STREAM_ERROR.code), new Callback.Adapter()
{
@Override
public void succeeded()
{
Assert.assertTrue(pushedStream.isReset());
Assert.assertTrue(pushedStream.isClosed());
clientLatch.countDown();
}
});
return null;
}

@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
clientLatch.countDown();
}
});

Assert.assertTrue(serverLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(clientLatch.await(5, TimeUnit.SECONDS));
}

@Test
public void testFailedSessionClosesIdleStream() throws Exception
{
final CountDownLatch latch = new CountDownLatch(1);
final List<Stream> streams = new ArrayList<>();
start(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
streams.add(stream);
MetaData.Request request = (MetaData.Request)frame.getMetaData();
if ("GET".equals(request.getMethod()))
{
((HTTP2Session)stream.getSession()).getEndPoint().close();
// Try to write something to force an error.
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1024), true), Callback.Adapter.INSTANCE);
}
return null;
}

@Override
public void onFailure(Session session, Throwable failure)
{
Assert.assertEquals(0, session.getStreams().size());
for (Stream stream : streams)
Assert.assertTrue(stream.isClosed());
latch.countDown();
}
});

Session session = newClient(new Session.Listener.Adapter());

// First stream will be idle on server.
HeadersFrame request1 = new HeadersFrame(0, newRequest("HEAD", new HttpFields()), null, true);
session.newStream(request1, new Promise.Adapter<Stream>(), new Stream.Listener.Adapter());

// Second stream will fail on server.
HeadersFrame request2 = new HeadersFrame(0, newRequest("GET", new HttpFields()), null, true);
session.newStream(request2, new Promise.Adapter<Stream>(), new Stream.Listener.Adapter());

Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,11 @@ public void reset()
@Override
public void failed(Throwable x)
{
if (stream != null)
{
stream.close();
stream.getSession().removeStream(stream, true);
}
callback.failed(x);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,6 @@ public void failed(Throwable x)
flowControl.onDataConsumed(HTTP2Session.this, stream, flowControlLength);
}
});
if (stream.isClosed())
removeStream(stream, false);
}
}
else
Expand Down Expand Up @@ -214,9 +212,6 @@ public void onReset(ResetFrame frame)
stream.process(frame, Callback.Adapter.INSTANCE);
else
notifyReset(this, frame);

if (stream != null)
removeStream(stream, false);
}

@Override
Expand Down Expand Up @@ -416,7 +411,6 @@ public void newStream(HeadersFrame frame, Promise<Stream> promise, Stream.Listen
final IStream stream = createLocalStream(streamId, promise);
if (stream == null)
return;
stream.updateClose(frame.isEndStream(), true);
stream.setListener(listener);

ControlEntry entry = new ControlEntry(frame, stream, new PromiseCallback<>(promise, stream));
Expand All @@ -428,7 +422,7 @@ public void newStream(HeadersFrame frame, Promise<Stream> promise, Stream.Listen
}

@Override
public void push(IStream stream, Promise<Stream> promise, PushPromiseFrame frame)
public void push(IStream stream, Promise<Stream> promise, PushPromiseFrame frame, Stream.Listener listener)
{
// Synchronization is necessary to atomically create
// the stream id and enqueue the frame to be sent.
Expand All @@ -441,7 +435,7 @@ public void push(IStream stream, Promise<Stream> promise, PushPromiseFrame frame
final IStream pushStream = createLocalStream(streamId, promise);
if (pushStream == null)
return;
pushStream.updateClose(true, false);
pushStream.setListener(listener);

ControlEntry entry = new ControlEntry(frame, pushStream, new PromiseCallback<>(promise, pushStream));
queued = flusher.append(entry);
Expand Down Expand Up @@ -647,7 +641,8 @@ protected IStream newStream(int streamId)
return new HTTP2Stream(scheduler, this, streamId);
}

protected void removeStream(IStream stream, boolean local)
@Override
public void removeStream(IStream stream, boolean local)
{
IStream removed = streams.remove(stream.getId());
if (removed != null)
Expand Down Expand Up @@ -845,8 +840,10 @@ private void terminate()
{
if (closed.compareAndSet(current, CloseState.CLOSED))
{
// Close the flusher and disconnect.
flusher.close();
for (IStream stream : streams.values())
stream.close();
streams.clear();
disconnect();
return;
}
Expand Down Expand Up @@ -988,15 +985,14 @@ public void succeeded()
case HEADERS:
{
HeadersFrame headersFrame = (HeadersFrame)frame;
stream.updateClose(headersFrame.isEndStream(), true);
if (stream.isClosed())
if (stream.updateClose(headersFrame.isEndStream(), true))
removeStream(stream, true);
break;
}
case RST_STREAM:
{
if (stream != null)
removeStream(stream, true);
stream.close();
removeStream(stream, true);
break;
}
case SETTINGS:
Expand All @@ -1007,6 +1003,13 @@ public void succeeded()
flowControl.updateInitialStreamWindow(HTTP2Session.this, initialWindow, true);
break;
}
case PUSH_PROMISE:
{
// Pushed streams are implicitly remotely closed.
// They are closed when sending an end-stream DATA frame.
stream.updateClose(true, false);
break;
}
case GO_AWAY:
{
// We just sent a GO_AWAY, only shutdown the
Expand Down Expand Up @@ -1097,8 +1100,7 @@ public void succeeded()
{
// Only now we can update the close state
// and eventually remove the stream.
stream.updateClose(dataFrame.isEndStream(), true);
if (stream.isClosed())
if (stream.updateClose(dataFrame.isEndStream(), true))
removeStream(stream, true);
callback.succeeded();
}
Expand Down

0 comments on commit d4809e9

Please sign in to comment.