Permalink
Browse files

cherry pick ExecutorService exception fix from thrift-trunk

  • Loading branch information...
1 parent 64a6486 commit 638a767967755f1ea9f7a7227d1f2d1f78ca3f12 eaceaser committed Sep 28, 2010
@@ -22,6 +22,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -31,12 +32,16 @@
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TNonblockingServerTransport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* An extension of the TNonblockingServer to a Half-Sync/Half-Async server.
* Like TNonblockingServer, it relies on the use of TFramedTransport.
*/
public class THsHaServer extends TNonblockingServer {
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(THsHaServer.class.getName());
// This wraps all the functionality of queueing and thread pool management
// for the passing of Invocations from the Selector to workers.
@@ -285,8 +290,14 @@ protected void gracefullyShutdownInvokerPool() {
* invoker service instead of immediately invoking. The thread pool takes care of the rest.
*/
@Override
- protected void requestInvoke(FrameBuffer frameBuffer) {
- invoker.execute(new Invocation(frameBuffer));
+ protected boolean requestInvoke(FrameBuffer frameBuffer) {
+ try {
+ invoker.execute(new Invocation(frameBuffer));
+ return true;
+ } catch (RejectedExecutionException rx) {
+ LOGGER.warn("ExecutorService rejected execution!", rx);
+ return false;
+ }
}
/**
@@ -62,7 +62,7 @@
LoggerFactory.getLogger(TNonblockingServer.class.getName());
// Flag for stopping the server
- private volatile boolean stopped_;
+ private volatile boolean stopped_ = true;
private SelectThread selectThread_;
@@ -218,6 +218,7 @@ protected boolean startSelectorThread() {
// start the selector
try {
selectThread_ = new SelectThread((TNonblockingServerTransport)serverTransport_);
+ stopped_ = false;
selectThread_.start();
return true;
} catch (IOException e) {
@@ -253,8 +254,9 @@ public void stop() {
* Perform an invocation. This method could behave several different ways
* - invoke immediately inline, queue for separate execution, etc.
*/
- protected void requestInvoke(FrameBuffer frameBuffer) {
+ protected boolean requestInvoke(FrameBuffer frameBuffer) {
frameBuffer.invoke();
+ return true;
}
/**
@@ -265,6 +267,10 @@ protected void requestSelectInterestChange(FrameBuffer frameBuffer) {
selectThread_.requestSelectInterestChange(frameBuffer);
}
+ public boolean isStopped() {
+ return selectThread_.isStopped();
+ }
+
/**
* The thread that will be doing all the selecting, managing new connections
* and those that still need to be read.
@@ -288,14 +294,24 @@ public SelectThread(final TNonblockingServerTransport serverTransport)
serverTransport.registerSelector(selector);
}
+ public boolean isStopped() {
+ return stopped_;
+ }
+
/**
* The work loop. Handles both selecting (all IO operations) and managing
* the selection preferences of all existing connections.
*/
public void run() {
- while (!stopped_) {
- select();
- processInterestChanges();
+ try {
+ while (!stopped_) {
+ select();
+ processInterestChanges();
+ }
+ } catch (Throwable t) {
+ LOGGER.error("run() exiting due to uncaught error", t);
+ } finally {
+ stopped_ = true;
}
}
@@ -405,13 +421,16 @@ private void handleAccept() throws IOException {
*/
private void handleRead(SelectionKey key) {
FrameBuffer buffer = (FrameBuffer)key.attachment();
- if (buffer.read()) {
- // if the buffer's frame read is complete, invoke the method.
- if (buffer.isFrameFullyRead()) {
- requestInvoke(buffer);
- }
- } else {
+ if (!buffer.read()) {
cleanupSelectionkey(key);
+ return;
+ }
+
+ // if the buffer's frame read is complete, invoke the method.
+ if (buffer.isFrameFullyRead()) {
+ if (!requestInvoke(buffer)) {
+ cleanupSelectionkey(key);
+ }
}
}

0 comments on commit 638a767

Please sign in to comment.