Skip to content

Commit

Permalink
Apply Codensity transform on ConduitM type.
Browse files Browse the repository at this point in the history
This greatly improves performance in some cases by forcing
right-associativity. More importantly, it obviates the need for rewrite
rules for many common cases, e.g. yield >>= foo no longer needs to be
rewritten to be efficient. This was especially important, given that
these rules would not fire reliably in do-notation, since do-notation
associates to the left.

Pinging @feuerbach I bet you thought I forgot about this entirely ;)
  • Loading branch information
snoyberg committed Aug 16, 2014
1 parent 23b6c07 commit 5cacdc3
Show file tree
Hide file tree
Showing 8 changed files with 378 additions and 230 deletions.
361 changes: 253 additions & 108 deletions conduit/Data/Conduit/Internal/Conduit.hs

Large diffs are not rendered by default.

52 changes: 25 additions & 27 deletions conduit/Data/Conduit/Internal/Fusion.hs
Original file line number Diff line number Diff line change
Expand Up @@ -34,25 +34,24 @@ data Stream m o = forall s. Stream

streamProducerM :: Monad m => Stream m o -> Producer m o
streamProducerM (Stream step ms0) =
ConduitM $ PipeM $ ms0 >>= loop
where
loop s = do
res <- step s
case res of
Emit s' o -> return $ HaveOutput (PipeM $ loop s') (return ()) o
Skip s' -> loop s'
Stop -> return $ Done ()
ConduitM $ \rest ->
let loop s = do
res <- step s
case res of
Emit s' o -> return $ HaveOutput (PipeM $ loop s') (return ()) o
Skip s' -> loop s'
Stop -> return $ rest ()
in PipeM $ ms0 >>= loop
{-# INLINE [0] streamProducerM #-}

streamProducerId :: Monad m => Stream Identity o -> Producer m o
streamProducerId (Stream step ms0) =
ConduitM $ loop $ runIdentity ms0
where
streamProducerId (Stream step ms0) = ConduitM $ \rest -> let
loop s =
case runIdentity $ step s of
Emit s' o -> HaveOutput (loop s') (return ()) o
Skip s' -> loop s'
Stop -> Done ()
Stop -> rest ()
in loop $ runIdentity ms0
{-# INLINE [0] streamProducerId #-}

data Unstream i m r = forall s. Unstream
Expand All @@ -63,34 +62,33 @@ data Unstream i m r = forall s. Unstream
streamConsumerM :: Monad m
=> Unstream i m r
-> Consumer i m r
streamConsumerM (Unstream step final ms0) =
ConduitM $ PipeM $ ms0 >>= return . loop
where
streamConsumerM (Unstream step final ms0) = ConduitM $ \rest -> let
loop s =
NeedInput more done
where
more i = PipeM $ do
res <- step s i
case res of
Left s' -> return $ loop s'
Right r -> return $ Done r
done () = PipeM $ liftM Done $ final s
Right r -> return $ rest r
done () = PipeM $ liftM rest $ final s
in PipeM $ ms0 >>= return . loop
{-# INLINE [0] streamConsumerM #-}

streamConsumerId :: Monad m
=> Unstream i Identity r
-> Consumer i m r
streamConsumerId (Unstream step final ms0) =
ConduitM $ loop (runIdentity ms0)
where
loop s =
NeedInput more done
where
more i =
case runIdentity $ step s i of
Left s' -> loop s'
Right r -> Done r
done () = Done $ runIdentity $ final s
ConduitM $ \rest ->
let loop s =
NeedInput more done
where
more i =
case runIdentity $ step s i of
Left s' -> loop s'
Right r -> rest r
done () = rest $ runIdentity $ final s
in loop (runIdentity ms0)
{-# INLINE [0] streamConsumerId #-}

{-
Expand Down
4 changes: 2 additions & 2 deletions conduit/Data/Conduit/Internal/Pipe.hs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ instance MonadReader r m => MonadReader r (Pipe l i o u m) where
local f (HaveOutput p c o) = HaveOutput (local f p) c o
local f (NeedInput p c) = NeedInput (\i -> local f (p i)) (\u -> local f (c u))
local _ (Done x) = Done x
local f (PipeM mp) = PipeM (local f mp)
local f (PipeM mp) = PipeM (liftM (local f) $ local f mp)
local f (Leftover p i) = Leftover (local f p) i

-- Provided for doctest
Expand Down Expand Up @@ -481,7 +481,7 @@ mapOutputMaybe :: Monad m => (o1 -> Maybe o2) -> Pipe l i o1 u m r -> Pipe l i o
mapOutputMaybe f =
go
where
go (HaveOutput p c o) = maybe id (\o' p' -> HaveOutput p' c o') (f o) (mapOutputMaybe f p)
go (HaveOutput p c o) = maybe id (\o' p' -> HaveOutput p' c o') (f o) (go p)
go (NeedInput p c) = NeedInput (go . p) (go . c)
go (Done r) = Done r
go (PipeM mp) = PipeM (liftM (go) mp)
Expand Down
138 changes: 69 additions & 69 deletions conduit/Data/Conduit/Lift.hs
Original file line number Diff line number Diff line change
Expand Up @@ -138,18 +138,18 @@ errorC p = do
runErrorC
:: (Monad m, E.Error e) =>
ConduitM i o (E.ErrorT e m) r -> ConduitM i o m (Either e r)
runErrorC =
ConduitM . go . unConduitM
where
go (Done r) = Done (Right r)
go (PipeM mp) = PipeM $ do
eres <- E.runErrorT mp
return $ case eres of
Left e -> Done $ Left e
Right p -> go p
go (Leftover p i) = Leftover (go p) i
go (HaveOutput p f o) = HaveOutput (go p) (E.runErrorT f >> return ()) o
go (NeedInput x y) = NeedInput (go . x) (go . y)
runErrorC (ConduitM c0) =
ConduitM $ \rest ->
let go (Done r) = rest (Right r)
go (PipeM mp) = PipeM $ do
eres <- E.runErrorT mp
return $ case eres of
Left e -> rest $ Left e
Right p -> go p
go (Leftover p i) = Leftover (go p) i
go (HaveOutput p f o) = HaveOutput (go p) (E.runErrorT f >> return ()) o
go (NeedInput x y) = NeedInput (go . x) (go . y)
in go (c0 Done)
{-# INLINABLE runErrorC #-}

-- | Catch an error in the base monad
Expand All @@ -161,17 +161,18 @@ catchErrorC
-> (e -> ConduitM i o (E.ErrorT e m) r)
-> ConduitM i o (E.ErrorT e m) r
catchErrorC c0 h =
ConduitM $ go $ unConduitM c0
ConduitM $ \rest ->
let go (Done r) = rest r
go (PipeM mp) = PipeM $ do
eres <- lift $ E.runErrorT mp
return $ case eres of
Left e -> unConduitM (h e) rest
Right p -> go p
go (Leftover p i) = Leftover (go p) i
go (HaveOutput p f o) = HaveOutput (go p) f o
go (NeedInput x y) = NeedInput (go . x) (go . y)
in go $ unConduitM c0 Done
where
go (Done r) = Done r
go (PipeM mp) = PipeM $ do
eres <- lift $ E.runErrorT mp
return $ case eres of
Left e -> unConduitM $ h e
Right p -> go p
go (Leftover p i) = Leftover (go p) i
go (HaveOutput p f o) = HaveOutput (go p) f o
go (NeedInput x y) = NeedInput (go . x) (go . y)
{-# INLINABLE catchErrorC #-}

-- | Run 'CatchT' in the base monad
Expand All @@ -180,18 +181,18 @@ catchErrorC c0 h =
runCatchC
:: Monad m =>
ConduitM i o (CatchT m) r -> ConduitM i o m (Either SomeException r)
runCatchC =
ConduitM . go . unConduitM
where
go (Done r) = Done (Right r)
go (PipeM mp) = PipeM $ do
eres <- runCatchT mp
return $ case eres of
Left e -> Done $ Left e
Right p -> go p
go (Leftover p i) = Leftover (go p) i
go (HaveOutput p f o) = HaveOutput (go p) (runCatchT f >> return ()) o
go (NeedInput x y) = NeedInput (go . x) (go . y)
runCatchC c0 =
ConduitM $ \rest ->
let go (Done r) = rest (Right r)
go (PipeM mp) = PipeM $ do
eres <- runCatchT mp
return $ case eres of
Left e -> rest $ Left e
Right p -> go p
go (Leftover p i) = Leftover (go p) i
go (HaveOutput p f o) = HaveOutput (go p) (runCatchT f >> return ()) o
go (NeedInput x y) = NeedInput (go . x) (go . y)
in go $ unConduitM c0 Done
{-# INLINABLE runCatchC #-}

-- | Catch an exception in the base monad
Expand All @@ -202,18 +203,18 @@ catchCatchC
ConduitM i o (CatchT m) r
-> (SomeException -> ConduitM i o (CatchT m) r)
-> ConduitM i o (CatchT m) r
catchCatchC c0 h =
ConduitM $ go $ unConduitM c0
where
go (Done r) = Done r
go (PipeM mp) = PipeM $ do
eres <- lift $ runCatchT mp
return $ case eres of
Left e -> unConduitM $ h e
Right p -> go p
go (Leftover p i) = Leftover (go p) i
go (HaveOutput p f o) = HaveOutput (go p) f o
go (NeedInput x y) = NeedInput (go . x) (go . y)
catchCatchC (ConduitM c0) h =
ConduitM $ \rest ->
let go (Done r) = rest r
go (PipeM mp) = PipeM $ do
eres <- lift $ runCatchT mp
return $ case eres of
Left e -> unConduitM (h e) rest
Right p -> go p
go (Leftover p i) = Leftover (go p) i
go (HaveOutput p f o) = HaveOutput (go p) f o
go (NeedInput x y) = NeedInput (go . x) (go . y)
in go (c0 Done)
{-# INLINABLE catchCatchC #-}

-- | Wrap the base monad in 'M.MaybeT'
Expand All @@ -235,18 +236,18 @@ maybeC p = do
runMaybeC
:: Monad m =>
ConduitM i o (M.MaybeT m) r -> ConduitM i o m (Maybe r)
runMaybeC =
ConduitM . go . unConduitM
where
go (Done r) = Done (Just r)
go (PipeM mp) = PipeM $ do
mres <- M.runMaybeT mp
return $ case mres of
Nothing -> Done Nothing
Just p -> go p
go (Leftover p i) = Leftover (go p) i
go (HaveOutput p c o) = HaveOutput (go p) (M.runMaybeT c >> return ()) o
go (NeedInput x y) = NeedInput (go . x) (go . y)
runMaybeC (ConduitM c0) =
ConduitM $ \rest ->
let go (Done r) = rest (Just r)
go (PipeM mp) = PipeM $ do
mres <- M.runMaybeT mp
return $ case mres of
Nothing -> rest Nothing
Just p -> go p
go (Leftover p i) = Leftover (go p) i
go (HaveOutput p c o) = HaveOutput (go p) (M.runMaybeT c >> return ()) o
go (NeedInput x y) = NeedInput (go . x) (go . y)
in go (c0 Done)
{-# INLINABLE runMaybeC #-}

-- | Wrap the base monad in 'R.ReaderT'
Expand Down Expand Up @@ -293,16 +294,16 @@ thread :: Monad m
-> s
-> ConduitM i o (t m) r
-> ConduitM i o m res
thread toRes runM s0 =
ConduitM . go s0 . unConduitM
where
go s (Done r) = Done (toRes r s)
go s (PipeM mp) = PipeM $ do
(p, s') <- runM mp s
return $ go s' p
go s (Leftover p i) = Leftover (go s p) i
go s (NeedInput x y) = NeedInput (go s . x) (go s . y)
go s (HaveOutput p f o) = HaveOutput (go s p) (runM f s >> return ()) o
thread toRes runM s0 (ConduitM c0) =
ConduitM $ \rest ->
let go s (Done r) = rest (toRes r s)
go s (PipeM mp) = PipeM $ do
(p, s') <- runM mp s
return $ go s' p
go s (Leftover p i) = Leftover (go s p) i
go s (NeedInput x y) = NeedInput (go s . x) (go s . y)
go s (HaveOutput p f o) = HaveOutput (go s p) (runM f s >> return ()) o
in go s0 (c0 Done)
{-# INLINABLE thread #-}

-- | Run 'SL.StateT' in the base monad
Expand Down Expand Up @@ -568,4 +569,3 @@ execRWSC
execRWSC i s p = fmap f $ runRWSC i s p
where f x = let (_, s2, w2) = x in (s2, w2)
{-# INLINABLE execRWSC #-}

25 changes: 15 additions & 10 deletions conduit/Data/Conduit/List.hs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ import Data.Monoid (Monoid, mempty, mappend)
import qualified Data.Foldable as F
import Data.Conduit
import qualified Data.Conduit.Internal as CI
import qualified Data.Conduit.Internal.Conduit as CIC
import Control.Monad (when, (<=<), liftM, void)
import Control.Monad.Trans.Class (lift)

Expand Down Expand Up @@ -126,7 +127,12 @@ enumFromTo :: (Enum a, Eq a, Monad m)
=> a
-> a
-> Producer m a
enumFromTo x = CI.ConduitM . CI.enumFromTo x
enumFromTo x0 y =
loop x0
where
loop x
| x == y = yield x
| otherwise = yield x >> loop (Prelude.succ x)
{-# INLINE enumFromTo #-}

enumFromToFold :: (Enum a, Eq a, Monad m) -- FIXME far too specific
Expand Down Expand Up @@ -174,7 +180,7 @@ fold f =

connectFold :: Monad m => Source m a -> (b -> a -> b) -> b -> m b -- FIXME replace with better, more general function
connectFold (CI.ConduitM src0) f =
go src0
go (src0 CI.Done)
where
go (CI.Done ()) b = return b
go (CI.HaveOutput src _ a) b =
Expand Down Expand Up @@ -208,7 +214,7 @@ foldM f =

connectFoldM :: Monad m => Source m a -> (b -> a -> m b) -> b -> m b -- FIXME replace with better, more general function
connectFoldM (CI.ConduitM src0) f =
go src0
go (src0 CI.Done)
where
go (CI.Done ()) b = return b
go (CI.HaveOutput src _ a) b = do
Expand Down Expand Up @@ -255,7 +261,7 @@ mapM_ f = awaitForever $ lift . f

srcMapM_ :: Monad m => Source m a -> (a -> m ()) -> m ()
srcMapM_ (CI.ConduitM src) f =
go src
go (src CI.Done)
where
go (CI.Done ()) = return ()
go (CI.PipeM mp) = mp >>= go
Expand Down Expand Up @@ -325,7 +331,7 @@ map f = awaitForever $ yield . f
{-# RULES "source/map fusion =$=" forall f src. src =$= map f = mapFuseRight src f #-}

mapFuseRight :: Monad m => Source m a -> (a -> b) -> Source m b
mapFuseRight (CI.ConduitM src) f = CI.ConduitM (CI.mapOutput f src)
mapFuseRight src f = CIC.mapOutput f src
{-# INLINE mapFuseRight #-}

{-
Expand Down Expand Up @@ -576,16 +582,15 @@ filter :: Monad m => (a -> Bool) -> Conduit a m a
filter f = awaitForever $ \i -> when (f i) (yield i)

filterFuseRight :: Monad m => Source m a -> (a -> Bool) -> Source m a
filterFuseRight (CI.ConduitM src) f =
CI.ConduitM (go src)
where
go (CI.Done ()) = CI.Done ()
filterFuseRight (CI.ConduitM src) f = CI.ConduitM $ \rest -> let
go (CI.Done ()) = rest ()
go (CI.PipeM mp) = CI.PipeM (liftM go mp)
go (CI.Leftover p i) = CI.Leftover (go p) i
go (CI.HaveOutput p c o)
| f o = CI.HaveOutput (go p) c o
| otherwise = go p
go (CI.NeedInput p c) = CI.NeedInput (go . p) (go . c)
in go (src CI.Done)
-- Intermediate finalizers are dropped, but this is acceptable: the next
-- yielded value would be demanded by downstream in any event, and that new
-- finalizer will always override the existing finalizer.
Expand All @@ -602,7 +607,7 @@ sinkNull = awaitForever $ \_ -> return ()

srcSinkNull :: Monad m => Source m a -> m ()
srcSinkNull (CI.ConduitM src) =
go src
go (src CI.Done)
where
go (CI.Done ()) = return ()
go (CI.PipeM mp) = mp >>= go
Expand Down
8 changes: 4 additions & 4 deletions conduit/benchmarks/optimize-201408.hs
Original file line number Diff line number Diff line change
Expand Up @@ -210,17 +210,17 @@ sourceRandomNBind cnt0 = lift (liftIO MWC.createSystemRandom) >>= \gen ->
in loop cnt0

sourceRandomNPipe :: (MWC.Variate a, MonadIO m) => Int -> Source m a
sourceRandomNPipe cnt0 = ConduitM $ do
sourceRandomNPipe cnt0 = ConduitM $ \rest -> do
gen <- liftIO MWC.createSystemRandom
let loop 0 = return ()
let loop 0 = rest ()
loop cnt = do
liftIO (MWC.uniform gen) >>= CI.yield >> loop (cnt - 1)
loop cnt0

sourceRandomNConstr :: (MWC.Variate a, MonadIO m) => Int -> Source m a
sourceRandomNConstr cnt0 = ConduitM $ PipeM $ do
sourceRandomNConstr cnt0 = ConduitM $ \rest -> PipeM $ do
gen <- liftIO MWC.createSystemRandom
let loop 0 = return $ Done ()
let loop 0 = return $ rest ()
loop cnt = do
x <- liftIO (MWC.uniform gen)
return $ HaveOutput (PipeM $ loop (cnt - 1)) (return ()) x
Expand Down
Loading

17 comments on commit 5cacdc3

@snoyberg
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pinging @klao. This change potentially improves performance of slidingVector significantly (it did in my tests at least). The same speedup can be achieved in the current conduit by dropping down to the raw constructors, but that's rather ugly.

@UnkindPartition
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:)

@UnkindPartition
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a new approach to this problem, by the way: http://homepages.cwi.nl/~ploeg/papers/zseq.pdf

@snoyberg
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting paper, thanks for the link. After reading it through, I'm not convinced it will actually provide a performance advantage over the codensity approach. While the issue raised of switching between representations is important, in all of the code I rewrote in conduit, it doesn't seem to present an actual problem. The reason is that any functions that need to inspect the Pipe values need to inspect the entire tree, and then I can simply combine the change in representations with the traversal. For example, see toProducer.

On the other hand, my experience with difference lists vs Seq is that, for cases of purely constructing a value by appending elements (a.k.a., snoc), difference lists are always faster, which would imply that codensity is similarly faster than the type indexed sequences described in that paper.

I'm open to looking into this further, but I was wondering if you had thoughts on this.

@UnkindPartition
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed, Seq has shown high constant factors in my experiments. But it isn't the only option.

Here's the result of my comparison of various free monad implementations (for a somewhat different purpose).

It'd be interesting to see a similar benchmark for a typical conduit application.

@klao
Copy link

@klao klao commented on 5cacdc3 Aug 17, 2014

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is great, Michael! I didn't know you were working on this (or, even that you were considering it).

We should check the performance of a naive vector builder with this. It might need no tricks anymore. Though, I think for that the whole Pipe should have been codensified...

@klao
Copy link

@klao klao commented on 5cacdc3 Aug 17, 2014

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

About Seq and the paper Roman mentioned. I recently saw @ekmett talking on irc about something, which I now realize was this transformation. (I think. :)) It was about some data structure which is much more suitable for it (that has constant time concatenation, afair).

@ekmett
Copy link

@ekmett ekmett commented on 5cacdc3 Aug 17, 2014

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@snoyberg

The zseq paper approach turns out to be a win when you need to both "match on the outside of the monad" and continue binding on the inside.

The one case where this arises here is in pipes/conduit where you might want to chain two of them in series (categorical composition) which has to "match on the outside" to figure out when to pull from the second, but then bind the result of that composition into a third process (monadic composition), which takes place on the inside of the continuation.

Left associated binds are expensive for the normal ADT approach, composition followed by binding is expensive for the codensity approach, because you have to force the whole continuation to start. On the other hand the zseq approach trades off constant factors to never suffer an asymptotic hit for either. As you put things explicitly into the continuation-as-catenable-output-restricted-deque you are paying as you go to get the reified continuation reassociated just enough that you always retain O(1) access to either end.

Ultimately the issue is that there will be code for which you absolutely need the zseq / reflection-without-remorse style, so then the question is whether you are willing to just say that those use cases are out of scope. This has been the approach taken by pipes so far, for instance, and it's a perfectly reasonable stance.

That said, I personally want something that is never suboptimal asymptotically, and the zseq paper finally offers us a path to get there. The question then becomes how to adjust the constant factors to make them palatable.

@ekmett
Copy link

@ekmett ekmett commented on 5cacdc3 Aug 17, 2014

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also note: Seq is a suboptimal data structure for implementing the catenable output-restricted deque. You are paying an O(log n) cost on appends, while an output-restricted catenable deque can get you O(1) appends.

The issue is that you can wind up with cases where you have O(n^2) cost with regard to the number of steps taken if you write the particularly bad cases in either the codensity or the direct style, while the asymptotic cost never rises above O(n) with a catenable output-restricted deque in the mix.

With Seq you only get down to O(n log n), and of course the extra log n factor (with large constants to boot!) is going to lose in the common O(n) cases where you don't have the slowdown, but when you do, even Seq is a win.

@klao
Copy link

@klao klao commented on 5cacdc3 Aug 17, 2014

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ekmett: Thank you for this extensive comment!
Where are you at implementing an efficient output-restricted catenable deque? Are you working on the basis of Kaplan & Tarjan, or is there a better one?

@ekmett
Copy link

@ekmett ekmett commented on 5cacdc3 Aug 17, 2014

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"better" is relative. There is a write up by Tarjan and his then-student Mihaescu from 2003 on a fancier form of catenable deque. That builds a completely catenable deque (not just output-restricted deque / steque), but it is rather complicated to implement in a typed world.

I have code I'm likely to push into free to make a Control.Monad.Free.Reflection that will supply a reflection without remorse based free monad using a catenable deque built over a realtime deque, like they do in the paper. The major difference is I swap some arguments around to make the deque act as a model for an actual free category with folding/traversal based on (.) not (>>>).

The K&T deque is going to be more expensive than you want. Look up the reference code for the zseq paper that @atzeus has on his github if you don't want to wait for me to circle back to finishing up the code for free. ;)

@snoyberg
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@feuerbach Good call on the benchmarks, I've started a repo for it here: https://github.com/snoyberg/conduit-cps-benchmarks.

@ekmett Thanks for getting involved in the discussion. If you have some code available for the Reflection module, I'd love to see it. I have a few ideas of what such a data structure would look like, but I'd like to see what you're thinking of.

> composition followed by binding is expensive for the codensity approach, because you have to force the whole continuation to start

Maybe I'm being naive here, but I think this can be worked around. If you look at the Codensity module in my benchmark repo, my categorical composition (which I call fuse) is:

    -- Categorical composition of two codensity-encoded conduits: the output
    -- of the left conduit is fed as input to the right conduit. Both sides
    -- are first run with 'Done' as their continuation (last line) to obtain
    -- concrete pipe values; the final result of the right side is passed to
    -- the caller's continuation @rest@ instead of being wrapped in 'Done'.
    fuse (Conduit left0) (Conduit right0) = Conduit $ \rest -> let
        -- goR drives the right-hand pipe; its first argument is the current
        -- state of the left-hand pipe.
        -- Right side finished: hand its result to @rest@; the left side is
        -- discarded.
        goR _left (Done r) = rest r
        -- Monadic step on the right: run it, then keep driving the right.
        goR left (M mc) = M (liftM (goR left) mc)
        -- Right side wants input: switch to driving the left side, carrying
        -- both continuations of the 'Await' (x and y) along.
        goR left (Await x y) = goL x y left
        -- Right side produced output: re-emit it and keep driving the right.
        goR left (Yield o c) = Yield o (goR left c)

        -- goL drives the left-hand pipe on behalf of a right-hand 'Await'
        -- whose two continuations are x (a pipe) and y (a function of the
        -- received value).
        -- Left side finished: the right side resumes with x, and the left
        -- stays 'Done' from here on.
        goL x _y (Done ()) = goR (Done ()) x
        -- Left side produced a value: the right side resumes with @y o@,
        -- and c is remembered as the rest of the left side.
        goL _x y (Yield o c) = goR c (y o)
        -- Monadic step on the left: run it, then keep driving the left.
        goL x y (M mc) = M (liftM (goL x y) mc)
        -- Left side itself wants input: surface an 'Await' in the fused
        -- pipe and keep driving the left in both of its continuations.
        goL x y (Await x' y') = Await
            (goL x y x')
            (\a -> goL x y (y' a))

        -- Start by driving the right side, so the left side is only
        -- inspected once the right side awaits input.
        in goR (left0 Done) (right0 Done)

I believe this allows for the cheap append with low constant factors which we get from codensity, yet does not require forcing the left side of a monadic bind.

To be sure we're talking about the same thing, I'd imagine you're worried about code like the following:

mapC :: (i -> o) -> Conduit i o m r

possiblyExpensive =
    (mapM_ yield [0..999] `fuse` mapC (+ 1)) >> mapM_ yield [1001..2000]

My benchmarks (the "mix compositions" one in the linked repo) seem to support that this is still relatively efficient.

@snoyberg
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@klao
Copy link

@klao klao commented on 5cacdc3 Aug 18, 2014

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Michael, the problem in general is this:

c1 `fuse` c2 `fuse` c3 `fuse` ...

with a lot of conduits fused together. Admittedly, this is not how we normally program with conduits. But, I can imagine a problem, where the most natural solution is to recursively create a conduit by fusing things together. This is OK with the original solution, but would be quadratic with the codensity approach.

All in all, I don't think that this is a big issue, but it can appear in many different forms, e.g. zipping sources together is similar, and it is possibly more likely that someone will zip many (a variable number of) sources... So, there might be a bigger issue here that we just don't see yet. But I don't think it's very likely, and the quadratic behavior of replicateM and co. is definitely problematic (I definitely ran into it unsuspectingly :)). So, fixing it with the codensity transformation is really good, especially if it brings a general speed-up too!

@snoyberg
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure that there's any increased complexity for the code you described with the codensity approach. Both in conduit 1.1 (a.k.a. "standard") and codensity, each fuse call requires traversing each argument and generating a new structure. (By the way, that's exactly what I'm hoping to eliminate by implementing stream fusion.) The way I've implemented codensity, I don't think there's any extra traversal of the newly generated structure when you proceed to fuse it again. I think zipping sources falls into this same category.

It's also entirely possible that I'm simply not seeing the problem, so if you think I'm mistaken, please do tell me so.

@klao
Copy link

@klao klao commented on 5cacdc3 Aug 18, 2014

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, sorry, what I wrote was bogus.

Fortunately for conduit, it doesn't have any function corresponding to pipes' next: http://hackage.haskell.org/package/pipes-4.1.2/docs/Pipes.html#v:next
That is something that could not be done if pipes were codensified, and something that previously could have been written for conduit, but not anymore. Of course, you can still do it if you go down to the level of Pipes.

I'll try to see if I can find a not too convoluted degenerate example for the new ConduitM.

@snoyberg
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, conduit does have something like next: connect-and-resume. It would look like:

(rsrc, ()) <- src $$+ return ()
(rsrc', mnext) <- rsrc $$++ await

And that's exactly why ResumableSource is not codensity-fied. That does mean that it's inefficient to monadically compose ResumableSources, but as it turns out, they aren't an instance of Monad anyway (due to finalizers). So it seems like (by chance), we keep the same semantics and performance of what we had previously.

Please sign in to comment.