Skip to content

Commit

Permalink
Recover from executor shutdowns gracefully.
Browse files Browse the repository at this point in the history
This turns out to be pretty difficult because of the way our
dispatcher works.

Calls can be rejected either immediately when the user calls
enqueue(), or later when a queued call is promoted.

It's also awkward because we don't want to hold locks when
calling the user's callFailed() method.
  • Loading branch information
swankjesse committed Nov 4, 2018
1 parent 2b0a9f4 commit 07f62a0
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 28 deletions.
57 changes: 56 additions & 1 deletion okhttp-tests/src/test/java/okhttp3/DispatcherTest.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package okhttp3;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -10,6 +11,7 @@
import java.util.Set;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import okhttp3.RealCall.AsyncCall;
Expand All @@ -28,13 +30,16 @@ public final class DispatcherTest {
RecordingCallback callback = new RecordingCallback();
RecordingWebSocketListener webSocketListener = new RecordingWebSocketListener();
Dispatcher dispatcher = new Dispatcher(executor);
RecordingEventListener listener = new RecordingEventListener();
OkHttpClient client = defaultClient().newBuilder()
.dispatcher(dispatcher)
.eventListener(listener)
.build();

@Before public void setUp() throws Exception {
dispatcher.setMaxRequests(20);
dispatcher.setMaxRequestsPerHost(10);
listener.forbidLock(dispatcher);
}

@Test public void maxRequestsZero() throws Exception {
Expand Down Expand Up @@ -264,6 +269,54 @@ public final class DispatcherTest {
assertTrue(idle.get());
}

@Test public void executionRejectedImmediately() throws Exception {
Request request = newRequest("http://a/1");
executor.shutdown();
client.newCall(request).enqueue(callback);
callback.await(request.url()).assertFailure(InterruptedIOException.class);
assertEquals(Arrays.asList("CallStart", "CallFailed"), listener.recordedEventTypes());
}

@Test public void executionRejectedAfterMaxRequestsChange() throws Exception {
Request request1 = newRequest("http://a/1");
Request request2 = newRequest("http://a/2");
dispatcher.setMaxRequests(1);
client.newCall(request1).enqueue(callback);
executor.shutdown();
client.newCall(request2).enqueue(callback);
dispatcher.setMaxRequests(2); // Trigger promotion.
callback.await(request2.url()).assertFailure(InterruptedIOException.class);

assertEquals(Arrays.asList("CallStart", "CallStart", "CallFailed"),
listener.recordedEventTypes());
}

@Test public void executionRejectedAfterMaxRequestsPerHostChange() throws Exception {
Request request1 = newRequest("http://a/1");
Request request2 = newRequest("http://a/2");
dispatcher.setMaxRequestsPerHost(1);
client.newCall(request1).enqueue(callback);
executor.shutdown();
client.newCall(request2).enqueue(callback);
dispatcher.setMaxRequestsPerHost(2); // Trigger promotion.
callback.await(request2.url()).assertFailure(InterruptedIOException.class);
assertEquals(Arrays.asList("CallStart", "CallStart", "CallFailed"),
listener.recordedEventTypes());
}

@Test public void executionRejectedAfterPrecedingCallFinishes() throws Exception {
Request request1 = newRequest("http://a/1");
Request request2 = newRequest("http://a/2");
dispatcher.setMaxRequests(1);
client.newCall(request1).enqueue(callback);
executor.shutdown();
client.newCall(request2).enqueue(callback);
executor.finishJob("http://a/1"); // Trigger promotion.
callback.await(request2.url()).assertFailure(InterruptedIOException.class);
assertEquals(Arrays.asList("CallStart", "CallStart", "CallFailed"),
listener.recordedEventTypes());
}

private <T> Set<T> set(T... values) {
return set(Arrays.asList(values));
}
Expand All @@ -287,9 +340,11 @@ private Thread makeSynchronousCall(final Call call) {
}

class RecordingExecutor extends AbstractExecutorService {
private boolean shutdown;
private List<AsyncCall> calls = new ArrayList<>();

@Override public void execute(Runnable command) {
if (shutdown) throw new RejectedExecutionException();
calls.add((AsyncCall) command);
}

Expand All @@ -314,7 +369,7 @@ public void finishJob(String url) {
}

@Override public void shutdown() {
throw new UnsupportedOperationException();
shutdown = true;
}

@Override public List<Runnable> shutdownNow() {
Expand Down
72 changes: 45 additions & 27 deletions okhttp/src/main/java/okhttp3/Dispatcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,14 @@ public synchronized ExecutorService executorService() {
* <p>If more than {@code maxRequests} requests are in flight when this is invoked, those requests
* will remain in flight.
*/
public synchronized void setMaxRequests(int maxRequests) {
public void setMaxRequests(int maxRequests) {
if (maxRequests < 1) {
throw new IllegalArgumentException("max < 1: " + maxRequests);
}
this.maxRequests = maxRequests;
promoteCalls();
synchronized (this) {
this.maxRequests = maxRequests;
}
promoteAndExecute();
}

public synchronized int getMaxRequests() {
Expand All @@ -98,12 +100,14 @@ public synchronized int getMaxRequests() {
*
* <p>WebSocket connections to hosts <b>do not</b> count against this limit.
*/
public synchronized void setMaxRequestsPerHost(int maxRequestsPerHost) {
public void setMaxRequestsPerHost(int maxRequestsPerHost) {
if (maxRequestsPerHost < 1) {
throw new IllegalArgumentException("max < 1: " + maxRequestsPerHost);
}
this.maxRequestsPerHost = maxRequestsPerHost;
promoteCalls();
synchronized (this) {
this.maxRequestsPerHost = maxRequestsPerHost;
}
promoteAndExecute();
}

public synchronized int getMaxRequestsPerHost() {
Expand All @@ -126,13 +130,11 @@ public synchronized void setIdleCallback(@Nullable Runnable idleCallback) {
this.idleCallback = idleCallback;
}

synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
void enqueue(AsyncCall call) {
synchronized (this) {
readyAsyncCalls.add(call);
}
promoteAndExecute();
}

/**
Expand All @@ -153,21 +155,38 @@ public synchronized void cancelAll() {
}
}

private void promoteCalls() {
if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
/**
* Promotes eligible calls from {@link #readyAsyncCalls} to {@link #runningAsyncCalls} and runs
* them on the executor service. Must not be called with synchronization because executing calls
* can call into user code.
*
* @return true if the dispatcher is currently running calls.
*/
private boolean promoteAndExecute() {
assert (!Thread.holdsLock(this));

List<AsyncCall> executableCalls = new ArrayList<>();
boolean isRunning;
synchronized (this) {
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall asyncCall = i.next();

for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
if (runningCallsForHost(asyncCall) >= maxRequestsPerHost) continue; // Host max capacity.

if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();
runningAsyncCalls.add(call);
executorService().execute(call);
executableCalls.add(asyncCall);
runningAsyncCalls.add(asyncCall);
}
isRunning = runningCallsCount() > 0;
}

if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
for (int i = 0, size = executableCalls.size(); i < size; i++) {
AsyncCall asyncCall = executableCalls.get(i);
asyncCall.executeOn(executorService());
}

return isRunning;
}

/** Returns the number of running calls that share a host with {@code call}. */
Expand All @@ -187,25 +206,24 @@ synchronized void executed(RealCall call) {

/** Used by {@code AsyncCall#run} to signal completion. */
void finished(AsyncCall call) {
finished(runningAsyncCalls, call, true);
finished(runningAsyncCalls, call);
}

/** Used by {@code Call#execute} to signal completion. */
void finished(RealCall call) {
finished(runningSyncCalls, call, false);
finished(runningSyncCalls, call);
}

private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
private <T> void finished(Deque<T> calls, T call) {
Runnable idleCallback;
synchronized (this) {
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
if (promoteCalls) promoteCalls();
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}

if (runningCallsCount == 0 && idleCallback != null) {
boolean isRunning = promoteAndExecute();

if (!isRunning && idleCallback != null) {
idleCallback.run();
}
}
Expand Down
25 changes: 25 additions & 0 deletions okhttp/src/main/java/okhttp3/RealCall.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@
package okhttp3;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import javax.annotation.Nullable;
import okhttp3.internal.NamedRunnable;
import okhttp3.internal.cache.CacheInterceptor;
Expand Down Expand Up @@ -142,6 +145,28 @@ RealCall get() {
return RealCall.this;
}

/**
* Attempt to enqueue this async call on {@code executorService}. This will attempt to clean up
* if the executor has been shut down by reporting the call as failed.
*/
void executeOn(ExecutorService executorService) {
assert (!Thread.holdsLock(client.dispatcher()));
boolean success = false;
try {
executorService.execute(this);
success = true;
} catch (RejectedExecutionException e) {
InterruptedIOException ioException = new InterruptedIOException("executor rejected");
ioException.initCause(e);
eventListener.callFailed(RealCall.this, ioException);
responseCallback.onFailure(RealCall.this, ioException);
} finally {
if (!success) {
client.dispatcher().finished(this); // This call is no longer running!
}
}
}

@Override protected void execute() {
boolean signalledCallback = false;
try {
Expand Down

0 comments on commit 07f62a0

Please sign in to comment.