Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ZStream#zipWithLatest #1482

Merged
merged 23 commits into from
Aug 31, 2019
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ lazy val streams = crossProject(JSPlatform, JVMPlatform)
.dependsOn(core)
.settings(stdSettings("zio-streams"))
.settings(buildInfoSettings)
.settings(replSettings)
.settings(streamReplSettings)
.enablePlugins(BuildInfoPlugin)

lazy val streamsJVM = streams.jvm
Expand All @@ -121,7 +121,7 @@ lazy val streamsTests = crossProject(JSPlatform, JVMPlatform)
.dependsOn(coreTests % "test->test;compile->compile")
.settings(stdSettings("zio-streams-tests"))
.settings(buildInfoSettings)
.settings(replSettings)
.settings(streamReplSettings)
.enablePlugins(BuildInfoPlugin)

lazy val streamsTestsJVM = streamsTests.jvm.dependsOn(coreTestsJVM % "test->compile")
Expand Down
3 changes: 1 addition & 2 deletions core/shared/src/main/scala/zio/ZSchedule.scala
Original file line number Diff line number Diff line change
Expand Up @@ -547,8 +547,7 @@ trait ZSchedule[-R, -A, +B] extends Serializable { self =>
object ZSchedule {

/**
* A schedule that recurs forever, producing a count of inputs.
* Not in alphabetic order because other vals below depend on it.
* A schedule that recurs forever, producing a count of repeats: 0, 1, 2, ...
*/
final val forever: Schedule[Any, Int] = unfold(0)(_ + 1)

Expand Down
32 changes: 23 additions & 9 deletions project/BuildHelper.scala
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,28 @@ object BuildHelper {
}
)

val replSettings = Seq(
val replSettings = makeReplSettings {
"""|import zio._
|import zio.console._
|import zio.duration._
|object replRTS extends DefaultRuntime {}
|import replRTS._
|implicit class RunSyntax[R >: replRTS.Environment, E, A](io: ZIO[R, E, A]){ def unsafeRun: A = replRTS.unsafeRun(io) }
""".stripMargin
}

val streamReplSettings = makeReplSettings {
"""|import zio._
|import zio.console._
|import zio.duration._
|import zio.stream._
regiskuckaertz marked this conversation as resolved.
Show resolved Hide resolved
|object replRTS extends DefaultRuntime {}
|import replRTS._
|implicit class RunSyntax[R >: replRTS.Environment, E, A](io: ZIO[R, E, A]){ def unsafeRun: A = replRTS.unsafeRun(io) }
""".stripMargin
}

def makeReplSettings(initialCommandsStr: String) = Seq(
// In the repl most warnings are useless or worse.
// This is intentionally := as it's more direct to enumerate the few
// options we do want than to try to subtract off the ones we don't.
Expand All @@ -73,14 +94,7 @@ object BuildHelper {
"-Xsource:2.13",
"-Yrepl-class-based"
),
initialCommands in Compile in console := """
|import zio._
|import zio.console._
|import zio.duration._
|object replRTS extends DefaultRuntime {}
|import replRTS._
|implicit class RunSyntax[R >: replRTS.Environment, E, A](io: ZIO[R, E, A]){ def unsafeRun: A = replRTS.unsafeRun(io) }
""".stripMargin
initialCommands in Compile in console := initialCommandsStr
)

def extraOptions(scalaVersion: String, optimize: Boolean) =
Expand Down
54 changes: 43 additions & 11 deletions streams-tests/jvm/src/test/scala/zio/stream/StreamSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,11 @@ class ZStreamSpec(implicit ee: org.specs2.concurrent.ExecutionEnv) extends TestR
repeat $repeat
short circuits $repeatShortCircuits

Stream.spaced
spaced $spaced
spacedEither $spacedEither
Stream.schedule
scheduleElementsWith $scheduleElementsWith
scheduleElementsEither $scheduleElementsEither
scheduleWith $scheduleWith
scheduleEither $scheduleEither
repeated and spaced $repeatedAndSpaced
short circuits in schedule $spacedShortCircuitsWhileInSchedule
short circuits after schedule $spacedShortCircuitsAfterScheduleFinished
Expand Down Expand Up @@ -236,6 +238,7 @@ class ZStreamSpec(implicit ee: org.specs2.concurrent.ExecutionEnv) extends TestR
zipWithIndex $zipWithIndex
zipWith ignore RHS $zipWithIgnoreRhs
zipWith prioritizes failure $zipWithPrioritizesFailure
zipWithLatest $zipWithLatest
"""

def aggregate = unsafeRun {
Expand Down Expand Up @@ -1470,34 +1473,50 @@ class ZStreamSpec(implicit ee: org.specs2.concurrent.ExecutionEnv) extends TestR
} yield result must_=== List(1, 1)
)

private def spaced =
private def scheduleWith =
unsafeRun(
Stream("A", "B", "C")
.spaced(Schedule.recurs(0) *> Schedule.fromFunction((_) => "!"))
.scheduleWith(Schedule.recurs(3) *> Schedule.fromFunction((_) => "!"))(_.toLowerCase, identity)
.run(Sink.collectAll[String])
.map(_ must_=== List("A", "!", "B", "!", "C", "!"))
.map(_ must_=== List("a", "b", "c", "!"))
)

private def spacedEither =
private def scheduleEither =
unsafeRun(
Stream("A", "B", "C")
.spacedEither(Schedule.recurs(0) *> Schedule.fromFunction((_) => 123))
.scheduleEither(Schedule.recurs(3) *> Schedule.fromFunction((_) => "!"))
.run(Sink.collectAll[Either[String, String]])
.map(_ must_=== List(Right("A"), Right("B"), Right("C"), Left("!")))
)

private def scheduleElementsWith =
unsafeRun(
Stream("A", "B", "C")
.scheduleElementsWith(Schedule.recurs(0) *> Schedule.fromFunction((_) => 123))(identity, _.toString)
.run(Sink.collectAll[String])
.map(_ must_=== List("A", "123", "B", "123", "C", "123"))
)

private def scheduleElementsEither =
unsafeRun(
Stream("A", "B", "C")
.scheduleElementsEither(Schedule.recurs(0) *> Schedule.fromFunction((_) => 123))
.run(Sink.collectAll[Either[Int, String]])
.map(_ must_=== List(Right("A"), Left(123), Right("B"), Left(123), Right("C"), Left(123)))
)

private def repeatedAndSpaced =
unsafeRun(
Stream("A", "B", "C")
.spaced(Schedule.recurs(1) *> Schedule.fromFunction((_) => "!"))
.scheduleElements(Schedule.recurs(1) >>> Schedule.fromFunction((_) => "!"))
.run(Sink.collectAll[String])
.map(_ must_=== List("A", "A", "!", "B", "B", "!", "C", "C", "!"))
)

private def spacedShortCircuitsAfterScheduleFinished =
unsafeRun(
Stream("A", "B", "C")
.spaced(Schedule.recurs(1) *> Schedule.fromFunction((_) => "!"))
.scheduleElements(Schedule.recurs(1) *> Schedule.fromFunction((_) => "!"))
.take(3)
.run(Sink.collectAll[String])
.map(_ must_=== List("A", "A", "!"))
Expand All @@ -1506,7 +1525,7 @@ class ZStreamSpec(implicit ee: org.specs2.concurrent.ExecutionEnv) extends TestR
private def spacedShortCircuitsWhileInSchedule =
unsafeRun(
Stream("A", "B", "C")
.spaced(Schedule.recurs(1) *> Schedule.fromFunction((_) => "!"))
.scheduleElements(Schedule.recurs(1) *> Schedule.fromFunction((_) => "!"))
.take(4)
.run(Sink.collectAll[String])
.map(_ must_=== List("A", "A", "!", "B"))
Expand Down Expand Up @@ -1772,6 +1791,19 @@ class ZStreamSpec(implicit ee: org.specs2.concurrent.ExecutionEnv) extends TestR
.map(_ must_=== Left("Ouch"))
}

private def zipWithLatest = unsafeRun {
regiskuckaertz marked this conversation as resolved.
Show resolved Hide resolved
val s1 = Stream.iterate(0)(_ + 1).fixed(100.millis)
val s2 = Stream.iterate(0)(_ + 1).fixed(70.millis)

withLatch { release =>
s1.zipWithLatest(s2)((_, _))
.take(8)
.runCollect
.tap(_ => release)
.map(_ must_=== List(0 -> 0, 0 -> 1, 1 -> 1, 1 -> 2, 2 -> 2, 2 -> 3, 2 -> 4, 3 -> 4))
}
}

private def interleave = unsafeRun {
val s1 = Stream(2, 3)
val s2 = Stream(5, 6, 7)
Expand Down
5 changes: 5 additions & 0 deletions streams/shared/src/main/scala/zio/stream/Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,11 @@ object Stream extends ZStreamPlatformSpecific {
*/
final def halt[E](cause: Cause[E]): Stream[E, Nothing] = fromEffect(ZIO.halt(cause))

/**
* See [[ZStream.iterate]]
*/
final def iterate[A](a: A)(f: A => A): ZStream[Any, Nothing, A] = Stream.unfold(a)(a => Some(a -> f(a)))

/**
* See [[ZStream.managed]]
*/
Expand Down