Skip to content

Commit

Permalink
Issue jetty#3537 - implementing handshaker for RFC8441 websockets ove…
Browse files Browse the repository at this point in the history
…r http2

Signed-off-by: lachan-roberts <lachlan@webtide.com>
  • Loading branch information
lachlan-roberts committed May 3, 2019
1 parent 7743708 commit 148b1cc
Show file tree
Hide file tree
Showing 9 changed files with 514 additions and 205 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
package org.eclipse.jetty.http2.server;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicReference;

import org.eclipse.jetty.http2.IStream;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.io.AbstractEndPoint;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;

public class EndpointOverHttp2 extends AbstractEndPoint
{
private static final int BUFFER_SIZE = 1024; // TODO: configured from session

private HttpTransportOverHTTP2 transport;
private IStream http2Stream;

private InetSocketAddress local;
private InetSocketAddress remote;

private ByteBuffer writeBuffer;

private DataFrame frame;
private Callback callback;
private AtomicReference<State> writeReady = new AtomicReference<>(State.IDLE);

private enum State
{
IDLE,
FLUSHING,
PENDING
}

public EndpointOverHttp2(HttpTransportOverHTTP2 transport, IStream stream, InetSocketAddress local, InetSocketAddress remote)
{
super(new ScheduledExecutorScheduler());
this.transport = transport;

this.http2Stream = stream;
this.local = local;
this.remote = remote;

this.writeBuffer = ByteBuffer.allocate(BUFFER_SIZE); // TODO: use the ByteBufferPool

stream.setListener(new Stream.Listener.Adapter()
{
// TODO: should we override other methods on this (maybe for failures)

@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
EndpointOverHttp2.this.frame = frame;
EndpointOverHttp2.this.callback = callback;
}
});
}

@Override
public InetSocketAddress getLocalAddress()
{
return local;
}

@Override
public InetSocketAddress getRemoteAddress()
{
return remote;
}

@Override
public Object getTransport()
{
return transport;
}

@Override
protected void onIncompleteFlush()
{
while (true)
{
switch (writeReady.get())
{
case IDLE:
getWriteFlusher().completeWrite();
break;

case FLUSHING:
if (!writeReady.compareAndSet(State.FLUSHING, State.PENDING))
continue;
break;

case PENDING:
throw new IllegalStateException();
}

break;
}
}

@Override
protected void needsFillInterest() throws IOException
{
//getFillInterest().fillable();

Callback succeedFrame = callback;
frame = null;
callback = null;
succeedFrame.succeeded();
}

@Override
public int fill(ByteBuffer buffer) throws IOException
{
return BufferUtil.put(frame.getData(), buffer);
}

@Override
public boolean flush(ByteBuffer... buffer) throws IOException
{
if (!writeReady.compareAndSet(State.IDLE, State.FLUSHING))
return false;

boolean incomplete = false;
BufferUtil.clearToFill(writeBuffer);
for (ByteBuffer bb : buffer)
{
int filled = BufferUtil.put(bb, writeBuffer);
if (filled == 0)
{
incomplete = true;
break;
}
}
BufferUtil.flipToFlush(writeBuffer, 0);

DataFrame frame = new DataFrame(http2Stream.getId(), writeBuffer, false);
http2Stream.data(frame, Callback.from(()->
{
while(true)
{
switch (writeReady.get())
{
case IDLE:
throw new IllegalStateException();

case FLUSHING:
if (!writeReady.compareAndSet(State.FLUSHING, State.IDLE))
continue;
break;

case PENDING:
if (!writeReady.compareAndSet(State.PENDING, State.IDLE))
continue;
getWriteFlusher().completeWrite();
break;
}

break;
}
}, t->close(t)));

return incomplete;
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ public ClientUpgradeRequest(WebSocketCoreClient webSocketClient, URI requestURI)

this.wsClient = webSocketClient;
this.futureCoreSession = new CompletableFuture<>();

// TODO: this is invalid for HTTP/2 requests
method(HttpMethod.GET);
version(HttpVersion.HTTP_1_1);

Expand Down Expand Up @@ -180,6 +182,7 @@ public void setSubProtocols(List<String> protocols)
@Override
public void send(final Response.CompleteListener listener)
{
// TODO: this adds only the HTTP/1.1 headers, (if HTTP/2 send CONNECT request)
initWebSocketHeaders();
super.send(listener);
}
Expand Down Expand Up @@ -232,11 +235,12 @@ public void onComplete(Result result)
}
}

// TODO: this will be a 200 response for HTTP/2 success
if (responseStatusCode != HttpStatus.SWITCHING_PROTOCOLS_101)
{
// Failed to upgrade (other reason)
handleException(
new UpgradeException(requestURI, responseStatusCode, "Failed to upgrade to websocket: Unexpected HTTP Response Status Code: " + responseLine));
handleException( new UpgradeException(requestURI, responseStatusCode,
"Failed to upgrade to websocket: Unexpected HTTP Response Status Code: " + responseLine));
}
}

Expand All @@ -249,9 +253,11 @@ protected void handleException(Throwable failure)
@Override
public void upgrade(HttpResponse response, HttpConnectionOverHTTP httpConnection)
{
// TODO: http2 upgrade does not use upgrade header
if (!this.getHeaders().get(HttpHeader.UPGRADE).equalsIgnoreCase("websocket"))
throw new HttpResponseException("Not a WebSocket Upgrade", response);

// TODO: http2 upgrade does not use SEC_WEBSOCKET_KEY or SEC_WEBSOCKET_ACCEPT
// Check the Accept hash
String reqKey = this.getHeaders().get(HttpHeader.SEC_WEBSOCKET_KEY);
String expectedHash = WebSocketCore.hashKey(reqKey);
Expand Down Expand Up @@ -324,7 +330,6 @@ else if (values.length == 1)
// We can upgrade
EndPoint endp = httpConnection.getEndPoint();
customize(endp);

FrameHandler frameHandler = getFrameHandler(wsClient, response);

if (frameHandler == null)
Expand Down
Loading

0 comments on commit 148b1cc

Please sign in to comment.