Skip to content

Commit

Permalink
some refactorings after introducing fold1 to an Iteratee
Browse files Browse the repository at this point in the history
  • Loading branch information
sadache committed May 9, 2012
1 parent f22ed25 commit 2091678
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 69 deletions.
Expand Up @@ -65,13 +65,13 @@ object Concurrent {
val ready = interested.zipWithIndex.map {
case (t, index) =>
val p = t._2
t._1.fold(
(a, e) => {
t._1.fold1 {
case Step.Done(a, e) =>
p.redeem(Done(a, e))
commitDone.single.transform(_ :+ index)
Promise.pure(())
},
k => {

case Step.Cont(k) =>
val next = k(in)
next.pureFold(
(a, e) => {
Expand All @@ -83,12 +83,12 @@ object Concurrent {
p.redeem(Error(msg, e))
commitDone.single.transform(_ :+ index)
})
},
(msg, e) => {

case Step.Error(msg, e) =>
p.redeem(Error(msg, e))
commitDone.single.transform(_ :+ index)
Promise.pure(())
}).extend1 {
}.extend1 {
case Redeemed(a) => a
case Thrown(e) => p.throwing(e)
case _ => throw new RuntimeException("should be either Redeemed or Thrown at this point")
Expand Down
Expand Up @@ -78,10 +78,10 @@ object Enumerator {

def enumInput[E](e: Input[E]) = new Enumerator[E] {
def apply[A](i: Iteratee[E, A]): Promise[Iteratee[E, A]] =
i.fold((a, e) => Promise.pure(i),
k => Promise.pure(k(e)),
(_, _) => Promise.pure(i))

i.fold1{
case Step.Cont(k) => Promise.pure(k(e))
case _ => Promise.pure(i)
}
}

def interleave[E1, E2 >: E1](e1: Enumerator[E1], e2: Enumerator[E2]): Enumerator[E2] = new Enumerator[E2] {
Expand Down Expand Up @@ -262,19 +262,14 @@ object Enumerator {

def apply[A](it: Iteratee[E, A]): Promise[Iteratee[E, A]] = {

def step(it: Iteratee[E, A]): Promise[Iteratee[E,A]] = {

it.fold(
(a, e) => Promise.pure(Done(a,e)),
k => {
inner[A](step,k)
},
(msg, e) => Promise.pure(Error(msg,e))
)
def step(it: Iteratee[E, A]): Promise[Iteratee[E,A]] = it.fold1{
case Step.Done(a, e) => Promise.pure(Done(a,e))
case Step.Cont(k) => inner[A](step,k)
case Step.Error(msg, e) => Promise.pure(Error(msg,e))
}

step(it)
}

}

trait TreatCont1[E,S]{
Expand All @@ -287,15 +282,10 @@ object Enumerator {

def apply[A](it: Iteratee[E, A]): Promise[Iteratee[E, A]] = {

def step(it: Iteratee[E, A], state:S): Promise[Iteratee[E,A]] = {

it.fold(
(a, e) => Promise.pure(Done(a,e)),
k => {
inner[A](step,state,k)
},
(msg, e) => Promise.pure(Error(msg,e))
)
def step(it: Iteratee[E, A], state:S): Promise[Iteratee[E,A]] = it.fold1{
case Step.Done(a, e) => Promise.pure(Done(a,e))
case Step.Cont(k) => inner[A](step,state,k)
case Step.Error(msg, e) => Promise.pure(Error(msg,e))
}
step(it,s)
}
Expand All @@ -311,9 +301,8 @@ object Enumerator {

def step(it: Iteratee[E, A], initial: Boolean = false) {

val next = it.fold(
(a, e) => { iterateeP.redeem(it); Promise.pure(None) },
k => {
val next = it.fold1 {
case Step.Cont(k) => {
retriever(initial).map {
case None => {
val remainingIteratee = k(Input.EOF)
Expand All @@ -325,9 +314,9 @@ object Enumerator {
Some(nextIteratee)
}
}
},
(_, _) => { iterateeP.redeem(it); Promise.pure(None) }
)
}
case _ => { iterateeP.redeem(it); Promise.pure(None) }
}

next.extend1 {
case Redeemed(Some(i)) => step(i)
Expand All @@ -351,9 +340,8 @@ object Enumerator {

def step(it: Iteratee[E, A]) {

val next = it.fold(
(a, e) => { iterateeP.redeem(it); Promise.pure(None) },
k => {
val next = it.fold1{
case Step.Cont(k) => {
retriever().map {
case None => {
val remainingIteratee = k(Input.EOF)
Expand All @@ -365,9 +353,9 @@ object Enumerator {
Some(nextIteratee)
}
}
},
(_, _) => { iterateeP.redeem(it); Promise.pure(None) }
)
}
case _ => { iterateeP.redeem(it); Promise.pure(None) }
}

next.extend1 {
case Redeemed(Some(i)) => step(i)
Expand Down
Expand Up @@ -11,9 +11,8 @@ object Iteratee {
*/
def flatten[E, A](i: Promise[Iteratee[E, A]]): Iteratee[E, A] = new Iteratee[E, A] {

def fold[B](done: (A, Input[E]) => Promise[B],
cont: (Input[E] => Iteratee[E, A]) => Promise[B],
error: (String, Input[E]) => Promise[B]): Promise[B] = i.flatMap(_.fold(done, cont, error))
def fold1[B](folder: Step[E,A] => Promise[B]): Promise[B] = i.flatMap(_.fold1(folder))

}

def isDoneOrError[E, A](it: Iteratee[E, A]): Promise[Boolean] = it.pureFold((_, _) => true, _ => false, (_, _) => true)
Expand Down Expand Up @@ -223,11 +222,15 @@ trait Iteratee[E, +A] {
*
* @return a [[play.api.libs.concurrent.Promise]] of the eventually computed result
*/
def run[AA >: A]: Promise[AA] = fold((a, _) => Promise.pure(a),
k => k(Input.EOF).fold((a1, _) => Promise.pure(a1),
_ => sys.error("diverging iteratee after Input.EOF"),
(msg, e) => sys.error(msg)),
(msg, e) => sys.error(msg))
def run[AA >: A]: Promise[AA] = fold1({
case Step.Done(a,_) => Promise.pure(a)
case Step.Cont(k) => k(Input.EOF).fold1({
case Step.Done(a1,_) => Promise.pure(a1)
case Step.Cont(_) => sys.error("diverging iteratee after Input.EOF")
case Step.Error(msg,e) => sys.error(msg)
})
case Step.Error(msg,e) => sys.error(msg)
})

def feed[AA >: A](in: Input[E]): Promise[Iteratee[E, AA]] = {
Enumerator.enumInput(in) |>> this
Expand All @@ -243,12 +246,13 @@ trait Iteratee[E, +A] {
*/
def fold[B](done: (A, Input[E]) => Promise[B],
cont: (Input[E] => Iteratee[E, A]) => Promise[B],
error: (String, Input[E]) => Promise[B]): Promise[B]
error: (String, Input[E]) => Promise[B]): Promise[B] = fold1({
case Step.Done(a,e) => done(a,e)
case Step.Cont(k) => cont(k)
case Step.Error(msg,e) => error(msg,e)
})

def fold1[B](folder: Step[E,A] => Promise[B]): Promise[B] = fold(
(a,e) => folder(Step.Done(a,e)),
k => folder(Step.Cont(k)),
(msg,e) => folder(Step.Error(msg,e)))
def fold1[B](folder: Step[E,A] => Promise[B]): Promise[B]

/**
* Like fold but taking functions returning pure values (not in promises)
Expand Down Expand Up @@ -366,9 +370,8 @@ object Done {
* @param e Remaining unused input
*/
def apply[E, A](a: A, e: Input[E]): Iteratee[E, A] = new Iteratee[E, A] {
def fold[B](done: (A, Input[E]) => Promise[B],
cont: (Input[E] => Iteratee[E, A]) => Promise[B],
error: (String, Input[E]) => Promise[B]): Promise[B] = done(a, e)

def fold1[B](folder: Step[E,A] => Promise[B]): Promise[B] = folder(Step.Done(a,e))

}

Expand All @@ -380,9 +383,8 @@ object Cont {
* @param k Continuation which will compute the next Iteratee state according to an input
*/
def apply[E, A](k: Input[E] => Iteratee[E, A]): Iteratee[E, A] = new Iteratee[E, A] {
def fold[B](done: (A, Input[E]) => Promise[B],
cont: (Input[E] => Iteratee[E, A]) => Promise[B],
error: (String, Input[E]) => Promise[B]): Promise[B] = cont(k)

def fold1[B](folder: Step[E,A] => Promise[B]): Promise[B] = folder(Step.Cont(k))

}
}
Expand All @@ -393,9 +395,8 @@ object Error {
* @param e The input that caused the error
*/
def apply[E](msg: String, e: Input[E]): Iteratee[E, Nothing] = new Iteratee[E, Nothing] {
def fold[B](done: (Nothing, Input[E]) => Promise[B],
cont: (Input[E] => Iteratee[E, Nothing]) => Promise[B],
error: (String, Input[E]) => Promise[B]): Promise[B] = error(msg, e)

def fold1[B](folder: Step[E,Nothing] => Promise[B]): Promise[B] = folder(Step.Error(msg,e))

}
}
Expand Down
Expand Up @@ -42,16 +42,17 @@ private[server] trait WebSocketHandler {
(a, e) => { sys.error("Getting messages on a supposedly closed socket? frame: " + input) },
k => {
val next = k(input)
next.fold(
(a, e) => {
next.fold1 {
case Step.Done(a, e) =>
iterateeAgent.close()
ctx.getChannel().disconnect();
promise.redeem(next);
Logger("play").trace("cleaning for channel " + ctx.getChannel());
Promise.pure(next)
},
_ => Promise.pure(next),
(msg, e) => { /* deal with error, maybe close the socket */ Promise.pure(next) })

case Step.Cont(_) => Promise.pure(next)
case Step.Error(msg, e) => { /* deal with error, maybe close the socket */ Promise.pure(next) }
}
},
(err, e) => /* handle error, maybe close the socket */ Promise.pure(it))))
}
Expand Down

0 comments on commit 2091678

Please sign in to comment.