Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't block FRP network if writing to stdin blocks #11

Merged
merged 10 commits into from
May 8, 2020
23 changes: 23 additions & 0 deletions reflex-process.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ library
, data-default >= 0.7.1 && < 0.8
, process >= 1.6.4 && < 1.7
, reflex >= 0.6.2.4 && < 0.7
, stm
, unix >= 2.7 && < 2.8
hs-source-dirs: src
default-language: Haskell2010
Expand All @@ -40,3 +41,25 @@ executable readme
, text >= 1.2.3 && < 1.3
, vty >= 5.21 && < 5.26
default-language: Haskell2010

test-suite tests
default-language: Haskell2010
type: exitcode-stdio-1.0
main-is: Main.hs
hs-source-dirs: test
ghc-options: -O2 -Wall -rtsopts -threaded
build-depends:
async,
base,
bytestring,
containers,
data-default,
dependent-sum,
hspec,
mtl,
primitive,
process,
ref-tf,
reflex,
reflex-process,
unix
67 changes: 45 additions & 22 deletions src/Reflex/Process.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@ Description: Run interactive shell commands in reflex
-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE ScopedTypeVariables #-}

module Reflex.Process
( createProcess
, createRedirectedProcess
, Process(..)
, ProcessConfig(..)
) where

import Control.Concurrent (forkIO, killThread)
import Control.Concurrent (forkIO, killThread, ThreadId)
import Control.Exception (mask_)
import Control.Monad (void, when)
import Control.Monad.IO.Class (MonadIO, liftIO)
Expand All @@ -25,6 +27,9 @@ import System.Exit (ExitCode)
import qualified System.Posix.Signals as P
import qualified System.Process as P
import System.Process hiding (createProcess)
import Control.Concurrent.STM.TChan
import Control.Concurrent.STM
import Data.Function

import Reflex

Expand All @@ -34,10 +39,15 @@ data ProcessConfig t i = ProcessConfig
-- ^ "stdin" input to be fed to the process
, _processConfig_signal :: Event t P.Signal
-- ^ Signals to send to the process
, _processConfig_createProcess
:: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ProcessHandle)
3noch marked this conversation as resolved.
Show resolved Hide resolved
-- ^ Used for dependency injection (for example to ensure, in the test suite,
-- that the child process is properly terminated). Defaults to
-- 'System.Process.createProcess'
}

instance Reflex t => Default (ProcessConfig t i) where
def = ProcessConfig never never
def = ProcessConfig never never P.createProcess

-- | The output of a process
data Process t o e = Process
Expand All @@ -59,7 +69,7 @@ data Process t o e = Process
-- provided 'CreateProcess' are replaced with new pipes and all output is redirected
-- to those pipes.
createRedirectedProcess
:: (MonadIO m, TriggerEvent t m, PerformEvent t m, MonadIO (Performable m))
:: forall t m i o e. (MonadIO m, TriggerEvent t m, PerformEvent t m, MonadIO (Performable m))
=> (Handle -> IO (i -> IO ()))
-- ^ Builder for the standard input handler
-> (Handle -> (o -> IO ()) -> IO (IO ()))
Expand All @@ -69,42 +79,48 @@ createRedirectedProcess
-> CreateProcess
-> ProcessConfig t i
-> m (Process t o e)
createRedirectedProcess mkWriteStdInput mkReadStdOutput mkReadStdError p (ProcessConfig input signal) = do
createRedirectedProcess mkWriteStdInput mkReadStdOutput mkReadStdError p (ProcessConfig input signal createProcessFunction) = do
let redirectedProc = p
{ std_in = CreatePipe
, std_out = CreatePipe
, std_err = CreatePipe
}
po@(mi, mout, merr, ph) <- liftIO $ P.createProcess redirectedProc
po@(mi, mout, merr, ph) <- liftIO $ createProcessFunction redirectedProc
case (mi, mout, merr) of
(Just hIn, Just hOut, Just hErr) -> do
writeInput <- liftIO $ mkWriteStdInput hIn
writeInput :: i -> IO () <- liftIO $ mkWriteStdInput hIn
performEvent_ $ liftIO . writeInput <$> input
sigOut <- performEvent $ ffor signal $ \sig -> liftIO $ do
sigOut :: Event t (Maybe P.Signal) <- performEvent $ ffor signal $ \sig -> liftIO $ do
mpid <- P.getPid ph
case mpid of
Nothing -> return Nothing
Just pid -> do
P.signalProcess sig pid >> return (Just sig)
let output h = do
let
output :: Handle -> m (Event t o, ThreadId)
output h = do
(e, trigger) <- newTriggerEvent
reader <- liftIO $ mkReadStdOutput h trigger
t <- liftIO $ forkIO reader
return (e, t)

