Skip to content

Commit

Permalink
Issue #3537 - Bootstrapping WebSockets with HTTP/2.
Browse files Browse the repository at this point in the history
Second draft pass the implementation.
Rewired the HTTP/1.1 mechanism to the new APIs.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
  • Loading branch information
sbordet committed Jul 9, 2019
1 parent a485f03 commit 1079bf8
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.eclipse.jetty.client.HttpRequest;
import org.eclipse.jetty.client.HttpResponse;
import org.eclipse.jetty.client.HttpResponseException;
import org.eclipse.jetty.client.HttpUpgrader;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.http.HttpFields;
Expand Down Expand Up @@ -98,30 +99,18 @@ public Result exchangeTerminating(HttpExchange exchange, Result result)
return result;

HttpResponse response = exchange.getResponse();

if ((response.getVersion() == HttpVersion.HTTP_1_1) &&
(response.getStatus() == HttpStatus.SWITCHING_PROTOCOLS_101))
if (response.getVersion() == HttpVersion.HTTP_1_1 && response.getStatus() == HttpStatus.SWITCHING_PROTOCOLS_101)
{
String next_connection = response.getHeaders().get(HttpHeader.CONNECTION);
if ((next_connection == null) || !next_connection.toLowerCase(Locale.US).contains("upgrade"))
{
return new Result(result,new HttpResponseException("101 Switching Protocols without Connection: Upgrade not supported",response));
}

// Upgrade Response
String header = response.getHeaders().get(HttpHeader.CONNECTION);
if (header == null || !header.toLowerCase(Locale.US).contains("upgrade"))
return new Result(result, new HttpResponseException("101 response without 'Connection: Upgrade'", response));

HttpRequest request = exchange.getRequest();
HttpConnectionUpgrader upgrader = (HttpConnectionUpgrader) request.getConversation().getAttribute(HttpConnectionUpgrader.class.getName());
if (upgrader != null)
{
try
{
upgrader.upgrade(response, getHttpConnection());
}
catch (Throwable x)
{
return new Result(result, x);
}
}
HttpUpgrader upgrader = (HttpUpgrader)request.getAttributes().get(HttpUpgrader.class.getName());
if (upgrader == null)
return new Result(result, new HttpResponseException("101 response without " + HttpUpgrader.class.getSimpleName(), response));

upgrader.upgrade(response, getHttpConnection().getEndPoint());
}

return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -330,8 +330,7 @@ public boolean messageComplete()
if (status == HttpStatus.SWITCHING_PROTOCOLS_101)
return true;

if (HttpMethod.CONNECT.is(exchange.getRequest().getMethod()) &&
status == HttpStatus.OK_200)
if (HttpMethod.CONNECT.is(exchange.getRequest().getMethod()) && status == HttpStatus.OK_200)
return true;

return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,6 @@ void onHeaders(Stream stream, HeadersFrame frame)
LOG.debug("Successful HTTP2 tunnel on {} via {}", stream, endPoint);
((IStream)stream).setAttachment(endPoint);
httpRequest.getConversation().setAttribute(EndPoint.class.getName(), endPoint);

// TODO: can we make this similar to the proxy case where the after tunnel
// logic is done in the onHeaders listener, rather than here?
HttpUpgrader upgrader = (HttpUpgrader)httpRequest.getAttributes().get(HttpUpgrader.class.getName());
if (upgrader != null)
upgrader.upgrade(httpResponse, endPoint);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,18 @@

import java.net.URI;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.dynamic.HttpClientTransportDynamic;
import org.eclipse.jetty.client.http.HttpClientConnectionFactory;
import org.eclipse.jetty.http2.client.HTTP2Client;
import org.eclipse.jetty.http2.client.http.ClientConnectionFactoryOverHTTP2;
import org.eclipse.jetty.http2.server.HTTP2CServerConnectionFactory;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.servlet.ServletContextHandler;
Expand Down Expand Up @@ -58,7 +62,10 @@ public void startServer() throws Exception
QueuedThreadPool serverThreads = new QueuedThreadPool();
serverThreads.setName("server");
server = new Server(serverThreads);
connector = new ServerConnector(server, 1, 1, new HTTP2CServerConnectionFactory(new HttpConfiguration()));
HttpConfiguration httpConfiguration = new HttpConfiguration();
HttpConnectionFactory h1 = new HttpConnectionFactory(httpConfiguration);
HTTP2CServerConnectionFactory h2c = new HTTP2CServerConnectionFactory(httpConfiguration);
connector = new ServerConnector(server, 1, 1, h1, h2c);
server.addConnector(connector);

