Skip to content

Commit

Permalink
Simplify API
Browse files Browse the repository at this point in the history
  • Loading branch information
3noch committed Apr 30, 2020
1 parent 8a55de6 commit d2bb3b5
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 76 deletions.
134 changes: 66 additions & 68 deletions src/Reflex/Process.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ Description: Run interactive shell commands in reflex
module Reflex.Process
( createProcess
, createProcessBufferingInput
, createRedirectedProcess
, createProcessWith
, defProcessConfig
, redirectingCreateProcess
, Process(..)
, ProcessConfig(..)
) where
Expand All @@ -21,14 +23,13 @@ import Control.Monad (void, when)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.ByteString (ByteString)
import qualified Data.ByteString as BS
import Data.Default
import qualified GHC.IO.Handle as H
import GHC.IO.Handle (Handle)
import System.Exit (ExitCode)
import Data.Function
import qualified System.Posix.Signals as P
import qualified System.Process as P
import System.Process hiding (createProcess)
import System.Process (std_err, std_in, std_out)

import Reflex

Expand All @@ -38,15 +39,16 @@ data ProcessConfig t i = ProcessConfig
-- ^ "stdin" input to be fed to the process
, _processConfig_signal :: Event t P.Signal
-- ^ Signals to send to the process
, _processConfig_createProcess
:: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ProcessHandle)
-- ^ Used for dependency injection (for example to ensure, in the test suite,
-- that the child process is properly terminated). Defaults to
-- 'System.Process.createProcess'
, _processConfig_createProcess :: IO (Handle, Handle, Handle, P.ProcessHandle)
-- ^ Configurable version of 'System.Process.createProcess' that must create pipes
-- for 'std_in', 'std_out', and 'std_err'.
}

instance Reflex t => Default (ProcessConfig t i) where
def = ProcessConfig never never P.createProcess
-- | Make a default 'ProcessConfig' with the given 'System.Process.CreateProcess'
-- using 'redirectingCreateProcess'.
defProcessConfig :: Reflex t => P.CreateProcess -> ProcessConfig t i
defProcessConfig p = ProcessConfig never never (redirectingCreateProcess p)


-- | The output of a process
data Process t o e = Process
Expand All @@ -67,67 +69,56 @@ data Process t o e = Process
-- NB: The 'std_in', 'std_out', and 'std_err' parameters of the
-- provided 'CreateProcess' are replaced with new pipes and all output is redirected
-- to those pipes.
createRedirectedProcess
createProcessWith
:: forall t m i o e. (MonadIO m, TriggerEvent t m, PerformEvent t m, MonadIO (Performable m))
=> (Handle -> IO (i -> IO ()))
-- ^ Builder for the standard input handler
-> (Handle -> (o -> IO ()) -> IO (IO ()))
-- ^ Builder for the standard output handler
-> (Handle -> (e -> IO ()) -> IO (IO ()))
-- ^ Builder for the standard error handler
-> CreateProcess
-> ProcessConfig t i
-> m (Process t o e)
createRedirectedProcess mkWriteStdInput mkReadStdOutput mkReadStdError p (ProcessConfig input signal createProcessFunction) = do
let redirectedProc = p
{ std_in = CreatePipe
, std_out = CreatePipe
, std_err = CreatePipe
}
po@(mi, mout, merr, ph) <- liftIO $ createProcessFunction redirectedProc
case (mi, mout, merr) of
(Just hIn, Just hOut, Just hErr) -> do
writeInput :: i -> IO () <- liftIO $ mkWriteStdInput hIn
performEvent_ $ liftIO . writeInput <$> input
sigOut :: Event t (Maybe P.Signal) <- performEvent $ ffor signal $ \sig -> liftIO $ do
mpid <- P.getPid ph
case mpid of
Nothing -> return Nothing
Just pid -> do
P.signalProcess sig pid >> return (Just sig)
let
output :: Handle -> m (Event t o, ThreadId)
output h = do
(e, trigger) <- newTriggerEvent
reader <- liftIO $ mkReadStdOutput h trigger
t <- liftIO $ forkIO reader
return (e, t)
createProcessWith mkWriteStdInput mkReadStdOutput mkReadStdError (ProcessConfig input signal createProc) = do
(hIn, hOut, hErr, ph) <- liftIO createProc
writeInput :: i -> IO () <- liftIO $ mkWriteStdInput hIn
performEvent_ $ liftIO . writeInput <$> input
sigOut :: Event t (Maybe P.Signal) <- performEvent $ ffor signal $ \sig -> liftIO $ do
mpid <- P.getPid ph
case mpid of
Nothing -> return Nothing
Just pid -> Just sig <$ P.signalProcess sig pid
let
output :: Handle -> m (Event t o, ThreadId)
output h = do
(e, trigger) <- newTriggerEvent
reader <- liftIO $ mkReadStdOutput h trigger
t <- liftIO $ forkIO reader
return (e, t)

