Permalink
Browse files

Producer/Consumer datatypes

  • Loading branch information...
1 parent 75ac21f commit 45fe6da7b298daed8befa9395bd9ebdf8d799801 @snoyberg committed Feb 13, 2013
@@ -34,7 +34,7 @@ import Control.Monad (unless)
import qualified Data.Attoparsec.ByteString
import qualified Data.Attoparsec.Text
import qualified Data.Attoparsec.Types as A
-import Data.Conduit hiding (Pipe, Sink, Conduit, Source)
+import Data.Conduit
-- | The context and message from a 'A.Fail' value.
data ParseError = ParseError
@@ -113,27 +113,27 @@ instance AttoparsecInput T.Text where
-- If parsing fails, a 'ParseError' will be thrown with 'monadThrow'.
--
-- Since 0.5.0
-sinkParser :: (AttoparsecInput a, MonadThrow m) => A.Parser a b -> GLSink a m b
+sinkParser :: (AttoparsecInput a, MonadThrow m) => A.Parser a b -> Consumer a m b
sinkParser = fmap snd . sinkParserPos (Position 1 1)
-- | Consume a stream of parsed tokens, returning both the token and the
-- position it appears at.
--
-- Since 0.5.0
-conduitParser :: (AttoparsecInput a, MonadThrow m) => A.Parser a b -> GLInfConduit a m (PositionRange, b)
+conduitParser :: (AttoparsecInput a, MonadThrow m) => A.Parser a b -> Conduit a m (PositionRange, b)
conduitParser parser =
conduit $ Position 1 0
where
conduit pos =
- awaitE >>= either return go
+ await >>= maybe (return ()) go
where
go x = do
leftover x
(pos', res) <- sinkParserPos pos parser
yield (PositionRange pos pos', res)
conduit pos'
-sinkParserPos :: (AttoparsecInput a, MonadThrow m) => Position -> A.Parser a b -> GLSink a m (Position, b)
+sinkParserPos :: (AttoparsecInput a, MonadThrow m) => Position -> A.Parser a b -> Consumer a m (Position, b)
sinkParserPos pos0 =
sink empty pos0 . parseA
where
@@ -17,27 +17,27 @@ import qualified Data.ByteString.Base64.URL as B64U
import Data.Conduit
-encode :: Monad m => GInfConduit ByteString m ByteString
+encode :: Monad m => Conduit ByteString m ByteString
encode = codeWith 3 B64.encode
-decode :: Monad m => GInfConduit ByteString m ByteString
+decode :: Monad m => Conduit ByteString m ByteString
decode = codeWith 4 B64.decodeLenient
-encodeURL :: Monad m => GInfConduit ByteString m ByteString
+encodeURL :: Monad m => Conduit ByteString m ByteString
encodeURL = codeWith 3 B64U.encode
-decodeURL :: Monad m => GInfConduit ByteString m ByteString
+decodeURL :: Monad m => Conduit ByteString m ByteString
decodeURL = codeWith 4 B64U.decodeLenient
-codeWith :: Monad m => Int -> (ByteString -> ByteString) -> GInfConduit ByteString m ByteString
+codeWith :: Monad m => Int -> (ByteString -> ByteString) -> Conduit ByteString m ByteString
codeWith size f =
loop
where
- loop = awaitE >>= either return push
+ loop = await >>= maybe (return ()) push
loopWith bs
| S.null bs = loop
- | otherwise = awaitE >>= either (\r -> yield (f bs) >> return r) (pushWith bs)
+ | otherwise = await >>= maybe (yield (f bs)) (pushWith bs)
push bs = do
let (x, y) = S.splitAt (len - (len `mod` size)) bs
@@ -42,7 +42,7 @@ module Data.Conduit.Blaze
, reuseBufferStrategy
) where
-import Data.Conduit hiding (Source, Conduit, Sink, Pipe)
+import Data.Conduit
import Control.Monad (unless, liftM)
import Control.Monad.Trans.Class (lift, MonadTrans)
@@ -54,14 +54,14 @@ import Blaze.ByteString.Builder.Internal.Buffer
-- | Incrementally execute builders and pass on the filled chunks as
-- bytestrings.
-builderToByteString :: MonadUnsafeIO m => GInfConduit Builder m S.ByteString
+builderToByteString :: MonadUnsafeIO m => Conduit Builder m S.ByteString
builderToByteString =
builderToByteStringWith (allNewBuffersStrategy defaultBufferSize)
-- |
--
-- Since 0.0.2
-builderToByteStringFlush :: MonadUnsafeIO m => GInfConduit (Flush Builder) m (Flush S.ByteString)
+builderToByteStringFlush :: MonadUnsafeIO m => Conduit (Flush Builder) m (Flush S.ByteString)
builderToByteStringFlush =
builderToByteStringWithFlush (allNewBuffersStrategy defaultBufferSize)
@@ -74,7 +74,7 @@ builderToByteStringFlush =
-- as control is returned from the inner sink!
unsafeBuilderToByteString :: MonadUnsafeIO m
=> IO Buffer -- action yielding the inital buffer.
- -> GInfConduit Builder m S.ByteString
+ -> Conduit Builder m S.ByteString
unsafeBuilderToByteString = builderToByteStringWith . reuseBufferStrategy
@@ -84,9 +84,9 @@ unsafeBuilderToByteString = builderToByteStringWith . reuseBufferStrategy
-- INV: All bytestrings passed to the inner sink are non-empty.
builderToByteStringWith :: MonadUnsafeIO m
=> BufferAllocStrategy
- -> GInfConduit Builder m S.ByteString
+ -> Conduit Builder m S.ByteString
builderToByteStringWith =
- helper (liftM (fmap Chunk) awaitE) yield'
+ helper (liftM (fmap Chunk) await) yield'
where
yield' Flush = return ()
yield' (Chunk bs) = yield bs
@@ -97,27 +97,26 @@ builderToByteStringWith =
builderToByteStringWithFlush
:: MonadUnsafeIO m
=> BufferAllocStrategy
- -> GInfConduit (Flush Builder) m (Flush S.ByteString)
-builderToByteStringWithFlush = helper awaitE yield
+ -> Conduit (Flush Builder) m (Flush S.ByteString)
+builderToByteStringWithFlush = helper await yield
helper :: (MonadUnsafeIO m, Monad (t m), MonadTrans t)
- => t m (Either term (Flush Builder))
+ => t m (Maybe (Flush Builder))
-> (Flush S.ByteString -> t m ())
-> BufferAllocStrategy
- -> t m term
-helper awaitE' yield' (ioBufInit, nextBuf) =
+ -> t m ()
+helper await' yield' (ioBufInit, nextBuf) =
loop ioBufInit
where
loop ioBuf = do
- awaitE' >>= either (close ioBuf) (cont' ioBuf)
+ await' >>= maybe (close ioBuf) (cont' ioBuf)
cont' ioBuf Flush = push ioBuf flush $ \ioBuf' -> yield' Flush >> loop ioBuf'
cont' ioBuf (Chunk builder) = push ioBuf builder loop
- close ioBuf r = do
+ close ioBuf = do
buf <- lift $ unsafeLiftIO $ ioBuf
maybe (return ()) (yield' . Chunk) (unsafeFreezeNonEmptyBuffer buf)
- return r
push ioBuf0 x continue = do
go (unBuilder x (buildStep finalStep)) ioBuf0
View
@@ -39,9 +39,8 @@ module Data.Conduit
-- * Generalized conduit types
-- $generalizedConduitTypes
- , GSource
- , GSink
- , GConduit
+ , Producer
+ , Consumer
-- * Flushing
, Flush (..)
@@ -62,7 +61,6 @@ module Data.Conduit
import Control.Monad.Trans.Resource
import Data.Conduit.Internal hiding (await, awaitForever, yield, yieldOr, leftover, bracketP, addCleanup, transPipe, mapOutput, mapOutputMaybe, mapInput)
import qualified Data.Conduit.Internal as CI
-import Data.Void (Void)
{- $conduitInterface
@@ -515,7 +513,7 @@ instance Functor Flush where
fmap _ Flush = Flush
fmap f (Chunk a) = Chunk (f a)
-await :: Monad m => GSink i m (Maybe i)
+await :: Monad m => Consumer i m (Maybe i)
await = ConduitM CI.await
mapOutput :: Monad m => (o1 -> o2) -> ConduitM i o1 m r -> ConduitM i o2 m r
@@ -35,7 +35,7 @@ module Data.Conduit.Binary
import Prelude hiding (head, take, drop, takeWhile, dropWhile)
import qualified Data.ByteString as S
import qualified Data.ByteString.Lazy as L
-import Data.Conduit hiding (Source, Conduit, Sink)
+import Data.Conduit
import Data.Conduit.List (sourceList)
import Control.Exception (assert)
import Control.Monad (unless)
@@ -54,7 +54,7 @@ import qualified System.PosixFile as F
-- Since 0.3.0
sourceFile :: MonadResource m
=> FilePath
- -> GSource m S.ByteString
+ -> Producer m S.ByteString
sourceFile fp =
#if CABAL_OS_WINDOWS || NO_HANDLES
bracketP
@@ -74,7 +74,7 @@ sourceFile fp =
-- Since 0.3.0
sourceHandle :: MonadIO m
=> IO.Handle
- -> GSource m S.ByteString
+ -> Producer m S.ByteString
sourceHandle h =
loop
where
@@ -92,7 +92,7 @@ sourceHandle h =
-- Since 0.3.0
sourceIOHandle :: MonadResource m
=> IO IO.Handle
- -> GSource m S.ByteString
+ -> Producer m S.ByteString
sourceIOHandle alloc = bracketP alloc IO.hClose sourceHandle
-- | Stream all incoming data to the given 'IO.Handle'. Note that this function
@@ -101,7 +101,7 @@ sourceIOHandle alloc = bracketP alloc IO.hClose sourceHandle
-- Since 0.3.0
sinkHandle :: MonadIO m
=> IO.Handle
- -> GSink S.ByteString m ()
+ -> Consumer S.ByteString m ()
sinkHandle h = awaitForever $ liftIO . S.hPut h
-- | An alternative to 'sinkHandle'.
@@ -112,7 +112,7 @@ sinkHandle h = awaitForever $ liftIO . S.hPut h
-- Since 0.3.0
sinkIOHandle :: MonadResource m
=> IO IO.Handle
- -> GSink S.ByteString m ()
+ -> Consumer S.ByteString m ()
sinkIOHandle alloc = bracketP alloc IO.hClose sinkHandle
-- | Stream the contents of a file as binary data, starting from a certain
@@ -123,7 +123,7 @@ sourceFileRange :: MonadResource m
=> FilePath
-> Maybe Integer -- ^ Offset
-> Maybe Integer -- ^ Maximum count
- -> GSource m S.ByteString
+ -> Producer m S.ByteString
sourceFileRange fp offset count = bracketP
(IO.openBinaryFile fp IO.ReadMode)
IO.hClose
@@ -160,7 +160,7 @@ sourceFileRange fp offset count = bracketP
-- Since 0.3.0
sinkFile :: MonadResource m
=> FilePath
- -> GSink S.ByteString m ()
+ -> Consumer S.ByteString m ()
sinkFile fp = sinkIOHandle (IO.openBinaryFile fp IO.WriteMode)
-- | Stream the contents of the input to a file, and also send it along the
@@ -169,7 +169,7 @@ sinkFile fp = sinkIOHandle (IO.openBinaryFile fp IO.WriteMode)
-- Since 0.3.0
conduitFile :: MonadResource m
=> FilePath
- -> GConduit S.ByteString m S.ByteString
+ -> Conduit S.ByteString m S.ByteString
conduitFile fp = bracketP
(IO.openBinaryFile fp IO.WriteMode)
IO.hClose
@@ -184,7 +184,7 @@ conduitFile fp = bracketP
-- Since 0.3.0
isolate :: Monad m
=> Int
- -> GConduit S.ByteString m S.ByteString
+ -> Conduit S.ByteString m S.ByteString
isolate =
loop
where
@@ -204,7 +204,7 @@ isolate =
-- | Return the next byte from the stream, if available.
--
-- Since 0.3.0
-head :: Monad m => GSink S.ByteString m (Maybe Word8)
+head :: Monad m => Consumer S.ByteString m (Maybe Word8)
head = do
mbs <- await
case mbs of
@@ -217,7 +217,7 @@ head = do
-- | Return all bytes while the predicate returns @True@.
--
-- Since 0.3.0
-takeWhile :: Monad m => (Word8 -> Bool) -> GConduit S.ByteString m S.ByteString
+takeWhile :: Monad m => (Word8 -> Bool) -> Conduit S.ByteString m S.ByteString
takeWhile p =
loop
where
@@ -233,7 +233,7 @@ takeWhile p =
-- | Ignore all bytes while the predicate returns @True@.
--
-- Since 0.3.0
-dropWhile :: Monad m => (Word8 -> Bool) -> GSink S.ByteString m ()
+dropWhile :: Monad m => (Word8 -> Bool) -> Consumer S.ByteString m ()
dropWhile p =
loop
where
@@ -248,7 +248,7 @@ dropWhile p =
-- | Take the given number of bytes, if available.
--
-- Since 0.3.0
-take :: Monad m => Int -> GSink S.ByteString m L.ByteString
+take :: Monad m => Int -> Consumer S.ByteString m L.ByteString
take n0 =
go n0 id
where
@@ -266,7 +266,7 @@ take n0 =
-- | Drop up to the given number of bytes.
--
-- Since 0.5.0
-drop :: Monad m => Int -> GSink S.ByteString m ()
+drop :: Monad m => Int -> Consumer S.ByteString m ()
drop =
go
where
@@ -285,7 +285,7 @@ drop =
-- (10), and strip it from the output.
--
-- Since 0.3.0
-lines :: Monad m => GConduit S.ByteString m S.ByteString
+lines :: Monad m => Conduit S.ByteString m S.ByteString
lines =
loop id
where
@@ -307,5 +307,5 @@ lines =
-- | Stream the chunks from a lazy bytestring.
--
-- Since 0.5.0
-sourceLbs :: Monad m => L.ByteString -> GSource m S.ByteString
+sourceLbs :: Monad m => L.ByteString -> Producer m S.ByteString
sourceLbs = sourceList . L.toChunks
@@ -10,11 +10,10 @@ module Data.Conduit.Internal
Pipe (..)
, ConduitM (..)
, Source
- , GSource
+ , Producer
, Sink
- , GSink
+ , Consumer
, Conduit
- , GConduit
, ResumableSource (..)
-- * Primitives
, await
@@ -155,7 +154,7 @@ type Source m o = ConduitM () o m ()
-- | Generalized 'Source'.
--
-- Since 0.5.0
-type GSource m o = forall i. ConduitM i o m ()
+type Producer m o = forall i. ConduitM i o m ()
-- | Consumes a stream of input values and produces a final result, without
-- producing any output.
@@ -166,19 +165,14 @@ type Sink i m r = ConduitM i Void m r
-- | Generalized 'Sink' without leftovers.
--
-- Since 0.5.0
-type GSink i m r = forall o. ConduitM i o m r
+type Consumer i m r = forall o. ConduitM i o m r
-- | Consumes a stream of input values and produces a stream of output values,
-- without producing a final result.
--
-- Since 0.5.0
type Conduit i m o = ConduitM i o m ()
--- | Generalized conduit without leftovers.
---
--- Since 0.5.0
-type GConduit i m o = Conduit i m o
-
-- | A @Source@ which has been started, but has not yet completed.
--
-- This type contains both the current state of the @Source@, and the finalizer
Oops, something went wrong.

0 comments on commit 45fe6da

Please sign in to comment.