let err_output h = do
let
err_output :: Handle -> m (Event t e, ThreadId)
err_output h = do
(e, trigger) <- newTriggerEvent
reader <- liftIO $ mkReadStdError h trigger
t <- liftIO $ forkIO reader
return (e, t)
(out, outThread) <- output hOut
(err, errThread) <- err_output hErr
(ecOut, ecTrigger) <- newTriggerEvent
void $ liftIO $ forkIO $ waitForProcess ph >>= \ec -> mask_ $ do
ecTrigger ec
P.cleanupProcess po
killThread outThread
killThread errThread
void $ liftIO $ forkIO $ do
waited <- waitForProcess ph
mask_ $ do
ecTrigger waited
P.cleanupProcess po
killThread outThread
killThread errThread
return $ Process
{ _process_exit = ecOut
, _process_stdout = out
Expand All @@ -127,16 +143,21 @@ createProcess
=> CreateProcess
-> ProcessConfig t ByteString
-> m (Process t ByteString ByteString)
createProcess = createRedirectedProcess input output output
where
createProcess p processConfig = do
3noch marked this conversation as resolved.
Show resolved Hide resolved
channel <- liftIO newTChanIO

let
input h = do
H.hSetBuffering h H.NoBuffering
let go b = do
open <- H.hIsOpen h
when open $ do
writable <- H.hIsWritable h
when writable $ Char8.hPutStrLn h b
return go
void $ liftIO $ forkIO $ fix $ \loop -> do
newMessage <- atomically $ readTChan channel
open <- H.hIsOpen h
when open $ do
writable <- H.hIsWritable h
when writable $ do
Char8.hPutStrLn h newMessage
3noch marked this conversation as resolved.
Show resolved Hide resolved
loop
return (liftIO . atomically . writeTChan channel)
output h trigger = do
H.hSetBuffering h H.LineBuffering
let go = do
Expand All @@ -151,3 +172,5 @@ createProcess = createRedirectedProcess input output output
void $ trigger out
go
return go

createRedirectedProcess input output output p processConfig
78 changes: 78 additions & 0 deletions test/Main.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
{-# language ScopedTypeVariables #-}

module Main where

import Control.Concurrent (MVar, newEmptyMVar, takeMVar, tryPutMVar)
import Control.Concurrent.Async (race)
import Control.Exception (finally)
import Control.Monad (void)
import Control.Monad.IO.Class (liftIO)
import Data.ByteString (ByteString)
import Data.IORef (newIORef, writeIORef, readIORef)
import Reflex
import System.Timeout (timeout)
import qualified Data.ByteString.Char8 as BS
import qualified System.Process as P

import Reflex.Host.Headless
import Reflex.Process
import Test.Hspec

main :: IO ()
main = hspec $ do
describe "reflex-process" $ do
it "isn't blocked by a downstream non-blocking process" $ do
timeoutWrapperAsync (checkFRPBlocking goodProcess) `shouldReturn` Right (Just Exit)
it "isn't blocked by a downstream blocking process" $ do
timeoutWrapperAsync (checkFRPBlocking blockingProcess) `shouldReturn` Right (Just Exit)

-------------------------------CheckFrpBlocking---------------------------------
goodProcess, blockingProcess :: P.CreateProcess
goodProcess = P.proc "cat" []
blockingProcess = P.proc "sleep" ["infinity"]

-- This datatype signals that the FRP network was able to exit on its own.
data Exit = Exit deriving (Show, Eq)

-- This creates the MVar with which the FRP network sends the exit signals, and
-- checks if a response from the FRP networks comes back in the allotted time.
timeoutWrapperAsync :: (MVar Exit -> IO ()) -> IO (Either () (Maybe Exit))
timeoutWrapperAsync wrapped = do
exitCommMVar :: MVar Exit <- newEmptyMVar
race (wrapped exitCommMVar) (timeout (3*1000000) (takeMVar exitCommMVar))

-- The frp network spawns, starts a timer, and tries to send some long input to
-- the created process. If the underlying process blocks and is able to block
-- the FRP network, the first tick of the timer will never happen, and, the Exit
-- signal will never be put in the MVar.
checkFRPBlocking :: P.CreateProcess -> MVar Exit -> IO ()
checkFRPBlocking downstreamProcess exitMVar = do
spawnedProcess <- newIORef Nothing

let
createProcessWithTermination cp = do
procData <- P.createProcess cp
writeIORef spawnedProcess (Just procData)
pure procData

finally
(runHeadlessApp $ do
timer <- tickLossyFromPostBuildTime 1
void $ performEvent $ (liftIO $ tryPutMVar exitMVar Exit) <$ timer

(ev, evTrigger :: ByteString -> IO ()) <- newTriggerEvent
processOutput <- createProcess downstreamProcess (ProcessConfig ev never createProcessWithTermination)
liftIO $ evTrigger $ veryLongByteString 'a'
liftIO $ evTrigger $ veryLongByteString 'b'
liftIO $ evTrigger $ veryLongByteString 'c'

void $ performEvent $ liftIO . BS.putStrLn <$> (_process_stdout processOutput)
pure never)

(readIORef spawnedProcess >>= mapM_ P.cleanupProcess)

-- It's important to try this with long bytestrings to be sure that they're not
-- put in an operative system inter-process buffer.
veryLongByteString :: Char -> ByteString
veryLongByteString c = BS.replicate 100000 c
--------------------------------------------------------------------------------