Skip to content

Commit

Permalink
ParAff applicative should abort on uncaught exception (#135)
Browse files Browse the repository at this point in the history
* ParAff applicative should abort on uncaught exception

* Test parallel/throw with never
  • Loading branch information
natefaubion committed Jan 14, 2018
1 parent a048d9c commit 22a3ad8
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 21 deletions.
47 changes: 27 additions & 20 deletions src/Control/Monad/Aff.js
Expand Up @@ -19,7 +19,7 @@ var Aff = function () {
| Async ((Either Error a -> Eff eff Unit) -> Eff eff (Canceler eff))
| forall b. Bind (Aff eff b) (b -> Aff eff a)
| forall b. Bracket (Aff eff b) (BracketConditions eff b) (b -> Aff eff a)
| forall b. Fork Boolean (Aff eff b) ?(Thread eff b -> a)
| forall b. Fork Boolean (Aff eff b) ?(Fiber eff b -> a)
| Sequential (ParAff aff a)
*/
Expand Down Expand Up @@ -762,26 +762,33 @@ var Aff = function () {
case APPLY:
lhs = head._1._3;
rhs = head._2._3;
// We can only proceed if both sides have resolved.
if (lhs === EMPTY || rhs === EMPTY) {
return;
}
// If either side resolve with an error, we should continue with
// the first error.
if (util.isLeft(lhs)) {
if (util.isLeft(rhs)) {
if (fail === lhs) {
fail = rhs;
}
} else {
fail = lhs;
}
step = null;
head._3 = fail;
} else if (util.isLeft(rhs)) {
step = null;
fail = rhs;
// If we have a failure we should kill the other side because we
// can't possible yield a result anymore.
if (fail) {
head._3 = fail;
tmp = true;
kid = killId++;

kills[kid] = kill(early, fail === lhs ? head._2 : head._1, function (/* unused */) {
return function () {
delete kills[kid];
if (tmp) {
tmp = false;
} else if (tail === null) {
join(step, null, null);
} else {
join(step, tail._1, tail._2);
}
};
});

if (tmp) {
tmp = false;
return;
}
} else if (lhs === EMPTY || rhs === EMPTY) {
// We can only proceed if both sides have resolved.
return;
} else {
step = util.right(util.fromRight(lhs)(util.fromRight(rhs)));
head._3 = step;
Expand Down
23 changes: 22 additions & 1 deletion test/Test/Main.purs
Expand Up @@ -19,7 +19,7 @@ import Control.Monad.Error.Class (throwError, catchError)
import Control.Parallel (parallel, sequential, parTraverse_)
import Data.Array as Array
import Data.Bifunctor (lmap)
import Data.Either (Either(..), isLeft, isRight)
import Data.Either (Either(..), either, isLeft, isRight)
import Data.Foldable (sum)
import Data.Maybe (Maybe(..))
import Data.Monoid (mempty)
Expand Down Expand Up @@ -64,6 +64,11 @@ assertEq s a aff = liftEff <<< assertEff s <<< map (eq a) =<< try aff
assert eff. String TestAff eff Boolean TestAff eff Unit
assert s aff = liftEff <<< assertEff s =<< try aff

withTimeout eff a. Milliseconds TestAff eff a TestAff eff a
withTimeout ms aff =
either throwError pure =<< sequential do
parallel (try aff) <|> parallel (delay ms $> Left (error "Timed out"))

test_pure eff. TestEff eff Unit
test_pure = runAssertEq "pure" 42 (pure 42)

Expand Down Expand Up @@ -411,6 +416,21 @@ test_parallel = assert "parallel" do
r2 ← joinFiber f1
pure (r1 == "foobar" && r2.a == "foo" && r2.b == "bar")

test_parallel_throw eff. TestAff eff Unit
test_parallel_throw = assert "parallel/throw" $ withTimeout (Milliseconds 100.0) do
ref ← newRef ""
let
action n s = do
delay (Milliseconds n)
modifyRef ref (_ <> s)
pure s
r1 ← try $ sequential $
{ a: _, b: _ }
<$> parallel (action 10.0 "foo" *> throwError (error "Nope"))
<*> parallel never
r2 ← readRef ref
pure (isLeft r1 && r2 == "foo")

test_kill_parallel eff. TestAff eff Unit
test_kill_parallel = assert "kill/parallel" do
ref ← newRef ""
Expand Down Expand Up @@ -641,6 +661,7 @@ main = do
test_kill_finalizer_catch
test_kill_finalizer_bracket
test_parallel
test_parallel_throw
test_kill_parallel
test_parallel_alt
test_parallel_alt_throw
Expand Down

0 comments on commit 22a3ad8

Please sign in to comment.