Skip to content

Commit

Permalink
Merge branch 'master' into bug/channel-halt
Browse files Browse the repository at this point in the history
Conflicts:
	src/test/scala/scalaz/stream/ProcessSpec.scala
  • Loading branch information
djspiewak committed Jun 14, 2015
2 parents 83ece9d + 5947ad6 commit 9f6a325
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 8 deletions.
26 changes: 20 additions & 6 deletions src/main/scala/scalaz/stream/Process.scala
Original file line number Diff line number Diff line change
Expand Up @@ -451,8 +451,20 @@ sealed trait Process[+F[_], +O]

}



final def uncons[F2[x] >: F[x], O2 >: O](implicit F: Monad[F2], C: Catchable[F2]): F2[(O2, Process[F2, O2])] =
unconsOption(F, C).flatMap(_.map(F.point[(O2, Process[F2, O2])](_)).getOrElse(C.fail(new NoSuchElementException)))

final def unconsOption[F2[x] >: F[x], O2 >: O](implicit F: Monad[F2], C: Catchable[F2]): F2[Option[(O2, Process[F2, O2])]] = step match {
case Step(head, next) => head match {
case Emit(as) => as.headOption.map(x => F.point[Option[(O2, Process[F2, O2])]](Some((x, Process.emitAll[O2](as drop 1) +: next)))) getOrElse
next.continue.unconsOption
case await: Await[F2, _, O2] => await.evaluate.flatMap(p => (p +: next).unconsOption(F,C))
}
case Halt(cause) => cause match {
case End | Kill => F.point(None)
case _ : EarlyCause => C.fail(cause.asThrowable)
}
}

///////////////////////////////////////////
//
Expand All @@ -475,10 +487,7 @@ sealed trait Process[+F[_], +O]
go(cont.continue.asInstanceOf[Process[F2,O]], nacc)
}
case (awt:Await[F2,Any,O]@unchecked, cont) =>
F.bind(C.attempt(awt.req)) { r =>
go((Try(awt.rcv(EarlyCause.fromTaskResult(r)).run) +: cont).asInstanceOf[Process[F2,O]]
, acc)
}
awt.evaluate.flatMap(p => go(p +: cont, acc))
}
case Halt(End) => F.point(acc)
case Halt(Kill) => F.point(acc)
Expand Down Expand Up @@ -613,6 +622,11 @@ object Process extends ProcessInstances {
*/
def extend[F2[x] >: F[x], O2](f: Process[F, O] => Process[F2, O2]): Await[F2, A, O2] =
Await[F2, A, O2](req, r => Trampoline.suspend(rcv(r)).map(f), preempt)

def evaluate[F2[x] >: F[x], O2 >: O](implicit F: Monad[F2], C: Catchable[F2]): F2[Process[F2,O2]] =
C.attempt(req).map { e =>
rcv(EarlyCause.fromTaskResult(e)).run
}
}


Expand Down
4 changes: 2 additions & 2 deletions src/main/scala/scalaz/stream/io.scala
Original file line number Diff line number Diff line change
Expand Up @@ -307,9 +307,9 @@ object io {

case Step(Emit(_), _) => assert(false) // this is impossible, according to the types

case Step(Await(request, receive, _), cont) => { // todo: ??? Cleanup
case Step(await: Await[Task,_,ByteVector], cont) => { // todo: ??? Cleanup
// yay! run the Task
cur = Util.Try(receive(EarlyCause.fromTaskResult(request.attempt.run)).run) +: cont
cur = Util.Try(await.evaluate.run) +: cont
close()
}
}
Expand Down
56 changes: 56 additions & 0 deletions src/test/scala/scalaz/stream/ProcessSpec.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package scalaz.stream

import java.util.NoSuchElementException

import org.scalacheck.Prop._

import Cause._
Expand Down Expand Up @@ -339,6 +341,60 @@ class ProcessSpec extends Properties("Process") {
((halt pipe process1.id).runLog timed 3000 map { _.toList }).attempt.run === (halt.runLog timed 3000 map { _.toList }).attempt.run
}

property("uncons (constant stream)") = secure {
val process: Process[Task, Int] = Process(1,2,3)
val result = process.uncons
// Not sure why I need to use .equals() here instead of using ==
result.run.equals((1, Process(2,3)))
}
property("uncons (async stream v1)") = secure {
val task = Task.now(1)
val process = Process.await(task)(Process.emit(_) ++ Process(2,3))
val result = process.uncons
val (a, newProcess) = result.run
a == 1 && newProcess.runLog.run == Seq(2,3)
}
property("uncons (async stream v2)") = secure {
val task = Task.now(1)
val process = Process.await(task)(a => Process(2,3).prepend(Seq(a)))
val result = process.uncons
val (a, newProcess) = result.run
a == 1 && newProcess.runLog.run == Seq(2,3)
}
property("uncons (mutable queue)") = secure {
import scalaz.stream.async
import scala.concurrent.duration._
val q = async.unboundedQueue[Int]
val process = q.dequeue
val result = process.uncons
q.enqueueAll(List(1,2,3)).timed(1.second).run
q.close.run
val (a, newProcess) = result.timed(1.second).run
val newProcessResult = newProcess.runLog.timed(1.second).run
a == 1 && newProcessResult == Seq(2,3)
}
property("uncons (mutable queue) v2") = secure {
import scalaz.stream.async
import scala.concurrent.duration._
val q = async.unboundedQueue[Int]
val process = q.dequeue
val result = process.uncons
q.enqueueOne(1).timed(1.second).run
val (a, newProcess) = result.timed(1.second).run
a == 1
}
property("uncons should throw a NoSuchElementException if Process is empty") = secure {
val process = Process.empty[Task, Int]
val result = process.uncons
try {result.run; false} catch { case _: NoSuchElementException => true case _ : Throwable => false}
}
property("uncons should propogate failure if stream fails") = secure {
case object TestException extends java.lang.Exception
val process: Process[Task, Int] = Process.fail(TestException)
val result = process.uncons
try {result.run; false} catch { case TestException => true; case _ : Throwable => false}
}

property("to.halt") = secure {
var count = 0

Expand Down

0 comments on commit 9f6a325

Please sign in to comment.