Skip to content

Commit

Permalink
#3649 - Add ZStream.fromTcpSocketServer
Browse files Browse the repository at this point in the history
  • Loading branch information
regis-leray committed May 21, 2020
1 parent 6d0ee04 commit f107be1
Showing 1 changed file with 80 additions and 0 deletions.
80 changes: 80 additions & 0 deletions streams/jvm/src/main/scala/zio/stream/platform.scala
@@ -1,6 +1,9 @@
package zio.stream

import java.io.{ IOException, InputStream, OutputStream }
import java.net.InetSocketAddress
import java.nio.channels.{ AsynchronousServerSocketChannel, AsynchronousSocketChannel, CompletionHandler }
import java.nio.{ Buffer, ByteBuffer }
import java.{ util => ju }

import zio._
Expand Down Expand Up @@ -236,4 +239,81 @@ trait ZStreamPlatformSpecificConstructors { self: ZStream.type =>
*/
final def fromJavaStreamTotal[A](stream: => ju.stream.Stream[A]): ZStream[Any, Nothing, A] =
ZStream.fromJavaIteratorTotal(stream.iterator())

/**
* Create a stream of accepted connection from server socket
* Emit socket `Connection` from which you can read / write and ensure it is closed after it is used
*/
def fromSocketServer(
port: Int,
host: Option[String] = None
): ZStream[Blocking, Throwable, Managed[Throwable, Connection]] =
for {
server <- ZStream.managed(ZManaged.fromAutoCloseable(Task {
AsynchronousServerSocketChannel
.open()
.bind(
host.fold(new InetSocketAddress(port))(new InetSocketAddress(_, port))
)
}))

conn <- ZStream.unfoldM(server) { s =>
IO.effectAsync[Throwable, Connection] { callback =>
s.accept(
null,
new CompletionHandler[AsynchronousSocketChannel, Void]() {
self =>
override def completed(result: AsynchronousSocketChannel, attachment: Void): Unit =
callback(ZIO.succeed(new Connection(result)))

override def failed(exc: Throwable, attachment: Void): Unit = callback(ZIO.fail(exc))
}
)
}
.map(c => Option(c -> s))
}

} yield Managed.make(ZIO.succeed(conn))(c => ZIO.effectTotal(c.close()))

private[zio] class Connection(socket: AsynchronousSocketChannel) {
def read: ZStream[Blocking, Throwable, Byte] =
ZStream.unfoldChunkM(0) {
case -1 => ZIO.succeed(Option.empty)
case _ =>
val buff = ByteBuffer.allocate(ZStream.DefaultChunkSize)

IO.effectAsync[Throwable, Option[(Chunk[Byte], Int)]] { callback =>
socket.read(
buff,
null,
new CompletionHandler[Integer, Void] {
override def completed(bytesRead: Integer, attachment: Void): Unit = {
(buff: Buffer).flip()
callback(ZIO.succeed(Option(Chunk.fromByteBuffer(buff) -> bytesRead.toInt)))
}

override def failed(error: Throwable, attachment: Void): Unit = callback(ZIO.fail(error))
}
)
}
}

def write: ZSink[Blocking, Throwable, Byte, Int] =
ZSink.foldLeftChunksM(0) {
case (_, c) =>
IO.effectAsync[Throwable, Int] { callback =>
socket.write(
ByteBuffer.wrap(c.toArray),
null,
new CompletionHandler[Integer, Void] {
override def completed(result: Integer, attachment: Void): Unit = callback(ZIO.succeed(result.toInt))

override def failed(error: Throwable, attachment: Void): Unit = callback(ZIO.fail(error))
}
)
}
}

private[zio] def close(): Unit = socket.close()
}
}

0 comments on commit f107be1

Please sign in to comment.