Permalink
Browse files

Adapt to API changes and remove Combinators.

  • Loading branch information...
1 parent 1a07964 commit d829586227049e5f502bc32fbc4a32bde37a8a0f @pcapriotti committed Mar 3, 2012
View
@@ -1,7 +1,7 @@
Collection of basic utilities for [pipes][1].
* **pipes-attoparsec**: Utilities to convert a parser into a pipe.
- * **pipes-extra**: Basic combinators, file readers and writers.
+ * **pipes-extra**: File readers and writers, chunk pipes.
* **pipes-conduit**: [Conduit][2] adapters.
* **pipes-network**: Utilities to deal with sockets. Ported from conduit.
* **pipes-zlib**: Pipes to deal with zipped data. Ported from conduit.
@@ -4,8 +4,10 @@ module Control.Pipe.Attoparsec (
pipeParser,
) where
-import Control.Exception
+import qualified Control.Exception as E
import Control.Pipe
+import Control.Pipe.Combinators
+import Control.Pipe.Exception
import Data.Attoparsec.Types
import Data.Maybe
import Data.Monoid
@@ -21,11 +23,11 @@ data ParseError
-- when its input is exhausted.
deriving (Show, Typeable)
-instance Exception ParseError
+instance E.Exception ParseError
-- | Convert a parser continuation into a Pipe.
--
--- To get a parser continuation from a 'Parser', use the parse function of the
+-- To get a parser continuation from a 'Parser', use the @parse@ function of the
-- appropriate Attoparsec module.
pipeParser :: (Monoid a, Monad m) => (a -> IResult a r) -> Pipe a x m (a, Either ParseError r)
pipeParser p = go p
@@ -11,8 +11,11 @@ module Control.Pipe.Conduit (
) where
import Control.Monad (void)
+import Control.Monad.Trans
import Control.Monad.Trans.Resource
import Control.Pipe
+import Control.Pipe.Combinators
+import Control.Pipe.Exception
import Data.Conduit
-- | Convert a 'Conduit' to 'Pipe'.
@@ -15,9 +15,10 @@ module Control.Pipe.Binary (
) where
import Control.Monad
-import Control.Monad.Trans (MonadIO, liftIO)
+import Control.Monad.Trans (MonadIO, liftIO, lift)
import Control.Pipe
-import qualified Control.Pipe.Combinators as PC
+import Control.Pipe.Exception
+import Control.Pipe.Combinators (tryAwait, feed)
import qualified Data.ByteString as B
import qualified Data.ByteString.Char8 as BC
import Data.Monoid
@@ -53,7 +54,7 @@ fileWriter path = do
-- receive some data before opening the handle
input <- await
-- feed it to the actual worker pipe
- PC.feed input go
+ feed input go
where
go = bracket
(liftIO $ openFile path WriteMode)
@@ -1,129 +0,0 @@
--- | Basic pipe combinators.
-module Control.Pipe.Combinators (
- ($$),
- fromList,
- nullP,
- fold,
- fold1,
- consume,
- consume1,
- take,
- drop,
- pipeList,
- takeWhile,
- takeWhile_,
- dropWhile,
- intersperse,
- groupBy,
- filter,
- feed,
- ) where
-
-import Control.Applicative
-import Control.Monad
-import Control.Pipe
-import Data.Maybe
-import Prelude hiding (until, take, drop, concatMap, filter, takeWhile, dropWhile)
-
--- | Connect producer to consumer, ignoring producer return value.
-infixr 5 $$
-($$) :: Monad m => Pipe x a m r' -> Pipe a y m r -> Pipe x y m (Maybe r)
-p1 $$ p2 = (p1 >> return Nothing) >+> fmap Just p2
-
--- | Successively yield elements of a list.
-fromList :: Monad m => [a] -> Pipe x a m ()
-fromList = mapM_ yield
-
--- | A pipe that terminates immediately.
-nullP :: Monad m => Pipe a b m ()
-nullP = return ()
-
--- | A fold pipe. Apply a binary function to successive input values and an
--- accumulator, and return the final result.
-fold :: Monad m => (b -> a -> b) -> b -> Pipe a x m b
-fold f = go
- where
- go x = tryAwait >>= maybe (return x) (go . f x)
-
--- | A variation of 'fold' without an initial value for the accumulator. This
--- pipe doesn't return any value if no input values are received.
-fold1 :: Monad m => (a -> a -> a) -> Pipe a x m a
-fold1 f = tryAwait >>= maybe discard (fold f)
-
--- | Accumulate all input values into a list.
-consume :: Monad m => Pipe a x m [a]
-consume = pipe (:) >+> (fold (.) id <*> pure [])
-
--- | Accumulate all input values into a non-empty list.
-consume1 :: Monad m => Pipe a x m [a]
-consume1 = pipe (:) >+> (fold1 (.) <*> pure [])
-
--- | Act as an identity for the first 'n' values, then terminate.
-take :: Monad m => Int -> Pipe a a m ()
-take n = replicateM_ n $ await >>= yield
-
--- | Remove the first 'n' values from the stream, then act as an identity.
-drop :: Monad m => Int -> Pipe a a m r
-drop n = replicateM_ n await >> idP
-
--- | Apply a function with multiple return values to the stream.
-pipeList :: Monad m => (a -> [b]) -> Pipe a b m r
-pipeList f = forever $ await >>= mapM_ yield . f
-
--- | Act as an identity until as long as inputs satisfy the given predicate.
--- Return the first element that doesn't satisfy the predicate.
-takeWhile :: Monad m => (a -> Bool) -> Pipe a a m a
-takeWhile p = go
- where
- go = await >>= \x -> if p x then yield x >> go else return x
-
--- | Variation of 'takeWhile' returning @()@.
-takeWhile_ :: Monad m => (a -> Bool) -> Pipe a a m ()
-takeWhile_ p = takeWhile p >> return ()
-
--- | Remove inputs as long as they satisfy the given predicate, then act as an
--- identity.
-dropWhile :: Monad m => (a -> Bool) -> Pipe a a m r
-dropWhile p = (takeWhile p >+> discard) >>= yield >> idP
-
--- | Yield Nothing when an input satisfying the predicate is received.
-intersperse :: Monad m => (a -> Bool) -> Pipe a (Maybe a) m r
-intersperse p = forever $ do
- x <- await
- when (p x) $ yield Nothing
- yield $ Just x
-
--- | Group input values by the given predicate.
-groupBy :: Monad m => (a -> a -> Bool) -> Pipe a [a] m r
-groupBy p = streaks >+> createGroups
- where
- streaks = await >>= \x -> yield (Just x) >> streaks' x
- streaks' x = do
- y <- await
- unless (p x y) $ yield Nothing
- yield $ Just y
- streaks' y
- createGroups = forever $
- takeWhile_ isJust >+>
- pipe fromJust >+>
- (consume1 >>= yield)
-
--- | Remove values from the stream that don't satisfy the given predicate.
-filter :: Monad m => (a -> Bool) -> Pipe a a m r
-filter p = forever $ takeWhile_ p
-
--- | Feed an input element to a pipe.
-feed :: Monad m => a -> Pipe a b m r -> Pipe a b m r
-
--- this could be implemented as
--- feed x p = (yield x >> idP) >+> p
--- but this version is more efficient
-feed _ (Pure r) = return r
-feed _ (Throw e) = throw e
-feed a (Free c h) = case go a c of
- (False, p) -> p >>= feed a
- (True, p) -> join p
- where
- go a (Await k) = (True, return $ k a)
- go _ (Yield y c) = (False, yield y >> return c)
- go _ (M m s) = (False, lift_ s m)
@@ -21,5 +21,4 @@ Library
bytestring (== 0.9.*)
Exposed-Modules:
Control.Pipe.Binary,
- Control.Pipe.ChunkPipe,
- Control.Pipe.Combinators
+ Control.Pipe.ChunkPipe
@@ -13,12 +13,11 @@ import qualified Network.Socket as NS
import Network.Socket (Socket)
import Network.Socket.ByteString (sendAll, recv)
import Data.ByteString (ByteString)
-import Data.Void
import qualified Data.ByteString as B
import Control.Concurrent (forkIO)
import qualified Control.Exception as E
import Control.Monad (forever, unless)
-import Control.Monad.Trans (MonadIO, liftIO)
+import Control.Monad.Trans (MonadIO, liftIO, lift)
import Control.Pipe
-- adapted from conduit
@@ -12,8 +12,10 @@ module Control.Pipe.Zlib (
import Codec.Zlib
import Control.Exception (SomeException)
import Control.Monad
-import Control.Monad.Trans (MonadIO, liftIO)
+import Control.Monad.Trans (MonadIO, liftIO, lift)
import Control.Pipe
+import Control.Pipe.Combinators
+import Control.Pipe.Exception
import qualified Data.ByteString as B
import Prelude hiding (catch)
@@ -31,7 +33,7 @@ decompress
-> Pipe B.ByteString B.ByteString m ()
decompress config = do
inf <- lift . liftIO $ initInflate config
- whileAwait $ \x -> do
+ forP $ \x -> do
chunks <- lift . liftIO $ withInflateInput inf x callback
mapM_ yield chunks
@@ -46,7 +48,7 @@ compress
-> Pipe B.ByteString B.ByteString m ()
compress level config = do
def <- lift . liftIO $ initDeflate level config
- whileAwait $ \x -> do
+ forP $ \x -> do
chunks <- lift . liftIO $ withDeflateInput def x callback
mapM_ yield chunks
chunks <- lift . liftIO $ finishDeflate def callback
@@ -59,7 +61,3 @@ callback pop = go id where
case x of
Nothing -> return $ xs []
Just y -> go (xs . (y:))
-
-whileAwait :: (Show a, MonadIO m) => (a -> Pipe a b m r) -> Pipe a b m ()
-whileAwait f = catch (forever $ await >>= f)
- (\(_ :: BrokenUpstreamPipe) -> return ())

0 comments on commit d829586

Please sign in to comment.