Permalink
Browse files

Fix bug in socket server: open connections are not closed when server…

… is shutting down. Apparently calling interrupt() on blocking IO

does not have any effect for sockets. To fix, added a Map of active sessions. Each session removes adds itself to the map when it begins and removes itself when
it is done. This allows the addition of a killSessions() method in SocketServer that will forcefully close all the open sockets and thereby terminate the client connections.
  • Loading branch information...
jkreps committed Jun 22, 2009
1 parent b09c9f9 commit 3a3db2b6dc8924ecfea81c102df7e41fdb2ed0d7
@@ -22,6 +22,9 @@
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
@@ -35,6 +38,7 @@
import voldemort.VoldemortException;
import voldemort.annotations.jmx.JmxGetter;
import voldemort.annotations.jmx.JmxManaged;
+import voldemort.annotations.jmx.JmxOperation;
import voldemort.server.protocol.RequestHandlerFactory;
/**
@@ -56,6 +60,8 @@
private final RequestHandlerFactory handlerFactory;
private final int maxThreads;
private final StatusManager statusManager;
+ private final AtomicLong sessionIdSequence;
+ private final ConcurrentMap<Long, SocketServerSession> activeSessions;
private ServerSocket serverSocket = null;
@@ -77,6 +83,8 @@ public SocketServer(int port,
threadFactory,
rejectedExecutionHandler);
this.statusManager = new StatusManager(this.threadPool);
+ this.sessionIdSequence = new AtomicLong(0);
+ this.activeSessions = new ConcurrentHashMap<Long, SocketServerSession>();
}
private final ThreadFactory threadFactory = new ThreadFactory() {
@@ -124,7 +132,11 @@ public void run() {
while(!isInterrupted() && !serverSocket.isClosed()) {
final Socket socket = serverSocket.accept();
configureSocket(socket);
- this.threadPool.execute(new SocketServerSession(socket, handlerFactory));
+ long sessionId = this.sessionIdSequence.getAndIncrement();
+ this.threadPool.execute(new SocketServerSession(activeSessions,
+ socket,
+ handlerFactory,
+ sessionId));
}
} catch(BindException e) {
logger.error("Could not bind to port " + port + ".");
@@ -160,26 +172,40 @@ private void configureSocket(Socket socket) throws SocketException {
public void shutdown() {
logger.info("Shutting down voldemort socket server.");
+
+ // first shut down the acceptor to stop new connections
+ interrupt();
try {
- serverSocket.close();
+ if(!serverSocket.isClosed())
+ serverSocket.close();
} catch(IOException e) {
logger.error("Error while closing socket server: " + e.getMessage());
}
- threadGroup.interrupt();
- interrupt();
+
+ // now kill all the active sessions
threadPool.shutdownNow();
+ killActiveSessions();
+
try {
- boolean completed = threadPool.awaitTermination(1, TimeUnit.SECONDS);
+ boolean completed = threadPool.awaitTermination(5, TimeUnit.SECONDS);
if(!completed)
- logger.warn("Timed out waiting for sockets to close.");
+ logger.warn("Timed out waiting for threadpool to close.");
} catch(InterruptedException e) {
- logger.warn("Interrupted while waiting for tasks to complete: ", e);
+ logger.warn("Interrupted while waiting for socket server shutdown to complete: ", e);
}
- try {
- if(!serverSocket.isClosed())
- serverSocket.close();
- } catch(IOException e) {
- logger.warn("Exception while closing server socket: ", e);
+ }
+
+ @JmxOperation(description = "Kill all the active sessions.")
+ public void killActiveSessions() {
+ // loop through and close the socket of all the active sessions
+ logger.info("Killing all active sessions.");
+ for(Map.Entry<Long, SocketServerSession> entry: activeSessions.entrySet()) {
+ try {
+ logger.debug("Closing session " + entry.getKey());
+ entry.getValue().close();
+ } catch(IOException e) {
+ logger.warn("Error while closing session socket: ", e);
+ }
}
}
@@ -9,6 +9,7 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
+import java.util.Map;
import org.apache.log4j.Logger;
@@ -31,12 +32,20 @@
private final Logger logger = Logger.getLogger(SocketServerSession.class);
+ private final Map<Long, SocketServerSession> activeSessions;
+ private final long sessionId;
private final Socket socket;
private final RequestHandlerFactory handlerFactory;
+ private volatile boolean isClosed = false;
- public SocketServerSession(Socket socket, RequestHandlerFactory handlerFactory) {
+ public SocketServerSession(Map<Long, SocketServerSession> activeSessions,
+ Socket socket,
+ RequestHandlerFactory handlerFactory,
+ long id) {
+ this.activeSessions = activeSessions;
this.socket = socket;
this.handlerFactory = handlerFactory;
+ this.sessionId = id;
}
public Socket getSocket() {
@@ -49,6 +58,7 @@ private boolean isInterrupted() {
public void run() {
try {
+ activeSessions.put(sessionId, this);
DataInputStream inputStream = new DataInputStream(new BufferedInputStream(socket.getInputStream(),
64000));
DataOutputStream outputStream = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream(),
@@ -59,20 +69,26 @@ public void run() {
logger.info("Client " + socket.getRemoteSocketAddress()
+ " connected successfully with protocol " + protocol.getCode());
- while(!isInterrupted()) {
+ while(!isInterrupted() && !socket.isClosed() && !isClosed) {
handler.handleRequest(inputStream, outputStream);
outputStream.flush();
}
+ if(isInterrupted())
+ logger.info(Thread.currentThread().getName()
+ + " has been interrupted, closing session.");
} catch(EOFException e) {
logger.info("Client " + socket.getRemoteSocketAddress() + " disconnected.");
} catch(IOException e) {
- logger.error(e);
+ if(!isClosed)
+ logger.error(e);
} finally {
try {
socket.close();
} catch(Exception e) {
logger.error("Error while closing socket", e);
}
+ // now remove ourselves from the set of active sessions
+ this.activeSessions.remove(sessionId);
}
}
@@ -98,4 +114,9 @@ private RequestFormatType negotiateProtocol(InputStream input, OutputStream outp
}
return requestFormat;
}
+
+ public void close() throws IOException {
+ this.isClosed = true;
+ this.socket.close();
+ }
}

0 comments on commit 3a3db2b

Please sign in to comment.