Skip to content

Commit

Permalink
Merge pull request #11 from reflex-frp/cn-#9-dont-block-network-when-…
Browse files Browse the repository at this point in the history
…child-blocks-stdin

Don't block FRP network if writing to stdin blocks
  • Loading branch information
3noch committed May 8, 2020
2 parents 3dfe385 + 274a18e commit e2b6477
Show file tree
Hide file tree
Showing 4 changed files with 194 additions and 37 deletions.
26 changes: 24 additions & 2 deletions reflex-process.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ library
, bytestring >=0.10 && < 0.11
, data-default >= 0.7.1 && < 0.8
, process >= 1.6.4 && < 1.7
, reflex >= 0.6.2.4 && < 0.7
, reflex >= 0.7.1 && < 0.8
, unix >= 2.7 && < 2.8
hs-source-dirs: src
default-language: Haskell2010
Expand All @@ -38,5 +38,27 @@ executable readme
, reflex-process
, reflex-vty >= 0.1.2.1 && < 0.2
, text >= 1.2.3 && < 1.3
, vty >= 5.21 && < 5.26
, vty
default-language: Haskell2010

test-suite tests
default-language: Haskell2010
type: exitcode-stdio-1.0
main-is: Main.hs
hs-source-dirs: test
ghc-options: -O2 -Wall -rtsopts -threaded
build-depends:
async,
base,
bytestring,
containers,
data-default,
dependent-sum,
hspec,
mtl,
primitive,
process,
ref-tf,
reflex,
reflex-process,
unix
26 changes: 20 additions & 6 deletions release.nix
Original file line number Diff line number Diff line change
@@ -1,16 +1,30 @@
{ reflex-platform-fun ? import ./reflex-platform
, supportedSystems ? [ "x86_64-linux" "x86_64-darwin" ]
}:

let
native-reflex-platform = reflex-platform-fun {};
inherit (native-reflex-platform.nixpkgs) lib;
systems = [
"x86_64-linux"
"x86_64-darwin"
];

