Skip to content

Commit

Permalink
Fixes #250 - Implement HTTP CONNECT for HTTP/2.
Browse files Browse the repository at this point in the history
Final modifications after review.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
  • Loading branch information
sbordet committed Aug 13, 2019
1 parent 85b455d commit 2dd5896
Show file tree
Hide file tree
Showing 15 changed files with 434 additions and 141 deletions.
35 changes: 26 additions & 9 deletions jetty-http/src/main/java/org/eclipse/jetty/http/MetaData.java
Expand Up @@ -135,7 +135,6 @@ public static class Request extends MetaData
{
private String _method;
private HttpURI _uri;
private String _protocol;

public Request(HttpFields fields)
{
Expand Down Expand Up @@ -175,7 +174,7 @@ public Request(String method, String scheme, HostPortHttpField hostPort, String
public Request(Request request)
{
this(request.getMethod(), new HttpURI(request.getURI()), request.getHttpVersion(), new HttpFields(request.getFields()), request.getContentLength());
setProtocol(request.getProtocol()); }
}

@Override
public void recycle()
Expand All @@ -184,7 +183,6 @@ public void recycle()
_method = null;
if (_uri != null)
_uri.clear();
_protocol = null;
}

@Override
Expand Down Expand Up @@ -235,12 +233,7 @@ public String getURIString()

public String getProtocol()
{
return _protocol;
}

public void setProtocol(String protocol)
{
_protocol = protocol;
return null;
}

@Override
Expand All @@ -252,6 +245,30 @@ public String toString()
}
}

public static class ConnectRequest extends Request
{
private String _protocol;

public ConnectRequest(HttpScheme scheme, HostPortHttpField authority, String path, HttpFields fields, String protocol)
{
super(HttpMethod.CONNECT.asString(), scheme, authority, path, HttpVersion.HTTP_2, fields, Long.MIN_VALUE);
_protocol = protocol;
}

@Override
public String getProtocol()
{
return _protocol;
}

@Override
public void recycle()
{
super.recycle();
_protocol = null;
}
}

