Skip to content

Commit

Permalink
Finished implementing things
Browse files Browse the repository at this point in the history
  • Loading branch information
snoyberg committed Sep 29, 2013
1 parent 23f0a83 commit f86a1c2
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 57 deletions.
1 change: 1 addition & 0 deletions conduit/Data/Conduit/Internal.hs
Expand Up @@ -20,6 +20,7 @@ module Data.Conduit.Internal
, tryYield
, yieldOr
, leftover
, draw
-- * Finalization
, bracketP
, addCleanup
Expand Down
3 changes: 1 addition & 2 deletions conduit/Data/Conduit/List.hs
Expand Up @@ -445,7 +445,6 @@ sequence :: Monad m
=> Consumer i m o -- ^ @Pipe@ to run repeatedly
-> Conduit i m o
sequence sink =
Prelude.error "sequence" {-self
self
where
self = awaitForever $ \i -> leftover i >> sink >>= yield
-}
69 changes: 33 additions & 36 deletions conduit/Data/Conduit/Util.hs
Expand Up @@ -9,34 +9,26 @@ module Data.Conduit.Util
import Prelude hiding (zip)
import Control.Monad (liftM, liftM2)
import Control.Monad.Trans.Class (lift)
import Data.Conduit (Source, Sink)
import Data.Conduit.Internal (Pipe (..))
import Data.Conduit (Source, Sink, ($$))
import Data.Conduit.Internal
import Data.Void (Void, absurd)

-- | Combines two sources. The new source will stop producing once either
-- source has been exhausted.
--
-- Since 0.3.0
zip :: Monad m => Source m a -> Source m b -> Source m (a, b)
zip l0 r0 = do
error "Data.Conduit.Util.zip"
{-
(cl, l1) <- lift $ getCleanup l0
(cr, r1) <- lift $ getCleanup r0
go cl cr l1 r1
where
go _ _ (Done _ ()) (Done _ ()) = Done [] ()
go _ cr (Done _ ()) (HaveOutput _ _) = dropOutput (cr [])
go cl _ (HaveOutput _ _) (Done _ ()) = dropOutput (cl [])
go _ cr (Done _ ()) (ConduitM _) = dropOutput (cr [])
go cl _ (ConduitM _) (Done _ ()) = dropOutput (cl [])
go cl cr (ConduitM mx) (ConduitM my) = ConduitM (liftM2 (go cl cr) mx my)
go cl cr (ConduitM mx) y@HaveOutput{} = ConduitM (liftM (\x -> go cl cr x y) mx)
go cl cr x@HaveOutput{} (ConduitM my) = ConduitM (liftM (go cl cr x) my)
go cl cr (HaveOutput srcx x) (HaveOutput srcy y) = HaveOutput (go cl cr srcx srcy) (x, y)
go cl cr (NeedInput _ c) right = go cl cr c right
go cl cr left (NeedInput _ c) = go cl cr left c
-}
zip left right = do
mleft <- lift $ draw left
case mleft of
Nothing -> lift $ right $$ return ()
Just (left', a) -> do
mright <- lift $ draw right
case mright of
Nothing -> lift $ left' $$ return ()
Just (right', b) -> do
yield (a, b)
zip left' right'

-- | Combines two sinks. The new sink will complete when both input sinks have
-- completed.
Expand All @@ -45,20 +37,25 @@ zip l0 r0 = do
--
-- Since 0.4.1
zipSinks :: Monad m => Sink i m r -> Sink i m r' -> Sink i m (r, r')
zipSinks x0 y0 =
error "Data.Conduit.Util.zipSinks"
{-
x0 >< y0
zipSinks (Pipe x0) (Pipe y0) =
Pipe $ \m -> go m (x0 m) (y0 m)
where
(><) :: Monad m => ConduitM i Void m r1 -> ConduitM i Void m r2 -> ConduitM i o m (r1, r2)
HaveOutput _ o >< _ = absurd o
_ >< HaveOutput _ o = absurd o

ConduitM mx >< y = ConduitM (liftM (>< y) mx)
x >< ConduitM my = ConduitM (liftM (x ><) my)
Done _ x >< Done _ y = Done [] (x, y)
NeedInput px cx >< NeedInput py cy = NeedInput (\i -> px i >< py i) (cx >< cy)
NeedInput px cx >< y@Done{} = NeedInput (\i -> px i >< y) (cx >< y)
x@Done{} >< NeedInput py cy = NeedInput (\i -> x >< py i) (x >< cy)
-}
go m (Yield _ (Just o)) _ = absurd o
go m _ (Yield _ (Just o)) = absurd o
go m (Yield x Nothing) y = go m (x m) y
go m x (Yield y Nothing) = go m x (y m)
go m (Pure (PipeCont (Endpoint _ x) _)) (Pure (PipeCont (Endpoint _ y) _)) = Pure $ PipeCont (Endpoint [] (x, y)) m
go _ (Pure PipeTerm{}) _ = error "zipSinks: PipeTerm"
go _ _ (Pure PipeTerm{}) = error "zipSinks: PipeTerm"
go m (M x) y = lift x >>= \x' -> go m x' y
go m x (M y) = lift y >>= \y' -> go m x y'
go m (Await x) (Await y) = do
mi <- Await Pure
go m (x mi) (y mi)
go m (Await x) (Pure y) = do
mi <- Await Pure
go m (x mi) (Pure y)
go m (Pure x) (Await y) = do
mi <- Await Pure
go m (Pure x) (y mi)
22 changes: 3 additions & 19 deletions conduit/test/main.hs
Expand Up @@ -52,7 +52,6 @@ equivToList f conduit xs =

main :: IO ()
main = hspec $ do
{-
describe "data loss rules" $ do
it "consumes the source to quickly" $ do
x <- runResourceT $ CL.sourceList [1..10 :: Int] C.$$ do
Expand Down Expand Up @@ -735,17 +734,16 @@ main = hspec $ do

describe "generalizing" $ do
it' "works" $ do
x <- CI.runPipe
$ (CL.sourceList [1..10 :: Int])
CI.>+> (CL.map (+ 1))
Just x <- CI.runPipe
$ CI.fromDown (CL.sourceList [1..10 :: Int])
CI.>+> CI.fromDown (CL.map (+ 1))
CI.>+> (CL.fold (+) 0)
x `shouldBe` sum [2..11]

describe "iterate" $ do
it' "works" $ do
res <- CL.iterate (+ 1) (1 :: Int) C.$$ CL.isolate 10 C.=$ CL.fold (+) 0
res `shouldBe` sum [1..10]
-}

describe "unwrapResumable" $ do
it' "works" $ do
Expand Down Expand Up @@ -912,20 +910,6 @@ main = hspec $ do
assert $ c1 == c2
assert $ s1 == s2

{-
describe "generalizing" $ do
it "works" $ do
let src :: Int -> C.Source IO Int
src i = CL.sourceList [1..i]
sink :: C.Sink Int IO Int
sink = CL.fold (+) 0
res <- C.yield 10
C.$$ C.awaitForever (C.toProducer src)
C.=$ (C.toConsumer sink >>= C.yield)
C.=$ C.await
res `shouldBe` Just (sum [1..10])
-}

describe "sinkCacheLength" $ do
it' "works" $ C.runResourceT $ do
lbs <- liftIO $ L.readFile "test/main.hs"
Expand Down

0 comments on commit f86a1c2

Please sign in to comment.