perPlatform = lib.genAttrs systems (system: let
reflex-platform = reflex-platform-fun { inherit system; };
perPlatform = lib.genAttrs supportedSystems (system: let
reflex-platform = reflex-platform-fun {
inherit system;
haskellOverlaysPost = [
(self: super: {
reflex = self.callHackageDirect {
pkg = "reflex";
ver = "0.7.1.0";
sha256 = "0a933xz7yl931m90bbwi9akfz77q6px36grlx6wba55mn1klpn27";
} {};

reflex-vty = self.callHackageDirect {
pkg = "reflex-vty";
ver = "0.1.4.0";
sha256 = "0djs7y4mmkb2q5hvp1fr1gn81k08hzab8v3c6qvh7nyn1fdh8zvh";
} {};
})
];
};
src = builtins.filterSource (path: type: !(builtins.elem (baseNameOf path) [
"release.nix"
".git"
Expand Down
101 changes: 72 additions & 29 deletions src/Reflex/Process.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,28 @@ Description: Run interactive shell commands in reflex
-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE ScopedTypeVariables #-}

module Reflex.Process
( createProcess
, createProcessBufferingInput
, createRedirectedProcess
, Process(..)
, ProcessConfig(..)
) where

import Control.Concurrent (forkIO, killThread)
import Control.Concurrent (forkIO, killThread, ThreadId)
import Control.Concurrent.Chan (newChan, readChan, writeChan)
import Control.Exception (mask_)
import Control.Monad (void, when)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.ByteString (ByteString)
import qualified Data.ByteString as BS
import qualified Data.ByteString.Char8 as Char8
import Data.Default
import qualified GHC.IO.Handle as H
import GHC.IO.Handle (Handle)
import System.Exit (ExitCode)
import Data.Function
import qualified System.Posix.Signals as P
import qualified System.Process as P
import System.Process hiding (createProcess)
Expand All @@ -34,10 +38,15 @@ data ProcessConfig t i = ProcessConfig
-- ^ "stdin" input to be fed to the process
, _processConfig_signal :: Event t P.Signal
-- ^ Signals to send to the process
, _processConfig_createProcess
:: P.CreateProcess -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ProcessHandle)
-- ^ Used for dependency injection (for example to ensure, in the test suite,
-- that the child process is properly terminated). Defaults to
-- 'System.Process.createProcess'
}

instance Reflex t => Default (ProcessConfig t i) where
def = ProcessConfig never never
def = ProcessConfig never never P.createProcess

-- | The output of a process
data Process t o e = Process
Expand All @@ -59,7 +68,7 @@ data Process t o e = Process
-- provided 'CreateProcess' are replaced with new pipes and all output is redirected
-- to those pipes.
createRedirectedProcess
:: (MonadIO m, TriggerEvent t m, PerformEvent t m, MonadIO (Performable m))
:: forall t m i o e. (MonadIO m, TriggerEvent t m, PerformEvent t m, MonadIO (Performable m))
=> (Handle -> IO (i -> IO ()))
-- ^ Builder for the standard input handler
-> (Handle -> (o -> IO ()) -> IO (IO ()))
Expand All @@ -69,42 +78,48 @@ createRedirectedProcess
-> CreateProcess
-> ProcessConfig t i
-> m (Process t o e)
createRedirectedProcess mkWriteStdInput mkReadStdOutput mkReadStdError p (ProcessConfig input signal) = do
createRedirectedProcess mkWriteStdInput mkReadStdOutput mkReadStdError p (ProcessConfig input signal createProcessFunction) = do
let redirectedProc = p
{ std_in = CreatePipe
, std_out = CreatePipe
, std_err = CreatePipe
}
po@(mi, mout, merr, ph) <- liftIO $ P.createProcess redirectedProc
po@(mi, mout, merr, ph) <- liftIO $ createProcessFunction redirectedProc
case (mi, mout, merr) of
(Just hIn, Just hOut, Just hErr) -> do
writeInput <- liftIO $ mkWriteStdInput hIn
writeInput :: i -> IO () <- liftIO $ mkWriteStdInput hIn
performEvent_ $ liftIO . writeInput <$> input
sigOut <- performEvent $ ffor signal $ \sig -> liftIO $ do
sigOut :: Event t (Maybe P.Signal) <- performEvent $ ffor signal $ \sig -> liftIO $ do
mpid <- P.getPid ph
case mpid of
Nothing -> return Nothing
Just pid -> do
P.signalProcess sig pid >> return (Just sig)
let output h = do
let
output :: Handle -> m (Event t o, ThreadId)
output h = do
(e, trigger) <- newTriggerEvent
reader <- liftIO $ mkReadStdOutput h trigger
t <- liftIO $ forkIO reader
return (e, t)

let err_output h = do
let
err_output :: Handle -> m (Event t e, ThreadId)
err_output h = do
(e, trigger) <- newTriggerEvent
reader <- liftIO $ mkReadStdError h trigger
t <- liftIO $ forkIO reader
return (e, t)
(out, outThread) <- output hOut
(err, errThread) <- err_output hErr
(ecOut, ecTrigger) <- newTriggerEvent
void $ liftIO $ forkIO $ waitForProcess ph >>= \ec -> mask_ $ do
ecTrigger ec
P.cleanupProcess po
killThread outThread
killThread errThread
void $ liftIO $ forkIO $ do
waited <- waitForProcess ph
mask_ $ do
ecTrigger waited
P.cleanupProcess po
killThread outThread
killThread errThread
return $ Process
{ _process_exit = ecOut
, _process_stdout = out
Expand All @@ -114,29 +129,36 @@ createRedirectedProcess mkWriteStdInput mkReadStdOutput mkReadStdError p (Proces
}
_ -> error "Reflex.Process.createRedirectedProcess: Created pipes were not returned by System.Process.createProcess."

-- | Run a shell process, feeding it input using an 'Event' and exposing its output
-- 'Event's representing the process exit code, stdout and stderr.
-- | Create a process feeding it input using an 'Event' and exposing its output with 'Event's
-- for its exit code, stdout, and stderr. The input is fed via a buffer represented by a
-- reading action and a writing action.
--
-- The input 'Handle' is not buffered and the output 'Handle's are line-buffered.
-- The @stdout@ and @stderr@ 'Handle's are line-buffered.
--
-- NB: The 'std_in', 'std_out', and 'std_err' parameters of the
-- provided 'CreateProcess' are replaced with new pipes and all output is redirected
-- to those pipes.
createProcess
createProcessBufferingInput
:: (MonadIO m, TriggerEvent t m, PerformEvent t m, MonadIO (Performable m))
=> CreateProcess
-> ProcessConfig t ByteString
=> IO ByteString -- ^ Read a value from the input stream buffer
-> (ByteString -> IO ()) -- ^ Write a value to the input stream buffer
-> CreateProcess -- ^ The process specification
-> ProcessConfig t ByteString -- ^ The process configuration in terms of Reflex
-> m (Process t ByteString ByteString)
createProcess = createRedirectedProcess input output output
where
createProcessBufferingInput readBuffer writeBuffer p procConfig = do
let
input h = do
H.hSetBuffering h H.NoBuffering
let go b = do
open <- H.hIsOpen h
when open $ do
writable <- H.hIsWritable h
when writable $ Char8.hPutStrLn h b
return go
void $ liftIO $ forkIO $ fix $ \loop -> do
newMessage <- readBuffer
open <- H.hIsOpen h
when open $ do
writable <- H.hIsWritable h
when writable $ do
BS.hPutStr h newMessage
loop
return writeBuffer

output h trigger = do
H.hSetBuffering h H.LineBuffering
let go = do
Expand All @@ -151,3 +173,24 @@ createProcess = createRedirectedProcess input output output
void $ trigger out
go
return go
createRedirectedProcess input output output p procConfig

-- | Create a process feeding it input using an 'Event' and exposing its output
-- 'Event's representing the process exit code, stdout, and stderr.
--
-- The @stdout@ and @stderr@ 'Handle's are line-buffered.
--
-- N.B. The process input is buffered with an unbounded channel! For more control of this,
-- use 'createProcessBufferingInput' directly.
--
-- NB: The 'std_in', 'std_out', and 'std_err' parameters of the
-- provided 'CreateProcess' are replaced with new pipes and all output is redirected
-- to those pipes.
createProcess
:: (MonadIO m, TriggerEvent t m, PerformEvent t m, MonadIO (Performable m))
=> CreateProcess
-> ProcessConfig t ByteString
-> m (Process t ByteString ByteString)
createProcess p procConfig = do
channel <- liftIO newChan
createProcessBufferingInput (readChan channel) (writeChan channel) p procConfig
78 changes: 78 additions & 0 deletions test/Main.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
{-# language ScopedTypeVariables #-}

module Main where

import Control.Concurrent (MVar, newEmptyMVar, takeMVar, tryPutMVar)
import Control.Concurrent.Async (race)
import Control.Exception (finally)
import Control.Monad (void)
import Control.Monad.IO.Class (liftIO)
import Data.ByteString (ByteString)
import Data.IORef (newIORef, writeIORef, readIORef)
import Reflex
import System.Timeout (timeout)
import qualified Data.ByteString.Char8 as BS
import qualified System.Process as P

import Reflex.Host.Headless
import Reflex.Process
import Test.Hspec

main :: IO ()
main = hspec $ do
describe "reflex-process" $ do
it "isn't blocked by a downstream non-blocking process" $ do
timeoutWrapperAsync (checkFRPBlocking goodProcess) `shouldReturn` Right (Just Exit)
it "isn't blocked by a downstream blocking process" $ do
timeoutWrapperAsync (checkFRPBlocking blockingProcess) `shouldReturn` Right (Just Exit)

-------------------------------CheckFrpBlocking---------------------------------
goodProcess, blockingProcess :: P.CreateProcess
goodProcess = P.proc "cat" []
blockingProcess = P.proc "sleep" ["infinity"]

-- This datatype signals that the FRP network was able to exit on its own.
data Exit = Exit deriving (Show, Eq)

-- This creates the MVar with which the FRP network sends the exit signals, and
-- checks if a response from the FRP networks comes back in the allotted time.
timeoutWrapperAsync :: (MVar Exit -> IO ()) -> IO (Either () (Maybe Exit))
timeoutWrapperAsync wrapped = do
exitCommMVar :: MVar Exit <- newEmptyMVar
race (wrapped exitCommMVar) (timeout (3*1000000) (takeMVar exitCommMVar))

-- The frp network spawns, starts a timer, and tries to send some long input to
-- the created process. If the underlying process blocks and is able to block
-- the FRP network, the first tick of the timer will never happen, and, the Exit
-- signal will never be put in the MVar.
checkFRPBlocking :: P.CreateProcess -> MVar Exit -> IO ()
checkFRPBlocking downstreamProcess exitMVar = do
spawnedProcess <- newIORef Nothing

let
createProcessWithTermination cp = do
procData <- P.createProcess cp
writeIORef spawnedProcess (Just procData)
pure procData

finally
(runHeadlessApp $ do
timer <- tickLossyFromPostBuildTime 1
void $ performEvent $ (liftIO $ tryPutMVar exitMVar Exit) <$ timer

(ev, evTrigger :: ByteString -> IO ()) <- newTriggerEvent
processOutput <- createProcess downstreamProcess (ProcessConfig ev never createProcessWithTermination)
liftIO $ evTrigger $ veryLongByteString 'a'
liftIO $ evTrigger $ veryLongByteString 'b'
liftIO $ evTrigger $ veryLongByteString 'c'

void $ performEvent $ liftIO . BS.putStrLn <$> (_process_stdout processOutput)
pure never)

(readIORef spawnedProcess >>= mapM_ P.cleanupProcess)

-- It's important to try this with long bytestrings to be sure that they're not
-- put in an operative system inter-process buffer.
veryLongByteString :: Char -> ByteString
veryLongByteString c = BS.replicate 100000 c
--------------------------------------------------------------------------------

0 comments on commit e2b6477

Please sign in to comment.