Skip to content

Commit

Permalink
New disconnect strategy.
Browse files Browse the repository at this point in the history
Support asynchronous disconnects by breaking the socket only, which
should cause the thread using that socket to trigger clean-up.
  • Loading branch information
swankjesse committed Apr 5, 2014
1 parent b21b40e commit 9c30213
Show file tree
Hide file tree
Showing 8 changed files with 143 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright (C) 2014 Square, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.squareup.okhttp.internal.http;

import com.squareup.okhttp.OkHttpClient;
import com.squareup.okhttp.mockwebserver.MockResponse;
import com.squareup.okhttp.mockwebserver.MockWebServer;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.util.concurrent.TimeUnit;
import org.junit.Test;

import static org.junit.Assert.fail;

public final class DisconnectTest {
private final MockWebServer server = new MockWebServer();
private final OkHttpClient client = new OkHttpClient();

@Test public void interruptWritingRequestBody() throws Exception {
int requestBodySize = 2 * 1024 * 1024; // 2 MiB

server.enqueue(new MockResponse()
.throttleBody(64 * 1024, 125, TimeUnit.MILLISECONDS)); // 500 Kbps
server.play();

HttpURLConnection connection = client.open(server.getUrl("/"));
disconnectLater(connection, 500);

connection.setDoOutput(true);
connection.setFixedLengthStreamingMode(requestBodySize);
OutputStream requestBody = connection.getOutputStream();
byte[] buffer = new byte[1024];
try {
for (int i = 0; i < requestBodySize; i += buffer.length) {
requestBody.write(buffer);
requestBody.flush();
}
fail("Expected connection to be closed");
} catch (IOException expected) {
}

connection.disconnect();
}

@Test public void interruptReadingResponseBody() throws Exception {
int responseBodySize = 2 * 1024 * 1024; // 2 MiB

server.enqueue(new MockResponse()
.setBody(new byte[responseBodySize])
.throttleBody(64 * 1024, 125, TimeUnit.MILLISECONDS)); // 500 Kbps
server.play();

HttpURLConnection connection = client.open(server.getUrl("/"));
disconnectLater(connection, 500);

InputStream responseBody = connection.getInputStream();
byte[] buffer = new byte[1024];
try {
while (responseBody.read(buffer) != -1) {
}
fail("Expected connection to be closed");
} catch (IOException expected) {
}

connection.disconnect();
}

private void disconnectLater(final HttpURLConnection connection, final int delayMillis) {
Thread interruptingCow = new Thread() {
@Override public void run() {
try {
sleep(delayMillis);
connection.disconnect();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
};
interruptingCow.start();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
Expand Down Expand Up @@ -942,14 +943,20 @@ private void initResponseCache() throws IOException {
}

@Test public void disconnectedConnection() throws IOException {
server.enqueue(new MockResponse().setBody("ABCDEFGHIJKLMNOPQR"));
server.enqueue(new MockResponse()
.throttleBody(2, 100, TimeUnit.MILLISECONDS)
.setBody("ABCD"));
server.play();

connection = client.open(server.getUrl("/"));
InputStream in = connection.getInputStream();
assertEquals('A', (char) in.read());
connection.disconnect();
try {
// Reading 'B' may succeed if it's buffered.
in.read();

// But 'C' shouldn't be buffered (the response is throttled) and this should fail.
in.read();
fail("Expected a connection closed exception");
} catch (IOException expected) {
Expand Down Expand Up @@ -1230,11 +1237,13 @@ private void testEarlyDisconnectDoesntHarmPooling(TransferKind transferKind) thr
HttpURLConnection connection1 = client.open(server.getUrl("/"));
InputStream in1 = connection1.getInputStream();
assertEquals("ABCDE", readAscii(in1, 5));
in1.close();
connection1.disconnect();

HttpURLConnection connection2 = client.open(server.getUrl("/"));
InputStream in2 = connection2.getInputStream();
assertEquals("LMNOP", readAscii(in2, 5));
in2.close();
connection2.disconnect();

assertEquals(0, server.takeRequest().getSequenceNumber());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ public boolean isClosed() {
return state == STATE_CLOSED;
}

public void closeIfOwnedBy(Object owner) throws IOException {
connection.closeIfOwnedBy(owner);
}

public void flush() throws IOException {
sink.flush();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,18 @@ public final void releaseConnection() throws IOException {
connection = null;
}

/**
* Immediately closes the socket connection if it's currently held by this
* engine. Use this to interrupt an in-flight request from any thread. It's
* the caller's responsibility to close the request body and response body
* streams; otherwise resources may be leaked.
*/
public final void disconnect() throws IOException {
if (transport != null) {
transport.disconnect(this);
}
}

/**
* Release any resources held by this engine. If a connection is still held by
* this engine, it is returned.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,4 +150,8 @@ public void writeRequestHeaders(Request request) throws IOException {
// reference escapes.
return httpConnection.newUnknownLengthSource(cacheRequest);
}

@Override public void disconnect(HttpEngine engine) throws IOException {
httpConnection.closeIfOwnedBy(engine);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,18 @@ public HttpURLConnectionImpl(URL url, OkHttpClient client) {

@Override public final void disconnect() {
// Calling disconnect() before a connection exists should have no effect.
if (httpEngine != null) {
httpEngine.close();
if (httpEngine == null) return;

try {
httpEngine.disconnect();
} catch (IOException ignored) {
}

// This doesn't close the stream because doing so would require all stream
// access to be synchronized. It's expected that the thread using the
// connection will close its streams directly. If it doesn't, the worst
// case is that the GzipSource's Inflater won't be released until it's
// finalized. (This logs a warning on Android.)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,10 @@ public static Response.Builder readNameValueBlock(List<Header> headerBlock,
@Override public void releaseConnectionOnIdle() {
}

@Override public void disconnect(HttpEngine engine) throws IOException {
stream.close(ErrorCode.CANCEL);
}

@Override public boolean canReuseConnection() {
return true; // TODO: spdyConnection.isClosed() ?
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ interface Transport {
*/
void releaseConnectionOnIdle() throws IOException;

void disconnect(HttpEngine engine) throws IOException;

/**
* Returns true if the socket connection held by this transport can be reused
* for a follow-up exchange.
Expand Down

0 comments on commit 9c30213

Please sign in to comment.