From 4b2ee053b1ea392836a3e32a465c32d59036d2da Mon Sep 17 00:00:00 2001 From: Vladimir Shevtsov Date: Wed, 14 Aug 2019 04:04:40 +0300 Subject: [PATCH] Better representation for SelectionKey operations #21 --- .../zio/nio/channels/SelectableChannel.scala | 18 +++++++-- .../scala/zio/nio/channels/SelectionKey.scala | 38 +++++++++++++++---- src/test/resources/async_file_write_test.txt | 1 + src/test/scala/zio/nio/SelectorSuite.scala | 11 +++--- 4 files changed, 51 insertions(+), 17 deletions(-) create mode 100644 src/test/resources/async_file_write_test.txt diff --git a/src/main/scala/zio/nio/channels/SelectableChannel.scala b/src/main/scala/zio/nio/channels/SelectableChannel.scala index 677b94de..d8e13d13 100644 --- a/src/main/scala/zio/nio/channels/SelectableChannel.scala +++ b/src/main/scala/zio/nio/channels/SelectableChannel.scala @@ -9,6 +9,7 @@ import java.nio.channels.{ SocketChannel => JSocketChannel } +import zio.nio.channels.SelectionKey.Operation import zio.nio.channels.spi.SelectorProvider import zio.nio.{ Buffer, SocketAddress, SocketOption } import zio.{ IO, UIO } @@ -27,12 +28,21 @@ class SelectableChannel(private val channel: JSelectableChannel) { final def keyFor(sel: Selector): UIO[Option[SelectionKey]] = IO.effectTotal(Option(channel.keyFor(sel.selector)).map(new SelectionKey(_))) - final def register(sel: Selector, ops: Int, att: Option[AnyRef]): IO[IOException, SelectionKey] = - IO.effect(new SelectionKey(channel.register(sel.selector, ops, att.orNull))) + final def register(sel: Selector, ops: Set[Operation], att: Option[AnyRef]): IO[IOException, SelectionKey] = + IO.effect(new SelectionKey(channel.register(sel.selector, Operation.toInt(ops), att.orNull))) .refineToOrDie[IOException] - final def register(sel: Selector, ops: Int): IO[IOException, SelectionKey] = - IO.effect(new SelectionKey(channel.register(sel.selector, ops))).refineToOrDie[IOException] + final def register(sel: Selector, ops: Set[Operation]): IO[IOException, SelectionKey] = + IO.effect(new SelectionKey(channel.register(sel.selector, Operation.toInt(ops)))) + .refineToOrDie[IOException] + + final def register(sel: Selector, op: Operation, att: Option[AnyRef]): IO[IOException, SelectionKey ] = + IO.effect(new SelectionKey(channel.register(sel.selector, op.intVal, att.orNull))) + .refineToOrDie[IOException] + + final def register(sel: Selector, op: Operation): IO[IOException, SelectionKey ] = + IO.effect(new SelectionKey(channel.register(sel.selector, op.intVal))) + .refineToOrDie[IOException] final def configureBlocking(block: Boolean): IO[IOException, SelectableChannel] = IO.effect(new SelectableChannel(channel.configureBlocking(block))).refineToOrDie[IOException] diff --git a/src/main/scala/zio/nio/channels/SelectionKey.scala b/src/main/scala/zio/nio/channels/SelectionKey.scala index 9a5f2165..8f033f69 100644 --- a/src/main/scala/zio/nio/channels/SelectionKey.scala +++ b/src/main/scala/zio/nio/channels/SelectionKey.scala @@ -14,6 +14,24 @@ object SelectionKey { case e: CancelledKeyException => e } + sealed abstract class Operation(val intVal: Int) + object Operation { + + final case object Read extends Operation(JSelectionKey.OP_READ) + final case object Write extends Operation(JSelectionKey.OP_WRITE) + final case object Connect extends Operation(JSelectionKey.OP_CONNECT) + final case object Accept extends Operation(JSelectionKey.OP_ACCEPT) + + final val fullSet: Set[Operation] = Set(Read, Write, Connect, Accept) + + def fromInt(ops: Int): Set[Operation] = { + fullSet.filter(op => (ops & op.intVal) != 0) + } + + def toInt(set: Set[Operation]): Int = { + set.foldLeft(0) ((ops, op) => ops | op.intVal) + } + } } class SelectionKey(private[nio] val selectionKey: JSelectionKey) { @@ -32,16 +50,20 @@ class SelectionKey(private[nio] val selectionKey: JSelectionKey) { final val cancel: UIO[Unit] = IO.effectTotal(selectionKey.cancel()) - final val interestOps: IO[CancelledKeyException, Int] = - IO.effect(selectionKey.interestOps()).refineToOrDie[CancelledKeyException] - - final def interestOps(ops: Int): IO[CancelledKeyException, SelectionKey] = - IO.effect(selectionKey.interestOps(ops)) - .map(new SelectionKey(_)) + final val interestOps: IO[CancelledKeyException, Set[Operation]] = + IO.effectTotal(selectionKey.interestOps()) + .map(Operation.fromInt(_)) .refineToOrDie[CancelledKeyException] - final val readyOps: IO[CancelledKeyException, Int] = - IO.effect(selectionKey.readyOps()).refineToOrDie[CancelledKeyException] + final def interestOps(ops: Set[Operation]): IO[CancelledKeyException, Unit] = + IO.effect(selectionKey.interestOps(Operation.toInt(ops))) + .unit + .refineToOrDie[CancelledKeyException] + + final val readyOps: IO[CancelledKeyException, Set[Operation]] = + IO.effect(selectionKey.readyOps()) + .map(Operation.fromInt(_)) + .refineToOrDie[CancelledKeyException] final def isReadable: IO[CancelledKeyException, Boolean] = IO.effect(selectionKey.isReadable()).refineOrDie(JustCancelledKeyException) diff --git a/src/test/resources/async_file_write_test.txt b/src/test/resources/async_file_write_test.txt new file mode 100644 index 00000000..5e1c309d --- /dev/null +++ b/src/test/resources/async_file_write_test.txt @@ -0,0 +1 @@ +Hello World \ No newline at end of file diff --git a/src/test/scala/zio/nio/SelectorSuite.scala b/src/test/scala/zio/nio/SelectorSuite.scala index b099e151..3879407a 100644 --- a/src/test/scala/zio/nio/SelectorSuite.scala +++ b/src/test/scala/zio/nio/SelectorSuite.scala @@ -1,11 +1,12 @@ package zio.nio -import java.nio.channels.{ CancelledKeyException, SelectionKey => JSelectionKey, SocketChannel => JSocketChannel } +import java.nio.channels.{CancelledKeyException, SocketChannel => JSocketChannel} +import testz.{Harness, assert} import zio._ import zio.clock.Clock -import testz.{ Harness, assert } -import zio.nio.channels.{ Selector, ServerSocketChannel, SocketChannel } +import zio.nio.channels.SelectionKey.Operation +import zio.nio.channels.{Selector, ServerSocketChannel, SocketChannel} object SelectorSuite extends DefaultRuntime { @@ -36,7 +37,7 @@ object SelectorSuite extends DefaultRuntime { clientOpt <- channel.accept client = clientOpt.get _ <- client.configureBlocking(false) - _ <- client.register(selector, JSelectionKey.OP_READ) + _ <- client.register(selector, Operation.Read) } yield () } *> IO.whenM(safeStatusCheck(key.isReadable)) { @@ -62,7 +63,7 @@ object SelectorSuite extends DefaultRuntime { channel <- ServerSocketChannel.open _ <- channel.bind(address) _ <- channel.configureBlocking(false) - _ <- channel.register(selector, JSelectionKey.OP_ACCEPT) + _ <- channel.register(selector, Operation.Accept) buffer <- Buffer.byte(256) _ <- started.succeed(())