Skip to content

Commit

Permalink
adapt conduit-0.5
Browse files Browse the repository at this point in the history
  • Loading branch information
tanakh committed Jul 11, 2012
1 parent 97b5a4d commit 57d8756
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 105 deletions.
212 changes: 108 additions & 104 deletions Data/Conduit/Process.hs
Original file line number Diff line number Diff line change
@@ -1,104 +1,108 @@
{-# LANGUAGE FlexibleContexts, OverloadedStrings, DoAndIfThenElse, BangPatterns #-}
module Data.Conduit.Process (
-- * Run process
pipeProcess,
sourceProcess,
conduitProcess,

-- * Run shell command
pipeCmd,
sourceCmd,
conduitCmd,

-- * Convenience re-exports
shell,
proc,
CreateProcess(..),
CmdSpec(..),
StdStream(..),
ProcessHandle,
) where

import Control.Concurrent
import Control.Concurrent.MVar
import Control.Monad
import Control.Monad.Trans
import Control.Monad.Trans.Resource
import qualified Data.ByteString as B
import Data.Conduit
import qualified Data.Conduit.List as C
import System.Exit
import System.IO
import System.Process

bufSize :: Int
bufSize = 64 * 1024

-- | Pipe of process
pipeProcess
:: MonadResource m
=> CreateProcess
-> Pipe B.ByteString B.ByteString m ()
pipeProcess cp = flip PipeM (return ()) $ do
(_, (Just cin, Just cout, _, ph)) <- allocate createp closep
mvar <- liftIO newEmptyMVar
return $ go cin cout ph mvar False B.hGetNonBlocking
where
createp = createProcess cp
{ std_in = CreatePipe
, std_out = CreatePipe
}

closep (Just cin, Just cout, _, ph) = do
hClose cin
hClose cout
_ <- waitForProcess ph
return ()

go !cin !cout !ph !mvar !wait !rd = do
out <- liftIO $ rd cout bufSize
if B.null out then do
end <- liftIO $ getProcessExitCode ph
case end of
Just ec -> do
lift $ when (ec /= ExitSuccess) $ monadThrow ec
Done Nothing ()
Nothing ->
if wait then do
emp <- liftIO $ isEmptyMVar mvar
if emp then do
go cin cout ph mvar wait rd
else do
liftIO $ takeMVar mvar
go cin cout ph mvar False rd
else do
NeedInput
(\inp -> do
liftIO $ do
B.hPut cin inp
forkIO (hFlush cin >>= putMVar mvar)
go cin cout ph mvar True rd)
(do liftIO (hClose cin)
go cin cout ph mvar wait B.hGetSome)
else do
HaveOutput (go cin cout ph mvar wait rd) (return ()) out

-- | Source of process
sourceProcess :: MonadResource m => CreateProcess -> Source m B.ByteString
sourceProcess cp = C.sourceNull $= conduitProcess cp

-- | Conduit of process
conduitProcess :: MonadResource m => CreateProcess -> Conduit B.ByteString m B.ByteString
conduitProcess = pipeProcess

-- | Pipe of shell command
pipeCmd :: MonadResource m => String -> Pipe B.ByteString B.ByteString m ()
pipeCmd = pipeProcess . shell

-- | Source of shell command
sourceCmd :: MonadResource m => String -> Source m B.ByteString
sourceCmd = sourceProcess . shell

-- | Conduit of shell command
conduitCmd :: MonadResource m => String -> Conduit B.ByteString m B.ByteString
conduitCmd = conduitProcess . shell
{-# LANGUAGE FlexibleContexts, OverloadedStrings, DoAndIfThenElse, BangPatterns #-}
module Data.Conduit.Process (
-- * Run process
pipeProcess,
sourceProcess,
conduitProcess,

-- * Run shell command
pipeCmd,
sourceCmd,
conduitCmd,

-- * Convenience re-exports
shell,
proc,
CreateProcess(..),
CmdSpec(..),
StdStream(..),
ProcessHandle,
) where

import Control.Concurrent hiding (yield)
import Control.Monad
import Control.Monad.Trans
import Control.Monad.Trans.Resource
import qualified Data.ByteString as S
import Data.Conduit
import qualified Data.Conduit.List as CL
import System.Exit
import System.IO
import System.Process

bufSize :: Int
bufSize = 64 * 1024

-- | Pipe of process
pipeProcess
:: MonadResource m
=> CreateProcess
-> GConduit S.ByteString m S.ByteString
pipeProcess cp = do
(_, (Just cin, Just cout, _, ph)) <- lift $ allocate createp closep
mvar <- liftIO newEmptyMVar
go cin cout ph mvar False S.hGetNonBlocking
where
createp = createProcess cp
{ std_in = CreatePipe
, std_out = CreatePipe
}

closep (Just cin, Just cout, _, ph) = do
hClose cin
hClose cout
_ <- waitForProcess ph
return ()

go !cin !cout !ph !mvar !wait !rd = do
out <- liftIO $ rd cout bufSize
if S.null out then do
end <- liftIO $ getProcessExitCode ph
case end of
Just ec -> do
lift $ when (ec /= ExitSuccess) $ monadThrow ec
return ()
Nothing ->
if wait then do
emp <- liftIO $ isEmptyMVar mvar
if emp then do
go cin cout ph mvar wait rd
else do
liftIO $ takeMVar mvar
go cin cout ph mvar False rd
else do
mb <- await
case mb of
Just inp -> do
liftIO $ do
S.hPut cin inp
forkIO (hFlush cin >>= putMVar mvar)
go cin cout ph mvar True rd
Nothing -> do
liftIO (hClose cin)
go cin cout ph mvar wait S.hGetSome
else do
yield out
go cin cout ph mvar wait rd

-- | Source of process
sourceProcess :: MonadResource m => CreateProcess -> GSource m S.ByteString
sourceProcess cp = CL.sourceNull >+> conduitProcess cp

-- | Conduit of process
conduitProcess :: MonadResource m
=> CreateProcess -> GConduit S.ByteString m S.ByteString
conduitProcess = pipeProcess

-- | Pipe of shell command
pipeCmd :: MonadResource m
=> String -> GConduit S.ByteString m S.ByteString
pipeCmd = pipeProcess . shell

-- | Source of shell command
sourceCmd :: MonadResource m => String -> GSource m S.ByteString
sourceCmd = sourceProcess . shell

-- | Conduit of shell command
conduitCmd :: MonadResource m => String -> GConduit S.ByteString m S.ByteString
conduitCmd = conduitProcess . shell
2 changes: 1 addition & 1 deletion process-conduit.cabal
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: process-conduit
version: 0.4.1
version: 0.5.0
synopsis: Conduits for processes

description:
Expand Down

0 comments on commit 57d8756

Please sign in to comment.