public static class Response extends MetaData
{
private int _status;
Expand Down
Expand Up @@ -131,8 +131,7 @@ public void onData(Stream stream, DataFrame frame, Callback callback)
String host = "localhost";
int port = connector.getLocalPort();
String authority = host + ":" + port;
MetaData.Request request = new MetaData.Request(HttpMethod.CONNECT.asString(), HttpScheme.HTTP, new HostPortHttpField(authority), "/", HttpVersion.HTTP_2, new HttpFields());
request.setProtocol("websocket");
MetaData.Request request = new MetaData.ConnectRequest(HttpScheme.HTTP, new HostPortHttpField(authority), "/", new HttpFields(), "websocket");
FuturePromise<Stream> streamPromise = new FuturePromise<>();
client.newStream(new HeadersFrame(request, null, false), streamPromise, new Stream.Listener.Adapter()
{
Expand Down
Expand Up @@ -24,8 +24,17 @@
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.util.Callback;

/**
* <p>A HTTP/2 specific handler of events for normal and tunneled exchanges.</p>
*/
public interface HTTP2Channel
{
/**
* <p>A client specific handler for events that happen after
* a {@code HEADERS} response frame is received.</p>
* <p>{@code DATA} frames may be handled as response content
* or as opaque tunnelled data.</p>
*/
public interface Client
{
public void onData(DataFrame frame, Callback callback);
Expand All @@ -35,6 +44,12 @@ public interface Client
public void onFailure(Throwable failure, Callback callback);
}

/**
* <p>A server specific handler for events that happen after
* a {@code HEADERS} request frame is received.</p>
* <p>{@code DATA} frames may be handled as request content
* or as opaque tunnelled data.</p>
*/
public interface Server
{
public Runnable onData(DataFrame frame, Callback callback);
Expand Down
Expand Up @@ -48,6 +48,7 @@ public abstract class HTTP2StreamEndPoint implements EndPoint
private final AtomicReference<Callback> readCallback = new AtomicReference<>();
private final long created = System.currentTimeMillis();
private final AtomicBoolean eof = new AtomicBoolean();
private final AtomicBoolean closed = new AtomicBoolean();
private final IStream stream;
private Connection connection;

Expand All @@ -71,7 +72,7 @@ public InetSocketAddress getRemoteAddress()
@Override
public boolean isOpen()
{
return !stream.isClosed();
return !closed.get();
}

@Override
Expand Down Expand Up @@ -157,29 +158,41 @@ public boolean isInputShutdown()
@Override
public void close(Throwable cause)
{
if (LOG.isDebugEnabled())
LOG.debug("closing {}, cause: {}", this, cause);
shutdownOutput();
// TODO: do we need this code after the shutdownOutput() callback?
stream.close();
onClose(cause);
if (closed.compareAndSet(false, true))
{
if (LOG.isDebugEnabled())
LOG.debug("closing {}, cause: {}", this, cause);
shutdownOutput();
stream.close();
onClose(cause);
}
}

@Override
public int fill(ByteBuffer sink)
public int fill(ByteBuffer sink) throws IOException
{
if (LOG.isDebugEnabled())
LOG.debug("filling {} on {}", BufferUtil.toDetailString(sink), this);

Entry entry;
synchronized (this)
{
entry = dataQueue.poll();
}

if (LOG.isDebugEnabled())
LOG.debug("filled {} on {}", entry, this);

if (entry == null)
return 0;
if (entry.eof)
if (entry.isEOF())
{
entry.succeed();
return shutdownInput();
}
IOException failure = entry.ioFailure();
if (failure != null)
{
entry.fail(failure);
throw failure;
}

int sinkPosition = BufferUtil.flipToFill(sink);
ByteBuffer source = entry.buffer;
Expand All @@ -200,7 +213,7 @@ public int fill(ByteBuffer sink)
}
else
{
entry.callback.succeeded();
entry.succeed();
}
return length;
}
Expand Down Expand Up @@ -516,24 +529,30 @@ protected void offerData(DataFrame frame, Callback callback)
if (frame.isEndStream())
{
if (buffer.hasRemaining())
offer(buffer, Callback.from(() -> {}, callback::failed), false);
offer(BufferUtil.EMPTY_BUFFER, callback, true);
offer(buffer, Callback.from(() -> {}, callback::failed), null);
offer(BufferUtil.EMPTY_BUFFER, callback, Entry.EOF);
}
else
{
if (buffer.hasRemaining())
offer(buffer, callback, false);
offer(buffer, callback, null);
else
callback.succeeded();
}
process();
}

private void offer(ByteBuffer buffer, Callback callback, boolean eof)
protected void offerFailure(Throwable failure)
{
offer(BufferUtil.EMPTY_BUFFER, Callback.NOOP, failure);
process();
}

private void offer(ByteBuffer buffer, Callback callback, Throwable failure)
{
synchronized (this)
{
dataQueue.offer(new Entry(buffer, callback, eof));
dataQueue.offer(new Entry(buffer, callback, failure));
}
}

Expand Down Expand Up @@ -564,15 +583,46 @@ public String toString()

private static class Entry
{
private static final Throwable EOF = new Throwable();

private final ByteBuffer buffer;
private final Callback callback;
private final boolean eof;
private final Throwable failure;

private Entry(ByteBuffer buffer, Callback callback, boolean eof)
private Entry(ByteBuffer buffer, Callback callback, Throwable failure)
{
this.buffer = buffer;
this.callback = callback;
this.eof = eof;
this.failure = failure;
}

private boolean isEOF()
{
return failure == EOF;
}

private IOException ioFailure()
{
if (failure == null || isEOF())
return null;
return failure instanceof IOException ? (IOException)failure : new IOException(failure);
}

private void succeed()
{
callback.succeeded();
}

private void fail(Throwable failure)
{
callback.failed(failure);
}

@Override
public String toString()
{
return String.format("%s@%x[b=%s,eof=%b,f=%s]", getClass().getSimpleName(), hashCode(),
BufferUtil.toDetailString(buffer), isEOF(), isEOF() ? null : failure);
}
}

Expand Down
Expand Up @@ -255,9 +255,10 @@ public MetaData build() throws HpackException.StreamException
if (_path == null)
throw new HpackException.StreamException("No Path");
}
MetaData.Request request = new MetaData.Request(_method, _scheme, _authority, _path, HttpVersion.HTTP_2, fields, _contentLength);
request.setProtocol(_protocol);
return request;
if (isConnect)
return new MetaData.ConnectRequest(_scheme, _authority, _path, fields, _protocol);
else
return new MetaData.Request(_method, _scheme, _authority, _path, HttpVersion.HTTP_2, fields, _contentLength);
}
if (_response)
{
Expand Down
Expand Up @@ -376,7 +376,7 @@ public Runnable onRequest(HeadersFrame frame)
}

@Override
protected void prepareUpgrade()
protected void checkAndPrepareUpgrade()
{
if (isTunnel())
getHttpTransport().prepareUpgrade();
Expand Down
Expand Up @@ -389,7 +389,6 @@ public boolean isTunnellingSupported()
@Override
public EndPoint getTunnellingEndPoint()
{
// TODO: idle timeout
return new ServerHTTP2StreamEndPoint(getStream());
}

Expand Down
Expand Up @@ -157,17 +157,17 @@ public void succeeded()
}
else
{
HttpFields trailers = retrieveTrailers();
if (trailers != null)
{
if (transportCallback.start(new SendTrailers(callback, trailers), true))
sendHeadersFrame(response, false, transportCallback);
}
else
{
if (transportCallback.start(callback, true))
sendHeadersFrame(response, true, transportCallback);
}
HttpFields trailers = retrieveTrailers();
if (trailers != null)
{
if (transportCallback.start(new SendTrailers(callback, trailers), true))
sendHeadersFrame(response, false, transportCallback);
}
else
{
if (transportCallback.start(callback, true))
sendHeadersFrame(response, true, transportCallback);
}
}
}
else
Expand Down
Expand Up @@ -49,33 +49,38 @@ public Runnable onData(DataFrame frame, Callback callback)
@Override
public Runnable onTrailer(HeadersFrame frame)
{
// TODO
// We are tunnelling, so there are no trailers.
return null;
}

@Override
public boolean onTimeout(Throwable failure, Consumer<Runnable> consumer)
{
// TODO: Consumer not used?
if (LOG.isDebugEnabled())
LOG.debug("idle timeout on {}: {}", this, failure);
offerFailure(failure);
boolean result = true;
Connection connection = getConnection();
if (connection != null)
return connection.onIdleExpired();
return true;
result = connection.onIdleExpired();
consumer.accept(() -> close(failure));
return result;
}

@Override
public Runnable onFailure(Throwable failure, Callback callback)
{
// TODO
return null;
if (LOG.isDebugEnabled())
LOG.debug("failure on {}: {}", this, failure);
offerFailure(failure);
close(failure);
return callback::succeeded;
}

@Override
public boolean isIdle()
{
// TODO
// We are tunnelling, so we are never idle.
return false;
}
}

0 comments on commit 2dd5896

Please sign in to comment.