let
err_output :: Handle -> m (Event t e, ThreadId)
err_output h = do
(e, trigger) <- newTriggerEvent
reader <- liftIO $ mkReadStdError h trigger
t <- liftIO $ forkIO reader
return (e, t)
(out, outThread) <- output hOut
(err, errThread) <- err_output hErr
(ecOut, ecTrigger) <- newTriggerEvent
void $ liftIO $ forkIO $ do
waited <- waitForProcess ph
mask_ $ do
ecTrigger waited
P.cleanupProcess po
killThread outThread
killThread errThread
return $ Process
{ _process_exit = ecOut
, _process_stdout = out
, _process_stderr = err
, _process_signal = fmapMaybe id sigOut
, _process_handle = ph
}
_ -> error "Reflex.Process.createRedirectedProcess: Created pipes were not returned by System.Process.createProcess."
err_output :: Handle -> m (Event t e, ThreadId)
err_output h = do
(e, trigger) <- newTriggerEvent
reader <- liftIO $ mkReadStdError h trigger
t <- liftIO $ forkIO reader
return (e, t)
(out, outThread) <- output hOut
(err, errThread) <- err_output hErr
(ecOut, ecTrigger) <- newTriggerEvent
void $ liftIO $ forkIO $ do
waited <- P.waitForProcess ph
mask_ $ do
ecTrigger waited
P.cleanupProcess (Just hIn, Just hOut, Just hErr, ph)
killThread outThread
killThread errThread
return $ Process
{ _process_exit = ecOut
, _process_stdout = out
, _process_stderr = err
, _process_signal = fmapMaybe id sigOut
, _process_handle = ph
}

-- | Create a process feeding it input using an 'Event' and exposing its output with 'Event's
-- for its exit code, stdout, and stderr. The input is fed via a buffer represented by a
Expand All @@ -142,10 +133,9 @@ createProcessBufferingInput
:: (MonadIO m, TriggerEvent t m, PerformEvent t m, MonadIO (Performable m))
=> IO ByteString -- ^ Read a value from the input stream buffer
-> (ByteString -> IO ()) -- ^ Write a value to the input stream buffer
-> CreateProcess -- ^ The process specification
-> ProcessConfig t ByteString -- ^ The process configuration in terms of Reflex
-> m (Process t ByteString ByteString)
createProcessBufferingInput readBuffer writeBuffer p procConfig = do
createProcessBufferingInput readBuffer writeBuffer procConfig = do
let
input h = do
H.hSetBuffering h H.NoBuffering
Expand Down Expand Up @@ -173,7 +163,7 @@ createProcessBufferingInput readBuffer writeBuffer p procConfig = do
void $ trigger out
go
return go
createRedirectedProcess input output output p procConfig
createProcessWith input output output procConfig

-- | Create a process feeding it input using an 'Event' and exposing its output
-- 'Event's representing the process exit code, stdout, and stderr.
Expand All @@ -188,9 +178,17 @@ createProcessBufferingInput readBuffer writeBuffer p procConfig = do
-- to those pipes.
createProcess
:: (MonadIO m, TriggerEvent t m, PerformEvent t m, MonadIO (Performable m))
=> CreateProcess
-> ProcessConfig t ByteString
=> ProcessConfig t ByteString
-> m (Process t ByteString ByteString)
createProcess p procConfig = do
createProcess procConfig = do
channel <- liftIO newChan
createProcessBufferingInput (readChan channel) (writeChan channel) p procConfig
createProcessBufferingInput (readChan channel) (writeChan channel) procConfig


-- | Like 'System.Process.createProcess' but always redirects 'std_in', 'std_out', and 'std_err'.
redirectingCreateProcess :: P.CreateProcess -> IO (Handle, Handle, Handle, P.ProcessHandle)
redirectingCreateProcess p = do
r <- P.createProcess p { std_in = P.CreatePipe, std_out = P.CreatePipe, std_err = P.CreatePipe }
case r of
(Just hIn, Just hOut, Just hErr, ph) -> pure (hIn, hOut, hErr, ph)
_ -> error "Reflex.Process.redirectingCreateProcess: Created pipes were not returned by System.Process.createProcess."
16 changes: 8 additions & 8 deletions test/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -51,28 +51,28 @@ checkFRPBlocking downstreamProcess exitMVar = do

let
createProcessWithTermination cp = do
procData <- P.createProcess cp
procData <- redirectingCreateProcess cp
writeIORef spawnedProcess (Just procData)
pure procData

finally
(runHeadlessApp $ do
timer <- tickLossyFromPostBuildTime 1
void $ performEvent $ (liftIO $ tryPutMVar exitMVar Exit) <$ timer
void $ performEvent $ liftIO (tryPutMVar exitMVar Exit) <$ timer

(ev, evTrigger :: ByteString -> IO ()) <- newTriggerEvent
processOutput <- createProcess downstreamProcess (ProcessConfig ev never createProcessWithTermination)
processOutput <- createProcess $ ProcessConfig ev never (createProcessWithTermination downstreamProcess)
liftIO $ evTrigger $ veryLongByteString 'a'
liftIO $ evTrigger $ veryLongByteString 'b'
liftIO $ evTrigger $ veryLongByteString 'c'

void $ performEvent $ liftIO . BS.putStrLn <$> (_process_stdout processOutput)
pure never)

(readIORef spawnedProcess >>= mapM_ P.cleanupProcess)
void $ performEvent $ liftIO . BS.putStrLn <$> _process_stdout processOutput
pure never
)
(readIORef spawnedProcess >>= traverse_ (\(hIn, hOut, hErr, ph) -> P.cleanupProcess (Just hIn, Just hOut, Just hErr, ph)))

-- It's important to try this with long bytestrings to be sure that they're not
-- put in an operative system inter-process buffer.
veryLongByteString :: Char -> ByteString
veryLongByteString c = BS.replicate 100000 c
veryLongByteString = BS.replicate 100000
--------------------------------------------------------------------------------

0 comments on commit d2bb3b5

Please sign in to comment.