Skip to content

Commit

Permalink
Issue #4382 - Support HTTP/1 upgrade to HTTP/2 in HttpClient.
Browse files Browse the repository at this point in the history
* Moved implementation of UpgradeTo from HTTP2ServerConnection
  to HTTP2Connection, since now also the client connection
  can be upgraded to.
* Split HTTP2Session.newStream(), since now the client must
  be able to create the implicit stream 1 corresponding to
  the HTTP/1.1 upgrade request, so that the HTTP/2 response
  after the upgrade finds the stream.
* The HTTP/1.1 upgrade mechanism has been generalized.
  Before it was based on HttpConnectionUpgrader and a hook
  in HttpChannelOverHTTP.exchangeTerminating().
  Now it is based on UpgradeProtocolHandler that when sees
  a 101 response it will trigger the upgrade mechanism.
* Introduced ConnectionPool.accept(Connection) to transfer
  a connection from the HTTP/1.1 connection pool to the
  HTTP/2 connection pool after the upgrade.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
  • Loading branch information
sbordet committed Mar 13, 2020
1 parent b1d30fc commit 466517a
Show file tree
Hide file tree
Showing 33 changed files with 751 additions and 205 deletions.
Expand Up @@ -146,6 +146,19 @@ public void failed(Throwable x)
}
}

@Override
public boolean accept(Connection connection)
{
while (true)
{
int count = connections.getLo();
if (count >= maxConnections)
return false;
if (connections.compareAndSetLo(count, count + 1))
return true;
}
}

protected abstract void onCreated(Connection connection);

protected void proceed()
Expand Down
Expand Up @@ -52,6 +52,14 @@ public interface ConnectionPool extends Closeable
*/
Connection acquire();

/**
* <p>Accepts the given connection to be managed by this ConnectionPool.</p>
*
* @param connection the connection to accept
* @return whether the connection has been accepted
*/
boolean accept(Connection connection);