ServletContextHandler context = new ServletContextHandler(server, "/");
Expand All @@ -81,16 +88,26 @@ public void stopServer() throws Exception
server.stop();
}

@Test
public void testWebSocketOverDynamicHTTP1() throws Exception
{
testWebSocketOverDynamicTransport(clientConnector -> HttpClientConnectionFactory.HTTP11);
}

@Test
public void testWebSocketOverDynamicHTTP2() throws Exception
{
testWebSocketOverDynamicTransport(clientConnector -> new ClientConnectionFactoryOverHTTP2.H2C(new HTTP2Client(clientConnector)));
}

private void testWebSocketOverDynamicTransport(Function<ClientConnector, ClientConnectionFactory.Info> protocolFn) throws Exception
{
ClientConnector clientConnector = new ClientConnector();
clientConnector.setSelectors(1);
ClientConnectionFactoryOverHTTP2.H2C h2c = new ClientConnectionFactoryOverHTTP2.H2C(new HTTP2Client(clientConnector));
HttpClient httpClient = new HttpClient(new HttpClientTransportDynamic(clientConnector, h2c));
QueuedThreadPool clientThreads = new QueuedThreadPool();
clientThreads.setName("client");
httpClient.setExecutor(clientThreads);
clientConnector.setExecutor(clientThreads);
HttpClient httpClient = new HttpClient(new HttpClientTransportDynamic(clientConnector, protocolFn.apply(clientConnector)));

WebSocketClient wsClient = new WebSocketClient(httpClient);
wsClient.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Consumer;
import java.util.stream.Collectors;

Expand All @@ -45,7 +44,6 @@
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.B64Code;
import org.eclipse.jetty.util.QuotedStringTokenizer;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.log.Log;
Expand Down Expand Up @@ -140,12 +138,10 @@ public void addExtensions(String... configs)

public List<ExtensionConfig> getExtensions()
{
List<ExtensionConfig> extensions = getHeaders().getCSV(HttpHeader.SEC_WEBSOCKET_EXTENSIONS, true)
.stream()
.map(ExtensionConfig::parse)
.collect(Collectors.toList());

return extensions;
return getHeaders().getCSV(HttpHeader.SEC_WEBSOCKET_EXTENSIONS, true)
.stream()
.map(ExtensionConfig::parse)
.collect(Collectors.toList());
}

public void setExtensions(List<ExtensionConfig> configs)
Expand All @@ -158,8 +154,7 @@ public void setExtensions(List<ExtensionConfig> configs)

public List<String> getSubProtocols()
{
List<String> subProtocols = getHeaders().getCSV(HttpHeader.SEC_WEBSOCKET_SUBPROTOCOL, true);
return subProtocols;
return getHeaders().getCSV(HttpHeader.SEC_WEBSOCKET_SUBPROTOCOL, true);
}

public void setSubProtocols(String... protocols)
Expand Down Expand Up @@ -278,13 +273,6 @@ protected WebSocketCoreSession newWebSocketCoreSession(FrameHandler handler, Neg

public abstract FrameHandler getFrameHandler(WebSocketCoreClient coreClient, HttpResponse response);

private final String genRandomKey()
{
byte[] bytes = new byte[16];
ThreadLocalRandom.current().nextBytes(bytes);
return new String(B64Code.encode(bytes));
}

private void initWebSocketHeaders()
{
// TODO: verify why we need to call listeners here (too early).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@ public void prepare(HttpRequest request)
@Override
public void upgrade(HttpResponse response, EndPoint endPoint)
{
// TODO: do we need some extra check here?

clientUpgradeRequest.upgrade(response, endPoint);
}
}

0 comments on commit 1079bf8

Please sign in to comment.