From 8a969f1b8b881ec6f669cbcc1231d63c2b8dd8d2 Mon Sep 17 00:00:00 2001 From: regis-leray Date: Thu, 21 May 2020 10:05:23 -0400 Subject: [PATCH] #3649 - Add ZStream.fromTcpSocketServer --- .../src/main/scala/zio/stream/platform.scala | 52 ++++++++++++++++++- 1 file changed, 51 insertions(+), 1 deletion(-) diff --git a/streams/jvm/src/main/scala/zio/stream/platform.scala b/streams/jvm/src/main/scala/zio/stream/platform.scala index f247a68aaf0c..35678a298019 100644 --- a/streams/jvm/src/main/scala/zio/stream/platform.scala +++ b/streams/jvm/src/main/scala/zio/stream/platform.scala @@ -1,10 +1,13 @@ package zio.stream import java.io.{ IOException, InputStream, OutputStream } +import java.net.{ InetSocketAddress } +import java.nio.{ Buffer, ByteBuffer } +import java.nio.channels.{ ServerSocketChannel, SocketChannel } import java.{ util => ju } import zio._ -import zio.blocking.Blocking +import zio.blocking.{ effectBlockingIO, Blocking } trait ZSinkPlatformSpecificConstructors { self: ZSink.type => @@ -236,4 +239,51 @@ trait ZStreamPlatformSpecificConstructors { self: ZStream.type => */ final def fromJavaStreamTotal[A](stream: => ju.stream.Stream[A]): ZStream[Any, Nothing, A] = ZStream.fromJavaIteratorTotal(stream.iterator()) + + /** + * Create a stream of accepted connection from server socket + * Emit socket `Connection` from which you can read / write and ensure it is closed after it is used + */ + def fromSocketServer( + port: Int, + host: Option[String] = None + ): ZStream[Blocking, Throwable, Managed[Throwable, Connection]] = + for { + server <- ZStream.managed(ZManaged.fromAutoCloseable(Task { + val serverSocket = ServerSocketChannel.open() + serverSocket.configureBlocking(false) + serverSocket + .socket() + .bind( + host.fold(new InetSocketAddress(port))(new InetSocketAddress(_, port)) + ) + serverSocket + })) + + socket <- ZStream + .unfoldM(server)(s => effectBlockingIO(Option(s.accept())).map(sc => Option(sc -> s))) + .mapConcat(_.toList) + conn = new Connection(socket) + } yield Managed.make(ZIO.succeed(conn))(c => ZIO.effectTotal(c.close())) + + private[zio] class Connection(socket: SocketChannel) { + def read: ZStream[Blocking, IOException, Byte] = + ZStream.unfoldChunkM(0) { + case -1 => ZIO.succeed(Option.empty) + case _ => + val buff = ByteBuffer.allocate(2048) + effectBlockingIO(socket.read(buff)).map { bytesRead => + (buff: Buffer).flip() + Some(Chunk.fromByteBuffer(buff) -> bytesRead) + } + } + + def write: ZSink[Blocking, IOException, Byte, Int] = + ZSink.foldLeftChunksM(0) { + case (_, c) => + effectBlockingIO(socket.write(ByteBuffer.wrap(c.toArray))) + } + + private[zio] def close(): Unit = socket.close() + } }