Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't block FRP network if writing to stdin blocks #11

Merged
merged 10 commits into from
May 8, 2020
26 changes: 24 additions & 2 deletions reflex-process.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ library
, bytestring >=0.10 && < 0.11
, data-default >= 0.7.1 && < 0.8
, process >= 1.6.4 && < 1.7
, reflex >= 0.6.2.4 && < 0.7
, reflex >= 0.7.1 && < 0.8
, unix >= 2.7 && < 2.8
hs-source-dirs: src
default-language: Haskell2010
Expand All @@ -38,5 +38,27 @@ executable readme
, reflex-process
, reflex-vty >= 0.1.2.1 && < 0.2
, text >= 1.2.3 && < 1.3
, vty >= 5.21 && < 5.26
, vty
default-language: Haskell2010

test-suite tests
default-language: Haskell2010
type: exitcode-stdio-1.0
main-is: Main.hs
hs-source-dirs: test
ghc-options: -O2 -Wall -rtsopts -threaded
build-depends:
async,
base,
bytestring,
containers,
data-default,
dependent-sum,
hspec,
mtl,
primitive,
process,
ref-tf,
reflex,
reflex-process,
unix
26 changes: 20 additions & 6 deletions release.nix
Original file line number Diff line number Diff line change
@@ -1,16 +1,30 @@
{ reflex-platform-fun ? import ./reflex-platform
, supportedSystems ? [ "x86_64-linux" "x86_64-darwin" ]
}:

let
native-reflex-platform = reflex-platform-fun {};
inherit (native-reflex-platform.nixpkgs) lib;
systems = [
"x86_64-linux"
"x86_64-darwin"
];

