Skip to content

Commit

Permalink
Issue #3537 - Bootstrapping WebSockets with HTTP/2.
Browse files Browse the repository at this point in the history
Implemented upgrade logic for WebSocket over HTTP/2.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
  • Loading branch information
sbordet committed Aug 14, 2019
1 parent a700907 commit 01af85d
Show file tree
Hide file tree
Showing 35 changed files with 870 additions and 363 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public class HttpRequest implements Request
private List<RequestListener> requestListeners;
private BiFunction<Request, Request, Response.CompleteListener> pushListener;
private Supplier<HttpFields> trailers;
private String upgradeProtocol;

protected HttpRequest(HttpClient client, HttpConversation conversation, URI uri)
{
Expand Down Expand Up @@ -609,6 +610,12 @@ public HttpRequest trailers(Supplier<HttpFields> trailers)
return this;
}

public HttpRequest upgradeProtocol(String upgradeProtocol)
{
this.upgradeProtocol = upgradeProtocol;
return this;
}

@Override
public ContentProvider getContent()
{
Expand Down Expand Up @@ -765,6 +772,11 @@ public Supplier<HttpFields> getTrailers()
return trailers;
}

public String getUpgradeProtocol()
{
return upgradeProtocol;
}

@Override
public boolean abort(Throwable cause)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
//
// ========================================================================
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//

package org.eclipse.jetty.client;

import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.io.EndPoint;

