Skip to content

Commit

Permalink
Merge pull request #14 from reflex-frp/eac@rework-api
Browse files Browse the repository at this point in the history
Simplify API
  • Loading branch information
3noch committed May 21, 2020
2 parents 5023873 + 0574163 commit 02abadd
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 75 deletions.
14 changes: 14 additions & 0 deletions ChangeLog.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,19 @@
# Revision history for reflex-process

## Unreleased

* ([#11](https://github.com/reflex-frp/reflex-process/pull/11)) Add `createProcessBufferingInput` for buffering input to processes and change `createProcess` to use an unbounded buffer instead of blocking the FRP network when the process blocks on its input handle.
* ([#11](https://github.com/reflex-frp/reflex-process/pull/11), [#14](https://github.com/reflex-frp/reflex-process/pull/14)) `ProcessConfig` now includes a `_processConfig_createProcess` field for customizing how the process is created.
* ([#13](https://github.com/reflex-frp/reflex-process/pull/13)) Fix race condition between process completion `Event`s and process `stdout`/`stderr` `Event`s. Process completion is now always the very last `Event` to fire for a given `Process`.
* ([#15](https://github.com/reflex-frp/reflex-process/pull/15), [#13](https://github.com/reflex-frp/reflex-process/pull/13)) **(Breaking change)** Introduce `SendPipe` type for encoding when an input stream should send EOF and change `createProcess` to take a `ProcessConfig t (SendPipe ByteString)` so that sending EOF is possible.
* ([#14](https://github.com/reflex-frp/reflex-process/pull/14)) **(Breaking change)** Consolidate `ProcessConfig` and `CreateProcess` which has the following effects:
* Instead of constructing `ProcessConfig` with `def` you now use `defProcessConfig` and provide your `CreateProcess` here.
* `createRedirectedProcess` was renamed to `createProcessWith` and now takes a single `ProcessConfig` as normally constructed by `defProcessConfig`.
* `createProcess` now takes a single `ProcessConfig` as normally constructed by `defProcessConfig`.

For example, if you had `createProcess cmd def`, you would change it to `createProcess (defProcessConfig cmd)`.


## 0.2.1.0

* `createProcess`: Ensure that handle is open before attempting to check whether it is readable
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
reflex-process
reflex-process
==============

[![hackage](https://img.shields.io/hackage/v/reflex-process.svg)](https://hackage.haskell.org/package/reflex-process) [![hackage-ci](https://matrix.hackage.haskell.org/api/v2/packages/reflex-process/badge)](https://matrix.hackage.haskell.org/#/package/reflex-process) [![travis-ci](https://api.travis-ci.org/reflex-frp/reflex-process.svg?branch=develop)](https://travis-ci.org/reflex-frp/reflex-process)
Expand Down Expand Up @@ -32,7 +32,7 @@ The following example uses [reflex-vty](https://github.com/reflex-frp/reflex-vty
> main :: IO ()
> main = mainWidget $ do
> exit <- keyCombos $ Set.singleton (V.KChar 'c', [V.MCtrl])
> p <- createProcess cmd def
> p <- createProcess $ defProcessConfig cmd
> stdout <- foldDyn (flip mappend) "" $ _process_stdout p
> boxStatic def $ col $ do
> fixed 3 $ boxStatic def $ text "reflex-process"
Expand Down
2 changes: 0 additions & 2 deletions reflex-process.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ library
build-depends: base >=4.12 && <4.14
, async >= 2 && < 3
, bytestring >=0.10 && < 0.11
, data-default >= 0.7.1 && < 0.8
, process >= 1.6.4 && < 1.7
, reflex >= 0.7.1 && < 0.8
, unix >= 2.7 && < 2.8
Expand Down Expand Up @@ -53,7 +52,6 @@ test-suite tests
base,
bytestring,
containers,
data-default,
dependent-sum,
hspec,
mtl,
Expand Down
125 changes: 62 additions & 63 deletions src/Reflex/Process.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ Description: Run interactive shell commands in reflex
module Reflex.Process
( createProcess
, createProcessBufferingInput
, createRedirectedProcess
, createProcessWith
, defProcessConfig
, redirectingCreateProcess
, Process(..)
, ProcessConfig(..)
, SendPipe (..)
Expand All @@ -22,7 +24,6 @@ import Control.Monad (void, when)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.ByteString (ByteString)
import qualified Data.ByteString as BS
import Data.Default (Default (..))
import Data.Function (fix)
import GHC.IO.Handle (Handle)
import qualified GHC.IO.Handle as H
Expand All @@ -49,15 +50,16 @@ data ProcessConfig t i = ProcessConfig
-- ^ "stdin" input to be fed to the process
, _processConfig_signal :: Event t P.Signal
-- ^ Signals to send to the process
, _processConfig_createProcess
:: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ProcessHandle)
-- ^ Used for dependency injection (for example to ensure, in the test suite,
-- that the child process is properly terminated). Defaults to
-- 'System.Process.createProcess'
, _processConfig_createProcess :: IO (Handle, Handle, Handle, P.ProcessHandle)
-- ^ Configurable version of 'System.Process.createProcess' that must create pipes
-- for 'std_in', 'std_out', and 'std_err'.
}

instance Reflex t => Default (ProcessConfig t i) where
def = ProcessConfig never never P.createProcess
-- | Make a default 'ProcessConfig' with the given 'System.Process.CreateProcess'
-- using 'redirectingCreateProcess'.
defProcessConfig :: Reflex t => P.CreateProcess -> ProcessConfig t i
defProcessConfig p = ProcessConfig never never (redirectingCreateProcess p)


-- | The output of a process
data Process t o e = Process
Expand All @@ -80,64 +82,55 @@ data Process t o e = Process
-- NB: The 'std_in', 'std_out', and 'std_err' parameters of the
-- provided 'CreateProcess' are replaced with new pipes and all output is redirected
-- to those pipes.
createRedirectedProcess
createProcessWith
:: forall t m i o e. (MonadIO m, TriggerEvent t m, PerformEvent t m, MonadIO (Performable m))
=> (Handle -> IO (i -> IO ()))
-- ^ Builder for the standard input handler
-> (Handle -> (o -> IO ()) -> IO (IO ()))
-- ^ Builder for the standard output handler
-> (Handle -> (e -> IO ()) -> IO (IO ()))
-- ^ Builder for the standard error handler
-> CreateProcess
-> ProcessConfig t i
-> m (Process t o e)
createRedirectedProcess mkWriteStdInput mkReadStdOutput mkReadStdError p (ProcessConfig input signal createProcessFunction) = do
let redirectedProc = p
{ std_in = CreatePipe
, std_out = CreatePipe
, std_err = CreatePipe
}
po@(mi, mout, merr, ph) <- liftIO $ createProcessFunction redirectedProc
case (mi, mout, merr) of
(Just hIn, Just hOut, Just hErr) -> do
writeInput :: i -> IO () <- liftIO $ mkWriteStdInput hIn
performEvent_ $ liftIO . writeInput <$> input
sigOut :: Event t (Maybe P.Signal) <- performEvent $ ffor signal $ \sig -> liftIO $ do
mpid <- P.getPid ph
case mpid of
Nothing -> return Nothing
Just pid -> do
P.signalProcess sig pid >> return (Just sig)
let
output :: Handle -> m (Event t o, Async ())
output h = do
(e, trigger) <- newTriggerEvent
reader <- liftIO $ mkReadStdOutput h trigger
t <- liftIO $ async reader
return (e, t)
createProcessWith mkWriteStdInput mkReadStdOutput mkReadStdError (ProcessConfig input signal createProc) = do
(hIn, hOut, hErr, ph) <- liftIO createProc
writeInput :: i -> IO () <- liftIO $ mkWriteStdInput hIn
performEvent_ $ liftIO . writeInput <$> input
sigOut :: Event t (Maybe P.Signal) <- performEvent $ ffor signal $ \sig -> liftIO $ do
mpid <- P.getPid ph
case mpid of
Nothing -> return Nothing
Just pid -> do
P.signalProcess sig pid >> return (Just sig)
let
output :: Handle -> m (Event t o, Async ())
output h = do
(e, trigger) <- newTriggerEvent
reader <- liftIO $ mkReadStdOutput h trigger
t <- liftIO $ async reader
return (e, t)

errOutput :: Handle -> m (Event t e, Async ())
errOutput h = do
(e, trigger) <- newTriggerEvent
reader <- liftIO $ mkReadStdError h trigger
t <- liftIO $ async reader
return (e, t)
errOutput :: Handle -> m (Event t e, Async ())
errOutput h = do
(e, trigger) <- newTriggerEvent
reader <- liftIO $ mkReadStdError h trigger
t <- liftIO $ async reader
return (e, t)

(out, outThread) <- output hOut
(err, errThread) <- errOutput hErr
(ecOut, ecTrigger) <- newTriggerEvent
void $ liftIO $ async $ flip finally (P.cleanupProcess po) $ do
waited <- waitForProcess ph
_ <- waitBoth outThread errThread
ecTrigger waited -- Output events should never fire after process completion
return $ Process
{ _process_exit = ecOut
, _process_stdout = out
, _process_stderr = err
, _process_signal = fmapMaybe id sigOut
, _process_handle = ph
}
_ -> error "Reflex.Process.createRedirectedProcess: Created pipes were not returned by System.Process.createProcess."
(out, outThread) <- output hOut
(err, errThread) <- errOutput hErr
(ecOut, ecTrigger) <- newTriggerEvent
void $ liftIO $ async $ flip finally (P.cleanupProcess (Just hIn, Just hOut, Just hErr, ph)) $ do
waited <- waitForProcess ph
_ <- waitBoth outThread errThread
ecTrigger waited -- Output events should never fire after process completion
return $ Process
{ _process_exit = ecOut
, _process_stdout = out
, _process_stderr = err
, _process_signal = fmapMaybe id sigOut
, _process_handle = ph
}

-- | Create a process feeding it input using an 'Event' and exposing its output with 'Event's
-- for its exit code, stdout, and stderr. The input is fed via a buffer represented by a
Expand All @@ -152,10 +145,9 @@ createProcessBufferingInput
:: (MonadIO m, TriggerEvent t m, PerformEvent t m, MonadIO (Performable m))
=> IO (SendPipe ByteString) -- ^ Read a value from the input stream buffer
-> (SendPipe ByteString -> IO ()) -- ^ Write a value to the input stream buffer
-> CreateProcess -- ^ The process specification
-> ProcessConfig t (SendPipe ByteString) -- ^ The process configuration in terms of Reflex
-> m (Process t ByteString ByteString)
createProcessBufferingInput readBuffer writeBuffer p procConfig = do
createProcessBufferingInput readBuffer writeBuffer procConfig = do
let
input :: Handle -> IO (SendPipe ByteString -> IO ())
input h = do
Expand Down Expand Up @@ -183,7 +175,7 @@ createProcessBufferingInput readBuffer writeBuffer p procConfig = do
if BS.null out
then H.hClose h
else void (trigger out) *> go
createRedirectedProcess input output output p procConfig
createProcessWith input output output procConfig

-- | Create a process feeding it input using an 'Event' and exposing its output
-- 'Event's representing the process exit code, stdout, and stderr.
Expand All @@ -198,9 +190,16 @@ createProcessBufferingInput readBuffer writeBuffer p procConfig = do
-- to those pipes.
createProcess
:: (MonadIO m, TriggerEvent t m, PerformEvent t m, MonadIO (Performable m))
=> CreateProcess
-> ProcessConfig t (SendPipe ByteString)
=> ProcessConfig t (SendPipe ByteString)
-> m (Process t ByteString ByteString)
createProcess p procConfig = do
createProcess procConfig = do
channel <- liftIO newChan
createProcessBufferingInput (readChan channel) (writeChan channel) p procConfig
createProcessBufferingInput (readChan channel) (writeChan channel) procConfig

-- | Like 'System.Process.createProcess' but always redirects 'std_in', 'std_out', and 'std_err'.
redirectingCreateProcess :: P.CreateProcess -> IO (Handle, Handle, Handle, P.ProcessHandle)
redirectingCreateProcess p = do
r <- P.createProcess p { std_in = P.CreatePipe, std_out = P.CreatePipe, std_err = P.CreatePipe }
case r of
(Just hIn, Just hOut, Just hErr, ph) -> pure (hIn, hOut, hErr, ph)
_ -> error "Reflex.Process.redirectingCreateProcess: Created pipes were not returned by System.Process.createProcess."
17 changes: 9 additions & 8 deletions test/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import Control.Monad (void)
import Control.Monad.IO.Class (liftIO)
import Data.ByteString (ByteString)
import Data.IORef (newIORef, writeIORef, readIORef)
import Data.Foldable (traverse_)
import Reflex
import System.Timeout (timeout)
import qualified Data.ByteString.Char8 as BS
Expand Down Expand Up @@ -51,28 +52,28 @@ checkFRPBlocking downstreamProcess exitMVar = do

let
createProcessWithTermination cp = do
procData <- P.createProcess cp
procData <- redirectingCreateProcess cp
writeIORef spawnedProcess (Just procData)
pure procData

finally
(runHeadlessApp $ do
timer <- tickLossyFromPostBuildTime 1
void $ performEvent $ (liftIO $ tryPutMVar exitMVar Exit) <$ timer
void $ performEvent $ liftIO (tryPutMVar exitMVar Exit) <$ timer

(ev, evTrigger :: SendPipe ByteString -> IO ()) <- newTriggerEvent
processOutput <- createProcess downstreamProcess (ProcessConfig ev never createProcessWithTermination)
processOutput <- createProcess $ ProcessConfig ev never (createProcessWithTermination downstreamProcess)
liftIO $ evTrigger $ SendPipe_Message $ veryLongByteString 'a'
liftIO $ evTrigger $ SendPipe_Message $ veryLongByteString 'b'
liftIO $ evTrigger $ SendPipe_LastMessage $ veryLongByteString 'c'

void $ performEvent $ liftIO . BS.putStrLn <$> (_process_stdout processOutput)
pure never)

(readIORef spawnedProcess >>= mapM_ P.cleanupProcess)
void $ performEvent $ liftIO . BS.putStrLn <$> _process_stdout processOutput
pure never
)
(readIORef spawnedProcess >>= traverse_ (\(hIn, hOut, hErr, ph) -> P.cleanupProcess (Just hIn, Just hOut, Just hErr, ph)))

-- It's important to try this with long bytestrings to be sure that they're not
-- put in an operative system inter-process buffer.
veryLongByteString :: Char -> ByteString
veryLongByteString c = BS.replicate 100000 c
veryLongByteString = BS.replicate 100000
--------------------------------------------------------------------------------

0 comments on commit 02abadd

Please sign in to comment.