perPlatform = lib.genAttrs systems (system: let
reflex-platform = reflex-platform-fun { inherit system; };
perPlatform = lib.genAttrs supportedSystems (system: let
reflex-platform = reflex-platform-fun {
inherit system;
haskellOverlaysPost = [
(self: super: {
reflex = self.callHackageDirect {
pkg = "reflex";
ver = "0.7.1.0";
sha256 = "0a933xz7yl931m90bbwi9akfz77q6px36grlx6wba55mn1klpn27";
} {};

reflex-vty = self.callHackageDirect {
pkg = "reflex-vty";
ver = "0.1.4.0";
sha256 = "0djs7y4mmkb2q5hvp1fr1gn81k08hzab8v3c6qvh7nyn1fdh8zvh";
} {};
})
];
};
src = builtins.filterSource (path: type: !(builtins.elem (baseNameOf path) [
"release.nix"
".git"
Expand Down
101 changes: 72 additions & 29 deletions src/Reflex/Process.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,28 @@ Description: Run interactive shell commands in reflex
-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE ScopedTypeVariables #-}

module Reflex.Process
( createProcess
, createProcessBufferingInput
, createRedirectedProcess
, Process(..)
, ProcessConfig(..)
) where

import Control.Concurrent (forkIO, killThread)
import Control.Concurrent (forkIO, killThread, ThreadId)
import Control.Concurrent.Chan (newChan, readChan, writeChan)
import Control.Exception (mask_)
import Control.Monad (void, when)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.ByteString (ByteString)
import qualified Data.ByteString as BS
import qualified Data.ByteString.Char8 as Char8
import Data.Default
import qualified GHC.IO.Handle as H
import GHC.IO.Handle (Handle)
import System.Exit (ExitCode)
import Data.Function
import qualified System.Posix.Signals as P
import qualified System.Process as P
import System.Process hiding (createProcess)
Expand All @@ -34,10 +38,15 @@ data ProcessConfig t i = ProcessConfig
-- ^ "stdin" input to be fed to the process
, _processConfig_signal :: Event t P.Signal
-- ^ Signals to send to the process
, _processConfig_createProcess
:: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ProcessHandle)
3noch marked this conversation as resolved.
Show resolved Hide resolved
-- ^ Used for dependency injection (for example to ensure, in the test suite,
-- that the child process is properly terminated). Defaults to
-- 'System.Process.createProcess'
}

instance Reflex t => Default (ProcessConfig t i) where
def = ProcessConfig never never
def = ProcessConfig never never P.createProcess

-- | The output of a process
data Process t o e = Process
Expand All @@ -59,7 +68,7 @@ data Process t o e = Process
-- provided 'CreateProcess' are replaced with new pipes and all output is redirected
-- to those pipes.
createRedirectedProcess
:: (MonadIO m, TriggerEvent t m, PerformEvent t m, MonadIO (Performable m))
:: forall t m i o e. (MonadIO m, TriggerEvent t m, PerformEvent t m, MonadIO (Performable m))
=> (Handle -> IO (i -> IO ()))
-- ^ Builder for the standard input handler
-> (Handle -> (o -> IO ()) -> IO (IO ()))
Expand All @@ -69,42 +78,48 @@ createRedirectedProcess
-> CreateProcess
-> ProcessConfig t i
-> m (Process t o e)
createRedirectedProcess mkWriteStdInput mkReadStdOutput mkReadStdError p (ProcessConfig input signal) = do
createRedirectedProcess mkWriteStdInput mkReadStdOutput mkReadStdError p (ProcessConfig input signal createProcessFunction) = do
let redirectedProc = p
{ std_in = CreatePipe
, std_out = CreatePipe
, std_err = CreatePipe
}
po@(mi, mout, merr, ph) <- liftIO $ P.createProcess redirectedProc
po@(mi, mout, merr, ph) <- liftIO $ createProcessFunction redirectedProc
case (mi, mout, merr) of
(Just hIn, Just hOut, Just hErr) -> do
writeInput <- liftIO $ mkWriteStdInput hIn
writeInput :: i -> IO () <- liftIO $ mkWriteStdInput hIn
performEvent_ $ liftIO . writeInput <$> input
sigOut <- performEvent $ ffor signal $ \sig -> liftIO $ do
sigOut :: Event t (Maybe P.Signal) <- performEvent $ ffor signal $ \sig -> liftIO $ do
mpid <- P.getPid ph
case mpid of
Nothing -> return Nothing
Just pid -> do
P.signalProcess sig pid >> return (Just sig)
let output h = do
let
output :: Handle -> m (Event t o, ThreadId)
output h = do
(e, trigger) <- newTriggerEvent
reader <- liftIO $ mkReadStdOutput h trigger
t <- liftIO $ forkIO reader
return (e, t)

let err_output h = do
let
err_output :: Handle -> m (Event t e, ThreadId)
err_output h = do
(e, trigger) <- newTriggerEvent
reader <- liftIO $ mkReadStdError h trigger
t <- liftIO $ forkIO reader
return (e, t)
(out, outThread) <- output hOut
(err, errThread) <- err_output hErr
(ecOut, ecTrigger) <- newTriggerEvent
void $ liftIO $ forkIO $ waitForProcess ph >>= \ec -> mask_ $ do
ecTrigger ec
P.cleanupProcess po
killThread outThread
killThread errThread
void $ liftIO $ forkIO $ do
waited <- waitForProcess ph
mask_ $ do
ecTrigger waited
P.cleanupProcess po
killThread outThread
killThread errThread
return $ Process
{ _process_exit = ecOut
, _process_stdout = out
Expand All @@ -114,29 +129,36 @@ createRedirectedProcess mkWriteStdInput mkReadStdOutput mkReadStdError p (Proces
}
_ -> error "Reflex.Process.createRedirectedProcess: Created pipes were not returned by System.Process.createProcess."

-- | Run a shell process, feeding it input using an 'Event' and exposing its output
-- 'Event's representing the process exit code, stdout and stderr.
-- | Create a process feeding it input using an 'Event' and exposing its output with 'Event's
-- for its exit code, stdout, and stderr. The input is fed via a buffer represented by a
-- reading action and a writing action.
--
-- The input 'Handle' is not buffered and the output 'Handle's are line-buffered.
-- The @stdout@ and @stderr@ 'Handle's are line-buffered.
--
-- NB: The 'std_in', 'std_out', and 'std_err' parameters of the
-- provided 'CreateProcess' are replaced with new pipes and all output is redirected
-- to those pipes.
createProcess
createProcessBufferingInput
:: (MonadIO m, TriggerEvent t m, PerformEvent t m, MonadIO (Performable m))
=> CreateProcess
-> ProcessConfig t ByteString
=> IO ByteString -- ^ Read a value from the input stream buffer
-> (ByteString -> IO ()) -- ^ Write a value to the input stream buffer
-> CreateProcess -- ^ The process specification
-> ProcessConfig t ByteString -- ^ The process configuration in terms of Reflex
-> m (Process t ByteString ByteString)
createProcess = createRedirectedProcess input output output
where
createProcessBufferingInput readBuffer writeBuffer p procConfig = do
let
input h = do
H.hSetBuffering h H.NoBuffering
let go b = do
open <- H.hIsOpen h
when open $ do
writable <- H.hIsWritable h
when writable $ Char8.hPutStrLn h b
return go
void $ liftIO $ forkIO $ fix $ \loop -> do
newMessage <- readBuffer
open <- H.hIsOpen h
when open $ do
writable <- H.hIsWritable h
when writable $ do
BS.hPutStr h newMessage
loop
return writeBuffer

output h trigger = do
H.hSetBuffering h H.LineBuffering
let go = do
Expand All @@ -151,3 +173,24 @@ createProcess = createRedirectedProcess input output output
void $ trigger out
go
return go
createRedirectedProcess input output output p procConfig

-- | Create a process feeding it input using an 'Event' and exposing its output
-- 'Event's representing the process exit code, stdout, and stderr.
--
-- The @stdout@ and @stderr@ 'Handle's are line-buffered.
--
-- N.B. The process input is buffered with an unbounded channel! For more control of this,
-- use 'createProcessBufferingInput' directly.
--
-- NB: The 'std_in', 'std_out', and 'std_err' parameters of the
-- provided 'CreateProcess' are replaced with new pipes and all output is redirected
-- to those pipes.
createProcess
:: (MonadIO m, TriggerEvent t m, PerformEvent t m, MonadIO (Performable m))
=> CreateProcess
-> ProcessConfig t ByteString
-> m (Process t ByteString ByteString)
createProcess p procConfig = do
channel <- liftIO newChan
createProcessBufferingInput (readChan channel) (writeChan channel) p procConfig
78 changes: 78 additions & 0 deletions test/Main.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
{-# language ScopedTypeVariables #-}

module Main where

import Control.Concurrent (MVar, newEmptyMVar, takeMVar, tryPutMVar)
import Control.Concurrent.Async (race)
import Control.Exception (finally)
import Control.Monad (void)
import Control.Monad.IO.Class (liftIO)
import Data.ByteString (ByteString)
import Data.IORef (newIORef, writeIORef, readIORef)
import Reflex
import System.Timeout (timeout)
import qualified Data.ByteString.Char8 as BS
import qualified System.Process as P

import Reflex.Host.Headless
import Reflex.Process
import Test.Hspec

main :: IO ()
main = hspec $ do
describe "reflex-process" $ do
it "isn't blocked by a downstream non-blocking process" $ do
timeoutWrapperAsync (checkFRPBlocking goodProcess) `shouldReturn` Right (Just Exit)
it "isn't blocked by a downstream blocking process" $ do
timeoutWrapperAsync (checkFRPBlocking blockingProcess) `shouldReturn` Right (Just Exit)

-------------------------------CheckFrpBlocking---------------------------------
goodProcess, blockingProcess :: P.CreateProcess
goodProcess = P.proc "cat" []
blockingProcess = P.proc "sleep" ["infinity"]

-- This datatype signals that the FRP network was able to exit on its own.
data Exit = Exit deriving (Show, Eq)

-- This creates the MVar with which the FRP network sends the exit signals, and
-- checks if a response from the FRP networks comes back in the allotted time.
timeoutWrapperAsync :: (MVar Exit -> IO ()) -> IO (Either () (Maybe Exit))
timeoutWrapperAsync wrapped = do
exitCommMVar :: MVar Exit <- newEmptyMVar
race (wrapped exitCommMVar) (timeout (3*1000000) (takeMVar exitCommMVar))

-- The frp network spawns, starts a timer, and tries to send some long input to
-- the created process. If the underlying process blocks and is able to block
-- the FRP network, the first tick of the timer will never happen, and, the Exit
-- signal will never be put in the MVar.
checkFRPBlocking :: P.CreateProcess -> MVar Exit -> IO ()
checkFRPBlocking downstreamProcess exitMVar = do
spawnedProcess <- newIORef Nothing

let
createProcessWithTermination cp = do
procData <- P.createProcess cp
writeIORef spawnedProcess (Just procData)
pure procData

finally
(runHeadlessApp $ do
timer <- tickLossyFromPostBuildTime 1
void $ performEvent $ (liftIO $ tryPutMVar exitMVar Exit) <$ timer

(ev, evTrigger :: ByteString -> IO ()) <- newTriggerEvent
processOutput <- createProcess downstreamProcess (ProcessConfig ev never createProcessWithTermination)
liftIO $ evTrigger $ veryLongByteString 'a'
liftIO $ evTrigger $ veryLongByteString 'b'
liftIO $ evTrigger $ veryLongByteString 'c'

void $ performEvent $ liftIO . BS.putStrLn <$> (_process_stdout processOutput)
pure never)

(readIORef spawnedProcess >>= mapM_ P.cleanupProcess)

-- It's important to try this with long bytestrings to be sure that they're not
-- put in an operative system inter-process buffer.
veryLongByteString :: Char -> ByteString
veryLongByteString c = BS.replicate 100000 c
--------------------------------------------------------------------------------