public interface HttpUpgrader
{
public void prepare(HttpRequest request);

public void upgrade(HttpResponse response, EndPoint endPoint);

public interface Factory
{
public HttpUpgrader newHttpUpgrader(HttpVersion version);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.eclipse.jetty.client.HttpRequest;
import org.eclipse.jetty.client.HttpResponse;
import org.eclipse.jetty.client.HttpResponseException;
import org.eclipse.jetty.client.HttpUpgrader;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.http.HttpFields;
Expand Down Expand Up @@ -98,29 +99,24 @@ public Result exchangeTerminating(HttpExchange exchange, Result result)
return result;

HttpResponse response = exchange.getResponse();

if ((response.getVersion() == HttpVersion.HTTP_1_1) &&
(response.getStatus() == HttpStatus.SWITCHING_PROTOCOLS_101))
if (response.getVersion() == HttpVersion.HTTP_1_1 && response.getStatus() == HttpStatus.SWITCHING_PROTOCOLS_101)
{
String nextConnection = response.getHeaders().get(HttpHeader.CONNECTION);
if ((nextConnection == null) || !nextConnection.toLowerCase(Locale.US).contains("upgrade"))
{
return new Result(result, new HttpResponseException("101 Switching Protocols without Connection: Upgrade not supported", response));
}
String header = response.getHeaders().get(HttpHeader.CONNECTION);
if (header == null || !header.toLowerCase(Locale.US).contains("upgrade"))
return new Result(result, new HttpResponseException("101 response without 'Connection: Upgrade'", response));

// Upgrade Response
HttpRequest request = exchange.getRequest();
HttpConnectionUpgrader upgrader = (HttpConnectionUpgrader)request.getConversation().getAttribute(HttpConnectionUpgrader.class.getName());
if (upgrader != null)
HttpUpgrader upgrader = (HttpUpgrader)request.getConversation().getAttribute(HttpUpgrader.class.getName());
if (upgrader == null)
return new Result(result, new HttpResponseException("101 response without " + HttpUpgrader.class.getSimpleName(), response));

try
{
upgrader.upgrade(response, getHttpConnection().getEndPoint());
}
catch (Throwable x)
{
try
{
upgrader.upgrade(response, getHttpConnection());
}
catch (Throwable x)
{
return new Result(result, x);
}
return new Result(result, new HttpResponseException("Could not upgrade to WebSocket", response, x));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,14 @@
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.HttpProxy;
import org.eclipse.jetty.client.HttpRequest;
import org.eclipse.jetty.client.HttpUpgrader;
import org.eclipse.jetty.client.IConnection;
import org.eclipse.jetty.client.SendFailure;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.Promise;
Expand Down Expand Up @@ -268,6 +271,12 @@ protected void normalizeRequest(Request request)
request.timeout(connectTimeout, TimeUnit.MILLISECONDS)
.idleTimeout(2 * connectTimeout, TimeUnit.MILLISECONDS);
}
if (request instanceof HttpUpgrader.Factory)
{
HttpUpgrader upgrader = ((HttpUpgrader.Factory)request).newHttpUpgrader(HttpVersion.HTTP_1_1);
((HttpRequest)request).getConversation().setAttribute(HttpUpgrader.class.getName(), upgrader);
upgrader.prepare((HttpRequest)request);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,8 +333,7 @@ public boolean messageComplete()
if (status == HttpStatus.SWITCHING_PROTOCOLS_101)
return true;

if (HttpMethod.CONNECT.is(exchange.getRequest().getMethod()) &&
status == HttpStatus.OK_200)
if (HttpMethod.CONNECT.is(exchange.getRequest().getMethod()) && status == HttpStatus.OK_200)
return true;

return false;
Expand Down
5 changes: 5 additions & 0 deletions jetty-http/src/main/java/org/eclipse/jetty/http/MetaData.java
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,11 @@ public static class ConnectRequest extends Request
private String _protocol;

public ConnectRequest(HttpScheme scheme, HostPortHttpField authority, String path, HttpFields fields, String protocol)
{
this(scheme == null ? null : scheme.asString(), authority, path, fields, protocol);
}

public ConnectRequest(String scheme, HostPortHttpField authority, String path, HttpFields fields, String protocol)
{
super(HttpMethod.CONNECT.asString(), scheme, authority, path, HttpVersion.HTTP_2, fields, Long.MIN_VALUE);
_protocol = protocol;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,22 +175,22 @@ else if (value != null)
_contentLength = field.getLongValue();
_fields.add(field);
break;

case TE:
if ("trailers".equalsIgnoreCase(value))
_fields.add(field);
else
streamException("Unsupported TE value '%s'", value);
break;

case CONNECTION:
if ("TE".equalsIgnoreCase(value))
_fields.add(field);
else
streamException("Connection specific field '%s'", header);
break;
break;

default:
default:
if (name.charAt(0) == ':')
streamException("Unknown pseudo header '%s'", name);
else
Expand Down Expand Up @@ -236,7 +236,7 @@ public MetaData build() throws HpackException.StreamException
_streamException.addSuppressed(new Throwable());
throw _streamException;
}

if (_request && _response)
throw new HpackException.StreamException("Request and Response headers");

Expand Down Expand Up @@ -266,7 +266,7 @@ public MetaData build() throws HpackException.StreamException
throw new HpackException.StreamException("No Status");
return new MetaData.Response(HttpVersion.HTTP_2, _status, fields, _contentLength);
}

return new MetaData(HttpVersion.HTTP_2, fields, _contentLength);
}
finally
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.HttpRequest;
import org.eclipse.jetty.client.HttpUpgrader;
import org.eclipse.jetty.client.SendFailure;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.IStream;
Expand Down Expand Up @@ -88,6 +90,18 @@ public SendFailure send(HttpExchange exchange)
return send(channel, exchange);
}

@Override
protected void normalizeRequest(Request request)
{
super.normalizeRequest(request);
if (request instanceof HttpUpgrader.Factory)
{
HttpUpgrader upgrader = ((HttpUpgrader.Factory)request).newHttpUpgrader(HttpVersion.HTTP_2);
((HttpRequest)request).getConversation().setAttribute(HttpUpgrader.class.getName(), upgrader);
upgrader.prepare((HttpRequest)request);
}
}

protected HttpChannelOverHTTP2 acquireHttpChannel()
{
HttpChannelOverHTTP2 channel = idleChannels.poll();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@
import java.util.function.BiFunction;

import org.eclipse.jetty.client.HttpChannel;
import org.eclipse.jetty.client.HttpConversation;
import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.HttpReceiver;
import org.eclipse.jetty.client.HttpRequest;
import org.eclipse.jetty.client.HttpResponse;
import org.eclipse.jetty.client.HttpUpgrader;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.http.HttpField;
Expand Down Expand Up @@ -106,7 +108,11 @@ void onHeaders(Stream stream, HeadersFrame frame)
if (LOG.isDebugEnabled())
LOG.debug("Successful HTTP2 tunnel on {} via {}", stream, endPoint);
((IStream)stream).setAttachment(endPoint);
httpRequest.getConversation().setAttribute(EndPoint.class.getName(), endPoint);
HttpConversation conversation = httpRequest.getConversation();
conversation.setAttribute(EndPoint.class.getName(), endPoint);
HttpUpgrader upgrader = (HttpUpgrader)conversation.getAttribute(HttpUpgrader.class.getName());
if (upgrader != null)
upgrader.upgrade(httpResponse, endPoint);
}

if (responseHeaders(exchange))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,16 @@ protected void sendHeaders(HttpExchange exchange, final HttpContent content, fin
MetaData.Request metaData;
if (isTunnel)
{
metaData = new MetaData.Request(request.getMethod(), null, new HostPortHttpField(request.getPath()), null, HttpVersion.HTTP_2, request.getHeaders());
String upgradeProtocol = request.getUpgradeProtocol();
if (upgradeProtocol == null)
{
metaData = new MetaData.ConnectRequest((String)null, new HostPortHttpField(request.getPath()), null, request.getHeaders(), null);
}
else
{
HostPortHttpField authority = new HostPortHttpField(request.getHost(), request.getPort());
metaData = new MetaData.ConnectRequest(request.getScheme(), authority, request.getPath(), request.getHeaders(), upgradeProtocol);
}
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,8 @@ public void onCompleted()
Object attachment = stream.getAttachment();
if (attachment instanceof HttpChannelOverHTTP2)
{
// TODO: we used to "fake" a 101 response to upgrade the endpoint
// but we don't anymore, so this code should be deleted.
HttpChannelOverHTTP2 channel = (HttpChannelOverHTTP2)attachment;
if (channel.getResponse().getStatus() == HttpStatus.SWITCHING_PROTOCOLS_101)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,12 @@ boolean onIdleTimeout(Throwable timeout)
return true;
}

@Override
protected void checkAndPrepareUpgrade()
{
// TODO: move the code from HttpConnection.upgrade() here?
}

/**
* <p>Attempts to perform a HTTP/1.1 upgrade.</p>
* <p>The upgrade looks up a {@link ConnectionFactory.Upgrading} from the connector
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import java.net.URI;

import org.eclipse.jetty.client.HttpResponse;
import org.eclipse.jetty.client.http.HttpConnectionOverHTTP;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient;
Expand All @@ -43,11 +43,11 @@ public JavaxClientUpgradeRequest(JavaxWebSocketClientContainer clientContainer,
}

@Override
public void upgrade(HttpResponse response, HttpConnectionOverHTTP httpConnection)
public void upgrade(HttpResponse response, EndPoint endPoint)
{
frameHandler.setUpgradeRequest(new DelegatedJavaxClientUpgradeRequest(this));
frameHandler.setUpgradeResponse(new DelegatedJavaxClientUpgradeResponse(response));
super.upgrade(response, httpConnection);
super.upgrade(response, endPoint);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
Expand Down Expand Up @@ -53,7 +54,7 @@ public LocalFuzzer(Provider provider) throws Exception

public LocalFuzzer(Provider provider, CharSequence requestPath) throws Exception
{
this(provider, requestPath, UpgradeUtils.newDefaultUpgradeRequestHeaders());
this(provider, requestPath, new HashMap<>());
}

public LocalFuzzer(Provider provider, CharSequence requestPath, Map<String, String> headers) throws Exception
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,9 +208,9 @@ public FrameHandler getFrameHandler()
}

@Override
protected void customize(EndPoint endp)
protected void customize(EndPoint endPoint)
{
frameCapture.setEndPoint(endp);
frameCapture.setEndPoint(endPoint);
futureCapture.complete(frameCapture);
}

Expand Down
Loading

0 comments on commit 01af85d

Please sign in to comment.