Permalink
Browse files

! io: add support for compound write commands (Tcp.CompoundWrite)

Most of this addition only affects the non-public parts of akka-io, however, the `ack` and `wantsAck` methods that were so far defined on the `Tcp.WriteCommand` type have now been moved to the new `Tcp.CompactWriteCommand` type.
  • Loading branch information...
sirthias committed Aug 30, 2013
1 parent 6fe92ac commit 2c77d8f06d090c4687ef4a42918114d0f54b767a
@@ -234,9 +234,21 @@ object Tcp extends ExtensionKey[TcpExt] {
object NoAck extends NoAck(null)
/**
- * Common interface for all write commands, currently [[akka.io.Tcp.Write]] and [[akka.io.Tcp.WriteFile]].
+ * Common interface for all write commands, i.e. [[akka.io.Tcp.Write]], [[akka.io.Tcp.WriteFile]]
+ * and [[akka.io.Tcp.CompoundWrite]]
*/
- sealed trait WriteCommand extends Command {
+ sealed abstract class WriteCommand extends Command {
+ /**
+ * Prepends this command with another `Write` or `WriteFile` to form
+ * a `CompoundWrite`.
+ */
+ def +:(other: SimpleWriteCommand): CompoundWrite = CompoundWrite(other, this)
+ }
+
+ /**
+ * Common supertype of [[akka.io.Tcp.Write]] and [[akka.io.Tcp.WriteFile]].
+ */
+ sealed abstract class SimpleWriteCommand extends WriteCommand {
require(ack != null, "ack must be non-null. Use NoAck if you don't want acks.")
/**
@@ -261,7 +273,7 @@ object Tcp extends ExtensionKey[TcpExt] {
* or have been sent!</b> Unfortunately there is no way to determine whether
* a particular write has been sent by the O/S.
*/
- case class Write(data: ByteString, ack: Event) extends WriteCommand
+ case class Write(data: ByteString, ack: Event) extends SimpleWriteCommand
object Write {
/**
* The empty Write doesn't write anything and isn't acknowledged.
@@ -288,11 +300,21 @@ object Tcp extends ExtensionKey[TcpExt] {
* or have been sent!</b> Unfortunately there is no way to determine whether
* a particular write has been sent by the O/S.
*/
- case class WriteFile(filePath: String, position: Long, count: Long, ack: Event) extends WriteCommand {
+ case class WriteFile(filePath: String, position: Long, count: Long, ack: Event) extends SimpleWriteCommand {
require(position >= 0, "WriteFile.position must be >= 0")
require(count > 0, "WriteFile.count must be > 0")
}
+ /**
+ * A write command which aggregates two other write commands. Using this construct
+ * you can chain a number of [[akka.io.Tcp.Write]] and/or [[akka.io.Tcp.WriteFile]] commands together in a way
+ * that allows them to be handled as a single write which gets written out to the
+ * network as quickly as possible.
+ * If the sub commands contain `ack` requests they will be honored as soon as the
+ * respective write has been written completely.
+ */
+ case class CompoundWrite(head: SimpleWriteCommand, tail: WriteCommand) extends WriteCommand
+
/**
* When `useResumeWriting` is in effect as was indicated in the [[akka.io.Tcp.Register]] message
* then this command needs to be sent to the connection actor in order to re-enable
@@ -31,13 +31,13 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
import tcp.bufferPool
import TcpConnection._
- private[this] var pendingWrite: PendingWrite = _
+ private[this] var pendingWrite: PendingWrite = EmptyPendingWrite
private[this] var peerClosed = false
private[this] var writingSuspended = false
private[this] var interestedInResume: Option[ActorRef] = None
var closedMessage: CloseInformation = _ // for ConnectionClosed message in postStop
- def writePending = pendingWrite ne null
+ def writePending = pendingWrite ne EmptyPendingWrite
// STATES
@@ -96,8 +96,12 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
doWrite(info)
if (!writePending) // writing is now finished
handleClose(info, closeCommander, closedEvent)
- case SendBufferFull(remaining) { pendingWrite = remaining; info.registration.enableInterest(OP_WRITE) }
- case WriteFileFinished { pendingWrite = null; handleClose(info, closeCommander, closedEvent) }
+
+ case UpdatePendingWrite(remaining)
+ pendingWrite = remaining
+ if (writePending) info.registration.enableInterest(OP_WRITE)
+ else handleClose(info, closeCommander, closedEvent)
+
case WriteFileFailed(e) handleError(info.handler, e) // rethrow exception from dispatcher task
case Abort handleClose(info, Some(sender), Aborted)
@@ -134,13 +138,9 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
sender ! write.failureMessage
if (info.useResumeWriting) writingSuspended = true
- } else write match {
- case Write(data, ack) if data.isEmpty
- if (write.wantsAck) sender ! ack
-
- case _
- pendingWrite = createWrite(write)
- doWrite(info)
+ } else {
+ pendingWrite = PendingWrite(sender, write)
+ if (writePending) doWrite(info)
}
case ResumeWriting
@@ -160,9 +160,11 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
else sender ! CommandFailed(ResumeWriting)
} else sender ! WritingResumed
- case SendBufferFull(remaining) { pendingWrite = remaining; info.registration.enableInterest(OP_WRITE) }
- case WriteFileFinished pendingWrite = null
- case WriteFileFailed(e) handleError(info.handler, e) // rethrow exception from dispatcher task
+ case UpdatePendingWrite(remaining)
+ pendingWrite = remaining
+ if (writePending) info.registration.enableInterest(OP_WRITE)
+
+ case WriteFileFailed(e) handleError(info.handler, e) // rethrow exception from dispatcher task
}
// AUXILIARIES and IMPLEMENTATION
@@ -321,114 +323,108 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
override def postRestart(reason: Throwable): Unit =
throw new IllegalStateException("Restarting not supported for connection actors.")
- /** Create a pending write from a WriteCommand */
- private[io] def createWrite(write: WriteCommand): PendingWrite = write match {
- case write: Write
- val buffer = bufferPool.acquire()
-
- try {
- val copied = write.data.copyToBuffer(buffer)
- buffer.flip()
-
- PendingBufferWrite(sender, write.ack, write.data.drop(copied), buffer)
- } catch {
- case NonFatal(e)
- bufferPool.release(buffer)
- throw e
+ def PendingWrite(commander: ActorRef, write: WriteCommand): PendingWrite = {
+ @tailrec def create(head: WriteCommand, tail: WriteCommand = Write.empty): PendingWrite =
+ head match {
+ case Write.empty EmptyPendingWrite
+ case Write(data, ack) if data.nonEmpty PendingBufferWrite(commander, data, ack, tail)
+ case WriteFile(path, offset, count, ack) PendingWriteFile(commander, path, offset, count, ack, tail)
+ case CompoundWrite(h, t) create(h, t)
+ case x @ Write(_, ack) // empty write with either an ACK or a non-standard NoACK
+ if (x.wantsAck) commander ! ack
+ create(tail)
}
- case write: WriteFile
- PendingWriteFile(sender, write, new FileInputStream(write.filePath).getChannel, 0L)
+ create(write)
}
- private[io] case class PendingBufferWrite(
- commander: ActorRef,
- ack: Any,
- remainingData: ByteString,
- buffer: ByteBuffer) extends PendingWrite {
+ def PendingBufferWrite(commander: ActorRef, data: ByteString, ack: Event, tail: WriteCommand): PendingBufferWrite = {
+ val buffer = bufferPool.acquire()
+ try {
+ val copied = data.copyToBuffer(buffer)
+ buffer.flip()
+ new PendingBufferWrite(commander, data.drop(copied), ack, buffer, tail)
+ } catch {
+ case NonFatal(e)
+ bufferPool.release(buffer)
+ throw e
+ }
+ }
- def release(): Unit = bufferPool.release(buffer)
+ class PendingBufferWrite(
+ val commander: ActorRef,
+ remainingData: ByteString,
+ ack: Any,
+ buffer: ByteBuffer,
+ tail: WriteCommand) extends PendingWrite {
def doWrite(info: ConnectionInfo): PendingWrite = {
- @tailrec def innerWrite(pendingWrite: PendingBufferWrite): PendingWrite = {
- val toWrite = pendingWrite.buffer.remaining()
- require(toWrite != 0)
- val writtenBytes = channel.write(pendingWrite.buffer)
+ @tailrec def writeToChannel(data: ByteString): PendingWrite = {
+ val writtenBytes = channel.write(buffer) // at first we try to drain the remaining bytes from the buffer
if (TraceLogging) log.debug("Wrote [{}] bytes to channel", writtenBytes)
-
- val nextWrite = pendingWrite.consume(writtenBytes)
-
- if (pendingWrite.hasData)
- if (writtenBytes == toWrite) innerWrite(nextWrite) // wrote complete buffer, try again now
- else {
- info.registration.enableInterest(OP_WRITE)
- nextWrite
- } // try again later
- else { // everything written
- if (pendingWrite.wantsAck)
- pendingWrite.commander ! pendingWrite.ack
-
- pendingWrite.release()
- null
+ if (buffer.hasRemaining) {
+ // we weren't able to write all bytes from the buffer, so we need to try again later
+ if (data eq remainingData) this
+ else new PendingBufferWrite(commander, data, ack, buffer, tail) // copy with updated remainingData
+
+ } else if (data.nonEmpty) {
+ buffer.clear()
+ val copied = remainingData.copyToBuffer(buffer)
+ buffer.flip()
+ writeToChannel(remainingData drop copied)
+
+ } else {
+ if (!ack.isInstanceOf[NoAck]) commander ! ack
+ release()
+ PendingWrite(commander, tail)
}
}
-
- try innerWrite(this)
- catch { case e: IOException handleError(info.handler, e); this }
+ try {
+ val next = writeToChannel(remainingData)
+ if (next ne EmptyPendingWrite) info.registration.enableInterest(OP_WRITE)
+ next
+ } catch { case e: IOException handleError(info.handler, e); this }
}
- def hasData = buffer.hasRemaining || remainingData.nonEmpty
- def consume(writtenBytes: Int): PendingBufferWrite =
- if (buffer.hasRemaining) this
- else {
- buffer.clear()
- val copied = remainingData.copyToBuffer(buffer)
- buffer.flip()
- copy(remainingData = remainingData.drop(copied))
- }
+
+ def release(): Unit = bufferPool.release(buffer)
}
- private[io] case class PendingWriteFile(
- commander: ActorRef,
- write: WriteFile,
+ def PendingWriteFile(commander: ActorRef, filePath: String, offset: Long, count: Long, ack: Event,
+ tail: WriteCommand): PendingWriteFile =
+ new PendingWriteFile(commander, new FileInputStream(filePath).getChannel, offset, count, ack, tail)
+
+ class PendingWriteFile(
+ val commander: ActorRef,
fileChannel: FileChannel,
- alreadyWritten: Long) extends PendingWrite {
+ offset: Long,
+ remaining: Long,
+ ack: Event,
+ tail: WriteCommand) extends PendingWrite with Runnable {
def doWrite(info: ConnectionInfo): PendingWrite = {
- tcp.fileIoDispatcher.execute(writeFileRunnable(this))
+ tcp.fileIoDispatcher.execute(this)
this
}
- def ack: Any = write.ack
+ def release(): Unit = fileChannel.close()
- /** Release any open resources */
- def release(): Unit = { fileChannel.close() }
-
- def updatedWrite(nowWritten: Long): PendingWriteFile = {
- require(nowWritten < write.count)
- copy(alreadyWritten = nowWritten)
- }
-
- def remainingBytes = write.count - alreadyWritten
- def currentPosition = write.position + alreadyWritten
- }
-
- private[io] def writeFileRunnable(pendingWrite: PendingWriteFile): Runnable =
- new Runnable {
- def run(): Unit = try {
- import pendingWrite._
- val toWrite = math.min(remainingBytes, tcp.Settings.TransferToLimit)
- val writtenBytes = fileChannel.transferTo(currentPosition, toWrite, channel)
+ def run(): Unit =
+ try {
+ val toWrite = math.min(remaining, tcp.Settings.TransferToLimit)
+ val written = fileChannel.transferTo(offset, toWrite, channel)
- if (writtenBytes < remainingBytes) self ! SendBufferFull(pendingWrite.updatedWrite(alreadyWritten + writtenBytes))
- else { // finished
- if (wantsAck) commander ! write.ack
- self ! WriteFileFinished
+ if (written < remaining) {
+ val updated = new PendingWriteFile(commander, fileChannel, offset + written, remaining - written, ack, tail)
+ self ! UpdatePendingWrite(updated)
- pendingWrite.release()
+ } else {
+ if (!ack.isInstanceOf[NoAck]) commander ! ack
+ release()
+ self ! UpdatePendingWrite(PendingWrite(commander, tail))
}
} catch {
case e: IOException self ! WriteFileFailed(e)
}
- }
+ }
}
/**
@@ -456,22 +452,18 @@ private[io] object TcpConnection {
// INTERNAL MESSAGES
- /** Informs actor that no writing was possible but there is still work remaining */
- case class SendBufferFull(remainingWrite: PendingWrite)
- /** Informs actor that a pending file write has finished */
- case object WriteFileFinished
- /** Informs actor that a pending WriteFile failed */
+ case class UpdatePendingWrite(remainingWrite: PendingWrite)
case class WriteFileFailed(e: IOException)
- /** Abstraction over pending writes */
- trait PendingWrite {
+ sealed abstract class PendingWrite {
def commander: ActorRef
- def ack: Any
-
- def wantsAck = !ack.isInstanceOf[NoAck]
def doWrite(info: ConnectionInfo): PendingWrite
+ def release(): Unit // free any occupied resources
+ }
- /** Release any open resources */
- def release(): Unit
+ object EmptyPendingWrite extends PendingWrite {
+ def commander: ActorRef = ???
+ def doWrite(info: ConnectionInfo): PendingWrite = ???
+ def release(): Unit = ???
}
}
@@ -24,7 +24,7 @@ trait ConnectionHandler extends Actor with ActorLogging {
//# final-stages
def baseCommandPipeline(tcpConnection: ActorRef): Pipeline[Command] = {
- case x: Tcp.Write tcpConnection ! x
+ case x: Tcp.WriteCommand tcpConnection ! x
case Pipeline.Tell(receiver, msg, sender) receiver.tell(msg, sender)
case x: Tcp.CloseCommand tcpConnection ! x
case x @ (Tcp.SuspendReading | Tcp.ResumeReading | Tcp.ResumeWriting) tcpConnection ! x

0 comments on commit 2c77d8f

Please sign in to comment.