/**
* <p>Returns the given connection, previously obtained via {@link #acquire()},
* back to this ConnectionPool.</p>
Expand Down
Expand Up @@ -216,6 +216,7 @@ protected void doStart() throws Exception
handlers.put(new RedirectProtocolHandler(this));
handlers.put(new WWWAuthenticationProtocolHandler(this));
handlers.put(new ProxyAuthenticationProtocolHandler(this));
handlers.put(new UpgradeProtocolHandler());

decoderFactories.add(new GZIPContentDecoder.Factory(byteBufferPool));

Expand Down Expand Up @@ -523,7 +524,7 @@ public Origin createOrigin(HttpRequest request, Origin.Protocol protocol)
return new Origin(scheme, host, port, request.getTag(), protocol);
}

HttpDestination resolveDestination(Origin origin)
public HttpDestination resolveDestination(Origin origin)
{
return destinations.computeIfAbsent(origin, o ->
{
Expand Down
Expand Up @@ -80,6 +80,11 @@ public HttpDestination(HttpClient client, Origin origin)

this.timeout = new TimeoutTask(client.getScheduler());

String host = HostPort.normalizeHost(getHost());
if (!client.isDefaultPort(getScheme(), getPort()))
host += ":" + getPort();
hostField = new HttpField(HttpHeader.HOST, host);

ProxyConfiguration proxyConfig = client.getProxyConfiguration();
proxy = proxyConfig.match(origin);
ClientConnectionFactory connectionFactory = client.getTransport();
Expand All @@ -98,11 +103,11 @@ public HttpDestination(HttpClient client, Origin origin)
if (tag instanceof ClientConnectionFactory.Decorator)
connectionFactory = ((ClientConnectionFactory.Decorator)tag).apply(connectionFactory);
this.connectionFactory = connectionFactory;
}

String host = HostPort.normalizeHost(getHost());
if (!client.isDefaultPort(getScheme(), getPort()))
host += ":" + getPort();
hostField = new HttpField(HttpHeader.HOST, host);
public void accept(Connection connection)
{
connectionPool.accept(connection);
}

@Override
Expand Down Expand Up @@ -497,7 +502,7 @@ public String toString()
{
return String.format("%s[%s]@%x%s,queue=%d,pool=%s",
HttpDestination.class.getSimpleName(),
asString(),
getOrigin(),
hashCode(),
proxy == null ? "" : "(via " + proxy + ")",
exchanges.size(),
Expand Down
Expand Up @@ -50,6 +50,11 @@ public HttpExchange(HttpDestination destination, HttpRequest request, List<Respo
conversation.updateResponseListeners(null);
}

public HttpDestination getHttpDestination()
{
return destination;
}

public HttpConversation getConversation()
{
return request.getConversation();
Expand Down
Expand Up @@ -59,7 +59,7 @@
*/
public abstract class HttpSender implements AsyncContentProvider.Listener
{
protected static final Logger LOG = Log.getLogger(HttpSender.class);
private static final Logger LOG = Log.getLogger(HttpSender.class);

private final AtomicReference<RequestState> requestState = new AtomicReference<>(RequestState.QUEUED);
private final AtomicReference<SenderState> senderState = new AtomicReference<>(SenderState.IDLE);
Expand Down
Expand Up @@ -18,15 +18,48 @@

package org.eclipse.jetty.client;

import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.Callback;

/**
* <p>HttpUpgrader prepares a HTTP request to upgrade from one protocol to another,
* and implements the upgrade mechanism.</p>
* <p>The upgrade mechanism can be the
* <a href="https://tools.ietf.org/html/rfc7230#section-6.7">HTTP/1.1 upgrade mechanism</a>
* or the
* <a href="https://tools.ietf.org/html/rfc8441#section-4">HTTP/2 extended CONNECT mechanism</a>.</p>
* <p>Given the differences among mechanism implementations, a request needs to be
* prepared before being sent to comply with the mechanism requirements (for example,
* add required headers, etc.).</p>
*/
public interface HttpUpgrader
{
/**
* <p>Prepares the request for the upgrade, for example by setting the HTTP method
* or by setting HTTP headers required for the upgrade.</p>
*
* @param request the request to prepare
*/
public void prepare(HttpRequest request);

public void upgrade(HttpResponse response, EndPoint endPoint);
/**
* <p>Upgrades the given {@code endPoint} to a different protocol.</p>
* <p>The success or failure of the upgrade should be communicated via the given {@code callback}.</p>
* <p>An exception thrown by this method is equivalent to failing the callback.</p>
*
* @param response the response with the information about the upgrade
* @param endPoint the EndPoint to upgrade
* @param callback a callback to notify of the success or failure of the upgrade
*/
public void upgrade(HttpResponse response, EndPoint endPoint, Callback callback);

/**
* <p>A factory for {@link HttpUpgrader}s.</p>
* <p>A {@link Request} subclass should implement this interface
* if it wants to create a specific HttpUpgrader.</p>
*/
public interface Factory
{
public HttpUpgrader newHttpUpgrader(HttpVersion version);
Expand Down
Expand Up @@ -85,6 +85,23 @@ public void setMaxMultiplex(int maxMultiplex)
}
}

@Override
public boolean accept(Connection connection)
{
boolean accepted = super.accept(connection);
if (accepted)
{
synchronized (this)
{
Holder holder = new Holder(connection);
activeConnections.put(connection, holder);
++holder.count;
}
active(connection);
}
return accepted;
}

@Override
public boolean isActive(Connection connection)
{
Expand Down
@@ -0,0 +1,110 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under
// the terms of the Eclipse Public License 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0
//
// This Source Code may also be made available under the following
// Secondary Licenses when the conditions for such availability set
// forth in the Eclipse Public License, v. 2.0 are satisfied:
// the Apache License v2.0 which is available at
// https://www.apache.org/licenses/LICENSE-2.0
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//

package org.eclipse.jetty.client;

import java.util.List;

import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.Callback;

/**
* <p>A protocol handler that handles HTTP 101 responses.</p>
*/
public class UpgradeProtocolHandler implements ProtocolHandler
{
private final List<String> protocols = List.of("websocket", "h2c");

@Override
public String getName()
{
return "upgrade";
}

@Override
public boolean accept(Request request, Response response)
{
boolean upgraded = HttpStatus.SWITCHING_PROTOCOLS_101 == response.getStatus();
boolean accepted = false;
if (upgraded)
accepted = acceptHeaders(request, response);
return upgraded && accepted;
}

protected boolean acceptHeaders(Request request, Response response)
{
HttpField responseUpgrade = response.getHeaders().getField(HttpHeader.UPGRADE);
if (responseUpgrade != null && protocols.stream().anyMatch(responseUpgrade::contains))
return true;
// The response may not contain the Upgrade header, so check the request.
HttpField requestUpgrade = request.getHeaders().getField(HttpHeader.UPGRADE);
return requestUpgrade != null && protocols.stream().anyMatch(requestUpgrade::contains);
}

@Override
public Response.Listener getResponseListener()
{
return new Response.Listener.Adapter()
{
@Override
public void onComplete(Result result)
{
HttpResponse response = (HttpResponse)result.getResponse();
HttpRequest request = (HttpRequest)response.getRequest();
if (result.isSucceeded())
{
try
{
HttpConversation conversation = request.getConversation();
HttpUpgrader upgrader = (HttpUpgrader)conversation.getAttribute(HttpUpgrader.class.getName());
if (upgrader == null)
throw new HttpResponseException("101 response without " + HttpUpgrader.class.getSimpleName(), response);
EndPoint endPoint = (EndPoint)conversation.getAttribute(EndPoint.class.getName());
if (endPoint == null)
throw new HttpResponseException("Upgrade without " + EndPoint.class.getSimpleName(), response);
upgrader.upgrade(response, endPoint, Callback.from(Callback.NOOP::succeeded, x -> forwardFailureComplete(request, null, response, x)));
}
catch (Throwable x)
{
forwardFailureComplete(request, null, response, x);
}
}
else
{
forwardFailureComplete(request, result.getRequestFailure(), response, result.getResponseFailure());
}
}
};
}

private void forwardFailureComplete(HttpRequest request, Throwable requestFailure, Response response, Throwable responseFailure)
{
HttpConversation conversation = request.getConversation();
conversation.updateResponseListeners(null);
List<Response.ResponseListener> responseListeners = conversation.getResponseListeners();
ResponseNotifier notifier = new ResponseNotifier();
notifier.forwardFailure(responseListeners, response, responseFailure);
notifier.notifyComplete(responseListeners, new Result(request, requestFailure, response, responseFailure));
}
}
Expand Up @@ -109,10 +109,7 @@ public HttpClientTransportDynamic(ClientConnector connector, ClientConnectionFac
.distinct()
.map(p -> p.toLowerCase(Locale.ENGLISH))
.collect(Collectors.toList());
for (ClientConnectionFactory.Info factoryInfo : factoryInfos)
{
addBean(factoryInfo);
}
Arrays.stream(factoryInfos).forEach(this::addBean);
setConnectionPoolFactory(destination ->
new MultiplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination, 1));
}
Expand All @@ -133,13 +130,22 @@ public Origin newOrigin(HttpRequest request)
}
else
{
// Preserve the order of protocols chosen by the application.
// We need to keep multiple protocols in case the protocol
// is negotiated: e.g. [http/1.1, h2] negotiates [h2], but
// here we don't know yet what will be negotiated.
protocols = this.protocols.stream()
.filter(p -> p.equals(http1) || p.equals(http2))
.collect(Collectors.toList());
if (ssl)
{
// There may be protocol negotiation, so preserve the order
// of protocols chosen by the application.
// We need to keep multiple protocols in case the protocol
// is negotiated: e.g. [http/1.1, h2] negotiates [h2], but
// here we don't know yet what will be negotiated.
protocols = this.protocols.stream()
.filter(p -> p.equals(http1) || p.equals(http2))
.collect(Collectors.toList());
}
else
{
// Pick the first.
protocols = List.of(this.protocols.get(0));
}
}
Origin.Protocol protocol = null;
if (!protocols.isEmpty())
Expand Down Expand Up @@ -179,6 +185,15 @@ public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map<Stri
return factoryInfo.getClientConnectionFactory().newConnection(endPoint, context);
}

public void upgrade(EndPoint endPoint, Map<String, Object> context)
{
HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
Origin.Protocol protocol = destination.getOrigin().getProtocol();
Info info = findClientConnectionFactoryInfo(protocol.getProtocols())
.orElseThrow(() -> new IllegalStateException("Cannot find " + ClientConnectionFactory.class.getSimpleName() + " to upgrade to " + protocol));
info.upgrade(endPoint, context);
}

protected Connection newNegotiatedConnection(EndPoint endPoint, Map<String, Object> context) throws IOException
{
try
Expand Down

0 comments on commit 466517a

Please sign in to comment.