From 327ebf0cb5e236579bece057eda27b21aed0e2dc Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Sat, 18 Apr 2015 10:14:56 +0100 Subject: [PATCH] [core] [minor] Make sure ConnectionManager stops. My previous fix (force a selector wakeup) didn't seem to work since I ran into the hang again. So change the code a bit to be more explicit about the condition when the selector thread should exit. Author: Marcelo Vanzin Closes #5566 from vanzin/conn-mgr-hang and squashes the following commits: ddb2c03 [Marcelo Vanzin] [core] [minor] Make sure ConnectionManager stops. --- .../spark/network/nio/ConnectionManager.scala | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala index 5a74c13b38bf7..1a68e621eaee7 100644 --- a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala +++ b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala @@ -188,6 +188,7 @@ private[nio] class ConnectionManager( private val writeRunnableStarted: HashSet[SelectionKey] = new HashSet[SelectionKey]() private val readRunnableStarted: HashSet[SelectionKey] = new HashSet[SelectionKey]() + @volatile private var isActive = true private val selectorThread = new Thread("connection-manager-thread") { override def run(): Unit = ConnectionManager.this.run() } @@ -342,7 +343,7 @@ private[nio] class ConnectionManager( def run() { try { - while(!selectorThread.isInterrupted) { + while (isActive) { while (!registerRequests.isEmpty) { val conn: SendingConnection = registerRequests.dequeue() addListeners(conn) @@ -398,7 +399,7 @@ private[nio] class ConnectionManager( } catch { // Explicitly only dealing with CancelledKeyException here since other exceptions // should be dealt with differently. - case e: CancelledKeyException => { + case e: CancelledKeyException => // Some keys within the selectors list are invalid/closed. clear them. val allKeys = selector.keys().iterator() @@ -420,8 +421,11 @@ private[nio] class ConnectionManager( } } } - } - 0 + 0 + + case e: ClosedSelectorException => + logDebug("Failed select() as selector is closed.", e) + return } if (selectedKeysCount == 0) { @@ -988,11 +992,11 @@ private[nio] class ConnectionManager( } def stop() { + isActive = false ackTimeoutMonitor.stop() - selector.wakeup() + selector.close() selectorThread.interrupt() selectorThread.join() - selector.close() val connections = connectionsByKey.values connections.foreach(_.close()) if (connectionsByKey.size != 0) {