Permalink
Browse files

updatet to use with conduit-0.4.*

  • Loading branch information...
1 parent aaa1255 commit ef0c87cc3c346777f351044effdd0257c7c23d29 @qnikst committed Apr 7, 2012
Showing with 38 additions and 32 deletions.
  1. +28 −26 Data/Conduit/TMChan.hs
  2. +4 −2 stm-conduit.cabal
  3. +6 −4 test/Test.hs
@@ -1,3 +1,4 @@
+{-# LANGUAGE NoMonomorphismRestriction, FlexibleContexts, RankNTypes,KindSignatures #-}
-- | * Introduction
--
-- Contains a simple source and sink for linking together conduits in
@@ -52,7 +53,7 @@ module Data.Conduit.TMChan ( -- * Bounded Channel Connectors
import Control.Applicative
import Control.Monad
-import Control.Monad.IO.Class ( liftIO )
+import Control.Monad.IO.Class ( liftIO, MonadIO )
import Control.Monad.Trans.Resource
import Control.Concurrent.STM
import Control.Concurrent.STM.TBMChan
@@ -61,63 +62,64 @@ import Control.Concurrent.STM.TMChan
import Data.Conduit
chanSource
- :: ResourceIO m
+ :: MonadIO m
=> chan -- ^ The channel.
-> (chan -> STM (Maybe a)) -- ^ The 'read' function.
-> (chan -> STM ()) -- ^ The 'close' function.
-> Source m a
chanSource ch reader closer = src
where
- src = Source pull close
+ src = PipeM pull close
pull = do a <- liftSTM $ reader ch
case a of
- Just x -> return $ Open src x
- Nothing -> return Closed
- close = liftIO . atomically $ closer ch
+ Just x -> return $ HaveOutput src close x
+ Nothing -> return $ Done Nothing ()
+ close = liftSTM $ closer ch
{-# INLINE chanSource #-}
-chanSink
- :: ResourceIO m
+chanSink
+ :: MonadIO m
=> chan -- ^ The channel.
-> (chan -> a -> STM ()) -- ^ The 'write' function.
-> (chan -> STM ()) -- ^ The 'close' function.
-> Sink a m ()
chanSink ch writer closer = sink
where
- sink = SinkData push close
- push input = do liftSTM $ writer ch input
- return $ Processing push close
- close = liftIO . atomically $ closer ch
+ sink = NeedInput push close
+
+ push input = PipeM ((liftIO . atomically $ writer ch input)
+ >> (return $ NeedInput push close)) close
+ close = liftSTM $ closer ch
{-# INLINE chanSink #-}
-- | A simple wrapper around a TBMChan. As data is pushed into the channel, the
-- source will read it and pass it down the conduit pipeline. When the
-- channel is closed, the source will close also.
--
-- If the channel fills up, the pipeline will stall until values are read.
-sourceTBMChan :: ResourceIO m => TBMChan a -> Source m a
+sourceTBMChan :: MonadIO m => TBMChan a -> Source m a
sourceTBMChan ch = chanSource ch readTBMChan closeTBMChan
{-# INLINE sourceTBMChan #-}
-- | A simple wrapper around a TMChan. As data is pushed into the channel, the
-- source will read it and pass it down the conduit pipeline. When the
-- channel is closed, the source will close also.
-sourceTMChan :: ResourceIO m => TMChan a -> Source m a
+sourceTMChan :: MonadIO m => TMChan a -> Source m a
sourceTMChan ch = chanSource ch readTMChan closeTMChan
{-# INLINE sourceTMChan #-}
-- | A simple wrapper around a TBMChan. As data is pushed into the sink, it
-- will magically begin to appear in the channel. If the channel is full,
-- the sink will block until space frees up. When the sink is closed, the
-- channel will close too.
-sinkTBMChan :: ResourceIO m => TBMChan a -> Sink a m ()
+sinkTBMChan :: MonadIO m => TBMChan a -> Sink a m ()
sinkTBMChan ch = chanSink ch writeTBMChan closeTBMChan
{-# INLINE sinkTBMChan #-}
-- | A simple wrapper around a TMChan. As data is pushed into this sink, it
-- will magically begin to appear in the channel. When the sink is closed,
-- the channel will close too.
-sinkTMChan :: ResourceIO m => TMChan a -> Sink a m ()
+sinkTMChan :: MonadIO m => TMChan a -> Sink a m ()
sinkTMChan ch = chanSink ch writeTMChan closeTMChan
{-# INLINE sinkTMChan #-}
@@ -128,19 +130,18 @@ modifyTVar'' :: TVar a -> (a -> a) -> STM a
modifyTVar'' tv f = do x <- f <$> readTVar tv
writeTVar tv x
return x
-
-liftSTM :: ResourceIO m => STM a -> ResourceT m a
+liftSTM :: forall (m :: * -> *) a. MonadIO m => STM a -> m a
liftSTM = liftIO . atomically
-- | Combines two sources with an unbounded channel, creating a new source
-- which pulls data from a mix of the two sources: whichever produces first.
--
-- The order of the new source's data is undefined, but it will be some
-- combination of the two given sources.
-(>=<) :: ResourceIO m
- => Source m a
- -> Source m a
- -> ResourceT m (Source m a)
+(>=<) :: (MonadIO m, MonadBaseControl IO m)
+ => Source (ResourceT m) a
+ -> Source (ResourceT m) a
+ -> ResourceT m (Source (ResourceT m) a)
sa >=< sb = mergeSources [ sa, sb ] 16
{-# INLINE (>=<) #-}
@@ -155,11 +156,12 @@ decRefcount tv chan = do n <- modifyTVar'' tv (subtract 1)
--
-- The order of the new source's data is undefined, but it will be some
-- combination of the given sources.
-mergeSources :: ResourceIO m
- => [Source m a] -- ^ The sources to merge.
+mergeSources :: (MonadIO m, MonadBaseControl IO m)
+ => [Source (ResourceT m) a] -- ^ The sources to merge.
-> Int -- ^ The bound of the intermediate channel.
- -> ResourceT m (Source m a)
-mergeSources sx bound = do c <- liftIO . atomically $ newTBMChan bound
+ -> ResourceT m (Source (ResourceT m) a)
+mergeSources sx bound = do c <- liftSTM $ newTBMChan bound
refcount <- liftSTM . newTVar $ length sx
mapM_ (\s -> resourceForkIO $ s $$ chanSink c writeTBMChan $ decRefcount refcount) sx
return $ sourceTBMChan c
+
View
@@ -1,5 +1,5 @@
Name: stm-conduit
-Version: 0.2.4.1
+Version: 0.2.4.2
Synopsis: Introduces conduits to channels, and promotes using
conduits concurrently.
Description: Provides two simple conduit wrappers around STM
@@ -23,7 +23,8 @@ Library
transformers ==0.2.*,
stm == 2.3.*,
stm-chans ==1.3.*,
- conduit ==0.2.*
+ conduit ==0.4.*,
+ resourcet == 0.3.*
ghc-options: -Wall -fwarn-tabs
@@ -43,6 +44,7 @@ test-suite stm-conduit-tests
stm,
stm-conduit,
conduit,
+ transformers,
stm-chans
source-repository head
View
@@ -9,6 +9,8 @@ import Test.Framework.Providers.QuickCheck2 (testProperty)
import Test.QuickCheck
import Test.HUnit
+import Control.Monad.IO.Class (liftIO)
+import Control.Monad.Trans.Class (lift)
import Control.Concurrent
import Control.Concurrent.STM
import Data.Conduit
@@ -35,8 +37,8 @@ test_simpleList = do chan <- atomically $ newTMChan
where
testList = [1..10000]
-test_multipleWriters = do ms <- runResourceT $ mergeSources [ sourceList [1..10]
- , sourceList [11..20]
- ] 3
+test_multipleWriters = do ms <- runResourceT $ mergeSources [ sourceList ([1..10]::[Integer])
+ , sourceList ([11..20]::[Integer])
+ ] 3
xs <- runResourceT $ ms $$ consume
- assertEqual "for the numbers [1..10] and [11..20]," [1..20] $ sort xs
+ liftIO $ assertEqual "for the numbers [1..10] and [11..20]," [1..20] $ sort xs

0 comments on commit ef0c87c

Please sign in to comment.