Skip to content

Commit

Permalink
Improve support for HTTP/2 clear via upgrade
Browse files Browse the repository at this point in the history
The reverse proxy can now use h2c via upgrade opertunistically, falling
back to HTTP if the upgrade is not availble.
  • Loading branch information
stuartwdouglas committed Dec 3, 2015
1 parent cef8430 commit 103b4bf
Show file tree
Hide file tree
Showing 20 changed files with 238 additions and 20 deletions.
26 changes: 26 additions & 0 deletions core/pom.xml
Expand Up @@ -367,6 +367,32 @@
<reportsDirectory>${project.build.directory}/surefire-h2c-reports</reportsDirectory>
</configuration>
</execution>

<execution>
<id>proxy-h2c-upgrade</id>
<phase>test</phase>
<goals>
<goal>test</goal>
</goals>
<configuration>
<enableAssertions>true</enableAssertions>
<runOrder>reversealphabetical</runOrder>
<systemPropertyVariables>
<test.h2c-upgrade>true</test.h2c-upgrade>
<test.dump>${dump}</test.dump>
<test.bufferSize>${bufferSize}</test.bufferSize>
<default.server.address>localhost</default.server.address>
<default.server.port>7777</default.server.port>
<java.util.logging.manager>org.jboss.logmanager.LogManager
</java.util.logging.manager>
<test.level>${test.level}</test.level>
<java.net.preferIPv6Addresses>${test.ipv6}</java.net.preferIPv6Addresses>
<alpn-boot-string>${alpn-boot-string}</alpn-boot-string>
<sun.net.useExclusiveBind>false</sun.net.useExclusiveBind>
</systemPropertyVariables>
<reportsDirectory>${project.build.directory}/surefire-h2c-upgrade-reports</reportsDirectory>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
Expand Down
3 changes: 3 additions & 0 deletions core/src/main/java/io/undertow/UndertowMessages.java
Expand Up @@ -425,4 +425,7 @@ public interface UndertowMessages {

@Message(id = 132, value = "HPACK decode failed")
HpackException hpackFailed();

@Message(id = 133, value = "Request did not contain an Upgrade header, upgrade is not permitted")
IllegalStateException notAnUpgradeRequest();
}
2 changes: 2 additions & 0 deletions core/src/main/java/io/undertow/client/ClientConnection.java
Expand Up @@ -111,4 +111,6 @@ public interface ClientConnection extends Channel {
* @return the statistics information, or <code>null</code> if statistics are not supported or disabled
*/
ClientStatistics getStatistics();

boolean isUpgradeSupported();
}
Expand Up @@ -194,6 +194,11 @@ public ClientStatistics getStatistics() {
return clientStatistics;
}

@Override
public boolean isUpgradeSupported() {
return false;
}

