Skip to content

Commit

Permalink
Recover from REFUSED_STREAM errors in HTTP/2.
Browse files Browse the repository at this point in the history
This implements the following policy:

 - If a REFUSED_STREAM error is received, OkHttp will retry the same stream
   on the same socket 1x.
 - If any other error is received, or an additional REFUSED_STREAM error is
   received, OkHttp will retry on a different route if one exists.

We may want to follow up by going through HTTP/2 error codes and deciding
a per-code retry policy, but this should be good enough for now.

Closes: #2543
  • Loading branch information
swankjesse committed May 21, 2016
1 parent 0693302 commit 51326d2
Show file tree
Hide file tree
Showing 9 changed files with 257 additions and 78 deletions.
Expand Up @@ -38,6 +38,7 @@ public final class MockResponse implements Cloneable {
private TimeUnit throttlePeriodUnit = TimeUnit.SECONDS;

private SocketPolicy socketPolicy = SocketPolicy.KEEP_OPEN;
private int http2ErrorCode = -1;

private long bodyDelayAmount = 0;
private TimeUnit bodyDelayUnit = TimeUnit.MILLISECONDS;
Expand Down Expand Up @@ -205,6 +206,15 @@ public MockResponse setSocketPolicy(SocketPolicy socketPolicy) {
return this;
}

public int getHttp2ErrorCode() {
return http2ErrorCode;
}

public MockResponse setHttp2ErrorCode(int http2ErrorCode) {
this.http2ErrorCode = http2ErrorCode;
return this;
}

/**
* Throttles the request reader and response writer to sleep for the given period after each
* series of {@code bytesPerPeriod} bytes are transferred. Use this to simulate network behavior.
Expand Down
Expand Up @@ -18,6 +18,7 @@
package okhttp3.mockwebserver;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ProtocolException;
Expand Down Expand Up @@ -88,6 +89,7 @@
import static okhttp3.mockwebserver.SocketPolicy.DISCONNECT_DURING_RESPONSE_BODY;
import static okhttp3.mockwebserver.SocketPolicy.FAIL_HANDSHAKE;
import static okhttp3.mockwebserver.SocketPolicy.NO_RESPONSE;
import static okhttp3.mockwebserver.SocketPolicy.RESET_STREAM_AT_START;
import static okhttp3.mockwebserver.SocketPolicy.SHUTDOWN_INPUT_AT_END;
import static okhttp3.mockwebserver.SocketPolicy.SHUTDOWN_OUTPUT_AT_END;
import static okhttp3.mockwebserver.SocketPolicy.UPGRADE_TO_SSL_AT_END;
Expand Down Expand Up @@ -562,8 +564,11 @@ private void processHandshakeFailure(Socket raw) throws Exception {

private void dispatchBookkeepingRequest(int sequenceNumber, Socket socket)
throws InterruptedException {
RecordedRequest request = new RecordedRequest(
null, null, null, -1, null, sequenceNumber, socket);
requestCount.incrementAndGet();
dispatcher.dispatch(new RecordedRequest(null, null, null, -1, null, sequenceNumber, socket));
requestQueue.add(request);
dispatcher.dispatch(request);
}

/** @param sequenceNumber the index of this request on this connection. */
Expand Down Expand Up @@ -843,7 +848,19 @@ private FramedSocketHandler(Socket socket, Protocol protocol) {
}

@Override public void onStream(FramedStream stream) throws IOException {
MockResponse peekedResponse = dispatcher.peek();
if (peekedResponse.getSocketPolicy() == RESET_STREAM_AT_START) {
try {
dispatchBookkeepingRequest(sequenceNumber.getAndIncrement(), socket);
stream.close(ErrorCode.fromHttp2(peekedResponse.getHttp2ErrorCode()));
return;
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
}

RecordedRequest request = readRequest(stream);
requestCount.incrementAndGet();
requestQueue.add(request);
MockResponse response;
try {
Expand Down
Expand Up @@ -88,5 +88,11 @@ public enum SocketPolicy {
* Don't respond to the request but keep the socket open. For testing read response header timeout
* issue.
*/
NO_RESPONSE
NO_RESPONSE,

/**
* Fail HTTP/2 requests without processing them by sending an {@link
* MockResponse#getHttp2ErrorCode() HTTP/2 error code}.
*/
RESET_STREAM_AT_START
}
9 changes: 6 additions & 3 deletions okhttp-tests/src/test/java/okhttp3/URLConnectionTest.java
Expand Up @@ -631,9 +631,12 @@ private void doUpload(TransferKind uploadKind, WriteKind writeKind) throws Excep

assertContent("this response comes via SSL", connection);

RecordedRequest request = server.takeRequest();
assertEquals("GET /foo HTTP/1.1", request.getRequestLine());
assertEquals(TlsVersion.TLS_1_0, request.getTlsVersion());
RecordedRequest failHandshakeRequest = server.takeRequest();
assertNull(failHandshakeRequest.getRequestLine());

RecordedRequest fallbackRequest = server.takeRequest();
assertEquals("GET /foo HTTP/1.1", fallbackRequest.getRequestLine());
assertEquals(TlsVersion.TLS_1_0, fallbackRequest.getTlsVersion());
}

@Test public void connectViaHttpsWithSSLFallbackFailuresRecorded() throws Exception {
Expand Down
Expand Up @@ -34,17 +34,18 @@
import okhttp3.OkHttpClient;
import okhttp3.Protocol;
import okhttp3.RecordingCookieJar;
import okhttp3.RecordingHostnameVerifier;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import okhttp3.internal.DoubleInetAddressDns;
import okhttp3.internal.RecordingOkAuthenticator;
import okhttp3.internal.SslContextBuilder;
import okhttp3.internal.Util;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import okhttp3.mockwebserver.RecordedRequest;
import okhttp3.mockwebserver.SocketPolicy;
import okhttp3.RecordingHostnameVerifier;
import okio.Buffer;
import okio.BufferedSink;
import okio.GzipSink;
Expand Down Expand Up @@ -582,6 +583,99 @@ protected HttpOverSpdyTest(Protocol protocol) {
assertEquals(0, server.takeRequest().getSequenceNumber());
}

@Test public void recoverFromOneRefusedStreamReusesConnection() throws Exception {
server.enqueue(new MockResponse()
.setSocketPolicy(SocketPolicy.RESET_STREAM_AT_START)
.setHttp2ErrorCode(ErrorCode.REFUSED_STREAM.httpCode));
server.enqueue(new MockResponse()
.setBody("abc"));

Call call = client.newCall(new Request.Builder()
.url(server.url("/"))
.build());
Response response = call.execute();
assertEquals("abc", response.body().string());

assertEquals(0, server.takeRequest().getSequenceNumber()); // New connection.
assertEquals(1, server.takeRequest().getSequenceNumber()); // Reused connection.
}

@Test public void recoverFromOneInternalErrorRequiresNewConnection() throws Exception {
server.enqueue(new MockResponse()
.setSocketPolicy(SocketPolicy.RESET_STREAM_AT_START)
.setHttp2ErrorCode(ErrorCode.INTERNAL_ERROR.httpCode));
server.enqueue(new MockResponse()
.setBody("abc"));

client = client.newBuilder()
.dns(new DoubleInetAddressDns())
.build();

Call call = client.newCall(new Request.Builder()
.url(server.url("/"))
.build());
Response response = call.execute();
assertEquals("abc", response.body().string());

assertEquals(0, server.takeRequest().getSequenceNumber()); // New connection.
assertEquals(0, server.takeRequest().getSequenceNumber()); // New connection.
}

@Test public void recoverFromMultipleRefusedStreamsRequiresNewConnection() throws Exception {
server.enqueue(new MockResponse()
.setSocketPolicy(SocketPolicy.RESET_STREAM_AT_START)
.setHttp2ErrorCode(ErrorCode.REFUSED_STREAM.httpCode));
server.enqueue(new MockResponse()
.setSocketPolicy(SocketPolicy.RESET_STREAM_AT_START)
.setHttp2ErrorCode(ErrorCode.REFUSED_STREAM.httpCode));
server.enqueue(new MockResponse()
.setBody("abc"));

client = client.newBuilder()
.dns(new DoubleInetAddressDns())
.build();

Call call = client.newCall(new Request.Builder()
.url(server.url("/"))
.build());
Response response = call.execute();
assertEquals("abc", response.body().string());

assertEquals(0, server.takeRequest().getSequenceNumber()); // New connection.
assertEquals(1, server.takeRequest().getSequenceNumber()); // Reused connection.
assertEquals(0, server.takeRequest().getSequenceNumber()); // New connection.
}

@Test public void noRecoveryFromRefusedStreamWithRetryDisabled() throws Exception {
noRecoveryFromErrorWithRetryDisabled(ErrorCode.REFUSED_STREAM);
}

@Test public void noRecoveryFromInternalErrorWithRetryDisabled() throws Exception {
noRecoveryFromErrorWithRetryDisabled(ErrorCode.INTERNAL_ERROR);
}

private void noRecoveryFromErrorWithRetryDisabled(ErrorCode errorCode) throws Exception {
server.enqueue(new MockResponse()
.setSocketPolicy(SocketPolicy.RESET_STREAM_AT_START)
.setHttp2ErrorCode(errorCode.httpCode));
server.enqueue(new MockResponse()
.setBody("abc"));

client = client.newBuilder()
.retryOnConnectionFailure(false)
.build();

Call call = client.newCall(new Request.Builder()
.url(server.url("/"))
.build());
try {
call.execute();
fail();
} catch (StreamResetException expected) {
assertEquals(errorCode, expected.errorCode);
}
}

public Buffer gzip(String bytes) throws IOException {
Buffer bytesOut = new Buffer();
BufferedSink sink = Okio.buffer(new GzipSink(bytesOut));
Expand Down
Expand Up @@ -142,7 +142,7 @@ public synchronized List<Header> getResponseHeaders() throws IOException {
readTimeout.exitAndThrowIfTimedOut();
}
if (responseHeaders != null) return responseHeaders;
throw new IOException("stream was reset: " + errorCode);
throw new StreamResetException(errorCode);
}

/**
Expand Down Expand Up @@ -438,7 +438,7 @@ private void checkNotClosed() throws IOException {
throw new IOException("stream closed");
}
if (errorCode != null) {
throw new IOException("stream was reset: " + errorCode);
throw new StreamResetException(errorCode);
}
}
}
Expand Down Expand Up @@ -571,7 +571,7 @@ private void checkOutNotClosed() throws IOException {
} else if (sink.finished) {
throw new IOException("stream finished");
} else if (errorCode != null) {
throw new IOException("stream was reset: " + errorCode);
throw new StreamResetException(errorCode);
}
}

Expand Down
@@ -0,0 +1,28 @@
/*
* Copyright (C) 2016 Square, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package okhttp3.internal.framed;

import java.io.IOException;

/** Thrown when an HTTP/2 stream is canceled without damage to the socket that carries it. */
public final class StreamResetException extends IOException {
public final ErrorCode errorCode;

public StreamResetException(ErrorCode errorCode) {
super("stream was reset: " + errorCode);
this.errorCode = errorCode;
}
}
57 changes: 52 additions & 5 deletions okhttp/src/main/java/okhttp3/internal/http/HttpEngine.java
Expand Up @@ -17,11 +17,16 @@
package okhttp3.internal.http;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.ProtocolException;
import java.net.Proxy;
import java.net.SocketTimeoutException;
import java.security.cert.CertificateException;
import java.util.Date;
import java.util.List;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLHandshakeException;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSocketFactory;
import okhttp3.Address;
import okhttp3.CertificatePinner;
Expand Down Expand Up @@ -352,12 +357,22 @@ public Connection getConnection() {
* permanent. Requests with a body can only be recovered if the body is buffered.
*/
public HttpEngine recover(IOException e, boolean routeException, Sink requestBodyOut) {
if (!streamAllocation.recover(e, routeException, requestBodyOut)) {
return null;
}
streamAllocation.streamFailed(e);

if (!client.retryOnConnectionFailure()) {
return null;
return null; // The application layer has forbidden retries.
}

if (requestBodyOut != null && !(requestBodyOut instanceof RetryableSink)) {
return null; // The body on this request cannot be retried.
}

if (!isRecoverable(e, routeException)) {
return null; // This exception is fatal.
}

if (!streamAllocation.hasMoreRoutes()) {
return null; // No more routes to attempt.
}

StreamAllocation streamAllocation = close();
Expand All @@ -371,6 +386,38 @@ public HttpEngine recover(IOException e, boolean routeException) {
return recover(e, routeException, requestBodyOut);
}

private boolean isRecoverable(IOException e, boolean routeException) {
// If there was a protocol problem, don't recover.
if (e instanceof ProtocolException) {
return false;
}

// If there was an interruption don't recover, but if there was a timeout connecting to a route
// we should try the next route (if there is one).
if (e instanceof InterruptedIOException) {
return e instanceof SocketTimeoutException && routeException;
}

// Look for known client-side or negotiation errors that are unlikely to be fixed by trying
// again with a different route.
if (e instanceof SSLHandshakeException) {
// If the problem was a CertificateException from the X509TrustManager,
// do not retry.
if (e.getCause() instanceof CertificateException) {
return false;
}
}
if (e instanceof SSLPeerUnverifiedException) {
// e.g. a certificate pinning error.
return false;
}

// An example of one we might want to retry with a different route is a problem connecting to a
// proxy and would manifest as a standard IOException. Unless it is one we know we should not
// retry, we return true and try a new route.
return true;
}

private void maybeCache() throws IOException {
InternalCache responseCache = Internal.instance.internalCache(client);
if (responseCache == null) return;
Expand Down Expand Up @@ -428,7 +475,7 @@ public StreamAllocation close() {
closeQuietly(userResponse.body());
} else {
// If this engine never achieved a response body, its stream allocation is dead.
streamAllocation.connectionFailed(null);
streamAllocation.streamFailed(null);
}

return streamAllocation;
Expand Down

0 comments on commit 51326d2

Please sign in to comment.