Skip to content

Commit

Permalink
add unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
regis-leray committed Jun 1, 2020
1 parent 9d76671 commit 3b0ac92
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 5 deletions.
1 change: 1 addition & 0 deletions .dotty-ide-artifact
@@ -0,0 +1 @@
ch.epfl.lamp:dotty-language-server_0.23:0.23.0-RC1
1 change: 1 addition & 0 deletions .dotty-ide.json
@@ -0,0 +1 @@
[ ]
@@ -1,14 +1,29 @@
package zio.stream

import java.net.InetSocketAddress
import java.nio.channels.AsynchronousSocketChannel
import java.nio.file.{ Files, NoSuchFileException, Paths }
import java.nio.{ Buffer, ByteBuffer }
import java.util.concurrent.{ Future => JFuture }

import scala.concurrent.ExecutionContext.global

import zio._
import zio.blocking.effectBlockingIO
import zio.duration._
import zio.test.Assertion._
import zio.test._
import zio.test.{ testM, _ }

object ZStreamPlatformSpecificSpec extends ZIOBaseSpec {

def socketClient[T](port: Int)(f: AsynchronousSocketChannel => JFuture[T]) =
effectBlockingIO(AsynchronousSocketChannel.open()).bracket(c => ZIO.effectTotal(c.close())) { c =>
for {
_ <- ZIO.fromFutureJava(c.connect(new InetSocketAddress("localhost", port)))
_ <- ZIO.fromFutureJava(f(c))
} yield ()
}

def spec = suite("ZStream JVM")(
suite("Constructors")(
testM("effectAsync")(checkM(Gen.listOf(Gen.anyInt)) { list =>
Expand Down Expand Up @@ -191,6 +206,48 @@ object ZStreamPlatformSpecificSpec extends ZIOBaseSpec {
fails(isSubtype[NoSuchFileException](anything))
)
}
),
suite("fromSocketServer")(
testM("read data")(checkM(Gen.anyString) { message =>
for {
refOut <- Ref.make("")

fiber <- ZStream
.fromSocketServer(8889)
.foreach { c =>
c.read
.transduce(ZTransducer.utf8Decode)
.runCollect
.map(_.mkString)
.flatMap(s => refOut.update(_ + s))
}
.fork

_ <- socketClient(8889)(_.write(ByteBuffer.wrap(message.getBytes)))
.retry(Schedule.fixed(10.milliseconds))

receive <- refOut.get.repeat(Schedule.doWhile(_.isEmpty))

_ <- fiber.interrupt
} yield assert(receive)(equalTo(message))
}),
testM("write data")(checkM(Gen.anyString) { message =>
for {
fiber <- ZStream
.fromSocketServer(8889)
.foreach(c => ZStream.fromIterable(message.getBytes()).run(c.write))
.fork

buffer = ByteBuffer.allocate(message.length)

receive <- socketClient(8889)(_.read(buffer)).flatMap { _ =>
(buffer: Buffer).flip()
ZIO.succeed(new String(buffer.array))
}.retry(Schedule.fixed(10.milliseconds))

_ <- fiber.interrupt
} yield assert(receive)(equalTo(message))
})
)
)
)
Expand Down
6 changes: 2 additions & 4 deletions streams/jvm/src/main/scala/zio/stream/platform.scala
@@ -1,13 +1,11 @@
package zio.stream

import java.io.{ IOException, InputStream, OutputStream }

import java.nio.channels.FileChannel
import java.nio.file.Path
import java.net.InetSocketAddress
import java.nio.channels.FileChannel
import java.nio.channels.{ AsynchronousServerSocketChannel, AsynchronousSocketChannel, CompletionHandler }
import java.nio.file.Path
import java.nio.{ Buffer, ByteBuffer }

import java.{ util => ju }

import zio._
Expand Down

0 comments on commit 3b0ac92

Please sign in to comment.