Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify API #14

Merged
merged 4 commits into from
May 21, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
reflex-process
reflex-process
==============

[![hackage](https://img.shields.io/hackage/v/reflex-process.svg)](https://hackage.haskell.org/package/reflex-process) [![hackage-ci](https://matrix.hackage.haskell.org/api/v2/packages/reflex-process/badge)](https://matrix.hackage.haskell.org/#/package/reflex-process) [![travis-ci](https://api.travis-ci.org/reflex-frp/reflex-process.svg?branch=develop)](https://travis-ci.org/reflex-frp/reflex-process)
Expand Down Expand Up @@ -32,7 +32,7 @@ The following example uses [reflex-vty](https://github.com/reflex-frp/reflex-vty
> main :: IO ()
> main = mainWidget $ do
> exit <- keyCombos $ Set.singleton (V.KChar 'c', [V.MCtrl])
> p <- createProcess cmd def
> p <- createProcess $ defProcessConfig cmd
> stdout <- foldDyn (flip mappend) "" $ _process_stdout p
> boxStatic def $ col $ do
> fixed 3 $ boxStatic def $ text "reflex-process"
Expand Down
2 changes: 0 additions & 2 deletions reflex-process.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ library
build-depends: base >=4.12 && <4.14
, async >= 2 && < 3
, bytestring >=0.10 && < 0.11
, data-default >= 0.7.1 && < 0.8
, process >= 1.6.4 && < 1.7
, reflex >= 0.7.1 && < 0.8
, unix >= 2.7 && < 2.8
Expand Down Expand Up @@ -53,7 +52,6 @@ test-suite tests
base,
bytestring,
containers,
data-default,
dependent-sum,
hspec,
mtl,
Expand Down
125 changes: 62 additions & 63 deletions src/Reflex/Process.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ Description: Run interactive shell commands in reflex
module Reflex.Process
( createProcess
, createProcessBufferingInput
, createRedirectedProcess
, createProcessWith
, defProcessConfig
, redirectingCreateProcess
, Process(..)
, ProcessConfig(..)
, SendPipe (..)
Expand All @@ -22,7 +24,6 @@ import Control.Monad (void, when)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.ByteString (ByteString)
import qualified Data.ByteString as BS
import Data.Default (Default (..))
import Data.Function (fix)
import GHC.IO.Handle (Handle)
import qualified GHC.IO.Handle as H
Expand All @@ -49,15 +50,16 @@ data ProcessConfig t i = ProcessConfig
-- ^ "stdin" input to be fed to the process
, _processConfig_signal :: Event t P.Signal
-- ^ Signals to send to the process
, _processConfig_createProcess
:: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ProcessHandle)
-- ^ Used for dependency injection (for example to ensure, in the test suite,
-- that the child process is properly terminated). Defaults to
-- 'System.Process.createProcess'
, _processConfig_createProcess :: IO (Handle, Handle, Handle, P.ProcessHandle)
-- ^ Configurable version of 'System.Process.createProcess' that must create pipes
-- for 'std_in', 'std_out', and 'std_err'.
}

instance Reflex t => Default (ProcessConfig t i) where
def = ProcessConfig never never P.createProcess
-- | Make a default 'ProcessConfig' with the given 'System.Process.CreateProcess'
-- using 'redirectingCreateProcess'.
defProcessConfig :: Reflex t => P.CreateProcess -> ProcessConfig t i
defProcessConfig p = ProcessConfig never never (redirectingCreateProcess p)


-- | The output of a process
data Process t o e = Process
Expand All @@ -80,64 +82,55 @@ data Process t o e = Process
-- NB: The 'std_in', 'std_out', and 'std_err' parameters of the
-- provided 'CreateProcess' are replaced with new pipes and all output is redirected
-- to those pipes.
createRedirectedProcess
createProcessWith
:: forall t m i o e. (MonadIO m, TriggerEvent t m, PerformEvent t m, MonadIO (Performable m))
=> (Handle -> IO (i -> IO ()))
-- ^ Builder for the standard input handler
-> (Handle -> (o -> IO ()) -> IO (IO ()))
-- ^ Builder for the standard output handler
-> (Handle -> (e -> IO ()) -> IO (IO ()))
-- ^ Builder for the standard error handler
-> CreateProcess
-> ProcessConfig t i
-> m (Process t o e)
createRedirectedProcess mkWriteStdInput mkReadStdOutput mkReadStdError p (ProcessConfig input signal createProcessFunction) = do
let redirectedProc = p
{ std_in = CreatePipe
, std_out = CreatePipe
, std_err = CreatePipe
}
po@(mi, mout, merr, ph) <- liftIO $ createProcessFunction redirectedProc
case (mi, mout, merr) of
(Just hIn, Just hOut, Just hErr) -> do
writeInput :: i -> IO () <- liftIO $ mkWriteStdInput hIn
performEvent_ $ liftIO . writeInput <$> input
sigOut :: Event t (Maybe P.Signal) <- performEvent $ ffor signal $ \sig -> liftIO $ do
mpid <- P.getPid ph
case mpid of
Nothing -> return Nothing
Just pid -> do
P.signalProcess sig pid >> return (Just sig)
let
output :: Handle -> m (Event t o, Async ())
output h = do
(e, trigger) <- newTriggerEvent
reader <- liftIO $ mkReadStdOutput h trigger
t <- liftIO $ async reader
return (e, t)
createProcessWith mkWriteStdInput mkReadStdOutput mkReadStdError (ProcessConfig input signal createProc) = do
(hIn, hOut, hErr, ph) <- liftIO createProc
writeInput :: i -> IO () <- liftIO $ mkWriteStdInput hIn
performEvent_ $ liftIO . writeInput <$> input
sigOut :: Event t (Maybe P.Signal) <- performEvent $ ffor signal $ \sig -> liftIO $ do
mpid <- P.getPid ph
case mpid of
Nothing -> return Nothing
Just pid -> do
P.signalProcess sig pid >> return (Just sig)
let
output :: Handle -> m (Event t o, Async ())
output h = do
(e, trigger) <- newTriggerEvent
reader <- liftIO $ mkReadStdOutput h trigger
t <- liftIO $ async reader
return (e, t)

errOutput :: Handle -> m (Event t e, Async ())
errOutput h = do
(e, trigger) <- newTriggerEvent
reader <- liftIO $ mkReadStdError h trigger
t <- liftIO $ async reader
return (e, t)
errOutput :: Handle -> m (Event t e, Async ())
errOutput h = do
(e, trigger) <- newTriggerEvent
reader <- liftIO $ mkReadStdError h trigger
t <- liftIO $ async reader
return (e, t)

(out, outThread) <- output hOut
(err, errThread) <- errOutput hErr
(ecOut, ecTrigger) <- newTriggerEvent
void $ liftIO $ async $ flip finally (P.cleanupProcess po) $ do
waited <- waitForProcess ph
_ <- waitBoth outThread errThread
ecTrigger waited -- Output events should never fire after process completion
return $ Process
{ _process_exit = ecOut
, _process_stdout = out
, _process_stderr = err
, _process_signal = fmapMaybe id sigOut
, _process_handle = ph
}
_ -> error "Reflex.Process.createRedirectedProcess: Created pipes were not returned by System.Process.createProcess."
(out, outThread) <- output hOut
(err, errThread) <- errOutput hErr
(ecOut, ecTrigger) <- newTriggerEvent
void $ liftIO $ async $ flip finally (P.cleanupProcess (Just hIn, Just hOut, Just hErr, ph)) $ do
waited <- waitForProcess ph
_ <- waitBoth outThread errThread
ecTrigger waited -- Output events should never fire after process completion
return $ Process
{ _process_exit = ecOut
, _process_stdout = out
, _process_stderr = err
, _process_signal = fmapMaybe id sigOut
, _process_handle = ph
}

-- | Create a process feeding it input using an 'Event' and exposing its output with 'Event's
-- for its exit code, stdout, and stderr. The input is fed via a buffer represented by a
Expand All @@ -152,10 +145,9 @@ createProcessBufferingInput
:: (MonadIO m, TriggerEvent t m, PerformEvent t m, MonadIO (Performable m))
=> IO (SendPipe ByteString) -- ^ Read a value from the input stream buffer
-> (SendPipe ByteString -> IO ()) -- ^ Write a value to the input stream buffer
-> CreateProcess -- ^ The process specification
-> ProcessConfig t (SendPipe ByteString) -- ^ The process configuration in terms of Reflex
-> m (Process t ByteString ByteString)
createProcessBufferingInput readBuffer writeBuffer p procConfig = do
createProcessBufferingInput readBuffer writeBuffer procConfig = do
let
input :: Handle -> IO (SendPipe ByteString -> IO ())
input h = do
Expand Down Expand Up @@ -183,7 +175,7 @@ createProcessBufferingInput readBuffer writeBuffer p procConfig = do
if BS.null out
then H.hClose h
else void (trigger out) *> go
createRedirectedProcess input output output p procConfig
createProcessWith input output output procConfig

-- | Create a process feeding it input using an 'Event' and exposing its output
-- 'Event's representing the process exit code, stdout, and stderr.
Expand All @@ -198,9 +190,16 @@ createProcessBufferingInput readBuffer writeBuffer p procConfig = do
-- to those pipes.
createProcess
:: (MonadIO m, TriggerEvent t m, PerformEvent t m, MonadIO (Performable m))
=> CreateProcess
-> ProcessConfig t (SendPipe ByteString)
=> ProcessConfig t (SendPipe ByteString)
-> m (Process t ByteString ByteString)
createProcess p procConfig = do
createProcess procConfig = do
channel <- liftIO newChan
createProcessBufferingInput (readChan channel) (writeChan channel) p procConfig
createProcessBufferingInput (readChan channel) (writeChan channel) procConfig

-- | Like 'System.Process.createProcess' but always redirects 'std_in', 'std_out', and 'std_err'.
redirectingCreateProcess :: P.CreateProcess -> IO (Handle, Handle, Handle, P.ProcessHandle)
redirectingCreateProcess p = do
r <- P.createProcess p { std_in = P.CreatePipe, std_out = P.CreatePipe, std_err = P.CreatePipe }
case r of
(Just hIn, Just hOut, Just hErr, ph) -> pure (hIn, hOut, hErr, ph)
_ -> error "Reflex.Process.redirectingCreateProcess: Created pipes were not returned by System.Process.createProcess."
17 changes: 9 additions & 8 deletions test/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import Control.Monad (void)
import Control.Monad.IO.Class (liftIO)
import Data.ByteString (ByteString)
import Data.IORef (newIORef, writeIORef, readIORef)
import Data.Foldable (traverse_)
import Reflex
import System.Timeout (timeout)
import qualified Data.ByteString.Char8 as BS
Expand Down Expand Up @@ -51,28 +52,28 @@ checkFRPBlocking downstreamProcess exitMVar = do

let
createProcessWithTermination cp = do
procData <- P.createProcess cp
procData <- redirectingCreateProcess cp
writeIORef spawnedProcess (Just procData)
pure procData

finally
(runHeadlessApp $ do
timer <- tickLossyFromPostBuildTime 1
void $ performEvent $ (liftIO $ tryPutMVar exitMVar Exit) <$ timer
void $ performEvent $ liftIO (tryPutMVar exitMVar Exit) <$ timer

(ev, evTrigger :: SendPipe ByteString -> IO ()) <- newTriggerEvent
processOutput <- createProcess downstreamProcess (ProcessConfig ev never createProcessWithTermination)
processOutput <- createProcess $ ProcessConfig ev never (createProcessWithTermination downstreamProcess)
liftIO $ evTrigger $ SendPipe_Message $ veryLongByteString 'a'
liftIO $ evTrigger $ SendPipe_Message $ veryLongByteString 'b'
liftIO $ evTrigger $ SendPipe_LastMessage $ veryLongByteString 'c'

void $ performEvent $ liftIO . BS.putStrLn <$> (_process_stdout processOutput)
pure never)

(readIORef spawnedProcess >>= mapM_ P.cleanupProcess)
void $ performEvent $ liftIO . BS.putStrLn <$> _process_stdout processOutput
pure never
)
(readIORef spawnedProcess >>= traverse_ (\(hIn, hOut, hErr, ph) -> P.cleanupProcess (Just hIn, Just hOut, Just hErr, ph)))

-- It's important to try this with long bytestrings to be sure that they're not
-- put in an operative system inter-process buffer.
veryLongByteString :: Char -> ByteString
veryLongByteString c = BS.replicate 100000 c
veryLongByteString = BS.replicate 100000
--------------------------------------------------------------------------------