Skip to content

Commit

Permalink
[core] [minor] Make sure ConnectionManager stops.
Browse files Browse the repository at this point in the history
My previous fix (force a selector wakeup) didn't seem to work since
I ran into the hang again. So change the code a bit to be more
explicit about the condition when the selector thread should exit.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes apache#5566 from vanzin/conn-mgr-hang and squashes the following commits:

ddb2c03 [Marcelo Vanzin] [core] [minor] Make sure ConnectionManager stops.
  • Loading branch information
Marcelo Vanzin authored and srowen committed Apr 18, 2015
1 parent 5f095d5 commit 327ebf0
Showing 1 changed file with 10 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ private[nio] class ConnectionManager(
private val writeRunnableStarted: HashSet[SelectionKey] = new HashSet[SelectionKey]()
private val readRunnableStarted: HashSet[SelectionKey] = new HashSet[SelectionKey]()

@volatile private var isActive = true
private val selectorThread = new Thread("connection-manager-thread") {
override def run(): Unit = ConnectionManager.this.run()
}
Expand Down Expand Up @@ -342,7 +343,7 @@ private[nio] class ConnectionManager(

def run() {
try {
while(!selectorThread.isInterrupted) {
while (isActive) {
while (!registerRequests.isEmpty) {
val conn: SendingConnection = registerRequests.dequeue()
addListeners(conn)
Expand Down Expand Up @@ -398,7 +399,7 @@ private[nio] class ConnectionManager(
} catch {
// Explicitly only dealing with CancelledKeyException here since other exceptions
// should be dealt with differently.
case e: CancelledKeyException => {
case e: CancelledKeyException =>
// Some keys within the selectors list are invalid/closed. clear them.
val allKeys = selector.keys().iterator()

Expand All @@ -420,8 +421,11 @@ private[nio] class ConnectionManager(
}
}
}
}
0
0

case e: ClosedSelectorException =>
logDebug("Failed select() as selector is closed.", e)
return
}

if (selectedKeysCount == 0) {
Expand Down Expand Up @@ -988,11 +992,11 @@ private[nio] class ConnectionManager(
}

def stop() {
isActive = false
ackTimeoutMonitor.stop()
selector.wakeup()
selector.close()
selectorThread.interrupt()
selectorThread.join()
selector.close()
val connections = connectionsByKey.values
connections.foreach(_.close())
if (connectionsByKey.size != 0) {
Expand Down

0 comments on commit 327ebf0

Please sign in to comment.