Permalink
Fetching contributors…
Cannot retrieve contributors at this time
84 lines (70 sloc) 2.58 KB
{-# LANGUAGE NoMonomorphismRestriction, PatternGuards, CPP #-}
module General.Conduit(
module Data.Conduit, MonadIO, liftIO,
sourceList, sinkList, sourceLStr,
mapC, mapAccumC, filterC,
mapMC, mapAccumMC,
(|$|), pipelineC, groupOnLastC,
zipFromC, linesCR
) where
import Data.Void
import Data.Conduit
import Data.Conduit.List as C
import Data.Conduit.Binary as C
import Data.Maybe
import Control.Applicative
import Control.Monad.Extra
import Control.Exception
import qualified Data.ByteString.Char8 as BS
import Control.Concurrent.Extra hiding (yield)
import Control.Monad.IO.Class
import General.Str
import Prelude
-- | Apply a pure function to every value passing through the stream.
-- A local alias for @C.map@.
mapC :: Monad m => (a -> b) -> ConduitM a b m ()
mapC = C.map
-- | Apply a monadic action to every value passing through the stream and
-- emit its result. A local alias for @C.mapM@.
mapMC :: Monad m => (a -> m b) -> ConduitM a b m ()
mapMC = C.mapM
-- | Map over the stream while threading an accumulator through it.
--
-- Differs from @C.mapAccum@ in two ways: the step function takes the
-- accumulator first and the stream element second, and the accumulator is
-- forced with 'seq' before every step (presumably to stop thunks piling up
-- on long streams).
mapAccumC :: Monad m => (s -> a -> (s, b)) -> s -> ConduitM a b m s
mapAccumC f = C.mapAccum step
  where
    -- @C.mapAccum@ passes the element first, so swap the arguments back.
    step x acc = acc `seq` f acc x
-- | Monadic counterpart of the accumulating map: the step function runs in
-- the underlying monad.
--
-- As with the pure variant, the step function takes the accumulator first
-- and the stream element second (the reverse of @C.mapAccumM@), and the
-- accumulator is forced with 'seq' before every step.
mapAccumMC :: Monad m => (s -> a -> m (s, b)) -> s -> ConduitM a b m s
mapAccumMC f = C.mapAccumM step
  where
    -- @C.mapAccumM@ passes the element first, so swap the arguments back.
    step x acc = acc `seq` f acc x
-- | Keep only the values that satisfy the predicate.
-- A local alias for @C.filter@.
filterC :: Monad m => (a -> Bool) -> ConduitM a a m ()
filterC = C.filter
-- | Pair every value in the stream with a running index, starting at the
-- supplied value and advancing with 'succ' for each element.
zipFromC :: (Monad m, Enum i) => i -> ConduitM a (i, a) m ()
zipFromC start = void (mapAccumC number start)
  where
    -- Emit the current index with the element; carry the next index forward.
    number ix item = (succ ix, (ix, item))
-- | Combine two conduits with the same input and output types via the
-- 'ZipConduit' applicative, returning both of their results as a pair.
(|$|) :: Monad m => ConduitM i o m r1 -> ConduitM i o m r2 -> ConduitM i o m (r1,r2)
left |$| right = getZipConduit paired
  where
    paired = (,) <$> ZipConduit left <*> ZipConduit right
-- | Drain the stream, returning everything it produced as a list.
-- A local alias for @C.consume@.
sinkList :: Monad m => ConduitM a o m [a]
sinkList = C.consume
-- | Collapse each run of consecutive values that share the same key (as
--   computed by the supplied function) down to the final value of that run.
--   Streaming equivalent of @groupOnLast f = map last . groupOn f@.
groupOnLastC :: (Monad m, Eq b) => (a -> b) -> ConduitM a a m ()
groupOnLastC op = await >>= maybe (return ()) (\first -> go (op first) first)
  where
    -- @key@ is the key of the current run, @held@ the latest value seen in it.
    go key held = do
        next <- await
        case next of
            -- Input exhausted: the held value closes the final run.
            Nothing -> yield held
            Just cand -> do
                let key' = op cand
                -- A different key ends the previous run, so emit its last value.
                when (key /= key') $ yield held
                go key' cand
-- | Split the byte stream into lines with @C.lines@, then drop a single
-- trailing carriage return from each line if one is present.
linesCR :: Monad m => ConduitM BStr BStr m ()
linesCR = C.lines .| mapC stripCR
  where
    stripCR line = case BS.unsnoc line of
        Just (body, '\r') -> body
        _ -> line
-- | Emit the chunks obtained from @lbstrToChunks@ one at a time, in order.
sourceLStr :: Monad m => LBStr -> ConduitM i BStr m ()
sourceLStr lazy = sourceList (lbstrToChunks lazy)
-- | Run @sink@ on a freshly forked thread, feeding it the items that arrive
--   at this conduit, and return the sink's result.
--
--   Items travel over a channel as @Just x@, with a single @Nothing@ marking
--   the end of the input. A semaphore created with @buffer@ units is taken
--   once per item before it is queued and released once per value the forked
--   thread reads, which limits how many items are queued at a time.
--
--   If the forked thread ends with an exception, that exception is thrown to
--   the thread that was running this conduit when it started.
--
--   NOTE(review): if @sink@ finishes before the input is exhausted, nothing
--   releases the semaphore any more, so the feeding loop looks like it could
--   block once @buffer@ further items have been queued - confirm.
pipelineC :: Int -> ConduitM o Void IO r -> ConduitM o Void IO r
pipelineC buffer sink = do
    sem <- liftIO $ newQSem buffer  -- how many are in flow, to avoid memory leaks
    chan <- liftIO newChan          -- the items in flow (type o)
    bar <- liftIO newBarrier        -- the result type (type r)
    me <- liftIO myThreadId
    -- Worker thread: on completion either forward the exception to the
    -- starting thread or publish the sink's result through the barrier.
    liftIO $ flip forkFinally (either (throwTo me) (signalBarrier bar)) $ do
        runConduit $
            -- Re-emit queued items until the Nothing marker is read.
            (whileM $ do
                x <- liftIO $ readChan chan
                -- Release a unit as soon as the value has left the channel.
                liftIO $ signalQSem sem
                whenJust x yield
                return $ isJust x) .|
            sink
    -- Feeder: take a unit for each upstream item, then queue the item.
    awaitForever $ \x -> liftIO $ do
        waitQSem sem
        writeChan chan $ Just x
    -- Upstream is done: tell the worker to stop, then wait for its result.
    liftIO $ writeChan chan Nothing
    liftIO $ waitBarrier bar