diff --git a/streams/jvm/src/main/scala/zio/stream/platform.scala b/streams/jvm/src/main/scala/zio/stream/platform.scala index f247a68aaf0c..dd725a8e156e 100644 --- a/streams/jvm/src/main/scala/zio/stream/platform.scala +++ b/streams/jvm/src/main/scala/zio/stream/platform.scala @@ -1,6 +1,9 @@ package zio.stream import java.io.{ IOException, InputStream, OutputStream } +import java.net.InetSocketAddress +import java.nio.channels.{ AsynchronousServerSocketChannel, AsynchronousSocketChannel, CompletionHandler } +import java.nio.{ Buffer, ByteBuffer } import java.{ util => ju } import zio._ @@ -236,4 +239,81 @@ trait ZStreamPlatformSpecificConstructors { self: ZStream.type => */ final def fromJavaStreamTotal[A](stream: => ju.stream.Stream[A]): ZStream[Any, Nothing, A] = ZStream.fromJavaIteratorTotal(stream.iterator()) + + /** + * Create a stream of accepted connection from server socket + * Emit socket `Connection` from which you can read / write and ensure it is closed after it is used + */ + def fromSocketServer( + port: Int, + host: Option[String] = None + ): ZStream[Blocking, Throwable, Managed[Throwable, Connection]] = + for { + server <- ZStream.managed(ZManaged.fromAutoCloseable(Task { + AsynchronousServerSocketChannel + .open() + .bind( + host.fold(new InetSocketAddress(port))(new InetSocketAddress(_, port)) + ) + })) + + conn <- ZStream.unfoldM(server) { s => + IO.effectAsync[Throwable, Connection] { callback => + s.accept( + null, + new CompletionHandler[AsynchronousSocketChannel, Void]() { + self => + override def completed(result: AsynchronousSocketChannel, attachment: Void): Unit = + callback(ZIO.succeed(new Connection(result))) + + override def failed(exc: Throwable, attachment: Void): Unit = callback(ZIO.fail(exc)) + } + ) + } + .map(c => Option(c -> s)) + } + + } yield Managed.make(ZIO.succeed(conn))(c => ZIO.effectTotal(c.close())) + + private[zio] class Connection(socket: AsynchronousSocketChannel) { + def read: ZStream[Blocking, Throwable, Byte] = + ZStream.unfoldChunkM(0) { + case -1 => ZIO.succeed(Option.empty) + case _ => + val buff = ByteBuffer.allocate(ZStream.DefaultChunkSize) + + IO.effectAsync[Throwable, Option[(Chunk[Byte], Int)]] { callback => + socket.read( + buff, + null, + new CompletionHandler[Integer, Void] { + override def completed(bytesRead: Integer, attachment: Void): Unit = { + (buff: Buffer).flip() + callback(ZIO.succeed(Option(Chunk.fromByteBuffer(buff) -> bytesRead.toInt))) + } + + override def failed(error: Throwable, attachment: Void): Unit = callback(ZIO.fail(error)) + } + ) + } + } + + def write: ZSink[Blocking, Throwable, Byte, Int] = + ZSink.foldLeftChunksM(0) { + case (_, c) => + IO.effectAsync[Throwable, Int] { callback => + socket.write( + ByteBuffer.wrap(c.toArray), + null, + new CompletionHandler[Integer, Void] { + override def completed(result: Integer, attachment: Void): Unit = callback(ZIO.succeed(result.toInt)) + + override def failed(error: Throwable, attachment: Void): Unit = callback(ZIO.fail(error)) + } + ) + } + } + + private[zio] def close(): Unit = socket.close() + } }