Permalink
Browse files

Adapt coroutine interface and swap suspend and resume.

  • Loading branch information...
pcapriotti committed Mar 31, 2012
1 parent bc54caf commit c38cabcd94675028c0dbb6ebd7757e378b97e5a0
Showing with 28 additions and 38 deletions.
  1. +8 −0 pipes-extra/Control/Pipe/ChunkPipe.hs
  2. +17 −35 pipes-extra/Control/Pipe/Coroutine.hs
  3. +3 −3 pipes-extra/Control/Pipe/Zip.hs
@@ -6,10 +6,13 @@
module Control.Pipe.ChunkPipe (
ChunkPipe(..),
nonchunked,
+ nonchunkedMaybe,
) where
import Control.Pipe
+import Control.Monad
import Data.Monoid
+import Data.Maybe
-- | Newtype wrapper for Pipe proving a monad instance that takes care of
-- passing leftover data automatically.
@@ -27,3 +30,8 @@ instance (Monoid a, Monad m) => Monad (ChunkPipe a b m) where
-- input.
nonchunked :: (Monoid a, Monad m) => Pipe a b m r -> ChunkPipe a b m r
nonchunked p = ChunkPipe $ p >>= \r -> return (mempty, r)
+
+nonchunkedMaybe :: Monad m => Pipe a b m r -> ChunkPipe (First a) b m r
+nonchunkedMaybe p = nonchunked $ ignoreNothing >+> p
+ where
+ ignoreNothing = forever $ await >>= maybe (return ()) yield . getFirst
@@ -1,10 +1,9 @@
{-# LANGUAGE DeriveDataTypeable #-}
module Control.Pipe.Coroutine (
Coroutine,
- coroutine,
- suspend,
- suspendE,
resume,
+ suspend,
+ coroutine,
step,
terminate
) where
@@ -17,47 +16,30 @@ import Data.Typeable
import Prelude hiding (catch)
data Coroutine a b m r = Coroutine
- { suspend :: Pipe a b m r
- , suspendE :: E.SomeException -> Pipe a b m r }
-
-resume :: Monad m
- => Pipe a b m r
- -> Pipe a x m (Either r (b, Coroutine a b m r))
-resume (Pure r) = return $ Left r
-resume (Throw e) = throwP e
-resume (Free c h) = go c >>= \x -> case x of
- Left p -> resume p
- Right (b, p) -> return $ Right (b, Coroutine p h)
- where
- go (Await k) = liftM (Left . k) await
- go (Yield b p) = return $ Right (b, p)
- go (M m s) = liftM Left $ liftP s m
+ { resume :: Pipe a b m r
+ , finalizer :: [m ()]
+ }
+
+suspend :: Monad m
+ => Pipe a b m r
+ -> Pipe a x m (Either r (b, Coroutine a b m r))
+suspend (Pure r w) = Pure (Left r) w
+suspend (Throw e w) = Throw e w
+suspend (Yield x p w) = return (Right (x, Coroutine p w))
+suspend (M s m h) = M s (liftM suspend m) (suspend . h)
+suspend (Await k h) = Await (suspend . k) (suspend . h)
coroutine :: Monad m
=> Pipe a b m r
-> Coroutine a b m r
-coroutine p = Coroutine p throwP
+coroutine p = Coroutine p []
step :: Monad m
=> Coroutine a b m r
-> Pipe a x m (Either r (b, Coroutine a b m r))
-step = resume . suspend
-
-data CoroutineTerminated = CoroutineTerminated
- deriving (Show, Typeable)
-
-instance E.Exception CoroutineTerminated
+step = suspend . resume
terminate :: Monad m
=> Coroutine a b m r
-> Pipe a b m ()
-terminate p = go (suspendE p (E.toException CoroutineTerminated))
- where
- go (Pure r) = return ()
- go (Throw e) = return ()
- go (Free c h) = catchP (step c) (return . h) >>= go
-
- step (Await k) = liftM k await
- step (Yield b p) = return p
- step (M m (Finalizer _)) = masked m
- step (M m s) = liftP s m
+terminate p = mapM_ masked (finalizer p)
@@ -22,15 +22,15 @@ controllable :: Monad m
=> Producer a m r
-> Pipe (Either () (ProducerControl r)) a m r
controllable p = do
- x <- pipe (const ()) >+> resume p
+ x <- pipe (const ()) >+> suspend p
case x of
Left r -> return r
Right (b, p') ->
join $ onException
(await >>= \c -> case c of
- Left () -> yield b >> return (controllable (suspend p'))
+ Left () -> yield b >> return (controllable (resume p'))
Right (Done r) -> return $ (pipe (const ()) >+> terminate p') >> return r
- Right (Error e) -> return $ controllable (suspendE p' e))
+ Right (Error e) -> return $ (pipe (const ()) >+> terminate p') >> throw e)
(pipe (const ()) >+> terminate p')
controllable_ :: Monad m

0 comments on commit c38cabc

Please sign in to comment.