Skip to content

Commit

Permalink
zio#3649 - Add ZStream.fromTcpSocketServer
Browse files Browse the repository at this point in the history
  • Loading branch information
regis-leray committed May 21, 2020
1 parent 6d0ee04 commit 8a969f1
Showing 1 changed file with 51 additions and 1 deletion.
52 changes: 51 additions & 1 deletion streams/jvm/src/main/scala/zio/stream/platform.scala
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package zio.stream

import java.io.{ IOException, InputStream, OutputStream }
import java.net.{ InetSocketAddress }
import java.nio.{ Buffer, ByteBuffer }
import java.nio.channels.{ ServerSocketChannel, SocketChannel }
import java.{ util => ju }

import zio._
import zio.blocking.Blocking
import zio.blocking.{ effectBlockingIO, Blocking }

trait ZSinkPlatformSpecificConstructors { self: ZSink.type =>

Expand Down Expand Up @@ -236,4 +239,51 @@ trait ZStreamPlatformSpecificConstructors { self: ZStream.type =>
*/
final def fromJavaStreamTotal[A](stream: => ju.stream.Stream[A]): ZStream[Any, Nothing, A] =
ZStream.fromJavaIteratorTotal(stream.iterator())

/**
* Create a stream of accepted connection from server socket
* Emit socket `Connection` from which you can read / write and ensure it is closed after it is used
*/
def fromSocketServer(
port: Int,
host: Option[String] = None
): ZStream[Blocking, Throwable, Managed[Throwable, Connection]] =
for {
server <- ZStream.managed(ZManaged.fromAutoCloseable(Task {
val serverSocket = ServerSocketChannel.open()
serverSocket.configureBlocking(false)
serverSocket
.socket()
.bind(
host.fold(new InetSocketAddress(port))(new InetSocketAddress(_, port))
)
serverSocket
}))

socket <- ZStream
.unfoldM(server)(s => effectBlockingIO(Option(s.accept())).map(sc => Option(sc -> s)))
.mapConcat(_.toList)
conn = new Connection(socket)
} yield Managed.make(ZIO.succeed(conn))(c => ZIO.effectTotal(c.close()))

private[zio] class Connection(socket: SocketChannel) {
def read: ZStream[Blocking, IOException, Byte] =
ZStream.unfoldChunkM(0) {
case -1 => ZIO.succeed(Option.empty)
case _ =>
val buff = ByteBuffer.allocate(2048)
effectBlockingIO(socket.read(buff)).map { bytesRead =>
(buff: Buffer).flip()
Some(Chunk.fromByteBuffer(buff) -> bytesRead)
}
}

def write: ZSink[Blocking, IOException, Byte, Int] =
ZSink.foldLeftChunksM(0) {
case (_, c) =>
effectBlockingIO(socket.write(ByteBuffer.wrap(c.toArray)))
}

private[zio] def close(): Unit = socket.close()
}
}

0 comments on commit 8a969f1

Please sign in to comment.