Skip to content
This repository has been archived by the owner on Apr 24, 2024. It is now read-only.

Commit

Permalink
Merge pull request #624 from jrudolph/i/620/fix-suspendreading
Browse files Browse the repository at this point in the history
= io: fix racy SuspendReading, fixes #620
  • Loading branch information
sirthias committed Oct 22, 2013
2 parents e9b7dea + 49e4a33 commit d7a0901
Showing 1 changed file with 52 additions and 41 deletions.
93 changes: 52 additions & 41 deletions spray-io/src/main/scala/akka/io/TcpConnection.scala
Expand Up @@ -34,6 +34,7 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
private[this] var pendingWrite: PendingWrite = EmptyPendingWrite
private[this] var peerClosed = false
private[this] var writingSuspended = false
private[this] var readingSuspended = false
private[this] var interestedInResume: Option[ActorRef] = None
var closedMessage: CloseInformation = _ // for ConnectionClosed message in postStop

Expand Down Expand Up @@ -71,8 +72,8 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
/** normal connected state */
def connected(info: ConnectionInfo): Receive =
handleWriteMessages(info) orElse {
case SuspendReading info.registration.disableInterest(OP_READ)
case ResumeReading info.registration.enableInterest(OP_READ)
case SuspendReading suspendReading(info)
case ResumeReading resumeReading(info)
case ChannelReadable doRead(info, None)
case cmd: CloseCommand handleClose(info, Some(sender), cmd.event)
case Terminated(h) if h == info.handler handlerTerminated()
Expand All @@ -88,8 +89,8 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
/** connection is closing but a write has to be finished first */
def closingWithPendingWrite(info: ConnectionInfo, closeCommander: Option[ActorRef],
closedEvent: ConnectionClosed): Receive = {
case SuspendReading info.registration.disableInterest(OP_READ)
case ResumeReading info.registration.enableInterest(OP_READ)
case SuspendReading suspendReading(info)
case ResumeReading resumeReading(info)
case ChannelReadable doRead(info, closeCommander)

case ChannelWritable
Expand All @@ -111,8 +112,8 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha

/** connection is closed on our side and we're waiting from confirmation from the other side */
def closing(info: ConnectionInfo, closeCommander: Option[ActorRef]): Receive = {
case SuspendReading info.registration.disableInterest(OP_READ)
case ResumeReading info.registration.enableInterest(OP_READ)
case SuspendReading suspendReading(info)
case ResumeReading resumeReading(info)
case ChannelReadable doRead(info, closeCommander)
case Abort handleClose(info, Some(sender), Aborted)
case Terminated(h) if h == info.handler handlerTerminated()
Expand Down Expand Up @@ -190,43 +191,53 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
context.become(waitingForRegistration(registration, commander))
}

def doRead(info: ConnectionInfo, closeCommander: Option[ActorRef]): Unit = {
@tailrec def innerRead(buffer: ByteBuffer, remainingLimit: Int): ReadResult =
if (remainingLimit > 0) {
// never read more than the configured limit
buffer.clear()
val maxBufferSpace = math.min(DirectBufferSize, remainingLimit)
buffer.limit(maxBufferSpace)
val readBytes = channel.read(buffer)
buffer.flip()

if (TraceLogging) log.debug("Read [{}] bytes.", readBytes)
if (readBytes > 0) info.handler ! Received(ByteString(buffer))

readBytes match {
case `maxBufferSpace` innerRead(buffer, remainingLimit - maxBufferSpace)
case x if x >= 0 AllRead
case -1 EndOfStream
case _
throw new IllegalStateException("Unexpected value returned from read: " + readBytes)
}
} else MoreDataWaiting

val buffer = bufferPool.acquire()
try innerRead(buffer, ReceivedMessageSizeLimit) match {
case AllRead info.registration.enableInterest(OP_READ)
case MoreDataWaiting self ! ChannelReadable
case EndOfStream if channel.socket.isOutputShutdown
if (TraceLogging) log.debug("Read returned end-of-stream, our side already closed")
doCloseConnection(info.handler, closeCommander, ConfirmedClosed)
case EndOfStream
if (TraceLogging) log.debug("Read returned end-of-stream, our side not yet closed")
handleClose(info, closeCommander, PeerClosed)
} catch {
case e: IOException handleError(info.handler, e)
} finally bufferPool.release(buffer)
def suspendReading(info: ConnectionInfo): Unit = {
readingSuspended = true
info.registration.disableInterest(OP_READ)
}
def resumeReading(info: ConnectionInfo): Unit = {
readingSuspended = false
info.registration.enableInterest(OP_READ)
}

def doRead(info: ConnectionInfo, closeCommander: Option[ActorRef]): Unit =
if (!readingSuspended) {
@tailrec def innerRead(buffer: ByteBuffer, remainingLimit: Int): ReadResult =
if (remainingLimit > 0) {
// never read more than the configured limit
buffer.clear()
val maxBufferSpace = math.min(DirectBufferSize, remainingLimit)
buffer.limit(maxBufferSpace)
val readBytes = channel.read(buffer)
buffer.flip()

if (TraceLogging) log.debug("Read [{}] bytes.", readBytes)
if (readBytes > 0) info.handler ! Received(ByteString(buffer))

readBytes match {
case `maxBufferSpace` innerRead(buffer, remainingLimit - maxBufferSpace)
case x if x >= 0 AllRead
case -1 EndOfStream
case _
throw new IllegalStateException("Unexpected value returned from read: " + readBytes)
}
} else MoreDataWaiting

val buffer = bufferPool.acquire()
try innerRead(buffer, ReceivedMessageSizeLimit) match {
case AllRead info.registration.enableInterest(OP_READ)
case MoreDataWaiting self ! ChannelReadable
case EndOfStream if channel.socket.isOutputShutdown
if (TraceLogging) log.debug("Read returned end-of-stream, our side already closed")
doCloseConnection(info.handler, closeCommander, ConfirmedClosed)
case EndOfStream
if (TraceLogging) log.debug("Read returned end-of-stream, our side not yet closed")
handleClose(info, closeCommander, PeerClosed)
} catch {
case e: IOException handleError(info.handler, e)
} finally bufferPool.release(buffer)
}

def doWrite(info: ConnectionInfo): Unit = pendingWrite = pendingWrite.doWrite(info)

def closeReason =
Expand Down

0 comments on commit d7a0901

Please sign in to comment.