Permalink
Browse files

Add finalizer examples.

  • Loading branch information...
1 parent fb30d5f commit cb730584279b5537f0d6ea8c48e36f19fd7a88bf @pcapriotti committed Mar 5, 2012
@@ -0,0 +1,87 @@
+import qualified Data.ByteString as B
+import qualified Data.ByteString.Char8 as BC
+import Control.Monad
+import Control.Monad.Trans
+import Control.Pipe
+import qualified Control.Pipe.Binary as PB
+import Control.Pipe.Combinators
+import Control.Pipe.Exception
+import Control.Pipe.Zip
+import System.IO
+import Prelude hiding (filter, zip)
+
+import Control.Pipe.Coroutine
+
+-- line-by-line reader with verbose initializer and finalizer
+reader :: FilePath -> Producer B.ByteString IO ()
+reader fp = fReader >+> PB.lines >+> filter (not . B.null)
+ where
+ fReader = bracket open close PB.handleReader
+ open = do
+ putStrLn $ "opening file " ++ show fp ++ " for reading"
+ openFile fp ReadMode
+ close h = do
+ hClose h
+ putStrLn $ "closed file " ++ show fp
+
+-- line-by-line writer with verbose initializer and finalizer
+writer :: FilePath -> Consumer B.ByteString IO ()
+writer fp = pipe (`BC.snoc` '\n') >+> fWriter
+ where
+ fWriter = await >>= \x -> feed x (bracket open close PB.handleWriter)
+ open = do
+ putStrLn $ "opening file " ++ show fp ++ " for writing"
+ openFile fp WriteMode
+ close h = do
+ hClose h
+ putStrLn $ "closed file " ++ show fp
+
+-- Oleg's motivating example for monadic regions, reimplemented with Pipes.
+--
+-- 1. open two files for reading, one of them a configuration file
+-- 2. read the name of an output file from the configuration file
+-- 3. open the output file and zip the contents of both input files into
+-- the output file
+-- 4. close the configuration file
+-- 5. copy the rest, if any, of the other input file to the output file
+ex1 :: Pipeline IO ()
+ex1 = reader "conf" >+> -- read configuration file
+ (await >>= process) -- get first line and pass it to process
+ where
+ process out =
+ continue -- keep running when conf terminates
+ >+> splitP -- create second channel
+ >+> ( justs -- discard Nothing values on the first channel
+ *** reader' "input.txt") -- get input file on the second channel
+ >+> joinP -- merge input streams
+ >+> writer (BC.unpack out) -- save to output file
+
+ continue = forP (yield . Just) >> forever (yield Nothing)
+ justs = forever $ await >>= maybe (return ()) yield
+ reader' fp = pipe (const ()) >+> controllable_ (reader fp)
+
+-- Another example, demonstrating the use of controllable producers.
+--
+-- 1. open two input files
+-- 2. read one number out of each input file
+-- 3. close the input file with the lowest number
+-- 4. continue processing the other input file
+-- 5. close the other input file
+ex2 :: Pipeline IO ()
+ex2 = go (reader "input.txt") (reader "input2.txt") >+> printer
+ where
+ go p1 p2 = loopP $ zip p1 p2 >+> choose
+ choose = do
+ (Left line1) <- await
+ (Right line2) <- await
+ let n1 = read (BC.unpack line1) :: Int
+ let n2 = read (BC.unpack line2) :: Int
+ -- close file with the lowest number
+ if n1 < n2
+ then yield (Right (LeftZ (Done ())))
+ else yield (Right (RightZ (Done ())))
+ -- continue processing the other file
+ joinP >+> pipe Left
+
+printer :: Show a => Pipe a Void IO r
+printer = forever $ await >>= lift . print
View
@@ -0,0 +1,5 @@
+out.txt
+a
+b
+c
+d
@@ -0,0 +1,10 @@
+1
+2
+3
+4
+5
+6
+7
+8
+9
+10
@@ -0,0 +1,6 @@
+30
+31
+32
+33
+34
+35
@@ -0,0 +1,14 @@
+a
+1
+b
+2
+c
+3
+d
+4
+5
+6
+7
+8
+9
+10
@@ -0,0 +1,125 @@
+{-# LANGUAGE ScopedTypeVariables, DeriveDataTypeable #-}
+import qualified Data.ByteString as B
+import qualified Data.ByteString.Char8 as BC
+import Data.Typeable
+import Control.Concurrent (forkIO, myThreadId, threadDelay)
+import qualified Control.Exception as E
+import Control.Exception (Exception, IOException, throwTo)
+import Control.Monad
+import Control.Monad.Trans
+import Control.Pipe
+import Control.Pipe.Combinators
+import Control.Pipe.Exception
+import qualified Control.Pipe.Binary as PB
+import System.IO
+import Prelude hiding (catch)
+
+-- line-by-line reader with verbose initializer and finalizer
+reader :: FilePath -> Producer B.ByteString IO ()
+reader fp = fReader >+> PB.lines
+ where
+ fReader = bracket open close PB.handleReader
+ open = do
+ putStrLn $ "opening file " ++ show fp ++ " for reading"
+ openFile fp ReadMode
+ close h = do
+ hClose h
+ putStrLn $ "closed file " ++ show fp
+
+-- line-by-line writer with verbose initializer and finalizer
+writer :: FilePath -> Consumer B.ByteString IO ()
+writer fp = pipe (`BC.snoc` '\n') >+> fWriter
+ where
+ fWriter = await >>= \x -> feed x (bracket open close PB.handleWriter)
+ open = do
+ putStrLn $ "opening file " ++ show fp ++ " for writing"
+ openFile fp WriteMode
+ close h = do
+ hClose h
+ putStrLn $ "closed file " ++ show fp
+
+-- interactive pipe
+prompt :: Pipe String String IO ()
+prompt = forever $ await >>= \q -> do
+ lift $ putStr $ q ++ ": "
+ r <- lift getLine
+ yield r
+
+-- copy "/etc/motd" to "/tmp/x"
+ex1 :: Pipeline IO ()
+ex1 = reader "/etc/motd" >+> writer "/tmp/x"
+{-
+ opening file "/etc/motd" for reading
+ opening file "/tmp/x" for writing
+ closed file "/etc/motd"
+ closed file "/tmp/x"
+-}
+-- note that the files are not closed in LIFO order
+
+-- output error
+ex2 :: Pipeline IO ()
+ex2 = reader "/etc/motd" >+> writer "/unopenable"
+{-
+ opening file "/etc/motd" for reading
+ opening file "/unopenable" for writing
+ closed file "/etc/motd"
+ *** Exception: /unopenable: openFile: permission denied (Permission denied)
+-}
+-- note that the input file was automatically closed before the exception
+-- terminated the pipeline
+
+-- joining two files
+ex3 :: Pipeline IO ()
+ex3 = (reader "/etc/motd" >> reader "/usr/share/dict/words") >+>
+ writer "/tmp/x"
+{-
+ opening file "/etc/motd" for reading
+ opening file "/tmp/x" for writing
+ closed file "/etc/motd"
+ opening file "/usr/share/dict/words" for reading
+ closed file "/usr/share/dict/words"
+ closed file "/tmp/x"
+-}
+
+-- recovering from exceptions
+ex4 :: Pipeline IO ()
+ex4 = (safeReader "/etc/motd" >> safeReader "/nonexistent") >+>
+ writer "/tmp/x"
+ where
+ safeReader fp = catch (reader fp) $ \(e :: IOException) ->
+ lift $ putStrLn $ "exception " ++ show e
+{-
+ opening file "/etc/motd" for reading
+ opening file "/tmp/x" for writing
+ closed file "/etc/motd"
+ opening file "/nonexistent" for reading
+ exception /nonexistent: openFile: does not exist (No such file or directory)
+ closed file "/tmp/x"
+-}
+
+data Timeout = Timeout
+ deriving (Show, Typeable)
+instance Exception Timeout
+
+-- recovering from asynchronous exceptions
+ex5 :: Pipeline IO ()
+ex5 = questions >+> safePrompt >+> pipe BC.pack >+> writer "/tmp/x"
+ where
+ questions = do
+ yield "Project name"
+ yield "Version"
+ yield "Description"
+ lift $ E.throwIO Timeout
+ timeout t = lift $ do
+ tid <- myThreadId
+ forkIO $ do
+ threadDelay (t * 1000000)
+ throwTo tid Timeout
+ safePrompt = catch (timeout 5 >> prompt) $ \(_ :: Timeout) ->
+ lift $ putStrLn "timeout"
+{-
+ Project name: test
+ opening file "/tmp/x" for writing
+ Version: timeout
+ closed file "/tmp/x"
+-}

0 comments on commit cb73058

Please sign in to comment.