Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

/**
* A scriptable request/response conversation. Create the script by calling methods like {@link
Expand All @@ -50,6 +51,17 @@ public MockDuplexResponseBody exhaustRequest() {
return this;
}

public MockDuplexResponseBody requestIOException() {
actions.add((request, requestBody, responseBody) -> {
try {
requestBody.exhausted();
fail();
} catch (IOException expected) {
}
});
return this;
}

public MockDuplexResponseBody sendResponse(String s) {
actions.add((request, requestBody, responseBody) -> {
responseBody.writeUtf8(s);
Expand Down
21 changes: 15 additions & 6 deletions okhttp-tests/src/test/java/okhttp3/ConnectionReuseTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@
*/
package okhttp3;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLException;
import okhttp3.internal.Util;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import okhttp3.mockwebserver.SocketPolicy;
Expand All @@ -30,7 +33,6 @@
import static okhttp3.TestUtil.defaultClient;
import static okhttp3.tls.internal.TlsUtil.localhost;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

public final class ConnectionReuseTest {
Expand Down Expand Up @@ -300,12 +302,15 @@ public final class ConnectionReuseTest {
* https://github.com/square/okhttp/issues/2409
*/
@Test public void connectionsAreNotReusedIfNetworkInterceptorInterferes() throws Exception {
List<Response> responsesNotClosed = new ArrayList<>();

client = client.newBuilder()
// Since this test knowingly leaks a connection, avoid using the default shared connection
// pool, which should remain clean for subsequent tests.
.connectionPool(new ConnectionPool())
.addNetworkInterceptor(chain -> {
Response response = chain.proceed(chain.request());
responsesNotClosed.add(response);
return response
.newBuilder()
.body(ResponseBody.create(null, "unrelated response body!"))
Expand All @@ -324,11 +329,15 @@ public final class ConnectionReuseTest {
.url(server.url("/"))
.build();
Call call = client.newCall(request);
try {
call.execute();
fail();
} catch (IllegalStateException expected) {
assertTrue(expected.getMessage().startsWith("Closing the body of"));
try (Response response = call.execute()) {
assertEquals("unrelated response body!", response.body().string());
}

assertEquals(0, server.takeRequest().getSequenceNumber());
assertEquals(0, server.takeRequest().getSequenceNumber()); // No connection reuse.

for (Response response : responsesNotClosed) {
Util.closeQuietly(response);
}
}

Expand Down
112 changes: 112 additions & 0 deletions okhttp-tests/src/test/java/okhttp3/DuplexTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
package okhttp3;

import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.ProtocolException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import okhttp3.internal.RecordingOkAuthenticator;
import okhttp3.internal.duplex.AsyncRequestBody;
import okhttp3.internal.duplex.MwsDuplexAccess;
import okhttp3.mockwebserver.MockResponse;
Expand Down Expand Up @@ -271,6 +273,116 @@ public final class DuplexTest {
mockDuplexResponseBody.awaitSuccess();
}

/**
* Duplex calls that have follow-ups are weird. By the time we know there's a follow-up we've
* already split off another thread to stream the request body. Because we permit at most one
* exchange at a time we break the request stream out from under that writer.
*/
@Test public void duplexWithRedirect() throws Exception {
enableProtocol(Protocol.HTTP_2);

MockDuplexResponseBody mockDuplexResponseBody = enqueueResponseWithBody(
new MockResponse()
.clearHeaders()
.setResponseCode(HttpURLConnection.HTTP_MOVED_PERM)
.addHeader("Location: /b"),
new MockDuplexResponseBody()
.sendResponse("/a has moved!\n")
.requestIOException()
.exhaustResponse());
server.enqueue(new MockResponse()
.setBody("this is /b"));

Call call = client.newCall(new Request.Builder()
.url(server.url("/"))
.post(new AsyncRequestBody())
.build());

try (Response response = call.execute()) {
BufferedSource responseBody = response.body().source();
assertEquals("this is /b", responseBody.readUtf8Line());
}

BufferedSink requestBody = ((AsyncRequestBody) call.request().body()).takeSink();
try {
requestBody.writeUtf8("request body\n");
requestBody.flush();
fail();
} catch (IOException expected) {
assertEquals("stream was reset: CANCEL", expected.getMessage());
}

mockDuplexResponseBody.awaitSuccess();

List<String> expectedEvents = Arrays.asList("CallStart", "DnsStart", "DnsEnd", "ConnectStart",
"SecureConnectStart", "SecureConnectEnd", "ConnectEnd", "ConnectionAcquired",
"RequestHeadersStart", "RequestHeadersEnd", "RequestBodyStart", "ResponseHeadersStart",
"ResponseHeadersEnd", "ResponseBodyStart", "ResponseBodyEnd", "RequestHeadersStart",
"RequestHeadersEnd", "ResponseHeadersStart", "ResponseHeadersEnd", "ResponseBodyStart",
"ResponseBodyEnd", "ConnectionReleased", "CallEnd", "RequestBodyEnd");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you happy with this ordering of CallEnd not being last?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, definitely not. I’m gonna see if I can change things up so the event gets published when the stream gets detached. Follow up!

assertEquals(expectedEvents, listener.recordedEventTypes());
}

/**
* Auth requires follow-ups. Unlike redirects, the auth follow-up also has a request body. This
* test makes a single call with two duplex requests!
*/
@Test public void duplexWithAuthChallenge() throws Exception {
enableProtocol(Protocol.HTTP_2);

String credential = Credentials.basic("jesse", "secret");
client = client.newBuilder()
.authenticator(new RecordingOkAuthenticator(credential, null))
.build();

MockDuplexResponseBody mockResponseBody1 = enqueueResponseWithBody(
new MockResponse()
.clearHeaders()
.setResponseCode(HttpURLConnection.HTTP_UNAUTHORIZED),
new MockDuplexResponseBody()
.sendResponse("please authenticate!\n")
.requestIOException()
.exhaustResponse());
MockDuplexResponseBody mockResponseBody2 = enqueueResponseWithBody(
new MockResponse()
.clearHeaders(),
new MockDuplexResponseBody()
.sendResponse("response body\n")
.exhaustResponse()
.receiveRequest("request body\n")
.exhaustRequest());

Call call = client.newCall(new Request.Builder()
.url(server.url("/"))
.post(new AsyncRequestBody())
.build());

Response response2 = call.execute();

// First duplex request is detached with violence.
BufferedSink requestBody1 = ((AsyncRequestBody) call.request().body()).takeSink();
try {
requestBody1.writeUtf8("not authenticated\n");
requestBody1.flush();
fail();
} catch (IOException expected) {
assertEquals("stream was reset: CANCEL", expected.getMessage());
}
mockResponseBody1.awaitSuccess();

// Second duplex request proceeds normally.
BufferedSink requestBody2 = ((AsyncRequestBody) call.request().body()).takeSink();
requestBody2.writeUtf8("request body\n");
requestBody2.close();
BufferedSource responseBody2 = response2.body().source();
assertEquals("response body", responseBody2.readUtf8Line());
assertTrue(responseBody2.exhausted());
mockResponseBody2.awaitSuccess();

// No more requests attempted!
((AsyncRequestBody) call.request().body()).assertNoMoreSinks();
}

private MockDuplexResponseBody enqueueResponseWithBody(
MockResponse response, MockDuplexResponseBody body) {
MwsDuplexAccess.instance.setBody(response, body);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,15 @@ public void cancel() {
codec.cancel();
}

/**
* Revoke this exchange's access to streams. This is necessary when a follow-up request is
* required but the preceding exchange hasn't completed yet.
*/
public void detachWithViolence() {
codec.cancel();
transmitter.exchangeMessageDone(this, true, true, null);
}

void trackFailure(IOException e) {
finder.trackFailure();
codec.connection().trackFailure(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,15 +182,19 @@ void exchangeMessageDone(
Exchange exchange, boolean requestDone, boolean responseDone, @Nullable IOException e) {
boolean exchangeDone = false;
synchronized (connectionPool) {
if (this.exchange == null) throw new IllegalStateException();
if (this.exchange != exchange) throw new IllegalStateException();
if (exchange != this.exchange) {
return; // This exchange was detached violently!
}
boolean changed = false;
if (requestDone) {
if (!exchangeRequestDone) changed = true;
this.exchangeRequestDone = true;
}
if (responseDone) {
if (!exchangeResponseDone) changed = true;
this.exchangeResponseDone = true;
}
if (exchangeRequestDone && exchangeResponseDone) {
if (exchangeRequestDone && exchangeResponseDone && changed) {
exchangeDone = true;
this.exchange.connection().successCount++;
this.exchange = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,24 +115,17 @@ public RetryAndFollowUpInterceptor(OkHttpClient client) {
.build();
}

Request followUp;
try {
Exchange exchange = Internal.instance.exchange(response);
Route route = exchange != null ? exchange.connection().route() : null;
followUp = followUpRequest(response, route);
} catch (IOException e) {
throw e;
}
Exchange exchange = Internal.instance.exchange(response);
Route route = exchange != null ? exchange.connection().route() : null;
Request followUp = followUpRequest(response, route);

if (followUp == null) {
return response;
}

closeQuietly(response.body());

if (transmitter.hasExchange()) {
throw new IllegalStateException("Closing the body of " + response
+ " didn't close its backing stream. Bad interceptor?");
exchange.detachWithViolence();
}

if (++followUpCount > MAX_FOLLOW_UPS) {
Expand Down