@Override
public void sendRequest(final ClientRequest request, final ClientCallback<ClientExchange> clientCallback) {
if (anyAreSet(state, UPGRADE_REQUESTED | UPGRADED | CLOSE_REQ | CLOSED)) {
Expand Down
Expand Up @@ -27,13 +27,16 @@
import io.undertow.client.ClientResponse;
import io.undertow.client.ClientStatistics;
import io.undertow.client.UndertowClientMessages;
import io.undertow.client.http2.Http2ClearClientProvider;
import io.undertow.client.http2.Http2ClientConnection;
import io.undertow.conduits.ByteActivityCallback;
import io.undertow.conduits.BytesReceivedStreamSourceConduit;
import io.undertow.conduits.BytesSentStreamSinkConduit;
import io.undertow.conduits.ChunkedStreamSinkConduit;
import io.undertow.conduits.ChunkedStreamSourceConduit;
import io.undertow.conduits.ConduitListener;
import io.undertow.conduits.FixedLengthStreamSourceConduit;
import io.undertow.protocols.http2.Http2Channel;
import io.undertow.server.Connectors;
import io.undertow.server.protocol.http.HttpContinue;
import io.undertow.util.AbstractAttachable;
Expand Down Expand Up @@ -122,6 +125,13 @@ public void handleEvent(StreamSourceConduit channel) {
private final ClientStatistics clientStatistics;
private int requestCount;
private int read, written;
private boolean http2Tried = false;
private boolean http2UpgradeReceived = false;

/**
* The actual connection if this has been upgraded to h2c
*/
private ClientConnection http2Delegate;

HttpClientConnection(final StreamConnection connection, final OptionMap options, final ByteBufferPool bufferPool) {

Expand Down Expand Up @@ -211,53 +221,97 @@ public XnioIoThread getIoThread() {

@Override
public boolean isOpen() {
if(http2Delegate != null) {
return http2Delegate.isOpen();
}
return connection.isOpen() && allAreClear(state, CLOSE_REQ | CLOSED);
}

@Override
public boolean supportsOption(Option<?> option) {
if(http2Delegate != null) {
return http2Delegate.supportsOption(option);
}
return connection.supportsOption(option);
}


@Override
public <T> T getOption(Option<T> option) throws IOException {
if(http2Delegate != null) {
return http2Delegate.getOption(option);
}
return connection.getOption(option);
}

@Override
public <T> T setOption(Option<T> option, T value) throws IllegalArgumentException, IOException {
if(http2Delegate != null) {
return http2Delegate.setOption(option, value);
}
return connection.setOption(option, value);
}

@Override
public boolean isUpgraded() {
if(http2Delegate != null) {
return http2Delegate.isUpgraded();
}
return anyAreSet(state, UPGRADE_REQUESTED | UPGRADED);
}

@Override
public boolean isPushSupported() {
if(http2Delegate != null) {
return http2Delegate.isPushSupported();
}
return false;
}

@Override
public boolean isMultiplexingSupported() {
if(http2Delegate != null) {
return http2Delegate.isMultiplexingSupported();
}
return false;
}

@Override
public ClientStatistics getStatistics() {
if(http2Delegate != null) {
return http2Delegate.getStatistics();
}
return clientStatistics;
}

@Override
public boolean isUpgradeSupported() {
if(http2Delegate != null) {
return false;
}
return true;
}

@Override
public void sendRequest(final ClientRequest request, final ClientCallback<ClientExchange> clientCallback) {
if(http2Delegate != null) {
http2Delegate.sendRequest(request, clientCallback);
return;
}
count++;
if (anyAreSet(state, UPGRADE_REQUESTED | UPGRADED | CLOSE_REQ | CLOSED)) {
clientCallback.failed(UndertowClientMessages.MESSAGES.invalidConnectionState());
return;
}
final HttpClientExchange httpClientExchange = new HttpClientExchange(clientCallback, request, this);
if(!http2Tried && options.get(UndertowOptions.ENABLE_HTTP2, false) && !request.getRequestHeaders().contains(Headers.UPGRADE) && request.getMethod().equals(Methods.GET)) {
//this is the first request, as we want to try a HTTP2 upgrade
request.getRequestHeaders().put(new HttpString("HTTP2-Settings"), Http2ClearClientProvider.createSettingsFrame(options, bufferPool));
request.getRequestHeaders().put(Headers.UPGRADE, Http2Channel.CLEARTEXT_UPGRADE_STRING);
request.getRequestHeaders().put(Headers.CONNECTION, "Upgrade, HTTP2-Settings");
http2Tried = true;
}

if (currentRequest == null) {
initiateRequest(httpClientExchange);
} else {
Expand Down Expand Up @@ -352,7 +406,6 @@ private void handleError(IOException exception) {
}

public StreamConnection performUpgrade() throws IOException {

// Upgrade the connection
// Set the upgraded flag already to prevent new requests after this one
if (allAreSet(state, UPGRADED | CLOSE_REQ | CLOSED)) {
Expand All @@ -365,6 +418,9 @@ public StreamConnection performUpgrade() throws IOException {
}

public void close() throws IOException {
if(http2Delegate != null) {
http2Delegate.close();
}
if (anyAreSet(state, CLOSED)) {
return;
}
Expand All @@ -375,7 +431,7 @@ public void close() throws IOException {
/**
* Notification that the current request is finished
*/
public void requestDone() {
public void exchangeDone() {

connection.getSinkChannel().setConduit(originalSinkConduit);
connection.getSourceChannel().setConduit(pushBackStreamSourceConduit);
Expand Down Expand Up @@ -404,6 +460,12 @@ public void requestDone() {
}
}

public void requestDataSent() {
if(http2UpgradeReceived) {
doHttp2Upgrade();
}
}

class ClientReadListener implements ChannelListener<StreamSourceChannel> {

public void handleEvent(StreamSourceChannel channel) {
Expand Down Expand Up @@ -494,8 +556,14 @@ public void handleEvent(StreamSourceChannel channel) {
HttpClientConnection.this.state |= CLOSE_REQ;
}
}
if(response.getResponseCode() == StatusCodes.SWITCHING_PROTOCOLS && Http2Channel.CLEARTEXT_UPGRADE_STRING.equals(response.getResponseHeaders().getFirst(Headers.UPGRADE))) {
//http2 upgrade

if (builder.getStatusCode() == StatusCodes.CONTINUE) {
http2UpgradeReceived = true;
if(currentRequest.isRequestDataSent()) {
doHttp2Upgrade();
}
} else if (builder.getStatusCode() == StatusCodes.CONTINUE) {
pendingResponse = new HttpResponseBuilder();
currentRequest.setContinueResponse(response);
} else {
Expand Down Expand Up @@ -536,6 +604,24 @@ public void handleEvent(StreamSourceChannel channel) {
}
}

protected void doHttp2Upgrade() {
try {
Http2Channel http2Channel = new Http2Channel(this.performUpgrade(), null, bufferPool, null, true, true, options);
Http2ClientConnection http2ClientConnection = new Http2ClientConnection(http2Channel, currentRequest.getResponseCallback(), currentRequest.getRequest(), currentRequest.getRequest().getRequestHeaders().getFirst(Headers.HOST), clientStatistics);
http2ClientConnection.getCloseSetter().set(new ChannelListener<ClientConnection>() {
@Override
public void handleEvent(ClientConnection channel) {
ChannelListeners.invokeChannelListener(HttpClientConnection.this, HttpClientConnection.this.closeSetter.get());
}
});
http2Delegate = http2ClientConnection;
currentRequest = null;
} catch (IOException e) {
UndertowLogger.REQUEST_IO_LOGGER.ioException(e);
safeClose(this);
}
}

private void prepareResponseChannel(ClientResponse response, ClientExchange exchange) {
String encoding = response.getResponseHeaders().getLast(TRANSFER_ENCODING);
boolean chunked = encoding != null && Headers.CHUNKED.equals(new HttpString(encoding));
Expand Down
13 changes: 11 additions & 2 deletions core/src/main/java/io/undertow/client/http/HttpClientExchange.java
Expand Up @@ -77,18 +77,23 @@ void terminateRequest() {
return;
}
state |= REQUEST_TERMINATED;
clientConnection.requestDataSent();
if (anyAreSet(state, RESPONSE_TERMINATED)) {
clientConnection.requestDone();
clientConnection.exchangeDone();
}
}

boolean isRequestDataSent() {
return anyAreSet(state, REQUEST_TERMINATED);
}

void terminateResponse() {
if(anyAreSet(state, RESPONSE_TERMINATED)) {
return;
}
state |= RESPONSE_TERMINATED;
if (anyAreSet(state, REQUEST_TERMINATED)) {
clientConnection.requestDone();
clientConnection.exchangeDone();
}
}

Expand Down Expand Up @@ -185,6 +190,10 @@ public ClientConnection getConnection() {
return clientConnection;
}

ClientCallback<ClientExchange> getResponseCallback() {
return responseCallback;
}

void invokeReadReadyCallback() {
if(readyCallback != null) {
readyCallback.completed(this);
Expand Down
Expand Up @@ -55,7 +55,10 @@
import io.undertow.util.Headers;

/**
* HTTP2 client provider that uses HTTP upgrade rather than ALPN
* HTTP2 client provider that uses HTTP upgrade rather than ALPN. This provider will only use h2c, and sends an initial
* dummy request to do the initial upgrade.
*
*
*
* @author Stuart Douglas
*/
Expand Down Expand Up @@ -135,12 +138,12 @@ private Map<String, String> createHeaders(OptionMap options, ByteBufferPool buff
headers.put(Headers.UPGRADE_STRING, Http2Channel.CLEARTEXT_UPGRADE_STRING);
headers.put(Headers.CONNECTION_STRING, "Upgrade, HTTP2-Settings");
headers.put(Headers.HOST_STRING, uri.getHost());
headers.put("X-HTTP2-connect-only", "connect"); //undertow specific header that tells the remote server that this request should
headers.put("X-HTTP2-connect-only", "connect"); //undertow specific header that tells the remote server that this request should be ignored
return headers;
}


private String createSettingsFrame(OptionMap options, ByteBufferPool bufferPool) {
public static String createSettingsFrame(OptionMap options, ByteBufferPool bufferPool) {
PooledByteBuffer b = bufferPool.allocate();
try {
ByteBuffer currentBuffer = b.getBuffer();
Expand Down
Expand Up @@ -95,6 +95,26 @@ public void handleEvent(Http2Channel channel) {
this.initialUpgradeRequest = initialUpgradeRequest;
}

public Http2ClientConnection(Http2Channel http2Channel, ClientCallback<ClientExchange> upgradeReadyCallback, ClientRequest clientRequest, String defaultHost, ClientStatistics clientStatistics) {

this.http2Channel = http2Channel;
this.defaultHost = defaultHost;
this.clientStatistics = clientStatistics;
http2Channel.getReceiveSetter().set(new Http2ReceiveListener());
http2Channel.resumeReceives();
http2Channel.addCloseTask(new ChannelListener<Http2Channel>() {
@Override
public void handleEvent(Http2Channel channel) {
ChannelListeners.invokeChannelListener(Http2ClientConnection.this, closeSetter.get());
}
});
this.initialUpgradeRequest = false;

Http2ClientExchange exchange = new Http2ClientExchange(this, null, clientRequest);
exchange.setResponseListener(upgradeReadyCallback);
currentExchanges.put(1, exchange);
}

@Override
public void sendRequest(ClientRequest request, ClientCallback<ClientExchange> clientCallback) {
request.getRequestHeaders().put(PATH, request.getPath());
Expand Down Expand Up @@ -308,6 +328,11 @@ public ClientStatistics getStatistics() {
return clientStatistics;
}

@Override
public boolean isUpgradeSupported() {
return false;
}

private class Http2ReceiveListener implements ChannelListener<Http2Channel> {

@Override
Expand Down
Expand Up @@ -266,6 +266,11 @@ public ClientStatistics getStatistics() {
return clientStatistics;
}

@Override
public boolean isUpgradeSupported() {
return false;
}

private class SpdyReceiveListener implements ChannelListener<SpdyChannel> {

@Override
Expand Down
3 changes: 3 additions & 0 deletions core/src/main/java/io/undertow/server/HttpServerExchange.java
Expand Up @@ -853,6 +853,9 @@ public HttpServerExchange upgradeChannel(final HttpUpgradeListener listener) {
if (!connection.isUpgradeSupported()) {
throw UndertowMessages.MESSAGES.upgradeNotSupported();
}
if(!getRequestHeaders().contains(Headers.UPGRADE)) {
throw UndertowMessages.MESSAGES.notAnUpgradeRequest();
}
connection.setUpgradeListener(listener);
setStatusCode(StatusCodes.SWITCHING_PROTOCOLS);
getResponseHeaders().put(Headers.CONNECTION, Headers.UPGRADE_STRING);
Expand Down

0 comments on commit 103b4bf

Please sign in to comment.