Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ZStream.fromTcpSocketServer #3649

Closed
iravid opened this issue May 19, 2020 · 5 comments · Fixed by #3677
Closed

Add ZStream.fromTcpSocketServer #3649

iravid opened this issue May 19, 2020 · 5 comments · Fixed by #3677
Labels
enhancement New feature or request stream ZIO Stream streams: file/socket IO

Comments

@iravid
Copy link
Member

iravid commented May 19, 2020

A stream that binds a listening TCP socket and emits a stream of connections wrapped in ZManaged. Something like:

def fromSocketServer(<params>): ZStream[R, Throwable, ZManaged[R, Throwable, Connection]]

This ticket also involves defining the Connection representation. At a minimum, it should sport the following methods:

def read: ZStream[Any, Throwable, Byte]
def write: ZSink[Any, Throwable, Byte, Unit]
@iravid iravid added enhancement New feature or request stream ZIO Stream streams: file/socket IO labels May 19, 2020
@iravid iravid changed the title Add ZStream.fromSocketServer Add ZStream.fromTcpSocketServer May 19, 2020
@regis-leray
Copy link
Member

@iravid can you please explain what is the logic to emit socket connections

@iravid
Copy link
Member Author

iravid commented May 20, 2020

Every time a connection is made to the listening port, it is accepted and represented using ZManaged[Connection], and emitted to the stream.

@regis-leray
Copy link
Member

oki thanks i will try to implement

@regis-leray
Copy link
Member

regis-leray commented May 20, 2020

Something like this @iravid ?

I was wondering if i put this function into trait ZStreamPlatformSpecificConstructors to use ZStream.fromInputStreamEffect() how can i access to ZSink.fromOutputStream() ?

 import blocking._
  import java.net.ServerSocket

  def fromTcpSocketServer[R](port: Int, host: Option[String] = None): ZStream[Blocking, Throwable, Managed[Throwable, Connection]] = {
    val server = Task{
      val serverSocket = new ServerSocket()

      serverSocket.bind(
        host.fold(new InetSocketAddress(port))(new InetSocketAddress(_, port))
      )
      serverSocket
    }

    for{
      srv <- ZStream.fromEffect(server)
      socket <- ZStream.unfoldM(srv)(s => effectBlockingIO(s.accept()).map(conn => Option(conn -> s)))
      conn = new Connection(socket)
    } yield ZManaged.make(ZIO.succeed(conn))(c => ZIO(c.close()).ignore)
  }

  private[zio] class Connection(socket: Socket) {
    def read: ZStream[Blocking, IOException, Byte] = fromInputStreamEffect(ZIO(socket.getInputStream).refineToOrDie[IOException])
    def write: ZSink[Any, Throwable, Byte, Unit] =  ???
    private[zio] def close(): Unit = socket.close()
  }

@iravid
Copy link
Member Author

iravid commented May 20, 2020

@regis-leray Yes, that's a good start, but we have to use NIO :-)

regis-leray added a commit to regis-leray/zio that referenced this issue May 21, 2020
regis-leray added a commit to regis-leray/zio that referenced this issue May 21, 2020
regis-leray added a commit to regis-leray/zio that referenced this issue May 21, 2020
regis-leray added a commit to regis-leray/zio that referenced this issue May 21, 2020
regis-leray added a commit to regis-leray/zio that referenced this issue May 22, 2020
regis-leray added a commit to regis-leray/zio that referenced this issue May 23, 2020
regis-leray added a commit to regis-leray/zio that referenced this issue Jun 1, 2020
iravid pushed a commit that referenced this issue Jun 6, 2020
* #3649 - Add ZStream.fromTcpSocketServer

* add unit test
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request stream ZIO Stream streams: file/socket IO
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants