diff --git a/conduit/Data/Conduit/Internal/Conduit.hs b/conduit/Data/Conduit/Internal/Conduit.hs index 151a1f3a6..1858abb5a 100644 --- a/conduit/Data/Conduit/Internal/Conduit.hs +++ b/conduit/Data/Conduit/Internal/Conduit.hs @@ -53,8 +53,8 @@ module Data.Conduit.Internal.Conduit , toProducer , toConsumer -- ** Cleanup - , Data.Conduit.Internal.Conduit.bracketP - , Data.Conduit.Internal.Conduit.addCleanup + , bracketP + , addCleanup -- ** Exceptions , catchC , handleC @@ -83,7 +83,8 @@ module Data.Conduit.Internal.Conduit import Control.Applicative (Applicative (..)) import Control.Exception.Lifted as E (Exception) -import Control.Monad (liftM, when, liftM2) +import qualified Control.Exception.Lifted as E (catch) +import Control.Monad (liftM, when, liftM2, ap) import Control.Monad.Error.Class(MonadError(..)) import Control.Monad.Reader.Class(MonadReader(..)) import Control.Monad.RWS.Class(MonadRWS()) @@ -97,13 +98,11 @@ import Data.Monoid (Monoid (mappend, mempty)) import Control.Monad.Trans.Resource import qualified Data.IORef as I import Control.Monad.Morph (MFunctor (..)) -import Data.Conduit.Internal.Pipe hiding (yield, mapOutput, leftover, yieldM, yieldOr, await, awaitForever) +import Data.Conduit.Internal.Pipe hiding (yield, mapOutput, leftover, yieldM, yieldOr, await, awaitForever, addCleanup, bracketP) import qualified Data.Conduit.Internal.Pipe as CI -import Control.Monad (forever, unless) +import Control.Monad (forever) import Data.Traversable (Traversable (..)) -#if MIN_VERSION_exceptions(0, 6, 0) -import qualified Control.Monad.Catch as Catch -#endif +import Control.Monad.Catch (MonadCatch, catch) -- | Core datatype of the conduit package. This type represents a general -- component which can consume a stream of input values @i@, produce a stream @@ -112,48 +111,125 @@ import qualified Control.Monad.Catch as Catch -- type. -- -- Since 1.0.0 -newtype ConduitM i o m r = ConduitM { unConduitM :: Pipe i i o () m r } - deriving (Functor, Applicative, Monad, MonadThrow, MFunctor -#if MIN_VERSION_exceptions(0, 6, 0) - , Catch.MonadCatch -#endif - ) +newtype ConduitM i o m r = ConduitM + { unConduitM :: forall b. + (r -> Pipe i i o () m b) -> Pipe i i o () m b + } + +instance Functor (ConduitM i o m) where + fmap f (ConduitM c) = ConduitM $ \rest -> c (rest . f) + +instance Applicative (ConduitM i o m) where + pure = return + {-# INLINE pure #-} + (<*>) = ap + {-# INLINE (<*>) #-} + +instance Monad (ConduitM i o m) where + return x = ConduitM ($ x) + ConduitM f >>= g = ConduitM $ \h -> f $ \a -> unConduitM (g a) h + +instance MonadThrow m => MonadThrow (ConduitM i o m) where + throwM = lift . throwM + +instance MFunctor (ConduitM i o) where + hoist f (ConduitM c0) = ConduitM $ \rest -> let + go (HaveOutput p c o) = HaveOutput (go p) (f c) o + go (NeedInput p c) = NeedInput (go . p) (go . c) + go (Done r) = rest r + go (PipeM mp) = + PipeM (f $ liftM go $ collapse mp) + where + -- Combine a series of monadic actions into a single action. Since we + -- throw away side effects between different actions, an arbitrary break + -- between actions will lead to a violation of the monad transformer laws. + -- Example available at: + -- + -- http://hpaste.org/75520 + collapse mpipe = do + pipe' <- mpipe + case pipe' of + PipeM mpipe' -> collapse mpipe' + _ -> return pipe' + go (Leftover p i) = Leftover (go p) i + in go (c0 Done) + +instance MonadCatch m => MonadCatch (ConduitM i o m) where + catch (ConduitM p0) onErr = ConduitM $ \rest -> let + go (Done r) = rest r + go (PipeM mp) = PipeM $ catch (liftM go mp) (return . flip unConduitM rest . onErr) + go (Leftover p i) = Leftover (go p) i + go (NeedInput x y) = NeedInput (go . x) (go . y) + go (HaveOutput p c o) = HaveOutput (go p) c o + in go (p0 Done) instance MonadIO m => MonadIO (ConduitM i o m) where liftIO = lift . liftIO {-# INLINE liftIO #-} instance MonadReader r m => MonadReader r (ConduitM i o m) where - ask = ConduitM ask - local f (ConduitM m) = ConduitM (local f m) + ask = lift ask + {-# INLINE ask #-} + + local f (ConduitM c0) = ConduitM $ \rest -> + let go (HaveOutput p c o) = HaveOutput (go p) c o + go (NeedInput p c) = NeedInput (\i -> go (p i)) (\u -> go (c u)) + go (Done x) = rest x + go (PipeM mp) = PipeM (liftM go $ local f mp) + go (Leftover p i) = Leftover (go p) i + in go (c0 Done) instance MonadWriter w m => MonadWriter w (ConduitM i o m) where #if MIN_VERSION_mtl(2, 1, 0) - writer = ConduitM . writer + writer = lift . writer #endif - tell = ConduitM . tell - listen (ConduitM m) = ConduitM (listen m) - pass (ConduitM m) = ConduitM (pass m) + tell = lift . tell + + listen (ConduitM c0) = ConduitM $ \rest -> + let go front (HaveOutput p c o) = HaveOutput (go front p) c o + go front (NeedInput p c) = NeedInput (\i -> go front (p i)) (\u -> go front (c u)) + go front (Done x) = rest (x, front) + go front (PipeM mp) = PipeM $ do + (p,w) <- listen mp + return $ go (front `mappend` w) p + go front (Leftover p i) = Leftover (go front p) i + in go mempty (c0 Done) + + pass (ConduitM c0) = ConduitM $ \rest -> + let go (HaveOutput p c o) = HaveOutput (go p) c o + go (NeedInput p c) = NeedInput (\i -> go (p i)) (\u -> go (c u)) + go (PipeM mp) = PipeM $ mp >>= (return . go) + go (Done (x,_)) = rest x + go (Leftover p i) = Leftover (go p) i + in go (c0 Done) instance MonadState s m => MonadState s (ConduitM i o m) where - get = ConduitM get - put = ConduitM . put + get = lift get + put = lift . put #if MIN_VERSION_mtl(2, 1, 0) - state = ConduitM . state + state = lift . state #endif instance MonadRWS r w s m => MonadRWS r w s (ConduitM i o m) instance MonadError e m => MonadError e (ConduitM i o m) where - throwError = ConduitM . throwError - catchError (ConduitM m) f = ConduitM $ catchError m (unConduitM . f) + throwError = lift . throwError + catchError (ConduitM c0) f = ConduitM $ \rest -> + let go (HaveOutput p c o) = HaveOutput (go p) c o + go (NeedInput p c) = NeedInput (\i -> go (p i)) (\u -> go (c u)) + go (Done x) = rest x + go (PipeM mp) = + PipeM $ catchError (liftM go mp) $ \e -> do + return $ unConduitM (f e) rest + go (Leftover p i) = Leftover (go p) i + in go (c0 Done) instance MonadBase base m => MonadBase base (ConduitM i o m) where liftBase = lift . liftBase {-# INLINE liftBase #-} instance MonadTrans (ConduitM i o) where - lift mr = ConduitM (PipeM (Done `liftM` mr)) + lift mr = ConduitM $ \rest -> PipeM (liftM rest mr) {-# INLINE [1] lift #-} instance MonadResource m => MonadResource (ConduitM i o m) where @@ -206,7 +282,7 @@ type Conduit i m o = ConduitM i o m () -- to be run to close it. -- -- Since 0.5.0 -data ResumableSource m o = ResumableSource (Source m o) (m ()) +data ResumableSource m o = ResumableSource (Pipe () () o () m ()) (m ()) -- | Since 1.0.13 instance MFunctor ResumableSource where @@ -223,14 +299,14 @@ connectResume :: Monad m => ResumableSource m o -> Sink o m r -> m (ResumableSource m o, r) -connectResume (ResumableSource (ConduitM left0) leftFinal0) (ConduitM right0) = - goRight leftFinal0 left0 right0 +connectResume (ResumableSource left0 leftFinal0) (ConduitM right0) = + goRight leftFinal0 left0 (right0 Done) where goRight leftFinal left right = case right of HaveOutput _ _ o -> absurd o NeedInput rp rc -> goLeft rp rc leftFinal left - Done r2 -> return (ResumableSource (ConduitM left) leftFinal, r2) + Done r2 -> return (ResumableSource left leftFinal, r2) PipeM mp -> mp >>= goRight leftFinal left Leftover p i -> goRight leftFinal (HaveOutput left leftFinal i) p @@ -246,7 +322,7 @@ connectResume (ResumableSource (ConduitM left0) leftFinal0) (ConduitM right0) = sourceToPipe :: Monad m => Source m o -> Pipe l i o u m () sourceToPipe = - go . unConduitM + go . flip unConduitM Done where go (HaveOutput p c o) = HaveOutput (go p) c o go (NeedInput _ c) = go $ c () @@ -256,7 +332,7 @@ sourceToPipe = sinkToPipe :: Monad m => Sink i m r -> Pipe l i o u m r sinkToPipe = - go . injectLeftovers . unConduitM + go . injectLeftovers . flip unConduitM Done where go (HaveOutput _ _ o) = absurd o go (NeedInput p c) = NeedInput (go . p) (const $ go $ c ()) @@ -266,7 +342,7 @@ sinkToPipe = conduitToPipe :: Monad m => Conduit i m o -> Pipe l i o u m () conduitToPipe = - go . injectLeftovers . unConduitM + go . injectLeftovers . flip unConduitM Done where go (HaveOutput p c o) = HaveOutput (go p) c o go (NeedInput p c) = NeedInput (go . p) (const $ go $ c ()) @@ -296,39 +372,37 @@ unwrapResumable (ResumableSource src final) = do let final' = do x <- liftIO $ I.readIORef ref when x final - return (liftIO (I.writeIORef ref False) >> src, final') + return (liftIO (I.writeIORef ref False) >> (ConduitM (src >>=)), final') -- | Turn a @Source@ into a @ResumableSource@ with no attached finalizer. -- -- Since 1.1.4 newResumableSource :: Monad m => Source m o -> ResumableSource m o -newResumableSource s = ResumableSource s (return ()) +newResumableSource (ConduitM s) = ResumableSource (s Done) (return ()) -- | Generalize a 'Source' to a 'Producer'. -- -- Since 1.0.0 toProducer :: Monad m => Source m a -> Producer m a -toProducer = - ConduitM . go . unConduitM - where +toProducer (ConduitM c0) = ConduitM $ \rest -> let go (HaveOutput p c o) = HaveOutput (go p) c o go (NeedInput _ c) = go (c ()) - go (Done r) = Done r + go (Done r) = rest r go (PipeM mp) = PipeM (liftM go mp) go (Leftover p ()) = go p + in go (c0 Done) -- | Generalize a 'Sink' to a 'Consumer'. -- -- Since 1.0.0 toConsumer :: Monad m => Sink a m b -> Consumer a m b -toConsumer = - ConduitM . go . unConduitM - where +toConsumer (ConduitM c0) = ConduitM $ \rest -> let go (HaveOutput _ _ o) = absurd o go (NeedInput p c) = NeedInput (go . p) (go . c) - go (Done r) = Done r + go (Done r) = rest r go (PipeM mp) = PipeM (liftM go mp) go (Leftover p l) = Leftover (go p) l + in go (c0 Done) -- | Catch all exceptions thrown by the current component of the pipeline. -- @@ -348,7 +422,14 @@ catchC :: (MonadBaseControl IO m, Exception e) => ConduitM i o m r -> (e -> ConduitM i o m r) -> ConduitM i o m r -catchC (ConduitM p) f = ConduitM (catchP p (unConduitM . f)) +catchC (ConduitM p0) onErr = ConduitM $ \rest -> let + go (Done r) = rest r + go (PipeM mp) = PipeM $ E.catch (liftM go mp) + (return . flip unConduitM rest . onErr) + go (Leftover p i) = Leftover (go p) i + go (NeedInput x y) = NeedInput (go . x) (go . y) + go (HaveOutput p c o) = HaveOutput (go p) c o + in go (p0 Done) {-# INLINE catchC #-} -- | The same as @flip catchC@. @@ -368,7 +449,13 @@ handleC = flip catchC tryC :: (MonadBaseControl IO m, Exception e) => ConduitM i o m r -> ConduitM i o m (Either e r) -tryC = ConduitM . tryP . unConduitM +tryC (ConduitM c0) = ConduitM $ \rest -> let + go (Done r) = rest (Right r) + go (PipeM mp) = PipeM $ E.catch (liftM go mp) (return . rest . Left) + go (Leftover p i) = Leftover (go p) i + go (NeedInput x y) = NeedInput (go . x) (go . y) + go (HaveOutput p c o) = HaveOutput (go p) c o + in go (c0 Done) {-# INLINE tryC #-} -- | Combines two sinks. The new sink will complete when both input sinks have @@ -378,11 +465,7 @@ tryC = ConduitM . tryP . unConduitM -- -- Since 0.4.1 zipSinks :: Monad m => Sink i m r -> Sink i m r' -> Sink i m (r, r') -zipSinks (ConduitM x0) (ConduitM y0) = - ConduitM $ injectLeftovers x0 >< injectLeftovers y0 - where - (><) :: Monad m => Pipe Void i Void () m r1 -> Pipe Void i Void () m r2 -> Pipe l i o () m (r1, r2) - +zipSinks (ConduitM x0) (ConduitM y0) = ConduitM $ \rest -> let Leftover _ i >< _ = absurd i _ >< Leftover _ i = absurd i HaveOutput _ _ o >< _ = absurd o @@ -390,54 +473,53 @@ zipSinks (ConduitM x0) (ConduitM y0) = PipeM mx >< y = PipeM (liftM (>< y) mx) x >< PipeM my = PipeM (liftM (x ><) my) - Done x >< Done y = Done (x, y) + Done x >< Done y = rest (x, y) NeedInput px cx >< NeedInput py cy = NeedInput (\i -> px i >< py i) (\() -> cx () >< cy ()) NeedInput px cx >< y@Done{} = NeedInput (\i -> px i >< y) (\u -> cx u >< y) x@Done{} >< NeedInput py cy = NeedInput (\i -> x >< py i) (\u -> x >< cy u) + in injectLeftovers (x0 Done) >< injectLeftovers (y0 Done) -- | Combines two sources. The new source will stop producing once either -- source has been exhausted. -- -- Since 1.0.13 zipSources :: Monad m => Source m a -> Source m b -> Source m (a, b) -zipSources (ConduitM left0) (ConduitM right0) = - ConduitM $ go left0 right0 - where +zipSources (ConduitM left0) (ConduitM right0) = ConduitM $ \rest -> let go (Leftover left ()) right = go left right go left (Leftover right ()) = go left right - go (Done ()) (Done ()) = Done () - go (Done ()) (HaveOutput _ close _) = PipeM (close >> return (Done ())) - go (HaveOutput _ close _) (Done ()) = PipeM (close >> return (Done ())) - go (Done ()) (PipeM _) = Done () - go (PipeM _) (Done ()) = Done () + go (Done ()) (Done ()) = rest () + go (Done ()) (HaveOutput _ close _) = PipeM (close >> return (rest ())) + go (HaveOutput _ close _) (Done ()) = PipeM (close >> return (rest ())) + go (Done ()) (PipeM _) = rest () + go (PipeM _) (Done ()) = rest () go (PipeM mx) (PipeM my) = PipeM (liftM2 go mx my) go (PipeM mx) y@HaveOutput{} = PipeM (liftM (\x -> go x y) mx) go x@HaveOutput{} (PipeM my) = PipeM (liftM (go x) my) go (HaveOutput srcx closex x) (HaveOutput srcy closey y) = HaveOutput (go srcx srcy) (closex >> closey) (x, y) go (NeedInput _ c) right = go (c ()) right go left (NeedInput _ c) = go left (c ()) + in go (left0 Done) (right0 Done) -- | Combines two sources. The new source will stop producing once either -- source has been exhausted. -- -- Since 1.0.13 zipSourcesApp :: Monad m => Source m (a -> b) -> Source m a -> Source m b -zipSourcesApp (ConduitM left0) (ConduitM right0) = - ConduitM $ go left0 right0 - where +zipSourcesApp (ConduitM left0) (ConduitM right0) = ConduitM $ \rest -> let go (Leftover left ()) right = go left right go left (Leftover right ()) = go left right - go (Done ()) (Done ()) = Done () - go (Done ()) (HaveOutput _ close _) = PipeM (close >> return (Done ())) - go (HaveOutput _ close _) (Done ()) = PipeM (close >> return (Done ())) - go (Done ()) (PipeM _) = Done () - go (PipeM _) (Done ()) = Done () + go (Done ()) (Done ()) = rest () + go (Done ()) (HaveOutput _ close _) = PipeM (close >> return (rest ())) + go (HaveOutput _ close _) (Done ()) = PipeM (close >> return (rest ())) + go (Done ()) (PipeM _) = rest () + go (PipeM _) (Done ()) = rest () go (PipeM mx) (PipeM my) = PipeM (liftM2 go mx my) go (PipeM mx) y@HaveOutput{} = PipeM (liftM (\x -> go x y) mx) go x@HaveOutput{} (PipeM my) = PipeM (liftM (go x) my) go (HaveOutput srcx closex x) (HaveOutput srcy closey y) = HaveOutput (go srcx srcy) (closex >> closey) (x y) go (NeedInput _ c) right = go (c ()) right go left (NeedInput _ c) = go left (c ()) + in go (left0 Done) (right0 Done) -- | -- @@ -447,10 +529,8 @@ zipConduitApp => ConduitM i o m (x -> y) -> ConduitM i o m x -> ConduitM i o m y -zipConduitApp (ConduitM left0) (ConduitM right0) = - ConduitM $ go (return ()) (return ()) (injectLeftovers left0) (injectLeftovers right0) - where - go _ _ (Done f) (Done x) = Done (f x) +zipConduitApp (ConduitM left0) (ConduitM right0) = ConduitM $ \rest -> let + go _ _ (Done f) (Done x) = rest (f x) go _ finalY (HaveOutput x finalX o) y = HaveOutput (go finalX finalY x y) (finalX >> finalY) @@ -472,6 +552,7 @@ zipConduitApp (ConduitM left0) (ConduitM right0) = go finalX finalY (Done x) (NeedInput py cy) = NeedInput (\i -> go finalX finalY (Done x) (py i)) (\u -> go finalX finalY (Done x) (cy u)) + in go (return ()) (return ()) (injectLeftovers $ left0 Done) (injectLeftovers $ right0 Done) -- | Same as normal fusion (e.g. @=$=@), except instead of discarding leftovers -- from the downstream component, return them. @@ -481,9 +562,7 @@ fuseReturnLeftovers :: Monad m => ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m (r, [b]) -fuseReturnLeftovers (ConduitM left0) (ConduitM right0) = - ConduitM $ goRight (return ()) [] left0 right0 - where +fuseReturnLeftovers (ConduitM left0) (ConduitM right0) = ConduitM $ \rest -> let goRight final bs left right = case right of HaveOutput p c o -> HaveOutput (recurse p) (c >> final) o @@ -491,7 +570,7 @@ fuseReturnLeftovers (ConduitM left0) (ConduitM right0) = case bs of [] -> goLeft rp rc final left b:bs' -> goRight final bs' left (rp b) - Done r2 -> PipeM (final >> return (Done (r2, bs))) + Done r2 -> PipeM (final >> return (rest (r2, bs))) PipeM mp -> PipeM (liftM recurse mp) Leftover p b -> goRight final (b:bs) left p where @@ -506,6 +585,7 @@ fuseReturnLeftovers (ConduitM left0) (ConduitM right0) = Leftover left' i -> Leftover (recurse left') i where recurse = goLeft rp rc final + in goRight (return ()) [] (left0 Done) (right0 Done) -- | Similar to @fuseReturnLeftovers@, but use the provided function to convert -- downstream leftovers to upstream leftovers. @@ -527,7 +607,7 @@ fuseLeftovers f left right = do -- -- Since 1.0.17 data ResumableConduit i m o = - ResumableConduit (Conduit i m o) (m ()) + ResumableConduit (Pipe i i o () m ()) (m ()) -- | Connect a 'ResumableConduit' to a sink and return the output of the sink -- together with a new 'ResumableConduit'. @@ -538,14 +618,12 @@ connectResumeConduit => ResumableConduit i m o -> Sink o m r -> Sink i m (ResumableConduit i m o, r) -connectResumeConduit (ResumableConduit (ConduitM left0) leftFinal0) (ConduitM right0) = - ConduitM $ goRight leftFinal0 left0 right0 - where +connectResumeConduit (ResumableConduit left0 leftFinal0) (ConduitM right0) = ConduitM $ \rest -> let goRight leftFinal left right = case right of HaveOutput _ _ o -> absurd o NeedInput rp rc -> goLeft rp rc leftFinal left - Done r2 -> Done (ResumableConduit (ConduitM left) leftFinal, r2) + Done r2 -> rest (ResumableConduit left leftFinal, r2) PipeM mp -> PipeM (liftM (goRight leftFinal left) mp) Leftover p i -> goRight leftFinal (HaveOutput left leftFinal i) p @@ -558,6 +636,7 @@ connectResumeConduit (ResumableConduit (ConduitM left0) leftFinal0) (ConduitM ri Leftover left' i -> Leftover (recurse left') i -- recurse p where recurse = goLeft rp rc leftFinal + in goRight leftFinal0 left0 (right0 Done) -- | Unwraps a @ResumableConduit@ into a @Conduit@ and a finalizer. -- @@ -570,13 +649,13 @@ unwrapResumableConduit (ResumableConduit src final) = do let final' = do x <- liftIO $ I.readIORef ref when x final - return (liftIO (I.writeIORef ref False) >> src, final') + return (ConduitM ((liftIO (I.writeIORef ref False) >> src) >>=), final') -- | Turn a @Conduit@ into a @ResumableConduit@ with no attached finalizer. -- -- Since 1.1.4 newResumableConduit :: Monad m => Conduit i m o -> ResumableConduit i m o -newResumableConduit c = ResumableConduit c (return ()) +newResumableConduit (ConduitM c) = ResumableConduit (c Done) (return ()) -- | Turn a @Sink@ into a @Conduit@ in the following way: -- @@ -595,12 +674,10 @@ passthroughSink :: Monad m => Sink i m r -> (r -> m ()) -- ^ finalizer -> Conduit i m i -passthroughSink (ConduitM sink0) final = - ConduitM $ go [] sink0 - where +passthroughSink (ConduitM sink0) final = ConduitM $ \rest -> let go _ (Done r) = do lift $ final r - CI.awaitForever CI.yield + unConduitM (awaitForever yield) rest go is (Leftover sink i) = go (i:is) sink go _ (HaveOutput _ _ o) = absurd o go is (PipeM mx) = do @@ -614,6 +691,7 @@ passthroughSink (ConduitM sink0) final = Just x -> do CI.yield x go [] (next x) + in go [] (sink0 Done) -- Define fixity of all our operators infixr 0 $$ @@ -675,7 +753,28 @@ src $$ sink = do -- -- Since 0.4.0 (=$=) :: Monad m => Conduit a m b -> ConduitM b c m r -> ConduitM a c m r -ConduitM left =$= ConduitM right = ConduitM $ pipeL left right +ConduitM left0 =$= ConduitM right0 = ConduitM $ \rest -> + let goRight final left right = + case right of + HaveOutput p c o -> HaveOutput (recurse p) (c >> final) o + NeedInput rp rc -> goLeft rp rc final left + Done r2 -> PipeM (final >> return (rest r2)) + PipeM mp -> PipeM (liftM recurse mp) + Leftover right' i -> goRight final (HaveOutput left final i) right' + where + recurse = goRight final left + + goLeft rp rc final left = + case left of + HaveOutput left' final' o -> goRight final' left' (rp o) + NeedInput left' lc -> NeedInput (recurse . left') (recurse . lc) + Done r1 -> goRight (return ()) (Done r1) (rc r1) + PipeM mp -> PipeM (liftM recurse mp) + Leftover left' i -> Leftover (recurse left') i + where + recurse = goLeft rp rc final + in goRight (return ()) (left0 Done) (right0 Done) + where {-# INLINE [1] (=$=) #-} -- | Wait for a single input value from upstream. If no data is available, @@ -683,8 +782,18 @@ ConduitM left =$= ConduitM right = ConduitM $ pipeL left right -- -- Since 0.5.0 await :: Monad m => Consumer i m (Maybe i) -await = ConduitM CI.await -{-# INLINE [1] await #-} +await = ConduitM $ \f -> NeedInput (f . Just) (const $ f Nothing) +{-# INLINE [0] await #-} + +await' :: Monad m + => ConduitM i o m r + -> (i -> ConduitM i o m r) + -> ConduitM i o m r +await' f g = ConduitM $ \rest -> NeedInput + (\i -> unConduitM (g i) rest) + (const $ unConduitM f rest) +{-# INLINE await' #-} +{-# RULES "await >>= maybe" forall x y. await >>= maybe x y = await' x y #-} -- | Send a value downstream to the next component to consume. If the -- downstream component terminates, this call will never return control. If you @@ -694,12 +803,12 @@ await = ConduitM CI.await yield :: Monad m => o -- ^ output value -> ConduitM i o m () -yield = ConduitM . CI.yield -{-# INLINE [1] yield #-} +yield o = yieldOr o (return ()) +{-# INLINE yield #-} yieldM :: Monad m => m o -> ConduitM i o m () -yieldM = ConduitM . CI.yieldM -{-# INLINE [1] yieldM #-} +yieldM mo = lift mo >>= yield +{-# INLINE yieldM #-} -- FIXME rule won't fire, see FIXME in .Pipe; "mapM_ yield" mapM_ yield = ConduitM . sourceList @@ -711,8 +820,8 @@ yieldM = ConduitM . CI.yieldM -- -- Since 0.5.0 leftover :: i -> ConduitM i o m () -leftover = ConduitM . CI.leftover -{-# INLINE [1] leftover #-} +leftover i = ConduitM $ \rest -> Leftover (rest ()) i +{-# INLINE leftover #-} -- | Perform some allocation and run an inner component. Two guarantees are -- given about resource finalization: @@ -728,7 +837,9 @@ bracketP :: MonadResource m -> (a -> IO ()) -> (a -> ConduitM i o m r) -> ConduitM i o m r -bracketP alloc free inside = ConduitM $ CI.bracketP alloc free $ unConduitM . inside +bracketP alloc free inside = ConduitM $ \rest -> PipeM $ do + (key, seed) <- allocate alloc free + return $ unConduitM (addCleanup (const $ release key) (inside seed)) rest -- | Add some code to be run when the given component cleans up. -- @@ -744,7 +855,18 @@ addCleanup :: Monad m => (Bool -> m ()) -> ConduitM i o m r -> ConduitM i o m r -addCleanup f = ConduitM . CI.addCleanup f . unConduitM +addCleanup cleanup (ConduitM c0) = ConduitM $ \rest -> let + go (Done r) = PipeM (cleanup True >> return (rest r)) + go (HaveOutput src close x) = HaveOutput + (go src) + (cleanup False >> close) + x + go (PipeM msrc) = PipeM (liftM (go) msrc) + go (NeedInput p c) = NeedInput + (go . p) + (go . c) + go (Leftover p i) = Leftover (go p) i + in go (c0 Done) -- | Similar to 'yield', but additionally takes a finalizer to be run if the -- downstream component terminates. @@ -754,8 +876,8 @@ yieldOr :: Monad m => o -> m () -- ^ finalizer -> ConduitM i o m () -yieldOr o m = ConduitM $ CI.yieldOr o m -{-# INLINE [1] yieldOr #-} +yieldOr o m = ConduitM $ \rest -> HaveOutput (rest ()) m o +{-# INLINE yieldOr #-} -- | Wait for input forever, calling the given inner component for each piece of -- new input. Returns the upstream result type. @@ -765,7 +887,9 @@ yieldOr o m = ConduitM $ CI.yieldOr o m -- -- Since 0.5.0 awaitForever :: Monad m => (i -> ConduitM i o m r) -> ConduitM i o m () -awaitForever f = ConduitM $ CI.awaitForever (unConduitM . f) +awaitForever f = ConduitM $ \rest -> + let go = NeedInput (\i -> unConduitM (f i) (const go)) rest + in go -- | Transform the monad that a @ConduitM@ lives in. -- @@ -789,13 +913,25 @@ transPipe = hoist -- -- Since 0.4.1 mapOutput :: Monad m => (o1 -> o2) -> ConduitM i o1 m r -> ConduitM i o2 m r -mapOutput f (ConduitM p) = ConduitM $ CI.mapOutput f p +mapOutput f (ConduitM c0) = ConduitM $ \rest -> let + go (HaveOutput p c o) = HaveOutput (go p) c (f o) + go (NeedInput p c) = NeedInput (go . p) (go . c) + go (Done r) = rest r + go (PipeM mp) = PipeM (liftM (go) mp) + go (Leftover p i) = Leftover (go p) i + in go (c0 Done) -- | Same as 'mapOutput', but use a function that returns @Maybe@ values. -- -- Since 0.5.0 mapOutputMaybe :: Monad m => (o1 -> Maybe o2) -> ConduitM i o1 m r -> ConduitM i o2 m r -mapOutputMaybe f (ConduitM p) = ConduitM $ CI.mapOutputMaybe f p +mapOutputMaybe f (ConduitM c0) = ConduitM $ \rest -> let + go (HaveOutput p c o) = maybe id (\o' p' -> HaveOutput p' c o') (f o) (go p) + go (NeedInput p c) = NeedInput (go . p) (go . c) + go (Done r) = rest r + go (PipeM mp) = PipeM (liftM (go) mp) + go (Leftover p i) = Leftover (go p) i + in go (c0 Done) -- | Apply a function to all the input values of a @ConduitM@. -- @@ -805,7 +941,13 @@ mapInput :: Monad m -> (i2 -> Maybe i1) -- ^ map new leftovers to initial leftovers -> ConduitM i2 o m r -> ConduitM i1 o m r -mapInput f g (ConduitM p) = ConduitM $ CI.mapInput f g p +mapInput f f' (ConduitM c0) = ConduitM $ \rest -> let + go (HaveOutput p c o) = HaveOutput (go p) c o + go (NeedInput p c) = NeedInput (go . p . f) (go . c) + go (Done r) = rest r + go (PipeM mp) = PipeM $ liftM go mp + go (Leftover p i) = maybe id (flip Leftover) (f' i) (go p) + in go (c0 Done) -- | The connect-and-resume operator. This does not close the @Source@, but -- instead returns it to be used again. This allows a @Source@ to be used @@ -816,7 +958,8 @@ mapInput f g (ConduitM p) = ConduitM $ CI.mapInput f g p -- -- Since 0.5.0 ($$+) :: Monad m => Source m a -> Sink a m b -> m (ResumableSource m a, b) -src $$+ sink = connectResume (ResumableSource src (return ())) sink +ConduitM src $$+ sink = + connectResume (ResumableSource (src Done) (return ())) sink {-# INLINE ($$+) #-} -- | Continue processing after usage of @$$+@. @@ -842,7 +985,8 @@ rsrc $$+- sink = do -- -- Since 1.0.16 ($=+) :: Monad m => ResumableSource m a -> Conduit a m b -> ResumableSource m b -ResumableSource src final $=+ sink = ResumableSource (src $= sink) final +ResumableSource src final $=+ ConduitM sink = + ResumableSource (src `pipeL` sink Done) final -- | Execute the finalizer associated with a @ResumableSource@, rendering the -- @ResumableSource@ invalid for further use. @@ -933,7 +1077,7 @@ sequenceSinks = getZipSink . sequenceA . fmap ZipSink -- -- Since 1.0.17 (=$$+) :: Monad m => Conduit a m b -> Sink b m r -> Sink a m (ResumableConduit a m b, r) -(=$$+) conduit = connectResumeConduit (ResumableConduit conduit (return ())) +(=$$+) (ConduitM conduit) = connectResumeConduit (ResumableConduit (conduit Done) (return ())) {-# INLINE (=$$+) #-} -- | Continue processing after usage of '=$$+'. Connect a 'ResumableConduit' to @@ -1006,7 +1150,7 @@ sequenceConduits = getZipConduit . sequenceA . fmap ZipConduit -- Since 1.1.5 fuseBoth :: Monad m => ConduitM a b m r1 -> ConduitM b c m r2 -> ConduitM a c m (r1, r2) fuseBoth (ConduitM up) (ConduitM down) = - ConduitM $ pipeL up (withUpstream $ generalizeUpstream down) + ConduitM (pipeL (up Done) (withUpstream $ generalizeUpstream $ down Done) >>=) {-# INLINE fuseBoth #-} -- | Same as @fuseBoth@, but ignore the return value from the downstream @@ -1019,6 +1163,7 @@ fuseUpstream up down = fmap fst (fuseBoth up down) -- Rewrite rules +{- FIXME {-# RULES "ConduitM: lift x >>= f" forall m f. lift m >>= f = ConduitM (PipeM (liftM (unConduitM . f) m)) #-} {-# RULES "ConduitM: lift x >> f" forall m f. lift m >> f = ConduitM (PipeM (liftM (\_ -> unConduitM f) m)) #-} @@ -1028,7 +1173,6 @@ fuseUpstream up down = fmap fst (fuseBoth up down) {-# RULES "ConduitM: liftBase x >>= f" forall m (f :: MonadBase b m => a -> ConduitM i o m r). liftBase m >>= f = ConduitM (PipeM (liftM (unConduitM . f) (liftBase m))) #-} {-# RULES "ConduitM: liftBase x >> f" forall m (f :: MonadBase b m => ConduitM i o m r). liftBase m >> f = ConduitM (PipeM (liftM (\_ -> unConduitM f) (liftBase m))) #-} -{-# RULES "await >>= maybe" forall x y. await >>= maybe x y = ConduitM (NeedInput (unConduitM . y) (unConduitM . const x)) #-} {-# RULES "yield o >> p" forall o (p :: ConduitM i o m r). yield o >> p = ConduitM (HaveOutput (unConduitM p) (return ()) o) ; "yieldOr o c >> p" forall o c (p :: ConduitM i o m r). yieldOr o c >> p = @@ -1041,3 +1185,4 @@ fuseUpstream up down = fmap fst (fuseBoth up down) #-} {-# RULES "leftover l >> p" forall l (p :: ConduitM i o m r). leftover l >> p = ConduitM (Leftover (unConduitM p) l) #-} + -} diff --git a/conduit/Data/Conduit/Internal/Fusion.hs b/conduit/Data/Conduit/Internal/Fusion.hs index 804e1e777..38fc877a3 100644 --- a/conduit/Data/Conduit/Internal/Fusion.hs +++ b/conduit/Data/Conduit/Internal/Fusion.hs @@ -34,25 +34,24 @@ data Stream m o = forall s. Stream streamProducerM :: Monad m => Stream m o -> Producer m o streamProducerM (Stream step ms0) = - ConduitM $ PipeM $ ms0 >>= loop - where - loop s = do - res <- step s - case res of - Emit s' o -> return $ HaveOutput (PipeM $ loop s') (return ()) o - Skip s' -> loop s' - Stop -> return $ Done () + ConduitM $ \rest -> + let loop s = do + res <- step s + case res of + Emit s' o -> return $ HaveOutput (PipeM $ loop s') (return ()) o + Skip s' -> loop s' + Stop -> return $ rest () + in PipeM $ ms0 >>= loop {-# INLINE [0] streamProducerM #-} streamProducerId :: Monad m => Stream Identity o -> Producer m o -streamProducerId (Stream step ms0) = - ConduitM $ loop $ runIdentity ms0 - where +streamProducerId (Stream step ms0) = ConduitM $ \rest -> let loop s = case runIdentity $ step s of Emit s' o -> HaveOutput (loop s') (return ()) o Skip s' -> loop s' - Stop -> Done () + Stop -> rest () + in loop $ runIdentity ms0 {-# INLINE [0] streamProducerId #-} data Unstream i m r = forall s. Unstream @@ -63,9 +62,7 @@ data Unstream i m r = forall s. Unstream streamConsumerM :: Monad m => Unstream i m r -> Consumer i m r -streamConsumerM (Unstream step final ms0) = - ConduitM $ PipeM $ ms0 >>= return . loop - where +streamConsumerM (Unstream step final ms0) = ConduitM $ \rest -> let loop s = NeedInput more done where @@ -73,24 +70,25 @@ streamConsumerM (Unstream step final ms0) = res <- step s i case res of Left s' -> return $ loop s' - Right r -> return $ Done r - done () = PipeM $ liftM Done $ final s + Right r -> return $ rest r + done () = PipeM $ liftM rest $ final s + in PipeM $ ms0 >>= return . loop {-# INLINE [0] streamConsumerM #-} streamConsumerId :: Monad m => Unstream i Identity r -> Consumer i m r streamConsumerId (Unstream step final ms0) = - ConduitM $ loop (runIdentity ms0) - where - loop s = - NeedInput more done - where - more i = - case runIdentity $ step s i of - Left s' -> loop s' - Right r -> Done r - done () = Done $ runIdentity $ final s + ConduitM $ \rest -> + let loop s = + NeedInput more done + where + more i = + case runIdentity $ step s i of + Left s' -> loop s' + Right r -> rest r + done () = rest $ runIdentity $ final s + in loop (runIdentity ms0) {-# INLINE [0] streamConsumerId #-} {- diff --git a/conduit/Data/Conduit/Internal/Pipe.hs b/conduit/Data/Conduit/Internal/Pipe.hs index d62c4a6ab..dc43f4c78 100644 --- a/conduit/Data/Conduit/Internal/Pipe.hs +++ b/conduit/Data/Conduit/Internal/Pipe.hs @@ -170,7 +170,7 @@ instance MonadReader r m => MonadReader r (Pipe l i o u m) where local f (HaveOutput p c o) = HaveOutput (local f p) c o local f (NeedInput p c) = NeedInput (\i -> local f (p i)) (\u -> local f (c u)) local _ (Done x) = Done x - local f (PipeM mp) = PipeM (local f mp) + local f (PipeM mp) = PipeM (liftM (local f) $ local f mp) local f (Leftover p i) = Leftover (local f p) i -- Provided for doctest @@ -481,7 +481,7 @@ mapOutputMaybe :: Monad m => (o1 -> Maybe o2) -> Pipe l i o1 u m r -> Pipe l i o mapOutputMaybe f = go where - go (HaveOutput p c o) = maybe id (\o' p' -> HaveOutput p' c o') (f o) (mapOutputMaybe f p) + go (HaveOutput p c o) = maybe id (\o' p' -> HaveOutput p' c o') (f o) (go p) go (NeedInput p c) = NeedInput (go . p) (go . c) go (Done r) = Done r go (PipeM mp) = PipeM (liftM (go) mp) diff --git a/conduit/Data/Conduit/Lift.hs b/conduit/Data/Conduit/Lift.hs index 6809327eb..10f87a651 100644 --- a/conduit/Data/Conduit/Lift.hs +++ b/conduit/Data/Conduit/Lift.hs @@ -138,18 +138,18 @@ errorC p = do runErrorC :: (Monad m, E.Error e) => ConduitM i o (E.ErrorT e m) r -> ConduitM i o m (Either e r) -runErrorC = - ConduitM . go . unConduitM - where - go (Done r) = Done (Right r) - go (PipeM mp) = PipeM $ do - eres <- E.runErrorT mp - return $ case eres of - Left e -> Done $ Left e - Right p -> go p - go (Leftover p i) = Leftover (go p) i - go (HaveOutput p f o) = HaveOutput (go p) (E.runErrorT f >> return ()) o - go (NeedInput x y) = NeedInput (go . x) (go . y) +runErrorC (ConduitM c0) = + ConduitM $ \rest -> + let go (Done r) = rest (Right r) + go (PipeM mp) = PipeM $ do + eres <- E.runErrorT mp + return $ case eres of + Left e -> rest $ Left e + Right p -> go p + go (Leftover p i) = Leftover (go p) i + go (HaveOutput p f o) = HaveOutput (go p) (E.runErrorT f >> return ()) o + go (NeedInput x y) = NeedInput (go . x) (go . y) + in go (c0 Done) {-# INLINABLE runErrorC #-} -- | Catch an error in the base monad @@ -161,17 +161,18 @@ catchErrorC -> (e -> ConduitM i o (E.ErrorT e m) r) -> ConduitM i o (E.ErrorT e m) r catchErrorC c0 h = - ConduitM $ go $ unConduitM c0 + ConduitM $ \rest -> + let go (Done r) = rest r + go (PipeM mp) = PipeM $ do + eres <- lift $ E.runErrorT mp + return $ case eres of + Left e -> unConduitM (h e) rest + Right p -> go p + go (Leftover p i) = Leftover (go p) i + go (HaveOutput p f o) = HaveOutput (go p) f o + go (NeedInput x y) = NeedInput (go . x) (go . y) + in go $ unConduitM c0 Done where - go (Done r) = Done r - go (PipeM mp) = PipeM $ do - eres <- lift $ E.runErrorT mp - return $ case eres of - Left e -> unConduitM $ h e - Right p -> go p - go (Leftover p i) = Leftover (go p) i - go (HaveOutput p f o) = HaveOutput (go p) f o - go (NeedInput x y) = NeedInput (go . x) (go . y) {-# INLINABLE catchErrorC #-} -- | Run 'CatchT' in the base monad @@ -180,18 +181,18 @@ catchErrorC c0 h = runCatchC :: Monad m => ConduitM i o (CatchT m) r -> ConduitM i o m (Either SomeException r) -runCatchC = - ConduitM . go . unConduitM - where - go (Done r) = Done (Right r) - go (PipeM mp) = PipeM $ do - eres <- runCatchT mp - return $ case eres of - Left e -> Done $ Left e - Right p -> go p - go (Leftover p i) = Leftover (go p) i - go (HaveOutput p f o) = HaveOutput (go p) (runCatchT f >> return ()) o - go (NeedInput x y) = NeedInput (go . x) (go . y) +runCatchC c0 = + ConduitM $ \rest -> + let go (Done r) = rest (Right r) + go (PipeM mp) = PipeM $ do + eres <- runCatchT mp + return $ case eres of + Left e -> rest $ Left e + Right p -> go p + go (Leftover p i) = Leftover (go p) i + go (HaveOutput p f o) = HaveOutput (go p) (runCatchT f >> return ()) o + go (NeedInput x y) = NeedInput (go . x) (go . y) + in go $ unConduitM c0 Done {-# INLINABLE runCatchC #-} -- | Catch an exception in the base monad @@ -202,18 +203,18 @@ catchCatchC ConduitM i o (CatchT m) r -> (SomeException -> ConduitM i o (CatchT m) r) -> ConduitM i o (CatchT m) r -catchCatchC c0 h = - ConduitM $ go $ unConduitM c0 - where - go (Done r) = Done r - go (PipeM mp) = PipeM $ do - eres <- lift $ runCatchT mp - return $ case eres of - Left e -> unConduitM $ h e - Right p -> go p - go (Leftover p i) = Leftover (go p) i - go (HaveOutput p f o) = HaveOutput (go p) f o - go (NeedInput x y) = NeedInput (go . x) (go . y) +catchCatchC (ConduitM c0) h = + ConduitM $ \rest -> + let go (Done r) = rest r + go (PipeM mp) = PipeM $ do + eres <- lift $ runCatchT mp + return $ case eres of + Left e -> unConduitM (h e) rest + Right p -> go p + go (Leftover p i) = Leftover (go p) i + go (HaveOutput p f o) = HaveOutput (go p) f o + go (NeedInput x y) = NeedInput (go . x) (go . y) + in go (c0 Done) {-# INLINABLE catchCatchC #-} -- | Wrap the base monad in 'M.MaybeT' @@ -235,18 +236,18 @@ maybeC p = do runMaybeC :: Monad m => ConduitM i o (M.MaybeT m) r -> ConduitM i o m (Maybe r) -runMaybeC = - ConduitM . go . unConduitM - where - go (Done r) = Done (Just r) - go (PipeM mp) = PipeM $ do - mres <- M.runMaybeT mp - return $ case mres of - Nothing -> Done Nothing - Just p -> go p - go (Leftover p i) = Leftover (go p) i - go (HaveOutput p c o) = HaveOutput (go p) (M.runMaybeT c >> return ()) o - go (NeedInput x y) = NeedInput (go . x) (go . y) +runMaybeC (ConduitM c0) = + ConduitM $ \rest -> + let go (Done r) = rest (Just r) + go (PipeM mp) = PipeM $ do + mres <- M.runMaybeT mp + return $ case mres of + Nothing -> rest Nothing + Just p -> go p + go (Leftover p i) = Leftover (go p) i + go (HaveOutput p c o) = HaveOutput (go p) (M.runMaybeT c >> return ()) o + go (NeedInput x y) = NeedInput (go . x) (go . y) + in go (c0 Done) {-# INLINABLE runMaybeC #-} -- | Wrap the base monad in 'R.ReaderT' @@ -293,16 +294,16 @@ thread :: Monad m -> s -> ConduitM i o (t m) r -> ConduitM i o m res -thread toRes runM s0 = - ConduitM . go s0 . unConduitM - where - go s (Done r) = Done (toRes r s) - go s (PipeM mp) = PipeM $ do - (p, s') <- runM mp s - return $ go s' p - go s (Leftover p i) = Leftover (go s p) i - go s (NeedInput x y) = NeedInput (go s . x) (go s . y) - go s (HaveOutput p f o) = HaveOutput (go s p) (runM f s >> return ()) o +thread toRes runM s0 (ConduitM c0) = + ConduitM $ \rest -> + let go s (Done r) = rest (toRes r s) + go s (PipeM mp) = PipeM $ do + (p, s') <- runM mp s + return $ go s' p + go s (Leftover p i) = Leftover (go s p) i + go s (NeedInput x y) = NeedInput (go s . x) (go s . y) + go s (HaveOutput p f o) = HaveOutput (go s p) (runM f s >> return ()) o + in go s0 (c0 Done) {-# INLINABLE thread #-} -- | Run 'SL.StateT' in the base monad @@ -568,4 +569,3 @@ execRWSC execRWSC i s p = fmap f $ runRWSC i s p where f x = let (_, s2, w2) = x in (s2, w2) {-# INLINABLE execRWSC #-} - diff --git a/conduit/Data/Conduit/List.hs b/conduit/Data/Conduit/List.hs index f100c9143..f0675ab65 100644 --- a/conduit/Data/Conduit/List.hs +++ b/conduit/Data/Conduit/List.hs @@ -78,6 +78,7 @@ import Data.Monoid (Monoid, mempty, mappend) import qualified Data.Foldable as F import Data.Conduit import qualified Data.Conduit.Internal as CI +import qualified Data.Conduit.Internal.Conduit as CIC import Control.Monad (when, (<=<), liftM, void) import Control.Monad.Trans.Class (lift) @@ -126,7 +127,12 @@ enumFromTo :: (Enum a, Eq a, Monad m) => a -> a -> Producer m a -enumFromTo x = CI.ConduitM . CI.enumFromTo x +enumFromTo x0 y = + loop x0 + where + loop x + | x == y = yield x + | otherwise = yield x >> loop (Prelude.succ x) {-# INLINE enumFromTo #-} enumFromToFold :: (Enum a, Eq a, Monad m) -- FIXME far too specific @@ -174,7 +180,7 @@ fold f = connectFold :: Monad m => Source m a -> (b -> a -> b) -> b -> m b -- FIXME replace with better, more general function connectFold (CI.ConduitM src0) f = - go src0 + go (src0 CI.Done) where go (CI.Done ()) b = return b go (CI.HaveOutput src _ a) b = @@ -208,7 +214,7 @@ foldM f = connectFoldM :: Monad m => Source m a -> (b -> a -> m b) -> b -> m b -- FIXME replace with better, more general function connectFoldM (CI.ConduitM src0) f = - go src0 + go (src0 CI.Done) where go (CI.Done ()) b = return b go (CI.HaveOutput src _ a) b = do @@ -255,7 +261,7 @@ mapM_ f = awaitForever $ lift . f srcMapM_ :: Monad m => Source m a -> (a -> m ()) -> m () srcMapM_ (CI.ConduitM src) f = - go src + go (src CI.Done) where go (CI.Done ()) = return () go (CI.PipeM mp) = mp >>= go @@ -325,7 +331,7 @@ map f = awaitForever $ yield . f {-# RULES "source/map fusion =$=" forall f src. src =$= map f = mapFuseRight src f #-} mapFuseRight :: Monad m => Source m a -> (a -> b) -> Source m b -mapFuseRight (CI.ConduitM src) f = CI.ConduitM (CI.mapOutput f src) +mapFuseRight src f = CIC.mapOutput f src {-# INLINE mapFuseRight #-} {- @@ -576,16 +582,15 @@ filter :: Monad m => (a -> Bool) -> Conduit a m a filter f = awaitForever $ \i -> when (f i) (yield i) filterFuseRight :: Monad m => Source m a -> (a -> Bool) -> Source m a -filterFuseRight (CI.ConduitM src) f = - CI.ConduitM (go src) - where - go (CI.Done ()) = CI.Done () +filterFuseRight (CI.ConduitM src) f = CI.ConduitM $ \rest -> let + go (CI.Done ()) = rest () go (CI.PipeM mp) = CI.PipeM (liftM go mp) go (CI.Leftover p i) = CI.Leftover (go p) i go (CI.HaveOutput p c o) | f o = CI.HaveOutput (go p) c o | otherwise = go p go (CI.NeedInput p c) = CI.NeedInput (go . p) (go . c) + in go (src CI.Done) -- Intermediate finalizers are dropped, but this is acceptable: the next -- yielded value would be demanded by downstream in any event, and that new -- finalizer will always override the existing finalizer. @@ -602,7 +607,7 @@ sinkNull = awaitForever $ \_ -> return () srcSinkNull :: Monad m => Source m a -> m () srcSinkNull (CI.ConduitM src) = - go src + go (src CI.Done) where go (CI.Done ()) = return () go (CI.PipeM mp) = mp >>= go diff --git a/conduit/benchmarks/optimize-201408.hs b/conduit/benchmarks/optimize-201408.hs index acf6b6dcf..4e91e44ba 100644 --- a/conduit/benchmarks/optimize-201408.hs +++ b/conduit/benchmarks/optimize-201408.hs @@ -210,17 +210,17 @@ sourceRandomNBind cnt0 = lift (liftIO MWC.createSystemRandom) >>= \gen -> in loop cnt0 sourceRandomNPipe :: (MWC.Variate a, MonadIO m) => Int -> Source m a -sourceRandomNPipe cnt0 = ConduitM $ do +sourceRandomNPipe cnt0 = ConduitM $ \rest -> do gen <- liftIO MWC.createSystemRandom - let loop 0 = return () + let loop 0 = rest () loop cnt = do liftIO (MWC.uniform gen) >>= CI.yield >> loop (cnt - 1) loop cnt0 sourceRandomNConstr :: (MWC.Variate a, MonadIO m) => Int -> Source m a -sourceRandomNConstr cnt0 = ConduitM $ PipeM $ do +sourceRandomNConstr cnt0 = ConduitM $ \rest -> PipeM $ do gen <- liftIO MWC.createSystemRandom - let loop 0 = return $ Done () + let loop 0 = return $ rest () loop cnt = do x <- liftIO (MWC.uniform gen) return $ HaveOutput (PipeM $ loop (cnt - 1)) (return ()) x diff --git a/conduit/conduit.cabal b/conduit/conduit.cabal index 2024605d2..134737206 100644 --- a/conduit/conduit.cabal +++ b/conduit/conduit.cabal @@ -24,7 +24,7 @@ Library Data.Conduit.Internal.Fusion Build-depends: base >= 4.3 && < 5 , resourcet >= 1.1 && < 1.2 - , exceptions + , exceptions >= 0.6 , lifted-base >= 0.1 , transformers-base >= 0.4.1 && < 0.5 , monad-control >= 0.3.1 && < 0.4 diff --git a/conduit/test/main.hs b/conduit/test/main.hs index b9e718636..aeeee2031 100644 --- a/conduit/test/main.hs +++ b/conduit/test/main.hs @@ -219,8 +219,8 @@ main = hspec $ do it "map, left >+>" $ do x <- runResourceT $ CI.ConduitM - (CI.unConduitM (CL.sourceList [1..10]) - CI.>+> CI.injectLeftovers (CI.unConduitM $ CL.map (* 2))) + ((CI.unConduitM (CL.sourceList [1..10]) CI.Done + CI.>+> CI.injectLeftovers (flip CI.unConduitM CI.Done $ CL.map (* 2))) >>=) C.$$ CL.fold (+) 0 x `shouldBe` 2 * sum [1..10 :: Int] @@ -459,8 +459,8 @@ main = hspec $ do ref <- I.newIORef [] let add x = I.modifyIORef ref (x:) adder' = CI.NeedInput (\a -> liftIO (add a) >> adder') return - adder = CI.ConduitM adder' - residue x = CI.ConduitM $ CI.Leftover (CI.Done ()) x + adder = CI.ConduitM (adder' >>=) + residue x = CI.ConduitM $ \rest -> CI.Leftover (rest ()) x _ <- C.yield 1 C.$$ adder x <- I.readIORef ref @@ -524,12 +524,12 @@ main = hspec $ do describe "left/right identity" $ do it' "left identity" $ do - x <- CL.sourceList [1..10 :: Int] C.$$ CI.ConduitM CI.idP C.=$ CL.fold (+) 0 + x <- CL.sourceList [1..10 :: Int] C.$$ CI.ConduitM (CI.idP >>=) C.=$ CL.fold (+) 0 y <- CL.sourceList [1..10 :: Int] C.$$ CL.fold (+) 0 x `shouldBe` y it' "right identity" $ do - x <- CI.runPipe $ mapM_ CI.yield [1..10 :: Int] CI.>+> (CI.injectLeftovers $ CI.unConduitM $ CL.fold (+) 0) CI.>+> CI.idP - y <- CI.runPipe $ mapM_ CI.yield [1..10 :: Int] CI.>+> (CI.injectLeftovers $ CI.unConduitM $ CL.fold (+) 0) + x <- CI.runPipe $ mapM_ CI.yield [1..10 :: Int] CI.>+> (CI.injectLeftovers $ flip CI.unConduitM CI.Done $ CL.fold (+) 0) CI.>+> CI.idP + y <- CI.runPipe $ mapM_ CI.yield [1..10 :: Int] CI.>+> (CI.injectLeftovers $ flip CI.unConduitM CI.Done $ CL.fold (+) 0) x `shouldBe` y describe "generalizing" $ do @@ -635,11 +635,11 @@ main = hspec $ do describe "injectLeftovers" $ do it "works" $ do let src = mapM_ CI.yield [1..10 :: Int] - conduit = CI.injectLeftovers $ CI.unConduitM $ C.awaitForever $ \i -> do + conduit = CI.injectLeftovers $ flip CI.unConduitM CI.Done $ C.awaitForever $ \i -> do js <- CL.take 2 mapM_ C.leftover $ reverse js C.yield i - res <- CI.ConduitM (src CI.>+> CI.injectLeftovers conduit) C.$$ CL.consume + res <- CI.ConduitM ((src CI.>+> CI.injectLeftovers conduit) >>=) C.$$ CL.consume res `shouldBe` [1..10] describe "up-upstream finalizers" $ do it "pipe" $ do